1use anyhow::Context;
8use futures::future::BoxFuture;
9use std::sync::Arc;
10
11use crate::{
12 models::{BlockRecoveryConfig, BlockType, Network, ProcessedBlock},
13 services::{
14 blockchain::BlockChainClient,
15 blockwatcher::{
16 error::BlockWatcherError,
17 storage::{BlockStorage, MissedBlockStatus},
18 tracker::BlockTrackerTrait,
19 },
20 },
21};
22
23#[derive(Debug, Clone, Default)]
25pub struct RecoveryResult {
26 pub attempted: usize,
28 pub recovered: usize,
30 pub failed: usize,
32 pub pruned: usize,
34}
35
36#[allow(clippy::too_many_arguments)]
54pub async fn process_missed_blocks<S, C, H, T, TR>(
55 network: &Network,
56 recovery_config: &BlockRecoveryConfig,
57 rpc_client: &C,
58 block_storage: Arc<S>,
59 block_handler: Arc<H>,
60 trigger_handler: Arc<T>,
61 _block_tracker: Arc<TR>,
62) -> Result<RecoveryResult, BlockWatcherError>
63where
64 S: BlockStorage + Send + Sync,
65 C: BlockChainClient + Send + Sync,
66 H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync,
67 T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync,
68 TR: BlockTrackerTrait + Send + Sync,
69{
70 let mut result = RecoveryResult::default();
71
72 let current_block = rpc_client
74 .get_latest_block_number()
75 .await
76 .with_context(|| "Failed to get latest block number for recovery")?;
77
78 let current_confirmed = current_block.saturating_sub(network.confirmation_blocks);
79
80 let pruned = block_storage
82 .prune_old_missed_blocks(
83 &network.slug,
84 recovery_config.max_block_age,
85 current_confirmed,
86 )
87 .await
88 .with_context(|| "Failed to prune old missed blocks")?;
89
90 result.pruned = pruned;
91
92 if pruned > 0 {
93 tracing::info!(
94 network = %network.slug,
95 pruned = pruned,
96 "Pruned {} old missed blocks",
97 pruned
98 );
99 }
100
101 let mut missed_blocks = block_storage
103 .get_missed_blocks(
104 &network.slug,
105 recovery_config.max_block_age,
106 current_confirmed,
107 recovery_config.max_retries,
108 )
109 .await
110 .with_context(|| "Failed to get missed blocks for recovery")?;
111
112 missed_blocks.sort_by_key(|b| b.block_number);
114
115 missed_blocks.truncate(recovery_config.max_blocks_per_run as usize);
117
118 if missed_blocks.is_empty() {
119 tracing::debug!(
120 network = %network.slug,
121 "No missed blocks eligible for recovery"
122 );
123 return Ok(result);
124 }
125
126 tracing::info!(
127 network = %network.slug,
128 count = missed_blocks.len(),
129 "Attempting recovery of {} missed blocks",
130 missed_blocks.len()
131 );
132
133 let mut recovered_blocks = Vec::new();
134
135 for entry in missed_blocks {
136 result.attempted += 1;
137 let block_number = entry.block_number;
138
139 if let Err(e) = block_storage
141 .update_missed_block_status(
142 &network.slug,
143 block_number,
144 MissedBlockStatus::Recovering,
145 None,
146 )
147 .await
148 {
149 tracing::warn!(
150 network = %network.slug,
151 block = block_number,
152 error = %e,
153 "Failed to update block status to Recovering"
154 );
155 }
156
157 match rpc_client
159 .get_blocks(block_number, Some(block_number))
160 .await
161 {
162 Ok(blocks) if !blocks.is_empty() => {
163 let block = blocks.into_iter().next().unwrap();
164
165 let processed_block = (block_handler)(block, network.clone()).await;
167
168 let _handle = (trigger_handler)(&processed_block);
170
171 if let Err(e) = block_storage
173 .update_missed_block_status(
174 &network.slug,
175 block_number,
176 MissedBlockStatus::Recovered,
177 None,
178 )
179 .await
180 {
181 tracing::warn!(
182 network = %network.slug,
183 block = block_number,
184 error = %e,
185 "Failed to update block status to Recovered"
186 );
187 }
188
189 recovered_blocks.push(block_number);
190 result.recovered += 1;
191
192 tracing::info!(
193 network = %network.slug,
194 block = block_number,
195 "Successfully recovered missed block"
196 );
197 }
198 Ok(_) => {
199 let new_retry_count = entry.retry_count + 1;
201 let error_msg = "Block not found in RPC response".to_string();
202
203 let new_status = if new_retry_count >= recovery_config.max_retries {
204 MissedBlockStatus::Failed
205 } else {
206 MissedBlockStatus::Pending
207 };
208
209 if let Err(e) = block_storage
210 .update_missed_block_status(
211 &network.slug,
212 block_number,
213 new_status.clone(),
214 Some(error_msg.clone()),
215 )
216 .await
217 {
218 tracing::warn!(
219 network = %network.slug,
220 block = block_number,
221 error = %e,
222 "Failed to update block status after empty response"
223 );
224 }
225
226 if new_status == MissedBlockStatus::Failed {
227 result.failed += 1;
228 tracing::error!(
229 network = %network.slug,
230 block = block_number,
231 retries = new_retry_count,
232 "Block recovery failed after max retries: {}",
233 error_msg
234 );
235 } else {
236 tracing::warn!(
237 network = %network.slug,
238 block = block_number,
239 retry = new_retry_count,
240 "Block recovery attempt failed, will retry: {}",
241 error_msg
242 );
243 }
244
245 tokio::time::sleep(tokio::time::Duration::from_millis(
247 recovery_config.retry_delay_ms,
248 ))
249 .await;
250 }
251 Err(e) => {
252 let new_retry_count = entry.retry_count + 1;
253 let error_msg = e.to_string();
254
255 let new_status = if new_retry_count >= recovery_config.max_retries {
256 MissedBlockStatus::Failed
257 } else {
258 MissedBlockStatus::Pending
259 };
260
261 if let Err(update_err) = block_storage
262 .update_missed_block_status(
263 &network.slug,
264 block_number,
265 new_status.clone(),
266 Some(error_msg.clone()),
267 )
268 .await
269 {
270 tracing::warn!(
271 network = %network.slug,
272 block = block_number,
273 error = %update_err,
274 "Failed to update block status after RPC error"
275 );
276 }
277
278 if new_status == MissedBlockStatus::Failed {
279 result.failed += 1;
280 tracing::error!(
281 network = %network.slug,
282 block = block_number,
283 retries = new_retry_count,
284 "Block recovery failed after max retries: {}",
285 error_msg
286 );
287 } else {
288 tracing::warn!(
289 network = %network.slug,
290 block = block_number,
291 retry = new_retry_count,
292 "Block recovery attempt failed, will retry: {}",
293 error_msg
294 );
295 }
296
297 tokio::time::sleep(tokio::time::Duration::from_millis(
299 recovery_config.retry_delay_ms,
300 ))
301 .await;
302 }
303 }
304 }
305
306 if !recovered_blocks.is_empty() {
308 if let Err(e) = block_storage
309 .remove_recovered_blocks(&network.slug, &recovered_blocks)
310 .await
311 {
312 tracing::warn!(
313 network = %network.slug,
314 error = %e,
315 "Failed to remove recovered blocks from storage"
316 );
317 }
318 }
319
320 tracing::info!(
321 network = %network.slug,
322 attempted = result.attempted,
323 recovered = result.recovered,
324 failed = result.failed,
325 pruned = result.pruned,
326 "Recovery job completed"
327 );
328
329 Ok(result)
330}
331
332#[cfg(test)]
333mod tests {
334 use super::*;
335 use crate::models::{BlockChainType, RpcUrl, SecretString, SecretValue};
336 use crate::services::blockwatcher::storage::{
337 BlockStorage, FileBlockStorage, MissedBlockEntry,
338 };
339 use crate::services::blockwatcher::tracker::BlockTracker;
340 use std::sync::atomic::{AtomicUsize, Ordering};
341 use tempfile::tempdir;
342
343 fn create_test_network() -> Network {
344 Network {
345 network_type: BlockChainType::EVM,
346 slug: "test_network".to_string(),
347 name: "Test Network".to_string(),
348 rpc_urls: vec![RpcUrl {
349 type_: "rpc".to_string(),
350 url: SecretValue::Plain(SecretString::new("http://localhost:8545".to_string())),
351 weight: 100,
352 }],
353 chain_id: Some(1),
354 network_passphrase: None,
355 block_time_ms: 12000,
356 confirmation_blocks: 12,
357 cron_schedule: "*/10 * * * * *".to_string(),
358 max_past_blocks: Some(100),
359 store_blocks: Some(true),
360 recovery_config: Some(BlockRecoveryConfig {
361 enabled: true,
362 cron_schedule: "0 */5 * * * *".to_string(),
363 max_blocks_per_run: 10,
364 max_block_age: 1000,
365 max_retries: 3,
366 retry_delay_ms: 100,
367 }),
368 }
369 }
370
371 fn create_recovery_config() -> BlockRecoveryConfig {
372 BlockRecoveryConfig {
373 enabled: true,
374 cron_schedule: "0 */5 * * * *".to_string(),
375 max_blocks_per_run: 10,
376 max_block_age: 1000,
377 max_retries: 3,
378 retry_delay_ms: 100,
379 }
380 }
381
382 #[derive(Clone)]
384 struct MockRpcClientWithEmptyResponse {
385 latest_block: u64,
386 fail_blocks: Vec<u64>,
387 empty_response_blocks: Vec<u64>,
388 call_count: Arc<AtomicUsize>,
389 }
390
391 impl MockRpcClientWithEmptyResponse {
392 fn new(latest_block: u64, fail_blocks: Vec<u64>, empty_response_blocks: Vec<u64>) -> Self {
393 Self {
394 latest_block,
395 fail_blocks,
396 empty_response_blocks,
397 call_count: Arc::new(AtomicUsize::new(0)),
398 }
399 }
400 }
401
402 #[async_trait::async_trait]
403 impl BlockChainClient for MockRpcClientWithEmptyResponse {
404 async fn get_latest_block_number(&self) -> Result<u64, anyhow::Error> {
405 Ok(self.latest_block)
406 }
407
408 async fn get_blocks(
409 &self,
410 start: u64,
411 _end: Option<u64>,
412 ) -> Result<Vec<BlockType>, anyhow::Error> {
413 self.call_count.fetch_add(1, Ordering::SeqCst);
414
415 if self.fail_blocks.contains(&start) {
416 return Err(anyhow::anyhow!("Simulated RPC failure for block {}", start));
417 }
418
419 if self.empty_response_blocks.contains(&start) {
420 return Ok(vec![]);
422 }
423
424 Ok(vec![BlockType::EVM(Box::default())])
426 }
427 }
428
429 #[derive(Clone)]
431 struct MockRpcClient {
432 latest_block: u64,
433 fail_blocks: Vec<u64>,
434 call_count: Arc<AtomicUsize>,
435 }
436
437 impl MockRpcClient {
438 fn new(latest_block: u64, fail_blocks: Vec<u64>) -> Self {
439 Self {
440 latest_block,
441 fail_blocks,
442 call_count: Arc::new(AtomicUsize::new(0)),
443 }
444 }
445 }
446
447 #[async_trait::async_trait]
448 impl BlockChainClient for MockRpcClient {
449 async fn get_latest_block_number(&self) -> Result<u64, anyhow::Error> {
450 Ok(self.latest_block)
451 }
452
453 async fn get_blocks(
454 &self,
455 start: u64,
456 _end: Option<u64>,
457 ) -> Result<Vec<BlockType>, anyhow::Error> {
458 self.call_count.fetch_add(1, Ordering::SeqCst);
459
460 if self.fail_blocks.contains(&start) {
461 return Err(anyhow::anyhow!("Simulated RPC failure for block {}", start));
462 }
463
464 Ok(vec![BlockType::EVM(Box::default())])
466 }
467 }
468
469 #[derive(Clone)]
471 struct MockFailingStorage {
472 inner: FileBlockStorage,
473 fail_update_status: bool,
474 fail_remove_recovered: bool,
475 }
476
477 impl MockFailingStorage {
478 fn new(storage_path: std::path::PathBuf) -> Self {
479 Self {
480 inner: FileBlockStorage::new(storage_path),
481 fail_update_status: false,
482 fail_remove_recovered: false,
483 }
484 }
485
486 fn with_failing_update_status(mut self) -> Self {
487 self.fail_update_status = true;
488 self
489 }
490
491 fn with_failing_remove_recovered(mut self) -> Self {
492 self.fail_remove_recovered = true;
493 self
494 }
495 }
496
497 #[async_trait::async_trait]
498 impl BlockStorage for MockFailingStorage {
499 async fn get_last_processed_block(
500 &self,
501 network_id: &str,
502 ) -> Result<Option<u64>, anyhow::Error> {
503 self.inner.get_last_processed_block(network_id).await
504 }
505
506 async fn save_last_processed_block(
507 &self,
508 network_id: &str,
509 block: u64,
510 ) -> Result<(), anyhow::Error> {
511 self.inner
512 .save_last_processed_block(network_id, block)
513 .await
514 }
515
516 async fn save_blocks(
517 &self,
518 network_id: &str,
519 blocks: &[BlockType],
520 ) -> Result<(), anyhow::Error> {
521 self.inner.save_blocks(network_id, blocks).await
522 }
523
524 async fn delete_blocks(&self, network_id: &str) -> Result<(), anyhow::Error> {
525 self.inner.delete_blocks(network_id).await
526 }
527
528 async fn save_missed_blocks(
529 &self,
530 network_id: &str,
531 blocks: &[u64],
532 ) -> Result<(), anyhow::Error> {
533 self.inner.save_missed_blocks(network_id, blocks).await
534 }
535
536 async fn get_missed_blocks(
537 &self,
538 network_id: &str,
539 max_block_age: u64,
540 current_block: u64,
541 max_retries: u32,
542 ) -> Result<Vec<MissedBlockEntry>, anyhow::Error> {
543 self.inner
544 .get_missed_blocks(network_id, max_block_age, current_block, max_retries)
545 .await
546 }
547
548 async fn update_missed_block_status(
549 &self,
550 network_id: &str,
551 block_number: u64,
552 status: MissedBlockStatus,
553 error: Option<String>,
554 ) -> Result<(), anyhow::Error> {
555 if self.fail_update_status {
556 return Err(anyhow::anyhow!("Simulated update status failure"));
557 }
558 self.inner
559 .update_missed_block_status(network_id, block_number, status, error)
560 .await
561 }
562
563 async fn remove_recovered_blocks(
564 &self,
565 network_id: &str,
566 block_numbers: &[u64],
567 ) -> Result<(), anyhow::Error> {
568 if self.fail_remove_recovered {
569 return Err(anyhow::anyhow!("Simulated remove recovered failure"));
570 }
571 self.inner
572 .remove_recovered_blocks(network_id, block_numbers)
573 .await
574 }
575
576 async fn prune_old_missed_blocks(
577 &self,
578 network_id: &str,
579 max_block_age: u64,
580 current_block: u64,
581 ) -> Result<usize, anyhow::Error> {
582 self.inner
583 .prune_old_missed_blocks(network_id, max_block_age, current_block)
584 .await
585 }
586 }
587
588 fn create_block_handler() -> Arc<
589 impl Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
590 > {
591 Arc::new(|block: BlockType, network: Network| {
592 Box::pin(async move {
593 ProcessedBlock {
594 network_slug: network.slug,
595 block_number: block.number().unwrap_or(0),
596 processing_results: vec![],
597 }
598 }) as BoxFuture<'static, ProcessedBlock>
599 })
600 }
601
602 fn create_trigger_handler(
603 ) -> Arc<impl Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static> {
604 Arc::new(|_block: &ProcessedBlock| tokio::spawn(async move {}))
605 }
606
607 #[tokio::test]
608 async fn test_recovery_with_no_missed_blocks() {
609 let temp_dir = tempdir().unwrap();
610 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
611
612 let network = create_test_network();
613 let recovery_config = create_recovery_config();
614 let rpc_client = MockRpcClient::new(1000, vec![]);
615 let block_tracker = Arc::new(BlockTracker::new(100));
616
617 let block_handler = create_block_handler();
618 let trigger_handler = create_trigger_handler();
619
620 let result = process_missed_blocks(
621 &network,
622 &recovery_config,
623 &rpc_client,
624 storage,
625 block_handler,
626 trigger_handler,
627 block_tracker,
628 )
629 .await
630 .unwrap();
631
632 assert_eq!(result.attempted, 0);
633 assert_eq!(result.recovered, 0);
634 assert_eq!(result.failed, 0);
635 }
636
637 #[tokio::test]
638 async fn test_recovery_processes_missed_blocks() {
639 let temp_dir = tempdir().unwrap();
640 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
641
642 storage
644 .save_missed_blocks("test_network", &[100, 101, 102])
645 .await
646 .unwrap();
647
648 let network = create_test_network();
649 let recovery_config = create_recovery_config();
650 let rpc_client = MockRpcClient::new(1000, vec![]); let block_tracker = Arc::new(BlockTracker::new(100));
652
653 let block_handler = create_block_handler();
654 let trigger_handler = create_trigger_handler();
655
656 let result = process_missed_blocks(
657 &network,
658 &recovery_config,
659 &rpc_client,
660 storage.clone(),
661 block_handler,
662 trigger_handler,
663 block_tracker,
664 )
665 .await
666 .unwrap();
667
668 assert_eq!(result.attempted, 3);
669 assert_eq!(result.recovered, 3);
670 assert_eq!(result.failed, 0);
671
672 let remaining = storage
674 .get_missed_blocks("test_network", 1000, 1000, 3)
675 .await
676 .unwrap();
677 assert!(remaining.is_empty());
678 }
679
680 #[tokio::test]
681 async fn test_recovery_handles_rpc_failures() {
682 let temp_dir = tempdir().unwrap();
683 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
684
685 storage
687 .save_missed_blocks("test_network", &[100])
688 .await
689 .unwrap();
690
691 let network = create_test_network();
692 let mut recovery_config = create_recovery_config();
693 recovery_config.max_retries = 1; let rpc_client = MockRpcClient::new(1000, vec![100]); let block_tracker = Arc::new(BlockTracker::new(100));
697
698 let block_handler = create_block_handler();
699 let trigger_handler = create_trigger_handler();
700
701 let result = process_missed_blocks(
702 &network,
703 &recovery_config,
704 &rpc_client,
705 storage,
706 block_handler,
707 trigger_handler,
708 block_tracker,
709 )
710 .await
711 .unwrap();
712
713 assert_eq!(result.attempted, 1);
714 assert_eq!(result.recovered, 0);
715 assert_eq!(result.failed, 1);
716 }
717
718 #[tokio::test]
719 async fn test_recovery_respects_max_blocks_per_run() {
720 let temp_dir = tempdir().unwrap();
721 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
722
723 storage
725 .save_missed_blocks("test_network", &[100, 101, 102, 103, 104])
726 .await
727 .unwrap();
728
729 let network = create_test_network();
730 let mut recovery_config = create_recovery_config();
731 recovery_config.max_blocks_per_run = 2; let rpc_client = MockRpcClient::new(1000, vec![]);
734 let block_tracker = Arc::new(BlockTracker::new(100));
735
736 let block_handler = create_block_handler();
737 let trigger_handler = create_trigger_handler();
738
739 let result = process_missed_blocks(
740 &network,
741 &recovery_config,
742 &rpc_client,
743 storage.clone(),
744 block_handler,
745 trigger_handler,
746 block_tracker,
747 )
748 .await
749 .unwrap();
750
751 assert_eq!(result.attempted, 2);
752 assert_eq!(result.recovered, 2);
753
754 let remaining = storage
756 .get_missed_blocks("test_network", 1000, 1000, 3)
757 .await
758 .unwrap();
759 assert_eq!(remaining.len(), 3);
760 }
761
762 #[tokio::test]
763 async fn test_recovery_prunes_old_blocks() {
764 let temp_dir = tempdir().unwrap();
765 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
766
767 storage
769 .save_missed_blocks("test_network", &[10]) .await
771 .unwrap();
772
773 let network = create_test_network();
774 let mut recovery_config = create_recovery_config();
775 recovery_config.max_block_age = 100; let rpc_client = MockRpcClient::new(1000, vec![]);
778 let block_tracker = Arc::new(BlockTracker::new(100));
779
780 let block_handler = create_block_handler();
781 let trigger_handler = create_trigger_handler();
782
783 let result = process_missed_blocks(
784 &network,
785 &recovery_config,
786 &rpc_client,
787 storage,
788 block_handler,
789 trigger_handler,
790 block_tracker,
791 )
792 .await
793 .unwrap();
794
795 assert_eq!(result.pruned, 1);
797 assert_eq!(result.attempted, 0); }
799
800 #[tokio::test]
801 async fn test_recovery_handles_empty_rpc_response_with_retry() {
802 let temp_dir = tempdir().unwrap();
803 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
804
805 storage
807 .save_missed_blocks("test_network", &[100])
808 .await
809 .unwrap();
810
811 let network = create_test_network();
812 let mut recovery_config = create_recovery_config();
813 recovery_config.max_retries = 3; recovery_config.retry_delay_ms = 10; let rpc_client = MockRpcClientWithEmptyResponse::new(1000, vec![], vec![100]);
818 let block_tracker = Arc::new(BlockTracker::new(100));
819
820 let block_handler = create_block_handler();
821 let trigger_handler = create_trigger_handler();
822
823 let result = process_missed_blocks(
824 &network,
825 &recovery_config,
826 &rpc_client,
827 storage.clone(),
828 block_handler,
829 trigger_handler,
830 block_tracker,
831 )
832 .await
833 .unwrap();
834
835 assert_eq!(result.attempted, 1);
837 assert_eq!(result.recovered, 0);
838 assert_eq!(result.failed, 0); let remaining = storage
842 .get_missed_blocks("test_network", 1000, 1000, 3)
843 .await
844 .unwrap();
845 assert_eq!(remaining.len(), 1);
846 assert_eq!(remaining[0].retry_count, 1);
847 }
848
849 #[tokio::test]
850 async fn test_recovery_handles_empty_rpc_response_max_retries() {
851 let temp_dir = tempdir().unwrap();
852 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
853
854 storage
856 .save_missed_blocks("test_network", &[100])
857 .await
858 .unwrap();
859
860 let network = create_test_network();
861 let mut recovery_config = create_recovery_config();
862 recovery_config.max_retries = 1; recovery_config.retry_delay_ms = 10;
864
865 let rpc_client = MockRpcClientWithEmptyResponse::new(1000, vec![], vec![100]);
867 let block_tracker = Arc::new(BlockTracker::new(100));
868
869 let block_handler = create_block_handler();
870 let trigger_handler = create_trigger_handler();
871
872 let result = process_missed_blocks(
873 &network,
874 &recovery_config,
875 &rpc_client,
876 storage,
877 block_handler,
878 trigger_handler,
879 block_tracker,
880 )
881 .await
882 .unwrap();
883
884 assert_eq!(result.attempted, 1);
886 assert_eq!(result.recovered, 0);
887 assert_eq!(result.failed, 1);
888 }
889
890 #[tokio::test]
891 async fn test_recovery_handles_rpc_failure_with_retry() {
892 let temp_dir = tempdir().unwrap();
893 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
894
895 storage
897 .save_missed_blocks("test_network", &[100])
898 .await
899 .unwrap();
900
901 let network = create_test_network();
902 let mut recovery_config = create_recovery_config();
903 recovery_config.max_retries = 3; recovery_config.retry_delay_ms = 10;
905
906 let rpc_client = MockRpcClient::new(1000, vec![100]); let block_tracker = Arc::new(BlockTracker::new(100));
908
909 let block_handler = create_block_handler();
910 let trigger_handler = create_trigger_handler();
911
912 let result = process_missed_blocks(
913 &network,
914 &recovery_config,
915 &rpc_client,
916 storage.clone(),
917 block_handler,
918 trigger_handler,
919 block_tracker,
920 )
921 .await
922 .unwrap();
923
924 assert_eq!(result.attempted, 1);
926 assert_eq!(result.recovered, 0);
927 assert_eq!(result.failed, 0);
928
929 let remaining = storage
931 .get_missed_blocks("test_network", 1000, 1000, 3)
932 .await
933 .unwrap();
934 assert_eq!(remaining.len(), 1);
935 assert_eq!(remaining[0].retry_count, 1);
936 }
937
938 #[tokio::test]
939 async fn test_recovery_continues_on_update_status_error_recovering() {
940 let temp_dir = tempdir().unwrap();
941 let storage = Arc::new(
942 MockFailingStorage::new(temp_dir.path().to_path_buf()).with_failing_update_status(),
943 );
944
945 storage
947 .inner
948 .save_missed_blocks("test_network", &[100])
949 .await
950 .unwrap();
951
952 let network = create_test_network();
953 let recovery_config = create_recovery_config();
954 let rpc_client = MockRpcClient::new(1000, vec![]);
955 let block_tracker = Arc::new(BlockTracker::new(100));
956
957 let block_handler = create_block_handler();
958 let trigger_handler = create_trigger_handler();
959
960 let result = process_missed_blocks(
962 &network,
963 &recovery_config,
964 &rpc_client,
965 storage,
966 block_handler,
967 trigger_handler,
968 block_tracker,
969 )
970 .await
971 .unwrap();
972
973 assert_eq!(result.attempted, 1);
975 assert_eq!(result.recovered, 1);
977 }
978
979 #[tokio::test]
980 async fn test_recovery_continues_on_remove_recovered_error() {
981 let temp_dir = tempdir().unwrap();
982 let storage = Arc::new(
983 MockFailingStorage::new(temp_dir.path().to_path_buf()).with_failing_remove_recovered(),
984 );
985
986 storage
988 .inner
989 .save_missed_blocks("test_network", &[100])
990 .await
991 .unwrap();
992
993 let network = create_test_network();
994 let recovery_config = create_recovery_config();
995 let rpc_client = MockRpcClient::new(1000, vec![]);
996 let block_tracker = Arc::new(BlockTracker::new(100));
997
998 let block_handler = create_block_handler();
999 let trigger_handler = create_trigger_handler();
1000
1001 let result = process_missed_blocks(
1003 &network,
1004 &recovery_config,
1005 &rpc_client,
1006 storage,
1007 block_handler,
1008 trigger_handler,
1009 block_tracker,
1010 )
1011 .await
1012 .unwrap();
1013
1014 assert_eq!(result.attempted, 1);
1015 assert_eq!(result.recovered, 1);
1016 assert_eq!(result.failed, 0);
1017 }
1018
1019 #[tokio::test]
1020 async fn test_recovery_logs_pruned_blocks() {
1021 let temp_dir = tempdir().unwrap();
1022 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1023
1024 storage
1026 .save_missed_blocks("test_network", &[10, 20, 30])
1027 .await
1028 .unwrap();
1029
1030 let network = create_test_network();
1031 let mut recovery_config = create_recovery_config();
1032 recovery_config.max_block_age = 100;
1033
1034 let rpc_client = MockRpcClient::new(1000, vec![]);
1035 let block_tracker = Arc::new(BlockTracker::new(100));
1036
1037 let block_handler = create_block_handler();
1038 let trigger_handler = create_trigger_handler();
1039
1040 let result = process_missed_blocks(
1041 &network,
1042 &recovery_config,
1043 &rpc_client,
1044 storage,
1045 block_handler,
1046 trigger_handler,
1047 block_tracker,
1048 )
1049 .await
1050 .unwrap();
1051
1052 assert_eq!(result.pruned, 3);
1054 assert_eq!(result.attempted, 0);
1055 }
1056
1057 #[tokio::test]
1058 async fn test_recovery_mixed_success_and_failures() {
1059 let temp_dir = tempdir().unwrap();
1060 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1061
1062 storage
1064 .save_missed_blocks("test_network", &[100, 101, 102])
1065 .await
1066 .unwrap();
1067
1068 let network = create_test_network();
1069 let mut recovery_config = create_recovery_config();
1070 recovery_config.max_retries = 1;
1071 recovery_config.retry_delay_ms = 10;
1072
1073 let rpc_client = MockRpcClient::new(1000, vec![101]);
1075 let block_tracker = Arc::new(BlockTracker::new(100));
1076
1077 let block_handler = create_block_handler();
1078 let trigger_handler = create_trigger_handler();
1079
1080 let result = process_missed_blocks(
1081 &network,
1082 &recovery_config,
1083 &rpc_client,
1084 storage.clone(),
1085 block_handler,
1086 trigger_handler,
1087 block_tracker,
1088 )
1089 .await
1090 .unwrap();
1091
1092 assert_eq!(result.attempted, 3);
1093 assert_eq!(result.recovered, 2); assert_eq!(result.failed, 1); let json_path = temp_dir.path().join("test_network_missed_blocks.json");
1098 let content = tokio::fs::read_to_string(&json_path).await.unwrap();
1099 let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
1100 assert_eq!(entries.len(), 1);
1101 assert_eq!(entries[0].block_number, 101);
1102 assert_eq!(entries[0].status, MissedBlockStatus::Failed);
1103 }
1104}