neo3/neo_clients/
connection_pool.rs1use crate::{
2 neo_clients::{APITrait, HttpProvider, RpcClient},
3 neo_error::{Neo3Error, Neo3Result},
4};
5use std::{
6 collections::VecDeque,
7 sync::Arc,
8 time::{Duration, Instant},
9};
10use tokio::sync::{RwLock, Semaphore};
11
/// Tunable settings for a `ConnectionPool`.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Number of permits in the pool's semaphore, i.e. how many `execute` calls may run at the
    /// same time.
    pub max_connections: usize,
    /// Minimum number of idle connections to keep around.
    ///
    /// NOTE(review): no pool code visible in this file reads this value — confirm whether it is
    /// enforced elsewhere.
    pub min_idle: usize,
    /// An idle connection whose last use is older than this is discarded instead of reused.
    pub max_idle_time: Duration,
    /// Timeout for establishing a connection.
    ///
    /// NOTE(review): no pool code visible in this file reads this value — confirm whether it is
    /// enforced elsewhere.
    pub connection_timeout: Duration,
    /// Upper bound on a single attempt of an operation passed to `execute`.
    pub request_timeout: Duration,
    /// How many times a failed operation is retried after the first attempt.
    pub max_retries: u32,
    /// Base back-off between attempts; the pool multiplies it by the retry number.
    pub retry_delay: Duration,
}
30
31impl Default for PoolConfig {
32 fn default() -> Self {
33 Self {
34 max_connections: 10,
35 min_idle: 2,
36 max_idle_time: Duration::from_secs(300), connection_timeout: Duration::from_secs(30),
38 request_timeout: Duration::from_secs(60),
39 max_retries: 3,
40 retry_delay: Duration::from_millis(1000),
41 }
42 }
43}
44
45#[derive(Debug)]
47struct PooledConnection {
48 client: RpcClient<HttpProvider>,
49 created_at: Instant,
50 last_used: Instant,
51 is_healthy: bool,
52}
53
54impl PooledConnection {
55 fn new(endpoint: &str) -> Neo3Result<Self> {
56 let provider = HttpProvider::new(endpoint).map_err(|e| {
57 Neo3Error::Network(crate::neo_error::NetworkError::ConnectionFailed(e.to_string()))
58 })?;
59 let client = RpcClient::new(provider);
60
61 Ok(Self { client, created_at: Instant::now(), last_used: Instant::now(), is_healthy: true })
62 }
63
64 fn is_expired(&self, max_idle_time: Duration) -> bool {
65 self.last_used.elapsed() > max_idle_time
66 }
67
68 fn mark_used(&mut self) {
69 self.last_used = Instant::now();
70 }
71
72 async fn health_check(&mut self) -> bool {
73 match self.client.get_version().await {
74 Ok(_) => {
75 self.is_healthy = true;
76 true
77 },
78 Err(_) => {
79 self.is_healthy = false;
80 false
81 },
82 }
83 }
84}
85
86pub struct ConnectionPool {
88 config: PoolConfig,
89 endpoint: String,
90 connections: Arc<RwLock<VecDeque<PooledConnection>>>,
91 semaphore: Arc<Semaphore>,
92 stats: Arc<RwLock<PoolStats>>,
93}
94
/// Counters describing what a `ConnectionPool` has done so far.
///
/// All counters start at zero. The type is `Clone`/`PartialEq` so snapshots can be copied and
/// compared.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PoolStats {
    /// Connections built since the pool was created.
    pub total_connections_created: u64,
    /// Requests recorded by `ConnectionPool::execute`.
    pub total_requests: u64,
    /// Requests that returned `Ok`.
    pub successful_requests: u64,
    /// Requests that returned `Err` after retries were exhausted.
    pub failed_requests: u64,
    /// Retry attempts made after a failed attempt.
    pub retried_requests: u64,
    /// Connections currently handed out to an operation.
    pub current_active_connections: usize,
    /// Connections currently waiting in the idle queue.
    pub current_idle_connections: usize,
}
106
107impl ConnectionPool {
108 pub fn new(endpoint: String, config: PoolConfig) -> Self {
110 let semaphore = Arc::new(Semaphore::new(config.max_connections));
111
112 Self {
113 config,
114 endpoint,
115 connections: Arc::new(RwLock::new(VecDeque::new())),
116 semaphore,
117 stats: Arc::new(RwLock::new(PoolStats::default())),
118 }
119 }
120
121 pub async fn execute<F, T>(&self, operation: F) -> Neo3Result<T>
123 where
124 F: Fn(
125 &RpcClient<HttpProvider>,
126 )
127 -> std::pin::Pin<Box<dyn std::future::Future<Output = Neo3Result<T>> + Send + '_>>
128 + Send
129 + Sync,
130 T: Send,
131 {
132 let _permit = self.semaphore.acquire().await.map_err(|_| {
133 Neo3Error::Network(crate::neo_error::NetworkError::ConnectionFailed(
134 "Failed to acquire connection permit".to_string(),
135 ))
136 })?;
137
138 let mut retries = 0;
139 loop {
140 {
142 let mut stats = self.stats.write().await;
143 stats.total_requests += 1;
144 }
145
146 let mut connection = self.get_connection().await?;
148
149 let result =
151 tokio::time::timeout(self.config.request_timeout, operation(&connection.client))
152 .await;
153
154 match result {
155 Ok(Ok(value)) => {
156 connection.mark_used();
158 self.return_connection(connection).await;
159
160 let mut stats = self.stats.write().await;
161 stats.successful_requests += 1;
162
163 return Ok(value);
164 },
165 Ok(Err(e)) => {
166 connection.is_healthy = false;
168
169 if retries < self.config.max_retries {
170 retries += 1;
171
172 let mut stats = self.stats.write().await;
173 stats.retried_requests += 1;
174
175 tokio::time::sleep(self.config.retry_delay * retries).await;
176 continue;
177 } else {
178 let mut stats = self.stats.write().await;
179 stats.failed_requests += 1;
180
181 return Err(e);
182 }
183 },
184 Err(_) => {
185 connection.is_healthy = false;
187
188 if retries < self.config.max_retries {
189 retries += 1;
190
191 let mut stats = self.stats.write().await;
192 stats.retried_requests += 1;
193
194 tokio::time::sleep(self.config.retry_delay * retries).await;
195 continue;
196 } else {
197 let mut stats = self.stats.write().await;
198 stats.failed_requests += 1;
199
200 return Err(Neo3Error::Network(crate::neo_error::NetworkError::Timeout));
201 }
202 },
203 }
204 }
205 }
206
207 async fn get_connection(&self) -> Neo3Result<PooledConnection> {
209 {
211 let mut connections = self.connections.write().await;
212 while let Some(mut conn) = connections.pop_front() {
213 if !conn.is_expired(self.config.max_idle_time) && conn.is_healthy {
214 conn.mark_used();
215 return Ok(conn);
216 }
217 }
218 }
219
220 let connection = PooledConnection::new(&self.endpoint)?;
222
223 let mut stats = self.stats.write().await;
224 stats.total_connections_created += 1;
225 stats.current_active_connections += 1;
226
227 Ok(connection)
228 }
229
230 async fn return_connection(&self, connection: PooledConnection) {
232 if connection.is_healthy && !connection.is_expired(self.config.max_idle_time) {
233 let mut connections = self.connections.write().await;
234 connections.push_back(connection);
235
236 let mut stats = self.stats.write().await;
237 stats.current_active_connections = stats.current_active_connections.saturating_sub(1);
238 stats.current_idle_connections = connections.len();
239 } else {
240 let mut stats = self.stats.write().await;
241 stats.current_active_connections = stats.current_active_connections.saturating_sub(1);
242 }
243 }
244
245 pub async fn health_check(&self) {
247 let mut connections = self.connections.write().await;
248 let mut healthy_connections = VecDeque::new();
249
250 while let Some(mut conn) = connections.pop_front() {
251 if conn.health_check().await {
252 healthy_connections.push_back(conn);
253 }
254 }
255
256 *connections = healthy_connections;
257
258 let mut stats = self.stats.write().await;
259 stats.current_idle_connections = connections.len();
260 }
261
262 pub async fn get_stats(&self) -> PoolStats {
264 let stats = self.stats.read().await;
265 PoolStats {
266 total_connections_created: stats.total_connections_created,
267 total_requests: stats.total_requests,
268 successful_requests: stats.successful_requests,
269 failed_requests: stats.failed_requests,
270 retried_requests: stats.retried_requests,
271 current_active_connections: stats.current_active_connections,
272 current_idle_connections: stats.current_idle_connections,
273 }
274 }
275
276 pub async fn close(&self) {
278 let mut connections = self.connections.write().await;
279 connections.clear();
280
281 let mut stats = self.stats.write().await;
282 stats.current_active_connections = 0;
283 stats.current_idle_connections = 0;
284 }
285}
286
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built pool has created nothing and has nothing active.
    #[tokio::test]
    async fn test_pool_creation() {
        let config = PoolConfig::default();
        let pool = ConnectionPool::new("https://testnet.neo.org:443".to_string(), config);

        let stats = pool.get_stats().await;
        assert_eq!(stats.total_connections_created, 0);
        assert_eq!(stats.current_active_connections, 0);
    }

    /// One `execute` call is recorded in the statistics whether or not the endpoint answers.
    #[tokio::test]
    async fn test_pool_stats() {
        // No retries and a short timeout: the outcome of the RPC is irrelevant here, and the
        // test must not spend minutes in back-off when the endpoint is unreachable.
        let config = PoolConfig {
            max_connections: 2,
            max_retries: 0,
            request_timeout: Duration::from_secs(5),
            ..Default::default()
        };
        let pool = ConnectionPool::new("https://testnet.neo.org:443".to_string(), config);

        // The result is deliberately ignored; only the bookkeeping is under test.
        let _outcome = pool
            .execute(|client| {
                Box::pin(async move {
                    client.get_version().await.map_err(|e| {
                        Neo3Error::Network(crate::neo_error::NetworkError::RpcError {
                            code: -1,
                            message: e.to_string(),
                        })
                    })
                })
            })
            .await;

        let stats = pool.get_stats().await;
        assert_eq!(stats.total_requests, 1);
        assert_eq!(stats.retried_requests, 0);
    }
}