neo3/neo_clients/rpc/transports/
retry.rs1use std::{
5 fmt::Debug,
6 sync::atomic::{AtomicU32, Ordering},
7 time::Duration,
8};
9
10use super::{common::JsonRpcError, http_provider::ClientError};
11use crate::neo_clients::{JsonRpcProvider, ProviderError};
12use async_trait::async_trait;
13use reqwest::StatusCode;
14use serde::{de::DeserializeOwned, Deserialize, Serialize};
15use thiserror::Error;
16use tracing::trace;
17
/// Strategy consulted after a failed request to decide whether and when to retry it.
///
/// `E` is the error type the policy inspects.
pub trait RetryPolicy<E>: Debug + Send + Sync {
    /// Returns `true` when a request that failed with `error` should be attempted again.
    fn should_retry(&self, error: &E) -> bool;

    /// Returns a delay to wait before the next attempt when one can be derived from `error`,
    /// or `None` when the error carries no such hint.
    fn backoff_hint(&self, error: &E) -> Option<Duration>;
}
27
28#[derive(Debug)]
52pub struct RetryClient<T>
53where
54 T: JsonRpcProvider,
55 T::Error: Sync + Send + 'static + Debug,
56{
57 inner: T,
58 requests_enqueued: AtomicU32,
59 policy: Box<dyn RetryPolicy<T::Error>>,
61 timeout_retries: u32,
63 rate_limit_retries: u32,
65 initial_backoff: Duration,
67 compute_units_per_second: u64,
69}
70
71impl<T> RetryClient<T>
72where
73 T: JsonRpcProvider,
74 T::Error: Sync + Send + 'static + Debug,
75{
76 pub fn new(
93 inner: T,
94 policy: Box<dyn RetryPolicy<T::Error>>,
95 max_retry: u32,
96 initial_backoff: u64,
98 ) -> Self {
99 RetryClientBuilder::default()
100 .initial_backoff(Duration::from_millis(initial_backoff))
101 .rate_limit_retries(max_retry)
102 .build(inner, policy)
103 }
104
105 pub fn set_compute_units(&mut self, cpus: u64) -> &mut Self {
112 self.compute_units_per_second = cpus;
113 self
114 }
115}
116
/// Collects the tunable settings of a `RetryClient` before it is built.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RetryClientBuilder {
    /// Maximum number of retries for errors classified as connectivity problems.
    timeout_retries: u32,
    /// Maximum number of retries for errors the retry policy asks to retry.
    rate_limit_retries: u32,
    /// Backoff used when the retry policy gives no hint for an error.
    initial_backoff: Duration,
    /// Compute units available per second, used to estimate additional backoff.
    compute_units_per_second: u64,
}
129
130impl RetryClientBuilder {
133 pub fn timeout_retries(mut self, timeout_retries: u32) -> Self {
137 self.timeout_retries = timeout_retries;
138 self
139 }
140
141 pub fn rate_limit_retries(mut self, rate_limit_retries: u32) -> Self {
143 self.rate_limit_retries = rate_limit_retries;
144 self
145 }
146
147 pub fn compute_units_per_second(mut self, compute_units_per_second: u64) -> Self {
151 self.compute_units_per_second = compute_units_per_second;
152 self
153 }
154
155 pub fn initial_backoff(mut self, initial_backoff: Duration) -> Self {
157 self.initial_backoff = initial_backoff;
158 self
159 }
160
161 pub fn build<T>(self, client: T, policy: Box<dyn RetryPolicy<T::Error>>) -> RetryClient<T>
163 where
164 T: JsonRpcProvider,
165 T::Error: Sync + Send + 'static + Debug,
166 {
167 let RetryClientBuilder {
168 timeout_retries,
169 rate_limit_retries,
170 initial_backoff,
171 compute_units_per_second,
172 } = self;
173 RetryClient {
174 inner: client,
175 requests_enqueued: AtomicU32::new(0),
176 policy,
177 timeout_retries,
178 rate_limit_retries,
179 initial_backoff,
180 compute_units_per_second,
181 }
182 }
183}
184
185impl Default for RetryClientBuilder {
187 fn default() -> Self {
188 Self {
189 timeout_retries: 3,
190 rate_limit_retries: 10,
192 initial_backoff: Duration::from_millis(1000),
193 compute_units_per_second: 330,
195 }
196 }
197}
198
199#[derive(Error, Debug)]
204pub enum RetryClientError {
205 #[error(transparent)]
207 ProviderError(ProviderError),
208 TimeoutError,
210 #[error(transparent)]
212 SerdeJson(serde_json::Error),
213}
214
215impl std::fmt::Display for RetryClientError {
216 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
217 write!(f, "{self:?}")
218 }
219}
220
221impl From<RetryClientError> for ProviderError {
222 fn from(src: RetryClientError) -> Self {
223 match src {
224 RetryClientError::ProviderError(err) => err,
225 RetryClientError::SerdeJson(err) => err.into(),
227 _ => ProviderError::CustomError(src.to_string()),
228 }
229 }
230}
231
232#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
233#[cfg_attr(target_arch = "wasm32", async_trait(? Send))]
234impl<T> JsonRpcProvider for RetryClient<T>
235where
236 T: JsonRpcProvider + 'static,
237 T::Error: Sync + Send + 'static + Debug,
238{
239 type Error = RetryClientError;
240
241 async fn fetch<A, R>(&self, method: &str, params: A) -> Result<R, Self::Error>
242 where
243 A: Debug + Serialize + Send + Sync,
244 R: DeserializeOwned + Send,
245 {
246 enum RetryParams<Params> {
250 Value(Params),
251 Zst(()),
252 }
253
254 let params = if std::mem::size_of::<A>() == 0 {
255 RetryParams::Zst(())
256 } else {
257 let params = serde_json::to_value(params).map_err(RetryClientError::SerdeJson)?;
258 RetryParams::Value(params)
259 };
260
261 let ahead_in_queue = self.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64;
262
263 let mut rate_limit_retry_number: u32 = 0;
264 let mut timeout_retries: u32 = 0;
265
266 loop {
267 let err;
268
269 {
272 let resp = match params {
273 RetryParams::Value(ref params) => self.inner.fetch(method, params).await,
274 RetryParams::Zst(unit) => self.inner.fetch(method, unit).await,
275 };
276 match resp {
277 Ok(ret) => {
278 self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
279 return Ok(ret);
280 },
281 Err(err_) => err = err_,
282 }
283 }
284
285 let should_retry = self.policy.should_retry(&err);
286 if should_retry {
287 rate_limit_retry_number += 1;
288 if rate_limit_retry_number > self.rate_limit_retries {
289 trace!("request timed out after {} retries", self.rate_limit_retries);
290 return Err(RetryClientError::TimeoutError);
291 }
292
293 let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst) as u64;
294
295 let mut next_backoff = self.policy.backoff_hint(&err).unwrap_or_else(|| {
298 Duration::from_millis(self.initial_backoff.as_millis() as u64)
299 });
300
301 const AVG_COST: u64 = 17u64;
310 let seconds_to_wait_for_compute_budget = compute_unit_offset_in_secs(
311 AVG_COST,
312 self.compute_units_per_second,
313 current_queued_requests,
314 ahead_in_queue,
315 );
316 next_backoff += Duration::from_secs(seconds_to_wait_for_compute_budget);
317
318 trace!("retrying and backing off for {:?}", next_backoff);
319
320 #[cfg(target_arch = "wasm32")]
321 futures_timer::Delay::new(next_backoff).await;
322
323 #[cfg(not(target_arch = "wasm32"))]
324 tokio::time::sleep(next_backoff).await;
325 } else {
326 let err: ProviderError = err.into();
327 if timeout_retries < self.timeout_retries && maybe_connectivity(&err) {
328 timeout_retries += 1;
329 trace!(err = ?err, "retrying due to spurious network");
330 continue;
331 }
332
333 trace!(err = ?err, "should not retry");
334 self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
335 return Err(RetryClientError::ProviderError(err));
336 }
337 }
338 }
339}
340
/// Stateless retry policy for HTTP transports that retries rate limit style failures.
#[derive(Default, Debug)]
pub struct HttpRateLimitRetryPolicy;
348
349impl RetryPolicy<ClientError> for HttpRateLimitRetryPolicy {
350 fn should_retry(&self, error: &ClientError) -> bool {
351 fn should_retry_json_rpc_error(err: &JsonRpcError) -> bool {
352 let JsonRpcError { code, message, .. } = err;
353 if *code == 429 {
355 return true;
356 }
357
358 if *code == -32005 {
360 return true;
361 }
362
363 if *code == -32016 && message.contains("rate limit") {
365 return true;
366 }
367
368 match message.as_str() {
369 "header not found" => true,
371 "daily request count exceeded, request rate limited" => true,
373 _ => false,
374 }
375 }
376
377 match error {
378 ClientError::ReqwestError(err) => err.status() == Some(StatusCode::TOO_MANY_REQUESTS),
379 ClientError::JsonRpcError(err) => should_retry_json_rpc_error(err),
380 ClientError::SerdeJson { text, .. } => {
381 #[derive(Deserialize)]
384 struct Resp {
385 error: JsonRpcError,
386 }
387
388 if let Ok(resp) = serde_json::from_str::<Resp>(text) {
389 return should_retry_json_rpc_error(&resp.error);
390 }
391 false
392 },
393 }
394 }
395
396 fn backoff_hint(&self, error: &ClientError) -> Option<Duration> {
397 if let ClientError::JsonRpcError(JsonRpcError { data, .. }) = error {
398 let data = data.as_ref()?;
399
400 let backoff_seconds = &data["rate"]["backoff_seconds"];
403 if let Some(seconds) = backoff_seconds.as_u64() {
405 return Some(Duration::from_secs(seconds));
406 }
407 if let Some(seconds) = backoff_seconds.as_f64() {
408 return Some(Duration::from_secs(seconds as u64 + 1));
409 }
410 }
411
412 None
413 }
414}
415
/// Estimates how many extra seconds a request should wait for compute budget to free up.
///
/// The number of requests that fit into one second is
/// `compute_units_per_second / avg_cost`, clamped to at least one so that a budget smaller
/// than the cost of a single request does not cause a division by zero. When more requests
/// than that are queued, the result is `min(current_queued_requests, ahead_in_queue)`
/// divided by that capacity; otherwise it is `0`.
///
/// An `avg_cost` of zero means requests consume no budget, so the result is `0`.
fn compute_unit_offset_in_secs(
    avg_cost: u64,
    compute_units_per_second: u64,
    current_queued_requests: u64,
    ahead_in_queue: u64,
) -> u64 {
    let request_capacity_per_second = match compute_units_per_second.checked_div(avg_cost) {
        Some(capacity) => capacity.max(1),
        None => return 0,
    };

    if current_queued_requests > request_capacity_per_second {
        current_queued_requests.min(ahead_in_queue) / request_capacity_per_second
    } else {
        0
    }
}
442
443fn maybe_connectivity(err: &ProviderError) -> bool {
446 if let ProviderError::HTTPError(reqwest_err) = err {
447 if reqwest_err.is_timeout() {
448 return true;
449 }
450
451 #[cfg(not(target_arch = "wasm32"))]
452 if reqwest_err.is_connect() {
453 return true;
454 }
455
456 if let Some(status) = reqwest_err.status() {
458 let code = status.as_u16();
459 if (500..600).contains(&code) {
460 return true;
461 }
462 }
463 }
464 false
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// Average request cost used by the offset tests.
    const AVG_COST: u64 = 17u64;
    /// Compute-unit budget per second used by the offset tests.
    const COMPUTE_UNITS: u64 = 330u64;

    /// Runs `compute_unit_offset_in_secs` with the fixed test cost and budget.
    fn compute_offset(current_queued_requests: u64, ahead_in_queue: u64) -> u64 {
        compute_unit_offset_in_secs(
            AVG_COST,
            COMPUTE_UNITS,
            current_queued_requests,
            ahead_in_queue,
        )
    }

    /// Asserts the expected offset for each `(queued, ahead, expected)` case.
    fn assert_offsets(cases: &[(u64, u64, u64)]) {
        for &(queued, ahead, expected) in cases.iter() {
            assert_eq!(compute_offset(queued, ahead), expected);
        }
    }

    /// Builds a JSON-RPC client error with code `0`, the rate limit message and `data`.
    fn rate_limited_with(data: serde_json::Value) -> ClientError {
        ClientError::JsonRpcError(JsonRpcError {
            code: 0,
            message: "daily request count exceeded, request rate limited".to_string(),
            data: Some(data),
        })
    }

    #[test]
    fn can_measure_unit_offset_single_request() {
        assert_offsets(&[(1, 0, 0), (19, 18, 0)]);
    }

    #[test]
    fn can_measure_unit_offset_1x_over_budget() {
        assert_offsets(&[(20, 19, 1)]);
    }

    #[test]
    fn can_measure_unit_offset_2x_over_budget() {
        assert_offsets(&[(49, 48, 2), (49, 20, 1)]);
    }

    #[test]
    fn can_extract_backoff() {
        let resp = r#"{"rate": {"allowed_rps": 1, "backoff_seconds": 30, "current_rps": 1.1}, "see": "https://infura.io/dashboard"}"#;

        // A structured payload with `rate.backoff_seconds` yields that many seconds.
        let with_hint = rate_limited_with(serde_json::from_str(resp).unwrap());
        let backoff = HttpRateLimitRetryPolicy.backoff_hint(&with_hint).unwrap();
        assert_eq!(backoff, Duration::from_secs(30));

        // A payload without the field yields no hint.
        let without_hint = rate_limited_with(serde_json::Value::String("blocked".to_string()));
        let backoff = HttpRateLimitRetryPolicy.backoff_hint(&without_hint);
        assert!(backoff.is_none());
    }

    #[test]
    fn test_alchemy_ip_rate_limit() {
        let s = "{\"code\":-32016,\"message\":\"Your IP has exceeded its requests per second capacity. To increase your rate limits, please sign up for a free Alchemy account at https://www.alchemy.com/optimism.\"}";
        let rpc_err: JsonRpcError = serde_json::from_str(s).unwrap();
        let err = ClientError::JsonRpcError(rpc_err);

        assert!(HttpRateLimitRetryPolicy.should_retry(&err));
    }

    #[test]
    fn test_rate_limit_omitted_id() {
        let s = r#"{"jsonrpc":"2.0","error":{"code":-32016,"message":"Your IP has exceeded its requests per second capacity. To increase your rate limits, please sign up for a free Alchemy account at https://www.alchemy.com/optimism."},"id":null}"#;

        let err = ClientError::SerdeJson {
            err: serde::de::Error::custom("unexpected notification over HTTP transport"),
            text: s.to_string(),
        };

        assert!(HttpRateLimitRetryPolicy.should_retry(&err));
    }
}