neo3/neo_fs/
client.rs

1// Copyright (c) 2023-2025 R3E Network
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6//     http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! Client for interacting with NeoFS.
15
16use crate::{
17	neo_fs::{
18		AccessPermission, BearerToken, Container, ContainerId, MultipartUpload,
19		MultipartUploadResult, NeoFSError, NeoFSResult, NeoFSService, Object, ObjectId, OwnerId,
20		Part, SessionToken,
21	},
22	neo_protocol::Account,
23};
24use async_trait::async_trait;
25use base64;
26use reqwest::{
27	header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE},
28	Client,
29};
30use serde_json::{json, Value};
31use std::{collections::HashMap, fmt::Debug};
32
33/// Default mainnet NeoFS gRPC endpoint
34pub const DEFAULT_MAINNET_ENDPOINT: &str = "grpc.mainnet.fs.neo.org:8082";
35
36/// Default testnet NeoFS gRPC endpoint
37pub const DEFAULT_TESTNET_ENDPOINT: &str = "grpc.testnet.fs.neo.org:8082";
38
39/// Default mainnet NeoFS HTTP gateway
40pub const DEFAULT_MAINNET_HTTP_GATEWAY: &str = "https://http.mainnet.fs.neo.org";
41
42/// Default testnet NeoFS HTTP gateway
43pub const DEFAULT_TESTNET_HTTP_GATEWAY: &str = "https://http.testnet.fs.neo.org";
44
45/// Default mainnet NeoFS REST API
46pub const DEFAULT_MAINNET_REST_API: &str = "https://rest.mainnet.fs.neo.org";
47
48/// Default testnet NeoFS REST API
49pub const DEFAULT_TESTNET_REST_API: &str = "https://rest.testnet.fs.neo.org";
50
51/// Configuration for NeoFS authentication
52#[derive(Debug, Clone)]
53pub struct NeoFSAuth {
54	/// The wallet address for authentication
55	pub wallet_address: String,
56	/// Optional private key for signing requests
57	pub private_key: Option<String>,
58}
59
60/// Configuration for NeoFS client
61#[derive(Debug, Clone)]
62pub struct NeoFSConfig {
63	/// NeoFS endpoint URL
64	pub endpoint: String,
65	/// Authentication information
66	pub auth: Option<NeoFSAuth>,
67	/// Connection timeout in seconds
68	pub timeout_sec: u64,
69	/// Whether to use insecure connection
70	pub insecure: bool,
71}
72
73/// Client for interacting with NeoFS
74#[derive(Debug, Clone)]
75pub struct NeoFSClient {
76	config: NeoFSConfig,
77	account: Option<Account>,
78	http_client: Client,
79	base_url: String,
80}
81
82impl NeoFSClient {
83	/// Creates a new NeoFS client with the given configuration
84	pub fn new(config: NeoFSConfig) -> Self {
85		let http_client = Client::new();
86		let base_url = if config.endpoint.starts_with("http") {
87			config.endpoint.clone()
88		} else {
89			// Convert gRPC endpoint to HTTP gateway
90			if config.endpoint.contains("mainnet") {
91				DEFAULT_MAINNET_HTTP_GATEWAY.to_string()
92			} else {
93				DEFAULT_TESTNET_HTTP_GATEWAY.to_string()
94			}
95		};
96
97		Self { config, account: None, http_client, base_url }
98	}
99
100	/// Creates a new NeoFS client with default configuration
101	pub fn default() -> Self {
102		Self {
103			config: NeoFSConfig {
104				endpoint: DEFAULT_MAINNET_ENDPOINT.to_string(),
105				auth: None,
106				timeout_sec: 10,
107				insecure: false,
108			},
109			account: None,
110			http_client: Client::new(),
111			base_url: DEFAULT_MAINNET_HTTP_GATEWAY.to_string(),
112		}
113	}
114
115	/// Sets the account to use for authentication
116	pub fn with_account(mut self, account: Account) -> Self {
117		self.account = Some(account);
118		self
119	}
120
121	/// Gets the account's owner ID
122	pub fn get_owner_id(&self) -> NeoFSResult<OwnerId> {
123		if let Some(account) = &self.account {
124			let pubkey = account
125				.get_public_key()
126				.ok_or(NeoFSError::AuthenticationError("No public key available".to_string()))?
127				.to_string();
128
129			Ok(OwnerId(pubkey))
130		} else {
131			Err(NeoFSError::AuthenticationError(
132				"No account provided for authentication".to_string(),
133			))
134		}
135	}
136
137	/// Creates HTTP headers for authenticated requests
138	fn create_auth_headers(&self) -> NeoFSResult<HeaderMap> {
139		let mut headers = HeaderMap::new();
140		headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
141
142		if let Some(auth) = &self.config.auth {
143			// Create a simple bearer token from wallet address
144			let token = format!("Bearer {}", auth.wallet_address);
145			headers.insert(
146				AUTHORIZATION,
147				HeaderValue::from_str(&token).map_err(|e| {
148					NeoFSError::AuthenticationError(format!("Invalid auth header: {}", e))
149				})?,
150			);
151		}
152
153		Ok(headers)
154	}
155
156	/// Makes an HTTP request to the NeoFS REST API
157	async fn make_request(
158		&self,
159		method: &str,
160		endpoint: &str,
161		body: Option<Value>,
162	) -> NeoFSResult<Value> {
163		let url = format!("{}/v1/{}", self.base_url, endpoint);
164		let headers = self.create_auth_headers()?;
165
166		let mut request = match method {
167			"GET" => self.http_client.get(&url),
168			"POST" => self.http_client.post(&url),
169			"PUT" => self.http_client.put(&url),
170			"DELETE" => self.http_client.delete(&url),
171			_ => {
172				return Err(NeoFSError::InvalidArgument(format!(
173					"Unsupported HTTP method: {}",
174					method
175				)))
176			},
177		};
178
179		request = request.headers(headers);
180
181		if let Some(json_body) = body {
182			request = request.json(&json_body);
183		}
184
185		let response = request
186			.send()
187			.await
188			.map_err(|e| NeoFSError::ConnectionError(format!("HTTP request failed: {}", e)))?;
189
190		if !response.status().is_success() {
191			return Err(NeoFSError::UnexpectedResponse(format!(
192				"HTTP error: {}",
193				response.status()
194			)));
195		}
196
197		let json: Value = response.json().await.map_err(|e| {
198			NeoFSError::SerializationError(format!("Failed to parse JSON response: {}", e))
199		})?;
200
201		Ok(json)
202	}
203
204	// MULTIPART UPLOAD OPERATIONS
205
206	/// Initializes a multipart upload
207	pub async fn init_multipart_upload(
208		&self,
209		container_id: &ContainerId,
210		object: &Object,
211		part_size: u64,
212	) -> NeoFSResult<MultipartUpload> {
213		let owner_id = self.get_owner_id()?;
214
215		// Create multipart upload request
216		let request_body = json!({
217			"multipartUpload": {
218				"containerId": container_id.0,
219				"ownerId": owner_id.0,
220				"attributes": object.attributes,
221				"partSize": part_size,
222				"maxParts": 10000
223			}
224		});
225
226		let response = self.make_request("POST", "multipart/init", Some(request_body)).await?;
227
228		let upload_id = response.get("uploadId").and_then(|v| v.as_str()).ok_or_else(|| {
229			NeoFSError::UnexpectedResponse("Missing uploadId in response".to_string())
230		})?;
231
232		Ok(MultipartUpload {
233			id: None,
234			container_id: container_id.clone(),
235			owner_id,
236			upload_id: upload_id.to_string(),
237			attributes: object.attributes.clone(),
238			part_size,
239			max_parts: 10000,
240		})
241	}
242
243	/// Uploads a part of a multipart upload
244	pub async fn upload_part(&self, upload: &MultipartUpload, part: Part) -> NeoFSResult<()> {
245		let request_body = json!({
246			"uploadId": upload.upload_id,
247			"partNumber": part.part_number,
248			"data": base64::encode(&part.payload)
249		});
250
251		let endpoint = format!("multipart/{}/parts", upload.upload_id);
252		let _response = self.make_request("POST", &endpoint, Some(request_body)).await?;
253
254		Ok(())
255	}
256
257	/// Completes a multipart upload
258	pub async fn complete_multipart_upload(
259		&self,
260		upload: &MultipartUpload,
261		part_numbers: Vec<u32>,
262	) -> NeoFSResult<MultipartUploadResult> {
263		let request_body = json!({
264			"uploadId": upload.upload_id,
265			"parts": part_numbers
266		});
267
268		let endpoint = format!("multipart/{}/complete", upload.upload_id);
269		let response = self.make_request("POST", &endpoint, Some(request_body)).await?;
270
271		let object_id = response.get("objectId").and_then(|v| v.as_str()).ok_or_else(|| {
272			NeoFSError::UnexpectedResponse("Missing objectId in response".to_string())
273		})?;
274
275		Ok(MultipartUploadResult {
276			object_id: ObjectId(object_id.to_string()),
277			container_id: upload.container_id.clone(),
278		})
279	}
280
281	/// Aborts a multipart upload
282	pub async fn abort_multipart_upload(&self, upload: &MultipartUpload) -> NeoFSResult<()> {
283		let endpoint = format!("multipart/{}/abort", upload.upload_id);
284		let _response = self.make_request("DELETE", &endpoint, None).await?;
285		Ok(())
286	}
287}
288
289#[async_trait]
290impl NeoFSService for NeoFSClient {
291	async fn create_container(&self, container: &Container) -> NeoFSResult<ContainerId> {
292		let owner_id = self.get_owner_id()?;
293
294		let request_body = json!({
295			"container": {
296				"ownerId": owner_id.0,
297				"basicAcl": container.basic_acl,
298				"attributes": container.attributes,
299				"placementPolicy": container.placement_policy
300			}
301		});
302
303		let response = self.make_request("POST", "containers", Some(request_body)).await?;
304
305		if let Some(container_id) = response.get("containerId").and_then(|v| v.as_str()) {
306			Ok(ContainerId(container_id.to_string()))
307		} else {
308			Err(NeoFSError::UnexpectedResponse("Missing containerId in response".to_string()))
309		}
310	}
311
312	async fn get_container(&self, id: &ContainerId) -> NeoFSResult<Container> {
313		let endpoint = format!("containers/{}", id.0);
314		let response = self.make_request("GET", &endpoint, None).await?;
315
316		if let Some(container_data) = response.get("container") {
317			let owner_id = container_data
318				.get("ownerId")
319				.and_then(|v| v.as_str())
320				.ok_or_else(|| NeoFSError::UnexpectedResponse("Missing ownerId".to_string()))?;
321
322			let mut container = Container::new(id.clone(), OwnerId(owner_id.to_string()));
323
324			if let Some(basic_acl) = container_data.get("basicAcl").and_then(|v| v.as_u64()) {
325				container.basic_acl = basic_acl as u32;
326			}
327
328			if let Some(attributes) = container_data.get("attributes").and_then(|v| v.as_object()) {
329				for (key, value) in attributes {
330					if let Some(val_str) = value.as_str() {
331						container.attributes.add(key.clone(), val_str.to_string());
332					}
333				}
334			}
335
336			Ok(container)
337		} else {
338			Err(NeoFSError::UnexpectedResponse("Missing container data in response".to_string()))
339		}
340	}
341
342	async fn list_containers(&self) -> NeoFSResult<Vec<ContainerId>> {
343		let owner_id = self.get_owner_id()?;
344		let endpoint = format!("containers?ownerId={}", owner_id.0);
345		let response = self.make_request("GET", &endpoint, None).await?;
346
347		if let Some(containers) = response.get("containers").and_then(|v| v.as_array()) {
348			let container_ids = containers
349				.iter()
350				.filter_map(|v| v.get("containerId").and_then(|id| id.as_str()))
351				.map(|id| ContainerId(id.to_string()))
352				.collect();
353			Ok(container_ids)
354		} else {
355			Ok(vec![]) // Return empty list if no containers found
356		}
357	}
358
359	async fn delete_container(&self, id: &ContainerId) -> NeoFSResult<bool> {
360		let endpoint = format!("containers/{}", id.0);
361		let _response = self.make_request("DELETE", &endpoint, None).await?;
362		Ok(true)
363	}
364
365	async fn put_object(
366		&self,
367		container_id: &ContainerId,
368		object: &Object,
369	) -> NeoFSResult<ObjectId> {
370		let owner_id = self.get_owner_id()?;
371
372		let request_body = json!({
373			"object": {
374				"containerId": container_id.0,
375				"ownerId": owner_id.0,
376				"attributes": object.attributes,
377				"payload": base64::encode(&object.payload)
378			}
379		});
380
381		let endpoint = format!("objects/{}", container_id.0);
382		let response = self.make_request("POST", &endpoint, Some(request_body)).await?;
383
384		if let Some(object_id) = response.get("objectId").and_then(|v| v.as_str()) {
385			Ok(ObjectId(object_id.to_string()))
386		} else {
387			Err(NeoFSError::UnexpectedResponse("Missing objectId in response".to_string()))
388		}
389	}
390
391	async fn get_object(
392		&self,
393		container_id: &ContainerId,
394		object_id: &ObjectId,
395	) -> NeoFSResult<Object> {
396		let endpoint = format!("objects/{}/{}", container_id.0, object_id.0);
397		let response = self.make_request("GET", &endpoint, None).await?;
398
399		if let Some(object_data) = response.get("object") {
400			let owner_id = object_data
401				.get("ownerId")
402				.and_then(|v| v.as_str())
403				.ok_or_else(|| NeoFSError::UnexpectedResponse("Missing ownerId".to_string()))?;
404
405			let mut object = Object::new(container_id.clone(), OwnerId(owner_id.to_string()));
406
407			if let Some(payload_b64) = object_data.get("payload").and_then(|v| v.as_str()) {
408				object.payload = base64::decode(payload_b64).map_err(|e| {
409					NeoFSError::UnexpectedResponse(format!("Invalid base64 payload: {}", e))
410				})?;
411			}
412
413			if let Some(attributes) = object_data.get("attributes").and_then(|v| v.as_object()) {
414				for (key, value) in attributes {
415					if let Some(val_str) = value.as_str() {
416						object.attributes.add(key.clone(), val_str.to_string());
417					}
418				}
419			}
420
421			Ok(object)
422		} else {
423			Err(NeoFSError::UnexpectedResponse("Missing object data in response".to_string()))
424		}
425	}
426
427	async fn list_objects(&self, container_id: &ContainerId) -> NeoFSResult<Vec<ObjectId>> {
428		let endpoint = format!("objects/{}", container_id.0);
429		let response = self.make_request("GET", &endpoint, None).await?;
430
431		if let Some(objects) = response.get("objects").and_then(|v| v.as_array()) {
432			let object_ids = objects
433				.iter()
434				.filter_map(|v| v.get("objectId").and_then(|id| id.as_str()))
435				.map(|id| ObjectId(id.to_string()))
436				.collect();
437			Ok(object_ids)
438		} else {
439			Ok(vec![]) // Return empty list if no objects found
440		}
441	}
442
443	async fn delete_object(
444		&self,
445		container_id: &ContainerId,
446		object_id: &ObjectId,
447	) -> NeoFSResult<bool> {
448		let endpoint = format!("objects/{}/{}", container_id.0, object_id.0);
449		let _response = self.make_request("DELETE", &endpoint, None).await?;
450		Ok(true)
451	}
452
453	async fn create_bearer_token(
454		&self,
455		container_id: &ContainerId,
456		permissions: Vec<AccessPermission>,
457		expires_sec: u64,
458	) -> NeoFSResult<BearerToken> {
459		let owner_id = self.get_owner_id()?;
460		let expiration = chrono::Utc::now() + chrono::Duration::seconds(expires_sec as i64);
461
462		let request_body = json!({
463			"bearerToken": {
464				"containerId": container_id.0,
465				"ownerId": owner_id.0,
466				"permissions": permissions,
467				"expiresAt": expiration.timestamp()
468			}
469		});
470
471		let response = self.make_request("POST", "auth/bearer", Some(request_body)).await?;
472
473		if let Some(token) = response.get("token").and_then(|v| v.as_str()) {
474			Ok(BearerToken {
475				owner_id,
476				token_id: format!("bearer-{}", chrono::Utc::now().timestamp()),
477				container_id: container_id.clone(),
478				operations: permissions,
479				expiration,
480				signature: vec![],
481			})
482		} else {
483			Err(NeoFSError::UnexpectedResponse("Missing token in response".to_string()))
484		}
485	}
486
487	async fn get_session_token(&self) -> NeoFSResult<SessionToken> {
488		let owner_id = self.get_owner_id()?;
489
490		let request_body = json!({
491			"sessionToken": {
492				"ownerId": owner_id.0
493			}
494		});
495
496		let response = self.make_request("POST", "auth/session", Some(request_body)).await?;
497
498		if let Some(token_data) = response.get("sessionToken") {
499			let token_id = token_data
500				.get("tokenId")
501				.and_then(|v| v.as_str())
502				.ok_or_else(|| NeoFSError::UnexpectedResponse("Missing tokenId".to_string()))?;
503
504			let session_key = token_data
505				.get("sessionKey")
506				.and_then(|v| v.as_str())
507				.ok_or_else(|| NeoFSError::UnexpectedResponse("Missing sessionKey".to_string()))?;
508
509			let signature = token_data
510				.get("signature")
511				.and_then(|v| v.as_str())
512				.map(|s| base64::decode(s).unwrap_or_default())
513				.unwrap_or_default();
514
515			Ok(SessionToken {
516				token_id: token_id.to_string(),
517				owner_id,
518				expiration: chrono::Utc::now() + chrono::Duration::hours(1),
519				session_key: session_key.to_string(),
520				signature,
521			})
522		} else {
523			Err(NeoFSError::UnexpectedResponse("Missing sessionToken in response".to_string()))
524		}
525	}
526
527	async fn initiate_multipart_upload(
528		&self,
529		container_id: &ContainerId,
530		object: &Object,
531	) -> NeoFSResult<MultipartUpload> {
532		self.init_multipart_upload(container_id, object, 1024 * 1024).await
533	}
534
535	async fn upload_part(
536		&self,
537		upload: &MultipartUpload,
538		part_number: u32,
539		data: Vec<u8>,
540	) -> NeoFSResult<Part> {
541		// Create the part
542		let part = Part::new(part_number, data);
543
544		// Upload the part using the internal method
545		self.upload_part(upload, part.clone()).await?;
546
547		Ok(part)
548	}
549
550	async fn complete_multipart_upload(
551		&self,
552		upload: &MultipartUpload,
553		parts: Vec<Part>,
554	) -> NeoFSResult<MultipartUploadResult> {
555		// Extract part numbers from parts
556		let part_numbers = parts.iter().map(|p| p.part_number).collect();
557		self.complete_multipart_upload(upload, part_numbers).await
558	}
559
560	async fn abort_multipart_upload(&self, upload: &MultipartUpload) -> NeoFSResult<bool> {
561		self.abort_multipart_upload(upload).await?;
562		Ok(true)
563	}
564}