1use async_trait::async_trait;
11use chrono::Utc;
12use glob::glob;
13use serde::{Deserialize, Serialize};
14use std::path::PathBuf;
15
16use crate::models::BlockType;
17
/// Recovery state of a block that was missed during processing.
///
/// The state is stored on each [`MissedBlockEntry`] and persisted via serde.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MissedBlockStatus {
    /// Initial state assigned by `MissedBlockEntry::new`. It is the only state
    /// that `FileBlockStorage::get_missed_blocks` returns as eligible.
    Pending,
    /// NOTE(review): presumably set by callers while a recovery attempt is in
    /// progress; nothing in this file sets it outside of tests.
    Recovering,
    /// NOTE(review): presumably set by callers once the block was recovered.
    Recovered,
    /// NOTE(review): presumably set by callers after a failed recovery attempt.
    /// Updating an entry to this state increments its `retry_count`.
    Failed,
}
30
/// Bookkeeping record for a single missed block, persisted as JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MissedBlockEntry {
    /// Number of the missed block.
    pub block_number: u64,
    /// Unix timestamp (seconds, UTC) taken when the entry was created.
    pub first_missed_at: i64,
    /// Incremented each time the entry's status is updated to `Pending` or `Failed`.
    pub retry_count: u32,
    /// Current recovery state of the block.
    pub status: MissedBlockStatus,
    /// Unix timestamp (seconds, UTC) of the most recent status update, if any.
    pub last_attempt_at: Option<i64>,
    /// Most recent error message recorded by a status update, if any.
    pub last_error: Option<String>,
}
47
48impl MissedBlockEntry {
49 pub fn new(block_number: u64) -> Self {
51 Self {
52 block_number,
53 first_missed_at: Utc::now().timestamp(),
54 retry_count: 0,
55 status: MissedBlockStatus::Pending,
56 last_attempt_at: None,
57 last_error: None,
58 }
59 }
60}
61
/// Storage abstraction for block-processing progress, fetched blocks, and
/// missed-block recovery bookkeeping, all keyed by a network identifier.
///
/// Implementors must be `Clone + Send + Sync`. Every method is async and
/// reports failures as `anyhow::Error`.
#[async_trait]
pub trait BlockStorage: Clone + Send + Sync {
    /// Returns the last processed block number stored for `network_id`.
    ///
    /// `Ok(None)` means nothing is stored for the network (this is what
    /// `FileBlockStorage` returns when no record exists).
    async fn get_last_processed_block(
        &self,
        network_id: &str,
    ) -> Result<Option<u64>, anyhow::Error>;

    /// Stores `block` as the last processed block number for `network_id`.
    async fn save_last_processed_block(
        &self,
        network_id: &str,
        block: u64,
    ) -> Result<(), anyhow::Error>;

    /// Persists a batch of `blocks` for `network_id`.
    async fn save_blocks(
        &self,
        network_id: &str,
        blocks: &[BlockType],
    ) -> Result<(), anyhow::Error>;

    /// Deletes the blocks previously persisted for `network_id`.
    async fn delete_blocks(&self, network_id: &str) -> Result<(), anyhow::Error>;

    /// Records the given block numbers as missed for `network_id`.
    ///
    /// `FileBlockStorage` skips block numbers that are already tracked and
    /// does nothing for an empty slice.
    async fn save_missed_blocks(
        &self,
        network_id: &str,
        blocks: &[u64],
    ) -> Result<(), anyhow::Error>;

    /// Returns the tracked missed blocks that are eligible for a recovery attempt.
    ///
    /// In `FileBlockStorage` an entry is eligible when its block number is at
    /// least `current_block.saturating_sub(max_block_age)`, its status is
    /// `Pending`, and its `retry_count` is below `max_retries`.
    async fn get_missed_blocks(
        &self,
        network_id: &str,
        max_block_age: u64,
        current_block: u64,
        max_retries: u32,
    ) -> Result<Vec<MissedBlockEntry>, anyhow::Error>;

    /// Updates the status of the tracked entry for `block_number`.
    ///
    /// In `FileBlockStorage` this also stamps `last_attempt_at`, increments
    /// `retry_count` when the new status is `Pending` or `Failed`, and
    /// overwrites `last_error` only when `error` is `Some`. An unknown
    /// `block_number` is not an error there.
    async fn update_missed_block_status(
        &self,
        network_id: &str,
        block_number: u64,
        status: MissedBlockStatus,
        error: Option<String>,
    ) -> Result<(), anyhow::Error>;

