openzeppelin_monitor/services/blockchain/transports/http/endpoint_manager.rs

1//! Manages the rotation of blockchain HTTP RPC endpoints
2//!
3//! Provides methods for rotating between multiple URLs and sending requests to the active endpoint
4//! with automatic fallback to other URLs on failure.
5use reqwest_middleware::ClientWithMiddleware;
6use serde::Serialize;
7use serde_json::Value;
8use std::sync::Arc;
9use std::time::Instant;
10use tokio::sync::RwLock;
11use url::Url;
12
13use crate::services::blockchain::transports::{
14	RotatingTransport, TransportError, ROTATE_ON_ERROR_CODES,
15};
16
17/// Manages the rotation of blockchain RPC endpoints
18///
19/// Provides methods for rotating between multiple URLs and sending requests to the active endpoint
20/// with automatic fallback to other URLs on failure.
21///
22/// # Fields
23/// * `active_url` - The current active URL
24/// * `fallback_urls` - A list of fallback URLs to rotate to
25/// * `client` - The client to use for the endpoint manager
26/// * `rotation_lock` - A lock for managing the rotation process
27/// * `network_slug` - The network identifier for metrics labeling
28#[derive(Clone, Debug)]
29pub struct EndpointManager {
30	pub active_url: Arc<RwLock<String>>,
31	pub fallback_urls: Arc<RwLock<Vec<String>>>,
32	client: ClientWithMiddleware,
33	rotation_lock: Arc<tokio::sync::Mutex<()>>,
34	network_slug: String,
35}
36
37/// Represents the outcome of a `EndpointManager::attempt_request_on_url` method call
38/// Used within the `EndpointManager::send_raw_request` method to handle different paths of request execution
39/// and response handling.
40#[derive(Debug)]
41enum SingleRequestAttemptOutcome {
42	/// Successfully got a response (status might still be error)
43	Success(reqwest::Response),
44	/// Error during send (e.g., connection, timeout)
45	NetworkError(reqwest_middleware::Error),
46	/// Error serializing the request body
47	SerializationError(TransportError),
48}
49
50impl EndpointManager {
51	/// Creates a new rotating URL client
52	///
53	/// # Arguments
54	/// * `client` - The client to use for the endpoint manager
55	/// * `active_url` - The initial active URL
56	/// * `fallback_urls` - A list of fallback URLs to rotate to
57	/// * `network_slug` - The network identifier for metrics labeling
58	///
59	/// # Returns
60	pub fn new(
61		client: ClientWithMiddleware,
62		active_url: &str,
63		fallback_urls: Vec<String>,
64		network_slug: String,
65	) -> Self {
66		Self {
67			active_url: Arc::new(RwLock::new(active_url.to_string())),
68			fallback_urls: Arc::new(RwLock::new(fallback_urls)),
69			rotation_lock: Arc::new(tokio::sync::Mutex::new(())),
70			client,
71			network_slug,
72		}
73	}
74
75	/// Updates the client with a new client
76	///
77	/// Useful for updating the client with a new retry policy or strategy
78	///
79	/// # Arguments
80	/// * `client` - The new client to use for the endpoint manager
81	pub fn update_client(&mut self, client: ClientWithMiddleware) {
82		self.client = client;
83	}
84
85	/// Rotates to the next available URL
86	///
87	/// # Arguments
88	/// * `transport` - The transport client implementing the RotatingTransport trait
89	///
90	/// # Returns
91	/// * `Result<String, TransportError>` - The result of the rotation attempt, containing the new active URL or an error
92	pub async fn try_rotate_url<T: RotatingTransport>(
93		&self,
94		transport: &T,
95	) -> Result<String, TransportError> {
96		// Acquire the rotation lock to prevent concurrent rotations
97		let _guard = self.rotation_lock.lock().await;
98		let initial_active_url = self.active_url.read().await.clone();
99		let current_fallbacks_snapshot = self.fallback_urls.read().await.clone();
100
101		tracing::debug!(
102			"Trying to rotate URL: Current Active: '{}', Fallbacks: {:?}",
103			initial_active_url,
104			current_fallbacks_snapshot,
105		);
106
107		// --- Select a new URL ---
108		let new_url = match current_fallbacks_snapshot
109			.iter()
110			.find(|&url| *url != initial_active_url)
111		{
112			Some(url) => url.clone(),
113			None => {
114				let msg = format!(
115					"No fallback URLs available. Current active: '{}', Fallbacks checked: {:?}",
116					initial_active_url, current_fallbacks_snapshot
117				);
118				return Err(TransportError::url_rotation(msg, None, None));
119			}
120		};
121
122		// --- Attempt to connect and update the transport client ---
123		tracing::debug!(
124			"Attempting try_connect to new_url during rotation: '{}'",
125			new_url
126		);
127
128		transport
129			.try_connect(&new_url)
130			.await
131			.map_err(|connect_err| {
132				TransportError::url_rotation(
133					format!("Failed to connect to new URL '{}'", new_url),
134					Some(connect_err.into()),
135					None,
136				)
137			})?;
138
139		tracing::debug!(
140			"Attempting update_client with new_url during rotation: '{}'",
141			new_url
142		);
143
144		transport
145			.update_client(&new_url)
146			.await
147			.map_err(|update_err| {
148				TransportError::url_rotation(
149					format!(
150						"Failed to update transport client with new URL '{}'",
151						new_url
152					),
153					Some(update_err.into()),
154					None,
155				)
156			})?;
157
158		// --- All checks passed, update shared state ---
159		{
160			let mut active_url_guard = self.active_url.write().await;
161			let mut fallback_urls_guard = self.fallback_urls.write().await;
162
163			// Construct the new fallbacks list:
164			// old fallbacks, MINUS the new_url_candidate, PLUS the initial_active_url.
165			let mut next_fallback_urls: Vec<String> = Vec::with_capacity(fallback_urls_guard.len());
166			for url in fallback_urls_guard.iter() {
167				if *url != new_url {
168					next_fallback_urls.push(url.clone());
169				}
170			}
171			next_fallback_urls.push(initial_active_url.clone()); // Add the previously active URL
172
173			tracing::debug!(
174				"Successful URL rotation - from: '{}', to: '{}'. New Fallbacks: {:?}",
175				initial_active_url,
176				new_url,
177				next_fallback_urls
178			);
179
180			*fallback_urls_guard = next_fallback_urls;
181			*active_url_guard = new_url.clone();
182		}
183		Ok(new_url)
184	}
185
186	/// Attempts to send a request to the specified URL
187	/// # Arguments
188	/// * `url` - The URL to send the request to
189	/// * `transport` - The transport client implementing the RotatingTransport trait
190	/// * `method` - The HTTP method to use for the request (e.g., "POST")
191	/// * `params` - Optional parameters for the request, serialized to JSON
192	///
193	/// # Returns
194	/// * `SingleRequestAttemptOutcome` - The outcome of the request attempt
195	async fn try_request_on_url<P>(
196		&self,
197		url: &str,
198		transport: &impl RotatingTransport,
199		method: &str,
200		params: Option<P>,
201	) -> SingleRequestAttemptOutcome
202	where
203		P: Into<Value> + Send + Clone + Serialize,
204	{
205		// Create the request body using the transport's customization method
206		let request_body = transport.customize_request(method, params).await;
207
208		// Serialize the request body to JSON
209		let request_body_str = match serde_json::to_string(&request_body) {
210			Ok(body) => body,
211			Err(e) => {
212				tracing::error!("Failed to serialize request body: {}", e);
213				return SingleRequestAttemptOutcome::SerializationError(
214					TransportError::request_serialization(
215						"Failed to serialize request JSON",
216						Some(Box::new(e)),
217						None,
218					),
219				);
220			}
221		};
222
223		// Send the request to the specified URL
224		let response_result = self
225			.client
226			.post(url)
227			.header("Content-Type", "application/json")
228			.body(request_body_str)
229			.send()
230			.await;
231
232		// Handle the response
233		match response_result {
234			Ok(response) => SingleRequestAttemptOutcome::Success(response),
235			Err(network_error) => {
236				tracing::warn!("Network error while sending request: {}", network_error);
237				SingleRequestAttemptOutcome::NetworkError(network_error)
238			}
239		}
240	}
241
242	/// Sends a raw request to the blockchain RPC endpoint with automatic URL rotation on failure
243	///
244	/// # Arguments
245	/// * `transport` - The transport client implementing the RotatingTransport trait
246	/// * `method` - The RPC method name to call
247	/// * `params` - The parameters for the RPC method call as a JSON Value
248	///
249	/// # Returns
250	/// * `Result<Value, TransportError>` - The JSON response from the RPC endpoint or an error
251	///
252	/// # Behavior
253	/// - Automatically rotates to fallback URLs if the request fails with specific status codes
254	///   (e.g., 429)
255	/// - Retries the request with the new URL after rotation
256	/// - Returns the first successful response or an error if all attempts fail
257	pub async fn send_raw_request<
258		T: RotatingTransport,
259		P: Into<Value> + Send + Clone + Serialize,
260	>(
261		&self,
262		transport: &T,
263		method: &str,
264		params: Option<P>,
265	) -> Result<Value, TransportError> {
266		loop {
267			let attempt_start = Instant::now();
268
269			// Record each attempt so error rates stay consistent
270			crate::utils::metrics::record_rpc_request(&self.network_slug, method);
271
272			let current_url_snapshot = self.active_url.read().await.clone();
273
274			tracing::debug!(
275				"Attempting request on active URL: '{}'",
276				current_url_snapshot
277			);
278
279			// Attempt to send the request to the current active URL
280			let attempt_result = self
281				.try_request_on_url(&current_url_snapshot, transport, method, params.clone())
282				.await;
283
284			match attempt_result {
285				// Handle successful response
286				SingleRequestAttemptOutcome::Success(response) => {
287					let status = response.status();
288					if status.is_success() {
289						// Record successful request duration
290						let duration = attempt_start.elapsed().as_secs_f64();
291						crate::utils::metrics::observe_rpc_duration(&self.network_slug, duration);
292
293						// Successful response, parse JSON
294						return response.json().await.map_err(|e| {
295							TransportError::response_parse(
296								"Failed to parse JSON response".to_string(),
297								Some(Box::new(e)),
298								None,
299							)
300						});
301					} else {
302						// HTTP error
303						let error_body = response.text().await.unwrap_or_default();
304						let status_code = status.as_u16();
305
306						// Record HTTP error metric
307						crate::utils::metrics::record_rpc_error(
308							&self.network_slug,
309							&status_code.to_string(),
310							"http",
311						);
312
313						tracing::warn!(
314							"Request to {} failed with status {}: {}",
315							current_url_snapshot,
316							status,
317							error_body
318						);
319
320						// Check if we should rotate based on status code
321						if ROTATE_ON_ERROR_CODES.contains(&status_code) {
322							// Record rate limit metric (429) with sanitized endpoint (host only, no API keys)
323							let endpoint_label = Url::parse(&current_url_snapshot)
324								.ok()
325								.and_then(|u| u.host_str().map(|h| h.to_string()))
326								.unwrap_or_else(|| "unknown".to_string());
327
328							crate::utils::metrics::record_rate_limit(
329								&self.network_slug,
330								&endpoint_label,
331							);
332
333							tracing::debug!(
334								"send_raw_request: HTTP status {} on '{}' triggers URL rotation attempt",
335								status,
336								current_url_snapshot
337							);
338
339							// Record endpoint rotation due to rate limit
340							crate::utils::metrics::record_endpoint_rotation(
341								&self.network_slug,
342								"rate_limit",
343							);
344
345							match self.try_rotate_url(transport).await {
346								Ok(_new_url) => {
347									continue; // Retry on the new active URL
348								}
349								Err(rotation_error) => {
350									// Return the original HTTP error with rotation error context
351									return Err(TransportError::http(
352										status,
353										current_url_snapshot.clone(),
354										error_body,
355										Some(Box::new(rotation_error)),
356										None,
357									));
358								}
359							}
360						} else {
361							// HTTP error that doesn't trigger rotation
362							tracing::warn!(
363								"HTTP error status {} on {} does not trigger rotation. Failing.",
364								status,
365								current_url_snapshot
366							);
367							return Err(TransportError::http(
368								status,
369								current_url_snapshot,
370								error_body,
371								None,
372								None,
373							));
374						}
375					}
376				}
377				// Handle network error, try rotation
378				SingleRequestAttemptOutcome::NetworkError(network_error) => {
379					// Record network error metric
380					crate::utils::metrics::record_rpc_error(&self.network_slug, "0", "network");
381
382					tracing::warn!(
383						"Network error for {}: {}",
384						current_url_snapshot,
385						network_error,
386					);
387
388					// Record endpoint rotation due to network error
389					crate::utils::metrics::record_endpoint_rotation(
390						&self.network_slug,
391						"network_error",
392					);
393
394					// Always attempt rotation on network errors
395					match self.try_rotate_url(transport).await {
396						Ok(new_url) => {
397							tracing::debug!(
398								"Rotation successful after network error, retrying request on new URL: '{}'",
399								new_url
400							);
401							continue; // Retry on the new active URL
402						}
403						Err(rotation_error) => {
404							// Return network error with rotation error context
405							return Err(TransportError::network(
406								network_error.to_string(),
407								Some(Box::new(rotation_error)),
408								None,
409							));
410						}
411					}
412				}
413				// Non-retryable serialization error
414				SingleRequestAttemptOutcome::SerializationError(serialization_error) => {
415					return Err(serialization_error);
416				}
417			}
418		}
419	}
420}