1use anyhow::Context;
7use futures::{channel::mpsc, future::BoxFuture, stream::StreamExt, SinkExt};
8use std::{
9 collections::{BTreeMap, HashMap},
10 sync::Arc,
11};
12use tokio::sync::RwLock;
13use tokio_cron_scheduler::{Job, JobScheduler};
14use tracing::instrument;
15
16use crate::{
17 models::{BlockType, Network, ProcessedBlock},
18 services::{
19 blockchain::BlockChainClient,
20 blockwatcher::{
21 error::BlockWatcherError,
22 recovery::process_missed_blocks,
23 storage::BlockStorage,
24 tracker::{BlockCheckResult, BlockTracker, BlockTrackerTrait},
25 },
26 },
27};
28
29#[async_trait::async_trait]
35pub trait JobSchedulerTrait: Send + Sync + Sized {
36 async fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>>;
37 async fn add(&self, job: Job) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
38 async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
39 async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
40}
41
42#[async_trait::async_trait]
44impl JobSchedulerTrait for JobScheduler {
45 async fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
46 Self::new().await.map_err(Into::into)
47 }
48
49 async fn add(&self, job: Job) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
50 self.add(job).await.map(|_| ()).map_err(Into::into)
51 }
52
53 async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
54 self.start().await.map(|_| ()).map_err(Into::into)
55 }
56
57 async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
58 self.shutdown().await.map(|_| ()).map_err(Into::into)
59 }
60}
61
62pub struct NetworkBlockWatcher<S, H, T, J>
73where
74 J: JobSchedulerTrait,
75{
76 pub network: Network,
77 pub block_storage: Arc<S>,
78 pub block_handler: Arc<H>,
79 pub trigger_handler: Arc<T>,
80 pub scheduler: J,
81 pub block_tracker: Arc<BlockTracker>,
82}
83
84type BlockWatchersMap<S, H, T, J> = HashMap<String, NetworkBlockWatcher<S, H, T, J>>;
86
87pub struct BlockWatcherService<S, H, T, J>
98where
99 J: JobSchedulerTrait,
100{
101 pub block_storage: Arc<S>,
102 pub block_handler: Arc<H>,
103 pub trigger_handler: Arc<T>,
104 pub active_watchers: Arc<RwLock<BlockWatchersMap<S, H, T, J>>>,
105 pub block_tracker: Arc<BlockTracker>,
106}
107
108impl<S, H, T, J> NetworkBlockWatcher<S, H, T, J>
109where
110 S: BlockStorage + Send + Sync + 'static,
111 H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
112 T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static,
113 J: JobSchedulerTrait,
114{
115 pub async fn new(
127 network: Network,
128 block_storage: Arc<S>,
129 block_handler: Arc<H>,
130 trigger_handler: Arc<T>,
131 block_tracker: Arc<BlockTracker>,
132 ) -> Result<Self, BlockWatcherError> {
133 let scheduler = J::new().await.map_err(|e| {
134 BlockWatcherError::scheduler_error(
135 e.to_string(),
136 Some(e),
137 Some(HashMap::from([(
138 "network".to_string(),
139 network.slug.clone(),
140 )])),
141 )
142 })?;
143 Ok(Self {
144 network,
145 block_storage,
146 block_handler,
147 trigger_handler,
148 scheduler,
149 block_tracker,
150 })
151 }
152
153 pub async fn start<C: BlockChainClient + Clone + Send + 'static>(
158 &mut self,
159 rpc_client: C,
160 ) -> Result<(), BlockWatcherError> {
161 self.start_main_watcher(rpc_client.clone()).await?;
163
164 if let Some(ref config) = self.network.recovery_config {
166 if config.enabled {
167 self.start_recovery_job(rpc_client).await?;
168 }
169 }
170
171 self.scheduler.start().await.map_err(|e| {
172 BlockWatcherError::scheduler_error(
173 e.to_string(),
174 Some(e),
175 Some(HashMap::from([(
176 "network".to_string(),
177 self.network.slug.clone(),
178 )])),
179 )
180 })?;
181
182 tracing::info!("Started block watcher for network: {}", self.network.slug);
183 Ok(())
184 }
185
186 async fn start_main_watcher<C: BlockChainClient + Clone + Send + 'static>(
188 &mut self,
189 rpc_client: C,
190 ) -> Result<(), BlockWatcherError> {
191 let network = self.network.clone();
192 let block_storage = self.block_storage.clone();
193 let block_handler = self.block_handler.clone();
194 let trigger_handler = self.trigger_handler.clone();
195 let block_tracker = self.block_tracker.clone();
196
197 let job = Job::new_async(self.network.cron_schedule.as_str(), move |_uuid, _l| {
198 let network = network.clone();
199 let block_storage = block_storage.clone();
200 let block_handler = block_handler.clone();
201 let block_tracker = block_tracker.clone();
202 let rpc_client = rpc_client.clone();
203 let trigger_handler = trigger_handler.clone();
204 Box::pin(async move {
205 let _ = process_new_blocks(
206 &network,
207 &rpc_client,
208 block_storage,
209 block_handler,
210 trigger_handler,
211 block_tracker,
212 )
213 .await
214 .map_err(|e| {
215 BlockWatcherError::processing_error(
216 "Failed to process blocks".to_string(),
217 Some(e.into()),
218 Some(HashMap::from([(
219 "network".to_string(),
220 network.slug.clone(),
221 )])),
222 )
223 });
224 })
225 })
226 .with_context(|| "Failed to create main watcher job")?;
227
228 self.scheduler.add(job).await.map_err(|e| {
229 BlockWatcherError::scheduler_error(
230 e.to_string(),
231 Some(e),
232 Some(HashMap::from([(
233 "network".to_string(),
234 self.network.slug.clone(),
235 )])),
236 )
237 })?;
238
239 Ok(())
240 }
241
242 async fn start_recovery_job<C: BlockChainClient + Clone + Send + 'static>(
244 &mut self,
245 rpc_client: C,
246 ) -> Result<(), BlockWatcherError> {
247 let recovery_config = self
248 .network
249 .recovery_config
250 .as_ref()
251 .ok_or_else(|| {
252 BlockWatcherError::recovery_error(
253 "Recovery config is required but not found".to_string(),
254 None,
255 Some(HashMap::from([(
256 "network".to_string(),
257 self.network.slug.clone(),
258 )])),
259 )
260 })?
261 .clone();
262
263 let network = self.network.clone();
264 let block_storage = self.block_storage.clone();
265 let block_handler = self.block_handler.clone();
266 let trigger_handler = self.trigger_handler.clone();
267 let block_tracker = self.block_tracker.clone();
268
269 let cron_schedule = recovery_config.cron_schedule.clone();
270 let job = Job::new_async(cron_schedule.as_str(), move |_uuid, _l| {
271 let network = network.clone();
272 let recovery_config = recovery_config.clone();
273 let block_storage = block_storage.clone();
274 let block_handler = block_handler.clone();
275 let block_tracker = block_tracker.clone();
276 let rpc_client = rpc_client.clone();
277 let trigger_handler = trigger_handler.clone();
278 Box::pin(async move {
279 let _ = process_missed_blocks(
280 &network,
281 &recovery_config,
282 &rpc_client,
283 block_storage,
284 block_handler,
285 trigger_handler,
286 block_tracker,
287 )
288 .await
289 .map_err(|e| {
290 BlockWatcherError::recovery_error(
291 "Failed to process missed blocks".to_string(),
292 Some(e.into()),
293 Some(HashMap::from([(
294 "network".to_string(),
295 network.slug.clone(),
296 )])),
297 )
298 });
299 })
300 })
301 .with_context(|| "Failed to create recovery job")?;
302
303 self.scheduler.add(job).await.map_err(|e| {
304 BlockWatcherError::scheduler_error(
305 e.to_string(),
306 Some(e),
307 Some(HashMap::from([(
308 "network".to_string(),
309 self.network.slug.clone(),
310 )])),
311 )
312 })?;
313
314 tracing::info!(
315 "Started recovery job for network: {} with schedule: {}",
316 self.network.slug,
317 self.network
318 .recovery_config
319 .as_ref()
320 .map(|c| c.cron_schedule.as_str())
321 .unwrap_or("unknown")
322 );
323 Ok(())
324 }
325
326 pub async fn stop(&mut self) -> Result<(), BlockWatcherError> {
330 self.scheduler.shutdown().await.map_err(|e| {
331 BlockWatcherError::scheduler_error(
332 e.to_string(),
333 Some(e),
334 Some(HashMap::from([(
335 "network".to_string(),
336 self.network.slug.clone(),
337 )])),
338 )
339 })?;
340
341 tracing::info!("Stopped block watcher for network: {}", self.network.slug);
342 Ok(())
343 }
344}
345
346impl<S, H, T, J> BlockWatcherService<S, H, T, J>
347where
348 S: BlockStorage + Send + Sync + 'static,
349 H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
350 T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static,
351 J: JobSchedulerTrait,
352{
353 pub async fn new(
360 block_storage: Arc<S>,
361 block_handler: Arc<H>,
362 trigger_handler: Arc<T>,
363 block_tracker: Arc<BlockTracker>,
364 ) -> Result<Self, BlockWatcherError> {
365 Ok(BlockWatcherService {
366 block_storage,
367 block_handler,
368 trigger_handler,
369 active_watchers: Arc::new(RwLock::new(HashMap::new())),
370 block_tracker,
371 })
372 }
373
374 pub async fn start_network_watcher<C: BlockChainClient + Send + Clone + 'static>(
380 &self,
381 network: &Network,
382 rpc_client: C,
383 ) -> Result<(), BlockWatcherError> {
384 let mut watchers = self.active_watchers.write().await;
385
386 if watchers.contains_key(&network.slug) {
387 tracing::info!(
388 "Block watcher already running for network: {}",
389 network.slug
390 );
391 return Ok(());
392 }
393
394 let mut watcher = NetworkBlockWatcher::new(
395 network.clone(),
396 self.block_storage.clone(),
397 self.block_handler.clone(),
398 self.trigger_handler.clone(),
399 self.block_tracker.clone(),
400 )
401 .await?;
402
403 watcher.start(rpc_client).await?;
404 watchers.insert(network.slug.clone(), watcher);
405
406 Ok(())
407 }
408
409 pub async fn stop_network_watcher(&self, network_slug: &str) -> Result<(), BlockWatcherError> {
414 let mut watchers = self.active_watchers.write().await;
415
416 if let Some(mut watcher) = watchers.remove(network_slug) {
417 watcher.stop().await?;
418 }
419
420 Ok(())
421 }
422}
423
424#[instrument(skip_all, fields(network = network.slug))]
437pub async fn process_new_blocks<
438 S: BlockStorage,
439 C: BlockChainClient + Send + Clone + 'static,
440 H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
441 T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static,
442 TR: BlockTrackerTrait + Send + Sync + 'static,
443>(
444 network: &Network,
445 rpc_client: &C,
446 block_storage: Arc<S>,
447 block_handler: Arc<H>,
448 trigger_handler: Arc<T>,
449 block_tracker: Arc<TR>,
450) -> Result<(), BlockWatcherError> {
451 let start_time = std::time::Instant::now();
452
453 let last_processed_block = block_storage
454 .get_last_processed_block(&network.slug)
455 .await
456 .with_context(|| "Failed to get last processed block")?
457 .unwrap_or(0);
458
459 let latest_block = rpc_client
460 .get_latest_block_number()
461 .await
462 .with_context(|| "Failed to get latest block number")?;
463
464 let latest_confirmed_block = latest_block.saturating_sub(network.confirmation_blocks);
465
466 let recommended_past_blocks = network.get_recommended_past_blocks();
467
468 let max_past_blocks = network.max_past_blocks.unwrap_or(recommended_past_blocks);
469
470 let start_block = std::cmp::max(
472 last_processed_block + 1,
473 latest_confirmed_block.saturating_sub(max_past_blocks),
474 );
475
476 tracing::info!(
477 "Processing blocks:\n\tLast processed block: {}\n\tLatest confirmed block: {}\n\tStart \
478 block: {}{}\n\tConfirmations required: {}\n\tMax past blocks: {}",
479 last_processed_block,
480 latest_confirmed_block,
481 start_block,
482 if start_block > last_processed_block + 1 {
483 format!(
484 " (skipped {} blocks)",
485 start_block - (last_processed_block + 1)
486 )
487 } else {
488 String::new()
489 },
490 network.confirmation_blocks,
491 max_past_blocks
492 );
493
494 let mut blocks = Vec::new();
495 if last_processed_block == 0 {
496 blocks = rpc_client
497 .get_blocks(latest_confirmed_block, None)
498 .await
499 .with_context(|| format!("Failed to get block {}", latest_confirmed_block))?;
500 } else if last_processed_block < latest_confirmed_block {
501 blocks = rpc_client
502 .get_blocks(start_block, Some(latest_confirmed_block))
503 .await
504 .with_context(|| {
505 format!(
506 "Failed to get blocks from {} to {}",
507 start_block, latest_confirmed_block
508 )
509 })?;
510 }
511
512 block_tracker
515 .reset_expected_next(network, start_block)
516 .await;
517
518 let missed_blocks = block_tracker.detect_missing_blocks(network, &blocks).await;
520
521 if !missed_blocks.is_empty() {
523 tracing::error!(
524 network = %network.slug,
525 count = missed_blocks.len(),
526 "Missed {} blocks: {:?}",
527 missed_blocks.len(),
528 missed_blocks
529 );
530
531 let recovery_enabled = network.recovery_config.as_ref().is_some_and(|c| c.enabled);
533 if network.store_blocks.unwrap_or(false) || recovery_enabled {
534 block_storage
535 .save_missed_blocks(&network.slug, &missed_blocks)
536 .await
537 .with_context(|| format!("Failed to save {} missed blocks", missed_blocks.len()))?;
538 }
539 }
540
541 let (process_tx, process_rx) = mpsc::channel::<(BlockType, u64)>(blocks.len() * 2);
543 let (trigger_tx, trigger_rx) = mpsc::channel::<ProcessedBlock>(blocks.len() * 2);
544
545 let process_handle = tokio::spawn({
547 let network = network.clone();
548 let block_handler = block_handler.clone();
549 let mut trigger_tx = trigger_tx.clone();
550
551 async move {
552 let mut results = process_rx
554 .map(|(block, _)| {
555 let network = network.clone();
556 let block_handler = block_handler.clone();
557 async move { (block_handler)(block, network).await }
558 })
559 .buffer_unordered(32);
560
561 while let Some(result) = results.next().await {
563 trigger_tx
564 .send(result)
565 .await
566 .with_context(|| "Failed to send processed block")?;
567 }
568
569 Ok::<(), BlockWatcherError>(())
570 }
571 });
572
573 let trigger_handle = tokio::spawn({
575 let network = network.clone();
576 let trigger_handler = trigger_handler.clone();
577 let block_tracker = block_tracker.clone();
578
579 async move {
580 let mut trigger_rx = trigger_rx;
581 let mut pending_blocks = BTreeMap::new();
582 let mut next_block_number = Some(start_block);
583 let block_tracker = block_tracker.clone();
584
585 while let Some(processed_block) = trigger_rx.next().await {
587 let block_number = processed_block.block_number;
588
589 pending_blocks.insert(block_number, processed_block);
591
592 while let Some(expected) = next_block_number {
594 if let Some(block) = pending_blocks.remove(&expected) {
595 match block_tracker
598 .check_processed_block(&network, expected)
599 .await
600 {
601 BlockCheckResult::Ok => {
602 }
604 BlockCheckResult::Duplicate { last_seen } => {
605 tracing::error!(
606 network = %network.slug,
607 block_number = expected,
608 last_seen = last_seen,
609 "Duplicate block detected: received block {} again (last seen: {})",
610 expected,
611 last_seen
612 );
613 }
614 BlockCheckResult::OutOfOrder {
615 expected: exp,
616 received,
617 } => {
618 tracing::warn!(
619 network = %network.slug,
620 block_number = received,
621 expected = exp,
622 "Out of order block detected: received {} but expected {}",
623 received,
624 exp
625 );
626 }
627 }
628
629 (trigger_handler)(&block);
630 next_block_number = Some(expected + 1);
631 } else {
632 break;
633 }
634 }
635 }
636
637 while let Some(min_block) = pending_blocks.keys().next().copied() {
639 if let Some(block) = pending_blocks.remove(&min_block) {
640 match block_tracker
642 .check_processed_block(&network, min_block)
643 .await
644 {
645 BlockCheckResult::Ok => {
646 }
648 BlockCheckResult::Duplicate { last_seen } => {
649 tracing::error!(
650 network = %network.slug,
651 block_number = min_block,
652 last_seen = last_seen,
653 "Duplicate block detected: received block {} again (last seen: {})",
654 min_block,
655 last_seen
656 );
657 }
658 BlockCheckResult::OutOfOrder {
659 expected: exp,
660 received,
661 } => {
662 tracing::warn!(
663 network = %network.slug,
664 block_number = received,
665 expected = exp,
666 "Out of order block detected: received {} but expected {}",
667 received,
668 exp
669 );
670 }
671 }
672
673 (trigger_handler)(&block);
674 }
675 }
676 Ok::<(), BlockWatcherError>(())
677 }
678 });
679
680 futures::future::join_all(blocks.iter().map(|block| {
682 let mut process_tx = process_tx.clone();
683 async move {
684 let block_number = block.number().unwrap_or(0);
685
686 process_tx
688 .send((block.clone(), block_number))
689 .await
690 .with_context(|| "Failed to send block to pipeline")?;
691
692 Ok::<(), BlockWatcherError>(())
693 }
694 }))
695 .await
696 .into_iter()
697 .collect::<Result<Vec<_>, _>>()
698 .with_context(|| format!("Failed to process blocks for network {}", network.slug))?;
699
700 drop(process_tx);
702 drop(trigger_tx);
703
704 let (_process_result, _trigger_result) = tokio::join!(process_handle, trigger_handle);
706
707 if network.store_blocks.unwrap_or(false) {
708 block_storage
710 .delete_blocks(&network.slug)
711 .await
712 .with_context(|| "Failed to delete old blocks")?;
713
714 block_storage
715 .save_blocks(&network.slug, &blocks)
716 .await
717 .with_context(|| "Failed to save blocks")?;
718 }
719 block_storage
721 .save_last_processed_block(&network.slug, latest_confirmed_block)
722 .await
723 .with_context(|| "Failed to save last processed block")?;
724
725 tracing::info!(
726 "Processed {} blocks in {}ms",
727 blocks.len(),
728 start_time.elapsed().as_millis()
729 );
730
731 Ok(())
732}
733
734#[cfg(test)]
735mod tests {
736 use super::*;
737 use crate::models::BlockRecoveryConfig;
738 use crate::services::blockwatcher::storage::FileBlockStorage;
739 use crate::utils::tests::network::NetworkBuilder;
740 use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
741 use tempfile::tempdir;
742
743 fn create_evm_block(block_number: u64) -> BlockType {
745 use crate::models::EVMBlock;
746 use alloy::rpc::types::{Block, Header};
747
748 let alloy_block = Block {
749 header: Header {
750 inner: alloy::consensus::Header {
751 number: block_number,
752 ..Default::default()
753 },
754 ..Default::default()
755 },
756 ..Default::default()
757 };
758 let evm_block: EVMBlock = alloy_block.into();
759 BlockType::EVM(Box::new(evm_block))
760 }
761
762 fn create_test_network() -> Network {
763 NetworkBuilder::new()
764 .name("Test Network")
765 .slug("test_network")
766 .store_blocks(true)
767 .confirmation_blocks(12)
768 .max_past_blocks(100)
769 .build()
770 }
771
772 fn create_test_network_with_recovery() -> Network {
773 NetworkBuilder::new()
774 .name("Test Network")
775 .slug("test_network")
776 .store_blocks(true)
777 .confirmation_blocks(12)
778 .max_past_blocks(100)
779 .recovery_config(BlockRecoveryConfig {
780 enabled: true,
781 cron_schedule: "0 */5 * * * *".to_string(),
782 max_blocks_per_run: 10,
783 max_block_age: 1000,
784 max_retries: 3,
785 retry_delay_ms: 100,
786 })
787 .build()
788 }
789
790 #[derive(Clone)]
792 struct MockRpcClient {
793 latest_block: Arc<AtomicU64>,
794 blocks_to_return: Arc<std::sync::Mutex<Vec<BlockType>>>,
795 fail_get_blocks: Arc<AtomicBool>,
796 call_count: Arc<AtomicUsize>,
797 }
798
799 impl MockRpcClient {
800 fn new(latest_block: u64) -> Self {
801 Self {
802 latest_block: Arc::new(AtomicU64::new(latest_block)),
803 blocks_to_return: Arc::new(std::sync::Mutex::new(Vec::new())),
804 fail_get_blocks: Arc::new(AtomicBool::new(false)),
805 call_count: Arc::new(AtomicUsize::new(0)),
806 }
807 }
808
809 fn with_blocks(self, blocks: Vec<BlockType>) -> Self {
810 *self.blocks_to_return.lock().unwrap() = blocks;
811 self
812 }
813
814 #[allow(dead_code)]
815 fn with_failing_get_blocks(self) -> Self {
816 self.fail_get_blocks.store(true, Ordering::SeqCst);
817 self
818 }
819 }
820
821 #[async_trait::async_trait]
822 impl BlockChainClient for MockRpcClient {
823 async fn get_latest_block_number(&self) -> Result<u64, anyhow::Error> {
824 Ok(self.latest_block.load(Ordering::SeqCst))
825 }
826
827 async fn get_blocks(
828 &self,
829 start: u64,
830 end: Option<u64>,
831 ) -> Result<Vec<BlockType>, anyhow::Error> {
832 self.call_count.fetch_add(1, Ordering::SeqCst);
833
834 if self.fail_get_blocks.load(Ordering::SeqCst) {
835 return Err(anyhow::anyhow!("Simulated RPC failure"));
836 }
837
838 let blocks = self.blocks_to_return.lock().unwrap();
839 if !blocks.is_empty() {
840 return Ok(blocks.clone());
841 }
842
843 let end_block = end.unwrap_or(start);
845 let mut result = Vec::new();
846 for block_num in start..=end_block {
847 result.push(create_evm_block(block_num));
848 }
849 Ok(result)
850 }
851 }
852
853 fn create_block_handler() -> Arc<
854 impl Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
855 > {
856 Arc::new(|block: BlockType, network: Network| {
857 Box::pin(async move {
858 ProcessedBlock {
859 network_slug: network.slug,
860 block_number: block.number().unwrap_or(0),
861 processing_results: vec![],
862 }
863 }) as BoxFuture<'static, ProcessedBlock>
864 })
865 }
866
867 fn create_trigger_handler(
868 ) -> Arc<impl Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static> {
869 Arc::new(|_block: &ProcessedBlock| tokio::spawn(async move {}))
870 }
871
872 fn create_counting_trigger_handler(
873 counter: Arc<AtomicUsize>,
874 ) -> Arc<impl Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static> {
875 Arc::new(move |_block: &ProcessedBlock| {
876 let counter = counter.clone();
877 tokio::spawn(async move {
878 counter.fetch_add(1, Ordering::SeqCst);
879 })
880 })
881 }
882
883 #[tokio::test]
886 async fn test_process_new_blocks_first_run() {
887 let temp_dir = tempdir().unwrap();
888 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
889 let network = create_test_network();
890 let rpc_client = MockRpcClient::new(100);
891 let block_tracker = Arc::new(BlockTracker::new(100));
892 let block_handler = create_block_handler();
893 let trigger_handler = create_trigger_handler();
894
895 let result = process_new_blocks(
896 &network,
897 &rpc_client,
898 storage.clone(),
899 block_handler,
900 trigger_handler,
901 block_tracker,
902 )
903 .await;
904
905 assert!(result.is_ok());
906
907 let last_processed = storage
909 .get_last_processed_block("test_network")
910 .await
911 .unwrap();
912 assert_eq!(last_processed, Some(88));
913 }
914
915 #[tokio::test]
916 async fn test_process_new_blocks_subsequent_run() {
917 let temp_dir = tempdir().unwrap();
918 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
919
920 storage
922 .save_last_processed_block("test_network", 80)
923 .await
924 .unwrap();
925
926 let network = create_test_network();
927 let rpc_client = MockRpcClient::new(100);
928 let block_tracker = Arc::new(BlockTracker::new(100));
929 let block_handler = create_block_handler();
930 let trigger_handler = create_trigger_handler();
931
932 let result = process_new_blocks(
933 &network,
934 &rpc_client,
935 storage.clone(),
936 block_handler,
937 trigger_handler,
938 block_tracker,
939 )
940 .await;
941
942 assert!(result.is_ok());
943
944 let last_processed = storage
946 .get_last_processed_block("test_network")
947 .await
948 .unwrap();
949 assert_eq!(last_processed, Some(88));
950 }
951
952 #[tokio::test]
953 async fn test_process_new_blocks_with_store_blocks_enabled() {
954 let temp_dir = tempdir().unwrap();
955 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
956
957 let mut network = create_test_network();
958 network.store_blocks = Some(true);
959
960 let rpc_client = MockRpcClient::new(100);
961 let block_tracker = Arc::new(BlockTracker::new(100));
962 let block_handler = create_block_handler();
963 let trigger_handler = create_trigger_handler();
964
965 let result = process_new_blocks(
966 &network,
967 &rpc_client,
968 storage.clone(),
969 block_handler,
970 trigger_handler,
971 block_tracker,
972 )
973 .await;
974
975 assert!(result.is_ok());
976
977 let pattern = format!("{}/test_network_blocks_*.json", temp_dir.path().display());
979 let files: Vec<_> = glob::glob(&pattern).unwrap().collect();
980 assert!(
981 !files.is_empty(),
982 "Block files should be created when store_blocks is enabled"
983 );
984 }
985
986 #[tokio::test]
987 async fn test_process_new_blocks_with_store_blocks_disabled() {
988 let temp_dir = tempdir().unwrap();
989 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
990
991 let mut network = create_test_network();
992 network.store_blocks = Some(false);
993
994 let rpc_client = MockRpcClient::new(100);
995 let block_tracker = Arc::new(BlockTracker::new(100));
996 let block_handler = create_block_handler();
997 let trigger_handler = create_trigger_handler();
998
999 let result = process_new_blocks(
1000 &network,
1001 &rpc_client,
1002 storage.clone(),
1003 block_handler,
1004 trigger_handler,
1005 block_tracker,
1006 )
1007 .await;
1008
1009 assert!(result.is_ok());
1010
1011 let pattern = format!("{}/test_network_blocks_*.json", temp_dir.path().display());
1013 let files: Vec<_> = glob::glob(&pattern).unwrap().collect();
1014 assert!(
1015 files.is_empty(),
1016 "Block files should not be created when store_blocks is disabled"
1017 );
1018 }
1019
1020 #[tokio::test]
1021 async fn test_process_new_blocks_skips_old_blocks() {
1022 let temp_dir = tempdir().unwrap();
1023 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1024
1025 storage
1027 .save_last_processed_block("test_network", 10)
1028 .await
1029 .unwrap();
1030
1031 let mut network = create_test_network();
1032 network.max_past_blocks = Some(50); let rpc_client = MockRpcClient::new(1000);
1035 let block_tracker = Arc::new(BlockTracker::new(100));
1036 let block_handler = create_block_handler();
1037 let trigger_handler = create_trigger_handler();
1038
1039 let result = process_new_blocks(
1040 &network,
1041 &rpc_client,
1042 storage.clone(),
1043 block_handler,
1044 trigger_handler,
1045 block_tracker,
1046 )
1047 .await;
1048
1049 assert!(result.is_ok());
1050
1051 let last_processed = storage
1053 .get_last_processed_block("test_network")
1054 .await
1055 .unwrap();
1056 assert_eq!(last_processed, Some(988)); }
1058
1059 #[tokio::test]
1060 async fn test_process_new_blocks_detects_missed_blocks() {
1061 let temp_dir = tempdir().unwrap();
1062 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1063
1064 storage
1065 .save_last_processed_block("test_network", 95)
1066 .await
1067 .unwrap();
1068
1069 let network = create_test_network_with_recovery();
1070
1071 let rpc_client =
1073 MockRpcClient::new(110).with_blocks(vec![create_evm_block(96), create_evm_block(98)]);
1074
1075 let block_tracker = Arc::new(BlockTracker::new(100));
1076 let block_handler = create_block_handler();
1077 let trigger_handler = create_trigger_handler();
1078
1079 let _ = process_new_blocks(
1080 &network,
1081 &rpc_client,
1082 storage.clone(),
1083 block_handler,
1084 trigger_handler,
1085 block_tracker,
1086 )
1087 .await;
1088
1089 let missed = storage
1091 .get_missed_blocks("test_network", 1000, 1000, 3)
1092 .await
1093 .unwrap();
1094
1095 let missed_numbers: Vec<u64> = missed.iter().map(|e| e.block_number).collect();
1097 assert!(missed_numbers.contains(&97));
1098 }
1099
1100 #[tokio::test]
1101 async fn test_process_new_blocks_no_new_blocks() {
1102 let temp_dir = tempdir().unwrap();
1103 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1104
1105 storage
1107 .save_last_processed_block("test_network", 88)
1108 .await
1109 .unwrap();
1110
1111 let network = create_test_network();
1112 let rpc_client = MockRpcClient::new(100); let block_tracker = Arc::new(BlockTracker::new(100));
1114 let block_handler = create_block_handler();
1115 let trigger_handler = create_trigger_handler();
1116
1117 let result = process_new_blocks(
1118 &network,
1119 &rpc_client,
1120 storage.clone(),
1121 block_handler,
1122 trigger_handler,
1123 block_tracker,
1124 )
1125 .await;
1126
1127 assert!(result.is_ok());
1128 }
1129
1130 #[tokio::test]
1131 async fn test_process_new_blocks_triggers_handlers() {
1132 let temp_dir = tempdir().unwrap();
1133 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1134
1135 storage
1136 .save_last_processed_block("test_network", 85)
1137 .await
1138 .unwrap();
1139
1140 let network = create_test_network();
1141 let rpc_client = MockRpcClient::new(100);
1142 let block_tracker = Arc::new(BlockTracker::new(100));
1143 let block_handler = create_block_handler();
1144
1145 let trigger_count = Arc::new(AtomicUsize::new(0));
1146 let trigger_handler = create_counting_trigger_handler(trigger_count.clone());
1147
1148 let result = process_new_blocks(
1149 &network,
1150 &rpc_client,
1151 storage,
1152 block_handler,
1153 trigger_handler,
1154 block_tracker,
1155 )
1156 .await;
1157
1158 assert!(result.is_ok());
1159
1160 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1162
1163 assert_eq!(trigger_count.load(Ordering::SeqCst), 3);
1165 }
1166
1167 #[tokio::test]
1168 async fn test_process_new_blocks_handles_duplicate_blocks() {
1169 let temp_dir = tempdir().unwrap();
1170 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1171
1172 storage
1173 .save_last_processed_block("test_network", 95)
1174 .await
1175 .unwrap();
1176
1177 let network = create_test_network();
1178
1179 let rpc_client = MockRpcClient::new(110).with_blocks(vec![
1181 create_evm_block(96),
1182 create_evm_block(97),
1183 create_evm_block(96), ]);
1185
1186 let block_tracker = Arc::new(BlockTracker::new(100));
1187 let block_handler = create_block_handler();
1188 let trigger_handler = create_trigger_handler();
1189
1190 let result = process_new_blocks(
1192 &network,
1193 &rpc_client,
1194 storage,
1195 block_handler,
1196 trigger_handler,
1197 block_tracker,
1198 )
1199 .await;
1200
1201 assert!(result.is_ok());
1202 }
1203
1204 #[tokio::test]
1205 async fn test_process_new_blocks_handles_out_of_order_blocks() {
1206 let temp_dir = tempdir().unwrap();
1207 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1208
1209 storage
1210 .save_last_processed_block("test_network", 95)
1211 .await
1212 .unwrap();
1213
1214 let network = create_test_network();
1215
1216 let rpc_client = MockRpcClient::new(110).with_blocks(vec![
1218 create_evm_block(98),
1219 create_evm_block(97),
1220 create_evm_block(96),
1221 ]);
1222
1223 let block_tracker = Arc::new(BlockTracker::new(100));
1224 let block_handler = create_block_handler();
1225 let trigger_handler = create_trigger_handler();
1226
1227 let result = process_new_blocks(
1229 &network,
1230 &rpc_client,
1231 storage,
1232 block_handler,
1233 trigger_handler,
1234 block_tracker,
1235 )
1236 .await;
1237
1238 assert!(result.is_ok());
1239 }
1240
1241 #[tokio::test]
1242 async fn test_process_new_blocks_saves_missed_blocks_when_recovery_enabled() {
1243 let temp_dir = tempdir().unwrap();
1244 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1245
1246 storage
1247 .save_last_processed_block("test_network", 95)
1248 .await
1249 .unwrap();
1250
1251 let mut network = create_test_network_with_recovery();
1253 network.store_blocks = Some(false);
1254
1255 let rpc_client =
1257 MockRpcClient::new(110).with_blocks(vec![create_evm_block(96), create_evm_block(98)]);
1258
1259 let block_tracker = Arc::new(BlockTracker::new(100));
1260 let block_handler = create_block_handler();
1261 let trigger_handler = create_trigger_handler();
1262
1263 let _ = process_new_blocks(
1264 &network,
1265 &rpc_client,
1266 storage.clone(),
1267 block_handler,
1268 trigger_handler,
1269 block_tracker,
1270 )
1271 .await;
1272
1273 let missed = storage
1275 .get_missed_blocks("test_network", 1000, 1000, 3)
1276 .await
1277 .unwrap();
1278 assert!(!missed.is_empty());
1279 }
1280
/// Scheduler double that records calls and can be told to fail individual operations.
#[derive(Clone)]
struct MockJobScheduler {
    /// Set once `start` succeeds.
    started: Arc<AtomicBool>,
    /// Set once `shutdown` succeeds.
    shutdown_called: Arc<AtomicBool>,
    /// Number of jobs accepted by `add`.
    jobs_added: Arc<AtomicUsize>,
    /// Makes the trait-level `new` fail.
    fail_new: bool,
    /// Makes `add` fail.
    fail_add: bool,
    /// Makes `start` fail.
    fail_start: bool,
    /// Makes `shutdown` fail.
    fail_shutdown: bool,
}
1294
1295 impl MockJobScheduler {
1296 fn new() -> Self {
1297 Self {
1298 started: Arc::new(AtomicBool::new(false)),
1299 shutdown_called: Arc::new(AtomicBool::new(false)),
1300 jobs_added: Arc::new(AtomicUsize::new(0)),
1301 fail_new: false,
1302 fail_add: false,
1303 fail_start: false,
1304 fail_shutdown: false,
1305 }
1306 }
1307
1308 #[allow(dead_code)]
1309 fn with_failing_new() -> Self {
1310 Self {
1311 fail_new: true,
1312 ..Self::new()
1313 }
1314 }
1315
1316 #[allow(dead_code)]
1317 fn with_failing_add() -> Self {
1318 Self {
1319 fail_add: true,
1320 ..Self::new()
1321 }
1322 }
1323
1324 #[allow(dead_code)]
1325 fn with_failing_start() -> Self {
1326 Self {
1327 fail_start: true,
1328 ..Self::new()
1329 }
1330 }
1331
1332 #[allow(dead_code)]
1333 fn with_failing_shutdown() -> Self {
1334 Self {
1335 fail_shutdown: true,
1336 ..Self::new()
1337 }
1338 }
1339 }
1340
1341 #[async_trait::async_trait]
1342 impl JobSchedulerTrait for MockJobScheduler {
1343 async fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
1344 let scheduler = MockJobScheduler::new();
1345 if scheduler.fail_new {
1346 return Err("Simulated scheduler creation failure".into());
1347 }
1348 Ok(scheduler)
1349 }
1350
1351 async fn add(&self, _job: Job) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1352 if self.fail_add {
1353 return Err("Simulated job add failure".into());
1354 }
1355 self.jobs_added.fetch_add(1, Ordering::SeqCst);
1356 Ok(())
1357 }
1358
1359 async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1360 if self.fail_start {
1361 return Err("Simulated scheduler start failure".into());
1362 }
1363 self.started.store(true, Ordering::SeqCst);
1364 Ok(())
1365 }
1366
1367 async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1368 if self.fail_shutdown {
1369 return Err("Simulated scheduler shutdown failure".into());
1370 }
1371 self.shutdown_called.store(true, Ordering::SeqCst);
1372 Ok(())
1373 }
1374 }
1375
1376 #[tokio::test]
1377 async fn test_block_watcher_service_new() {
1378 let temp_dir = tempdir().unwrap();
1379 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1380 let block_handler = create_block_handler();
1381 let trigger_handler = create_trigger_handler();
1382 let block_tracker = Arc::new(BlockTracker::new(100));
1383
1384 let service: Result<BlockWatcherService<_, _, _, MockJobScheduler>, _> =
1385 BlockWatcherService::new(storage, block_handler, trigger_handler, block_tracker).await;
1386
1387 assert!(service.is_ok());
1388 }
1389
1390 #[tokio::test]
1391 async fn test_network_block_watcher_new() {
1392 let temp_dir = tempdir().unwrap();
1393 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1394 let network = create_test_network();
1395 let block_handler = create_block_handler();
1396 let trigger_handler = create_trigger_handler();
1397 let block_tracker = Arc::new(BlockTracker::new(100));
1398
1399 let watcher: Result<NetworkBlockWatcher<_, _, _, MockJobScheduler>, _> =
1400 NetworkBlockWatcher::new(
1401 network,
1402 storage,
1403 block_handler,
1404 trigger_handler,
1405 block_tracker,
1406 )
1407 .await;
1408
1409 assert!(watcher.is_ok());
1410 }
1411
1412 #[tokio::test]
1413 async fn test_start_network_watcher_already_running() {
1414 let temp_dir = tempdir().unwrap();
1415 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1416 let network = create_test_network();
1417 let block_handler = create_block_handler();
1418 let trigger_handler = create_trigger_handler();
1419 let block_tracker = Arc::new(BlockTracker::new(100));
1420
1421 let service: BlockWatcherService<_, _, _, MockJobScheduler> =
1422 BlockWatcherService::new(storage, block_handler, trigger_handler, block_tracker)
1423 .await
1424 .unwrap();
1425
1426 let rpc_client = MockRpcClient::new(100);
1427
1428 let result = service
1430 .start_network_watcher(&network, rpc_client.clone())
1431 .await;
1432 assert!(result.is_ok());
1433
1434 let result = service.start_network_watcher(&network, rpc_client).await;
1436 assert!(result.is_ok());
1437
1438 let watchers = service.active_watchers.read().await;
1440 assert_eq!(watchers.len(), 1);
1441 }
1442
1443 #[tokio::test]
1444 async fn test_stop_network_watcher() {
1445 let temp_dir = tempdir().unwrap();
1446 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1447 let network = create_test_network();
1448 let block_handler = create_block_handler();
1449 let trigger_handler = create_trigger_handler();
1450 let block_tracker = Arc::new(BlockTracker::new(100));
1451
1452 let service: BlockWatcherService<_, _, _, MockJobScheduler> =
1453 BlockWatcherService::new(storage, block_handler, trigger_handler, block_tracker)
1454 .await
1455 .unwrap();
1456
1457 let rpc_client = MockRpcClient::new(100);
1458
1459 service
1461 .start_network_watcher(&network, rpc_client)
1462 .await
1463 .unwrap();
1464
1465 {
1467 let watchers = service.active_watchers.read().await;
1468 assert_eq!(watchers.len(), 1);
1469 }
1470
1471 let result = service.stop_network_watcher("test_network").await;
1473 assert!(result.is_ok());
1474
1475 let watchers = service.active_watchers.read().await;
1477 assert_eq!(watchers.len(), 0);
1478 }
1479
1480 #[tokio::test]
1481 async fn test_stop_network_watcher_not_running() {
1482 let temp_dir = tempdir().unwrap();
1483 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1484 let block_handler = create_block_handler();
1485 let trigger_handler = create_trigger_handler();
1486 let block_tracker = Arc::new(BlockTracker::new(100));
1487
1488 let service: BlockWatcherService<_, _, _, MockJobScheduler> =
1489 BlockWatcherService::new(storage, block_handler, trigger_handler, block_tracker)
1490 .await
1491 .unwrap();
1492
1493 let result = service.stop_network_watcher("nonexistent").await;
1495 assert!(result.is_ok());
1496 }
1497
1498 #[tokio::test]
1499 async fn test_network_watcher_start_with_recovery() {
1500 let temp_dir = tempdir().unwrap();
1501 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1502 let network = create_test_network_with_recovery();
1503 let block_handler = create_block_handler();
1504 let trigger_handler = create_trigger_handler();
1505 let block_tracker = Arc::new(BlockTracker::new(100));
1506
1507 let mut watcher: NetworkBlockWatcher<_, _, _, MockJobScheduler> = NetworkBlockWatcher::new(
1508 network,
1509 storage,
1510 block_handler,
1511 trigger_handler,
1512 block_tracker,
1513 )
1514 .await
1515 .unwrap();
1516
1517 let rpc_client = MockRpcClient::new(100);
1518 let result = watcher.start(rpc_client).await;
1519
1520 assert!(result.is_ok());
1521 assert_eq!(watcher.scheduler.jobs_added.load(Ordering::SeqCst), 2);
1523 }
1524
1525 #[tokio::test]
1526 async fn test_network_watcher_start_without_recovery() {
1527 let temp_dir = tempdir().unwrap();
1528 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1529 let network = create_test_network(); let block_handler = create_block_handler();
1531 let trigger_handler = create_trigger_handler();
1532 let block_tracker = Arc::new(BlockTracker::new(100));
1533
1534 let mut watcher: NetworkBlockWatcher<_, _, _, MockJobScheduler> = NetworkBlockWatcher::new(
1535 network,
1536 storage,
1537 block_handler,
1538 trigger_handler,
1539 block_tracker,
1540 )
1541 .await
1542 .unwrap();
1543
1544 let rpc_client = MockRpcClient::new(100);
1545 let result = watcher.start(rpc_client).await;
1546
1547 assert!(result.is_ok());
1548 assert_eq!(watcher.scheduler.jobs_added.load(Ordering::SeqCst), 1);
1550 }
1551
1552 #[tokio::test]
1553 async fn test_network_watcher_stop() {
1554 let temp_dir = tempdir().unwrap();
1555 let storage = Arc::new(FileBlockStorage::new(temp_dir.path().to_path_buf()));
1556 let network = create_test_network();
1557 let block_handler = create_block_handler();
1558 let trigger_handler = create_trigger_handler();
1559 let block_tracker = Arc::new(BlockTracker::new(100));
1560
1561 let mut watcher: NetworkBlockWatcher<_, _, _, MockJobScheduler> = NetworkBlockWatcher::new(
1562 network,
1563 storage,
1564 block_handler,
1565 trigger_handler,
1566 block_tracker,
1567 )
1568 .await
1569 .unwrap();
1570
1571 let rpc_client = MockRpcClient::new(100);
1572 watcher.start(rpc_client).await.unwrap();
1573
1574 let result = watcher.stop().await;
1575 assert!(result.is_ok());
1576 assert!(watcher.scheduler.shutdown_called.load(Ordering::SeqCst));
1577 }
1578}