renegade_sdk/renegade_wallet_client/websocket/
client.rs1use std::sync::{Arc, OnceLock};
4use std::{collections::HashMap, time::Duration};
5
6use crate::auth::HmacKey;
7use futures_util::Stream;
8use futures_util::stream::SplitSink;
9use renegade_external_api::types::websocket::ServerWebsocketMessageBody;
10use renegade_external_api::types::{
11 AdminBalanceUpdateMessage, AdminOrderUpdateMessage, BalanceUpdateMessage, FillMessage,
12 OrderUpdateMessage, TaskUpdateMessage,
13};
14use tokio::net::TcpStream;
15use tokio::sync::{
16 OnceCell as AsyncOnceCell, RwLock,
17 mpsc::{self, UnboundedReceiver, UnboundedSender},
18};
19use tokio_stream::StreamExt;
20use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
21use tokio_tungstenite::{connect_async, tungstenite::Message};
22use tracing::{error, warn};
23use uuid::Uuid;
24
25use crate::websocket::subscriptions::{
26 SubscriptionManager, SubscriptionRx, SubscriptionTx, TopicStream,
27};
28use crate::websocket::{TaskWaiter, TaskWaiterManager};
29use crate::{RenegadeClientError, renegade_wallet_client::config::RenegadeClientConfig};
30
31const DEFAULT_WS_PORT: u16 = 4000;
37
38const WS_RECONNECTION_DELAY: Duration = Duration::from_secs(1);
40
41pub const ADMIN_BALANCES_TOPIC: &str = "/v2/admin/balances";
43
44pub const ADMIN_ORDERS_TOPIC: &str = "/v2/admin/orders";
46
47pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
53pub type WsSink = SplitSink<WsStream, Message>;
55
56pub type SubscribeTx = UnboundedSender<Uuid>;
58pub type SubscribeRx = UnboundedReceiver<Uuid>;
60pub type SharedMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
62
63pub fn create_subscription_channel() -> (SubscriptionTx, SubscriptionRx) {
69 mpsc::unbounded_channel()
70}
71
72#[derive(Clone)]
78pub struct RenegadeWebsocketClient {
79 base_url: String,
81 account_id: Uuid,
83 auth_hmac_key: HmacKey,
85 admin_hmac_key: Option<HmacKey>,
88 subscriptions: OnceLock<Arc<SubscriptionManager>>,
92 task_waiter_manager: AsyncOnceCell<Arc<TaskWaiterManager>>,
95}
96
97impl RenegadeWebsocketClient {
98 pub fn new(
100 config: &RenegadeClientConfig,
101 account_id: Uuid,
102 auth_hmac_key: HmacKey,
103 admin_hmac_key: Option<HmacKey>,
104 ) -> Self {
105 let base_url = config.relayer_base_url.replace("http", "ws");
106 let base_url = format!("{base_url}:{DEFAULT_WS_PORT}");
107
108 Self {
109 base_url,
110 account_id,
111 auth_hmac_key,
112 admin_hmac_key,
113 subscriptions: OnceLock::new(),
114 task_waiter_manager: AsyncOnceCell::new(),
115 }
116 }
117
118 async fn subscribe_to_topic(&self, topic: String) -> Result<TopicStream, RenegadeClientError> {
124 self.ensure_subscriptions_initialized();
125
126 let subscriptions = self.subscriptions.get().unwrap();
127 subscriptions.subscribe_to_topic(topic).await
128 }
129
130 pub async fn subscribe_task_updates(
134 &self,
135 ) -> Result<impl Stream<Item = TaskUpdateMessage> + use<>, RenegadeClientError> {
136 let stream = self.subscribe_to_topic(self.tasks_topic()).await?;
137
138 let filtered_stream = stream.filter_map(|maybe_ws_msg| {
139 maybe_ws_msg.ok().and_then(|ws_msg| match ws_msg {
140 ServerWebsocketMessageBody::TaskUpdate(message) => Some(message),
141 _ => None,
142 })
143 });
144
145 Ok(filtered_stream)
146 }
147
148 fn tasks_topic(&self) -> String {
150 format!("/v2/account/{}/tasks", self.account_id)
151 }
152
153 pub async fn subscribe_balance_updates(
157 &self,
158 ) -> Result<impl Stream<Item = BalanceUpdateMessage>, RenegadeClientError> {
159 let stream = self.subscribe_to_topic(self.balances_topic()).await?;
160
161 let filtered_stream = stream.filter_map(|maybe_ws_msg| {
162 maybe_ws_msg.ok().and_then(|ws_msg| match ws_msg {
163 ServerWebsocketMessageBody::BalanceUpdate(message) => Some(message),
164 _ => None,
165 })
166 });
167
168 Ok(filtered_stream)
169 }
170
171 fn balances_topic(&self) -> String {
173 format!("/v2/account/{}/balances", self.account_id)
174 }
175
176 pub async fn subscribe_order_updates(
180 &self,
181 ) -> Result<impl Stream<Item = OrderUpdateMessage>, RenegadeClientError> {
182 let stream = self.subscribe_to_topic(self.orders_topic()).await?;
183
184 let filtered_stream = stream.filter_map(|maybe_ws_msg| {
185 maybe_ws_msg.ok().and_then(|ws_msg| match ws_msg {
186 ServerWebsocketMessageBody::OrderUpdate(message) => Some(message),
187 _ => None,
188 })
189 });
190
191 Ok(filtered_stream)
192 }
193
194 fn orders_topic(&self) -> String {
196 format!("/v2/account/{}/orders", self.account_id)
197 }
198
199 pub async fn subscribe_fills(
203 &self,
204 ) -> Result<impl Stream<Item = FillMessage>, RenegadeClientError> {
205 let stream = self.subscribe_to_topic(self.fills_topic()).await?;
206
207 let filtered_stream = stream.filter_map(|maybe_ws_msg| {
208 maybe_ws_msg.ok().and_then(|ws_msg| match ws_msg {
209 ServerWebsocketMessageBody::Fill(message) => Some(message),
210 _ => None,
211 })
212 });
213
214 Ok(filtered_stream)
215 }
216
217 fn fills_topic(&self) -> String {
219 format!("/v2/account/{}/fills", self.account_id)
220 }
221
222 pub async fn subscribe_admin_balance_updates(
226 &self,
227 ) -> Result<impl Stream<Item = AdminBalanceUpdateMessage>, RenegadeClientError> {
228 let stream = self.subscribe_to_topic(ADMIN_BALANCES_TOPIC.to_string()).await?;
229
230 let filtered_stream = stream.filter_map(|maybe_ws_msg| {
231 maybe_ws_msg.ok().and_then(|ws_msg| match ws_msg {
232 ServerWebsocketMessageBody::AdminBalanceUpdate(message) => Some(message),
233 _ => None,
234 })
235 });
236
237 Ok(filtered_stream)
238 }
239
240 pub async fn subscribe_admin_order_updates(
242 &self,
243 ) -> Result<impl Stream<Item = AdminOrderUpdateMessage>, RenegadeClientError> {
244 let stream = self.subscribe_to_topic(ADMIN_ORDERS_TOPIC.to_string()).await?;
245
246 let filtered_stream = stream.filter_map(|maybe_ws_msg| {
247 maybe_ws_msg.ok().and_then(|ws_msg| match ws_msg {
248 ServerWebsocketMessageBody::AdminOrderUpdate(message) => Some(message),
249 _ => None,
250 })
251 });
252
253 Ok(filtered_stream)
254 }
255
256 pub async fn watch_task(
262 &self,
263 task_id: Uuid,
264 timeout: Duration,
265 ) -> Result<TaskWaiter, RenegadeClientError> {
266 let task_waiter_manager = self.ensure_task_waiters_initialized().await?;
267
268 Ok(task_waiter_manager.create_task_waiter(task_id, timeout).await)
269 }
270
271 async fn ensure_task_waiters_initialized(
274 &self,
275 ) -> Result<&Arc<TaskWaiterManager>, RenegadeClientError> {
276 let this = self.clone();
277 self.task_waiter_manager
278 .get_or_try_init(|| async move {
279 let tasks_topic = this.subscribe_task_updates().await?;
280 Ok(Arc::new(TaskWaiterManager::new(tasks_topic)))
281 })
282 .await
283 }
284
285 fn ensure_subscriptions_initialized(&self) {
295 self.subscriptions.get_or_init(|| {
296 let (subscriptions_tx, subscriptions_rx) = create_subscription_channel();
297 let subscriptions = Arc::new(SubscriptionManager::new(
298 self.auth_hmac_key,
299 self.admin_hmac_key,
300 subscriptions_tx,
301 ));
302
303 tokio::spawn(Self::ws_reconnection_loop(
304 self.base_url.clone(),
305 subscriptions.clone(),
306 subscriptions_rx,
307 ));
308
309 subscriptions
310 });
311 }
312
313 async fn ws_reconnection_loop(
316 base_url: String,
317 subscriptions: Arc<SubscriptionManager>,
318 mut subscriptions_rx: SubscriptionRx,
319 ) {
320 loop {
321 let maybe_ws_stream = connect_async(&base_url).await;
322 match maybe_ws_stream {
323 Ok((ws_stream, _)) => {
324 subscriptions.manage_subscriptions(ws_stream, &mut subscriptions_rx).await;
325 },
326 Err(e) => {
327 error!("Error connecting to websocket: {e}");
328 },
329 }
330
331 warn!("Re-establishing websocket connection...");
332 tokio::time::sleep(WS_RECONNECTION_DELAY).await;
333 }
334 }
335}