openzeppelin_monitor/services/blockwatcher/
service.rs

1//! Block watcher service implementation.
2//!
3//! Provides functionality to watch and process blockchain blocks across multiple networks,
4//! managing individual watchers for each network and coordinating block processing.
5
6use anyhow::Context;
7use futures::{channel::mpsc, future::BoxFuture, stream::StreamExt, SinkExt};
8use std::{
9	collections::{BTreeMap, HashMap},
10	sync::Arc,
11};
12use tokio::sync::RwLock;
13use tokio_cron_scheduler::{Job, JobScheduler};
14use tracing::instrument;
15
16use crate::{
17	models::{BlockType, Network, ProcessedBlock},
18	services::{
19		blockchain::BlockChainClient,
20		blockwatcher::{
21			error::BlockWatcherError,
22			recovery::process_missed_blocks,
23			storage::BlockStorage,
24			tracker::{BlockCheckResult, BlockTracker, BlockTrackerTrait},
25		},
26	},
27};
28
/// Trait for job scheduler
///
/// This trait is used to abstract the job scheduler implementation.
/// It is used to allow the block watcher service to be used with different job scheduler
/// implementations.
#[async_trait::async_trait]
pub trait JobSchedulerTrait: Send + Sync + Sized {
	/// Creates a new scheduler instance.
	async fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>>;
	/// Registers a job with the scheduler.
	async fn add(&self, job: Job) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
	/// Starts executing registered jobs.
	async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
	/// Shuts the scheduler down, stopping job execution.
	async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
41
/// Implementation of the job scheduler trait for the JobScheduler struct
#[async_trait::async_trait]
impl JobSchedulerTrait for JobScheduler {
	async fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
		// `Self::new()` resolves to the inherent `JobScheduler::new`, not this
		// trait method — inherent methods take precedence in method resolution,
		// so this is delegation, not recursion.
		Self::new().await.map_err(Into::into)
	}

