openzeppelin_monitor/services/blockwatcher/
storage.rs

1//! Block storage implementations for the block watcher service.
2//!
3//! This module provides storage interfaces and implementations for persisting
4//! blockchain blocks and tracking processing state. Currently supports:
5//! - File-based storage with JSON serialization
6//! - Last processed block tracking
7//! - Block deletion for cleanup
8//! - Missed block tracking and recovery
9
10use async_trait::async_trait;
11use chrono::Utc;
12use glob::glob;
13use serde::{Deserialize, Serialize};
14use std::path::PathBuf;
15
16use crate::models::BlockType;
17
18/// Status of a missed block entry
19#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
20pub enum MissedBlockStatus {
21	/// Block is pending recovery
22	Pending,
23	/// Block recovery is in progress
24	Recovering,
25	/// Block was successfully recovered
26	Recovered,
27	/// Block recovery failed after max retries
28	Failed,
29}
30
31/// Entry tracking a missed block with recovery metadata
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct MissedBlockEntry {
34	/// The block number that was missed
35	pub block_number: u64,
36	/// Unix timestamp (seconds) when this block was first detected as missed
37	pub first_missed_at: i64,
38	/// Number of recovery attempts made
39	pub retry_count: u32,
40	/// Current status of the missed block
41	pub status: MissedBlockStatus,
42	/// Unix timestamp (seconds) of the last recovery attempt
43	pub last_attempt_at: Option<i64>,
44	/// Error message from the last failed attempt
45	pub last_error: Option<String>,
46}
47
48impl MissedBlockEntry {
49	/// Creates a new missed block entry
50	pub fn new(block_number: u64) -> Self {
51		Self {
52			block_number,
53			first_missed_at: Utc::now().timestamp(),
54			retry_count: 0,
55			status: MissedBlockStatus::Pending,
56			last_attempt_at: None,
57			last_error: None,
58		}
59	}
60}
61
62/// Interface for block storage implementations
63///
64/// Defines the required functionality for storing and retrieving blocks
65/// and tracking the last processed block for each network.
66#[async_trait]
67pub trait BlockStorage: Clone + Send + Sync {
68	/// Retrieves the last processed block number for a network
69	///
70	/// # Arguments
71	/// * `network_id` - Unique identifier for the network
72	///
73	/// # Returns
74	/// * `Result<Option<u64>, anyhow::Error>` - Last processed block number or None if not found
75	async fn get_last_processed_block(
76		&self,
77		network_id: &str,
78	) -> Result<Option<u64>, anyhow::Error>;
79
80	/// Saves the last processed block number for a network
81	///
82	/// # Arguments
83	/// * `network_id` - Unique identifier for the network
84	/// * `block` - Block number to save
85	///
86	/// # Returns
87	/// * `Result<(), anyhow::Error>` - Success or error
88	async fn save_last_processed_block(
89		&self,
90		network_id: &str,
91		block: u64,
92	) -> Result<(), anyhow::Error>;
93
94	/// Saves a collection of blocks for a network
95	///
96	/// # Arguments
97	/// * `network_id` - Unique identifier for the network
98	/// * `blocks` - Collection of blocks to save
99	///
100	/// # Returns
101	/// * `Result<(), anyhow::Error>` - Success or error
102	async fn save_blocks(
103		&self,
104		network_id: &str,
105		blocks: &[BlockType],
106	) -> Result<(), anyhow::Error>;
107
108	/// Deletes all stored blocks for a network
109	///
110	/// # Arguments
111	/// * `network_id` - Unique identifier for the network
112	///
113	/// # Returns
114	/// * `Result<(), anyhow::Error>` - Success or error
115	async fn delete_blocks(&self, network_id: &str) -> Result<(), anyhow::Error>;
116
117	/// Saves multiple missed blocks for a network in a single operation
118	///
119	/// # Arguments
120	/// * `network_id` - Unique identifier for the network
121	/// * `blocks` - Slice of block numbers to save
122	///
123	/// # Returns
124	/// * `Result<(), anyhow::Error>` - Success or error
125	async fn save_missed_blocks(
126		&self,
127		network_id: &str,
128		blocks: &[u64],
129	) -> Result<(), anyhow::Error>;
130
131	/// Retrieves missed blocks eligible for recovery
132	///
133	/// Returns blocks that are within the max_block_age range and have
134	/// status Pending with retry_count below max_retries.
135	///
136	/// # Arguments
137	/// * `network_id` - Unique identifier for the network
138	/// * `max_block_age` - Maximum age in blocks from current_block
139	/// * `current_block` - The current block number
140	/// * `max_retries` - Maximum retry attempts; blocks with retry_count >= max_retries are excluded
141	///
142	/// # Returns
143	/// * `Result<Vec<MissedBlockEntry>, anyhow::Error>` - Eligible missed blocks
144	async fn get_missed_blocks(
145		&self,
146		network_id: &str,
147		max_block_age: u64,
148		current_block: u64,
149		max_retries: u32,
150	) -> Result<Vec<MissedBlockEntry>, anyhow::Error>;
151
152	/// Updates the status of a missed block
153	///
154	/// # Arguments
155	/// * `network_id` - Unique identifier for the network
156	/// * `block_number` - Block number to update
157	/// * `status` - New status for the block
158	/// * `error` - Optional error message (for failed status)
159	///
160	/// # Returns
161	/// * `Result<(), anyhow::Error>` - Success or error
162	async fn update_missed_block_status(
163		&self,
164		network_id: &str,
165		block_number: u64,
166		status: MissedBlockStatus,
167		error: Option<String>,
168	) -> Result<(), anyhow::Error>;
169
170	/// Removes recovered blocks from storage
171	///
172	/// # Arguments
173	/// * `network_id` - Unique identifier for the network
174	/// * `block_numbers` - Block numbers to remove
175	///
176	/// # Returns
177	/// * `Result<(), anyhow::Error>` - Success or error
178	async fn remove_recovered_blocks(
179		&self,
180		network_id: &str,
181		block_numbers: &[u64],
182	) -> Result<(), anyhow::Error>;
183
184	/// Prunes missed blocks older than max_block_age
185	///
186	/// # Arguments
187	/// * `network_id` - Unique identifier for the network
188	/// * `max_block_age` - Maximum age in blocks
189	/// * `current_block` - The current block number
190	///
191	/// # Returns
192	/// * `Result<usize, anyhow::Error>` - Number of pruned blocks
193	async fn prune_old_missed_blocks(
194		&self,
195		network_id: &str,
196		max_block_age: u64,
197		current_block: u64,
198	) -> Result<usize, anyhow::Error>;
199}
200
/// File-backed block storage
///
/// Blocks and processing state are kept in files located under a single
/// configured directory.
#[derive(Clone, Debug)]
pub struct FileBlockStorage {
	/// Directory under which all storage files are placed
	storage_path: PathBuf,
}

