openzeppelin_monitor/services/blockwatcher/
recovery.rs

1//! Missed block recovery module.
2//!
3//! This module provides functionality to recover blocks that were missed
4//! during normal monitoring cycles. It runs as a separate scheduled job
5//! to avoid adding RPC load during normal operations.
6
7use anyhow::Context;
8use futures::future::BoxFuture;
9use std::sync::Arc;
10
11use crate::{
12	models::{BlockRecoveryConfig, BlockType, Network, ProcessedBlock},
13	services::{
14		blockchain::BlockChainClient,
15		blockwatcher::{
16			error::BlockWatcherError,
17			storage::{BlockStorage, MissedBlockStatus},
18			tracker::BlockTrackerTrait,
19		},
20	},
21};
22
23/// Result of a recovery job execution
24#[derive(Debug, Clone, Default)]
25pub struct RecoveryResult {
26	/// Number of blocks attempted for recovery
27	pub attempted: usize,
28	/// Number of blocks successfully recovered
29	pub recovered: usize,
30	/// Number of blocks that failed recovery
31	pub failed: usize,
32	/// Number of old blocks pruned
33	pub pruned: usize,
34}
35
36/// Processes missed blocks for recovery
37///
38/// This function runs as part of the recovery job and attempts to fetch
39/// and process blocks that were previously missed.
40///
41/// # Algorithm
42/// 1. Get current block number
43/// 2. Prune blocks older than `max_block_age`
44/// 3. Load missed blocks with `status == Pending` and `retry_count < max_retries`
45/// 4. Limit to `max_blocks_per_run`, sorted by block number (oldest first)
46/// 5. For each block:
47///    - Mark as `Recovering`
48///    - Fetch via RPC
49///    - On success: process through handlers, mark `Recovered`, remove from file
50///    - On failure: increment `retry_count`, record error, apply `retry_delay_ms`
51///    - If `retry_count >= max_retries`: mark as `Failed`
52/// 6. Return statistics
53#[allow(clippy::too_many_arguments)]
54pub async fn process_missed_blocks<S, C, H, T, TR>(
55	network: &Network,
56	recovery_config: &BlockRecoveryConfig,
57	rpc_client: &C,
58	block_storage: Arc<S>,
59	block_handler: Arc<H>,
60	trigger_handler: Arc<T>,
61	_block_tracker: Arc<TR>,
62) -> Result<RecoveryResult, BlockWatcherError>
63where
64	S: BlockStorage + Send + Sync,
65	C: BlockChainClient + Send + Sync,
66	H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync,
67	T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync,
68	TR: BlockTrackerTrait + Send + Sync,
69{
70	let mut result = RecoveryResult::default();
71
72	// Get current block number
73	let current_block = rpc_client
74		.get_latest_block_number()
75		.await
76		.with_context(|| "Failed to get latest block number for recovery")?;
77
78	let current_confirmed = current_block.saturating_sub(network.confirmation_blocks);
79
80	// Prune old blocks first
81	let pruned = block_storage
82		.prune_old_missed_blocks(
83			&network.slug,
84			recovery_config.max_block_age,
85			current_confirmed,
86		)
87		.await
88		.with_context(|| "Failed to prune old missed blocks")?;
89
90	result.pruned = pruned;
91
92	if pruned > 0 {
93		tracing::info!(
94			network = %network.slug,
95			pruned = pruned,
96			"Pruned {} old missed blocks",
97			pruned
98		);
99	}
100
101	// Get eligible missed blocks (filtered by age, status, and max_retries)
102	let mut missed_blocks = block_storage
103		.get_missed_blocks(
104			&network.slug,
105			recovery_config.max_block_age,
106			current_confirmed,
107			recovery_config.max_retries,
108		)
109		.await
110		.with_context(|| "Failed to get missed blocks for recovery")?;
111
112	// Sort by block number (oldest first)
113	missed_blocks.sort_by_key(|b| b.block_number);
114
115	// Limit to max_blocks_per_run
116	missed_blocks.truncate(recovery_config.max_blocks_per_run as usize);
117
118	if missed_blocks.is_empty() {
119		tracing::debug!(
120			network = %network.slug,
121			"No missed blocks eligible for recovery"
122		);
123		return Ok(result);
124	}
125
126	tracing::info!(
127		network = %network.slug,
128		count = missed_blocks.len(),
129		"Attempting recovery of {} missed blocks",
130		missed_blocks.len()
131	);
132
133	let mut recovered_blocks = Vec::new();
134
135	for entry in missed_blocks {
136		result.attempted += 1;
137		let block_number = entry.block_number;
138
139		// Mark as Recovering
140		if let Err(e) = block_storage
141			.update_missed_block_status(
142				&network.slug,
143				block_number,
144				MissedBlockStatus::Recovering,
145				None,
146			)
147			.await
148		{
149			tracing::warn!(
150				network = %network.slug,
151				block = block_number,
152				error = %e,
153				"Failed to update block status to Recovering"
154			);
155		}
156
157		// Attempt to fetch the block
158		match rpc_client
159			.get_blocks(block_number, Some(block_number))
160			.await
161		{
162			Ok(blocks) if !blocks.is_empty() => {
163				let block = blocks.into_iter().next().unwrap();
164
165				// Process through block handler
166				let processed_block = (block_handler)(block, network.clone()).await;
167
168				// Execute trigger handler
169				let _handle = (trigger_handler)(&processed_block);
170
171				// Mark as Recovered
172				if let Err(e) = block_storage
173					.update_missed_block_status(
174						&network.slug,
175						block_number,
176						MissedBlockStatus::Recovered,
177						None,
178					)
179					.await
180				{
181					tracing::warn!(
182						network = %network.slug,
183						block = block_number,
184						error = %e,
185						"Failed to update block status to Recovered"
186					);
187				}
188
189				recovered_blocks.push(block_number);
190				result.recovered += 1;
191
192				tracing::info!(
193					network = %network.slug,
194					block = block_number,
195					"Successfully recovered missed block"
196				);
197			}
198			Ok(_) => {
199				// Block not found (empty response)
200				let new_retry_count = entry.retry_count + 1;
201				let error_msg = "Block not found in RPC response".to_string();
202
203				let new_status = if new_retry_count >= recovery_config.max_retries {
204					MissedBlockStatus::Failed
205				} else {
206					MissedBlockStatus::Pending
207				};
208
209				if let Err(e) = block_storage
210					.update_missed_block_status(
211						&network.slug,
212						block_number,
213						new_status.clone(),
214						Some(error_msg.clone()),
215					)
216					.await
217				{
218					tracing::warn!(
219						network = %network.slug,
220						block = block_number,
221						error = %e,
222						"Failed to update block status after empty response"
223					);
224				}
225
226				if new_status == MissedBlockStatus::Failed {
227					result.failed += 1;
228					tracing::error!(
229						network = %network.slug,
230						block = block_number,
231						retries = new_retry_count,
232						"Block recovery failed after max retries: {}",
233						error_msg
234					);
235				} else {
236					tracing::warn!(
237						network = %network.slug,
238						block = block_number,
239						retry = new_retry_count,
240						"Block recovery attempt failed, will retry: {}",
241						error_msg
242					);
243				}
244
245				// Apply retry delay
246				tokio::time::sleep(tokio::time::Duration::from_millis(
247					recovery_config.retry_delay_ms,
248				))
249				.await;
250			}
251			Err(e) => {
252				let new_retry_count = entry.retry_count + 1;
253				let error_msg = e.to_string();
254
255				let new_status = if new_retry_count >= recovery_config.max_retries {
256					MissedBlockStatus::Failed
257				} else {
258					MissedBlockStatus::Pending
259				};
260
261				if let Err(update_err) = block_storage
262					.update_missed_block_status(
263						&network.slug,
264						block_number,
265						new_status.clone(),
266						Some(error_msg.clone()),
267					)
268					.await
269				{
270					tracing::warn!(
271						network = %network.slug,
272						block = block_number,
273						error = %update_err,
274						"Failed to update block status after RPC error"
275					);
276				}
277
278				if new_status == MissedBlockStatus::Failed {
279					result.failed += 1;
280					tracing::error!(
281						network = %network.slug,
282						block = block_number,
283						retries = new_retry_count,
284						"Block recovery failed after max retries: {}",
285						error_msg
286					);
287				} else {
288					tracing::warn!(
289						network = %network.slug,
290						block = block_number,
291						retry = new_retry_count,
292						"Block recovery attempt failed, will retry: {}",
293						error_msg
294					);
295				}
296
297				// Apply retry delay
298				tokio::time::sleep(tokio::time::Duration::from_millis(
299					recovery_config.retry_delay_ms,
300				))
301				.await;
302			}
303		}
304	}
305
306	// Remove recovered blocks from storage
307	if !recovered_blocks.is_empty() {
308		if let Err(e) = block_storage
309			.remove_recovered_blocks(&network.slug, &recovered_blocks)
310			.await
311		{
312			tracing::warn!(
313				network = %network.slug,
314				error = %e,
315				"Failed to remove recovered blocks from storage"
316			);
317		}
318	}
319
320	tracing::info!(
321		network = %network.slug,
322		attempted = result.attempted,
323		recovered = result.recovered,
324		failed = result.failed,
325		pruned = result.pruned,
326		"Recovery job completed"
327	);
328
329	Ok(result)
330}
331
332#[cfg(test)]
333mod tests {
334	use super::*;
335	use crate::models::{BlockChainType, RpcUrl, SecretString, SecretValue};
336	use crate::services::blockwatcher::storage::{
337		BlockStorage, FileBlockStorage, MissedBlockEntry,
338	};
339	use crate::services::blockwatcher::tracker::BlockTracker;
340	use std::sync::atomic::{AtomicUsize, Ordering};
341	use tempfile::tempdir;
342
343	fn create_test_network() -> Network {
344		Network {
345			network_type: BlockChainType::EVM,
346			slug: "test_network".to_string(),
347			name: "Test Network".to_string(),
348			rpc_urls: vec![RpcUrl {
349				type_: "rpc".to_string(),
350				url: SecretValue::Plain(SecretString::new("http://localhost:8545".to_string())),
351				weight: 100,
352			}],
353			chain_id: Some(1),
354			network_passphrase: None,
355			block_time_ms: 12000,
356			confirmation_blocks: 12,
357			cron_schedule: "*/10 * * * * *".to_string(),
358			max_past_blocks: Some(100),
359			store_blocks: Some(true),
360			recovery_config: Some(BlockRecoveryConfig {
361				enabled: true,
362				cron_schedule: "0 */5 * * * *".to_string(),
363				max_blocks_per_run: 10,
364				max_block_age: 1000,
365				max_retries: 3,
366				retry_delay_ms: 100,
367			}),
368		}
369	}
370
371	fn create_recovery_config() -> BlockRecoveryConfig {
372		BlockRecoveryConfig {
373			enabled: true,
374			cron_schedule: "0 */5 * * * *".to_string(),
375			max_blocks_per_run: 10,
376			max_block_age: 1000,
377			max_retries: 3,
378			retry_delay_ms: 100,
379		}
380	}
381
382	/// Mock RPC client that can return empty responses for specific blocks
383	#[derive(Clone)]
384	struct MockRpcClientWithEmptyResponse {
385		latest_block: u64,
386		fail_blocks: Vec<u64>,
387		empty_response_blocks: Vec<u64>,
388		call_count: Arc<AtomicUsize>,
389	}
390
391	impl MockRpcClientWithEmptyResponse {
392		fn new(latest_block: u64, fail_blocks: Vec<u64>, empty_response_blocks: Vec<u64>) -> Self {
393			Self {
394				latest_block,
395				fail_blocks,
396				empty_response_blocks,
397				call_count: Arc::new(AtomicUsize::new(0)),
398			}
399		}
400	}
401
402	#[async_trait::async_trait]
403	impl BlockChainClient for MockRpcClientWithEmptyResponse {
404		async fn get_latest_block_number(&self) -> Result<u64, anyhow::Error> {
405			Ok(self.latest_block)
406		}
407
408		async fn get_blocks(
409			&self,
410			start: u64,
411			_end: Option<u64>,
412		) -> Result<Vec<BlockType>, anyhow::Error> {
413			self.call_count.fetch_add(1, Ordering::SeqCst);
414
415			if self.fail_blocks.contains(&start) {
416				return Err(anyhow::anyhow!("Simulated RPC failure for block {}", start));
417			}
418
419			if self.empty_response_blocks.contains(&start) {
420				// Return empty response (block not found)
421				return Ok(vec![]);
422			}
423
424			// Return a mock EVM block
425			Ok(vec![BlockType::EVM(Box::default())])
426		}
427	}
428
429	// Mock RPC client for testing
430	#[derive(Clone)]
431	struct MockRpcClient {
432		latest_block: u64,
433		fail_blocks: Vec<u64>,
434		call_count: Arc<AtomicUsize>,
435	}
436
437	impl MockRpcClient {
438		fn new(latest_block: u64, fail_blocks: Vec<u64>) -> Self {
439			Self {
440				latest_block,
441				fail_blocks,
442				call_count: Arc::new(AtomicUsize::new(0)),
443			}
444		}
445	}
446
447	#[async_trait::async_trait]
448	impl BlockChainClient for MockRpcClient {
449		async fn get_latest_block_number(&self) -> Result<u64, anyhow::Error> {
450			Ok(self.latest_block)
451		}
452
453		async fn get_blocks(
454			&self,
455			start: u64,
456			_end: Option<u64>,
457		) -> Result<Vec<BlockType>, anyhow::Error> {
458			self.call_count.fetch_add(1, Ordering::SeqCst);
459
460			if self.fail_blocks.contains(&start) {
461				return Err(anyhow::anyhow!("Simulated RPC failure for block {}", start));
462			}
463
464			// Return a mock EVM block
465			Ok(vec![BlockType::EVM(Box::default())])
466		}
467	}
468
469	/// Mock storage that fails on specific operations
470	#[derive(Clone)]
471	struct MockFailingStorage {
472		inner: FileBlockStorage,
473		fail_update_status: bool,
474		fail_remove_recovered: bool,
475	}
476
477	impl MockFailingStorage {
478		fn new(storage_path: std::path::PathBuf) -> Self {
479			Self {
480				inner: FileBlockStorage::new(storage_path),
481				fail_update_status: false,
482				fail_remove_recovered: false,
483			}
484		}
485
486		fn with_failing_update_status(mut self) -> Self {
487			self.fail_update_status = true;
488			self
489		}
490
491		fn with_failing_remove_recovered(mut self) -> Self {
492			self.fail_remove_recovered = true;
493			self
494		}
495	}
496
497	#[async_trait::async_trait]
498	impl BlockStorage for MockFailingStorage {
499		async fn get_last_processed_block(
500			&self,
501			network_id: &str,
502		) -> Result<Option<u64>, anyhow::Error> {
503			self.inner.get_last_processed_block(network_id).await
504		}
505
506		async fn save_last_processed_block(
507			&self,
508			network_id: &str,
509			block: u64,
510		) -> Result<(), anyhow::Error> {
511			self.inner
512				.save_last_processed_block(network_id, block)
513				.await
514		}
515
516		async fn save_blocks(
517			&self,
518			network_id: &str,
519			blocks: &[BlockType],
520		) -> Result<(), anyhow::Error> {
521			self.inner.save_blocks(network_id, blocks).await
522		}
523
524		async fn delete_blocks(&self, network_id: &str) -> Result<(), anyhow::Error> {
525			self.inner.delete_blocks(network_id).await
526		}
527
528		async fn save_missed_blocks(
529			&self,
530			network_id: &str,
531			blocks: &[u64],
532		) -> Result<(), anyhow::Error> {
533			self.inner.save_missed_blocks(network_id, blocks).await
534		}
535
536		async fn get_missed_blocks(
537			&self,
538			network_id: &str,
539			max_block_age: u64,
540			current_block: u64,
541			max_retries: u32,
542		) -> Result<Vec<MissedBlockEntry>, anyhow::Error> {
543			self.inner
544				.get_missed_blocks(network_id, max_block_age, current_block, max_retries)
545				.await
546		}
547
548		async fn update_missed_block_status(
549			&self,
550			network_id: &str,
551			block_number: u64,
552			status: MissedBlockStatus,
553			error: Option<String>,
554		) -> Result<(), anyhow::Error> {
555			if self.fail_update_status {
556				return Err(anyhow::anyhow!("Simulated update status failure"));
557			}
558			self.inner
559				.update_missed_block_status(network_id, block_number, status, error)
560				.await
561		}
562
563		async fn remove_recovered_blocks(
564			&self,
565			network_id: &str,
566			block_numbers: &[u64],
567		) -> Result<(), anyhow::Error> {
568			if self.fail_remove_recovered {
569				return Err(anyhow::anyhow!("Simulated remove recovered failure"));
570			}
571			self.inner
572				.remove_recovered_blocks(network_id, block_numbers)
573				.await
574		}
575
576		async fn prune_old_missed_blocks(
577			&self,
578			network_id: &str,
579			max_block_age: u64,
580			current_block: u64,
581		) -> Result<usize, anyhow::Error> {
582			self.inner
583				.prune_old_missed_blocks(network_id, max_block_age, current_block)
584				.await
585		}
586	}
587
588	fn create_block_handler() -> Arc<
589		impl Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
590	> {
591		Arc::new(|block: BlockType, network: Network| {
592			Box::pin(async move {
593				ProcessedBlock {
594					network_slug: network.slug,
595					block_number: block.number().unwrap_or(0),
596					processing_results: vec![],
597				}
598			}) as BoxFuture<'static, ProcessedBlock>
599		})
600	}
601
602	fn create_trigger_handler(
603	) -> Arc<impl Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static> {
604		Arc::new(|_block: &ProcessedBlock| tokio::spawn(async move {}))
605	}
606
607	#[tokio::test]
608	async fn test_recovery_with_no_missed_blocks() {
609		let temp_dir = tempdir().unwrap();
610		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
611
612		let network = create_test_network();
613		let recovery_config = create_recovery_config();
614		let rpc_client = MockRpcClient::new(1000, vec![]);
615		let block_tracker = Arc::new(BlockTracker::new(100));
616
617		let block_handler = create_block_handler();
618		let trigger_handler = create_trigger_handler();
619
620		let result = process_missed_blocks(
621			&network,
622			&recovery_config,
623			&rpc_client,
624			storage,
625			block_handler,
626			trigger_handler,
627			block_tracker,
628		)
629		.await
630		.unwrap();
631
632		assert_eq!(result.attempted, 0);
633		assert_eq!(result.recovered, 0);
634		assert_eq!(result.failed, 0);
635	}
636
637	#[tokio::test]
638	async fn test_recovery_processes_missed_blocks() {
639		let temp_dir = tempdir().unwrap();
640		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
641
642		// Add some missed blocks
643		storage
644			.save_missed_blocks("test_network", &[100, 101, 102])
645			.await
646			.unwrap();
647
648		let network = create_test_network();
649		let recovery_config = create_recovery_config();
650		let rpc_client = MockRpcClient::new(1000, vec![]); // No failures
651		let block_tracker = Arc::new(BlockTracker::new(100));
652
653		let block_handler = create_block_handler();
654		let trigger_handler = create_trigger_handler();
655
656		let result = process_missed_blocks(
657			&network,
658			&recovery_config,
659			&rpc_client,
660			storage.clone(),
661			block_handler,
662			trigger_handler,
663			block_tracker,
664		)
665		.await
666		.unwrap();
667
668		assert_eq!(result.attempted, 3);
669		assert_eq!(result.recovered, 3);
670		assert_eq!(result.failed, 0);
671
672		// Verify blocks were removed from storage
673		let remaining = storage
674			.get_missed_blocks("test_network", 1000, 1000, 3)
675			.await
676			.unwrap();
677		assert!(remaining.is_empty());
678	}
679
680	#[tokio::test]
681	async fn test_recovery_handles_rpc_failures() {
682		let temp_dir = tempdir().unwrap();
683		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
684
685		// Add a missed block that will fail
686		storage
687			.save_missed_blocks("test_network", &[100])
688			.await
689			.unwrap();
690
691		let network = create_test_network();
692		let mut recovery_config = create_recovery_config();
693		recovery_config.max_retries = 1; // Only 1 retry so it fails quickly
694
695		let rpc_client = MockRpcClient::new(1000, vec![100]); // Block 100 will fail
696		let block_tracker = Arc::new(BlockTracker::new(100));
697
698		let block_handler = create_block_handler();
699		let trigger_handler = create_trigger_handler();
700
701		let result = process_missed_blocks(
702			&network,
703			&recovery_config,
704			&rpc_client,
705			storage,
706			block_handler,
707			trigger_handler,
708			block_tracker,
709		)
710		.await
711		.unwrap();
712
713		assert_eq!(result.attempted, 1);
714		assert_eq!(result.recovered, 0);
715		assert_eq!(result.failed, 1);
716	}
717
718	#[tokio::test]
719	async fn test_recovery_respects_max_blocks_per_run() {
720		let temp_dir = tempdir().unwrap();
721		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
722
723		// Add more blocks than max_blocks_per_run
724		storage
725			.save_missed_blocks("test_network", &[100, 101, 102, 103, 104])
726			.await
727			.unwrap();
728
729		let network = create_test_network();
730		let mut recovery_config = create_recovery_config();
731		recovery_config.max_blocks_per_run = 2; // Only process 2
732
733		let rpc_client = MockRpcClient::new(1000, vec![]);
734		let block_tracker = Arc::new(BlockTracker::new(100));
735
736		let block_handler = create_block_handler();
737		let trigger_handler = create_trigger_handler();
738
739		let result = process_missed_blocks(
740			&network,
741			&recovery_config,
742			&rpc_client,
743			storage.clone(),
744			block_handler,
745			trigger_handler,
746			block_tracker,
747		)
748		.await
749		.unwrap();
750
751		assert_eq!(result.attempted, 2);
752		assert_eq!(result.recovered, 2);
753
754		// Should have 3 blocks remaining
755		let remaining = storage
756			.get_missed_blocks("test_network", 1000, 1000, 3)
757			.await
758			.unwrap();
759		assert_eq!(remaining.len(), 3);
760	}
761
762	#[tokio::test]
763	async fn test_recovery_prunes_old_blocks() {
764		let temp_dir = tempdir().unwrap();
765		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
766
767		// Add a very old block (outside max_block_age)
768		storage
769			.save_missed_blocks("test_network", &[10]) // Very old block
770			.await
771			.unwrap();
772
773		let network = create_test_network();
774		let mut recovery_config = create_recovery_config();
775		recovery_config.max_block_age = 100; // Only keep blocks within 100 of current
776
777		let rpc_client = MockRpcClient::new(1000, vec![]);
778		let block_tracker = Arc::new(BlockTracker::new(100));
779
780		let block_handler = create_block_handler();
781		let trigger_handler = create_trigger_handler();
782
783		let result = process_missed_blocks(
784			&network,
785			&recovery_config,
786			&rpc_client,
787			storage,
788			block_handler,
789			trigger_handler,
790			block_tracker,
791		)
792		.await
793		.unwrap();
794
795		// Block should be pruned (current is 1000, block 10 is way older than 100 blocks)
796		assert_eq!(result.pruned, 1);
797		assert_eq!(result.attempted, 0); // No blocks to attempt after pruning
798	}
799
800	#[tokio::test]
801	async fn test_recovery_handles_empty_rpc_response_with_retry() {
802		let temp_dir = tempdir().unwrap();
803		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
804
805		// Add a missed block that will return empty response
806		storage
807			.save_missed_blocks("test_network", &[100])
808			.await
809			.unwrap();
810
811		let network = create_test_network();
812		let mut recovery_config = create_recovery_config();
813		recovery_config.max_retries = 3; // Multiple retries so it won't fail immediately
814		recovery_config.retry_delay_ms = 10; // Short delay for testing
815
816		// Block 100 will return empty response (block not found)
817		let rpc_client = MockRpcClientWithEmptyResponse::new(1000, vec![], vec![100]);
818		let block_tracker = Arc::new(BlockTracker::new(100));
819
820		let block_handler = create_block_handler();
821		let trigger_handler = create_trigger_handler();
822
823		let result = process_missed_blocks(
824			&network,
825			&recovery_config,
826			&rpc_client,
827			storage.clone(),
828			block_handler,
829			trigger_handler,
830			block_tracker,
831		)
832		.await
833		.unwrap();
834
835		// Block should be attempted but not recovered (empty response)
836		assert_eq!(result.attempted, 1);
837		assert_eq!(result.recovered, 0);
838		assert_eq!(result.failed, 0); // Not failed yet, will retry
839
840		// Verify block is still in storage with incremented retry count
841		let remaining = storage
842			.get_missed_blocks("test_network", 1000, 1000, 3)
843			.await
844			.unwrap();
845		assert_eq!(remaining.len(), 1);
846		assert_eq!(remaining[0].retry_count, 1);
847	}
848
849	#[tokio::test]
850	async fn test_recovery_handles_empty_rpc_response_max_retries() {
851		let temp_dir = tempdir().unwrap();
852		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
853
854		// Add a missed block that will return empty response
855		storage
856			.save_missed_blocks("test_network", &[100])
857			.await
858			.unwrap();
859
860		let network = create_test_network();
861		let mut recovery_config = create_recovery_config();
862		recovery_config.max_retries = 1; // Only 1 retry so it fails immediately
863		recovery_config.retry_delay_ms = 10;
864
865		// Block 100 will return empty response (block not found)
866		let rpc_client = MockRpcClientWithEmptyResponse::new(1000, vec![], vec![100]);
867		let block_tracker = Arc::new(BlockTracker::new(100));
868
869		let block_handler = create_block_handler();
870		let trigger_handler = create_trigger_handler();
871
872		let result = process_missed_blocks(
873			&network,
874			&recovery_config,
875			&rpc_client,
876			storage,
877			block_handler,
878			trigger_handler,
879			block_tracker,
880		)
881		.await
882		.unwrap();
883
884		// Block should fail after max retries with empty response
885		assert_eq!(result.attempted, 1);
886		assert_eq!(result.recovered, 0);
887		assert_eq!(result.failed, 1);
888	}
889
890	#[tokio::test]
891	async fn test_recovery_handles_rpc_failure_with_retry() {
892		let temp_dir = tempdir().unwrap();
893		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
894
895		// Add a missed block that will fail
896		storage
897			.save_missed_blocks("test_network", &[100])
898			.await
899			.unwrap();
900
901		let network = create_test_network();
902		let mut recovery_config = create_recovery_config();
903		recovery_config.max_retries = 3; // Multiple retries
904		recovery_config.retry_delay_ms = 10;
905
906		let rpc_client = MockRpcClient::new(1000, vec![100]); // Block 100 will fail
907		let block_tracker = Arc::new(BlockTracker::new(100));
908
909		let block_handler = create_block_handler();
910		let trigger_handler = create_trigger_handler();
911
912		let result = process_missed_blocks(
913			&network,
914			&recovery_config,
915			&rpc_client,
916			storage.clone(),
917			block_handler,
918			trigger_handler,
919			block_tracker,
920		)
921		.await
922		.unwrap();
923
924		// Block should be attempted but not recovered, not failed (will retry)
925		assert_eq!(result.attempted, 1);
926		assert_eq!(result.recovered, 0);
927		assert_eq!(result.failed, 0);
928
929		// Verify retry count was incremented
930		let remaining = storage
931			.get_missed_blocks("test_network", 1000, 1000, 3)
932			.await
933			.unwrap();
934		assert_eq!(remaining.len(), 1);
935		assert_eq!(remaining[0].retry_count, 1);
936	}
937
938	#[tokio::test]
939	async fn test_recovery_continues_on_update_status_error_recovering() {
940		let temp_dir = tempdir().unwrap();
941		let storage = Arc::new(
942			MockFailingStorage::new(temp_dir.path().to_path_buf()).with_failing_update_status(),
943		);
944
945		// First save the missed block using the inner storage
946		storage
947			.inner
948			.save_missed_blocks("test_network", &[100])
949			.await
950			.unwrap();
951
952		let network = create_test_network();
953		let recovery_config = create_recovery_config();
954		let rpc_client = MockRpcClient::new(1000, vec![]);
955		let block_tracker = Arc::new(BlockTracker::new(100));
956
957		let block_handler = create_block_handler();
958		let trigger_handler = create_trigger_handler();
959
960		// Recovery should continue despite status update errors
961		let result = process_missed_blocks(
962			&network,
963			&recovery_config,
964			&rpc_client,
965			storage,
966			block_handler,
967			trigger_handler,
968			block_tracker,
969		)
970		.await
971		.unwrap();
972
973		// Recovery still proceeds even with status update failures
974		assert_eq!(result.attempted, 1);
975		// The block is still processed through handlers even if status update fails
976		assert_eq!(result.recovered, 1);
977	}
978
979	#[tokio::test]
980	async fn test_recovery_continues_on_remove_recovered_error() {
981		let temp_dir = tempdir().unwrap();
982		let storage = Arc::new(
983			MockFailingStorage::new(temp_dir.path().to_path_buf()).with_failing_remove_recovered(),
984		);
985
986		// First save the missed block using the inner storage
987		storage
988			.inner
989			.save_missed_blocks("test_network", &[100])
990			.await
991			.unwrap();
992
993		let network = create_test_network();
994		let recovery_config = create_recovery_config();
995		let rpc_client = MockRpcClient::new(1000, vec![]);
996		let block_tracker = Arc::new(BlockTracker::new(100));
997
998		let block_handler = create_block_handler();
999		let trigger_handler = create_trigger_handler();
1000
1001		// Recovery should complete even if remove_recovered fails
1002		let result = process_missed_blocks(
1003			&network,
1004			&recovery_config,
1005			&rpc_client,
1006			storage,
1007			block_handler,
1008			trigger_handler,
1009			block_tracker,
1010		)
1011		.await
1012		.unwrap();
1013
1014		assert_eq!(result.attempted, 1);
1015		assert_eq!(result.recovered, 1);
1016		assert_eq!(result.failed, 0);
1017	}
1018
1019	#[tokio::test]
1020	async fn test_recovery_logs_pruned_blocks() {
1021		let temp_dir = tempdir().unwrap();
1022		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1023
1024		// Add multiple old blocks that will be pruned
1025		storage
1026			.save_missed_blocks("test_network", &[10, 20, 30])
1027			.await
1028			.unwrap();
1029
1030		let network = create_test_network();
1031		let mut recovery_config = create_recovery_config();
1032		recovery_config.max_block_age = 100;
1033
1034		let rpc_client = MockRpcClient::new(1000, vec![]);
1035		let block_tracker = Arc::new(BlockTracker::new(100));
1036
1037		let block_handler = create_block_handler();
1038		let trigger_handler = create_trigger_handler();
1039
1040		let result = process_missed_blocks(
1041			&network,
1042			&recovery_config,
1043			&rpc_client,
1044			storage,
1045			block_handler,
1046			trigger_handler,
1047			block_tracker,
1048		)
1049		.await
1050		.unwrap();
1051
1052		// All blocks should be pruned (they're all older than current - max_age)
1053		assert_eq!(result.pruned, 3);
1054		assert_eq!(result.attempted, 0);
1055	}
1056
1057	#[tokio::test]
1058	async fn test_recovery_mixed_success_and_failures() {
1059		let temp_dir = tempdir().unwrap();
1060		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1061
1062		// Add multiple missed blocks
1063		storage
1064			.save_missed_blocks("test_network", &[100, 101, 102])
1065			.await
1066			.unwrap();
1067
1068		let network = create_test_network();
1069		let mut recovery_config = create_recovery_config();
1070		recovery_config.max_retries = 1;
1071		recovery_config.retry_delay_ms = 10;
1072
1073		// Block 101 will fail
1074		let rpc_client = MockRpcClient::new(1000, vec![101]);
1075		let block_tracker = Arc::new(BlockTracker::new(100));
1076
1077		let block_handler = create_block_handler();
1078		let trigger_handler = create_trigger_handler();
1079
1080		let result = process_missed_blocks(
1081			&network,
1082			&recovery_config,
1083			&rpc_client,
1084			storage.clone(),
1085			block_handler,
1086			trigger_handler,
1087			block_tracker,
1088		)
1089		.await
1090		.unwrap();
1091
1092		assert_eq!(result.attempted, 3);
1093		assert_eq!(result.recovered, 2); // 100 and 102 succeed
1094		assert_eq!(result.failed, 1); // 101 fails
1095
1096		// Verify only block 101 remains in storage
1097		let json_path = temp_dir.path().join("test_network_missed_blocks.json");
1098		let content = tokio::fs::read_to_string(&json_path).await.unwrap();
1099		let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
1100		assert_eq!(entries.len(), 1);
1101		assert_eq!(entries[0].block_number, 101);
1102		assert_eq!(entries[0].status, MissedBlockStatus::Failed);
1103	}
1104}