renegade_sdk/renegade_wallet_client/websocket/
client.rs

1//! The websocket client for listening to Renegade events
2
3use std::sync::{Arc, OnceLock};
4use std::{collections::HashMap, time::Duration};
5
6use crate::auth::HmacKey;
7use futures_util::Stream;
8use futures_util::stream::SplitSink;
9use renegade_external_api::types::websocket::ServerWebsocketMessageBody;
10use renegade_external_api::types::{
11    AdminBalanceUpdateMessage, AdminOrderUpdateMessage, BalanceUpdateMessage, FillMessage,
12    OrderUpdateMessage, TaskUpdateMessage,
13};
14use tokio::net::TcpStream;
15use tokio::sync::{
16    OnceCell as AsyncOnceCell, RwLock,
17    mpsc::{self, UnboundedReceiver, UnboundedSender},
18};
19use tokio_stream::StreamExt;
20use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
21use tokio_tungstenite::{connect_async, tungstenite::Message};
22use tracing::{error, warn};
23use uuid::Uuid;
24
25use crate::websocket::subscriptions::{
26    SubscriptionManager, SubscriptionRx, SubscriptionTx, TopicStream,
27};
28use crate::websocket::{TaskWaiter, TaskWaiterManager};
29use crate::{RenegadeClientError, renegade_wallet_client::config::RenegadeClientConfig};
30
31// -------------
32// | Constants |
33// -------------
34
/// Port appended to the relayer base URL to reach its websocket server
const DEFAULT_WS_PORT: u16 = 4000;

/// How long the reconnection loop sleeps before trying to connect again
const WS_RECONNECTION_DELAY: Duration = Duration::from_secs(1);

/// Websocket topic carrying admin balance updates
pub const ADMIN_BALANCES_TOPIC: &str = "/v2/admin/balances";