	async fn add(&self, job: Job) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
		// Discard the inherent method's return value; trait callers only need success.
		self.add(job).await.map(|_| ()).map_err(Into::into)
	}

	async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
		self.start().await.map(|_| ()).map_err(Into::into)
	}

	async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
		self.shutdown().await.map(|_| ()).map_err(Into::into)
	}
}
61
/// Watcher implementation for a single network
///
/// Manages block watching and processing for a specific blockchain network,
/// including scheduling and block handling.
///
/// # Type Parameters
/// * `S` - Storage implementation for blocks
/// * `H` - Handler function for processed blocks
/// * `T` - Trigger handler function for processed blocks
/// * `J` - Job scheduler implementation (must implement JobSchedulerTrait)
pub struct NetworkBlockWatcher<S, H, T, J>
where
	J: JobSchedulerTrait,
{
	// Configuration of the network being watched (cron schedule, recovery config, ...)
	pub network: Network,
	// Persistence for processed/missed block state
	pub block_storage: Arc<S>,
	// Handler invoked for each fetched block
	pub block_handler: Arc<H>,
	// Handler invoked for each processed block
	pub trigger_handler: Arc<T>,
	// Scheduler driving the main watch job and the optional recovery job
	pub scheduler: J,
	// Tracks processed block numbers to detect gaps and duplicates
	pub block_tracker: Arc<BlockTracker>,
}
83
/// Map of active block watchers, keyed by network slug
type BlockWatchersMap<S, H, T, J> = HashMap<String, NetworkBlockWatcher<S, H, T, J>>;
86
/// Service for managing multiple network watchers
///
/// Coordinates block watching across multiple networks, managing individual
/// watchers and their lifecycles.
///
/// # Type Parameters
/// * `S` - Storage implementation for blocks
/// * `H` - Handler function for processed blocks
/// * `T` - Trigger handler function for processed blocks
/// * `J` - Job scheduler implementation (must implement JobSchedulerTrait)
pub struct BlockWatcherService<S, H, T, J>
where
	J: JobSchedulerTrait,
{
	// Shared block storage handed to every per-network watcher
	pub block_storage: Arc<S>,
	// Shared block handler handed to every per-network watcher
	pub block_handler: Arc<H>,
	// Shared trigger handler handed to every per-network watcher
	pub trigger_handler: Arc<T>,
	// Currently running watchers, keyed by network slug
	pub active_watchers: Arc<RwLock<BlockWatchersMap<S, H, T, J>>>,
	// Shared block tracker handed to every per-network watcher
	pub block_tracker: Arc<BlockTracker>,
}
107
108impl<S, H, T, J> NetworkBlockWatcher<S, H, T, J>
109where
110	S: BlockStorage + Send + Sync + 'static,
111	H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
112	T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static,
113	J: JobSchedulerTrait,
114{
115	/// Creates a new network watcher instance
116	///
117	/// # Arguments
118	/// * `network` - Network configuration
119	/// * `block_storage` - Storage implementation for blocks
120	/// * `block_handler` - Handler function for processed blocks
121	/// * `trigger_handler` - Trigger handler function
122	/// * `block_tracker` - Block tracker instance
123	///
124	/// # Returns
125	/// * `Result<Self, BlockWatcherError>` - New watcher instance or error
126	pub async fn new(
127		network: Network,
128		block_storage: Arc<S>,
129		block_handler: Arc<H>,
130		trigger_handler: Arc<T>,
131		block_tracker: Arc<BlockTracker>,
132	) -> Result<Self, BlockWatcherError> {
133		let scheduler = J::new().await.map_err(|e| {
134			BlockWatcherError::scheduler_error(
135				e.to_string(),
136				Some(e),
137				Some(HashMap::from([(
138					"network".to_string(),
139					network.slug.clone(),
140				)])),
141			)
142		})?;
143		Ok(Self {
144			network,
145			block_storage,
146			block_handler,
147			trigger_handler,
148			scheduler,
149			block_tracker,
150		})
151	}
152
153	/// Starts the network watcher
154	///
155	/// Initializes the scheduler and begins watching for new blocks according
156	/// to the network's cron schedule. Also starts the recovery job if enabled.
157	pub async fn start<C: BlockChainClient + Clone + Send + 'static>(
158		&mut self,
159		rpc_client: C,
160	) -> Result<(), BlockWatcherError> {
161		// Start main block watcher job
162		self.start_main_watcher(rpc_client.clone()).await?;
163
164		// Start recovery job if enabled
165		if let Some(ref config) = self.network.recovery_config {
166			if config.enabled {
167				self.start_recovery_job(rpc_client).await?;
168			}
169		}
170
171		self.scheduler.start().await.map_err(|e| {
172			BlockWatcherError::scheduler_error(
173				e.to_string(),
174				Some(e),
175				Some(HashMap::from([(
176					"network".to_string(),
177					self.network.slug.clone(),
178				)])),
179			)
180		})?;
181
182		tracing::info!("Started block watcher for network: {}", self.network.slug);
183		Ok(())
184	}
185
186	/// Starts the main block watcher job
187	async fn start_main_watcher<C: BlockChainClient + Clone + Send + 'static>(
188		&mut self,
189		rpc_client: C,
190	) -> Result<(), BlockWatcherError> {
191		let network = self.network.clone();
192		let block_storage = self.block_storage.clone();
193		let block_handler = self.block_handler.clone();
194		let trigger_handler = self.trigger_handler.clone();
195		let block_tracker = self.block_tracker.clone();
196
197		let job = Job::new_async(self.network.cron_schedule.as_str(), move |_uuid, _l| {
198			let network = network.clone();
199			let block_storage = block_storage.clone();
200			let block_handler = block_handler.clone();
201			let block_tracker = block_tracker.clone();
202			let rpc_client = rpc_client.clone();
203			let trigger_handler = trigger_handler.clone();
204			Box::pin(async move {
205				let _ = process_new_blocks(
206					&network,
207					&rpc_client,
208					block_storage,
209					block_handler,
210					trigger_handler,
211					block_tracker,
212				)
213				.await
214				.map_err(|e| {
215					BlockWatcherError::processing_error(
216						"Failed to process blocks".to_string(),
217						Some(e.into()),
218						Some(HashMap::from([(
219							"network".to_string(),
220							network.slug.clone(),
221						)])),
222					)
223				});
224			})
225		})
226		.with_context(|| "Failed to create main watcher job")?;
227
228		self.scheduler.add(job).await.map_err(|e| {
229			BlockWatcherError::scheduler_error(
230				e.to_string(),
231				Some(e),
232				Some(HashMap::from([(
233					"network".to_string(),
234					self.network.slug.clone(),
235				)])),
236			)
237		})?;
238
239		Ok(())
240	}
241
242	/// Starts the recovery job for missed blocks
243	async fn start_recovery_job<C: BlockChainClient + Clone + Send + 'static>(
244		&mut self,
245		rpc_client: C,
246	) -> Result<(), BlockWatcherError> {
247		let recovery_config = self
248			.network
249			.recovery_config
250			.as_ref()
251			.ok_or_else(|| {
252				BlockWatcherError::recovery_error(
253					"Recovery config is required but not found".to_string(),
254					None,
255					Some(HashMap::from([(
256						"network".to_string(),
257						self.network.slug.clone(),
258					)])),
259				)
260			})?
261			.clone();
262
263		let network = self.network.clone();
264		let block_storage = self.block_storage.clone();
265		let block_handler = self.block_handler.clone();
266		let trigger_handler = self.trigger_handler.clone();
267		let block_tracker = self.block_tracker.clone();
268
269		let cron_schedule = recovery_config.cron_schedule.clone();
270		let job = Job::new_async(cron_schedule.as_str(), move |_uuid, _l| {
271			let network = network.clone();
272			let recovery_config = recovery_config.clone();
273			let block_storage = block_storage.clone();
274			let block_handler = block_handler.clone();
275			let block_tracker = block_tracker.clone();
276			let rpc_client = rpc_client.clone();
277			let trigger_handler = trigger_handler.clone();
278			Box::pin(async move {
279				let _ = process_missed_blocks(
280					&network,
281					&recovery_config,
282					&rpc_client,
283					block_storage,
284					block_handler,
285					trigger_handler,
286					block_tracker,
287				)
288				.await
289				.map_err(|e| {
290					BlockWatcherError::recovery_error(
291						"Failed to process missed blocks".to_string(),
292						Some(e.into()),
293						Some(HashMap::from([(
294							"network".to_string(),
295							network.slug.clone(),
296						)])),
297					)
298				});
299			})
300		})
301		.with_context(|| "Failed to create recovery job")?;
302
303		self.scheduler.add(job).await.map_err(|e| {
304			BlockWatcherError::scheduler_error(
305				e.to_string(),
306				Some(e),
307				Some(HashMap::from([(
308					"network".to_string(),
309					self.network.slug.clone(),
310				)])),
311			)
312		})?;
313
314		tracing::info!(
315			"Started recovery job for network: {} with schedule: {}",
316			self.network.slug,
317			self.network
318				.recovery_config
319				.as_ref()
320				.map(|c| c.cron_schedule.as_str())
321				.unwrap_or("unknown")
322		);
323		Ok(())
324	}
325
326	/// Stops the network watcher
327	///
328	/// Shuts down the scheduler and stops watching for new blocks.
329	pub async fn stop(&mut self) -> Result<(), BlockWatcherError> {
330		self.scheduler.shutdown().await.map_err(|e| {
331			BlockWatcherError::scheduler_error(
332				e.to_string(),
333				Some(e),
334				Some(HashMap::from([(
335					"network".to_string(),
336					self.network.slug.clone(),
337				)])),
338			)
339		})?;
340
341		tracing::info!("Stopped block watcher for network: {}", self.network.slug);
342		Ok(())
343	}
344}
345
346impl<S, H, T, J> BlockWatcherService<S, H, T, J>
347where
348	S: BlockStorage + Send + Sync + 'static,
349	H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
350	T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static,
351	J: JobSchedulerTrait,
352{
353	/// Creates a new block watcher service
354	///
355	/// # Arguments
356	/// * `network_service` - Service for network operations
357	/// * `block_storage` - Storage implementation for blocks
358	/// * `block_handler` - Handler function for processed blocks
359	pub async fn new(
360		block_storage: Arc<S>,
361		block_handler: Arc<H>,
362		trigger_handler: Arc<T>,
363		block_tracker: Arc<BlockTracker>,
364	) -> Result<Self, BlockWatcherError> {
365		Ok(BlockWatcherService {
366			block_storage,
367			block_handler,
368			trigger_handler,
369			active_watchers: Arc::new(RwLock::new(HashMap::new())),
370			block_tracker,
371		})
372	}
373
374	/// Starts a watcher for a specific network
375	///
376	/// # Arguments
377	/// * `network` - Network configuration to start watching
378	/// * `rpc_client` - RPC client for the network
379	pub async fn start_network_watcher<C: BlockChainClient + Send + Clone + 'static>(
380		&self,
381		network: &Network,
382		rpc_client: C,
383	) -> Result<(), BlockWatcherError> {
384		let mut watchers = self.active_watchers.write().await;
385
386		if watchers.contains_key(&network.slug) {
387			tracing::info!(
388				"Block watcher already running for network: {}",
389				network.slug
390			);
391			return Ok(());
392		}
393
394		let mut watcher = NetworkBlockWatcher::new(
395			network.clone(),
396			self.block_storage.clone(),
397			self.block_handler.clone(),
398			self.trigger_handler.clone(),
399			self.block_tracker.clone(),
400		)
401		.await?;
402
403		watcher.start(rpc_client).await?;
404		watchers.insert(network.slug.clone(), watcher);
405
406		Ok(())
407	}
408
409	/// Stops a watcher for a specific network
410	///
411	/// # Arguments
412	/// * `network_slug` - Identifier of the network to stop watching
413	pub async fn stop_network_watcher(&self, network_slug: &str) -> Result<(), BlockWatcherError> {
414		let mut watchers = self.active_watchers.write().await;
415
416		if let Some(mut watcher) = watchers.remove(network_slug) {
417			watcher.stop().await?;
418		}
419
420		Ok(())
421	}
422}
423
424/// Processes new blocks for a network
425///
426/// # Arguments
427/// * `network` - Network configuration
428/// * `rpc_client` - RPC client for the network
429/// * `block_storage` - Storage implementation for blocks
430/// * `block_handler` - Handler function for processed blocks
431/// * `trigger_handler` - Handler function for processed blocks
432/// * `block_tracker` - Tracker implementation for block processing
433///
434/// # Returns
435/// * `Result<(), BlockWatcherError>` - Success or error
436#[instrument(skip_all, fields(network = network.slug))]
437pub async fn process_new_blocks<
438	S: BlockStorage,
439	C: BlockChainClient + Send + Clone + 'static,
440	H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
441	T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static,
442	TR: BlockTrackerTrait + Send + Sync + 'static,
443>(
444	network: &Network,
445	rpc_client: &C,
446	block_storage: Arc<S>,
447	block_handler: Arc<H>,
448	trigger_handler: Arc<T>,
449	block_tracker: Arc<TR>,
450) -> Result<(), BlockWatcherError> {
451	let start_time = std::time::Instant::now();
452
453	let last_processed_block = block_storage
454		.get_last_processed_block(&network.slug)
455		.await
456		.with_context(|| "Failed to get last processed block")?
457		.unwrap_or(0);
458
459	let latest_block = rpc_client
460		.get_latest_block_number()
461		.await
462		.with_context(|| "Failed to get latest block number")?;
463
464	let latest_confirmed_block = latest_block.saturating_sub(network.confirmation_blocks);
465
466	let recommended_past_blocks = network.get_recommended_past_blocks();
467
468	let max_past_blocks = network.max_past_blocks.unwrap_or(recommended_past_blocks);
469
470	// Calculate the start block number, using the default if max_past_blocks is not set
471	let start_block = std::cmp::max(
472		last_processed_block + 1,
473		latest_confirmed_block.saturating_sub(max_past_blocks),
474	);
475
476	tracing::info!(
477		"Processing blocks:\n\tLast processed block: {}\n\tLatest confirmed block: {}\n\tStart \
478		 block: {}{}\n\tConfirmations required: {}\n\tMax past blocks: {}",
479		last_processed_block,
480		latest_confirmed_block,
481		start_block,
482		if start_block > last_processed_block + 1 {
483			format!(
484				" (skipped {} blocks)",
485				start_block - (last_processed_block + 1)
486			)
487		} else {
488			String::new()
489		},
490		network.confirmation_blocks,
491		max_past_blocks
492	);
493
494	let mut blocks = Vec::new();
495	if last_processed_block == 0 {
496		blocks = rpc_client
497			.get_blocks(latest_confirmed_block, None)
498			.await
499			.with_context(|| format!("Failed to get block {}", latest_confirmed_block))?;
500	} else if last_processed_block < latest_confirmed_block {
501		blocks = rpc_client
502			.get_blocks(start_block, Some(latest_confirmed_block))
503			.await
504			.with_context(|| {
505				format!(
506					"Failed to get blocks from {} to {}",
507					start_block, latest_confirmed_block
508				)
509			})?;
510	}
511
512	// Reset expected_next to start_block to ensure synchronization with this execution
513	// This prevents false out-of-order warnings when reprocessing blocks or restarting
514	block_tracker
515		.reset_expected_next(network, start_block)
516		.await;
517
518	// Detect missing blocks using BlockTracker
519	let missed_blocks = block_tracker.detect_missing_blocks(network, &blocks).await;
520
521	// Log and save missed blocks if any
522	if !missed_blocks.is_empty() {
523		tracing::error!(
524			network = %network.slug,
525			count = missed_blocks.len(),
526			"Missed {} blocks: {:?}",
527			missed_blocks.len(),
528			missed_blocks
529		);
530
531		// Save missed blocks in batch (enabled if store_blocks OR recovery is enabled)
532		let recovery_enabled = network.recovery_config.as_ref().is_some_and(|c| c.enabled);
533		if network.store_blocks.unwrap_or(false) || recovery_enabled {
534			block_storage
535				.save_missed_blocks(&network.slug, &missed_blocks)
536				.await
537				.with_context(|| format!("Failed to save {} missed blocks", missed_blocks.len()))?;
538		}
539	}
540
541	// Create channels for our pipeline
542	let (process_tx, process_rx) = mpsc::channel::<(BlockType, u64)>(blocks.len() * 2);
543	let (trigger_tx, trigger_rx) = mpsc::channel::<ProcessedBlock>(blocks.len() * 2);
544
545	// Stage 1: Block Processing Pipeline
546	let process_handle = tokio::spawn({
547		let network = network.clone();
548		let block_handler = block_handler.clone();
549		let mut trigger_tx = trigger_tx.clone();
550
551		async move {
552			// Process blocks concurrently, up to 32 at a time
553			let mut results = process_rx
554				.map(|(block, _)| {
555					let network = network.clone();
556					let block_handler = block_handler.clone();
557					async move { (block_handler)(block, network).await }
558				})
559				.buffer_unordered(32);
560
561			// Process all results and send them to trigger channel
562			while let Some(result) = results.next().await {
563				trigger_tx
564					.send(result)
565					.await
566					.with_context(|| "Failed to send processed block")?;
567			}
568
569			Ok::<(), BlockWatcherError>(())
570		}
571	});
572
573	// Stage 2: Trigger Pipeline
574	let trigger_handle = tokio::spawn({
575		let network = network.clone();
576		let trigger_handler = trigger_handler.clone();
577		let block_tracker = block_tracker.clone();
578
579		async move {
580			let mut trigger_rx = trigger_rx;
581			let mut pending_blocks = BTreeMap::new();
582			let mut next_block_number = Some(start_block);
583			let block_tracker = block_tracker.clone();
584
585			// Process all incoming blocks
586			while let Some(processed_block) = trigger_rx.next().await {
587				let block_number = processed_block.block_number;
588
589				// Buffer the block - we'll check and execute in order
590				pending_blocks.insert(block_number, processed_block);
591
592				// Process blocks in order as long as we have the next expected block
593				while let Some(expected) = next_block_number {
594					if let Some(block) = pending_blocks.remove(&expected) {
595						// Check for duplicate or out-of-order blocks when actually executing
596						// This ensures we're checking the execution order, not arrival order
597						match block_tracker
598							.check_processed_block(&network, expected)
599							.await
600						{
601							BlockCheckResult::Ok => {
602								// Block is valid, execute it
603							}
604							BlockCheckResult::Duplicate { last_seen } => {
605								tracing::error!(
606									network = %network.slug,
607									block_number = expected,
608									last_seen = last_seen,
609									"Duplicate block detected: received block {} again (last seen: {})",
610									expected,
611									last_seen
612								);
613							}
614							BlockCheckResult::OutOfOrder {
615								expected: exp,
616								received,
617							} => {
618								tracing::warn!(
619									network = %network.slug,
620									block_number = received,
621									expected = exp,
622									"Out of order block detected: received {} but expected {}",
623									received,
624									exp
625								);
626							}
627						}
628
629						(trigger_handler)(&block);
630						next_block_number = Some(expected + 1);
631					} else {
632						break;
633					}
634				}
635			}
636
637			// Process any remaining blocks in order after the channel is closed
638			while let Some(min_block) = pending_blocks.keys().next().copied() {
639				if let Some(block) = pending_blocks.remove(&min_block) {
640					// Check for duplicate or out-of-order blocks when executing
641					match block_tracker
642						.check_processed_block(&network, min_block)
643						.await
644					{
645						BlockCheckResult::Ok => {
646							// Block is valid, execute it
647						}
648						BlockCheckResult::Duplicate { last_seen } => {
649							tracing::error!(
650								network = %network.slug,
651								block_number = min_block,
652								last_seen = last_seen,
653								"Duplicate block detected: received block {} again (last seen: {})",
654								min_block,
655								last_seen
656							);
657						}
658						BlockCheckResult::OutOfOrder {
659							expected: exp,
660							received,
661						} => {
662							tracing::warn!(
663								network = %network.slug,
664								block_number = received,
665								expected = exp,
666								"Out of order block detected: received {} but expected {}",
667								received,
668								exp
669							);
670						}
671					}
672
673					(trigger_handler)(&block);
674				}
675			}
676			Ok::<(), BlockWatcherError>(())
677		}
678	});
679
680	// Feed blocks into the pipeline
681	futures::future::join_all(blocks.iter().map(|block| {
682		let mut process_tx = process_tx.clone();
683		async move {
684			let block_number = block.number().unwrap_or(0);
685
686			// Send block to processing pipeline
687			process_tx
688				.send((block.clone(), block_number))
689				.await
690				.with_context(|| "Failed to send block to pipeline")?;
691
692			Ok::<(), BlockWatcherError>(())
693		}
694	}))
695	.await
696	.into_iter()
697	.collect::<Result<Vec<_>, _>>()
698	.with_context(|| format!("Failed to process blocks for network {}", network.slug))?;
699
700	// Drop the sender after all blocks are sent
701	drop(process_tx);
702	drop(trigger_tx);
703
704	// Wait for both pipeline stages to complete
705	let (_process_result, _trigger_result) = tokio::join!(process_handle, trigger_handle);
706
707	if network.store_blocks.unwrap_or(false) {
708		// Delete old blocks before saving new ones
709		block_storage
710			.delete_blocks(&network.slug)
711			.await
712			.with_context(|| "Failed to delete old blocks")?;
713
714		block_storage
715			.save_blocks(&network.slug, &blocks)
716			.await
717			.with_context(|| "Failed to save blocks")?;
718	}
719	// Update the last processed block
720	block_storage
721		.save_last_processed_block(&network.slug, latest_confirmed_block)
722		.await
723		.with_context(|| "Failed to save last processed block")?;
724
725	tracing::info!(
726		"Processed {} blocks in {}ms",
727		blocks.len(),
728		start_time.elapsed().as_millis()
729	);
730
731	Ok(())
732}
733
734#[cfg(test)]
735mod tests {
736	use super::*;
737	use crate::models::BlockRecoveryConfig;
738	use crate::services::blockwatcher::storage::FileBlockStorage;
739	use crate::utils::tests::network::NetworkBuilder;
740	use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
741	use tempfile::tempdir;
742
743	/// Helper to create an EVM block with a specific block number
744	fn create_evm_block(block_number: u64) -> BlockType {
745		use crate::models::EVMBlock;
746		use alloy::rpc::types::{Block, Header};
747
748		let alloy_block = Block {
749			header: Header {
750				inner: alloy::consensus::Header {
751					number: block_number,
752					..Default::default()
753				},
754				..Default::default()
755			},
756			..Default::default()
757		};
758		let evm_block: EVMBlock = alloy_block.into();
759		BlockType::EVM(Box::new(evm_block))
760	}
761
762	fn create_test_network() -> Network {
763		NetworkBuilder::new()
764			.name("Test Network")
765			.slug("test_network")
766			.store_blocks(true)
767			.confirmation_blocks(12)
768			.max_past_blocks(100)
769			.build()
770	}
771
772	fn create_test_network_with_recovery() -> Network {
773		NetworkBuilder::new()
774			.name("Test Network")
775			.slug("test_network")
776			.store_blocks(true)
777			.confirmation_blocks(12)
778			.max_past_blocks(100)
779			.recovery_config(BlockRecoveryConfig {
780				enabled: true,
781				cron_schedule: "0 */5 * * * *".to_string(),
782				max_blocks_per_run: 10,
783				max_block_age: 1000,
784				max_retries: 3,
785				retry_delay_ms: 100,
786			})
787			.build()
788	}
789
	/// Mock RPC client for testing
	#[derive(Clone)]
	struct MockRpcClient {
		// Chain tip reported by get_latest_block_number
		latest_block: Arc<AtomicU64>,
		// When non-empty, returned verbatim by get_blocks regardless of range
		blocks_to_return: Arc<std::sync::Mutex<Vec<BlockType>>>,
		// When true, get_blocks returns a simulated error
		fail_get_blocks: Arc<AtomicBool>,
		// Number of get_blocks invocations
		call_count: Arc<AtomicUsize>,
	}
798
799	impl MockRpcClient {
800		fn new(latest_block: u64) -> Self {
801			Self {
802				latest_block: Arc::new(AtomicU64::new(latest_block)),
803				blocks_to_return: Arc::new(std::sync::Mutex::new(Vec::new())),
804				fail_get_blocks: Arc::new(AtomicBool::new(false)),
805				call_count: Arc::new(AtomicUsize::new(0)),
806			}
807		}
808
809		fn with_blocks(self, blocks: Vec<BlockType>) -> Self {
810			*self.blocks_to_return.lock().unwrap() = blocks;
811			self
812		}
813
814		#[allow(dead_code)]
815		fn with_failing_get_blocks(self) -> Self {
816			self.fail_get_blocks.store(true, Ordering::SeqCst);
817			self
818		}
819	}
820
821	#[async_trait::async_trait]
822	impl BlockChainClient for MockRpcClient {
823		async fn get_latest_block_number(&self) -> Result<u64, anyhow::Error> {
824			Ok(self.latest_block.load(Ordering::SeqCst))
825		}
826
827		async fn get_blocks(
828			&self,
829			start: u64,
830			end: Option<u64>,
831		) -> Result<Vec<BlockType>, anyhow::Error> {
832			self.call_count.fetch_add(1, Ordering::SeqCst);
833
834			if self.fail_get_blocks.load(Ordering::SeqCst) {
835				return Err(anyhow::anyhow!("Simulated RPC failure"));
836			}
837
838			let blocks = self.blocks_to_return.lock().unwrap();
839			if !blocks.is_empty() {
840				return Ok(blocks.clone());
841			}
842
843			// Generate mock blocks for the requested range
844			let end_block = end.unwrap_or(start);
845			let mut result = Vec::new();
846			for block_num in start..=end_block {
847				result.push(create_evm_block(block_num));
848			}
849			Ok(result)
850		}
851	}
852
853	fn create_block_handler() -> Arc<
854		impl Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
855	> {
856		Arc::new(|block: BlockType, network: Network| {
857			Box::pin(async move {
858				ProcessedBlock {
859					network_slug: network.slug,
860					block_number: block.number().unwrap_or(0),
861					processing_results: vec![],
862				}
863			}) as BoxFuture<'static, ProcessedBlock>
864		})
865	}
866
867	fn create_trigger_handler(
868	) -> Arc<impl Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static> {
869		Arc::new(|_block: &ProcessedBlock| tokio::spawn(async move {}))
870	}
871
872	fn create_counting_trigger_handler(
873		counter: Arc<AtomicUsize>,
874	) -> Arc<impl Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static> {
875		Arc::new(move |_block: &ProcessedBlock| {
876			let counter = counter.clone();
877			tokio::spawn(async move {
878				counter.fetch_add(1, Ordering::SeqCst);
879			})
880		})
881	}
882
883	// ============ process_new_blocks tests ============
884
885	#[tokio::test]
886	async fn test_process_new_blocks_first_run() {
887		let temp_dir = tempdir().unwrap();
888		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
889		let network = create_test_network();
890		let rpc_client = MockRpcClient::new(100);
891		let block_tracker = Arc::new(BlockTracker::new(100));
892		let block_handler = create_block_handler();
893		let trigger_handler = create_trigger_handler();
894
895		let result = process_new_blocks(
896			&network,
897			&rpc_client,
898			storage.clone(),
899			block_handler,
900			trigger_handler,
901			block_tracker,
902		)
903		.await;
904
905		assert!(result.is_ok());
906
907		// First run should save latest_confirmed_block (100 - 12 = 88)
908		let last_processed = storage
909			.get_last_processed_block("test_network")
910			.await
911			.unwrap();
912		assert_eq!(last_processed, Some(88));
913	}
914
915	#[tokio::test]
916	async fn test_process_new_blocks_subsequent_run() {
917		let temp_dir = tempdir().unwrap();
918		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
919
920		// Simulate a previous run
921		storage
922			.save_last_processed_block("test_network", 80)
923			.await
924			.unwrap();
925
926		let network = create_test_network();
927		let rpc_client = MockRpcClient::new(100);
928		let block_tracker = Arc::new(BlockTracker::new(100));
929		let block_handler = create_block_handler();
930		let trigger_handler = create_trigger_handler();
931
932		let result = process_new_blocks(
933			&network,
934			&rpc_client,
935			storage.clone(),
936			block_handler,
937			trigger_handler,
938			block_tracker,
939		)
940		.await;
941
942		assert!(result.is_ok());
943
944		// Should process blocks from 81 to 88
945		let last_processed = storage
946			.get_last_processed_block("test_network")
947			.await
948			.unwrap();
949		assert_eq!(last_processed, Some(88));
950	}
951
952	#[tokio::test]
953	async fn test_process_new_blocks_with_store_blocks_enabled() {
954		let temp_dir = tempdir().unwrap();
955		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
956
957		let mut network = create_test_network();
958		network.store_blocks = Some(true);
959
960		let rpc_client = MockRpcClient::new(100);
961		let block_tracker = Arc::new(BlockTracker::new(100));
962		let block_handler = create_block_handler();
963		let trigger_handler = create_trigger_handler();
964
965		let result = process_new_blocks(
966			&network,
967			&rpc_client,
968			storage.clone(),
969			block_handler,
970			trigger_handler,
971			block_tracker,
972		)
973		.await;
974
975		assert!(result.is_ok());
976
977		// Check that blocks were saved (file should exist)
978		let pattern = format!("{}/test_network_blocks_*.json", temp_dir.path().display());
979		let files: Vec<_> = glob::glob(&pattern).unwrap().collect();
980		assert!(
981			!files.is_empty(),
982			"Block files should be created when store_blocks is enabled"
983		);
984	}
985
986	#[tokio::test]
987	async fn test_process_new_blocks_with_store_blocks_disabled() {
988		let temp_dir = tempdir().unwrap();
989		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
990
991		let mut network = create_test_network();
992		network.store_blocks = Some(false);
993
994		let rpc_client = MockRpcClient::new(100);
995		let block_tracker = Arc::new(BlockTracker::new(100));
996		let block_handler = create_block_handler();
997		let trigger_handler = create_trigger_handler();
998
999		let result = process_new_blocks(
1000			&network,
1001			&rpc_client,
1002			storage.clone(),
1003			block_handler,
1004			trigger_handler,
1005			block_tracker,
1006		)
1007		.await;
1008
1009		assert!(result.is_ok());
1010
1011		// Check that no block files were created
1012		let pattern = format!("{}/test_network_blocks_*.json", temp_dir.path().display());
1013		let files: Vec<_> = glob::glob(&pattern).unwrap().collect();
1014		assert!(
1015			files.is_empty(),
1016			"Block files should not be created when store_blocks is disabled"
1017		);
1018	}
1019
1020	#[tokio::test]
1021	async fn test_process_new_blocks_skips_old_blocks() {
1022		let temp_dir = tempdir().unwrap();
1023		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1024
1025		// Last processed was very long ago
1026		storage
1027			.save_last_processed_block("test_network", 10)
1028			.await
1029			.unwrap();
1030
1031		let mut network = create_test_network();
1032		network.max_past_blocks = Some(50); // Only process last 50 blocks
1033
1034		let rpc_client = MockRpcClient::new(1000);
1035		let block_tracker = Arc::new(BlockTracker::new(100));
1036		let block_handler = create_block_handler();
1037		let trigger_handler = create_trigger_handler();
1038
1039		let result = process_new_blocks(
1040			&network,
1041			&rpc_client,
1042			storage.clone(),
1043			block_handler,
1044			trigger_handler,
1045			block_tracker,
1046		)
1047		.await;
1048
1049		assert!(result.is_ok());
1050
1051		// Should skip to recent blocks (current 1000 - confirmations 12 - max_past 50 = 938)
1052		let last_processed = storage
1053			.get_last_processed_block("test_network")
1054			.await
1055			.unwrap();
1056		assert_eq!(last_processed, Some(988)); // 1000 - 12 confirmations
1057	}
1058
1059	#[tokio::test]
1060	async fn test_process_new_blocks_detects_missed_blocks() {
1061		let temp_dir = tempdir().unwrap();
1062		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1063
1064		storage
1065			.save_last_processed_block("test_network", 95)
1066			.await
1067			.unwrap();
1068
1069		let network = create_test_network_with_recovery();
1070
1071		// Create blocks with a gap (missing block 97)
1072		let rpc_client =
1073			MockRpcClient::new(110).with_blocks(vec![create_evm_block(96), create_evm_block(98)]);
1074
1075		let block_tracker = Arc::new(BlockTracker::new(100));
1076		let block_handler = create_block_handler();
1077		let trigger_handler = create_trigger_handler();
1078
1079		let _ = process_new_blocks(
1080			&network,
1081			&rpc_client,
1082			storage.clone(),
1083			block_handler,
1084			trigger_handler,
1085			block_tracker,
1086		)
1087		.await;
1088
1089		// Verify missed block was saved
1090		let missed = storage
1091			.get_missed_blocks("test_network", 1000, 1000, 3)
1092			.await
1093			.unwrap();
1094
1095		// Block 97 should be recorded as missed
1096		let missed_numbers: Vec<u64> = missed.iter().map(|e| e.block_number).collect();
1097		assert!(missed_numbers.contains(&97));
1098	}
1099
1100	#[tokio::test]
1101	async fn test_process_new_blocks_no_new_blocks() {
1102		let temp_dir = tempdir().unwrap();
1103		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1104
1105		// Already processed up to current confirmed block
1106		storage
1107			.save_last_processed_block("test_network", 88)
1108			.await
1109			.unwrap();
1110
1111		let network = create_test_network();
1112		let rpc_client = MockRpcClient::new(100); // 100 - 12 = 88 confirmed
1113		let block_tracker = Arc::new(BlockTracker::new(100));
1114		let block_handler = create_block_handler();
1115		let trigger_handler = create_trigger_handler();
1116
1117		let result = process_new_blocks(
1118			&network,
1119			&rpc_client,
1120			storage.clone(),
1121			block_handler,
1122			trigger_handler,
1123			block_tracker,
1124		)
1125		.await;
1126
1127		assert!(result.is_ok());
1128	}
1129
1130	#[tokio::test]
1131	async fn test_process_new_blocks_triggers_handlers() {
1132		let temp_dir = tempdir().unwrap();
1133		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1134
1135		storage
1136			.save_last_processed_block("test_network", 85)
1137			.await
1138			.unwrap();
1139
1140		let network = create_test_network();
1141		let rpc_client = MockRpcClient::new(100);
1142		let block_tracker = Arc::new(BlockTracker::new(100));
1143		let block_handler = create_block_handler();
1144
1145		let trigger_count = Arc::new(AtomicUsize::new(0));
1146		let trigger_handler = create_counting_trigger_handler(trigger_count.clone());
1147
1148		let result = process_new_blocks(
1149			&network,
1150			&rpc_client,
1151			storage,
1152			block_handler,
1153			trigger_handler,
1154			block_tracker,
1155		)
1156		.await;
1157
1158		assert!(result.is_ok());
1159
1160		// Wait a bit for async triggers to complete
1161		tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1162
1163		// Should have triggered for blocks 86, 87, 88 (3 blocks)
1164		assert_eq!(trigger_count.load(Ordering::SeqCst), 3);
1165	}
1166
1167	#[tokio::test]
1168	async fn test_process_new_blocks_handles_duplicate_blocks() {
1169		let temp_dir = tempdir().unwrap();
1170		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1171
1172		storage
1173			.save_last_processed_block("test_network", 95)
1174			.await
1175			.unwrap();
1176
1177		let network = create_test_network();
1178
1179		// Create blocks with duplicates
1180		let rpc_client = MockRpcClient::new(110).with_blocks(vec![
1181			create_evm_block(96),
1182			create_evm_block(97),
1183			create_evm_block(96), // Duplicate
1184		]);
1185
1186		let block_tracker = Arc::new(BlockTracker::new(100));
1187		let block_handler = create_block_handler();
1188		let trigger_handler = create_trigger_handler();
1189
1190		// Should handle duplicates gracefully (BlockTracker detects them)
1191		let result = process_new_blocks(
1192			&network,
1193			&rpc_client,
1194			storage,
1195			block_handler,
1196			trigger_handler,
1197			block_tracker,
1198		)
1199		.await;
1200
1201		assert!(result.is_ok());
1202	}
1203
1204	#[tokio::test]
1205	async fn test_process_new_blocks_handles_out_of_order_blocks() {
1206		let temp_dir = tempdir().unwrap();
1207		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1208
1209		storage
1210			.save_last_processed_block("test_network", 95)
1211			.await
1212			.unwrap();
1213
1214		let network = create_test_network();
1215
1216		// Create blocks in reverse order
1217		let rpc_client = MockRpcClient::new(110).with_blocks(vec![
1218			create_evm_block(98),
1219			create_evm_block(97),
1220			create_evm_block(96),
1221		]);
1222
1223		let block_tracker = Arc::new(BlockTracker::new(100));
1224		let block_handler = create_block_handler();
1225		let trigger_handler = create_trigger_handler();
1226
1227		// Should handle out-of-order blocks (they get reordered in the trigger pipeline)
1228		let result = process_new_blocks(
1229			&network,
1230			&rpc_client,
1231			storage,
1232			block_handler,
1233			trigger_handler,
1234			block_tracker,
1235		)
1236		.await;
1237
1238		assert!(result.is_ok());
1239	}
1240
1241	#[tokio::test]
1242	async fn test_process_new_blocks_saves_missed_blocks_when_recovery_enabled() {
1243		let temp_dir = tempdir().unwrap();
1244		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1245
1246		storage
1247			.save_last_processed_block("test_network", 95)
1248			.await
1249			.unwrap();
1250
1251		// Network with store_blocks=false but recovery enabled
1252		let mut network = create_test_network_with_recovery();
1253		network.store_blocks = Some(false);
1254
1255		// Create blocks with a gap
1256		let rpc_client =
1257			MockRpcClient::new(110).with_blocks(vec![create_evm_block(96), create_evm_block(98)]);
1258
1259		let block_tracker = Arc::new(BlockTracker::new(100));
1260		let block_handler = create_block_handler();
1261		let trigger_handler = create_trigger_handler();
1262
1263		let _ = process_new_blocks(
1264			&network,
1265			&rpc_client,
1266			storage.clone(),
1267			block_handler,
1268			trigger_handler,
1269			block_tracker,
1270		)
1271		.await;
1272
1273		// Missed blocks should be saved even when store_blocks is false (because recovery is enabled)
1274		let missed = storage
1275			.get_missed_blocks("test_network", 1000, 1000, 3)
1276			.await
1277			.unwrap();
1278		assert!(!missed.is_empty());
1279	}
1280
1281	// ============ BlockWatcherService tests ============
1282
	/// Mock job scheduler for testing
	///
	/// Records which trait methods were invoked and can be configured (via
	/// the `fail_*` switches) to return simulated errors from each one.
	#[derive(Clone)]
	struct MockJobScheduler {
		// Flipped to true when `start` completes successfully.
		started: Arc<AtomicBool>,
		// Flipped to true when `shutdown` completes successfully.
		shutdown_called: Arc<AtomicBool>,
		// Count of jobs registered through `add`.
		jobs_added: Arc<AtomicUsize>,
		// Failure switches: when true, the corresponding trait method
		// returns a simulated error instead of succeeding.
		fail_new: bool,
		fail_add: bool,
		fail_start: bool,
		fail_shutdown: bool,
	}
1294
1295	impl MockJobScheduler {
1296		fn new() -> Self {
1297			Self {
1298				started: Arc::new(AtomicBool::new(false)),
1299				shutdown_called: Arc::new(AtomicBool::new(false)),
1300				jobs_added: Arc::new(AtomicUsize::new(0)),
1301				fail_new: false,
1302				fail_add: false,
1303				fail_start: false,
1304				fail_shutdown: false,
1305			}
1306		}
1307
1308		#[allow(dead_code)]
1309		fn with_failing_new() -> Self {
1310			Self {
1311				fail_new: true,
1312				..Self::new()
1313			}
1314		}
1315
1316		#[allow(dead_code)]
1317		fn with_failing_add() -> Self {
1318			Self {
1319				fail_add: true,
1320				..Self::new()
1321			}
1322		}
1323
1324		#[allow(dead_code)]
1325		fn with_failing_start() -> Self {
1326			Self {
1327				fail_start: true,
1328				..Self::new()
1329			}
1330		}
1331
1332		#[allow(dead_code)]
1333		fn with_failing_shutdown() -> Self {
1334			Self {
1335				fail_shutdown: true,
1336				..Self::new()
1337			}
1338		}
1339	}
1340
	// Trait implementation that records calls / returns simulated failures
	// according to the instance's `fail_*` switches.
	#[async_trait::async_trait]
	impl JobSchedulerTrait for MockJobScheduler {
		async fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
			// NOTE(review): `MockJobScheduler::new()` always sets
			// `fail_new: false`, so the branch below is unreachable as
			// written — `with_failing_new` cannot influence this trait
			// method because a fresh instance is constructed here.
			let scheduler = MockJobScheduler::new();
			if scheduler.fail_new {
				return Err("Simulated scheduler creation failure".into());
			}
			Ok(scheduler)
		}

		// Counts the job in `jobs_added` unless `fail_add` is set.
		async fn add(&self, _job: Job) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
			if self.fail_add {
				return Err("Simulated job add failure".into());
			}
			self.jobs_added.fetch_add(1, Ordering::SeqCst);
			Ok(())
		}

		// Marks the scheduler as started unless `fail_start` is set.
		async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
			if self.fail_start {
				return Err("Simulated scheduler start failure".into());
			}
			self.started.store(true, Ordering::SeqCst);
			Ok(())
		}

		// Records the shutdown unless `fail_shutdown` is set.
		async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
			if self.fail_shutdown {
				return Err("Simulated scheduler shutdown failure".into());
			}
			self.shutdown_called.store(true, Ordering::SeqCst);
			Ok(())
		}
	}
