// neo3/neo_clients/rpc/pubsub.rs
use std::{
2 collections::VecDeque,
3 marker::PhantomData,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use crate::neo_clients::{JsonRpcProvider, RpcClient};
9use futures_util::stream::Stream;
10use pin_project::{pin_project, pinned_drop};
11use primitive_types::U256;
12use serde::de::DeserializeOwned;
13use serde_json::value::RawValue;
14use tracing::error;
15
16pub trait PubsubClient: JsonRpcProvider {
18 type NotificationStream: futures_core::Stream<Item = Box<RawValue>> + Send + Unpin;
20
21 fn subscribe<T: Into<U256>>(&self, id: T) -> Result<Self::NotificationStream, Self::Error>;
23
24 fn unsubscribe<T: Into<U256>>(&self, id: T) -> Result<(), Self::Error>;
26}
27
28#[must_use = "subscriptions do nothing unless you stream them"]
29#[pin_project(PinnedDrop)]
30pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> {
32 pub id: U256,
34
35 loaded_elements: VecDeque<R>,
36
37 pub(crate) provider: &'a RpcClient<P>,
38
39 #[pin]
40 rx: P::NotificationStream,
41
42 ret: PhantomData<R>,
43}
44
45impl<'a, P, R> SubscriptionStream<'a, P, R>
46where
47 P: PubsubClient,
48 R: DeserializeOwned,
49{
50 pub fn new(id: U256, provider: &'a RpcClient<P>) -> Result<Self, P::Error> {
58 let rx = provider.as_ref().subscribe(id)?;
60 Ok(Self { id, provider, rx, ret: PhantomData, loaded_elements: VecDeque::new() })
61 }
62
63 pub fn set_loaded_elements(&mut self, loaded_elements: VecDeque<R>) {
71 self.loaded_elements = loaded_elements;
72 }
73}
74
75impl<'a, P, R> Stream for SubscriptionStream<'a, P, R>
81where
82 P: PubsubClient,
83 R: DeserializeOwned,
84{
85 type Item = R;
86
87 fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
88 if !self.loaded_elements.is_empty() {
89 let next_element = self.get_mut().loaded_elements.pop_front();
90 return Poll::Ready(next_element);
91 }
92
93 let mut this = self.project();
94 loop {
95 return match futures_util::ready!(this.rx.as_mut().poll_next(ctx)) {
96 Some(item) => match serde_json::from_str(item.get()) {
97 Ok(res) => Poll::Ready(Some(res)),
98 Err(err) => {
99 error!("failed to deserialize item {:?}", err);
100 continue;
101 },
102 },
103 None => Poll::Ready(None),
104 };
105 }
106 }
107}
108
109#[pinned_drop]
110impl<P, R> PinnedDrop for SubscriptionStream<'_, P, R>
111where
112 P: PubsubClient,
113 R: DeserializeOwned,
114{
115 fn drop(self: Pin<&mut Self>) {
116 let _ = (*self.provider).as_ref().unsubscribe(self.id);
120 }
121}