openzeppelin_monitor/services/blockchain/transports/ws/
endpoint_manager.rs1use std::sync::Arc;
11use tokio::sync::{Mutex, RwLock};
12use tokio::time::timeout;
13
14use crate::services::blockchain::transports::{ws::config::WsConfig, RotatingTransport};
15
16#[derive(Clone, Debug)]
27pub struct EndpointManager {
28 pub active_url: Arc<RwLock<String>>,
30 pub fallback_urls: Arc<RwLock<Vec<String>>>,
32 rotation_lock: Arc<Mutex<()>>,
34 config: WsConfig,
36}
37
38impl EndpointManager {
39 pub fn new(config: &WsConfig, active_url: &str, fallback_urls: Vec<String>) -> Self {
52 Self {
53 active_url: Arc::new(RwLock::new(active_url.to_string())),
54 fallback_urls: Arc::new(RwLock::new(fallback_urls)),
55 rotation_lock: Arc::new(Mutex::new(())),
56 config: config.clone(),
57 }
58 }
59
60 pub async fn rotate_url<T: RotatingTransport>(
72 &self,
73 transport: &T,
74 ) -> Result<(), anyhow::Error> {
75 let _guard = self.rotation_lock.lock().await;
76 let current_active = self.active_url.read().await.clone();
77 let mut attempts = 0;
78
79 while attempts < self.config.max_reconnect_attempts {
80 let new_url = {
81 let mut fallback_urls = self.fallback_urls.write().await;
82 if fallback_urls.is_empty() {
83 return Err(anyhow::anyhow!("No fallback URLs available"));
84 }
85
86 let idx = fallback_urls.iter().position(|url| url != ¤t_active);
88
89 match idx {
90 Some(pos) => fallback_urls.remove(pos),
91 None => {
92 return Err(anyhow::anyhow!("No fallback URLs available"));
93 }
94 }
95 };
96
97 match timeout(
99 self.config.connection_timeout,
100 transport.try_connect(&new_url),
101 )
102 .await
103 {
104 Ok(Ok(_)) => {
105 transport.update_client(&new_url).await?;
106 {
107 let mut active_url = self.active_url.write().await;
108 let mut fallback_urls = self.fallback_urls.write().await;
109 tracing::debug!(
110 "Successful rotation - from: {}, to: {}",
111 current_active,
112 new_url
113 );
114 fallback_urls.push(current_active);
115 *active_url = new_url;
116 }
117 return Ok(());
118 }
119 Ok(Err(e)) => {
120 let mut fallback_urls = self.fallback_urls.write().await;
121 fallback_urls.push(new_url);
122 tracing::warn!("Failed to connect to fallback URL: {}", e);
123 }
124 Err(_) => {
125 let mut fallback_urls = self.fallback_urls.write().await;
126 fallback_urls.push(new_url);
127 tracing::warn!("Connection timeout during rotation");
128 }
129 }
130
131 attempts += 1;
132 if attempts < self.config.max_reconnect_attempts {
133 tokio::time::sleep(self.config.reconnect_timeout).await;
134 }
135 }
136
137 Err(anyhow::anyhow!(
138 "Failed to reconnect after {} attempts",
139 self.config.max_reconnect_attempts
140 ))
141 }
142
143 pub async fn get_active_url(&self) -> Result<String, anyhow::Error> {
149 let url = self.active_url.read().await;
150 if url.is_empty() {
151 Err(anyhow::anyhow!("No active URL set"))
152 } else {
153 Ok(url.clone())
154 }
155 }
156
157 pub async fn should_rotate(&self) -> bool {
164 let fallback_urls = self.fallback_urls.read().await;
165 !fallback_urls.is_empty()
166 }
167}