openzeppelin_monitor/services/blockchain/
pool.rs

1//! Client pool for managing blockchain clients.
2//!
3//! This module provides a thread-safe client pooling system that:
4//! - Caches blockchain clients by network
5//! - Creates clients lazily on first use
6//! - Handles EVM, Stellar, Midnight, and Solana clients
7//! - Provides type-safe access to clients
8//! - Manages client lifecycles automatically
9//!
10//! The pool uses a fast path for existing clients and a slow path for
11//! creating new ones, optimizing performance while maintaining safety.
12
13use crate::utils::client_storage::ClientStorage;
14use crate::{
15	models::{BlockChainType, Network},
16	services::blockchain::{
17		BlockChainClient, BlockFilterFactory, EVMTransportClient, EvmClient, EvmClientTrait,
18		MidnightClient, MidnightClientTrait, MidnightWsTransportClient, SolanaClient,
19		SolanaClientTrait, SolanaTransportClient, StellarClient, StellarClientTrait,
20		StellarTransportClient,
21	},
22};
23use anyhow::Context;
24use async_trait::async_trait;
25use futures::future::BoxFuture;
26use std::{any::Any, collections::HashMap, sync::Arc};
27
/// Trait for the client pool.
///
/// Implementors hand out per-network clients for each supported chain type.
/// Every accessor is async, takes the target [`Network`] by reference, and
/// yields the client behind an [`Arc`] or an [`anyhow::Error`] on failure.
#[async_trait]
pub trait ClientPoolTrait: Send + Sync {
	/// Client type returned by [`ClientPoolTrait::get_evm_client`].
	type EvmClient: EvmClientTrait + BlockChainClient + BlockFilterFactory<Self::EvmClient>;
	/// Client type returned by [`ClientPoolTrait::get_stellar_client`].
	type StellarClient: StellarClientTrait
		+ BlockChainClient
		+ BlockFilterFactory<Self::StellarClient>;
	/// Client type returned by [`ClientPoolTrait::get_midnight_client`].
	type MidnightClient: MidnightClientTrait
		+ BlockChainClient
		+ BlockFilterFactory<Self::MidnightClient>;
	/// Client type returned by the Solana accessors.
	type SolanaClient: SolanaClientTrait + BlockChainClient + BlockFilterFactory<Self::SolanaClient>;

	/// Gets an EVM client for the given network.
	///
	/// # Errors
	/// Returns an error if the implementor cannot provide a client.
	async fn get_evm_client(
		&self,
		network: &Network,
	) -> Result<Arc<Self::EvmClient>, anyhow::Error>;
	/// Gets a Stellar client for the given network.
	///
	/// # Errors
	/// Returns an error if the implementor cannot provide a client.
	async fn get_stellar_client(
		&self,
		network: &Network,
	) -> Result<Arc<Self::StellarClient>, anyhow::Error>;
	/// Gets a Midnight client for the given network.
	///
	/// # Errors
	/// Returns an error if the implementor cannot provide a client.
	async fn get_midnight_client(
		&self,
		network: &Network,
	) -> Result<Arc<Self::MidnightClient>, anyhow::Error>;
	/// Gets a Solana client for the given network.
	///
	/// # Errors
	/// Returns an error if the implementor cannot provide a client.
	async fn get_solana_client(
		&self,
		network: &Network,
	) -> Result<Arc<Self::SolanaClient>, anyhow::Error>;

