1use anyhow::Context;
7use async_trait::async_trait;
8use serde_json::json;
9use std::marker::PhantomData;
10use tracing::instrument;
11
12use crate::{
13 models::{
14 BlockType, ContractSpec, Network, SolanaBlock, SolanaConfirmedBlock, SolanaContractSpec,
15 SolanaInstruction, SolanaTransaction, SolanaTransactionInfo, SolanaTransactionMessage,
16 SolanaTransactionMeta,
17 },
18 services::{
19 blockchain::{
20 client::{BlockChainClient, BlockFilterFactory},
21 transports::{SolanaGetBlockConfig, SolanaTransportClient},
22 BlockchainTransport,
23 },
24 filter::SolanaBlockFilter,
25 },
26};
27
28use super::error::{error_codes, is_slot_unavailable_error, SolanaClientError};
29
30mod rpc_methods {
32 pub const GET_SLOT: &str = "getSlot";
33 pub const GET_BLOCK: &str = "getBlock";
34 pub const GET_BLOCKS: &str = "getBlocks";
35 pub const GET_TRANSACTION: &str = "getTransaction";
36 pub const GET_ACCOUNT_INFO: &str = "getAccountInfo";
37 pub const GET_PROGRAM_ACCOUNTS: &str = "getProgramAccounts";
38 pub const GET_SIGNATURES_FOR_ADDRESS: &str = "getSignaturesForAddress";
39}
40
41#[derive(Debug, Clone)]
43pub struct SignatureInfo {
44 pub signature: String,
46 pub slot: u64,
48 pub err: Option<serde_json::Value>,
50 pub block_time: Option<i64>,
52}
53
54#[derive(Clone)]
59pub struct SolanaClient<T: Send + Sync + Clone> {
60 http_client: T,
62 monitored_addresses: Vec<String>,
65}
66
67impl<T: Send + Sync + Clone> SolanaClient<T> {
68 pub fn new_with_transport(http_client: T) -> Self {
70 Self {
71 http_client,
72 monitored_addresses: Vec::new(),
73 }
74 }
75
76 pub fn with_monitored_addresses(mut self, addresses: Vec<String>) -> Self {
84 self.monitored_addresses = addresses;
85 self
86 }
87
88 pub fn set_monitored_addresses(&mut self, addresses: Vec<String>) {
90 self.monitored_addresses = addresses;
91 }
92
93 pub fn monitored_addresses(&self) -> &[String] {
95 &self.monitored_addresses
96 }
97
98 fn check_and_handle_rpc_error(
100 &self,
101 response_body: &serde_json::Value,
102 slot: u64,
103 method_name: &'static str,
104 ) -> Result<(), SolanaClientError> {
105 if let Some(json_rpc_error) = response_body.get("error") {
106 let rpc_code = json_rpc_error
107 .get("code")
108 .and_then(|c| c.as_i64())
109 .unwrap_or(0);
110 let rpc_message = json_rpc_error
111 .get("message")
112 .and_then(|m| m.as_str())
113 .unwrap_or("Unknown RPC error")
114 .to_string();
115
116 if is_slot_unavailable_error(rpc_code) {
118 return Err(SolanaClientError::slot_not_available(
119 slot,
120 rpc_message,
121 None,
122 None,
123 ));
124 }
125
126 if rpc_code == error_codes::BLOCK_NOT_AVAILABLE {
128 return Err(SolanaClientError::block_not_available(
129 slot,
130 rpc_message,
131 None,
132 None,
133 ));
134 }
135
136 let message = format!(
138 "Solana RPC request failed for method '{}': {} (code {})",
139 method_name, rpc_message, rpc_code
140 );
141
142 return Err(SolanaClientError::rpc_error(message, None, None));
143 }
144 Ok(())
145 }
146
147 fn parse_block_response(
149 &self,
150 slot: u64,
151 response_body: &serde_json::Value,
152 ) -> Result<SolanaBlock, SolanaClientError> {
153 let result = response_body.get("result").ok_or_else(|| {
154 SolanaClientError::unexpected_response_structure(
155 "Missing 'result' field in block response",
156 None,
157 None,
158 )
159 })?;
160
161 if result.is_null() {
163 return Err(SolanaClientError::block_not_available(
164 slot,
165 "Block data is null (slot may have been skipped)",
166 None,
167 None,
168 ));
169 }
170
171 let blockhash = result
172 .get("blockhash")
173 .and_then(|v| v.as_str())
174 .unwrap_or_default()
175 .to_string();
176
177 let previous_blockhash = result
178 .get("previousBlockhash")
179 .and_then(|v| v.as_str())
180 .unwrap_or_default()
181 .to_string();
182
183 let parent_slot = result
184 .get("parentSlot")
185 .and_then(|v| v.as_u64())
186 .unwrap_or(0);
187
188 let block_time = result.get("blockTime").and_then(|v| v.as_i64());
189
190 let block_height = result.get("blockHeight").and_then(|v| v.as_u64());
191
192 let transactions = self.parse_transactions_from_block(slot, result)?;
194
195 let confirmed_block = SolanaConfirmedBlock {
196 slot,
197 blockhash,
198 previous_blockhash,
199 parent_slot,
200 block_time,
201 block_height,
202 transactions,
203 };
204
205 Ok(SolanaBlock::from(confirmed_block))
206 }
207
208 fn parse_transactions_from_block(
210 &self,
211 slot: u64,
212 block_result: &serde_json::Value,
213 ) -> Result<Vec<SolanaTransaction>, SolanaClientError> {
214 let raw_transactions = match block_result.get("transactions") {
215 Some(txs) if txs.is_array() => txs.as_array().unwrap(),
216 _ => return Ok(Vec::new()),
217 };
218
219 let mut transactions = Vec::with_capacity(raw_transactions.len());
220
221 for raw_tx in raw_transactions {
222 if let Some(tx) = self.parse_single_transaction(slot, raw_tx)? {
223 transactions.push(tx);
224 }
225 }
226
227 Ok(transactions)
228 }
229
230 fn parse_single_transaction(
232 &self,
233 slot: u64,
234 raw_tx: &serde_json::Value,
235 ) -> Result<Option<SolanaTransaction>, SolanaClientError> {
236 let transaction = match raw_tx.get("transaction") {
238 Some(tx) => tx,
239 None => return Ok(None),
240 };
241
242 let meta = raw_tx.get("meta");
244
245 let signature = transaction
247 .get("signatures")
248 .and_then(|sigs| sigs.get(0))
249 .and_then(|sig| sig.as_str())
250 .unwrap_or_default()
251 .to_string();
252
253 let message = transaction.get("message");
255
256 let account_keys: Vec<String> = message
258 .and_then(|m| m.get("accountKeys"))
259 .and_then(|keys| keys.as_array())
260 .map(|keys| {
261 keys.iter()
262 .filter_map(|k| {
263 if let Some(s) = k.as_str() {
265 Some(s.to_string())
266 } else {
267 k.get("pubkey")
268 .and_then(|p| p.as_str())
269 .map(|s| s.to_string())
270 }
271 })
272 .collect()
273 })
274 .unwrap_or_default();
275
276 let recent_blockhash = message
278 .and_then(|m| m.get("recentBlockhash"))
279 .and_then(|h| h.as_str())
280 .unwrap_or_default()
281 .to_string();
282
283 let instructions = self.parse_instructions(message, &account_keys)?;
285
286 let tx_message = SolanaTransactionMessage {
288 account_keys,
289 recent_blockhash,
290 instructions,
291 address_table_lookups: Vec::new(),
292 };
293
294 let tx_meta = meta.map(|m| {
296 let err = m.get("err").and_then(|e| {
298 if e.is_null() {
299 None } else {
301 Some(e.clone()) }
303 });
304 let fee = m.get("fee").and_then(|f| f.as_u64()).unwrap_or(0);
305 let pre_balances: Vec<u64> = m
306 .get("preBalances")
307 .and_then(|b| b.as_array())
308 .map(|arr| arr.iter().filter_map(|v| v.as_u64()).collect())
309 .unwrap_or_default();
310 let post_balances: Vec<u64> = m
311 .get("postBalances")
312 .and_then(|b| b.as_array())
313 .map(|arr| arr.iter().filter_map(|v| v.as_u64()).collect())
314 .unwrap_or_default();
315 let log_messages: Vec<String> = m
316 .get("logMessages")
317 .and_then(|logs| logs.as_array())
318 .map(|logs| {
319 logs.iter()
320 .filter_map(|l| l.as_str().map(|s| s.to_string()))
321 .collect()
322 })
323 .unwrap_or_default();
324
325 SolanaTransactionMeta {
326 err,
327 fee,
328 pre_balances,
329 post_balances,
330 pre_token_balances: Vec::new(),
331 post_token_balances: Vec::new(),
332 inner_instructions: Vec::new(),
333 log_messages,
334 compute_units_consumed: m.get("computeUnitsConsumed").and_then(|c| c.as_u64()),
335 loaded_addresses: None,
336 }
337 });
338
339 let tx_info = SolanaTransactionInfo {
340 signature,
341 slot,
342 block_time: None,
343 transaction: tx_message,
344 meta: tx_meta,
345 };
346
347 Ok(Some(SolanaTransaction::from(tx_info)))
348 }
349
350 fn parse_instructions(
352 &self,
353 message: Option<&serde_json::Value>,
354 _account_keys: &[String],
355 ) -> Result<Vec<SolanaInstruction>, SolanaClientError> {
356 let raw_instructions = match message.and_then(|m| m.get("instructions")) {
357 Some(instrs) if instrs.is_array() => instrs.as_array().unwrap(),
358 _ => return Ok(Vec::new()),
359 };
360
361 let mut instructions = Vec::with_capacity(raw_instructions.len());
362
363 for raw_instr in raw_instructions {
364 let program_id_index = raw_instr
366 .get("programIdIndex")
367 .and_then(|idx| idx.as_u64())
368 .unwrap_or(0) as u8;
369
370 let accounts: Vec<u8> = raw_instr
372 .get("accounts")
373 .and_then(|accs| accs.as_array())
374 .map(|accs| {
375 accs.iter()
376 .filter_map(|idx| idx.as_u64().map(|i| i as u8))
377 .collect()
378 })
379 .unwrap_or_default();
380
381 let data = raw_instr
383 .get("data")
384 .and_then(|d| d.as_str())
385 .unwrap_or_default()
386 .to_string();
387
388 let parsed = raw_instr.get("parsed").map(|p| {
390 let instruction_type = p.get("type").and_then(|t| t.as_str()).unwrap_or_default();
391 let info = p.get("info").cloned().unwrap_or(serde_json::Value::Null);
392 crate::models::SolanaParsedInstruction {
393 instruction_type: instruction_type.to_string(),
394 info,
395 }
396 });
397
398 let program = raw_instr
399 .get("program")
400 .and_then(|p| p.as_str())
401 .map(|s| s.to_string());
402
403 let program_id = raw_instr
404 .get("programId")
405 .and_then(|p| p.as_str())
406 .map(|s| s.to_string());
407
408 instructions.push(SolanaInstruction {
409 program_id_index,
410 accounts,
411 data,
412 parsed,
413 program,
414 program_id,
415 });
416 }
417
418 Ok(instructions)
419 }
420}
421
422impl SolanaClient<SolanaTransportClient> {
423 pub async fn new(network: &Network) -> Result<Self, anyhow::Error> {
425 let http_client = SolanaTransportClient::new(network).await?;
426 Ok(Self::new_with_transport(http_client))
427 }
428}
429
430#[async_trait]
432pub trait SolanaClientTrait {
433 async fn get_transactions(&self, slot: u64) -> Result<Vec<SolanaTransaction>, anyhow::Error>;
435
436 async fn get_transaction(
438 &self,
439 signature: String,
440 ) -> Result<Option<SolanaTransaction>, anyhow::Error>;
441
442 async fn get_signatures_for_address_with_info(
445 &self,
446 address: String,
447 limit: Option<usize>,
448 min_slot: Option<u64>,
449 until_signature: Option<String>,
450 ) -> Result<Vec<SignatureInfo>, anyhow::Error>;
451
452 async fn get_all_signatures_for_address(
455 &self,
456 address: String,
457 start_slot: u64,
458 end_slot: u64,
459 ) -> Result<Vec<SignatureInfo>, anyhow::Error>;
460
461 async fn get_transactions_for_addresses(
464 &self,
465 addresses: &[String],
466 start_slot: u64,
467 end_slot: Option<u64>,
468 ) -> Result<Vec<SolanaTransaction>, anyhow::Error>;
469
470 async fn get_blocks_for_addresses(
476 &self,
477 addresses: &[String],
478 start_slot: u64,
479 end_slot: Option<u64>,
480 ) -> Result<Vec<BlockType>, anyhow::Error>;
481
482 async fn get_account_info(&self, pubkey: String) -> Result<serde_json::Value, anyhow::Error>;
484
485 async fn get_program_accounts(
487 &self,
488 program_id: String,
489 ) -> Result<Vec<serde_json::Value>, anyhow::Error>;
490}
491
492#[async_trait]
493impl<T: Send + Sync + Clone + BlockchainTransport> SolanaClientTrait for SolanaClient<T> {
494 #[instrument(skip(self), fields(slot))]
495 async fn get_transactions(&self, slot: u64) -> Result<Vec<SolanaTransaction>, anyhow::Error> {
496 let config = SolanaGetBlockConfig::full();
497 let params = json!([slot, config]);
498
499 let response = self
500 .http_client
501 .send_raw_request(rpc_methods::GET_BLOCK, Some(params))
502 .await
503 .with_context(|| format!("Failed to get block for slot {}", slot))?;
504
505 if let Err(rpc_error) =
506 self.check_and_handle_rpc_error(&response, slot, rpc_methods::GET_BLOCK)
507 {
508 return Err(anyhow::anyhow!(rpc_error)
509 .context(format!("Solana RPC error while fetching slot {}", slot)));
510 }
511
512 let block = self.parse_block_response(slot, &response).map_err(|e| {
513 anyhow::anyhow!(e).context(format!("Failed to parse block response for slot {}", slot))
514 })?;
515
516 Ok(block.transactions.clone())
517 }
518
519 #[instrument(skip(self), fields(signature))]
520 async fn get_transaction(
521 &self,
522 signature: String,
523 ) -> Result<Option<SolanaTransaction>, anyhow::Error> {
524 let config = json!({
525 "encoding": "json",
526 "commitment": "finalized",
527 "maxSupportedTransactionVersion": 0
528 });
529 let params = json!([signature, config]);
530
531 let response = self
532 .http_client
533 .send_raw_request(rpc_methods::GET_TRANSACTION, Some(params))
534 .await
535 .with_context(|| format!("Failed to get transaction {}", signature))?;
536
537 let result = response.get("result");
539 if result.is_none() || result.unwrap().is_null() {
540 return Ok(None);
541 }
542
543 let result = result.unwrap();
544
545 let slot = result.get("slot").and_then(|s| s.as_u64()).unwrap_or(0);
547
548 let wrapped_tx = json!({
551 "transaction": result.get("transaction"),
552 "meta": result.get("meta")
553 });
554
555 match self.parse_single_transaction(slot, &wrapped_tx) {
556 Ok(Some(mut tx)) => {
557 if let Some(block_time) = result.get("blockTime").and_then(|t| t.as_i64()) {
559 tx.0.block_time = Some(block_time);
560 }
561 Ok(Some(tx))
562 }
563 Ok(None) => Ok(None),
564 Err(e) => Err(anyhow::anyhow!(e).context("Failed to parse transaction")),
565 }
566 }
567
568 #[instrument(skip(self), fields(address, limit, min_slot))]
569 async fn get_signatures_for_address_with_info(
570 &self,
571 address: String,
572 limit: Option<usize>,
573 min_slot: Option<u64>,
574 until_signature: Option<String>,
575 ) -> Result<Vec<SignatureInfo>, anyhow::Error> {
576 let address = &address;
577 let until_signature = until_signature.as_deref();
578 let mut config = json!({
579 "commitment": "finalized",
580 "limit": limit.unwrap_or(1000)
581 });
582
583 if let Some(min) = min_slot {
585 config["minContextSlot"] = json!(min);
586 }
587
588 if let Some(until) = until_signature {
590 config["until"] = json!(until);
591 }
592
593 let params = json!([address, config]);
594
595 let response = self
596 .http_client
597 .send_raw_request(rpc_methods::GET_SIGNATURES_FOR_ADDRESS, Some(params))
598 .await
599 .with_context(|| format!("Failed to get signatures for address {}", address))?;
600
601 let result = response
602 .get("result")
603 .and_then(|r| r.as_array())
604 .ok_or_else(|| anyhow::anyhow!("Invalid response structure"))?;
605
606 let signatures: Vec<SignatureInfo> = result
607 .iter()
608 .filter_map(|item| {
609 let signature = item.get("signature")?.as_str()?.to_string();
610 let slot = item.get("slot")?.as_u64()?;
611 let err =
612 item.get("err")
613 .and_then(|e| if e.is_null() { None } else { Some(e.clone()) });
614 let block_time = item.get("blockTime").and_then(|t| t.as_i64());
615
616 Some(SignatureInfo {
617 signature,
618 slot,
619 err,
620 block_time,
621 })
622 })
623 .collect();
624
625 Ok(signatures)
626 }
627
628 #[instrument(skip(self), fields(address, start_slot, end_slot))]
629 async fn get_all_signatures_for_address(
630 &self,
631 address: String,
632 start_slot: u64,
633 end_slot: u64,
634 ) -> Result<Vec<SignatureInfo>, anyhow::Error> {
635 const PAGE_LIMIT: usize = 1000;
636 const MAX_SIGNATURES: usize = 100_000; let mut all_signatures = Vec::new();
639 let mut until_signature: Option<String> = None;
640 let mut iteration = 0;
641
642 loop {
643 let batch = self
644 .get_signatures_for_address_with_info(
645 address.clone(),
646 Some(PAGE_LIMIT),
647 Some(start_slot),
648 until_signature.clone(),
649 )
650 .await?;
651
652 if batch.is_empty() {
653 break;
654 }
655
656 let filtered: Vec<SignatureInfo> = batch
658 .into_iter()
659 .filter(|sig| sig.slot >= start_slot && sig.slot <= end_slot)
660 .collect();
661
662 let batch_len = filtered.len();
663 until_signature = filtered.last().map(|s| s.signature.clone());
664 all_signatures.extend(filtered);
665
666 if batch_len < PAGE_LIMIT {
668 break; }
670
671 if all_signatures.len() >= MAX_SIGNATURES {
672 tracing::warn!(
673 address = %address,
674 count = all_signatures.len(),
675 "Reached maximum signature limit, stopping pagination"
676 );
677 break;
678 }
679
680 iteration += 1;
681 }
682
683 tracing::debug!(
684 address = %address,
685 signatures = all_signatures.len(),
686 iterations = iteration + 1,
687 "Completed signature pagination"
688 );
689
690 Ok(all_signatures)
691 }
692
693 #[instrument(skip(self), fields(addresses_count = addresses.len(), start_slot, end_slot))]
694 async fn get_transactions_for_addresses(
695 &self,
696 addresses: &[String],
697 start_slot: u64,
698 end_slot: Option<u64>,
699 ) -> Result<Vec<SolanaTransaction>, anyhow::Error> {
700 use futures::stream::{self, StreamExt};
701 use std::collections::HashSet;
702
703 let end_slot = end_slot.unwrap_or(start_slot);
704
705 if addresses.is_empty() {
706 return Ok(Vec::new());
707 }
708
709 tracing::debug!(
710 addresses = ?addresses,
711 start_slot = start_slot,
712 end_slot = end_slot,
713 "Fetching transactions for addresses using signatures approach"
714 );
715
716 let mut all_signatures: HashSet<String> = HashSet::new();
718
719 for address in addresses {
720 let signatures = self
722 .get_all_signatures_for_address(address.clone(), start_slot, end_slot)
723 .await?;
724
725 tracing::debug!(
726 address = %address,
727 signatures_count = signatures.len(),
728 "Got signatures for address"
729 );
730
731 for sig_info in signatures {
732 all_signatures.insert(sig_info.signature);
733 }
734 }
735
736 tracing::debug!(
737 unique_signatures = all_signatures.len(),
738 "Fetching transactions for unique signatures in slot range"
739 );
740
741 let transactions: Vec<SolanaTransaction> = stream::iter(all_signatures)
743 .map(|signature| async move {
744 let sig = signature.clone();
745 match self.get_transaction(signature).await {
746 Ok(Some(tx)) => Some(tx),
747 Ok(None) => {
748 tracing::debug!(signature = %sig, "Transaction not found");
749 None
750 }
751 Err(e) => {
752 tracing::warn!(signature = %sig, error = %e, "Failed to fetch transaction");
753 None
754 }
755 }
756 })
757 .buffer_unordered(20) .filter_map(|result| async move { result })
759 .collect()
760 .await;
761
762 tracing::debug!(
763 fetched_transactions = transactions.len(),
764 "Successfully fetched transactions"
765 );
766
767 Ok(transactions)
768 }
769
770 #[instrument(skip(self), fields(pubkey))]
771 async fn get_account_info(&self, pubkey: String) -> Result<serde_json::Value, anyhow::Error> {
772 let config = json!({
773 "encoding": "jsonParsed",
774 "commitment": "finalized"
775 });
776 let params = json!([&pubkey, config]);
777
778 let response = self
779 .http_client
780 .send_raw_request(rpc_methods::GET_ACCOUNT_INFO, Some(params))
781 .await
782 .with_context(|| format!("Failed to get account info for {}", pubkey))?;
783
784 let result = response
785 .get("result")
786 .cloned()
787 .ok_or_else(|| anyhow::anyhow!("Invalid response structure"))?;
788
789 Ok(result)
790 }
791
792 #[instrument(skip(self), fields(program_id))]
793 async fn get_program_accounts(
794 &self,
795 program_id: String,
796 ) -> Result<Vec<serde_json::Value>, anyhow::Error> {
797 let config = json!({
798 "encoding": "jsonParsed",
799 "commitment": "finalized"
800 });
801 let params = json!([&program_id, config]);
802
803 let response = self
804 .http_client
805 .send_raw_request(rpc_methods::GET_PROGRAM_ACCOUNTS, Some(params))
806 .await
807 .with_context(|| format!("Failed to get program accounts for {}", program_id))?;
808
809 let result = response
810 .get("result")
811 .and_then(|r| r.as_array())
812 .cloned()
813 .ok_or_else(|| anyhow::anyhow!("Invalid response structure"))?;
814
815 Ok(result)
816 }
817
818 #[instrument(skip(self), fields(addresses_count = addresses.len(), start_slot, end_slot))]
819 async fn get_blocks_for_addresses(
820 &self,
821 addresses: &[String],
822 start_slot: u64,
823 end_slot: Option<u64>,
824 ) -> Result<Vec<BlockType>, anyhow::Error> {
825 use std::collections::BTreeMap;
826
827 let transactions = self
829 .get_transactions_for_addresses(addresses, start_slot, end_slot)
830 .await?;
831
832 if transactions.is_empty() {
833 return Ok(Vec::new());
834 }
835
836 let mut slot_transactions: BTreeMap<u64, Vec<SolanaTransaction>> = BTreeMap::new();
838 for tx in transactions {
839 let slot = tx.slot();
840 slot_transactions.entry(slot).or_default().push(tx);
841 }
842
843 let blocks: Vec<BlockType> = slot_transactions
845 .into_iter()
846 .map(|(slot, txs)| {
847 let confirmed_block = SolanaConfirmedBlock {
848 slot,
849 blockhash: String::new(), previous_blockhash: String::new(),
851 parent_slot: slot.saturating_sub(1),
852 block_time: txs.first().and_then(|tx| tx.0.block_time),
853 block_height: None,
854 transactions: txs,
855 };
856 BlockType::Solana(Box::new(SolanaBlock::from(confirmed_block)))
857 })
858 .collect();
859
860 tracing::debug!(
861 blocks_count = blocks.len(),
862 "Created virtual blocks from address-filtered transactions"
863 );
864
865 Ok(blocks)
866 }
867}
868
869impl<T: Send + Sync + Clone + BlockchainTransport> BlockFilterFactory<Self> for SolanaClient<T> {
870 type Filter = SolanaBlockFilter<Self>;
871
872 fn filter() -> Self::Filter {
873 SolanaBlockFilter {
874 _client: PhantomData {},
875 }
876 }
877}
878
879#[async_trait]
880impl<T: Send + Sync + Clone + BlockchainTransport> BlockChainClient for SolanaClient<T> {
881 #[instrument(skip(self))]
882 async fn get_latest_block_number(&self) -> Result<u64, anyhow::Error> {
883 let config = json!({ "commitment": "finalized" });
884 let params = json!([config]);
885
886 let response = self
887 .http_client
888 .send_raw_request(rpc_methods::GET_SLOT, Some(params))
889 .await
890 .with_context(|| "Failed to get latest slot")?;
891
892 let slot = response["result"]
893 .as_u64()
894 .ok_or_else(|| anyhow::anyhow!("Invalid slot number in response"))?;
895
896 Ok(slot)
897 }
898
899 #[instrument(skip(self), fields(start_block, end_block))]
900 async fn get_blocks(
901 &self,
902 start_block: u64,
903 end_block: Option<u64>,
904 ) -> Result<Vec<BlockType>, anyhow::Error> {
905 if !self.monitored_addresses.is_empty() {
907 tracing::debug!(
908 addresses = ?self.monitored_addresses,
909 start_block = start_block,
910 end_block = ?end_block,
911 "Using optimized getSignaturesForAddress approach"
912 );
913 return SolanaClientTrait::get_blocks_for_addresses(
914 self,
915 &self.monitored_addresses,
916 start_block,
917 end_block,
918 )
919 .await;
920 }
921
922 if let Some(end_block) = end_block {
925 if start_block > end_block {
926 let message = format!(
927 "start_block {} cannot be greater than end_block {}",
928 start_block, end_block
929 );
930 let input_error = SolanaClientError::invalid_input(message, None, None);
931 return Err(anyhow::anyhow!(input_error))
932 .context("Invalid input parameters for Solana RPC");
933 }
934 }
935
936 let target_block = end_block.unwrap_or(start_block);
937
938 let slots = if start_block == target_block {
940 vec![start_block]
941 } else {
942 let params = json!([start_block, target_block, { "commitment": "finalized" }]);
943 let response = self
944 .http_client
945 .send_raw_request(rpc_methods::GET_BLOCKS, Some(params))
946 .await
947 .with_context(|| {
948 format!(
949 "Failed to get blocks list from {} to {}",
950 start_block, target_block
951 )
952 })?;
953
954 let slots: Vec<u64> = response["result"]
955 .as_array()
956 .ok_or_else(|| anyhow::anyhow!("Invalid blocks list response"))?
957 .iter()
958 .filter_map(|v| v.as_u64())
959 .collect();
960
961 if slots.is_empty() {
962 return Ok(Vec::new());
963 }
964
965 slots
966 };
967
968 let mut blocks = Vec::with_capacity(slots.len());
970 let config = SolanaGetBlockConfig::full();
971
972 for slot in slots {
973 let params = json!([slot, config]);
974
975 let response = self
976 .http_client
977 .send_raw_request(rpc_methods::GET_BLOCK, Some(params))
978 .await;
979
980 match response {
981 Ok(response_body) => {
982 if let Err(rpc_error) = self.check_and_handle_rpc_error(
983 &response_body,
984 slot,
985 rpc_methods::GET_BLOCK,
986 ) {
987 if rpc_error.is_slot_not_available() || rpc_error.is_block_not_available() {
988 tracing::debug!("Skipping unavailable slot {}: {}", slot, rpc_error);
989 continue;
990 }
991 return Err(anyhow::anyhow!(rpc_error)
992 .context(format!("Solana RPC error while fetching slot {}", slot)));
993 }
994
995 match self.parse_block_response(slot, &response_body) {
996 Ok(block) => {
997 blocks.push(BlockType::Solana(Box::new(block)));
998 }
999 Err(parse_error) => {
1000 if parse_error.is_block_not_available() {
1001 tracing::debug!(
1002 "Skipping slot {} due to parse error: {}",
1003 slot,
1004 parse_error
1005 );
1006 continue;
1007 }
1008 return Err(anyhow::anyhow!(parse_error)
1009 .context(format!("Failed to parse block for slot {}", slot)));
1010 }
1011 }
1012 }
1013 Err(transport_err) => {
1014 return Err(anyhow::anyhow!(transport_err)).context(format!(
1015 "Failed to fetch block from Solana RPC for slot: {}",
1016 slot
1017 ));
1018 }
1019 }
1020 }
1021
1022 Ok(blocks)
1023 }
1024
1025 #[instrument(skip(self), fields(contract_id))]
1026 async fn get_contract_spec(&self, contract_id: &str) -> Result<ContractSpec, anyhow::Error> {
1027 tracing::warn!(
1028 "Automatic IDL fetching not yet implemented for program {}. \
1029 Please provide the IDL manually in the monitor configuration.",
1030 contract_id
1031 );
1032
1033 Ok(ContractSpec::Solana(SolanaContractSpec::default()))
1034 }
1035}
1036
1037#[cfg(test)]
1038mod tests {
1039 use super::*;
1040
1041 #[test]
1042 fn test_solana_client_implements_traits() {
1043 fn assert_send_sync<T: Send + Sync>() {}
1044 fn assert_clone<T: Clone>() {}
1045
1046 assert_send_sync::<SolanaClient<SolanaTransportClient>>();
1047 assert_clone::<SolanaClient<SolanaTransportClient>>();
1048 }
1049}