openzeppelin_monitor/services/blockchain/transports/http/
endpoint_manager.rs1use reqwest_middleware::ClientWithMiddleware;
6use serde::Serialize;
7use serde_json::Value;
8use std::sync::Arc;
9use std::time::Instant;
10use tokio::sync::RwLock;
11use url::Url;
12
13use crate::services::blockchain::transports::{
14 RotatingTransport, TransportError, ROTATE_ON_ERROR_CODES,
15};
16
17#[derive(Clone, Debug)]
29pub struct EndpointManager {
30 pub active_url: Arc<RwLock<String>>,
31 pub fallback_urls: Arc<RwLock<Vec<String>>>,
32 client: ClientWithMiddleware,
33 rotation_lock: Arc<tokio::sync::Mutex<()>>,
34 network_slug: String,
35}
36
37#[derive(Debug)]
41enum SingleRequestAttemptOutcome {
42 Success(reqwest::Response),
44 NetworkError(reqwest_middleware::Error),
46 SerializationError(TransportError),
48}
49
50impl EndpointManager {
51 pub fn new(
61 client: ClientWithMiddleware,
62 active_url: &str,
63 fallback_urls: Vec<String>,
64 network_slug: String,
65 ) -> Self {
66 Self {
67 active_url: Arc::new(RwLock::new(active_url.to_string())),
68 fallback_urls: Arc::new(RwLock::new(fallback_urls)),
69 rotation_lock: Arc::new(tokio::sync::Mutex::new(())),
70 client,
71 network_slug,
72 }
73 }
74
75 pub fn update_client(&mut self, client: ClientWithMiddleware) {
82 self.client = client;
83 }
84
85 pub async fn try_rotate_url<T: RotatingTransport>(
93 &self,
94 transport: &T,
95 ) -> Result<String, TransportError> {
96 let _guard = self.rotation_lock.lock().await;
98 let initial_active_url = self.active_url.read().await.clone();
99 let current_fallbacks_snapshot = self.fallback_urls.read().await.clone();
100
101 tracing::debug!(
102 "Trying to rotate URL: Current Active: '{}', Fallbacks: {:?}",
103 initial_active_url,
104 current_fallbacks_snapshot,
105 );
106
107 let new_url = match current_fallbacks_snapshot
109 .iter()
110 .find(|&url| *url != initial_active_url)
111 {
112 Some(url) => url.clone(),
113 None => {
114 let msg = format!(
115 "No fallback URLs available. Current active: '{}', Fallbacks checked: {:?}",
116 initial_active_url, current_fallbacks_snapshot
117 );
118 return Err(TransportError::url_rotation(msg, None, None));
119 }
120 };
121
122 tracing::debug!(
124 "Attempting try_connect to new_url during rotation: '{}'",
125 new_url
126 );
127
128 transport
129 .try_connect(&new_url)
130 .await
131 .map_err(|connect_err| {
132 TransportError::url_rotation(
133 format!("Failed to connect to new URL '{}'", new_url),
134 Some(connect_err.into()),
135 None,
136 )
137 })?;
138
139 tracing::debug!(
140 "Attempting update_client with new_url during rotation: '{}'",
141 new_url
142 );
143
144 transport
145 .update_client(&new_url)
146 .await
147 .map_err(|update_err| {
148 TransportError::url_rotation(
149 format!(
150 "Failed to update transport client with new URL '{}'",
151 new_url
152 ),
153 Some(update_err.into()),
154 None,
155 )
156 })?;
157
158 {
160 let mut active_url_guard = self.active_url.write().await;
161 let mut fallback_urls_guard = self.fallback_urls.write().await;
162
163 let mut next_fallback_urls: Vec<String> = Vec::with_capacity(fallback_urls_guard.len());
166 for url in fallback_urls_guard.iter() {
167 if *url != new_url {
168 next_fallback_urls.push(url.clone());
169 }
170 }
171 next_fallback_urls.push(initial_active_url.clone()); tracing::debug!(
174 "Successful URL rotation - from: '{}', to: '{}'. New Fallbacks: {:?}",
175 initial_active_url,
176 new_url,
177 next_fallback_urls
178 );
179
180 *fallback_urls_guard = next_fallback_urls;
181 *active_url_guard = new_url.clone();
182 }
183 Ok(new_url)
184 }
185
186 async fn try_request_on_url<P>(
196 &self,
197 url: &str,
198 transport: &impl RotatingTransport,
199 method: &str,
200 params: Option<P>,
201 ) -> SingleRequestAttemptOutcome
202 where
203 P: Into<Value> + Send + Clone + Serialize,
204 {
205 let request_body = transport.customize_request(method, params).await;
207
208 let request_body_str = match serde_json::to_string(&request_body) {
210 Ok(body) => body,
211 Err(e) => {
212 tracing::error!("Failed to serialize request body: {}", e);
213 return SingleRequestAttemptOutcome::SerializationError(
214 TransportError::request_serialization(
215 "Failed to serialize request JSON",
216 Some(Box::new(e)),
217 None,
218 ),
219 );
220 }
221 };
222
223 let response_result = self
225 .client
226 .post(url)
227 .header("Content-Type", "application/json")
228 .body(request_body_str)
229 .send()
230 .await;
231
232 match response_result {
234 Ok(response) => SingleRequestAttemptOutcome::Success(response),
235 Err(network_error) => {
236 tracing::warn!("Network error while sending request: {}", network_error);
237 SingleRequestAttemptOutcome::NetworkError(network_error)
238 }
239 }
240 }
241
242 pub async fn send_raw_request<
258 T: RotatingTransport,
259 P: Into<Value> + Send + Clone + Serialize,
260 >(
261 &self,
262 transport: &T,
263 method: &str,
264 params: Option<P>,
265 ) -> Result<Value, TransportError> {
266 loop {
267 let attempt_start = Instant::now();
268
269 crate::utils::metrics::record_rpc_request(&self.network_slug, method);
271
272 let current_url_snapshot = self.active_url.read().await.clone();
273
274 tracing::debug!(
275 "Attempting request on active URL: '{}'",
276 current_url_snapshot
277 );
278
279 let attempt_result = self
281 .try_request_on_url(¤t_url_snapshot, transport, method, params.clone())
282 .await;
283
284 match attempt_result {
285 SingleRequestAttemptOutcome::Success(response) => {
287 let status = response.status();
288 if status.is_success() {
289 let duration = attempt_start.elapsed().as_secs_f64();
291 crate::utils::metrics::observe_rpc_duration(&self.network_slug, duration);
292
293 return response.json().await.map_err(|e| {
295 TransportError::response_parse(
296 "Failed to parse JSON response".to_string(),
297 Some(Box::new(e)),
298 None,
299 )
300 });
301 } else {
302 let error_body = response.text().await.unwrap_or_default();
304 let status_code = status.as_u16();
305
306 crate::utils::metrics::record_rpc_error(
308 &self.network_slug,
309 &status_code.to_string(),
310 "http",
311 );
312
313 tracing::warn!(
314 "Request to {} failed with status {}: {}",
315 current_url_snapshot,
316 status,
317 error_body
318 );
319
320 if ROTATE_ON_ERROR_CODES.contains(&status_code) {
322 let endpoint_label = Url::parse(¤t_url_snapshot)
324 .ok()
325 .and_then(|u| u.host_str().map(|h| h.to_string()))
326 .unwrap_or_else(|| "unknown".to_string());
327
328 crate::utils::metrics::record_rate_limit(
329 &self.network_slug,
330 &endpoint_label,
331 );
332
333 tracing::debug!(
334 "send_raw_request: HTTP status {} on '{}' triggers URL rotation attempt",
335 status,
336 current_url_snapshot
337 );
338
339 crate::utils::metrics::record_endpoint_rotation(
341 &self.network_slug,
342 "rate_limit",
343 );
344
345 match self.try_rotate_url(transport).await {
346 Ok(_new_url) => {
347 continue; }
349 Err(rotation_error) => {
350 return Err(TransportError::http(
352 status,
353 current_url_snapshot.clone(),
354 error_body,
355 Some(Box::new(rotation_error)),
356 None,
357 ));
358 }
359 }
360 } else {
361 tracing::warn!(
363 "HTTP error status {} on {} does not trigger rotation. Failing.",
364 status,
365 current_url_snapshot
366 );
367 return Err(TransportError::http(
368 status,
369 current_url_snapshot,
370 error_body,
371 None,
372 None,
373 ));
374 }
375 }
376 }
377 SingleRequestAttemptOutcome::NetworkError(network_error) => {
379 crate::utils::metrics::record_rpc_error(&self.network_slug, "0", "network");
381
382 tracing::warn!(
383 "Network error for {}: {}",
384 current_url_snapshot,
385 network_error,
386 );
387
388 crate::utils::metrics::record_endpoint_rotation(
390 &self.network_slug,
391 "network_error",
392 );
393
394 match self.try_rotate_url(transport).await {
396 Ok(new_url) => {
397 tracing::debug!(
398 "Rotation successful after network error, retrying request on new URL: '{}'",
399 new_url
400 );
401 continue; }
403 Err(rotation_error) => {
404 return Err(TransportError::network(
406 network_error.to_string(),
407 Some(Box::new(rotation_error)),
408 None,
409 ));
410 }
411 }
412 }
413 SingleRequestAttemptOutcome::SerializationError(serialization_error) => {
415 return Err(serialization_error);
416 }
417 }
418 }
419 }
420}