neo3/neo_fs/
client.rs

1// Copyright (c) 2023-2025 R3E Network
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6//     http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! Client for interacting with NeoFS.
15
16use crate::{
17	neo_fs::{
18		AccessPermission, BearerToken, Container, ContainerId, MultipartUpload,
19		MultipartUploadResult, NeoFSError, NeoFSResult, NeoFSService, Object, ObjectId, OwnerId,
20		Part, SessionToken,
21	},
22	neo_protocol::Account,
23};
24use async_trait::async_trait;
25use base64;
26use reqwest::{
27	header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE},
28	Client,
29};
30use serde_json::{json, Value};
31use std::{collections::HashMap, fmt::Debug};
32
/// Default mainnet NeoFS gRPC endpoint, written as `host:port` without a URL scheme.
pub const DEFAULT_MAINNET_ENDPOINT: &str = "grpc.mainnet.fs.neo.org:8082";

/// Default testnet NeoFS gRPC endpoint, written as `host:port` without a URL scheme.
pub const DEFAULT_TESTNET_ENDPOINT: &str = "grpc.testnet.fs.neo.org:8082";

/// Default mainnet NeoFS HTTP gateway base URL.
pub const DEFAULT_MAINNET_HTTP_GATEWAY: &str = "https://http.mainnet.fs.neo.org";

/// Default testnet NeoFS HTTP gateway base URL.
pub const DEFAULT_TESTNET_HTTP_GATEWAY: &str = "https://http.testnet.fs.neo.org";

/// Default mainnet NeoFS REST API base URL.
pub const DEFAULT_MAINNET_REST_API: &str = "https://rest.mainnet.fs.neo.org";

/// Default testnet NeoFS REST API base URL.
pub const DEFAULT_TESTNET_REST_API: &str = "https://rest.testnet.fs.neo.org";
50
/// Configuration for NeoFS authentication.
///
/// `Debug` is implemented by hand instead of derived so that the private key
/// never ends up in logs or panic messages: the output only reports whether a
/// key is present.
#[derive(Clone)]
pub struct NeoFSAuth {
	/// The wallet address for authentication
	pub wallet_address: String,
	/// Optional private key for signing requests
	pub private_key: Option<String>,
}

impl std::fmt::Debug for NeoFSAuth {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		// Keep the `Some`/`None` shape visible but never print the secret itself.
		let redacted_key = self.private_key.as_ref().map(|_| "<redacted>");

