openzeppelin_monitor/services/blockchain/transports/http/
transport.rs1use anyhow::Context;
12use async_trait::async_trait;
13use reqwest_middleware::ClientWithMiddleware;
14use serde::Serialize;
15use serde_json::{json, Value};
16use std::{sync::Arc, time::Duration};
17use url::Url;
18
19use crate::{
20 models::Network,
21 services::blockchain::transports::{
22 http::endpoint_manager::EndpointManager, BlockchainTransport, RotatingTransport,
23 TransientErrorRetryStrategy, TransportError,
24 },
25 utils::http::{create_retryable_http_client, RetryConfig},
26};
27
28#[derive(Clone, Debug)]
38pub struct HttpTransportClient {
39 pub client: ClientWithMiddleware,
41 endpoint_manager: EndpointManager,
43 test_connection_payload: Option<String>,
45}
46
47impl HttpTransportClient {
48 pub async fn new(
61 network: &Network,
62 test_connection_payload: Option<String>,
63 ) -> Result<Self, anyhow::Error> {
64 let mut rpc_urls: Vec<_> = network
65 .rpc_urls
66 .iter()
67 .filter(|rpc_url| rpc_url.type_ == "rpc" && rpc_url.weight > 0)
68 .collect();
69
70 rpc_urls.sort_by(|a, b| b.weight.cmp(&a.weight));
71 let http_retry_config = RetryConfig::default();
74 let base_http_client = Arc::new(
76 reqwest::ClientBuilder::new()
77 .pool_idle_timeout(Duration::from_secs(90))
78 .pool_max_idle_per_host(32)
79 .timeout(Duration::from_secs(30))
80 .connect_timeout(Duration::from_secs(20))
81 .use_rustls_tls()
82 .build()
83 .context("Failed to create base HTTP client")?,
84 );
85 let retryable_client = create_retryable_http_client(
90 &http_retry_config,
91 (*base_http_client).clone(),
92 Some(TransientErrorRetryStrategy),
93 );
94 for rpc_url in rpc_urls.iter() {
95 let url = match Url::parse(rpc_url.url.as_ref()) {
96 Ok(url) => url,
97 Err(_) => continue,
98 };
99 let test_request = if let Some(test_payload) = &test_connection_payload {
100 serde_json::from_str(test_payload)
101 .context("Failed to parse test payload as JSON")?
102 } else {
103 json!({
104 "jsonrpc": "2.0",
105 "id": 1,
106 "method": "net_version",
107 "params": []
108 })
109 };
110 let request_result = retryable_client
112 .post(url.clone())
113 .json(&test_request)
114 .send()
115 .await;
116 match request_result {
117 Ok(response) => {
118 if !response.status().is_success() {
120 continue;
122 }
123 let fallback_urls: Vec<String> = rpc_urls
125 .iter()
126 .filter(|url| url.url != rpc_url.url)
127 .map(|url| url.url.as_ref().to_string())
128 .collect();
129
130 let network_slug = network.slug.clone();
131
132 crate::utils::metrics::init_rpc_metrics_for_network(&network_slug);
134
135 return Ok(Self {
137 client: retryable_client.clone(),
138 endpoint_manager: EndpointManager::new(
139 retryable_client,
140 rpc_url.url.as_ref(),
141 fallback_urls,
142 network_slug,
143 ),
144 test_connection_payload,
145 });
146 }
147 Err(_) => {
148 continue;
150 }
151 }
152 }
153
154 Err(anyhow::anyhow!("All RPC URLs failed to connect"))
155 }
156}
157
158#[async_trait]
159impl BlockchainTransport for HttpTransportClient {
160 async fn get_current_url(&self) -> String {
168 self.endpoint_manager.active_url.read().await.clone()
169 }
170
171 async fn send_raw_request<P>(
189 &self,
190 method: &str,
191 params: Option<P>,
192 ) -> Result<Value, TransportError>
193 where
194 P: Into<Value> + Send + Clone + Serialize,
195 {
196 self.endpoint_manager
197 .send_raw_request(self, method, params)
198 .await
199 }
200
201 fn update_endpoint_manager_client(
206 &mut self,
207 client: ClientWithMiddleware,
208 ) -> Result<(), anyhow::Error> {
209 self.endpoint_manager.update_client(client);
210 Ok(())
211 }
212}
213
214#[async_trait]
215impl RotatingTransport for HttpTransportClient {
216 async fn try_connect(&self, url: &str) -> Result<(), anyhow::Error> {
227 let url = Url::parse(url).map_err(|_| anyhow::anyhow!("Invalid URL: {}", url))?;
228
229 let test_request = if let Some(test_payload) = &self.test_connection_payload {
230 serde_json::from_str(test_payload).context("Failed to parse test payload as JSON")?
231 } else {
232 json!({
233 "jsonrpc": "2.0",
234 "id": 1,
235 "method": "net_version",
236 "params": []
237 })
238 };
239
240 let request = self.client.post(url.clone()).json(&test_request);
241
242 match request.send().await {
243 Ok(response) => {
244 let status = response.status();
245 if !status.is_success() {
246 Err(anyhow::anyhow!(
247 "Failed to connect to {}: {}",
248 url,
249 status.as_u16()
250 ))
251 } else {
252 Ok(())
253 }
254 }
255 Err(e) => Err(anyhow::anyhow!("Failed to connect to {}: {}", url, e)),
256 }
257 }
258
259 async fn update_client(&self, url: &str) -> Result<(), anyhow::Error> {
270 let parsed_url = Url::parse(url).map_err(|_| anyhow::anyhow!("Invalid URL: {}", url))?;
271 let normalized_url = parsed_url.as_str().trim_end_matches('/');
273
274 let mut active_url = self.endpoint_manager.active_url.write().await;
277 *active_url = normalized_url.to_string();
278 Ok(())
279 }
280}