openzeppelin_monitor/services/blockchain/clients/solana/
client.rs

1//! Solana blockchain client implementation.
2//!
3//! This module provides functionality to interact with the Solana blockchain,
4//! supporting operations like block retrieval, transaction lookup, and program account queries.
5
6use anyhow::Context;
7use async_trait::async_trait;
8use serde_json::json;
9use std::marker::PhantomData;
10use tracing::instrument;
11
12use crate::{
13	models::{
14		BlockType, ContractSpec, Network, SolanaBlock, SolanaConfirmedBlock, SolanaContractSpec,
15		SolanaInstruction, SolanaTransaction, SolanaTransactionInfo, SolanaTransactionMessage,
16		SolanaTransactionMeta,
17	},
18	services::{
19		blockchain::{
20			client::{BlockChainClient, BlockFilterFactory},
21			transports::{SolanaGetBlockConfig, SolanaTransportClient},
22			BlockchainTransport,
23		},
24		filter::SolanaBlockFilter,
25	},
26};
27
28use super::error::{error_codes, is_slot_unavailable_error, SolanaClientError};
29
/// JSON-RPC method names sent to the Solana node by this client.
mod rpc_methods {
	/// Name of the `getSlot` RPC method.
	pub const GET_SLOT: &'static str = "getSlot";
	/// Name of the `getBlock` RPC method.
	pub const GET_BLOCK: &'static str = "getBlock";
	/// Name of the `getBlocks` RPC method.
	pub const GET_BLOCKS: &'static str = "getBlocks";
	/// Name of the `getTransaction` RPC method.
	pub const GET_TRANSACTION: &'static str = "getTransaction";
	/// Name of the `getAccountInfo` RPC method.
	pub const GET_ACCOUNT_INFO: &'static str = "getAccountInfo";
	/// Name of the `getProgramAccounts` RPC method.
	pub const GET_PROGRAM_ACCOUNTS: &'static str = "getProgramAccounts";
	/// Name of the `getSignaturesForAddress` RPC method.
	pub const GET_SIGNATURES_FOR_ADDRESS: &'static str = "getSignaturesForAddress";
}
40
41/// Information about a transaction signature from getSignaturesForAddress
42#[derive(Debug, Clone)]
43pub struct SignatureInfo {
44	/// The transaction signature
45	pub signature: String,
46	/// The slot the transaction was processed in
47	pub slot: u64,
48	/// Whether the transaction had an error (None = success)
49	pub err: Option<serde_json::Value>,
50	/// Block time if available
51	pub block_time: Option<i64>,
52}
53
/// Solana blockchain client.
///
/// Wraps a transport used for JSON-RPC calls and an optional list of monitored
/// addresses. When that list is non-empty, `get_blocks` switches to the
/// signature-based lookup instead of downloading every block in the range.
#[derive(Clone)]
pub struct SolanaClient<T>
where
	T: Send + Sync + Clone,
{
	/// Transport used for every RPC request issued by this client.
	http_client: T,
	/// Addresses (e.g. program IDs) whose transactions should be fetched.
	/// An empty list means "fetch full blocks".
	monitored_addresses: Vec<String>,
}
66
67impl<T: Send + Sync + Clone> SolanaClient<T> {
68	/// Creates a new Solana client instance with a specific transport client
69	pub fn new_with_transport(http_client: T) -> Self {
70		Self {
71			http_client,
72			monitored_addresses: Vec::new(),
73		}
74	}
75
76	/// Configures the client with addresses to monitor
77	///
78	/// When addresses are set, `get_blocks` will use the optimized
79	/// `getSignaturesForAddress` approach instead of fetching all blocks.
80	///
81	/// # Arguments
82	/// * `addresses` - Program IDs or addresses to monitor
83	pub fn with_monitored_addresses(mut self, addresses: Vec<String>) -> Self {
84		self.monitored_addresses = addresses;
85		self
86	}
87
88	/// Sets the monitored addresses (mutable version)
89	pub fn set_monitored_addresses(&mut self, addresses: Vec<String>) {
90		self.monitored_addresses = addresses;
91	}
92
93	/// Returns the currently monitored addresses
94	pub fn monitored_addresses(&self) -> &[String] {
95		&self.monitored_addresses
96	}
97
98	/// Checks a JSON-RPC response for error information and converts it into a `SolanaClientError` if present.
99	fn check_and_handle_rpc_error(
100		&self,
101		response_body: &serde_json::Value,
102		slot: u64,
103		method_name: &'static str,
104	) -> Result<(), SolanaClientError> {
105		if let Some(json_rpc_error) = response_body.get("error") {
106			let rpc_code = json_rpc_error
107				.get("code")
108				.and_then(|c| c.as_i64())
109				.unwrap_or(0);
110			let rpc_message = json_rpc_error
111				.get("message")
112				.and_then(|m| m.as_str())
113				.unwrap_or("Unknown RPC error")
114				.to_string();
115
116			// Check for slot unavailable errors
117			if is_slot_unavailable_error(rpc_code) {
118				return Err(SolanaClientError::slot_not_available(
119					slot,
120					rpc_message,
121					None,
122					None,
123				));
124			}
125
126			// Check for block not available
127			if rpc_code == error_codes::BLOCK_NOT_AVAILABLE {
128				return Err(SolanaClientError::block_not_available(
129					slot,
130					rpc_message,
131					None,
132					None,
133				));
134			}
135
136			// Other JSON-RPC error
137			let message = format!(
138				"Solana RPC request failed for method '{}': {} (code {})",
139				method_name, rpc_message, rpc_code
140			);
141
142			return Err(SolanaClientError::rpc_error(message, None, None));
143		}
144		Ok(())
145	}
146
147	/// Parses a raw block response into a SolanaBlock
148	fn parse_block_response(
149		&self,
150		slot: u64,
151		response_body: &serde_json::Value,
152	) -> Result<SolanaBlock, SolanaClientError> {
153		let result = response_body.get("result").ok_or_else(|| {
154			SolanaClientError::unexpected_response_structure(
155				"Missing 'result' field in block response",
156				None,
157				None,
158			)
159		})?;
160
161		// Handle null result (slot was skipped or block not available)
162		if result.is_null() {
163			return Err(SolanaClientError::block_not_available(
164				slot,
165				"Block data is null (slot may have been skipped)",
166				None,
167				None,
168			));
169		}
170
171		let blockhash = result
172			.get("blockhash")
173			.and_then(|v| v.as_str())
174			.unwrap_or_default()
175			.to_string();
176
177		let previous_blockhash = result
178			.get("previousBlockhash")
179			.and_then(|v| v.as_str())
180			.unwrap_or_default()
181			.to_string();
182
183		let parent_slot = result
184			.get("parentSlot")
185			.and_then(|v| v.as_u64())
186			.unwrap_or(0);
187
188		let block_time = result.get("blockTime").and_then(|v| v.as_i64());
189
190		let block_height = result.get("blockHeight").and_then(|v| v.as_u64());
191
192		// Parse transactions
193		let transactions = self.parse_transactions_from_block(slot, result)?;
194
195		let confirmed_block = SolanaConfirmedBlock {
196			slot,
197			blockhash,
198			previous_blockhash,
199			parent_slot,
200			block_time,
201			block_height,
202			transactions,
203		};
204
205		Ok(SolanaBlock::from(confirmed_block))
206	}
207
208	/// Parses transactions from a block response
209	fn parse_transactions_from_block(
210		&self,
211		slot: u64,
212		block_result: &serde_json::Value,
213	) -> Result<Vec<SolanaTransaction>, SolanaClientError> {
214		let raw_transactions = match block_result.get("transactions") {
215			Some(txs) if txs.is_array() => txs.as_array().unwrap(),
216			_ => return Ok(Vec::new()),
217		};
218
219		let mut transactions = Vec::with_capacity(raw_transactions.len());
220
221		for raw_tx in raw_transactions {
222			if let Some(tx) = self.parse_single_transaction(slot, raw_tx)? {
223				transactions.push(tx);
224			}
225		}
226
227		Ok(transactions)
228	}
229
230	/// Parses a single transaction from the block response
231	fn parse_single_transaction(
232		&self,
233		slot: u64,
234		raw_tx: &serde_json::Value,
235	) -> Result<Option<SolanaTransaction>, SolanaClientError> {
236		// Get transaction data
237		let transaction = match raw_tx.get("transaction") {
238			Some(tx) => tx,
239			None => return Ok(None),
240		};
241
242		// Get meta data
243		let meta = raw_tx.get("meta");
244
245		// Parse signature
246		let signature = transaction
247			.get("signatures")
248			.and_then(|sigs| sigs.get(0))
249			.and_then(|sig| sig.as_str())
250			.unwrap_or_default()
251			.to_string();
252
253		// Parse message
254		let message = transaction.get("message");
255
256		// Parse account keys
257		let account_keys: Vec<String> = message
258			.and_then(|m| m.get("accountKeys"))
259			.and_then(|keys| keys.as_array())
260			.map(|keys| {
261				keys.iter()
262					.filter_map(|k| {
263						// Handle both string and object formats
264						if let Some(s) = k.as_str() {
265							Some(s.to_string())
266						} else {
267							k.get("pubkey")
268								.and_then(|p| p.as_str())
269								.map(|s| s.to_string())
270						}
271					})
272					.collect()
273			})
274			.unwrap_or_default();
275
276		// Parse recent blockhash
277		let recent_blockhash = message
278			.and_then(|m| m.get("recentBlockhash"))
279			.and_then(|h| h.as_str())
280			.unwrap_or_default()
281			.to_string();
282
283		// Parse instructions
284		let instructions = self.parse_instructions(message, &account_keys)?;
285
286		// Create transaction message
287		let tx_message = SolanaTransactionMessage {
288			account_keys,
289			recent_blockhash,
290			instructions,
291			address_table_lookups: Vec::new(),
292		};
293
294		// Parse meta
295		let tx_meta = meta.map(|m| {
296			// err is null for successful transactions, so we need to handle that
297			let err = m.get("err").and_then(|e| {
298				if e.is_null() {
299					None // Success - no error
300				} else {
301					Some(e.clone()) // Failure - has error
302				}
303			});
304			let fee = m.get("fee").and_then(|f| f.as_u64()).unwrap_or(0);
305			let pre_balances: Vec<u64> = m
306				.get("preBalances")
307				.and_then(|b| b.as_array())
308				.map(|arr| arr.iter().filter_map(|v| v.as_u64()).collect())
309				.unwrap_or_default();
310			let post_balances: Vec<u64> = m
311				.get("postBalances")
312				.and_then(|b| b.as_array())
313				.map(|arr| arr.iter().filter_map(|v| v.as_u64()).collect())
314				.unwrap_or_default();
315			let log_messages: Vec<String> = m
316				.get("logMessages")
317				.and_then(|logs| logs.as_array())
318				.map(|logs| {
319					logs.iter()
320						.filter_map(|l| l.as_str().map(|s| s.to_string()))
321						.collect()
322				})
323				.unwrap_or_default();
324
325			SolanaTransactionMeta {
326				err,
327				fee,
328				pre_balances,
329				post_balances,
330				pre_token_balances: Vec::new(),
331				post_token_balances: Vec::new(),
332				inner_instructions: Vec::new(),
333				log_messages,
334				compute_units_consumed: m.get("computeUnitsConsumed").and_then(|c| c.as_u64()),
335				loaded_addresses: None,
336			}
337		});
338
339		let tx_info = SolanaTransactionInfo {
340			signature,
341			slot,
342			block_time: None,
343			transaction: tx_message,
344			meta: tx_meta,
345		};
346
347		Ok(Some(SolanaTransaction::from(tx_info)))
348	}
349
350	/// Parses instructions from transaction message
351	fn parse_instructions(
352		&self,
353		message: Option<&serde_json::Value>,
354		_account_keys: &[String],
355	) -> Result<Vec<SolanaInstruction>, SolanaClientError> {
356		let raw_instructions = match message.and_then(|m| m.get("instructions")) {
357			Some(instrs) if instrs.is_array() => instrs.as_array().unwrap(),
358			_ => return Ok(Vec::new()),
359		};
360
361		let mut instructions = Vec::with_capacity(raw_instructions.len());
362
363		for raw_instr in raw_instructions {
364			// Get program ID index
365			let program_id_index = raw_instr
366				.get("programIdIndex")
367				.and_then(|idx| idx.as_u64())
368				.unwrap_or(0) as u8;
369
370			// Get account indices
371			let accounts: Vec<u8> = raw_instr
372				.get("accounts")
373				.and_then(|accs| accs.as_array())
374				.map(|accs| {
375					accs.iter()
376						.filter_map(|idx| idx.as_u64().map(|i| i as u8))
377						.collect()
378				})
379				.unwrap_or_default();
380
381			// Get data (base58 encoded)
382			let data = raw_instr
383				.get("data")
384				.and_then(|d| d.as_str())
385				.unwrap_or_default()
386				.to_string();
387
388			// Check for parsed instruction
389			let parsed = raw_instr.get("parsed").map(|p| {
390				let instruction_type = p.get("type").and_then(|t| t.as_str()).unwrap_or_default();
391				let info = p.get("info").cloned().unwrap_or(serde_json::Value::Null);
392				crate::models::SolanaParsedInstruction {
393					instruction_type: instruction_type.to_string(),
394					info,
395				}
396			});
397
398			let program = raw_instr
399				.get("program")
400				.and_then(|p| p.as_str())
401				.map(|s| s.to_string());
402
403			let program_id = raw_instr
404				.get("programId")
405				.and_then(|p| p.as_str())
406				.map(|s| s.to_string());
407
408			instructions.push(SolanaInstruction {
409				program_id_index,
410				accounts,
411				data,
412				parsed,
413				program,
414				program_id,
415			});
416		}
417
418		Ok(instructions)
419	}
420}
421
422impl SolanaClient<SolanaTransportClient> {
423	/// Creates a new Solana client instance
424	pub async fn new(network: &Network) -> Result<Self, anyhow::Error> {
425		let http_client = SolanaTransportClient::new(network).await?;
426		Ok(Self::new_with_transport(http_client))
427	}
428}
429
430/// Extended functionality specific to the Solana blockchain
431#[async_trait]
432pub trait SolanaClientTrait {
433	/// Retrieves transactions for a specific slot
434	async fn get_transactions(&self, slot: u64) -> Result<Vec<SolanaTransaction>, anyhow::Error>;
435
436	/// Retrieves a single transaction by signature
437	async fn get_transaction(
438		&self,
439		signature: String,
440	) -> Result<Option<SolanaTransaction>, anyhow::Error>;
441
442	/// Retrieves signatures with full info (slot, err, block_time) for an address
443	/// Optionally filter by slot range
444	async fn get_signatures_for_address_with_info(
445		&self,
446		address: String,
447		limit: Option<usize>,
448		min_slot: Option<u64>,
449		until_signature: Option<String>,
450	) -> Result<Vec<SignatureInfo>, anyhow::Error>;
451
452	/// Retrieves all signatures for an address within a slot range with automatic pagination
453	/// This method handles pagination internally and returns all signatures up to a safety limit
454	async fn get_all_signatures_for_address(
455		&self,
456		address: String,
457		start_slot: u64,
458		end_slot: u64,
459	) -> Result<Vec<SignatureInfo>, anyhow::Error>;
460
461	/// Retrieves transactions for multiple addresses within a slot range
462	/// This is the optimized method that uses getSignaturesForAddress instead of getBlock
463	async fn get_transactions_for_addresses(
464		&self,
465		addresses: &[String],
466		start_slot: u64,
467		end_slot: Option<u64>,
468	) -> Result<Vec<SolanaTransaction>, anyhow::Error>;
469
470	/// Retrieves blocks containing only transactions relevant to the specified addresses
471	/// This is the main optimization: instead of fetching all blocks, we fetch only
472	/// transactions that involve the monitored addresses and group them into virtual blocks
473	///
474	/// Returns BlockType::Solana blocks, compatible with the existing filter infrastructure
475	async fn get_blocks_for_addresses(
476		&self,
477		addresses: &[String],
478		start_slot: u64,
479		end_slot: Option<u64>,
480	) -> Result<Vec<BlockType>, anyhow::Error>;
481
482	/// Retrieves account info for a given public key
483	async fn get_account_info(&self, pubkey: String) -> Result<serde_json::Value, anyhow::Error>;
484
485	/// Retrieves program accounts for a given program ID
486	async fn get_program_accounts(
487		&self,
488		program_id: String,
489	) -> Result<Vec<serde_json::Value>, anyhow::Error>;
490}
491
492#[async_trait]
493impl<T: Send + Sync + Clone + BlockchainTransport> SolanaClientTrait for SolanaClient<T> {
494	#[instrument(skip(self), fields(slot))]
495	async fn get_transactions(&self, slot: u64) -> Result<Vec<SolanaTransaction>, anyhow::Error> {
496		let config = SolanaGetBlockConfig::full();
497		let params = json!([slot, config]);
498
499		let response = self
500			.http_client
501			.send_raw_request(rpc_methods::GET_BLOCK, Some(params))
502			.await
503			.with_context(|| format!("Failed to get block for slot {}", slot))?;
504
505		if let Err(rpc_error) =
506			self.check_and_handle_rpc_error(&response, slot, rpc_methods::GET_BLOCK)
507		{
508			return Err(anyhow::anyhow!(rpc_error)
509				.context(format!("Solana RPC error while fetching slot {}", slot)));
510		}
511
512		let block = self.parse_block_response(slot, &response).map_err(|e| {
513			anyhow::anyhow!(e).context(format!("Failed to parse block response for slot {}", slot))
514		})?;
515
516		Ok(block.transactions.clone())
517	}
518
519	#[instrument(skip(self), fields(signature))]
520	async fn get_transaction(
521		&self,
522		signature: String,
523	) -> Result<Option<SolanaTransaction>, anyhow::Error> {
524		let config = json!({
525			"encoding": "json",
526			"commitment": "finalized",
527			"maxSupportedTransactionVersion": 0
528		});
529		let params = json!([signature, config]);
530
531		let response = self
532			.http_client
533			.send_raw_request(rpc_methods::GET_TRANSACTION, Some(params))
534			.await
535			.with_context(|| format!("Failed to get transaction {}", signature))?;
536
537		// Check for null result (transaction not found)
538		let result = response.get("result");
539		if result.is_none() || result.unwrap().is_null() {
540			return Ok(None);
541		}
542
543		let result = result.unwrap();
544
545		// Extract slot from response
546		let slot = result.get("slot").and_then(|s| s.as_u64()).unwrap_or(0);
547
548		// Parse the transaction using existing parsing logic
549		// We need to wrap it in the format expected by parse_single_transaction
550		let wrapped_tx = json!({
551			"transaction": result.get("transaction"),
552			"meta": result.get("meta")
553		});
554
555		match self.parse_single_transaction(slot, &wrapped_tx) {
556			Ok(Some(mut tx)) => {
557				// Update block_time if available
558				if let Some(block_time) = result.get("blockTime").and_then(|t| t.as_i64()) {
559					tx.0.block_time = Some(block_time);
560				}
561				Ok(Some(tx))
562			}
563			Ok(None) => Ok(None),
564			Err(e) => Err(anyhow::anyhow!(e).context("Failed to parse transaction")),
565		}
566	}
567
568	#[instrument(skip(self), fields(address, limit, min_slot))]
569	async fn get_signatures_for_address_with_info(
570		&self,
571		address: String,
572		limit: Option<usize>,
573		min_slot: Option<u64>,
574		until_signature: Option<String>,
575	) -> Result<Vec<SignatureInfo>, anyhow::Error> {
576		let address = &address;
577		let until_signature = until_signature.as_deref();
578		let mut config = json!({
579			"commitment": "finalized",
580			"limit": limit.unwrap_or(1000)
581		});
582
583		// Add minContextSlot if specified (helps filter old transactions)
584		if let Some(min) = min_slot {
585			config["minContextSlot"] = json!(min);
586		}
587
588		// Add until signature to paginate
589		if let Some(until) = until_signature {
590			config["until"] = json!(until);
591		}
592
593		let params = json!([address, config]);
594
595		let response = self
596			.http_client
597			.send_raw_request(rpc_methods::GET_SIGNATURES_FOR_ADDRESS, Some(params))
598			.await
599			.with_context(|| format!("Failed to get signatures for address {}", address))?;
600
601		let result = response
602			.get("result")
603			.and_then(|r| r.as_array())
604			.ok_or_else(|| anyhow::anyhow!("Invalid response structure"))?;
605
606		let signatures: Vec<SignatureInfo> = result
607			.iter()
608			.filter_map(|item| {
609				let signature = item.get("signature")?.as_str()?.to_string();
610				let slot = item.get("slot")?.as_u64()?;
611				let err =
612					item.get("err")
613						.and_then(|e| if e.is_null() { None } else { Some(e.clone()) });
614				let block_time = item.get("blockTime").and_then(|t| t.as_i64());
615
616				Some(SignatureInfo {
617					signature,
618					slot,
619					err,
620					block_time,
621				})
622			})
623			.collect();
624
625		Ok(signatures)
626	}
627
628	#[instrument(skip(self), fields(address, start_slot, end_slot))]
629	async fn get_all_signatures_for_address(
630		&self,
631		address: String,
632		start_slot: u64,
633		end_slot: u64,
634	) -> Result<Vec<SignatureInfo>, anyhow::Error> {
635		const PAGE_LIMIT: usize = 1000;
636		const MAX_SIGNATURES: usize = 100_000; // Safety limit
637
638		let mut all_signatures = Vec::new();
639		let mut until_signature: Option<String> = None;
640		let mut iteration = 0;
641
642		loop {
643			let batch = self
644				.get_signatures_for_address_with_info(
645					address.clone(),
646					Some(PAGE_LIMIT),
647					Some(start_slot),
648					until_signature.clone(),
649				)
650				.await?;
651
652			if batch.is_empty() {
653				break;
654			}
655
656			// Filter by slot range and collect
657			let filtered: Vec<SignatureInfo> = batch
658				.into_iter()
659				.filter(|sig| sig.slot >= start_slot && sig.slot <= end_slot)
660				.collect();
661
662			let batch_len = filtered.len();
663			until_signature = filtered.last().map(|s| s.signature.clone());
664			all_signatures.extend(filtered);
665
666			// Break conditions
667			if batch_len < PAGE_LIMIT {
668				break; // Last page
669			}
670
671			if all_signatures.len() >= MAX_SIGNATURES {
672				tracing::warn!(
673					address = %address,
674					count = all_signatures.len(),
675					"Reached maximum signature limit, stopping pagination"
676				);
677				break;
678			}
679
680			iteration += 1;
681		}
682
683		tracing::debug!(
684			address = %address,
685			signatures = all_signatures.len(),
686			iterations = iteration + 1,
687			"Completed signature pagination"
688		);
689
690		Ok(all_signatures)
691	}
692
693	#[instrument(skip(self), fields(addresses_count = addresses.len(), start_slot, end_slot))]
694	async fn get_transactions_for_addresses(
695		&self,
696		addresses: &[String],
697		start_slot: u64,
698		end_slot: Option<u64>,
699	) -> Result<Vec<SolanaTransaction>, anyhow::Error> {
700		use futures::stream::{self, StreamExt};
701		use std::collections::HashSet;
702
703		let end_slot = end_slot.unwrap_or(start_slot);
704
705		if addresses.is_empty() {
706			return Ok(Vec::new());
707		}
708
709		tracing::debug!(
710			addresses = ?addresses,
711			start_slot = start_slot,
712			end_slot = end_slot,
713			"Fetching transactions for addresses using signatures approach"
714		);
715
716		// Collect all unique signatures from all addresses within the slot range
717		let mut all_signatures: HashSet<String> = HashSet::new();
718
719		for address in addresses {
720			// Use paginated method to fetch all signatures
721			let signatures = self
722				.get_all_signatures_for_address(address.clone(), start_slot, end_slot)
723				.await?;
724
725			tracing::debug!(
726				address = %address,
727				signatures_count = signatures.len(),
728				"Got signatures for address"
729			);
730
731			for sig_info in signatures {
732				all_signatures.insert(sig_info.signature);
733			}
734		}
735
736		tracing::debug!(
737			unique_signatures = all_signatures.len(),
738			"Fetching transactions for unique signatures in slot range"
739		);
740
741		// Fetch transactions in parallel with controlled concurrency
742		let transactions: Vec<SolanaTransaction> = stream::iter(all_signatures)
743			.map(|signature| async move {
744				let sig = signature.clone();
745				match self.get_transaction(signature).await {
746					Ok(Some(tx)) => Some(tx),
747					Ok(None) => {
748						tracing::debug!(signature = %sig, "Transaction not found");
749						None
750					}
751					Err(e) => {
752						tracing::warn!(signature = %sig, error = %e, "Failed to fetch transaction");
753						None
754					}
755				}
756			})
757			.buffer_unordered(20) // 20 concurrent requests
758			.filter_map(|result| async move { result })
759			.collect()
760			.await;
761
762		tracing::debug!(
763			fetched_transactions = transactions.len(),
764			"Successfully fetched transactions"
765		);
766
767		Ok(transactions)
768	}
769
770	#[instrument(skip(self), fields(pubkey))]
771	async fn get_account_info(&self, pubkey: String) -> Result<serde_json::Value, anyhow::Error> {
772		let config = json!({
773			"encoding": "jsonParsed",
774			"commitment": "finalized"
775		});
776		let params = json!([&pubkey, config]);
777
778		let response = self
779			.http_client
780			.send_raw_request(rpc_methods::GET_ACCOUNT_INFO, Some(params))
781			.await
782			.with_context(|| format!("Failed to get account info for {}", pubkey))?;
783
784		let result = response
785			.get("result")
786			.cloned()
787			.ok_or_else(|| anyhow::anyhow!("Invalid response structure"))?;
788
789		Ok(result)
790	}
791
792	#[instrument(skip(self), fields(program_id))]
793	async fn get_program_accounts(
794		&self,
795		program_id: String,
796	) -> Result<Vec<serde_json::Value>, anyhow::Error> {
797		let config = json!({
798			"encoding": "jsonParsed",
799			"commitment": "finalized"
800		});
801		let params = json!([&program_id, config]);
802
803		let response = self
804			.http_client
805			.send_raw_request(rpc_methods::GET_PROGRAM_ACCOUNTS, Some(params))
806			.await
807			.with_context(|| format!("Failed to get program accounts for {}", program_id))?;
808
809		let result = response
810			.get("result")
811			.and_then(|r| r.as_array())
812			.cloned()
813			.ok_or_else(|| anyhow::anyhow!("Invalid response structure"))?;
814
815		Ok(result)
816	}
817
818	#[instrument(skip(self), fields(addresses_count = addresses.len(), start_slot, end_slot))]
819	async fn get_blocks_for_addresses(
820		&self,
821		addresses: &[String],
822		start_slot: u64,
823		end_slot: Option<u64>,
824	) -> Result<Vec<BlockType>, anyhow::Error> {
825		use std::collections::BTreeMap;
826
827		// Fetch transactions using the optimized signatures approach
828		let transactions = self
829			.get_transactions_for_addresses(addresses, start_slot, end_slot)
830			.await?;
831
832		if transactions.is_empty() {
833			return Ok(Vec::new());
834		}
835
836		// Group transactions by slot
837		let mut slot_transactions: BTreeMap<u64, Vec<SolanaTransaction>> = BTreeMap::new();
838		for tx in transactions {
839			let slot = tx.slot();
840			slot_transactions.entry(slot).or_default().push(tx);
841		}
842
843		// Create virtual blocks for each slot
844		let blocks: Vec<BlockType> = slot_transactions
845			.into_iter()
846			.map(|(slot, txs)| {
847				let confirmed_block = SolanaConfirmedBlock {
848					slot,
849					blockhash: String::new(), // Not available from getTransaction
850					previous_blockhash: String::new(),
851					parent_slot: slot.saturating_sub(1),
852					block_time: txs.first().and_then(|tx| tx.0.block_time),
853					block_height: None,
854					transactions: txs,
855				};
856				BlockType::Solana(Box::new(SolanaBlock::from(confirmed_block)))
857			})
858			.collect();
859
860		tracing::debug!(
861			blocks_count = blocks.len(),
862			"Created virtual blocks from address-filtered transactions"
863		);
864
865		Ok(blocks)
866	}
867}
868
869impl<T: Send + Sync + Clone + BlockchainTransport> BlockFilterFactory<Self> for SolanaClient<T> {
870	type Filter = SolanaBlockFilter<Self>;
871
872	fn filter() -> Self::Filter {
873		SolanaBlockFilter {
874			_client: PhantomData {},
875		}
876	}
877}
878
879#[async_trait]
880impl<T: Send + Sync + Clone + BlockchainTransport> BlockChainClient for SolanaClient<T> {
881	#[instrument(skip(self))]
882	async fn get_latest_block_number(&self) -> Result<u64, anyhow::Error> {
883		let config = json!({ "commitment": "finalized" });
884		let params = json!([config]);
885
886		let response = self
887			.http_client
888			.send_raw_request(rpc_methods::GET_SLOT, Some(params))
889			.await
890			.with_context(|| "Failed to get latest slot")?;
891
892		let slot = response["result"]
893			.as_u64()
894			.ok_or_else(|| anyhow::anyhow!("Invalid slot number in response"))?;
895
896		Ok(slot)
897	}
898
899	#[instrument(skip(self), fields(start_block, end_block))]
900	async fn get_blocks(
901		&self,
902		start_block: u64,
903		end_block: Option<u64>,
904	) -> Result<Vec<BlockType>, anyhow::Error> {
905		// If monitored addresses are configured, use the optimized approach
906		if !self.monitored_addresses.is_empty() {
907			tracing::debug!(
908				addresses = ?self.monitored_addresses,
909				start_block = start_block,
910				end_block = ?end_block,
911				"Using optimized getSignaturesForAddress approach"
912			);
913			return SolanaClientTrait::get_blocks_for_addresses(
914				self,
915				&self.monitored_addresses,
916				start_block,
917				end_block,
918			)
919			.await;
920		}
921
922		// Standard approach: fetch all blocks
923		// Validate input parameters
924		if let Some(end_block) = end_block {
925			if start_block > end_block {
926				let message = format!(
927					"start_block {} cannot be greater than end_block {}",
928					start_block, end_block
929				);
930				let input_error = SolanaClientError::invalid_input(message, None, None);
931				return Err(anyhow::anyhow!(input_error))
932					.context("Invalid input parameters for Solana RPC");
933			}
934		}
935
936		let target_block = end_block.unwrap_or(start_block);
937
938		// First, get the list of available slots in the range
939		let slots = if start_block == target_block {
940			vec![start_block]
941		} else {
942			let params = json!([start_block, target_block, { "commitment": "finalized" }]);
943			let response = self
944				.http_client
945				.send_raw_request(rpc_methods::GET_BLOCKS, Some(params))
946				.await
947				.with_context(|| {
948					format!(
949						"Failed to get blocks list from {} to {}",
950						start_block, target_block
951					)
952				})?;
953
954			let slots: Vec<u64> = response["result"]
955				.as_array()
956				.ok_or_else(|| anyhow::anyhow!("Invalid blocks list response"))?
957				.iter()
958				.filter_map(|v| v.as_u64())
959				.collect();
960
961			if slots.is_empty() {
962				return Ok(Vec::new());
963			}
964
965			slots
966		};
967
968		// Fetch each block
969		let mut blocks = Vec::with_capacity(slots.len());
970		let config = SolanaGetBlockConfig::full();
971
972		for slot in slots {
973			let params = json!([slot, config]);
974
975			let response = self
976				.http_client
977				.send_raw_request(rpc_methods::GET_BLOCK, Some(params))
978				.await;
979
980			match response {
981				Ok(response_body) => {
982					if let Err(rpc_error) = self.check_and_handle_rpc_error(
983						&response_body,
984						slot,
985						rpc_methods::GET_BLOCK,
986					) {
987						if rpc_error.is_slot_not_available() || rpc_error.is_block_not_available() {
988							tracing::debug!("Skipping unavailable slot {}: {}", slot, rpc_error);
989							continue;
990						}
991						return Err(anyhow::anyhow!(rpc_error)
992							.context(format!("Solana RPC error while fetching slot {}", slot)));
993					}
994
995					match self.parse_block_response(slot, &response_body) {
996						Ok(block) => {
997							blocks.push(BlockType::Solana(Box::new(block)));
998						}
999						Err(parse_error) => {
1000							if parse_error.is_block_not_available() {
1001								tracing::debug!(
1002									"Skipping slot {} due to parse error: {}",
1003									slot,
1004									parse_error
1005								);
1006								continue;
1007							}
1008							return Err(anyhow::anyhow!(parse_error)
1009								.context(format!("Failed to parse block for slot {}", slot)));
1010						}
1011					}
1012				}
1013				Err(transport_err) => {
1014					return Err(anyhow::anyhow!(transport_err)).context(format!(
1015						"Failed to fetch block from Solana RPC for slot: {}",
1016						slot
1017					));
1018				}
1019			}
1020		}
1021
1022		Ok(blocks)
1023	}
1024
1025	#[instrument(skip(self), fields(contract_id))]
1026	async fn get_contract_spec(&self, contract_id: &str) -> Result<ContractSpec, anyhow::Error> {
1027		tracing::warn!(
1028			"Automatic IDL fetching not yet implemented for program {}. \
1029             Please provide the IDL manually in the monitor configuration.",
1030			contract_id
1031		);
1032
1033		Ok(ContractSpec::Solana(SolanaContractSpec::default()))
1034	}
1035}
1036
#[cfg(test)]
mod tests {
	use super::*;

	/// Compile-time check that the concrete client is `Send + Sync + Clone`.
	#[test]
	fn test_solana_client_implements_traits() {
		fn assert_bounds<C: Send + Sync + Clone>() {}

		assert_bounds::<SolanaClient<SolanaTransportClient>>();
	}
}
1049}