    /// Stops tracking the given block numbers for `network_id`.
    async fn remove_recovered_blocks(
        &self,
        network_id: &str,
        block_numbers: &[u64],
    ) -> Result<(), anyhow::Error>;

    /// Drops tracked entries that fall outside the age window and returns how
    /// many were removed.
    ///
    /// `FileBlockStorage` removes entries whose block number is below
    /// `current_block.saturating_sub(max_block_age)`.
    async fn prune_old_missed_blocks(
        &self,
        network_id: &str,
        max_block_age: u64,
        current_block: u64,
    ) -> Result<usize, anyhow::Error>;
}
200
/// File-system backed [`BlockStorage`] that keeps all of its state in files
/// located directly under `storage_path`.
#[derive(Clone)]
pub struct FileBlockStorage {
    /// Directory that every storage file path is joined onto.
    storage_path: PathBuf,
}
210
211impl FileBlockStorage {
212 pub fn new(storage_path: PathBuf) -> Self {
216 FileBlockStorage { storage_path }
217 }
218}
219
220impl Default for FileBlockStorage {
221 fn default() -> Self {
225 FileBlockStorage::new(PathBuf::from("data"))
226 }
227}
228
229#[async_trait]
230impl BlockStorage for FileBlockStorage {
231 async fn get_last_processed_block(
235 &self,
236 network_id: &str,
237 ) -> Result<Option<u64>, anyhow::Error> {
238 let file_path = self
239 .storage_path
240 .join(format!("{}_last_block.txt", network_id));
241
242 if !file_path.exists() {
243 return Ok(None);
244 }
245
246 let content = tokio::fs::read_to_string(file_path)
247 .await
248 .map_err(|e| anyhow::anyhow!("Failed to read last processed block: {}", e))?;
249 let block_number = content
250 .trim()
251 .parse::<u64>()
252 .map_err(|e| anyhow::anyhow!("Failed to parse last processed block: {}", e))?;
253 Ok(Some(block_number))
254 }
255
256 async fn save_last_processed_block(
261 &self,
262 network_id: &str,
263 block: u64,
264 ) -> Result<(), anyhow::Error> {
265 let file_path = self
266 .storage_path
267 .join(format!("{}_last_block.txt", network_id));
268 tokio::fs::write(file_path, block.to_string())
269 .await
270 .map_err(|e| anyhow::anyhow!("Failed to save last processed block: {}", e))?;
271 Ok(())
272 }
273
274 async fn save_blocks(
280 &self,
281 network_slug: &str,
282 blocks: &[BlockType],
283 ) -> Result<(), anyhow::Error> {
284 let file_path = self.storage_path.join(format!(
285 "{}_blocks_{}.json",
286 network_slug,
287 chrono::Utc::now().timestamp()
288 ));
289 let json = serde_json::to_string(blocks)
290 .map_err(|e| anyhow::anyhow!("Failed to serialize blocks: {}", e))?;
291 tokio::fs::write(file_path, json)
292 .await
293 .map_err(|e| anyhow::anyhow!("Failed to save blocks: {}", e))?;
294 Ok(())
295 }
296
297 async fn delete_blocks(&self, network_slug: &str) -> Result<(), anyhow::Error> {
303 let pattern = self
304 .storage_path
305 .join(format!("{}_blocks_*.json", network_slug))
306 .to_string_lossy()
307 .to_string();
308
309 for entry in glob(&pattern)
310 .map_err(|e| anyhow::anyhow!("Failed to parse blocks: {}", e))?
311 .flatten()
312 {
313 tokio::fs::remove_file(entry)
314 .await
315 .map_err(|e| anyhow::anyhow!("Failed to delete blocks: {}", e))?;
316 }
317 Ok(())
318 }
319
320 async fn save_missed_blocks(
332 &self,
333 network_id: &str,
334 blocks: &[u64],
335 ) -> Result<(), anyhow::Error> {
336 if blocks.is_empty() {
337 return Ok(());
338 }
339
340 let mut entries = self.load_missed_blocks_json(network_id).await?;
342
343 let existing_blocks: std::collections::HashSet<u64> =
345 entries.iter().map(|e| e.block_number).collect();
346
347 for &block_number in blocks {
349 if !existing_blocks.contains(&block_number) {
350 entries.push(MissedBlockEntry::new(block_number));
351 }
352 }
353
354 self.save_missed_blocks_json(network_id, &entries).await
356 }
357
358 async fn get_missed_blocks(
359 &self,
360 network_id: &str,
361 max_block_age: u64,
362 current_block: u64,
363 max_retries: u32,
364 ) -> Result<Vec<MissedBlockEntry>, anyhow::Error> {
365 let entries = self.load_missed_blocks_json(network_id).await?;
366
367 let min_block = current_block.saturating_sub(max_block_age);
369
370 let eligible: Vec<MissedBlockEntry> = entries
372 .into_iter()
373 .filter(|e| {
374 e.block_number >= min_block
375 && e.status == MissedBlockStatus::Pending
376 && e.retry_count < max_retries
377 })
378 .collect();
379
380 Ok(eligible)
381 }
382
383 async fn update_missed_block_status(
384 &self,
385 network_id: &str,
386 block_number: u64,
387 status: MissedBlockStatus,
388 error: Option<String>,
389 ) -> Result<(), anyhow::Error> {
390 let mut entries = self.load_missed_blocks_json(network_id).await?;
391
392 if let Some(entry) = entries.iter_mut().find(|e| e.block_number == block_number) {
394 if status == MissedBlockStatus::Pending || status == MissedBlockStatus::Failed {
396 entry.retry_count += 1;
397 }
398 entry.status = status;
399 entry.last_attempt_at = Some(Utc::now().timestamp());
400 if error.is_some() {
401 entry.last_error = error;
402 }
403 }
404
405 self.save_missed_blocks_json(network_id, &entries).await
406 }
407
408 async fn remove_recovered_blocks(
409 &self,
410 network_id: &str,
411 block_numbers: &[u64],
412 ) -> Result<(), anyhow::Error> {
413 if block_numbers.is_empty() {
414 return Ok(());
415 }
416
417 let entries = self.load_missed_blocks_json(network_id).await?;
418
419 let block_set: std::collections::HashSet<u64> = block_numbers.iter().copied().collect();
420
421 let remaining: Vec<MissedBlockEntry> = entries
423 .into_iter()
424 .filter(|e| !block_set.contains(&e.block_number))
425 .collect();
426
427 self.save_missed_blocks_json(network_id, &remaining).await
428 }
429
430 async fn prune_old_missed_blocks(
431 &self,
432 network_id: &str,
433 max_block_age: u64,
434 current_block: u64,
435 ) -> Result<usize, anyhow::Error> {
436 let entries = self.load_missed_blocks_json(network_id).await?;
437 let original_count = entries.len();
438
439 let min_block = current_block.saturating_sub(max_block_age);
441
442 let remaining: Vec<MissedBlockEntry> = entries
444 .into_iter()
445 .filter(|e| e.block_number >= min_block)
446 .collect();
447
448 let pruned_count = original_count - remaining.len();
449
450 if pruned_count > 0 {
451 self.save_missed_blocks_json(network_id, &remaining).await?;
452 }
453
454 Ok(pruned_count)
455 }
456}
457
458impl FileBlockStorage {
459 async fn load_missed_blocks_json(
461 &self,
462 network_id: &str,
463 ) -> Result<Vec<MissedBlockEntry>, anyhow::Error> {
464 let json_path = self
465 .storage_path
466 .join(format!("{}_missed_blocks.json", network_id));
467 let txt_path = self
468 .storage_path
469 .join(format!("{}_missed_blocks.txt", network_id));
470
471 if json_path.exists() {
473 let content = tokio::fs::read_to_string(&json_path)
474 .await
475 .map_err(|e| anyhow::anyhow!("Failed to read missed blocks JSON: {}", e))?;
476
477 if content.trim().is_empty() {
478 return Ok(Vec::new());
479 }
480
481 let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content)
482 .map_err(|e| anyhow::anyhow!("Failed to parse missed blocks JSON: {}", e))?;
483
484 return Ok(entries);
485 }
486
487 if txt_path.exists() {
489 let content = tokio::fs::read_to_string(&txt_path)
490 .await
491 .map_err(|e| anyhow::anyhow!("Failed to read missed blocks text file: {}", e))?;
492
493 let mut entries = Vec::new();
494 let mut seen_blocks = std::collections::HashSet::new();
495
496 for line in content.lines() {
497 let line = line.trim();
498 if line.is_empty() {
499 continue;
500 }
501 if let Ok(block_number) = line.parse::<u64>() {
502 if seen_blocks.insert(block_number) {
504 entries.push(MissedBlockEntry::new(block_number));
505 }
506 }
507 }
508
509 self.save_missed_blocks_json(network_id, &entries).await?;
511
512 if let Err(e) = tokio::fs::remove_file(&txt_path).await {
514 tracing::warn!(
515 "Failed to remove old missed blocks text file after migration: {}",
516 e
517 );
518 }
519
520 return Ok(entries);
521 }
522
523 Ok(Vec::new())
525 }
526
527 async fn save_missed_blocks_json(
529 &self,
530 network_id: &str,
531 entries: &[MissedBlockEntry],
532 ) -> Result<(), anyhow::Error> {
533 let json_path = self
534 .storage_path
535 .join(format!("{}_missed_blocks.json", network_id));
536
537 let json = serde_json::to_string_pretty(entries)
538 .map_err(|e| anyhow::anyhow!("Failed to serialize missed blocks: {}", e))?;
539
540 tokio::fs::write(json_path, json)
541 .await
542 .map_err(|e| anyhow::anyhow!("Failed to save missed blocks JSON: {}", e))?;
543
544 Ok(())
545 }
546}
547
548#[cfg(test)]
549mod tests {
550 use super::*;
551 use tempfile;
552
553 #[tokio::test]
554 async fn test_get_last_processed_block() {
555 let temp_dir = tempfile::tempdir().unwrap();
556 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
557
558 let existing_file = temp_dir.path().join("existing_last_block.txt");
560 tokio::fs::write(&existing_file, "100").await.unwrap();
561 let result = storage.get_last_processed_block("existing").await;
562 assert!(result.is_ok());
563 assert_eq!(result.unwrap(), Some(100));
564
565 let result = storage.get_last_processed_block("non_existent").await;
567 assert!(result.is_ok());
568 assert_eq!(result.unwrap(), None);
569
570 let invalid_file = temp_dir.path().join("invalid_last_block.txt");
572 tokio::fs::write(&invalid_file, "not a number")
573 .await
574 .unwrap();
575 let result = storage.get_last_processed_block("invalid").await;
576 assert!(result.is_err());
577 let err = result.unwrap_err();
578 assert!(err
579 .to_string()
580 .contains("Failed to parse last processed block"));
581 assert!(err.to_string().contains("invalid"));
582
583 let valid_file = temp_dir.path().join("valid_last_block.txt");
585 tokio::fs::write(&valid_file, "123").await.unwrap();
586 let result = storage.get_last_processed_block("valid").await;
587 assert_eq!(result.unwrap(), Some(123));
588 }
589
590 #[tokio::test]
591 async fn test_save_last_processed_block() {
592 let temp_dir = tempfile::tempdir().unwrap();
593 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
594
595 let result = storage.save_last_processed_block("test", 100).await;
597 assert!(result.is_ok());
598
599 let content = tokio::fs::read_to_string(temp_dir.path().join("test_last_block.txt"))
601 .await
602 .unwrap();
603 assert_eq!(content, "100");
604
605 #[cfg(unix)]
607 {
608 use std::os::unix::fs::PermissionsExt;
609 let readonly_dir = temp_dir.path().join("readonly");
610 tokio::fs::create_dir(&readonly_dir).await.unwrap();
611 let mut perms = std::fs::metadata(&readonly_dir).unwrap().permissions();
612 perms.set_mode(0o444); std::fs::set_permissions(&readonly_dir, perms).unwrap();
614
615 let readonly_storage = FileBlockStorage::new(readonly_dir);
616 let result = readonly_storage
617 .save_last_processed_block("test", 100)
618 .await;
619 assert!(result.is_err());
620 let err = result.unwrap_err();
621 assert!(err
622 .to_string()
623 .contains("Failed to save last processed block"));
624 assert!(err.to_string().contains("Permission denied"));
625 }
626 }
627
628 #[tokio::test]
629 async fn test_save_blocks() {
630 let temp_dir = tempfile::tempdir().unwrap();
631 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
632
633 let result = storage.save_blocks("test", &[]).await;
635 assert!(result.is_ok());
636
637 #[cfg(unix)]
639 {
640 use std::os::unix::fs::PermissionsExt;
641 let readonly_dir = temp_dir.path().join("readonly");
642 tokio::fs::create_dir(&readonly_dir).await.unwrap();
643 let mut perms = std::fs::metadata(&readonly_dir).unwrap().permissions();
644 perms.set_mode(0o444); std::fs::set_permissions(&readonly_dir, perms).unwrap();
646
647 let readonly_storage = FileBlockStorage::new(readonly_dir);
648 let result = readonly_storage.save_blocks("test", &[]).await;
649 assert!(result.is_err());
650 let err = result.unwrap_err();
651 assert!(err.to_string().contains("Failed to save blocks"));
652 assert!(err.to_string().contains("Permission denied"));
653 }
654 }
655
656 #[tokio::test]
657 async fn test_delete_blocks() {
658 let temp_dir = tempfile::tempdir().unwrap();
659 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
660
661 tokio::fs::write(temp_dir.path().join("test_blocks_1.json"), "[]")
663 .await
664 .unwrap();
665 tokio::fs::write(temp_dir.path().join("test_blocks_2.json"), "[]")
666 .await
667 .unwrap();
668
669 let result = storage.delete_blocks("test").await;
671 assert!(result.is_ok());
672
673 #[cfg(unix)]
675 {
676 use std::os::unix::fs::PermissionsExt;
677 let readonly_dir = temp_dir.path().join("readonly");
678 tokio::fs::create_dir(&readonly_dir).await.unwrap();
679
680 tokio::fs::write(readonly_dir.join("test_blocks_1.json"), "[]")
682 .await
683 .unwrap();
684
685 let mut perms = std::fs::metadata(&readonly_dir).unwrap().permissions();
687 perms.set_mode(0o555); std::fs::set_permissions(&readonly_dir, perms).unwrap();
689
690 let readonly_storage = FileBlockStorage::new(readonly_dir);
691 let result = readonly_storage.delete_blocks("test").await;
692 assert!(result.is_err());
693 let err = result.unwrap_err();
694 assert!(err.to_string().contains("Failed to delete blocks"));
695 assert!(err.to_string().contains("Permission denied"));
696 }
697 }
698
699 #[tokio::test]
700 async fn test_save_missed_blocks() {
701 let temp_dir = tempfile::tempdir().unwrap();
702 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
703
704 let result = storage.save_missed_blocks("test", &[100]).await;
706 assert!(result.is_ok());
707
708 let json_path = temp_dir.path().join("test_missed_blocks.json");
710 assert!(json_path.exists());
711 let content = tokio::fs::read_to_string(&json_path).await.unwrap();
712 let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
713 assert_eq!(entries.len(), 1);
714 assert_eq!(entries[0].block_number, 100);
715 assert_eq!(entries[0].status, MissedBlockStatus::Pending);
716
717 let result = storage.save_missed_blocks("test", &[101, 102, 103]).await;
719 assert!(result.is_ok());
720
721 let content = tokio::fs::read_to_string(&json_path).await.unwrap();
722 let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
723 assert_eq!(entries.len(), 4);
724
725 let result = storage.save_missed_blocks("test", &[100]).await;
727 assert!(result.is_ok());
728
729 let content = tokio::fs::read_to_string(&json_path).await.unwrap();
730 let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
731 assert_eq!(entries.len(), 4); let result = storage.save_missed_blocks("test", &[]).await;
735 assert!(result.is_ok());
736
737 #[cfg(unix)]
739 {
740 use std::os::unix::fs::PermissionsExt;
741 let readonly_dir = temp_dir.path().join("readonly");
742 tokio::fs::create_dir(&readonly_dir).await.unwrap();
743 let mut perms = std::fs::metadata(&readonly_dir).unwrap().permissions();
744 perms.set_mode(0o444); std::fs::set_permissions(&readonly_dir, perms).unwrap();
746
747 let readonly_storage = FileBlockStorage::new(readonly_dir);
748 let result = readonly_storage.save_missed_blocks("test", &[100]).await;
749 assert!(result.is_err());
750 }
751 }
752
753 #[tokio::test]
754 async fn test_migration_from_text_to_json() {
755 let temp_dir = tempfile::tempdir().unwrap();
756 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
757
758 let txt_path = temp_dir.path().join("test_missed_blocks.txt");
760 tokio::fs::write(&txt_path, "100\n101\n102\n100\n") .await
762 .unwrap();
763
764 let result = storage
766 .get_missed_blocks("test", 1000, 1000, 3)
767 .await
768 .unwrap();
769
770 assert_eq!(result.len(), 3);
772
773 let json_path = temp_dir.path().join("test_missed_blocks.json");
775 assert!(json_path.exists());
776
777 assert!(!txt_path.exists());
779
780 let content = tokio::fs::read_to_string(&json_path).await.unwrap();
782 let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
783 assert_eq!(entries.len(), 3);
784 }
785
786 #[tokio::test]
787 async fn test_get_missed_blocks() {
788 let temp_dir = tempfile::tempdir().unwrap();
789 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
790
791 storage
793 .save_missed_blocks("test", &[100, 200, 300, 400, 500])
794 .await
795 .unwrap();
796
797 let result = storage
800 .get_missed_blocks("test", 200, 500, 3)
801 .await
802 .unwrap();
803
804 assert_eq!(result.len(), 3);
805 let block_numbers: Vec<u64> = result.iter().map(|e| e.block_number).collect();
806 assert!(block_numbers.contains(&300));
807 assert!(block_numbers.contains(&400));
808 assert!(block_numbers.contains(&500));
809 }
810
811 #[tokio::test]
812 async fn test_update_missed_block_status() {
813 let temp_dir = tempfile::tempdir().unwrap();
814 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
815
816 storage.save_missed_blocks("test", &[100]).await.unwrap();
818
819 storage
821 .update_missed_block_status("test", 100, MissedBlockStatus::Recovering, None)
822 .await
823 .unwrap();
824
825 let json_path = temp_dir.path().join("test_missed_blocks.json");
827 let content = tokio::fs::read_to_string(&json_path).await.unwrap();
828 let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
829 assert_eq!(entries[0].status, MissedBlockStatus::Recovering);
830 assert!(entries[0].last_attempt_at.is_some());
831
832 storage
834 .update_missed_block_status(
835 "test",
836 100,
837 MissedBlockStatus::Failed,
838 Some("RPC error".to_string()),
839 )
840 .await
841 .unwrap();
842
843 let content = tokio::fs::read_to_string(&json_path).await.unwrap();
844 let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
845 assert_eq!(entries[0].status, MissedBlockStatus::Failed);
846 assert_eq!(entries[0].last_error, Some("RPC error".to_string()));
847 assert_eq!(entries[0].retry_count, 1); }
849
850 #[tokio::test]
851 async fn test_remove_recovered_blocks() {
852 let temp_dir = tempfile::tempdir().unwrap();
853 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
854
855 storage
857 .save_missed_blocks("test", &[100, 101, 102, 103, 104])
858 .await
859 .unwrap();
860
861 storage
863 .remove_recovered_blocks("test", &[101, 103])
864 .await
865 .unwrap();
866
867 let json_path = temp_dir.path().join("test_missed_blocks.json");
869 let content = tokio::fs::read_to_string(&json_path).await.unwrap();
870 let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
871
872 assert_eq!(entries.len(), 3);
873 let block_numbers: Vec<u64> = entries.iter().map(|e| e.block_number).collect();
874 assert!(block_numbers.contains(&100));
875 assert!(block_numbers.contains(&102));
876 assert!(block_numbers.contains(&104));
877 assert!(!block_numbers.contains(&101));
878 assert!(!block_numbers.contains(&103));
879 }
880
881 #[tokio::test]
882 async fn test_prune_old_missed_blocks() {
883 let temp_dir = tempfile::tempdir().unwrap();
884 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
885
886 storage
888 .save_missed_blocks("test", &[100, 200, 300, 400, 500])
889 .await
890 .unwrap();
891
892 let pruned = storage
895 .prune_old_missed_blocks("test", 150, 500)
896 .await
897 .unwrap();
898
899 assert_eq!(pruned, 3); let json_path = temp_dir.path().join("test_missed_blocks.json");
903 let content = tokio::fs::read_to_string(&json_path).await.unwrap();
904 let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
905
906 assert_eq!(entries.len(), 2);
907 let block_numbers: Vec<u64> = entries.iter().map(|e| e.block_number).collect();
908 assert!(block_numbers.contains(&400));
909 assert!(block_numbers.contains(&500));
910 }
911
912 #[tokio::test]
913 async fn test_get_missed_blocks_empty() {
914 let temp_dir = tempfile::tempdir().unwrap();
915 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
916
917 let result = storage
919 .get_missed_blocks("nonexistent", 1000, 1000, 3)
920 .await
921 .unwrap();
922 assert!(result.is_empty());
923 }
924
925 #[tokio::test]
926 async fn test_get_missed_blocks_filters_by_status() {
927 let temp_dir = tempfile::tempdir().unwrap();
928 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
929
930 storage
932 .save_missed_blocks("test", &[100, 101, 102])
933 .await
934 .unwrap();
935
936 storage
938 .update_missed_block_status("test", 101, MissedBlockStatus::Failed, None)
939 .await
940 .unwrap();
941
942 let result = storage
944 .get_missed_blocks("test", 1000, 1000, 3)
945 .await
946 .unwrap();
947
948 assert_eq!(result.len(), 2);
949 for entry in &result {
950 assert_eq!(entry.status, MissedBlockStatus::Pending);
951 }
952 }
953
954 #[tokio::test]
955 async fn test_load_empty_json_file() {
956 let temp_dir = tempfile::tempdir().unwrap();
957 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
958
959 let json_path = temp_dir.path().join("test_missed_blocks.json");
961 tokio::fs::write(&json_path, "").await.unwrap();
962
963 let result = storage
965 .get_missed_blocks("test", 1000, 1000, 3)
966 .await
967 .unwrap();
968 assert!(result.is_empty());
969 }
970
971 #[tokio::test]
972 async fn test_load_whitespace_only_json_file() {
973 let temp_dir = tempfile::tempdir().unwrap();
974 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
975
976 let json_path = temp_dir.path().join("test_missed_blocks.json");
978 tokio::fs::write(&json_path, " \n\t ").await.unwrap();
979
980 let result = storage
982 .get_missed_blocks("test", 1000, 1000, 3)
983 .await
984 .unwrap();
985 assert!(result.is_empty());
986 }
987
988 #[tokio::test]
989 async fn test_migration_with_empty_lines() {
990 let temp_dir = tempfile::tempdir().unwrap();
991 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
992
993 let txt_path = temp_dir.path().join("test_missed_blocks.txt");
995 tokio::fs::write(&txt_path, "\n100\n\n101\n \n102\n")
996 .await
997 .unwrap();
998
999 let result = storage
1001 .get_missed_blocks("test", 1000, 1000, 3)
1002 .await
1003 .unwrap();
1004
1005 assert_eq!(result.len(), 3);
1007
1008 let json_path = temp_dir.path().join("test_missed_blocks.json");
1010 assert!(json_path.exists());
1011
1012 assert!(!txt_path.exists());
1014 }
1015
1016 #[tokio::test]
1017 async fn test_migration_handles_invalid_numbers() {
1018 let temp_dir = tempfile::tempdir().unwrap();
1019 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1020
1021 let txt_path = temp_dir.path().join("test_missed_blocks.txt");
1023 tokio::fs::write(&txt_path, "100\nnot_a_number\n101\nabc\n102\n")
1024 .await
1025 .unwrap();
1026
1027 let result = storage
1029 .get_missed_blocks("test", 1000, 1000, 3)
1030 .await
1031 .unwrap();
1032
1033 assert_eq!(result.len(), 3);
1035 let block_numbers: Vec<u64> = result.iter().map(|e| e.block_number).collect();
1036 assert!(block_numbers.contains(&100));
1037 assert!(block_numbers.contains(&101));
1038 assert!(block_numbers.contains(&102));
1039 }
1040
1041 #[tokio::test]
1042 async fn test_update_status_block_not_found() {
1043 let temp_dir = tempfile::tempdir().unwrap();
1044 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1045
1046 storage.save_missed_blocks("test", &[100]).await.unwrap();
1048
1049 let result = storage
1051 .update_missed_block_status("test", 999, MissedBlockStatus::Recovered, None)
1052 .await;
1053 assert!(result.is_ok());
1054
1055 let entries = storage
1057 .get_missed_blocks("test", 1000, 1000, 3)
1058 .await
1059 .unwrap();
1060 assert_eq!(entries.len(), 1);
1061 assert_eq!(entries[0].block_number, 100);
1062 assert_eq!(entries[0].status, MissedBlockStatus::Pending);
1063 }
1064
1065 #[tokio::test]
1066 async fn test_update_status_preserves_existing_error() {
1067 let temp_dir = tempfile::tempdir().unwrap();
1068 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1069
1070 storage.save_missed_blocks("test", &[100]).await.unwrap();
1072
1073 storage
1075 .update_missed_block_status(
1076 "test",
1077 100,
1078 MissedBlockStatus::Failed,
1079 Some("First error".to_string()),
1080 )
1081 .await
1082 .unwrap();
1083
1084 storage
1086 .update_missed_block_status("test", 100, MissedBlockStatus::Pending, None)
1087 .await
1088 .unwrap();
1089
1090 let json_path = temp_dir.path().join("test_missed_blocks.json");
1092 let content = tokio::fs::read_to_string(&json_path).await.unwrap();
1093 let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
1094 assert_eq!(entries[0].last_error, Some("First error".to_string()));
1095 }
1096
1097 #[tokio::test]
1098 async fn test_remove_recovered_blocks_empty_list() {
1099 let temp_dir = tempfile::tempdir().unwrap();
1100 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1101
1102 storage
1104 .save_missed_blocks("test", &[100, 101, 102])
1105 .await
1106 .unwrap();
1107
1108 let result = storage.remove_recovered_blocks("test", &[]).await;
1110 assert!(result.is_ok());
1111
1112 let entries = storage
1114 .get_missed_blocks("test", 1000, 1000, 3)
1115 .await
1116 .unwrap();
1117 assert_eq!(entries.len(), 3);
1118 }
1119
1120 #[tokio::test]
1121 async fn test_prune_with_no_blocks_to_prune() {
1122 let temp_dir = tempfile::tempdir().unwrap();
1123 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1124
1125 storage
1127 .save_missed_blocks("test", &[900, 950, 1000])
1128 .await
1129 .unwrap();
1130
1131 let pruned = storage
1133 .prune_old_missed_blocks("test", 500, 1000)
1134 .await
1135 .unwrap();
1136
1137 assert_eq!(pruned, 0);
1138
1139 let entries = storage
1141 .get_missed_blocks("test", 1000, 1000, 3)
1142 .await
1143 .unwrap();
1144 assert_eq!(entries.len(), 3);
1145 }
1146
1147 #[tokio::test]
1148 async fn test_prune_empty_storage() {
1149 let temp_dir = tempfile::tempdir().unwrap();
1150 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1151
1152 let pruned = storage
1154 .prune_old_missed_blocks("test", 100, 1000)
1155 .await
1156 .unwrap();
1157
1158 assert_eq!(pruned, 0);
1159 }
1160
1161 #[tokio::test]
1162 async fn test_get_missed_blocks_filters_by_max_retries() {
1163 let temp_dir = tempfile::tempdir().unwrap();
1164 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1165
1166 storage
1168 .save_missed_blocks("test", &[100, 101, 102])
1169 .await
1170 .unwrap();
1171
1172 for _ in 0..3 {
1174 storage
1175 .update_missed_block_status("test", 100, MissedBlockStatus::Pending, None)
1176 .await
1177 .unwrap();
1178 }
1179
1180 let result = storage
1182 .get_missed_blocks("test", 1000, 1000, 3)
1183 .await
1184 .unwrap();
1185
1186 assert_eq!(result.len(), 2);
1188 let block_numbers: Vec<u64> = result.iter().map(|e| e.block_number).collect();
1189 assert!(!block_numbers.contains(&100));
1190 assert!(block_numbers.contains(&101));
1191 assert!(block_numbers.contains(&102));
1192 }
1193
/// A new entry starts pending, with no retries, attempts, or errors.
///
/// Plain synchronous test: nothing here awaits, so no async runtime is needed.
#[test]
fn test_missed_block_entry_new() {
    let entry = MissedBlockEntry::new(12345);
    assert_eq!(entry.block_number, 12345);
    assert_eq!(entry.retry_count, 0);
    assert_eq!(entry.status, MissedBlockStatus::Pending);
    assert!(entry.last_attempt_at.is_none());
    assert!(entry.last_error.is_none());
    // The creation timestamp is taken from the current clock.
    assert!(entry.first_missed_at > 0);
}
1205
/// The default storage points at the relative `data` directory.
///
/// Plain synchronous test: nothing here awaits, so no async runtime is needed.
#[test]
fn test_file_block_storage_default() {
    let storage = FileBlockStorage::default();
    assert_eq!(storage.storage_path, std::path::PathBuf::from("data"));
}
1212
1213 #[tokio::test]
1214 async fn test_update_status_recovering_does_not_increment_retry() {
1215 let temp_dir = tempfile::tempdir().unwrap();
1216 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1217
1218 storage.save_missed_blocks("test", &[100]).await.unwrap();
1220
1221 storage
1223 .update_missed_block_status("test", 100, MissedBlockStatus::Recovering, None)
1224 .await
1225 .unwrap();
1226
1227 let json_path = temp_dir.path().join("test_missed_blocks.json");
1228 let content = tokio::fs::read_to_string(&json_path).await.unwrap();
1229 let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
1230 assert_eq!(entries[0].retry_count, 0); assert_eq!(entries[0].status, MissedBlockStatus::Recovering);
1232 }
1233
1234 #[tokio::test]
1235 async fn test_update_status_recovered_does_not_increment_retry() {
1236 let temp_dir = tempfile::tempdir().unwrap();
1237 let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1238
1239 storage.save_missed_blocks("test", &[100]).await.unwrap();
1241
1242 storage
1244 .update_missed_block_status("test", 100, MissedBlockStatus::Recovered, None)
1245 .await
1246 .unwrap();
1247
1248 let json_path = temp_dir.path().join("test_missed_blocks.json");
1249 let content = tokio::fs::read_to_string(&json_path).await.unwrap();
1250 let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
1251 assert_eq!(entries[0].retry_count, 0); assert_eq!(entries[0].status, MissedBlockStatus::Recovered);
1253 }
1254}