neo3/neo_clients/rpc/transports/retry.rs

//! A [JsonRpcProvider] implementation that retries requests filtered by [RetryPolicy]
//! with an exponential backoff.

use std::{
	fmt::Debug,
	sync::atomic::{AtomicU32, Ordering},
	time::Duration,
};

use super::{common::JsonRpcError, http_provider::ClientError};
use crate::neo_clients::{JsonRpcProvider, ProviderError};
use async_trait::async_trait;
use reqwest::StatusCode;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use thiserror::Error;
use tracing::trace;

/// A [RetryPolicy] defines the logic for deciding which [JsonRpcProvider::Error] instances the
/// client should retry the request for and attempt to recover from.
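///
/// # Example
///
/// A minimal sketch of a custom policy, assuming `RetryPolicy` is re-exported from
/// `neo3::neo_clients` like the other retry types; `AlwaysRetry` and `MyTransportError` are
/// hypothetical names used purely for illustration.
///
/// ```ignore
/// use std::time::Duration;
/// use neo3::neo_clients::RetryPolicy;
///
/// #[derive(Debug)]
/// struct MyTransportError;
///
/// #[derive(Debug)]
/// struct AlwaysRetry;
///
/// impl RetryPolicy<MyTransportError> for AlwaysRetry {
///     // Retry every error unconditionally.
///     fn should_retry(&self, _error: &MyTransportError) -> bool {
///         true
///     }
///
///     // No server-provided hint; the client falls back to its own backoff.
///     fn backoff_hint(&self, _error: &MyTransportError) -> Option<Duration> {
///         None
///     }
/// }
/// ```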
pub trait RetryPolicy<E>: Send + Sync + Debug {
	/// Whether to retry the request based on the given `error`
	fn should_retry(&self, error: &E) -> bool;

	/// Providers may include the `backoff` in the error response directly
	fn backoff_hint(&self, error: &E) -> Option<Duration>;
}

/// [RetryClient] is a wrapper around a [JsonRpcProvider] that retries requests with an
/// exponential backoff, filtered by a [RetryPolicy].
///
/// The `RetryPolicy`, aimed mainly at rate-limiting errors, can be adjusted for specific
/// applications and endpoints. In addition to the errors covered by the `RetryPolicy`, errors
/// caused by connectivity issues, such as timed out connections or responses in the `5xx`
/// range, can be retried separately.
///
/// # Example
///
/// ```
/// use neo3::neo_clients::{HttpProvider, HttpRateLimitRetryPolicy, RetryClientBuilder};
/// use std::time::Duration;
/// use url::Url;
///
/// async fn demo() {
///     let http = HttpProvider::new(Url::parse("http://localhost:8545").unwrap()).unwrap();
///     let client = RetryClientBuilder::default()
///         .rate_limit_retries(10)
///         .timeout_retries(3)
///         .initial_backoff(Duration::from_millis(500))
///         .build(http, Box::new(HttpRateLimitRetryPolicy::default()));
/// }
/// ```
#[derive(Debug)]
pub struct RetryClient<T>
where
	T: JsonRpcProvider,
	T::Error: Sync + Send + 'static + Debug,
{
	inner: T,
	requests_enqueued: AtomicU32,
	/// The policy used to determine whether to retry a request due to rate limiting
	policy: Box<dyn RetryPolicy<T::Error>>,
	/// How many connection timeout errors should be retried.
	timeout_retries: u32,
	/// How many retries for rate limited responses
	rate_limit_retries: u32,
	/// How long to wait initially
	initial_backoff: Duration,
	/// Available compute units per second
	compute_units_per_second: u64,
}

impl<T> RetryClient<T>
where
	T: JsonRpcProvider,
	T::Error: Sync + Send + 'static + Debug,
{
	/// Creates a new `RetryClient` that wraps a client and adds retry and backoff support
	///
	/// # Example
	///
	/// ```
	/// use neo3::neo_clients::{HttpProvider, HttpRateLimitRetryPolicy, RetryClient};
	/// use std::time::Duration;
	/// use url::Url;
	///
	/// async fn demo() {
	///     let http = HttpProvider::new(Url::parse("http://localhost:8545").unwrap()).unwrap();
	///     let backoff_timeout = 3000; // in ms
	///     let max_retries = 10;
	///     let client = RetryClient::new(http, Box::new(HttpRateLimitRetryPolicy::default()), max_retries, backoff_timeout);
	/// }
	/// ```
	pub fn new(
		inner: T,
		policy: Box<dyn RetryPolicy<T::Error>>,
		max_retry: u32,
		// in milliseconds
		initial_backoff: u64,
	) -> Self {
		RetryClientBuilder::default()
			.initial_backoff(Duration::from_millis(initial_backoff))
			.rate_limit_retries(max_retry)
			.build(inner, policy)
	}

	/// Sets the free compute units per second limit.
	///
	/// This is the maximum number of weighted requests that can be handled per second by the
	/// endpoint before the rate limit kicks in.
	///
	/// This is used to estimate how long to wait before retrying again.
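	///
	/// # Example
	///
	/// A minimal sketch; the limit of 700 compute units per second below is an assumed value
	/// used purely for illustration.
	///
	/// ```ignore
	/// use neo3::neo_clients::{HttpProvider, HttpRateLimitRetryPolicy, RetryClient};
	/// use url::Url;
	///
	/// let http = HttpProvider::new(Url::parse("http://localhost:8545").unwrap()).unwrap();
	/// let mut client = RetryClient::new(http, Box::new(HttpRateLimitRetryPolicy::default()), 10, 500);
	/// // Tell the client the endpoint allows roughly 700 compute units per second.
	/// client.set_compute_units(700);
	/// ```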
	pub fn set_compute_units(&mut self, cpus: u64) -> &mut Self {
		self.compute_units_per_second = cpus;
		self
	}
}

/// Builder for a [`RetryClient`]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RetryClientBuilder {
	/// How many connection timeout errors should be retried.
	timeout_retries: u32,
	/// How many retries for rate limited responses
	rate_limit_retries: u32,
	/// How long to wait initially
	initial_backoff: Duration,
	/// Available compute units per second
	compute_units_per_second: u64,
}

// === impl RetryClientBuilder ===

impl RetryClientBuilder {
	/// Sets the number of retries after a connection times out
	///
	/// **Note:** this is only used for errors considered connectivity issues, such as connection
	/// timeouts (see `reqwest::Error::is_timeout`) or `5xx` responses
	pub fn timeout_retries(mut self, timeout_retries: u32) -> Self {
		self.timeout_retries = timeout_retries;
		self
	}

	/// How many retries for rate limited responses
	pub fn rate_limit_retries(mut self, rate_limit_retries: u32) -> Self {
		self.rate_limit_retries = rate_limit_retries;
		self
	}

	/// Sets the number of assumed available compute units per second
	///
	/// See also <https://github.com/alchemyplatform/alchemy-docs/blob/master/documentation/compute-units.md#rate-limits-cups>
	pub fn compute_units_per_second(mut self, compute_units_per_second: u64) -> Self {
		self.compute_units_per_second = compute_units_per_second;
		self
	}

	/// Sets the duration to wait initially before retrying
	pub fn initial_backoff(mut self, initial_backoff: Duration) -> Self {
		self.initial_backoff = initial_backoff;
		self
	}

	/// Creates the `RetryClient` with the configured settings
	pub fn build<T>(self, client: T, policy: Box<dyn RetryPolicy<T::Error>>) -> RetryClient<T>
	where
		T: JsonRpcProvider,
		T::Error: Sync + Send + 'static + Debug,
	{
		let RetryClientBuilder {
			timeout_retries,
			rate_limit_retries,
			initial_backoff,
			compute_units_per_second,
		} = self;
		RetryClient {
			inner: client,
			requests_enqueued: AtomicU32::new(0),
			policy,
			timeout_retries,
			rate_limit_retries,
			initial_backoff,
			compute_units_per_second,
		}
	}
}

// Some sensible defaults
impl Default for RetryClientBuilder {
	fn default() -> Self {
		Self {
			timeout_retries: 3,
			// this should be enough to even out heavy loads
			rate_limit_retries: 10,
			initial_backoff: Duration::from_millis(1000),
			// Alchemy's max compute units per second <https://github.com/alchemyplatform/alchemy-docs/blob/master/documentation/compute-units.md#rate-limits-cups>
			compute_units_per_second: 330,
		}
	}
}

/// Error thrown when:
/// 1. The internal client throws an error we do not wish to recover from.
/// 2. Params serialization failed.
/// 3. The request timed out, i.e. max retries were already exhausted.
#[derive(Error, Debug)]
pub enum RetryClientError {
	/// Internal provider error
	#[error(transparent)]
	ProviderError(ProviderError),
	/// Timeout while making requests
	TimeoutError,
	/// (De)Serialization error
	#[error(transparent)]
	SerdeJson(serde_json::Error),
}

impl std::fmt::Display for RetryClientError {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		write!(f, "{self:?}")
	}
}

impl From<RetryClientError> for ProviderError {
	fn from(src: RetryClientError) -> Self {
		match src {
			RetryClientError::ProviderError(err) => err,
			// RetryClientError::TimeoutError => ProviderError::JsonRpcClientError(Box::new(src)),
			RetryClientError::SerdeJson(err) => err.into(),
			_ => ProviderError::CustomError(src.to_string()),
		}
	}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<T> JsonRpcProvider for RetryClient<T>
where
	T: JsonRpcProvider + 'static,
	T::Error: Sync + Send + 'static + Debug,
{
	type Error = RetryClientError;

	async fn fetch<A, R>(&self, method: &str, params: A) -> Result<R, Self::Error>
	where
		A: Debug + Serialize + Send + Sync,
		R: DeserializeOwned + Send,
	{
		// Helper type that caches the `params` value across several retries
		// This is necessary because the wrapper provider is supposed to skip the `params` if
		// it's of size 0, see `crate::transports::common::Request`
		enum RetryParams<Params> {
			Value(Params),
			Zst(()),
		}

		let params = if std::mem::size_of::<A>() == 0 {
			RetryParams::Zst(())
		} else {
			let params = serde_json::to_value(params).map_err(RetryClientError::SerdeJson)?;
			RetryParams::Value(params)
		};

		let ahead_in_queue = self.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64;

		let mut rate_limit_retry_number: u32 = 0;
		let mut timeout_retries: u32 = 0;

		loop {
			let err;

			// hack to not hold `R` across an await in the sleep future and prevent requiring
			// R: Send + Sync
			{
				let resp = match params {
					RetryParams::Value(ref params) => self.inner.fetch(method, params).await,
					RetryParams::Zst(unit) => self.inner.fetch(method, unit).await,
				};
				match resp {
					Ok(ret) => {
						self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
						return Ok(ret);
					},
					Err(err_) => err = err_,
				}
			}

			let should_retry = self.policy.should_retry(&err);
			if should_retry {
				rate_limit_retry_number += 1;
				if rate_limit_retry_number > self.rate_limit_retries {
					trace!("request timed out after {} retries", self.rate_limit_retries);
					return Err(RetryClientError::TimeoutError);
				}

				let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst) as u64;

				// try to extract the requested backoff from the error or compute the next backoff
				// based on retry count
				let mut next_backoff = self.policy.backoff_hint(&err).unwrap_or_else(|| {
					Duration::from_millis(self.initial_backoff.as_millis() as u64)
				});

				// Requests are usually weighted and can vary from 10 CU to several 100 CU; cheaper
				// requests are more common. Some example alchemy weights:
				// - `neo_getStorageAt`: 17
				// - `neo_getBlockByNumber`: 16
				// - `neo_newFilter`: 20
				//
				// Assuming that storage requests will be the driver for rate limits (as in forking
				// mode), we choose `17` as the average cost of any request.
				const AVG_COST: u64 = 17u64;
				let seconds_to_wait_for_compute_budget = compute_unit_offset_in_secs(
					AVG_COST,
					self.compute_units_per_second,
					current_queued_requests,
					ahead_in_queue,
				);
				next_backoff += Duration::from_secs(seconds_to_wait_for_compute_budget);

				trace!("retrying and backing off for {:?}", next_backoff);

				#[cfg(target_arch = "wasm32")]
				futures_timer::Delay::new(next_backoff).await;

				#[cfg(not(target_arch = "wasm32"))]
				tokio::time::sleep(next_backoff).await;
			} else {
				let err: ProviderError = err.into();
				if timeout_retries < self.timeout_retries && maybe_connectivity(&err) {
					timeout_retries += 1;
					trace!(err = ?err, "retrying due to spurious network error");
					continue;
				}

				trace!(err = ?err, "should not retry");
				self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
				return Err(RetryClientError::ProviderError(err));
			}
		}
	}
}

/// Implements a [RetryPolicy] that retries requests that errored with
/// status code 429, i.e. TOO_MANY_REQUESTS.
///
/// Infura often fails with a `"header not found"` rpc error which is apparently linked to load
/// balancing; such errors are retried as well.
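///
/// # Example
///
/// A minimal sketch of the policy's behaviour; the import paths assume `ClientError`,
/// `JsonRpcError` and `RetryPolicy` are re-exported from `neo3::neo_clients`.
///
/// ```ignore
/// use neo3::neo_clients::{ClientError, HttpRateLimitRetryPolicy, JsonRpcError, RetryPolicy};
///
/// // A JSON-RPC error carrying the 429 (TOO_MANY_REQUESTS) code is considered retryable.
/// let err = ClientError::JsonRpcError(JsonRpcError {
///     code: 429,
///     message: "too many requests".to_string(),
///     data: None,
/// });
/// assert!(HttpRateLimitRetryPolicy.should_retry(&err));
/// ```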
#[derive(Debug, Default)]
pub struct HttpRateLimitRetryPolicy;

impl RetryPolicy<ClientError> for HttpRateLimitRetryPolicy {
	fn should_retry(&self, error: &ClientError) -> bool {
		fn should_retry_json_rpc_error(err: &JsonRpcError) -> bool {
			let JsonRpcError { code, message, .. } = err;
			// alchemy throws it this way
			if *code == 429 {
				return true;
			}

			// This is an infura error code for `exceeded project rate limit`
			if *code == -32005 {
				return true;
			}

			// alternative alchemy error for specific IPs
			if *code == -32016 && message.contains("rate limit") {
				return true;
			}

			match message.as_str() {
				// this is commonly thrown by infura and is apparently a load balancer issue, see also <https://github.com/MetaMask/metamask-extension/issues/7234>
				"header not found" => true,
				// also thrown by infura if out of budget for the day and rate limited
				"daily request count exceeded, request rate limited" => true,
				_ => false,
			}
		}

		match error {
			ClientError::ReqwestError(err) => err.status() == Some(StatusCode::TOO_MANY_REQUESTS),
			ClientError::JsonRpcError(err) => should_retry_json_rpc_error(err),
			ClientError::SerdeJson { text, .. } => {
				// some providers send invalid JSON RPC in the error case (no `id:u64`), but the
				// text should be a `JsonRpcError`
				#[derive(Deserialize)]
				struct Resp {
					error: JsonRpcError,
				}

				if let Ok(resp) = serde_json::from_str::<Resp>(text) {
					return should_retry_json_rpc_error(&resp.error);
				}
				false
			},
		}
	}

	fn backoff_hint(&self, error: &ClientError) -> Option<Duration> {
		if let ClientError::JsonRpcError(JsonRpcError { data, .. }) = error {
			let data = data.as_ref()?;

			// if daily rate limit exceeded, infura returns the requested backoff in the error
			// response
			let backoff_seconds = &data["rate"]["backoff_seconds"];
			// infura rate limit error
			if let Some(seconds) = backoff_seconds.as_u64() {
				return Some(Duration::from_secs(seconds));
			}
			if let Some(seconds) = backoff_seconds.as_f64() {
				return Some(Duration::from_secs(seconds as u64 + 1));
			}
		}

		None
	}
}

/// Calculates an offset in seconds by taking into account the number of currently queued
/// requests, the number of requests that were ahead in the queue when the request was first
/// issued, the average cost of a weighted request (heuristic), and the number of available
/// compute units per second.
///
/// Returns the number of seconds (the unit in which the remote endpoint measures its compute
/// budget) a request is supposed to wait in order to not get rate limited. The budget per second
/// is `compute_units_per_second`; assuming an average cost of `avg_cost`, this allows (in theory)
/// `compute_units_per_second / avg_cost` requests per second without getting rate limited.
/// Taking into account the number of concurrent requests and the position in the queue when the
/// request was first issued, this determines the number of seconds a request should wait, if any.
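///
/// For example, with the values used in the tests below (`avg_cost = 17`,
/// `compute_units_per_second = 330`) the per-second request capacity is `330 / 17 = 19`.
/// With 20 requests currently queued and 19 ahead in the queue the offset is
/// `min(20, 19) / 19 = 1` second; with 49 queued and 48 ahead it is `min(49, 48) / 19 = 2`
/// seconds (integer division).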
fn compute_unit_offset_in_secs(
	avg_cost: u64,
	compute_units_per_second: u64,
	current_queued_requests: u64,
	ahead_in_queue: u64,
) -> u64 {
	let request_capacity_per_second = compute_units_per_second.saturating_div(avg_cost);
	if current_queued_requests > request_capacity_per_second {
		current_queued_requests
			.min(ahead_in_queue)
			.saturating_div(request_capacity_per_second)
	} else {
		0
	}
}

/// Checks whether the `error` is the result of a connectivity issue, like a request timeout or
/// a `5xx` response
fn maybe_connectivity(err: &ProviderError) -> bool {
	if let ProviderError::HTTPError(reqwest_err) = err {
		if reqwest_err.is_timeout() {
			return true;
		}

		#[cfg(not(target_arch = "wasm32"))]
		if reqwest_err.is_connect() {
			return true;
		}

		// Error HTTP codes (5xx) are considered connectivity issues and will prompt a retry
		if let Some(status) = reqwest_err.status() {
			let code = status.as_u16();
			if (500..600).contains(&code) {
				return true;
			}
		}
	}
	false
}

#[cfg(test)]
mod tests {
	use super::*;

	// assumed average cost of a request
	const AVG_COST: u64 = 17u64;
	const COMPUTE_UNITS: u64 = 330u64;

	fn compute_offset(current_queued_requests: u64, ahead_in_queue: u64) -> u64 {
		compute_unit_offset_in_secs(
			AVG_COST,
			COMPUTE_UNITS,
			current_queued_requests,
			ahead_in_queue,
		)
	}

	#[test]
	fn can_measure_unit_offset_single_request() {
		let current_queued_requests = 1;
		let ahead_in_queue = 0;
		let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
		assert_eq!(to_wait, 0);

		let current_queued_requests = 19;
		let ahead_in_queue = 18;
		let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
		assert_eq!(to_wait, 0);
	}

	#[test]
	fn can_measure_unit_offset_1x_over_budget() {
		let current_queued_requests = 20;
		let ahead_in_queue = 19;
		let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
		// need to wait 1 second
		assert_eq!(to_wait, 1);
	}

	#[test]
	fn can_measure_unit_offset_2x_over_budget() {
		let current_queued_requests = 49;
		let ahead_in_queue = 48;
		let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
		// need to wait 2 seconds
		assert_eq!(to_wait, 2);

		let current_queued_requests = 49;
		let ahead_in_queue = 20;
		let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
		// need to wait 1 second
		assert_eq!(to_wait, 1);
	}

	#[test]
	fn can_extract_backoff() {
		let resp = r#"{"rate": {"allowed_rps": 1, "backoff_seconds": 30, "current_rps": 1.1}, "see": "https://infura.io/dashboard"}"#;

		let err = ClientError::JsonRpcError(JsonRpcError {
			code: 0,
			message: "daily request count exceeded, request rate limited".to_string(),
			data: Some(serde_json::from_str(resp).unwrap()),
		});
		let backoff = HttpRateLimitRetryPolicy.backoff_hint(&err).unwrap();
		assert_eq!(backoff, Duration::from_secs(30));

		let err = ClientError::JsonRpcError(JsonRpcError {
			code: 0,
			message: "daily request count exceeded, request rate limited".to_string(),
			data: Some(serde_json::Value::String("blocked".to_string())),
		});
		let backoff = HttpRateLimitRetryPolicy.backoff_hint(&err);
		assert!(backoff.is_none());
	}

	#[test]
	fn test_alchemy_ip_rate_limit() {
		let s = "{\"code\":-32016,\"message\":\"Your IP has exceeded its requests per second capacity. To increase your rate limits, please sign up for a free Alchemy account at https://www.alchemy.com/optimism.\"}";
		let err: JsonRpcError = serde_json::from_str(s).unwrap();
		let err = ClientError::JsonRpcError(err);

		let should_retry = HttpRateLimitRetryPolicy.should_retry(&err);
		assert!(should_retry);
	}

	#[test]
	fn test_rate_limit_omitted_id() {
		let s = r#"{"jsonrpc":"2.0","error":{"code":-32016,"message":"Your IP has exceeded its requests per second capacity. To increase your rate limits, please sign up for a free Alchemy account at https://www.alchemy.com/optimism."},"id":null}"#;

		let err = ClientError::SerdeJson {
			err: serde::de::Error::custom("unexpected notification over HTTP transport"),
			text: s.to_string(),
		};

		let should_retry = HttpRateLimitRetryPolicy.should_retry(&err);
		assert!(should_retry);
	}
}