// openzeppelin_monitor/services/blockchain/transports/ws/transport.rs
use async_trait::async_trait;
7use futures_util::{SinkExt, StreamExt};
8use reqwest_middleware::ClientWithMiddleware;
9use serde::Serialize;
10use serde_json::{json, Value};
11use std::{
12 sync::atomic::{AtomicU64, Ordering},
13 sync::Arc,
14 time::Duration,
15};
16use tokio::{net::TcpStream, sync::Mutex, time::timeout};
17use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
18
19use crate::{
20 models::Network,
21 services::blockchain::{
22 transports::{
23 ws::{
24 config::WsConfig, connection::WebSocketConnection,
25 endpoint_manager::EndpointManager,
26 },
27 BlockchainTransport, RotatingTransport,
28 },
29 TransportError,
30 },
31};
32
33#[derive(Clone, Debug)]
44pub struct WsTransportClient {
45 pub connection: Arc<Mutex<WebSocketConnection>>,
47 endpoint_manager: Arc<EndpointManager>,
49 config: WsConfig,
51 request_id_counter: Arc<AtomicU64>,
53}
54
55impl WsTransportClient {
56 pub async fn new(network: &Network, config: Option<WsConfig>) -> Result<Self, anyhow::Error> {
70 let config = config.unwrap_or_else(|| WsConfig::from_network(network));
71
72 let mut ws_urls: Vec<_> = network
74 .rpc_urls
75 .iter()
76 .filter(|rpc_url| rpc_url.type_ == "ws_rpc" && rpc_url.weight > 0)
77 .collect();
78
79 ws_urls.sort_by(|a, b| b.weight.cmp(&a.weight));
80
81 if ws_urls.is_empty() {
82 return Err(anyhow::anyhow!("No WebSocket URLs available"));
83 }
84
85 let mut active_url = None;
87 let mut fallback_urls = Vec::new();
88
89 for rpc_url in ws_urls {
90 let url = rpc_url.url.as_ref().to_string();
91 if active_url.is_none() {
92 match timeout(config.connection_timeout, connect_async(&url)).await {
93 Ok(Ok(_)) => {
94 active_url = Some(url.clone());
95 continue;
97 }
98 Ok(Err(e)) => {
99 tracing::warn!("WS connect failed for {}: {}", url, e);
100 }
102 Err(e) => {
103 tracing::warn!("WS connect timeout for {}: {}", url, e);
104 }
106 }
107 }
108 fallback_urls.push(url);
110 }
111
112 let active_url =
113 active_url.ok_or_else(|| anyhow::anyhow!("Failed to connect to any WebSocket URL"))?;
114 let endpoint_manager = Arc::new(EndpointManager::new(&config, &active_url, fallback_urls));
115 let connection = Arc::new(Mutex::new(WebSocketConnection::default()));
116
117 let client = Self {
118 connection,
119 endpoint_manager,
120 config,
121 request_id_counter: Arc::new(AtomicU64::new(1)),
122 };
123
124 client.connect().await?;
126
127 Ok(client)
128 }
129
130 async fn connect(&self) -> Result<(), anyhow::Error> {
135 let url = self.endpoint_manager.get_active_url().await?;
136 self.try_connect(&url).await
137 }
138
139 async fn send_raw_request<P>(
155 &self,
156 method: &str,
157 params: Option<P>,
158 ) -> Result<Value, TransportError>
159 where
160 P: Into<Value> + Send + Clone + Serialize,
161 {
162 loop {
163 let mut connection = self.connection.lock().await;
164 if !connection.is_connected() {
165 return Err(TransportError::network("Not connected", None, None));
166 }
167 connection.update_activity();
168
169 let handle_connection_error = |connection: &mut WebSocketConnection| {
171 connection.is_healthy = false;
172 connection.stream = None;
173 };
174
175 let stream = match connection.stream.as_mut() {
176 Some(stream) => stream,
177 None => {
178 handle_connection_error(&mut connection);
179 drop(connection);
180 if !self.endpoint_manager.should_rotate().await {
181 return Err(TransportError::network("Not connected", None, None));
182 }
183 self.endpoint_manager.rotate_url(self).await.map_err(|e| {
184 TransportError::url_rotation("Failed to rotate URL", Some(e.into()), None)
185 })?;
186 continue;
187 }
188 };
189
190 let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
192 let request_body = json!({
193 "jsonrpc": "2.0",
194 "id": request_id,
195 "method": method,
196 "params": params.clone().map(|p| p.into())
197 });
198
199 if let Err(e) = stream
201 .send(Message::Text(request_body.to_string().into()))
202 .await
203 {
204 handle_connection_error(&mut connection);
205 drop(connection);
206 if !self.endpoint_manager.should_rotate().await {
207 return Err(TransportError::network(
208 format!("Failed to send request: {}", e),
209 None,
210 None,
211 ));
212 }
213 self.endpoint_manager.rotate_url(self).await.map_err(|e| {
214 TransportError::url_rotation("Failed to rotate URL", Some(e.into()), None)
215 })?;
216 continue;
217 }
218
219 async fn handle_ping(
221 stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
222 data: Vec<u8>,
223 ) -> Result<(), anyhow::Error> {
224 stream
225 .send(Message::Pong(data.into()))
226 .await
227 .map_err(|e| anyhow::anyhow!("Failed to send pong: {}", e))
228 }
229
230 async fn wait_for_response(
232 stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
233 timeout: Duration,
234 ) -> Result<Message, anyhow::Error> {
235 tokio::time::timeout(timeout, stream.next())
236 .await
237 .map_err(|_| anyhow::anyhow!("Response timeout"))?
238 .ok_or_else(|| anyhow::anyhow!("Connection closed"))?
239 .map_err(|e| anyhow::anyhow!("WebSocket error: {}", e))
240 }
241
242 loop {
244 match wait_for_response(stream, self.config.message_timeout).await {
245 Ok(Message::Text(text)) => {
246 let response: Value = serde_json::from_str(&text).map_err(|e| {
248 TransportError::response_parse(
249 "Failed to parse response",
250 Some(e.into()),
251 None,
252 )
253 })?;
254
255 if let Some(response_id) = response.get("id").and_then(|v| v.as_u64()) {
257 if response_id == request_id {
258 return Ok(response);
260 }
261 continue;
263 }
264
265 return Ok(response);
267 }
268 Ok(Message::Ping(data)) => {
269 if let Err(e) = handle_ping(stream, data.to_vec()).await {
271 handle_connection_error(&mut connection);
272 drop(connection);
273 if !self.endpoint_manager.should_rotate().await {
274 return Err(TransportError::network(
275 "Failed to send pong",
276 Some(e.into()),
277 None,
278 ));
279 }
280 self.endpoint_manager.rotate_url(self).await.map_err(|e| {
281 TransportError::url_rotation(
282 "Failed to rotate URL",
283 Some(e.into()),
284 None,
285 )
286 })?;
287 break;
288 }
289 }
290 Ok(_) => {
291 handle_connection_error(&mut connection);
292 drop(connection);
293 if !self.endpoint_manager.should_rotate().await {
294 return Err(TransportError::network(
295 "Unexpected message type",
296 None,
297 None,
298 ));
299 }
300 self.endpoint_manager.rotate_url(self).await.map_err(|e| {
301 TransportError::url_rotation(
302 "Failed to rotate URL",
303 Some(e.into()),
304 None,
305 )
306 })?;
307 break;
308 }
309 Err(e) => {
310 handle_connection_error(&mut connection);
311 drop(connection);
312 if !self.endpoint_manager.should_rotate().await {
313 return Err(TransportError::network(
314 "Failed to handle response",
315 Some(e.into()),
316 None,
317 ));
318 }
319 self.endpoint_manager.rotate_url(self).await.map_err(|e| {
320 TransportError::url_rotation(
321 "Failed to rotate URL",
322 Some(e.into()),
323 None,
324 )
325 })?;
326 break;
327 }
328 }
329 }
330 }
331 }
332}
333
334#[async_trait]
335impl BlockchainTransport for WsTransportClient {
336 async fn get_current_url(&self) -> String {
344 self.endpoint_manager.active_url.read().await.clone()
345 }
346
347 async fn send_raw_request<P>(
365 &self,
366 method: &str,
367 params: Option<P>,
368 ) -> Result<Value, TransportError>
369 where
370 P: Into<Value> + Send + Clone + Serialize,
371 {
372 WsTransportClient::send_raw_request(self, method, params).await
373 }
374
375 fn update_endpoint_manager_client(
379 &mut self,
380 _client: ClientWithMiddleware,
381 ) -> Result<(), anyhow::Error> {
382 Err(anyhow::anyhow!(
383 "`update_endpoint_manager_client` not implemented"
384 ))
385 }
386}
387
388#[async_trait]
389impl RotatingTransport for WsTransportClient {
390 async fn try_connect(&self, url: &str) -> Result<(), anyhow::Error> {
401 let mut connection = self.connection.lock().await;
402
403 match timeout(self.config.connection_timeout, connect_async(url)).await {
404 Ok(Ok((ws_stream, _))) => {
405 connection.stream = Some(ws_stream);
406 connection.is_healthy = true;
407 connection.update_activity();
408 Ok(())
409 }
410 Ok(Err(e)) => {
411 connection.is_healthy = false;
412 Err(anyhow::anyhow!("Failed to connect: {}", e))
413 }
414 Err(_) => {
415 connection.is_healthy = false;
416 Err(anyhow::anyhow!("Connection timeout"))
417 }
418 }
419 }
420
421 async fn update_client(&self, url: &str) -> Result<(), anyhow::Error> {
432 *self.endpoint_manager.active_url.write().await = url.to_string();
433 Ok(())
434 }
435}