openzeppelin_monitor/services/blockchain/transports/http/
transport.rs

1//! HTTP transport implementation for blockchain interactions.
2//!
3//! This module provides a generic HTTP client implementation for interacting with blockchain nodes
4//! via JSON-RPC, supporting:
5//! - Multiple RPC endpoints with automatic failover
6//! - Configurable retry policies
7//! - Authentication via bearer tokens
8//! - Connection health checks
9//! - Endpoint rotation for high availability
10
11use anyhow::Context;
12use async_trait::async_trait;
13use reqwest_middleware::ClientWithMiddleware;
14use serde::Serialize;
15use serde_json::{json, Value};
16use std::{sync::Arc, time::Duration};
17use url::Url;
18
19use crate::{
20	models::Network,
21	services::blockchain::transports::{
22		http::endpoint_manager::EndpointManager, BlockchainTransport, RotatingTransport,
23		TransientErrorRetryStrategy, TransportError,
24	},
25	utils::http::{create_retryable_http_client, RetryConfig},
26};
27
28/// Basic HTTP transport client for blockchain interactions
29///
30/// This client provides a foundation for making JSON-RPC requests to blockchain nodes
31/// with built-in support for:
32/// - Connection pooling and reuse
33/// - Automatic endpoint rotation on failure
34/// - Configurable retry policies
35///
36/// The client is thread-safe and can be shared across multiple tasks.
37#[derive(Clone, Debug)]
38pub struct HttpTransportClient {
39	/// Retryable HTTP client for making requests
40	pub client: ClientWithMiddleware,
41	/// Manages RPC endpoint rotation and request handling for high availability
42	endpoint_manager: EndpointManager,
43	/// The stringified JSON RPC payload to use for testing the connection
44	test_connection_payload: Option<String>,
45}
46
47impl HttpTransportClient {
48	/// Creates a new HTTP transport client with automatic endpoint management
49	///
50	/// This constructor attempts to connect to available endpoints in order of their
51	/// weight until a successful connection is established. It configures default
52	/// timeout and retry policies suitable for blockchain interactions.
53	///
54	/// # Arguments
55	/// * `network` - Network configuration containing RPC URLs, weights, and other details
56	/// * `test_connection_payload` - Optional JSON RPC payload to test the connection (default is net_version)
57	///
58	/// # Returns
59	/// * `Result<Self, anyhow::Error>` - New client instance or connection error
60	pub async fn new(
61		network: &Network,
62		test_connection_payload: Option<String>,
63	) -> Result<Self, anyhow::Error> {
64		let mut rpc_urls: Vec<_> = network
65			.rpc_urls
66			.iter()
67			.filter(|rpc_url| rpc_url.type_ == "rpc" && rpc_url.weight > 0)
68			.collect();
69
70		rpc_urls.sort_by(|a, b| b.weight.cmp(&a.weight));
71		// Create a retry policy with default settings
72		// Shared config for endpoint manager and test connection
73		let http_retry_config = RetryConfig::default();
74		// Create the base HTTP client
75		let base_http_client = Arc::new(
76			reqwest::ClientBuilder::new()
77				.pool_idle_timeout(Duration::from_secs(90))
78				.pool_max_idle_per_host(32)
79				.timeout(Duration::from_secs(30))
80				.connect_timeout(Duration::from_secs(20))
81				.use_rustls_tls()
82				.build()
83				.context("Failed to create base HTTP client")?,
84		);
85		// Create a retryable HTTP client with the base client and retry policy
86		// Shared across:
87		// - EndpointManager for handling endpoint rotation
88		// - Connection testing for verifying endpoint availability
89		let retryable_client = create_retryable_http_client(
90			&http_retry_config,
91			(*base_http_client).clone(),
92			Some(TransientErrorRetryStrategy),
93		);
94		for rpc_url in rpc_urls.iter() {
95			let url = match Url::parse(rpc_url.url.as_ref()) {
96				Ok(url) => url,
97				Err(_) => continue,
98			};
99			let test_request = if let Some(test_payload) = &test_connection_payload {
100				serde_json::from_str(test_payload)
101					.context("Failed to parse test payload as JSON")?
102			} else {
103				json!({
104					"jsonrpc": "2.0",
105					"id": 1,
106					"method": "net_version",
107					"params": []
108				})
109			};
110			// Attempt to connect to the endpoint
111			let request_result = retryable_client
112				.post(url.clone())
113				.json(&test_request)
114				.send()
115				.await;
116			match request_result {
117				Ok(response) => {
118					// Check if the response indicates an error status (4xx or 5xx)
119					if !response.status().is_success() {
120						// Skip this URL if we got an error status
121						continue;
122					}
123					// Create list of fallback URLs (all URLs except the current one)
124					let fallback_urls: Vec<String> = rpc_urls
125						.iter()
126						.filter(|url| url.url != rpc_url.url)
127						.map(|url| url.url.as_ref().to_string())
128						.collect();
129
130					let network_slug = network.slug.clone();
131
132					// Initialize RPC metrics for this network so they appear in Prometheus
133					crate::utils::metrics::init_rpc_metrics_for_network(&network_slug);
134
135					// Successfully connected - create and return the client
136					return Ok(Self {
137						client: retryable_client.clone(),
138						endpoint_manager: EndpointManager::new(
139							retryable_client,
140							rpc_url.url.as_ref(),
141							fallback_urls,
142							network_slug,
143						),
144						test_connection_payload,
145					});
146				}
147				Err(_) => {
148					// Connection failed - try next URL
149					continue;
150				}
151			}
152		}
153
154		Err(anyhow::anyhow!("All RPC URLs failed to connect"))
155	}
156}
157
158#[async_trait]
159impl BlockchainTransport for HttpTransportClient {
160	/// Retrieves the currently active RPC endpoint URL
161	///
162	/// This method is useful for monitoring which endpoint is currently in use,
163	/// especially in scenarios with multiple failover URLs.
164	///
165	/// # Returns
166	/// * `String` - The URL of the currently active endpoint
167	async fn get_current_url(&self) -> String {
168		self.endpoint_manager.active_url.read().await.clone()
169	}
170
171	/// Sends a JSON-RPC request to the blockchain node
172	///
173	/// This method handles the formatting of the JSON-RPC request, including:
174	/// - Adding required JSON-RPC 2.0 fields
175	/// - Generating unique request IDs
176	/// - Converting parameters to the correct format
177	/// - Handling authentication
178	///
179	/// # Arguments
180	/// * `method` - The JSON-RPC method name to call
181	/// * `params` - Optional parameters for the method call
182	///
183	/// # Returns
184	/// * `Result<Value, TransportError>` - JSON response or error with context
185	///
186	/// # Type Parameters
187	/// * `P` - Parameter type that can be serialized to JSON
188	async fn send_raw_request<P>(
189		&self,
190		method: &str,
191		params: Option<P>,
192	) -> Result<Value, TransportError>
193	where
194		P: Into<Value> + Send + Clone + Serialize,
195	{
196		self.endpoint_manager
197			.send_raw_request(self, method, params)
198			.await
199	}
200
201	/// Update endpoint manager with a new client
202	///
203	/// # Arguments
204	/// * `client` - The new client to use for the endpoint manager
205	fn update_endpoint_manager_client(
206		&mut self,
207		client: ClientWithMiddleware,
208	) -> Result<(), anyhow::Error> {
209		self.endpoint_manager.update_client(client);
210		Ok(())
211	}
212}
213
214#[async_trait]
215impl RotatingTransport for HttpTransportClient {
216	/// Tests connectivity to a specific RPC endpoint
217	///
218	/// Performs a basic JSON-RPC request to verify the endpoint is responsive
219	/// and correctly handling requests.
220	///
221	/// # Arguments
222	/// * `url` - The URL to test
223	///
224	/// # Returns
225	/// * `Result<(), anyhow::Error>` - Success or detailed error message
226	async fn try_connect(&self, url: &str) -> Result<(), anyhow::Error> {
227		let url = Url::parse(url).map_err(|_| anyhow::anyhow!("Invalid URL: {}", url))?;
228
229		let test_request = if let Some(test_payload) = &self.test_connection_payload {
230			serde_json::from_str(test_payload).context("Failed to parse test payload as JSON")?
231		} else {
232			json!({
233				"jsonrpc": "2.0",
234				"id": 1,
235				"method": "net_version",
236				"params": []
237			})
238		};
239
240		let request = self.client.post(url.clone()).json(&test_request);
241
242		match request.send().await {
243			Ok(response) => {
244				let status = response.status();
245				if !status.is_success() {
246					Err(anyhow::anyhow!(
247						"Failed to connect to {}: {}",
248						url,
249						status.as_u16()
250					))
251				} else {
252					Ok(())
253				}
254			}
255			Err(e) => Err(anyhow::anyhow!("Failed to connect to {}: {}", url, e)),
256		}
257	}
258
259	/// Updates the active endpoint URL
260	///
261	/// This method is called when rotating to a new endpoint, typically
262	/// after a failure of the current endpoint.
263	///
264	/// # Arguments
265	/// * `url` - The new URL to use for subsequent requests
266	///
267	/// # Returns
268	/// * `Result<(), anyhow::Error>` - Success or error status
269	async fn update_client(&self, url: &str) -> Result<(), anyhow::Error> {
270		let parsed_url = Url::parse(url).map_err(|_| anyhow::anyhow!("Invalid URL: {}", url))?;
271		// Normalize the URL by trimming trailing slash if present
272		let normalized_url = parsed_url.as_str().trim_end_matches('/');
273
274		// For HTTP client, we don't need to update the client itself
275		// We just need to update the endpoint manager's active URL
276		let mut active_url = self.endpoint_manager.active_url.write().await;
277		*active_url = normalized_url.to_string();
278		Ok(())
279	}
280}