neo3/neo_clients/
connection_pool.rs

1use crate::{
2	neo_clients::{APITrait, HttpProvider, RpcClient},
3	neo_error::{Neo3Error, Neo3Result},
4};
5use std::{
6	collections::VecDeque,
7	sync::Arc,
8	time::{Duration, Instant},
9};
10use tokio::sync::{RwLock, Semaphore};
11
12/// Configuration for connection pool
13#[derive(Debug, Clone)]
14pub struct PoolConfig {
15	/// Maximum number of concurrent connections
16	pub max_connections: usize,
17	/// Minimum number of idle connections to maintain
18	pub min_idle: usize,
19	/// Maximum time a connection can be idle before being closed
20	pub max_idle_time: Duration,
21	/// Connection timeout
22	pub connection_timeout: Duration,
23	/// Request timeout
24	pub request_timeout: Duration,
25	/// Maximum number of retries for failed requests
26	pub max_retries: u32,
27	/// Delay between retries
28	pub retry_delay: Duration,
29}
30
31impl Default for PoolConfig {
32	fn default() -> Self {
33		Self {
34			max_connections: 10,
35			min_idle: 2,
36			max_idle_time: Duration::from_secs(300), // 5 minutes
37			connection_timeout: Duration::from_secs(30),
38			request_timeout: Duration::from_secs(60),
39			max_retries: 3,
40			retry_delay: Duration::from_millis(1000),
41		}
42	}
43}
44
45/// A pooled connection wrapper
46#[derive(Debug)]
47struct PooledConnection {
48	client: RpcClient<HttpProvider>,
49	created_at: Instant,
50	last_used: Instant,
51	is_healthy: bool,
52}
53
54impl PooledConnection {
55	fn new(endpoint: &str) -> Neo3Result<Self> {
56		let provider = HttpProvider::new(endpoint).map_err(|e| {
57			Neo3Error::Network(crate::neo_error::NetworkError::ConnectionFailed(e.to_string()))
58		})?;
59		let client = RpcClient::new(provider);
60
61		Ok(Self { client, created_at: Instant::now(), last_used: Instant::now(), is_healthy: true })
62	}
63
64	fn is_expired(&self, max_idle_time: Duration) -> bool {
65		self.last_used.elapsed() > max_idle_time
66	}
67
68	fn mark_used(&mut self) {
69		self.last_used = Instant::now();
70	}
71
72	async fn health_check(&mut self) -> bool {
73		match self.client.get_version().await {
74			Ok(_) => {
75				self.is_healthy = true;
76				true
77			},
78			Err(_) => {
79				self.is_healthy = false;
80				false
81			},
82		}
83	}
84}
85
86/// High-performance connection pool for Neo RPC clients
87pub struct ConnectionPool {
88	config: PoolConfig,
89	endpoint: String,
90	connections: Arc<RwLock<VecDeque<PooledConnection>>>,
91	semaphore: Arc<Semaphore>,
92	stats: Arc<RwLock<PoolStats>>,
93}
94
95/// Connection pool statistics
96#[derive(Debug, Default)]
97pub struct PoolStats {
98	pub total_connections_created: u64,
99	pub total_requests: u64,
100	pub successful_requests: u64,
101	pub failed_requests: u64,
102	pub retried_requests: u64,
103	pub current_active_connections: usize,
104	pub current_idle_connections: usize,
105}
106
107impl ConnectionPool {
108	/// Create a new connection pool
109	pub fn new(endpoint: String, config: PoolConfig) -> Self {
110		let semaphore = Arc::new(Semaphore::new(config.max_connections));
111
112		Self {
113			config,
114			endpoint,
115			connections: Arc::new(RwLock::new(VecDeque::new())),
116			semaphore,
117			stats: Arc::new(RwLock::new(PoolStats::default())),
118		}
119	}
120
121	/// Execute a request with automatic retry and connection management
122	pub async fn execute<F, T>(&self, operation: F) -> Neo3Result<T>
123	where
124		F: Fn(
125				&RpcClient<HttpProvider>,
126			)
127				-> std::pin::Pin<Box<dyn std::future::Future<Output = Neo3Result<T>> + Send + '_>>
128			+ Send
129			+ Sync,
130		T: Send,
131	{
132		let _permit = self.semaphore.acquire().await.map_err(|_| {
133			Neo3Error::Network(crate::neo_error::NetworkError::ConnectionFailed(
134				"Failed to acquire connection permit".to_string(),
135			))
136		})?;
137
138		let mut retries = 0;
139		loop {
140			// Update stats
141			{
142				let mut stats = self.stats.write().await;
143				stats.total_requests += 1;
144			}
145
146			// Get or create connection
147			let mut connection = self.get_connection().await?;
148
149			// Execute operation with timeout
150			let result =
151				tokio::time::timeout(self.config.request_timeout, operation(&connection.client))
152					.await;
153
154			match result {
155				Ok(Ok(value)) => {
156					// Success - return connection to pool and return result
157					connection.mark_used();
158					self.return_connection(connection).await;
159
160					let mut stats = self.stats.write().await;
161					stats.successful_requests += 1;
162
163					return Ok(value);
164				},
165				Ok(Err(e)) => {
166					// Mark connection as unhealthy
167					connection.is_healthy = false;
168
169					if retries < self.config.max_retries {
170						retries += 1;
171
172						let mut stats = self.stats.write().await;
173						stats.retried_requests += 1;
174
175						tokio::time::sleep(self.config.retry_delay * retries).await;
176						continue;
177					} else {
178						let mut stats = self.stats.write().await;
179						stats.failed_requests += 1;
180
181						return Err(e);
182					}
183				},
184				Err(_) => {
185					// Timeout - mark connection as unhealthy
186					connection.is_healthy = false;
187
188					if retries < self.config.max_retries {
189						retries += 1;
190
191						let mut stats = self.stats.write().await;
192						stats.retried_requests += 1;
193
194						tokio::time::sleep(self.config.retry_delay * retries).await;
195						continue;
196					} else {
197						let mut stats = self.stats.write().await;
198						stats.failed_requests += 1;
199
200						return Err(Neo3Error::Network(crate::neo_error::NetworkError::Timeout));
201					}
202				},
203			}
204		}
205	}
206
207	/// Get a connection from the pool or create a new one
208	async fn get_connection(&self) -> Neo3Result<PooledConnection> {
209		// Try to get an existing connection
210		{
211			let mut connections = self.connections.write().await;
212			while let Some(mut conn) = connections.pop_front() {
213				if !conn.is_expired(self.config.max_idle_time) && conn.is_healthy {
214					conn.mark_used();
215					return Ok(conn);
216				}
217			}
218		}
219
220		// Create new connection
221		let connection = PooledConnection::new(&self.endpoint)?;
222
223		let mut stats = self.stats.write().await;
224		stats.total_connections_created += 1;
225		stats.current_active_connections += 1;
226
227		Ok(connection)
228	}
229
230	/// Return a connection to the pool
231	async fn return_connection(&self, connection: PooledConnection) {
232		if connection.is_healthy && !connection.is_expired(self.config.max_idle_time) {
233			let mut connections = self.connections.write().await;
234			connections.push_back(connection);
235
236			let mut stats = self.stats.write().await;
237			stats.current_active_connections = stats.current_active_connections.saturating_sub(1);
238			stats.current_idle_connections = connections.len();
239		} else {
240			let mut stats = self.stats.write().await;
241			stats.current_active_connections = stats.current_active_connections.saturating_sub(1);
242		}
243	}
244
245	/// Perform health checks on idle connections
246	pub async fn health_check(&self) {
247		let mut connections = self.connections.write().await;
248		let mut healthy_connections = VecDeque::new();
249
250		while let Some(mut conn) = connections.pop_front() {
251			if conn.health_check().await {
252				healthy_connections.push_back(conn);
253			}
254		}
255
256		*connections = healthy_connections;
257
258		let mut stats = self.stats.write().await;
259		stats.current_idle_connections = connections.len();
260	}
261
262	/// Get current pool statistics
263	pub async fn get_stats(&self) -> PoolStats {
264		let stats = self.stats.read().await;
265		PoolStats {
266			total_connections_created: stats.total_connections_created,
267			total_requests: stats.total_requests,
268			successful_requests: stats.successful_requests,
269			failed_requests: stats.failed_requests,
270			retried_requests: stats.retried_requests,
271			current_active_connections: stats.current_active_connections,
272			current_idle_connections: stats.current_idle_connections,
273		}
274	}
275
276	/// Close all connections and clean up the pool
277	pub async fn close(&self) {
278		let mut connections = self.connections.write().await;
279		connections.clear();
280
281		let mut stats = self.stats.write().await;
282		stats.current_active_connections = 0;
283		stats.current_idle_connections = 0;
284	}
285}
286
287#[cfg(test)]
288mod tests {
289	use super::*;
290
291	#[tokio::test]
292	async fn test_pool_creation() {
293		let config = PoolConfig::default();
294		let pool = ConnectionPool::new("https://testnet.neo.org:443".to_string(), config);
295
296		let stats = pool.get_stats().await;
297		assert_eq!(stats.total_connections_created, 0);
298		assert_eq!(stats.current_active_connections, 0);
299	}
300
301	#[tokio::test]
302	async fn test_pool_stats() {
303		let config = PoolConfig { max_connections: 2, ..Default::default() };
304		let pool = ConnectionPool::new("https://testnet.neo.org:443".to_string(), config);
305
306		// Execute a simple operation
307		let result = pool
308			.execute(|client| {
309				Box::pin(async move {
310					client.get_version().await.map_err(|e| {
311						Neo3Error::Network(crate::neo_error::NetworkError::RpcError {
312							code: -1,
313							message: e.to_string(),
314						})
315					})
316				})
317			})
318			.await;
319
320		// Check that stats were updated
321		let stats = pool.get_stats().await;
322		assert!(stats.total_requests > 0);
323	}
324}