	/// Gets a Solana client configured with specific addresses to monitor.
	///
	/// This creates a fresh client (not cached) with the optimized
	/// getSignaturesForAddress approach enabled for the specified addresses.
	///
	/// # Errors
	/// Returns an error if the client cannot be created.
	async fn get_solana_client_with_addresses(
		&self,
		network: &Network,
		addresses: Vec<String>,
	) -> Result<Arc<Self::SolanaClient>, anyhow::Error>;
}
67
/// Main client pool manager that handles multiple blockchain types.
///
/// Provides type-safe access to cached blockchain clients. Clients are created
/// on demand when first requested and then cached for future use. Uses RwLock
/// for thread-safe access and Arc for shared ownership.
pub struct ClientPool {
	/// Map of client storages indexed by client type.
	///
	/// Each value is a `ClientStorage<T>` boxed as `dyn Any`, so a single map
	/// can hold storages for different client types. Lookups recover the typed
	/// storage with `downcast_ref::<ClientStorage<T>>()`; a mismatched `T` is
	/// treated as "no storage" rather than a panic.
	pub storages: HashMap<BlockChainType, Box<dyn Any + Send + Sync>>,
}
77
78impl ClientPool {
79	/// Creates a new empty client pool.
80	///
81	/// Initializes empty hashmaps for all supported clients types.
82	pub fn new() -> Self {
83		let mut pool = Self {
84			storages: HashMap::new(),
85		};
86
87		// Register client types
88		pool.register_client_type::<EvmClient<EVMTransportClient>>(BlockChainType::EVM);
89		pool.register_client_type::<StellarClient<StellarTransportClient>>(BlockChainType::Stellar);
90		pool.register_client_type::<MidnightClient<MidnightWsTransportClient>>(
91			BlockChainType::Midnight,
92		);
93		pool.register_client_type::<SolanaClient<SolanaTransportClient>>(BlockChainType::Solana);
94
95		pool
96	}
97
98	fn register_client_type<T: 'static + Send + Sync>(&mut self, client_type: BlockChainType) {
99		self.storages
100			.insert(client_type, Box::new(ClientStorage::<T>::new()));
101	}
102
103	/// Internal helper method to get or create a client of any type.
104	///
105	/// Uses a double-checked locking pattern:
106	/// 1. Fast path with read lock to check for existing client
107	/// 2. Slow path with write lock to create new client if needed
108	///
109	/// This ensures thread-safety while maintaining good performance
110	/// for the common case of accessing existing clients.
111	async fn get_or_create_client<T: BlockChainClient + 'static>(
112		&self,
113		client_type: BlockChainType,
114		network: &Network,
115		create_fn: impl Fn(&Network) -> BoxFuture<'static, Result<T, anyhow::Error>>,
116	) -> Result<Arc<T>, anyhow::Error> {
117		let storage = self
118			.storages
119			.get(&client_type)
120			.and_then(|s| s.downcast_ref::<ClientStorage<T>>())
121			.with_context(|| "Invalid client type")?;
122
123		// Fast path: check if client exists
124		if let Some(client) = storage.clients.read().await.get(&network.slug) {
125			return Ok(client.clone());
126		}
127
128		// Slow path: create new client
129		let mut clients = storage.clients.write().await;
130
131		// Double-check client was not created while waiting for the write lock
132		if let Some(client) = clients.get(&network.slug) {
133			return Ok(client.clone());
134		}
135
136		// Create and insert
137		let client = Arc::new(create_fn(network).await?);
138		clients.insert(network.slug.clone(), client.clone());
139		Ok(client)
140	}
141
142	/// Get the number of clients for a given client type.
143	pub async fn get_client_count<T: 'static>(&self, client_type: BlockChainType) -> usize {
144		match self
145			.storages
146			.get(&client_type)
147			.and_then(|s| s.downcast_ref::<ClientStorage<T>>())
148		{
149			Some(storage) => storage.clients.read().await.len(),
150			None => 0,
151		}
152	}
153}
154
155#[async_trait]
156impl ClientPoolTrait for ClientPool {
157	type EvmClient = EvmClient<EVMTransportClient>;
158	type StellarClient = StellarClient<StellarTransportClient>;
159	type MidnightClient = MidnightClient<MidnightWsTransportClient>;
160	type SolanaClient = SolanaClient<SolanaTransportClient>;
161
162	/// Gets or creates an EVM client for the given network.
163	///
164	/// First checks the cache for an existing client. If none exists,
165	/// creates a new client under a write lock.
166	async fn get_evm_client(
167		&self,
168		network: &Network,
169	) -> Result<Arc<Self::EvmClient>, anyhow::Error> {
170		self.get_or_create_client(BlockChainType::EVM, network, |n| {
171			let network = n.clone();
172			Box::pin(async move { Self::EvmClient::new(&network).await })
173		})
174		.await
175		.with_context(|| "Failed to get or create EVM client")
176	}
177
178	/// Gets or creates a Stellar client for the given network.
179	///
180	/// First checks the cache for an existing client. If none exists,
181	/// creates a new client under a write lock.
182	async fn get_stellar_client(
183		&self,
184		network: &Network,
185	) -> Result<Arc<Self::StellarClient>, anyhow::Error> {
186		self.get_or_create_client(BlockChainType::Stellar, network, |n| {
187			let network = n.clone();
188			Box::pin(async move { Self::StellarClient::new(&network).await })
189		})
190		.await
191		.with_context(|| "Failed to get or create Stellar client")
192	}
193
194	/// Gets or creates a Midnight client for the given network.
195	///
196	/// First checks the cache for an existing client. If none exists,
197	/// creates a new client under a write lock.
198	async fn get_midnight_client(
199		&self,
200		network: &Network,
201	) -> Result<Arc<Self::MidnightClient>, anyhow::Error> {
202		self.get_or_create_client(BlockChainType::Midnight, network, |n| {
203			let network = n.clone();
204			Box::pin(async move { Self::MidnightClient::new(&network).await })
205		})
206		.await
207		.with_context(|| "Failed to get or create Midnight client")
208	}
209
210	/// Gets or creates a Solana client for the given network.
211	///
212	/// First checks the cache for an existing client. If none exists,
213	/// creates a new client under a write lock.
214	async fn get_solana_client(
215		&self,
216		network: &Network,
217	) -> Result<Arc<Self::SolanaClient>, anyhow::Error> {
218		self.get_or_create_client(BlockChainType::Solana, network, |n| {
219			let network = n.clone();
220			Box::pin(async move { Self::SolanaClient::new(&network).await })
221		})
222		.await
223		.with_context(|| "Failed to get or create Solana client")
224	}
225
226	/// Gets a Solana client configured with specific addresses to monitor.
227	///
228	/// This creates a fresh client (not cached) with the optimized
229	/// getSignaturesForAddress approach enabled for the specified addresses.
230	async fn get_solana_client_with_addresses(
231		&self,
232		network: &Network,
233		addresses: Vec<String>,
234	) -> Result<Arc<Self::SolanaClient>, anyhow::Error> {
235		let client = Self::SolanaClient::new(network)
236			.await
237			.with_context(|| "Failed to create Solana client")?;
238		let client = client.with_monitored_addresses(addresses);
239		Ok(Arc::new(client))
240	}
241}
242
243impl Default for ClientPool {
244	fn default() -> Self {
245		Self::new()
246	}
247}