1375
1376	#[tokio::test]
1377	async fn test_block_watcher_service_new() {
1378		let temp_dir = tempdir().unwrap();
1379		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1380		let block_handler = create_block_handler();
1381		let trigger_handler = create_trigger_handler();
1382		let block_tracker = Arc::new(BlockTracker::new(100));
1383
1384		let service: Result<BlockWatcherService<_, _, _, MockJobScheduler>, _> =
1385			BlockWatcherService::new(storage, block_handler, trigger_handler, block_tracker).await;
1386
1387		assert!(service.is_ok());
1388	}
1389
1390	#[tokio::test]
1391	async fn test_network_block_watcher_new() {
1392		let temp_dir = tempdir().unwrap();
1393		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1394		let network = create_test_network();
1395		let block_handler = create_block_handler();
1396		let trigger_handler = create_trigger_handler();
1397		let block_tracker = Arc::new(BlockTracker::new(100));
1398
1399		let watcher: Result<NetworkBlockWatcher<_, _, _, MockJobScheduler>, _> =
1400			NetworkBlockWatcher::new(
1401				network,
1402				storage,
1403				block_handler,
1404				trigger_handler,
1405				block_tracker,
1406			)
1407			.await;
1408
1409		assert!(watcher.is_ok());
1410	}
1411
1412	#[tokio::test]
1413	async fn test_start_network_watcher_already_running() {
1414		let temp_dir = tempdir().unwrap();
1415		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1416		let network = create_test_network();
1417		let block_handler = create_block_handler();
1418		let trigger_handler = create_trigger_handler();
1419		let block_tracker = Arc::new(BlockTracker::new(100));
1420
1421		let service: BlockWatcherService<_, _, _, MockJobScheduler> =
1422			BlockWatcherService::new(storage, block_handler, trigger_handler, block_tracker)
1423				.await
1424				.unwrap();
1425
1426		let rpc_client = MockRpcClient::new(100);
1427
1428		// Start watcher first time
1429		let result = service
1430			.start_network_watcher(&network, rpc_client.clone())
1431			.await;
1432		assert!(result.is_ok());
1433
1434		// Start watcher second time - should return early without error
1435		let result = service.start_network_watcher(&network, rpc_client).await;
1436		assert!(result.is_ok());
1437
1438		// Verify only one watcher exists
1439		let watchers = service.active_watchers.read().await;
1440		assert_eq!(watchers.len(), 1);
1441	}
1442
1443	#[tokio::test]
1444	async fn test_stop_network_watcher() {
1445		let temp_dir = tempdir().unwrap();
1446		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1447		let network = create_test_network();
1448		let block_handler = create_block_handler();
1449		let trigger_handler = create_trigger_handler();
1450		let block_tracker = Arc::new(BlockTracker::new(100));
1451
1452		let service: BlockWatcherService<_, _, _, MockJobScheduler> =
1453			BlockWatcherService::new(storage, block_handler, trigger_handler, block_tracker)
1454				.await
1455				.unwrap();
1456
1457		let rpc_client = MockRpcClient::new(100);
1458
1459		// Start watcher
1460		service
1461			.start_network_watcher(&network, rpc_client)
1462			.await
1463			.unwrap();
1464
1465		// Verify watcher is running
1466		{
1467			let watchers = service.active_watchers.read().await;
1468			assert_eq!(watchers.len(), 1);
1469		}
1470
1471		// Stop watcher
1472		let result = service.stop_network_watcher("test_network").await;
1473		assert!(result.is_ok());
1474
1475		// Verify watcher was removed
1476		let watchers = service.active_watchers.read().await;
1477		assert_eq!(watchers.len(), 0);
1478	}
1479
1480	#[tokio::test]
1481	async fn test_stop_network_watcher_not_running() {
1482		let temp_dir = tempdir().unwrap();
1483		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1484		let block_handler = create_block_handler();
1485		let trigger_handler = create_trigger_handler();
1486		let block_tracker = Arc::new(BlockTracker::new(100));
1487
1488		let service: BlockWatcherService<_, _, _, MockJobScheduler> =
1489			BlockWatcherService::new(storage, block_handler, trigger_handler, block_tracker)
1490				.await
1491				.unwrap();
1492
1493		// Stop watcher that doesn't exist - should not error
1494		let result = service.stop_network_watcher("nonexistent").await;
1495		assert!(result.is_ok());
1496	}
1497
1498	#[tokio::test]
1499	async fn test_network_watcher_start_with_recovery() {
1500		let temp_dir = tempdir().unwrap();
1501		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1502		let network = create_test_network_with_recovery();
1503		let block_handler = create_block_handler();
1504		let trigger_handler = create_trigger_handler();
1505		let block_tracker = Arc::new(BlockTracker::new(100));
1506
1507		let mut watcher: NetworkBlockWatcher<_, _, _, MockJobScheduler> = NetworkBlockWatcher::new(
1508			network,
1509			storage,
1510			block_handler,
1511			trigger_handler,
1512			block_tracker,
1513		)
1514		.await
1515		.unwrap();
1516
1517		let rpc_client = MockRpcClient::new(100);
1518		let result = watcher.start(rpc_client).await;
1519
1520		assert!(result.is_ok());
1521		// With recovery enabled, 2 jobs should be added (main watcher + recovery)
1522		assert_eq!(watcher.scheduler.jobs_added.load(Ordering::SeqCst), 2);
1523	}
1524
1525	#[tokio::test]
1526	async fn test_network_watcher_start_without_recovery() {
1527		let temp_dir = tempdir().unwrap();
1528		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1529		let network = create_test_network(); // No recovery config
1530		let block_handler = create_block_handler();
1531		let trigger_handler = create_trigger_handler();
1532		let block_tracker = Arc::new(BlockTracker::new(100));
1533
1534		let mut watcher: NetworkBlockWatcher<_, _, _, MockJobScheduler> = NetworkBlockWatcher::new(
1535			network,
1536			storage,
1537			block_handler,
1538			trigger_handler,
1539			block_tracker,
1540		)
1541		.await
1542		.unwrap();
1543
1544		let rpc_client = MockRpcClient::new(100);
1545		let result = watcher.start(rpc_client).await;
1546
1547		assert!(result.is_ok());
1548		// Without recovery, only 1 job should be added (main watcher)
1549		assert_eq!(watcher.scheduler.jobs_added.load(Ordering::SeqCst), 1);
1550	}
1551
1552	#[tokio::test]
1553	async fn test_network_watcher_stop() {
1554		let temp_dir = tempdir().unwrap();
1555		let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1556		let network = create_test_network();
1557		let block_handler = create_block_handler();
1558		let trigger_handler = create_trigger_handler();
1559		let block_tracker = Arc::new(BlockTracker::new(100));
1560
1561		let mut watcher: NetworkBlockWatcher<_, _, _, MockJobScheduler> = NetworkBlockWatcher::new(
1562			network,
1563			storage,
1564			block_handler,
1565			trigger_handler,
1566			block_tracker,
1567		)
1568		.await
1569		.unwrap();
1570
1571		let rpc_client = MockRpcClient::new(100);
1572		watcher.start(rpc_client).await.unwrap();
1573
1574		let result = watcher.stop().await;
1575		assert!(result.is_ok());
1576		assert!(watcher.scheduler.shutdown_called.load(Ordering::SeqCst));
1577	}
1578}