impl FileBlockStorage {
	/// Creates a storage instance rooted at `storage_path`
	///
	/// The path is only stored; this constructor does not touch the file system.
	pub fn new(storage_path: PathBuf) -> Self {
		Self { storage_path }
	}
}

impl Default for FileBlockStorage {
	/// Creates a storage instance rooted at the relative path "data"
	fn default() -> Self {
		Self::new(PathBuf::from("data"))
	}
}
228
229#[async_trait]
230impl BlockStorage for FileBlockStorage {
231	/// Retrieves the last processed block from a network-specific file
232	///
233	/// The file is named "{network_id}_last_block.txt"
234	async fn get_last_processed_block(
235		&self,
236		network_id: &str,
237	) -> Result<Option<u64>, anyhow::Error> {
238		let file_path = self
239			.storage_path
240			.join(format!("{}_last_block.txt", network_id));
241
242		if !file_path.exists() {
243			return Ok(None);
244		}
245
246		let content = tokio::fs::read_to_string(file_path)
247			.await
248			.map_err(|e| anyhow::anyhow!("Failed to read last processed block: {}", e))?;
249		let block_number = content
250			.trim()
251			.parse::<u64>()
252			.map_err(|e| anyhow::anyhow!("Failed to parse last processed block: {}", e))?;
253		Ok(Some(block_number))
254	}
255
256	/// Saves the last processed block to a network-specific file
257	///
258	/// # Note
259	/// Overwrites any existing last block file for the network
260	async fn save_last_processed_block(
261		&self,
262		network_id: &str,
263		block: u64,
264	) -> Result<(), anyhow::Error> {
265		let file_path = self
266			.storage_path
267			.join(format!("{}_last_block.txt", network_id));
268		tokio::fs::write(file_path, block.to_string())
269			.await
270			.map_err(|e| anyhow::anyhow!("Failed to save last processed block: {}", e))?;
271		Ok(())
272	}
273
274	/// Saves blocks to a timestamped JSON file
275	///
276	/// # Note
277	/// Creates a new file for each save operation, named:
278	/// "{network_id}_blocks_{timestamp}.json"
279	async fn save_blocks(
280		&self,
281		network_slug: &str,
282		blocks: &[BlockType],
283	) -> Result<(), anyhow::Error> {
284		let file_path = self.storage_path.join(format!(
285			"{}_blocks_{}.json",
286			network_slug,
287			chrono::Utc::now().timestamp()
288		));
289		let json = serde_json::to_string(blocks)
290			.map_err(|e| anyhow::anyhow!("Failed to serialize blocks: {}", e))?;
291		tokio::fs::write(file_path, json)
292			.await
293			.map_err(|e| anyhow::anyhow!("Failed to save blocks: {}", e))?;
294		Ok(())
295	}
296
297	/// Deletes all block files for a network
298	///
299	/// # Note
300	/// Uses glob pattern matching to find and delete all files matching:
301	/// "{network_id}_blocks_*.json"
302	async fn delete_blocks(&self, network_slug: &str) -> Result<(), anyhow::Error> {
303		let pattern = self
304			.storage_path
305			.join(format!("{}_blocks_*.json", network_slug))
306			.to_string_lossy()
307			.to_string();
308
309		for entry in glob(&pattern)
310			.map_err(|e| anyhow::anyhow!("Failed to parse blocks: {}", e))?
311			.flatten()
312		{
313			tokio::fs::remove_file(entry)
314				.await
315				.map_err(|e| anyhow::anyhow!("Failed to delete blocks: {}", e))?;
316		}
317		Ok(())
318	}
319
320	/// Saves multiple missed blocks for a network in a single operation
321	///
322	/// This method saves new missed blocks to the JSON file. It first loads
323	/// existing entries, adds the new blocks (deduplicating), and saves back.
324	///
325	/// # Arguments
326	/// * `network_id` - Unique identifier for the network
327	/// * `blocks` - Slice of block numbers to save
328	///
329	/// # Returns
330	/// * `Result<(), anyhow::Error>` - Success or error
331	async fn save_missed_blocks(
332		&self,
333		network_id: &str,
334		blocks: &[u64],
335	) -> Result<(), anyhow::Error> {
336		if blocks.is_empty() {
337			return Ok(());
338		}
339
340		// Load existing entries (with migration if needed)
341		let mut entries = self.load_missed_blocks_json(network_id).await?;
342
343		// Create a set of existing block numbers for deduplication
344		let existing_blocks: std::collections::HashSet<u64> =
345			entries.iter().map(|e| e.block_number).collect();
346
347		// Add new blocks that don't already exist
348		for &block_number in blocks {
349			if !existing_blocks.contains(&block_number) {
350				entries.push(MissedBlockEntry::new(block_number));
351			}
352		}
353
354		// Save back to JSON
355		self.save_missed_blocks_json(network_id, &entries).await
356	}
357
358	async fn get_missed_blocks(
359		&self,
360		network_id: &str,
361		max_block_age: u64,
362		current_block: u64,
363		max_retries: u32,
364	) -> Result<Vec<MissedBlockEntry>, anyhow::Error> {
365		let entries = self.load_missed_blocks_json(network_id).await?;
366
367		// Calculate the minimum block number we'll consider
368		let min_block = current_block.saturating_sub(max_block_age);
369
370		// Filter to blocks within age range, with Pending status, and below max retries
371		let eligible: Vec<MissedBlockEntry> = entries
372			.into_iter()
373			.filter(|e| {
374				e.block_number >= min_block
375					&& e.status == MissedBlockStatus::Pending
376					&& e.retry_count < max_retries
377			})
378			.collect();
379
380		Ok(eligible)
381	}
382
383	async fn update_missed_block_status(
384		&self,
385		network_id: &str,
386		block_number: u64,
387		status: MissedBlockStatus,
388		error: Option<String>,
389	) -> Result<(), anyhow::Error> {
390		let mut entries = self.load_missed_blocks_json(network_id).await?;
391
392		// Find and update the entry
393		if let Some(entry) = entries.iter_mut().find(|e| e.block_number == block_number) {
394			// Increment retry count when status transitions to Pending (retry) or Failed (gave up)
395			if status == MissedBlockStatus::Pending || status == MissedBlockStatus::Failed {
396				entry.retry_count += 1;
397			}
398			entry.status = status;
399			entry.last_attempt_at = Some(Utc::now().timestamp());
400			if error.is_some() {
401				entry.last_error = error;
402			}
403		}
404
405		self.save_missed_blocks_json(network_id, &entries).await
406	}
407
408	async fn remove_recovered_blocks(
409		&self,
410		network_id: &str,
411		block_numbers: &[u64],
412	) -> Result<(), anyhow::Error> {
413		if block_numbers.is_empty() {
414			return Ok(());
415		}
416
417		let entries = self.load_missed_blocks_json(network_id).await?;
418
419		let block_set: std::collections::HashSet<u64> = block_numbers.iter().copied().collect();
420
421		// Filter out the recovered blocks
422		let remaining: Vec<MissedBlockEntry> = entries
423			.into_iter()
424			.filter(|e| !block_set.contains(&e.block_number))
425			.collect();
426
427		self.save_missed_blocks_json(network_id, &remaining).await
428	}
429
430	async fn prune_old_missed_blocks(
431		&self,
432		network_id: &str,
433		max_block_age: u64,
434		current_block: u64,
435	) -> Result<usize, anyhow::Error> {
436		let entries = self.load_missed_blocks_json(network_id).await?;
437		let original_count = entries.len();
438
439		// Calculate the minimum block number to keep
440		let min_block = current_block.saturating_sub(max_block_age);
441
442		// Filter to keep only blocks within the age range
443		let remaining: Vec<MissedBlockEntry> = entries
444			.into_iter()
445			.filter(|e| e.block_number >= min_block)
446			.collect();
447
448		let pruned_count = original_count - remaining.len();
449
450		if pruned_count > 0 {
451			self.save_missed_blocks_json(network_id, &remaining).await?;
452		}
453
454		Ok(pruned_count)
455	}
456}
457
458impl FileBlockStorage {
459	/// Loads missed blocks from JSON file, migrating from text format if needed
460	async fn load_missed_blocks_json(
461		&self,
462		network_id: &str,
463	) -> Result<Vec<MissedBlockEntry>, anyhow::Error> {
464		let json_path = self
465			.storage_path
466			.join(format!("{}_missed_blocks.json", network_id));
467		let txt_path = self
468			.storage_path
469			.join(format!("{}_missed_blocks.txt", network_id));
470
471		// Check if JSON file exists
472		if json_path.exists() {
473			let content = tokio::fs::read_to_string(&json_path)
474				.await
475				.map_err(|e| anyhow::anyhow!("Failed to read missed blocks JSON: {}", e))?;
476
477			if content.trim().is_empty() {
478				return Ok(Vec::new());
479			}
480
481			let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content)
482				.map_err(|e| anyhow::anyhow!("Failed to parse missed blocks JSON: {}", e))?;
483
484			return Ok(entries);
485		}
486
487		// Check if old text format exists and migrate
488		if txt_path.exists() {
489			let content = tokio::fs::read_to_string(&txt_path)
490				.await
491				.map_err(|e| anyhow::anyhow!("Failed to read missed blocks text file: {}", e))?;
492
493			let mut entries = Vec::new();
494			let mut seen_blocks = std::collections::HashSet::new();
495
496			for line in content.lines() {
497				let line = line.trim();
498				if line.is_empty() {
499					continue;
500				}
501				if let Ok(block_number) = line.parse::<u64>() {
502					// Deduplicate during migration
503					if seen_blocks.insert(block_number) {
504						entries.push(MissedBlockEntry::new(block_number));
505					}
506				}
507			}
508
509			// Save to new JSON format
510			self.save_missed_blocks_json(network_id, &entries).await?;
511
512			// Remove old text file after successful migration
513			if let Err(e) = tokio::fs::remove_file(&txt_path).await {
514				tracing::warn!(
515					"Failed to remove old missed blocks text file after migration: {}",
516					e
517				);
518			}
519
520			return Ok(entries);
521		}
522
523		// No file exists, return empty
524		Ok(Vec::new())
525	}
526
527	/// Saves missed blocks to JSON file
528	async fn save_missed_blocks_json(
529		&self,
530		network_id: &str,
531		entries: &[MissedBlockEntry],
532	) -> Result<(), anyhow::Error> {
533		let json_path = self
534			.storage_path
535			.join(format!("{}_missed_blocks.json", network_id));
536
537		let json = serde_json::to_string_pretty(entries)
538			.map_err(|e| anyhow::anyhow!("Failed to serialize missed blocks: {}", e))?;
539
540		tokio::fs::write(json_path, json)
541			.await
542			.map_err(|e| anyhow::anyhow!("Failed to save missed blocks JSON: {}", e))?;
543
544		Ok(())
545	}
546}
547
548#[cfg(test)]
549mod tests {
550	use super::*;
551	use tempfile;
552
553	#[tokio::test]
554	async fn test_get_last_processed_block() {
555		let temp_dir = tempfile::tempdir().unwrap();
556		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
557
558		// Test 1: existing file
559		let existing_file = temp_dir.path().join("existing_last_block.txt");
560		tokio::fs::write(&existing_file, "100").await.unwrap();
561		let result = storage.get_last_processed_block("existing").await;
562		assert!(result.is_ok());
563		assert_eq!(result.unwrap(), Some(100));
564
565		// Test 2: Non-existent file
566		let result = storage.get_last_processed_block("non_existent").await;
567		assert!(result.is_ok());
568		assert_eq!(result.unwrap(), None);
569
570		// Test 3: Invalid content (not a number)
571		let invalid_file = temp_dir.path().join("invalid_last_block.txt");
572		tokio::fs::write(&invalid_file, "not a number")
573			.await
574			.unwrap();
575		let result = storage.get_last_processed_block("invalid").await;
576		assert!(result.is_err());
577		let err = result.unwrap_err();
578		assert!(err
579			.to_string()
580			.contains("Failed to parse last processed block"));
581		assert!(err.to_string().contains("invalid"));
582
583		// Test 4: Valid block number
584		let valid_file = temp_dir.path().join("valid_last_block.txt");
585		tokio::fs::write(&valid_file, "123").await.unwrap();
586		let result = storage.get_last_processed_block("valid").await;
587		assert_eq!(result.unwrap(), Some(123));
588	}
589
590	#[tokio::test]
591	async fn test_save_last_processed_block() {
592		let temp_dir = tempfile::tempdir().unwrap();
593		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
594
595		// Test 1: Normal save
596		let result = storage.save_last_processed_block("test", 100).await;
597		assert!(result.is_ok());
598
599		// Verify the content
600		let content = tokio::fs::read_to_string(temp_dir.path().join("test_last_block.txt"))
601			.await
602			.unwrap();
603		assert_eq!(content, "100");
604
605		// Test 2: Save with invalid path (create a readonly directory)
606		#[cfg(unix)]
607		{
608			use std::os::unix::fs::PermissionsExt;
609			let readonly_dir = temp_dir.path().join("readonly");
610			tokio::fs::create_dir(&readonly_dir).await.unwrap();
611			let mut perms = std::fs::metadata(&readonly_dir).unwrap().permissions();
612			perms.set_mode(0o444); // Read-only
613			std::fs::set_permissions(&readonly_dir, perms).unwrap();
614
615			let readonly_storage = FileBlockStorage::new(readonly_dir);
616			let result = readonly_storage
617				.save_last_processed_block("test", 100)
618				.await;
619			assert!(result.is_err());
620			let err = result.unwrap_err();
621			assert!(err
622				.to_string()
623				.contains("Failed to save last processed block"));
624			assert!(err.to_string().contains("Permission denied"));
625		}
626	}
627
628	#[tokio::test]
629	async fn test_save_blocks() {
630		let temp_dir = tempfile::tempdir().unwrap();
631		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
632
633		// Test 1: Save empty blocks array
634		let result = storage.save_blocks("test", &[]).await;
635		assert!(result.is_ok());
636
637		// Test 2: Save with invalid path
638		#[cfg(unix)]
639		{
640			use std::os::unix::fs::PermissionsExt;
641			let readonly_dir = temp_dir.path().join("readonly");
642			tokio::fs::create_dir(&readonly_dir).await.unwrap();
643			let mut perms = std::fs::metadata(&readonly_dir).unwrap().permissions();
644			perms.set_mode(0o444); // Read-only
645			std::fs::set_permissions(&readonly_dir, perms).unwrap();
646
647			let readonly_storage = FileBlockStorage::new(readonly_dir);
648			let result = readonly_storage.save_blocks("test", &[]).await;
649			assert!(result.is_err());
650			let err = result.unwrap_err();
651			assert!(err.to_string().contains("Failed to save blocks"));
652			assert!(err.to_string().contains("Permission denied"));
653		}
654	}
655
656	#[tokio::test]
657	async fn test_delete_blocks() {
658		let temp_dir = tempfile::tempdir().unwrap();
659		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
660
661		// Create some test block files
662		tokio::fs::write(temp_dir.path().join("test_blocks_1.json"), "[]")
663			.await
664			.unwrap();
665		tokio::fs::write(temp_dir.path().join("test_blocks_2.json"), "[]")
666			.await
667			.unwrap();
668
669		// Test 1: Normal delete
670		let result = storage.delete_blocks("test").await;
671		assert!(result.is_ok());
672
673		// Test 2: Delete with invalid path
674		#[cfg(unix)]
675		{
676			use std::os::unix::fs::PermissionsExt;
677			let readonly_dir = temp_dir.path().join("readonly");
678			tokio::fs::create_dir(&readonly_dir).await.unwrap();
679
680			// Create test files first
681			tokio::fs::write(readonly_dir.join("test_blocks_1.json"), "[]")
682				.await
683				.unwrap();
684
685			// Then make directory readonly
686			let mut perms = std::fs::metadata(&readonly_dir).unwrap().permissions();
687			perms.set_mode(0o555); // Read-only directory with execute permission
688			std::fs::set_permissions(&readonly_dir, perms).unwrap();
689
690			let readonly_storage = FileBlockStorage::new(readonly_dir);
691			let result = readonly_storage.delete_blocks("test").await;
692			assert!(result.is_err());
693			let err = result.unwrap_err();
694			assert!(err.to_string().contains("Failed to delete blocks"));
695			assert!(err.to_string().contains("Permission denied"));
696		}
697	}
698
699	#[tokio::test]
700	async fn test_save_missed_blocks() {
701		let temp_dir = tempfile::tempdir().unwrap();
702		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
703
704		// Test 1: Normal save with single block
705		let result = storage.save_missed_blocks("test", &[100]).await;
706		assert!(result.is_ok());
707
708		// Verify JSON file exists and has correct content
709		let json_path = temp_dir.path().join("test_missed_blocks.json");
710		assert!(json_path.exists());
711		let content = tokio::fs::read_to_string(&json_path).await.unwrap();
712		let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
713		assert_eq!(entries.len(), 1);
714		assert_eq!(entries[0].block_number, 100);
715		assert_eq!(entries[0].status, MissedBlockStatus::Pending);
716
717		// Test 2: Save multiple blocks (should add to existing)
718		let result = storage.save_missed_blocks("test", &[101, 102, 103]).await;
719		assert!(result.is_ok());
720
721		let content = tokio::fs::read_to_string(&json_path).await.unwrap();
722		let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
723		assert_eq!(entries.len(), 4);
724
725		// Test 3: Save duplicate block (should not add duplicate)
726		let result = storage.save_missed_blocks("test", &[100]).await;
727		assert!(result.is_ok());
728
729		let content = tokio::fs::read_to_string(&json_path).await.unwrap();
730		let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
731		assert_eq!(entries.len(), 4); // Still 4, no duplicate added
732
733		// Test 4: Save empty slice (should be no-op)
734		let result = storage.save_missed_blocks("test", &[]).await;
735		assert!(result.is_ok());
736
737		// Test 5: Save with invalid path
738		#[cfg(unix)]
739		{
740			use std::os::unix::fs::PermissionsExt;
741			let readonly_dir = temp_dir.path().join("readonly");
742			tokio::fs::create_dir(&readonly_dir).await.unwrap();
743			let mut perms = std::fs::metadata(&readonly_dir).unwrap().permissions();
744			perms.set_mode(0o444); // Read-only
745			std::fs::set_permissions(&readonly_dir, perms).unwrap();
746
747			let readonly_storage = FileBlockStorage::new(readonly_dir);
748			let result = readonly_storage.save_missed_blocks("test", &[100]).await;
749			assert!(result.is_err());
750		}
751	}
752
753	#[tokio::test]
754	async fn test_migration_from_text_to_json() {
755		let temp_dir = tempfile::tempdir().unwrap();
756		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
757
758		// Create an old-format text file
759		let txt_path = temp_dir.path().join("test_missed_blocks.txt");
760		tokio::fs::write(&txt_path, "100\n101\n102\n100\n") // Include duplicate
761			.await
762			.unwrap();
763
764		// Call get_missed_blocks which should trigger migration
765		let result = storage
766			.get_missed_blocks("test", 1000, 1000, 3)
767			.await
768			.unwrap();
769
770		// Should have 3 unique entries (deduplicated during migration)
771		assert_eq!(result.len(), 3);
772
773		// Verify JSON file was created
774		let json_path = temp_dir.path().join("test_missed_blocks.json");
775		assert!(json_path.exists());
776
777		// Verify text file was removed
778		assert!(!txt_path.exists());
779
780		// Verify JSON content
781		let content = tokio::fs::read_to_string(&json_path).await.unwrap();
782		let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
783		assert_eq!(entries.len(), 3);
784	}
785
786	#[tokio::test]
787	async fn test_get_missed_blocks() {
788		let temp_dir = tempfile::tempdir().unwrap();
789		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
790
791		// Save some missed blocks
792		storage
793			.save_missed_blocks("test", &[100, 200, 300, 400, 500])
794			.await
795			.unwrap();
796
797		// Get blocks with max_block_age=200, current_block=500, max_retries=3
798		// Should return blocks >= 300 (500 - 200) with retry_count < 3
799		let result = storage
800			.get_missed_blocks("test", 200, 500, 3)
801			.await
802			.unwrap();
803
804		assert_eq!(result.len(), 3);
805		let block_numbers: Vec<u64> = result.iter().map(|e| e.block_number).collect();
806		assert!(block_numbers.contains(&300));
807		assert!(block_numbers.contains(&400));
808		assert!(block_numbers.contains(&500));
809	}
810
811	#[tokio::test]
812	async fn test_update_missed_block_status() {
813		let temp_dir = tempfile::tempdir().unwrap();
814		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
815
816		// Save a missed block
817		storage.save_missed_blocks("test", &[100]).await.unwrap();
818
819		// Update status to Recovering
820		storage
821			.update_missed_block_status("test", 100, MissedBlockStatus::Recovering, None)
822			.await
823			.unwrap();
824
825		// Verify the update
826		let json_path = temp_dir.path().join("test_missed_blocks.json");
827		let content = tokio::fs::read_to_string(&json_path).await.unwrap();
828		let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
829		assert_eq!(entries[0].status, MissedBlockStatus::Recovering);
830		assert!(entries[0].last_attempt_at.is_some());
831
832		// Update status to Failed with error
833		storage
834			.update_missed_block_status(
835				"test",
836				100,
837				MissedBlockStatus::Failed,
838				Some("RPC error".to_string()),
839			)
840			.await
841			.unwrap();
842
843		let content = tokio::fs::read_to_string(&json_path).await.unwrap();
844		let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
845		assert_eq!(entries[0].status, MissedBlockStatus::Failed);
846		assert_eq!(entries[0].last_error, Some("RPC error".to_string()));
847		assert_eq!(entries[0].retry_count, 1); // Incremented
848	}
849
850	#[tokio::test]
851	async fn test_remove_recovered_blocks() {
852		let temp_dir = tempfile::tempdir().unwrap();
853		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
854
855		// Save some missed blocks
856		storage
857			.save_missed_blocks("test", &[100, 101, 102, 103, 104])
858			.await
859			.unwrap();
860
861		// Remove some recovered blocks
862		storage
863			.remove_recovered_blocks("test", &[101, 103])
864			.await
865			.unwrap();
866
867		// Verify remaining blocks
868		let json_path = temp_dir.path().join("test_missed_blocks.json");
869		let content = tokio::fs::read_to_string(&json_path).await.unwrap();
870		let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
871
872		assert_eq!(entries.len(), 3);
873		let block_numbers: Vec<u64> = entries.iter().map(|e| e.block_number).collect();
874		assert!(block_numbers.contains(&100));
875		assert!(block_numbers.contains(&102));
876		assert!(block_numbers.contains(&104));
877		assert!(!block_numbers.contains(&101));
878		assert!(!block_numbers.contains(&103));
879	}
880
881	#[tokio::test]
882	async fn test_prune_old_missed_blocks() {
883		let temp_dir = tempfile::tempdir().unwrap();
884		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
885
886		// Save some missed blocks
887		storage
888			.save_missed_blocks("test", &[100, 200, 300, 400, 500])
889			.await
890			.unwrap();
891
892		// Prune with max_block_age=150, current_block=500
893		// Should prune blocks < 350 (500 - 150)
894		let pruned = storage
895			.prune_old_missed_blocks("test", 150, 500)
896			.await
897			.unwrap();
898
899		assert_eq!(pruned, 3); // Blocks 100, 200, 300 should be pruned
900
901		// Verify remaining blocks
902		let json_path = temp_dir.path().join("test_missed_blocks.json");
903		let content = tokio::fs::read_to_string(&json_path).await.unwrap();
904		let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
905
906		assert_eq!(entries.len(), 2);
907		let block_numbers: Vec<u64> = entries.iter().map(|e| e.block_number).collect();
908		assert!(block_numbers.contains(&400));
909		assert!(block_numbers.contains(&500));
910	}
911
912	#[tokio::test]
913	async fn test_get_missed_blocks_empty() {
914		let temp_dir = tempfile::tempdir().unwrap();
915		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
916
917		// Get from non-existent network should return empty
918		let result = storage
919			.get_missed_blocks("nonexistent", 1000, 1000, 3)
920			.await
921			.unwrap();
922		assert!(result.is_empty());
923	}
924
925	#[tokio::test]
926	async fn test_get_missed_blocks_filters_by_status() {
927		let temp_dir = tempfile::tempdir().unwrap();
928		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
929
930		// Save some missed blocks
931		storage
932			.save_missed_blocks("test", &[100, 101, 102])
933			.await
934			.unwrap();
935
936		// Update one to Failed status
937		storage
938			.update_missed_block_status("test", 101, MissedBlockStatus::Failed, None)
939			.await
940			.unwrap();
941
942		// Get missed blocks - should only return Pending ones with retry_count < 3
943		let result = storage
944			.get_missed_blocks("test", 1000, 1000, 3)
945			.await
946			.unwrap();
947
948		assert_eq!(result.len(), 2);
949		for entry in &result {
950			assert_eq!(entry.status, MissedBlockStatus::Pending);
951		}
952	}
953
954	#[tokio::test]
955	async fn test_load_empty_json_file() {
956		let temp_dir = tempfile::tempdir().unwrap();
957		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
958
959		// Create an empty JSON file
960		let json_path = temp_dir.path().join("test_missed_blocks.json");
961		tokio::fs::write(&json_path, "").await.unwrap();
962
963		// Should return empty vector for empty file
964		let result = storage
965			.get_missed_blocks("test", 1000, 1000, 3)
966			.await
967			.unwrap();
968		assert!(result.is_empty());
969	}
970
971	#[tokio::test]
972	async fn test_load_whitespace_only_json_file() {
973		let temp_dir = tempfile::tempdir().unwrap();
974		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
975
976		// Create a JSON file with only whitespace
977		let json_path = temp_dir.path().join("test_missed_blocks.json");
978		tokio::fs::write(&json_path, "   \n\t  ").await.unwrap();
979
980		// Should return empty vector for whitespace-only file
981		let result = storage
982			.get_missed_blocks("test", 1000, 1000, 3)
983			.await
984			.unwrap();
985		assert!(result.is_empty());
986	}
987
988	#[tokio::test]
989	async fn test_migration_with_empty_lines() {
990		let temp_dir = tempfile::tempdir().unwrap();
991		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
992
993		// Create an old-format text file with empty lines and whitespace
994		let txt_path = temp_dir.path().join("test_missed_blocks.txt");
995		tokio::fs::write(&txt_path, "\n100\n\n101\n  \n102\n")
996			.await
997			.unwrap();
998
999		// Call get_missed_blocks which should trigger migration
1000		let result = storage
1001			.get_missed_blocks("test", 1000, 1000, 3)
1002			.await
1003			.unwrap();
1004
1005		// Should have 3 entries (empty lines ignored)
1006		assert_eq!(result.len(), 3);
1007
1008		// Verify JSON file was created
1009		let json_path = temp_dir.path().join("test_missed_blocks.json");
1010		assert!(json_path.exists());
1011
1012		// Verify text file was removed
1013		assert!(!txt_path.exists());
1014	}
1015
1016	#[tokio::test]
1017	async fn test_migration_handles_invalid_numbers() {
1018		let temp_dir = tempfile::tempdir().unwrap();
1019		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1020
1021		// Create an old-format text file with some invalid entries
1022		let txt_path = temp_dir.path().join("test_missed_blocks.txt");
1023		tokio::fs::write(&txt_path, "100\nnot_a_number\n101\nabc\n102\n")
1024			.await
1025			.unwrap();
1026
1027		// Call get_missed_blocks which should trigger migration
1028		let result = storage
1029			.get_missed_blocks("test", 1000, 1000, 3)
1030			.await
1031			.unwrap();
1032
1033		// Should only have 3 valid entries (invalid lines skipped)
1034		assert_eq!(result.len(), 3);
1035		let block_numbers: Vec<u64> = result.iter().map(|e| e.block_number).collect();
1036		assert!(block_numbers.contains(&100));
1037		assert!(block_numbers.contains(&101));
1038		assert!(block_numbers.contains(&102));
1039	}
1040
1041	#[tokio::test]
1042	async fn test_update_status_block_not_found() {
1043		let temp_dir = tempfile::tempdir().unwrap();
1044		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1045
1046		// Save some missed blocks
1047		storage.save_missed_blocks("test", &[100]).await.unwrap();
1048
1049		// Update status for a block that doesn't exist (should not error)
1050		let result = storage
1051			.update_missed_block_status("test", 999, MissedBlockStatus::Recovered, None)
1052			.await;
1053		assert!(result.is_ok());
1054
1055		// Verify original block is unchanged
1056		let entries = storage
1057			.get_missed_blocks("test", 1000, 1000, 3)
1058			.await
1059			.unwrap();
1060		assert_eq!(entries.len(), 1);
1061		assert_eq!(entries[0].block_number, 100);
1062		assert_eq!(entries[0].status, MissedBlockStatus::Pending);
1063	}
1064
1065	#[tokio::test]
1066	async fn test_update_status_preserves_existing_error() {
1067		let temp_dir = tempfile::tempdir().unwrap();
1068		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1069
1070		// Save a missed block
1071		storage.save_missed_blocks("test", &[100]).await.unwrap();
1072
1073		// Update with an error
1074		storage
1075			.update_missed_block_status(
1076				"test",
1077				100,
1078				MissedBlockStatus::Failed,
1079				Some("First error".to_string()),
1080			)
1081			.await
1082			.unwrap();
1083
1084		// Update again without error (should preserve existing error)
1085		storage
1086			.update_missed_block_status("test", 100, MissedBlockStatus::Pending, None)
1087			.await
1088			.unwrap();
1089
1090		// Verify error was preserved
1091		let json_path = temp_dir.path().join("test_missed_blocks.json");
1092		let content = tokio::fs::read_to_string(&json_path).await.unwrap();
1093		let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
1094		assert_eq!(entries[0].last_error, Some("First error".to_string()));
1095	}
1096
1097	#[tokio::test]
1098	async fn test_remove_recovered_blocks_empty_list() {
1099		let temp_dir = tempfile::tempdir().unwrap();
1100		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1101
1102		// Save some missed blocks
1103		storage
1104			.save_missed_blocks("test", &[100, 101, 102])
1105			.await
1106			.unwrap();
1107
1108		// Remove empty list (should be no-op)
1109		let result = storage.remove_recovered_blocks("test", &[]).await;
1110		assert!(result.is_ok());
1111
1112		// Verify all blocks still exist
1113		let entries = storage
1114			.get_missed_blocks("test", 1000, 1000, 3)
1115			.await
1116			.unwrap();
1117		assert_eq!(entries.len(), 3);
1118	}
1119
1120	#[tokio::test]
1121	async fn test_prune_with_no_blocks_to_prune() {
1122		let temp_dir = tempfile::tempdir().unwrap();
1123		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1124
1125		// Save blocks that are all within the age range
1126		storage
1127			.save_missed_blocks("test", &[900, 950, 1000])
1128			.await
1129			.unwrap();
1130
1131		// Prune with a large max_block_age
1132		let pruned = storage
1133			.prune_old_missed_blocks("test", 500, 1000)
1134			.await
1135			.unwrap();
1136
1137		assert_eq!(pruned, 0);
1138
1139		// Verify all blocks still exist
1140		let entries = storage
1141			.get_missed_blocks("test", 1000, 1000, 3)
1142			.await
1143			.unwrap();
1144		assert_eq!(entries.len(), 3);
1145	}
1146
1147	#[tokio::test]
1148	async fn test_prune_empty_storage() {
1149		let temp_dir = tempfile::tempdir().unwrap();
1150		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1151
1152		// Prune on empty storage
1153		let pruned = storage
1154			.prune_old_missed_blocks("test", 100, 1000)
1155			.await
1156			.unwrap();
1157
1158		assert_eq!(pruned, 0);
1159	}
1160
1161	#[tokio::test]
1162	async fn test_get_missed_blocks_filters_by_max_retries() {
1163		let temp_dir = tempfile::tempdir().unwrap();
1164		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1165
1166		// Save some missed blocks
1167		storage
1168			.save_missed_blocks("test", &[100, 101, 102])
1169			.await
1170			.unwrap();
1171
1172		// Simulate retries on block 100 (will have retry_count = 3 after 3 status updates)
1173		for _ in 0..3 {
1174			storage
1175				.update_missed_block_status("test", 100, MissedBlockStatus::Pending, None)
1176				.await
1177				.unwrap();
1178		}
1179
1180		// Get missed blocks with max_retries = 3
1181		let result = storage
1182			.get_missed_blocks("test", 1000, 1000, 3)
1183			.await
1184			.unwrap();
1185
1186		// Block 100 should be excluded (retry_count >= max_retries)
1187		assert_eq!(result.len(), 2);
1188		let block_numbers: Vec<u64> = result.iter().map(|e| e.block_number).collect();
1189		assert!(!block_numbers.contains(&100));
1190		assert!(block_numbers.contains(&101));
1191		assert!(block_numbers.contains(&102));
1192	}
1193
1194	#[tokio::test]
1195	async fn test_missed_block_entry_new() {
1196		let entry = MissedBlockEntry::new(12345);
1197		assert_eq!(entry.block_number, 12345);
1198		assert_eq!(entry.retry_count, 0);
1199		assert_eq!(entry.status, MissedBlockStatus::Pending);
1200		assert!(entry.last_attempt_at.is_none());
1201		assert!(entry.last_error.is_none());
1202		// first_missed_at should be set to current timestamp
1203		assert!(entry.first_missed_at > 0);
1204	}
1205
1206	#[tokio::test]
1207	async fn test_file_block_storage_default() {
1208		let storage = FileBlockStorage::default();
1209		// Default path should be "data"
1210		assert_eq!(storage.storage_path, std::path::PathBuf::from("data"));
1211	}
1212
1213	#[tokio::test]
1214	async fn test_update_status_recovering_does_not_increment_retry() {
1215		let temp_dir = tempfile::tempdir().unwrap();
1216		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1217
1218		// Save a missed block
1219		storage.save_missed_blocks("test", &[100]).await.unwrap();
1220
1221		// Update to Recovering status (should NOT increment retry_count)
1222		storage
1223			.update_missed_block_status("test", 100, MissedBlockStatus::Recovering, None)
1224			.await
1225			.unwrap();
1226
1227		let json_path = temp_dir.path().join("test_missed_blocks.json");
1228		let content = tokio::fs::read_to_string(&json_path).await.unwrap();
1229		let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
1230		assert_eq!(entries[0].retry_count, 0); // Should still be 0
1231		assert_eq!(entries[0].status, MissedBlockStatus::Recovering);
1232	}
1233
1234	#[tokio::test]
1235	async fn test_update_status_recovered_does_not_increment_retry() {
1236		let temp_dir = tempfile::tempdir().unwrap();
1237		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
1238
1239		// Save a missed block
1240		storage.save_missed_blocks("test", &[100]).await.unwrap();
1241
1242		// Update to Recovered status (should NOT increment retry_count)
1243		storage
1244			.update_missed_block_status("test", 100, MissedBlockStatus::Recovered, None)
1245			.await
1246			.unwrap();
1247
1248		let json_path = temp_dir.path().join("test_missed_blocks.json");
1249		let content = tokio::fs::read_to_string(&json_path).await.unwrap();
1250		let entries: Vec<MissedBlockEntry> = serde_json::from_str(&content).unwrap();
1251		assert_eq!(entries[0].retry_count, 0); // Should still be 0
1252		assert_eq!(entries[0].status, MissedBlockStatus::Recovered);
1253	}
1254}