		f.debug_struct("NeoFSAuth")
			.field("wallet_address", &self.wallet_address)
			.field("private_key", &redacted_key)
			.finish()
	}
}
59
/// Configuration for NeoFS client.
///
/// Passed to `NeoFSClient::new`, which stores it on the client.
#[derive(Debug, Clone)]
pub struct NeoFSConfig {
	/// NeoFS endpoint URL.
	///
	/// `NeoFSClient::new` uses a value starting with `http` verbatim as its base
	/// URL; any other value is mapped to one of the default HTTP gateways.
	pub endpoint: String,
	/// Authentication information; when present, its wallet address is sent as a
	/// bearer `Authorization` header on every request.
	pub auth: Option<NeoFSAuth>,
	/// Connection timeout in seconds
	pub timeout_sec: u64,
	/// Whether to use insecure connection
	pub insecure: bool,
}
72
/// Client for interacting with NeoFS.
///
/// All operations are issued as JSON requests under `<base_url>/v1/`.
#[derive(Debug, Clone)]
pub struct NeoFSClient {
	// Configuration the client was created with.
	config: NeoFSConfig,
	// Account whose public key is used as the owner ID; set via `with_account`.
	account: Option<Account>,
	// Shared `reqwest` client used for every request.
	http_client: Client,
	// Base URL that request paths are appended to.
	base_url: String,
}
81
82impl NeoFSClient {
83	/// Creates a new NeoFS client with the given configuration
84	pub fn new(config: NeoFSConfig) -> Self {
85		let http_client = Client::new();
86		let base_url = if config.endpoint.starts_with("http") {
87			config.endpoint.clone()
88		} else {
89			// Convert gRPC endpoint to HTTP gateway
90			if config.endpoint.contains("mainnet") {
91				DEFAULT_MAINNET_HTTP_GATEWAY.to_string()
92			} else {
93				DEFAULT_TESTNET_HTTP_GATEWAY.to_string()
94			}
95		};
96
97		Self { config, account: None, http_client, base_url }
98	}
99
100	/// Creates a new NeoFS client with default configuration
101	pub fn default() -> Self {
102		Self {
103			config: NeoFSConfig {
104				endpoint: DEFAULT_MAINNET_ENDPOINT.to_string(),
105				auth: None,
106				timeout_sec: 10,
107				insecure: false,
108			},
109			account: None,
110			http_client: Client::new(),
111			base_url: DEFAULT_MAINNET_HTTP_GATEWAY.to_string(),
112		}
113	}
114
115	/// Sets the account to use for authentication
116	pub fn with_account(mut self, account: Account) -> Self {
117		self.account = Some(account);
118		self
119	}
120
121	/// Gets the account's owner ID
122	pub fn get_owner_id(&self) -> NeoFSResult<OwnerId> {
123		if let Some(account) = &self.account {
124			let pubkey = account
125				.get_public_key()
126				.ok_or(NeoFSError::AuthenticationError("No public key available".to_string()))?
127				.to_string();
128
129			Ok(OwnerId(pubkey))
130		} else {
131			Err(NeoFSError::AuthenticationError(
132				"No account provided for authentication".to_string(),
133			))
134		}
135	}
136
137	/// Creates HTTP headers for authenticated requests
138	fn create_auth_headers(&self) -> NeoFSResult<HeaderMap> {
139		let mut headers = HeaderMap::new();
140		headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
141
142		if let Some(auth) = &self.config.auth {
143			// Create a simple bearer token from wallet address
144			let token = format!("Bearer {}", auth.wallet_address);
145			headers.insert(
146				AUTHORIZATION,
147				HeaderValue::from_str(&token).map_err(|e| {
148					NeoFSError::AuthenticationError(format!("Invalid auth header: {}", e))
149				})?,
150			);
151		}
152
153		Ok(headers)
154	}
155
156	/// Makes an HTTP request to the NeoFS REST API
157	async fn make_request(
158		&self,
159		method: &str,
160		endpoint: &str,
161		body: Option<Value>,
162	) -> NeoFSResult<Value> {
163		let url = format!("{}/v1/{}", self.base_url, endpoint);
164		let headers = self.create_auth_headers()?;
165
166		let mut request = match method {
167			"GET" => self.http_client.get(&url),
168			"POST" => self.http_client.post(&url),
169			"PUT" => self.http_client.put(&url),
170			"DELETE" => self.http_client.delete(&url),
171			_ =>
172				return Err(NeoFSError::InvalidArgument(format!(
173					"Unsupported HTTP method: {}",
174					method
175				))),
176		};
177
178		request = request.headers(headers);
179
180		if let Some(json_body) = body {
181			request = request.json(&json_body);
182		}
183
184		let response = request
185			.send()
186			.await
187			.map_err(|e| NeoFSError::ConnectionError(format!("HTTP request failed: {}", e)))?;
188
189		if !response.status().is_success() {
190			return Err(NeoFSError::UnexpectedResponse(format!(
191				"HTTP error: {}",
192				response.status()
193			)));
194		}
195
196		let json: Value = response.json().await.map_err(|e| {
197			NeoFSError::SerializationError(format!("Failed to parse JSON response: {}", e))
198		})?;
199
200		Ok(json)
201	}
202
203	// MULTIPART UPLOAD OPERATIONS
204
205	/// Initializes a multipart upload
206	pub async fn init_multipart_upload(
207		&self,
208		container_id: &ContainerId,
209		object: &Object,
210		part_size: u64,
211	) -> NeoFSResult<MultipartUpload> {
212		let owner_id = self.get_owner_id()?;
213
214		// Create multipart upload request
215		let request_body = json!({
216			"multipartUpload": {
217				"containerId": container_id.0,
218				"ownerId": owner_id.0,
219				"attributes": object.attributes,
220				"partSize": part_size,
221				"maxParts": 10000
222			}
223		});
224
225		let response = self.make_request("POST", "multipart/init", Some(request_body)).await?;
226
227		let upload_id = response.get("uploadId").and_then(|v| v.as_str()).ok_or_else(|| {
228			NeoFSError::UnexpectedResponse("Missing uploadId in response".to_string())
229		})?;
230
231		Ok(MultipartUpload {
232			id: None,
233			container_id: container_id.clone(),
234			owner_id,
235			upload_id: upload_id.to_string(),
236			attributes: object.attributes.clone(),
237			part_size,
238			max_parts: 10000,
239		})
240	}
241
242	/// Uploads a part of a multipart upload
243	pub async fn upload_part(&self, upload: &MultipartUpload, part: Part) -> NeoFSResult<()> {
244		let request_body = json!({
245			"uploadId": upload.upload_id,
246			"partNumber": part.part_number,
247			"data": base64::encode(&part.payload)
248		});
249
250		let endpoint = format!("multipart/{}/parts", upload.upload_id);
251		let _response = self.make_request("POST", &endpoint, Some(request_body)).await?;
252
253		Ok(())
254	}
255
256	/// Completes a multipart upload
257	pub async fn complete_multipart_upload(
258		&self,
259		upload: &MultipartUpload,
260		part_numbers: Vec<u32>,
261	) -> NeoFSResult<MultipartUploadResult> {
262		let request_body = json!({
263			"uploadId": upload.upload_id,
264			"parts": part_numbers
265		});
266
267		let endpoint = format!("multipart/{}/complete", upload.upload_id);
268		let response = self.make_request("POST", &endpoint, Some(request_body)).await?;
269
270		let object_id = response.get("objectId").and_then(|v| v.as_str()).ok_or_else(|| {
271			NeoFSError::UnexpectedResponse("Missing objectId in response".to_string())
272		})?;
273
274		Ok(MultipartUploadResult {
275			object_id: ObjectId(object_id.to_string()),
276			container_id: upload.container_id.clone(),
277		})
278	}
279
280	/// Aborts a multipart upload
281	pub async fn abort_multipart_upload(&self, upload: &MultipartUpload) -> NeoFSResult<()> {
282		let endpoint = format!("multipart/{}/abort", upload.upload_id);
283		let _response = self.make_request("DELETE", &endpoint, None).await?;
284		Ok(())
285	}
286}
287
288#[async_trait]
289impl NeoFSService for NeoFSClient {
290	async fn create_container(&self, container: &Container) -> NeoFSResult<ContainerId> {
291		let owner_id = self.get_owner_id()?;
292
293		let request_body = json!({
294			"container": {
295				"ownerId": owner_id.0,
296				"basicAcl": container.basic_acl,
297				"attributes": container.attributes,
298				"placementPolicy": container.placement_policy
299			}
300		});
301
302		let response = self.make_request("POST", "containers", Some(request_body)).await?;
303
304		if let Some(container_id) = response.get("containerId").and_then(|v| v.as_str()) {
305			Ok(ContainerId(container_id.to_string()))
306		} else {
307			Err(NeoFSError::UnexpectedResponse("Missing containerId in response".to_string()))
308		}
309	}
310
311	async fn get_container(&self, id: &ContainerId) -> NeoFSResult<Container> {
312		let endpoint = format!("containers/{}", id.0);
313		let response = self.make_request("GET", &endpoint, None).await?;
314
315		if let Some(container_data) = response.get("container") {
316			let owner_id = container_data
317				.get("ownerId")
318				.and_then(|v| v.as_str())
319				.ok_or_else(|| NeoFSError::UnexpectedResponse("Missing ownerId".to_string()))?;
320
321			let mut container = Container::new(id.clone(), OwnerId(owner_id.to_string()));
322
323			if let Some(basic_acl) = container_data.get("basicAcl").and_then(|v| v.as_u64()) {
324				container.basic_acl = basic_acl as u32;
325			}
326
327			if let Some(attributes) = container_data.get("attributes").and_then(|v| v.as_object()) {
328				for (key, value) in attributes {
329					if let Some(val_str) = value.as_str() {
330						container.attributes.add(key.clone(), val_str.to_string());
331					}
332				}
333			}
334
335			Ok(container)
336		} else {
337			Err(NeoFSError::UnexpectedResponse("Missing container data in response".to_string()))
338		}
339	}
340
341	async fn list_containers(&self) -> NeoFSResult<Vec<ContainerId>> {
342		let owner_id = self.get_owner_id()?;
343		let endpoint = format!("containers?ownerId={}", owner_id.0);
344		let response = self.make_request("GET", &endpoint, None).await?;
345
346		if let Some(containers) = response.get("containers").and_then(|v| v.as_array()) {
347			let container_ids = containers
348				.iter()
349				.filter_map(|v| v.get("containerId").and_then(|id| id.as_str()))
350				.map(|id| ContainerId(id.to_string()))
351				.collect();
352			Ok(container_ids)
353		} else {
354			Ok(vec![]) // Return empty list if no containers found
355		}
356	}
357
358	async fn delete_container(&self, id: &ContainerId) -> NeoFSResult<bool> {
359		let endpoint = format!("containers/{}", id.0);
360		let _response = self.make_request("DELETE", &endpoint, None).await?;
361		Ok(true)
362	}
363
364	async fn put_object(
365		&self,
366		container_id: &ContainerId,
367		object: &Object,
368	) -> NeoFSResult<ObjectId> {
369		let owner_id = self.get_owner_id()?;
370
371		let request_body = json!({
372			"object": {
373				"containerId": container_id.0,
374				"ownerId": owner_id.0,
375				"attributes": object.attributes,
376				"payload": base64::encode(&object.payload)
377			}
378		});
379
380		let endpoint = format!("objects/{}", container_id.0);
381		let response = self.make_request("POST", &endpoint, Some(request_body)).await?;
382
383		if let Some(object_id) = response.get("objectId").and_then(|v| v.as_str()) {
384			Ok(ObjectId(object_id.to_string()))
385		} else {
386			Err(NeoFSError::UnexpectedResponse("Missing objectId in response".to_string()))
387		}
388	}
389
390	async fn get_object(
391		&self,
392		container_id: &ContainerId,
393		object_id: &ObjectId,
394	) -> NeoFSResult<Object> {
395		let endpoint = format!("objects/{}/{}", container_id.0, object_id.0);
396		let response = self.make_request("GET", &endpoint, None).await?;
397
398		if let Some(object_data) = response.get("object") {
399			let owner_id = object_data
400				.get("ownerId")
401				.and_then(|v| v.as_str())
402				.ok_or_else(|| NeoFSError::UnexpectedResponse("Missing ownerId".to_string()))?;
403
404			let mut object = Object::new(container_id.clone(), OwnerId(owner_id.to_string()));
405
406			if let Some(payload_b64) = object_data.get("payload").and_then(|v| v.as_str()) {
407				object.payload = base64::decode(payload_b64).map_err(|e| {
408					NeoFSError::UnexpectedResponse(format!("Invalid base64 payload: {}", e))
409				})?;
410			}
411
412			if let Some(attributes) = object_data.get("attributes").and_then(|v| v.as_object()) {
413				for (key, value) in attributes {
414					if let Some(val_str) = value.as_str() {
415						object.attributes.add(key.clone(), val_str.to_string());
416					}
417				}
418			}
419
420			Ok(object)
421		} else {
422			Err(NeoFSError::UnexpectedResponse("Missing object data in response".to_string()))
423		}
424	}
425
426	async fn list_objects(&self, container_id: &ContainerId) -> NeoFSResult<Vec<ObjectId>> {
427		let endpoint = format!("objects/{}", container_id.0);
428		let response = self.make_request("GET", &endpoint, None).await?;
429
430		if let Some(objects) = response.get("objects").and_then(|v| v.as_array()) {
431			let object_ids = objects
432				.iter()
433				.filter_map(|v| v.get("objectId").and_then(|id| id.as_str()))
434				.map(|id| ObjectId(id.to_string()))
435				.collect();
436			Ok(object_ids)
437		} else {
438			Ok(vec![]) // Return empty list if no objects found
439		}
440	}
441
442	async fn delete_object(
443		&self,
444		container_id: &ContainerId,
445		object_id: &ObjectId,
446	) -> NeoFSResult<bool> {
447		let endpoint = format!("objects/{}/{}", container_id.0, object_id.0);
448		let _response = self.make_request("DELETE", &endpoint, None).await?;
449		Ok(true)
450	}
451
452	async fn create_bearer_token(
453		&self,
454		container_id: &ContainerId,
455		permissions: Vec<AccessPermission>,
456		expires_sec: u64,
457	) -> NeoFSResult<BearerToken> {
458		let owner_id = self.get_owner_id()?;
459		let expiration = chrono::Utc::now() + chrono::Duration::seconds(expires_sec as i64);
460
461		let request_body = json!({
462			"bearerToken": {
463				"containerId": container_id.0,
464				"ownerId": owner_id.0,
465				"permissions": permissions,
466				"expiresAt": expiration.timestamp()
467			}
468		});
469
470		let response = self.make_request("POST", "auth/bearer", Some(request_body)).await?;
471
472		if let Some(token) = response.get("token").and_then(|v| v.as_str()) {
473			Ok(BearerToken {
474				owner_id,
475				token_id: format!("bearer-{}", chrono::Utc::now().timestamp()),
476				container_id: container_id.clone(),
477				operations: permissions,
478				expiration,
479				signature: vec![],
480			})
481		} else {
482			Err(NeoFSError::UnexpectedResponse("Missing token in response".to_string()))
483		}
484	}
485
486	async fn get_session_token(&self) -> NeoFSResult<SessionToken> {
487		let owner_id = self.get_owner_id()?;
488
489		let request_body = json!({
490			"sessionToken": {
491				"ownerId": owner_id.0
492			}
493		});
494
495		let response = self.make_request("POST", "auth/session", Some(request_body)).await?;
496
497		if let Some(token_data) = response.get("sessionToken") {
498			let token_id = token_data
499				.get("tokenId")
500				.and_then(|v| v.as_str())
501				.ok_or_else(|| NeoFSError::UnexpectedResponse("Missing tokenId".to_string()))?;
502
503			let session_key = token_data
504				.get("sessionKey")
505				.and_then(|v| v.as_str())
506				.ok_or_else(|| NeoFSError::UnexpectedResponse("Missing sessionKey".to_string()))?;
507
508			let signature = token_data
509				.get("signature")
510				.and_then(|v| v.as_str())
511				.map(|s| base64::decode(s).unwrap_or_default())
512				.unwrap_or_default();
513
514			Ok(SessionToken {
515				token_id: token_id.to_string(),
516				owner_id,
517				expiration: chrono::Utc::now() + chrono::Duration::hours(1),
518				session_key: session_key.to_string(),
519				signature,
520			})
521		} else {
522			Err(NeoFSError::UnexpectedResponse("Missing sessionToken in response".to_string()))
523		}
524	}
525
526	async fn initiate_multipart_upload(
527		&self,
528		container_id: &ContainerId,
529		object: &Object,
530	) -> NeoFSResult<MultipartUpload> {
531		self.init_multipart_upload(container_id, object, 1024 * 1024).await
532	}
533
534	async fn upload_part(
535		&self,
536		upload: &MultipartUpload,
537		part_number: u32,
538		data: Vec<u8>,
539	) -> NeoFSResult<Part> {
540		// Create the part
541		let part = Part::new(part_number, data);
542
543		// Upload the part using the internal method
544		self.upload_part(upload, part.clone()).await?;
545
546		Ok(part)
547	}
548
549	async fn complete_multipart_upload(
550		&self,
551		upload: &MultipartUpload,
552		parts: Vec<Part>,
553	) -> NeoFSResult<MultipartUploadResult> {
554		// Extract part numbers from parts
555		let part_numbers = parts.iter().map(|p| p.part_number).collect();
556		self.complete_multipart_upload(upload, part_numbers).await
557	}
558
559	async fn abort_multipart_upload(&self, upload: &MultipartUpload) -> NeoFSResult<bool> {
560		self.abort_multipart_upload(upload).await?;
561		Ok(true)
562	}
563}