/// Websocket topic carrying admin order updates
pub const ADMIN_ORDERS_TOPIC: &str = "/v2/admin/orders";
46
47// ---------
48// | Types |
49// ---------
50
51/// A read/write websocket stream
52pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
53/// A websocket sink (write end)
54pub type WsSink = SplitSink<WsStream, Message>;
55
56/// A channel on which to request websocket subscriptions
57pub type SubscribeTx = UnboundedSender<Uuid>;
58/// A channel on which to receive websocket subscriptions
59pub type SubscribeRx = UnboundedReceiver<Uuid>;
60/// A shared map type
61pub type SharedMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
62
63// -----------
64// | Helpers |
65// -----------
66
67/// Create a new subscription channel
68pub fn create_subscription_channel() -> (SubscriptionTx, SubscriptionRx) {
69    mpsc::unbounded_channel()
70}
71
72// --------------------
73// | Websocket Client |
74// --------------------
75
76/// The websocket client for listening to Renegade events
77#[derive(Clone)]
78pub struct RenegadeWebsocketClient {
79    /// The base url of the websocket server
80    base_url: String,
81    /// The account ID
82    account_id: Uuid,
83    /// The account's HMAC key
84    auth_hmac_key: HmacKey,
85    /// The admin HMAC key used to authenticate admin websocket topic
86    /// subscriptions
87    admin_hmac_key: Option<HmacKey>,
88    /// The topic subscription manager. This is lazily initialized along with
89    /// the underlying websocket connection when the first subscription
90    /// request is made.
91    subscriptions: OnceLock<Arc<SubscriptionManager>>,
92    /// The task waiter manager. This is lazily initialized when the first task
93    /// waiter is created.
94    task_waiter_manager: AsyncOnceCell<Arc<TaskWaiterManager>>,
95}
96
97impl RenegadeWebsocketClient {
98    /// Create a new websocket client
99    pub fn new(
100        config: &RenegadeClientConfig,
101        account_id: Uuid,
102        auth_hmac_key: HmacKey,
103        admin_hmac_key: Option<HmacKey>,
104    ) -> Self {
105        let base_url = config.relayer_base_url.replace("http", "ws");
106        let base_url = format!("{base_url}:{DEFAULT_WS_PORT}");
107
108        Self {
109            base_url,
110            account_id,
111            auth_hmac_key,
112            admin_hmac_key,
113            subscriptions: OnceLock::new(),
114            task_waiter_manager: AsyncOnceCell::new(),
115        }
116    }
117
118    // -----------------
119    // | Subscriptions |
120    // -----------------
121
122    /// Subscribe to a new websocket topic
123    async fn subscribe_to_topic(&self, topic: String) -> Result<TopicStream, RenegadeClientError> {
124        self.ensure_subscriptions_initialized();
125
126        let subscriptions = self.subscriptions.get().unwrap();
127        subscriptions.subscribe_to_topic(topic).await
128    }
129
130    // --- Tasks --- //
131
132    /// Subscribe to the account's task updates stream
133    pub async fn subscribe_task_updates(
134        &self,
135    ) -> Result<impl Stream<Item = TaskUpdateMessage> + use<>, RenegadeClientError> {
136        let stream = self.subscribe_to_topic(self.tasks_topic()).await?;
137
138        let filtered_stream = stream.filter_map(|maybe_ws_msg| {
139            maybe_ws_msg.ok().and_then(|ws_msg| match ws_msg {
140                ServerWebsocketMessageBody::TaskUpdate(message) => Some(message),
141                _ => None,
142            })
143        });
144
145        Ok(filtered_stream)
146    }
147
148    /// Construct the account's task updates topic name
149    fn tasks_topic(&self) -> String {
150        format!("/v2/account/{}/tasks", self.account_id)
151    }
152
153    // --- Balances --- //
154
155    /// Subscribe to the account's balance updates stream
156    pub async fn subscribe_balance_updates(
157        &self,
158    ) -> Result<impl Stream<Item = BalanceUpdateMessage>, RenegadeClientError> {
159        let stream = self.subscribe_to_topic(self.balances_topic()).await?;
160
161        let filtered_stream = stream.filter_map(|maybe_ws_msg| {
162            maybe_ws_msg.ok().and_then(|ws_msg| match ws_msg {
163                ServerWebsocketMessageBody::BalanceUpdate(message) => Some(message),
164                _ => None,
165            })
166        });
167
168        Ok(filtered_stream)
169    }
170
171    /// Construct the account's balance updates topic name
172    fn balances_topic(&self) -> String {
173        format!("/v2/account/{}/balances", self.account_id)
174    }
175
176    // --- Orders --- //
177
178    /// Subscribe to the account's order updates stream
179    pub async fn subscribe_order_updates(
180        &self,
181    ) -> Result<impl Stream<Item = OrderUpdateMessage>, RenegadeClientError> {
182        let stream = self.subscribe_to_topic(self.orders_topic()).await?;
183
184        let filtered_stream = stream.filter_map(|maybe_ws_msg| {
185            maybe_ws_msg.ok().and_then(|ws_msg| match ws_msg {
186                ServerWebsocketMessageBody::OrderUpdate(message) => Some(message),
187                _ => None,
188            })
189        });
190
191        Ok(filtered_stream)
192    }
193
194    /// Construct the account's order updates topic name
195    fn orders_topic(&self) -> String {
196        format!("/v2/account/{}/orders", self.account_id)
197    }
198
199    // --- Fills --- //
200
201    /// Subscribe to the account's fills stream
202    pub async fn subscribe_fills(
203        &self,
204    ) -> Result<impl Stream<Item = FillMessage>, RenegadeClientError> {
205        let stream = self.subscribe_to_topic(self.fills_topic()).await?;
206
207        let filtered_stream = stream.filter_map(|maybe_ws_msg| {
208            maybe_ws_msg.ok().and_then(|ws_msg| match ws_msg {
209                ServerWebsocketMessageBody::Fill(message) => Some(message),
210                _ => None,
211            })
212        });
213
214        Ok(filtered_stream)
215    }
216
217    /// Construct the account's fills topic name
218    fn fills_topic(&self) -> String {
219        format!("/v2/account/{}/fills", self.account_id)
220    }
221
222    // --- Admin --- //
223
224    /// Subscribe to the admin balances updates stream
225    pub async fn subscribe_admin_balance_updates(
226        &self,
227    ) -> Result<impl Stream<Item = AdminBalanceUpdateMessage>, RenegadeClientError> {
228        let stream = self.subscribe_to_topic(ADMIN_BALANCES_TOPIC.to_string()).await?;
229
230        let filtered_stream = stream.filter_map(|maybe_ws_msg| {
231            maybe_ws_msg.ok().and_then(|ws_msg| match ws_msg {
232                ServerWebsocketMessageBody::AdminBalanceUpdate(message) => Some(message),
233                _ => None,
234            })
235        });
236
237        Ok(filtered_stream)
238    }
239
240    /// Subscribe to the admin order updates stream
241    pub async fn subscribe_admin_order_updates(
242        &self,
243    ) -> Result<impl Stream<Item = AdminOrderUpdateMessage>, RenegadeClientError> {
244        let stream = self.subscribe_to_topic(ADMIN_ORDERS_TOPIC.to_string()).await?;
245
246        let filtered_stream = stream.filter_map(|maybe_ws_msg| {
247            maybe_ws_msg.ok().and_then(|ws_msg| match ws_msg {
248                ServerWebsocketMessageBody::AdminOrderUpdate(message) => Some(message),
249                _ => None,
250            })
251        });
252
253        Ok(filtered_stream)
254    }
255
256    // ----------------
257    // | Task Waiters |
258    // ----------------
259
260    /// Subscribe to a new task's status
261    pub async fn watch_task(
262        &self,
263        task_id: Uuid,
264        timeout: Duration,
265    ) -> Result<TaskWaiter, RenegadeClientError> {
266        let task_waiter_manager = self.ensure_task_waiters_initialized().await?;
267
268        Ok(task_waiter_manager.create_task_waiter(task_id, timeout).await)
269    }
270
271    /// Ensure that the task waiter manager is initialized with a subscription
272    /// to the tasks topic.
273    async fn ensure_task_waiters_initialized(
274        &self,
275    ) -> Result<&Arc<TaskWaiterManager>, RenegadeClientError> {
276        let this = self.clone();
277        self.task_waiter_manager
278            .get_or_try_init(|| async move {
279                let tasks_topic = this.subscribe_task_updates().await?;
280                Ok(Arc::new(TaskWaiterManager::new(tasks_topic)))
281            })
282            .await
283    }
284
285    // ----------------------
286    // | Connection Handler |
287    // ----------------------
288
289    /// Connect to the websocket server & initialize the subscription manager.
290    ///
291    /// This will ensure that the websocket connection is only ever initialized
292    /// once if it is needed. The initialization spawns a thread which handles
293    /// websocket reconnection & subscription management.
294    fn ensure_subscriptions_initialized(&self) {
295        self.subscriptions.get_or_init(|| {
296            let (subscriptions_tx, subscriptions_rx) = create_subscription_channel();
297            let subscriptions = Arc::new(SubscriptionManager::new(
298                self.auth_hmac_key,
299                self.admin_hmac_key,
300                subscriptions_tx,
301            ));
302
303            tokio::spawn(Self::ws_reconnection_loop(
304                self.base_url.clone(),
305                subscriptions.clone(),
306                subscriptions_rx,
307            ));
308
309            subscriptions
310        });
311    }
312
313    /// Websocket reconnection loop. Re-establishes the websocket connection
314    /// if it could not be established or was closed for any reason.
315    async fn ws_reconnection_loop(
316        base_url: String,
317        subscriptions: Arc<SubscriptionManager>,
318        mut subscriptions_rx: SubscriptionRx,
319    ) {
320        loop {
321            let maybe_ws_stream = connect_async(&base_url).await;
322            match maybe_ws_stream {
323                Ok((ws_stream, _)) => {
324                    subscriptions.manage_subscriptions(ws_stream, &mut subscriptions_rx).await;
325                },
326                Err(e) => {
327                    error!("Error connecting to websocket: {e}");
328                },
329            }
330
331            warn!("Re-establishing websocket connection...");
332            tokio::time::sleep(WS_RECONNECTION_DELAY).await;
333        }
334    }
335}