openzeppelin_monitor/services/blockwatcher/
tracker.rs

1//! Block tracking functionality for monitoring blockchain networks.
2//!
3//! This module provides tools for tracking processed blocks across different networks
4//! and identifying potential issues such as:
5//! - Missed blocks
6//! - Out-of-order block processing
7//! - Duplicate block processing
8//!
//! The primary component is the [`BlockTracker`], which maintains a bounded,
//! per-network history of recently processed blocks. Missed blocks are detected
//! per batch and returned to the caller; this module does not persist them itself.
12
13use async_trait::async_trait;
14use std::{
15	collections::{HashMap, HashSet, VecDeque},
16	sync::Arc,
17};
18use tokio::sync::Mutex;
19
20use crate::models::{BlockType, Network};
21
/// Outcome of checking a processed block against a network's tracked history.
///
/// All payloads are plain `u64` block numbers, so the type is `Eq` as well as
/// `PartialEq` and can be compared directly in assertions and `match` guards.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockCheckResult {
	/// The block is neither a duplicate nor behind the expected next block
	Ok,
	/// The block number is already present in the tracked history
	Duplicate {
		/// Block number most recently recorded in the history at detection time
		last_seen: u64,
	},
	/// The block number is lower than the block number that was expected next
	OutOfOrder {
		/// Block number that was expected next
		expected: u64,
		/// Block number that was actually received
		received: u64,
	},
}
32
/// Interface for tracking processed blocks per network.
///
/// Implementors detect gaps within a fetched batch and flag duplicate or
/// out-of-order blocks as they are processed. The behavior described on each
/// method is that of the [`BlockTracker`] implementation in this module.
#[async_trait]
pub trait BlockTrackerTrait {
	/// Creates a tracker that retains at most `history_size` processed block
	/// numbers per network.
	fn new(history_size: usize) -> Self;
	/// Returns the highest block number currently retained in the processed
	/// history of `network_slug`, or `None` if nothing is retained for it.
	async fn get_last_block(&self, network_slug: &str) -> Option<u64>;
	/// Detects missing blocks in a batch of fetched blocks
	///
	/// Looks only at the batch itself: every block number between the lowest and
	/// highest number found in `fetched_blocks` that is absent from the batch is
	/// returned. No shared tracker state is read or updated.
	async fn detect_missing_blocks(
		&self,
		network: &Network,
		fetched_blocks: &[BlockType],
	) -> Vec<u64>;
	/// Checks a processed block for duplicates or out-of-order issues
	///
	/// Compares `block_number` against the network's retained history and its
	/// expected next block number, then returns the matching [`BlockCheckResult`].
	/// Non-duplicate blocks are recorded in the history, including out-of-order ones.
	async fn check_processed_block(&self, network: &Network, block_number: u64)
		-> BlockCheckResult;

	/// Synchronizes the expected next block number for a network with `start_block`.
	///
	/// Intended to be called at the start of each process_new_blocks execution.
	/// The expectation is only ever lowered to `start_block` (or seeded with it
	/// when none exists yet); a higher `start_block` leaves it unchanged.
	async fn reset_expected_next(&self, network: &Network, start_block: u64);
}
61
/// BlockTracker is responsible for monitoring the sequence of processed blocks
/// across different networks and identifying any gaps or irregularities in block processing.
///
/// Gap detection is per-execution and doesn't require shared state, so we don't track
/// fetched blocks in shared history. Only processed blocks are tracked for duplicate/out-of-order detection.
///
/// Both maps live behind `Arc<Mutex<..>>`, so cloning a tracker yields a handle to
/// the same underlying state rather than an independent copy.
#[derive(Clone)]
pub struct BlockTracker {
	/// Tracks the last N processed blocks for each network, in the order they
	/// were recorded (out-of-order blocks are recorded too, so it is not sorted)
	/// Key: network_slug, Value: Queue of block numbers
	processed_history: Arc<Mutex<HashMap<String, VecDeque<u64>>>>,
	/// Expected next processed block number for each network
	/// Key: network_slug, Value: Expected next block number
	expected_next: Arc<Mutex<HashMap<String, u64>>>,
	/// Maximum number of blocks to keep in history per network
	history_size: usize,
}
78
79#[async_trait]
80impl BlockTrackerTrait for BlockTracker {
81	/// Creates a new BlockTracker instance.
82	///
83	/// # Arguments
84	///
85	/// * `history_size` - The maximum number of recent blocks to track per network
86	/// * `storage` - Optional storage implementation for persisting missed block information
87	///
88	/// # Returns
89	///
90	/// A new `BlockTracker` instance
91	fn new(history_size: usize) -> Self {
92		Self {
93			processed_history: Arc::new(Mutex::new(HashMap::new())),
94			expected_next: Arc::new(Mutex::new(HashMap::new())),
95			history_size,
96		}
97	}
98
99	/// Retrieves the most recently processed block number for a given network.
100	///
101	/// # Arguments
102	///
103	/// * `network_slug` - The unique identifier for the network
104	///
105	/// # Returns
106	///
107	/// Returns `Some(block_number)` if blocks have been processed for the network,
108	/// otherwise returns `None`.
109	async fn get_last_block(&self, network_slug: &str) -> Option<u64> {
110		self.processed_history
111			.lock()
112			.await
113			.get(network_slug)
114			.and_then(|history| history.iter().max().copied())
115	}
116
117	async fn detect_missing_blocks(
118		&self,
119		_network: &Network,
120		fetched_blocks: &[BlockType],
121	) -> Vec<u64> {
122		// Extract block numbers from fetched blocks
123		let fetched_block_numbers: HashSet<u64> = fetched_blocks
124			.iter()
125			.filter_map(|block| block.number())
126			.collect();
127
128		if fetched_block_numbers.is_empty() {
129			return Vec::new();
130		}
131
132		// Find min and max without sorting
133		let first = *fetched_block_numbers
134			.iter()
135			.min()
136			.expect("fetched_block_numbers is guaranteed to be non-empty");
137		let last = *fetched_block_numbers
138			.iter()
139			.max()
140			.expect("fetched_block_numbers is guaranteed to be non-empty");
141
142		// Collect missed blocks
143		// Note: Gap detection is per-execution and doesn't require shared state.
144		// Each execution only looks at its own fetched blocks, so concurrent executions
145		// won't cause false positives.
146		let missed_blocks: Vec<u64> = (first..=last)
147			.filter(|&num| !fetched_block_numbers.contains(&num))
148			.collect();
149
150		missed_blocks
151	}
152
153	async fn check_processed_block(
154		&self,
155		network: &Network,
156		block_number: u64,
157	) -> BlockCheckResult {
158		let mut processed_history = self.processed_history.lock().await;
159		let mut expected_next = self.expected_next.lock().await;
160
161		let network_history = processed_history
162			.entry(network.slug.clone())
163			.or_insert_with(|| VecDeque::with_capacity(self.history_size));
164
165		let expected = expected_next
166			.entry(network.slug.clone())
167			.or_insert(block_number);
168
169		// Check for duplicate
170		if network_history.contains(&block_number) {
171			let last_seen = *network_history.back().unwrap_or(&block_number);
172			return BlockCheckResult::Duplicate { last_seen };
173		}
174
175		// Check for out-of-order (if block is less than expected, it's out of order)
176		let result = if block_number < *expected {
177			BlockCheckResult::OutOfOrder {
178				expected: *expected,
179				received: block_number,
180			}
181		} else {
182			BlockCheckResult::Ok
183		};
184
185		// Always record the block (even if out of order, we still process it)
186		network_history.push_back(block_number);
187
188		// Only update expected_next when the block is in-order or ahead
189		// If it's out-of-order (behind), don't advance expected_next as we're still
190		// waiting for the missing blocks in between
191		if block_number >= *expected {
192			*expected = block_number + 1;
193		}
194
195		// Maintain history size
196		while network_history.len() > self.history_size {
197			network_history.pop_front();
198		}
199
200		result
201	}
202
203	async fn reset_expected_next(&self, network: &Network, start_block: u64) {
204		let mut expected_next = self.expected_next.lock().await;
205		let entry = expected_next.entry(network.slug.clone());
206
207		// Reset expected_next to start_block if it's higher than start_block
208		// This handles cases where we're reprocessing blocks or restarting from an earlier point
209		match entry {
210			std::collections::hash_map::Entry::Occupied(mut e) => {
211				if *e.get() > start_block {
212					*e.get_mut() = start_block;
213				}
214			}
215			std::collections::hash_map::Entry::Vacant(e) => {
216				e.insert(start_block);
217			}
218		}
219	}
220}
221
/// Unit tests for [`BlockTracker`]: ordering, duplicates, history eviction,
/// per-network isolation and expectation resets.
#[cfg(test)]
mod tests {
	use crate::utils::tests::network::NetworkBuilder;

	use super::*;

	/// Builds a minimal network fixture with the given name, slug and storage flag.
	fn create_test_network(name: &str, slug: &str, store_blocks: bool) -> Network {
		NetworkBuilder::new()
			.name(name)
			.slug(slug)
			.store_blocks(store_blocks)
			.build()
	}

	#[tokio::test]
	async fn test_normal_block_sequence() {
		let tracker = BlockTracker::new(5);
		let network = create_test_network("test-net", "test_net", true);

		// Process blocks in sequence
		assert_eq!(
			tracker.check_processed_block(&network, 1).await,
			BlockCheckResult::Ok
		);
		assert_eq!(
			tracker.check_processed_block(&network, 2).await,
			BlockCheckResult::Ok
		);
		assert_eq!(
			tracker.check_processed_block(&network, 3).await,
			BlockCheckResult::Ok
		);

		assert_eq!(tracker.get_last_block("test_net").await, Some(3));
	}

	#[tokio::test]
	async fn test_history_size_limit() {
		let tracker = BlockTracker::new(3);
		let network = create_test_network("test-net", "test_net", true);

		// Process 5 blocks with a history limit of 3
		for i in 1..=5 {
			assert_eq!(
				tracker.check_processed_block(&network, i).await,
				BlockCheckResult::Ok
			);
		}

		let history = tracker.processed_history.lock().await;
		let network_history = history
			.get(&network.slug)
			.expect("Network history should exist");

		// Verify we only kept the last 3 blocks
		assert_eq!(network_history.len(), 3);
		assert_eq!(network_history.front(), Some(&3)); // Oldest block
		assert_eq!(network_history.back(), Some(&5)); // Newest block
	}

	#[tokio::test]
	async fn test_check_processed_block_maintains_history() {
		let tracker = BlockTracker::new(5);
		let network = create_test_network("test-net", "test_net", true);

		// Process block 1 - should add to history
		assert_eq!(
			tracker.check_processed_block(&network, 1).await,
			BlockCheckResult::Ok
		);
		assert_eq!(tracker.get_last_block("test_net").await, Some(1));

		// Process block 3 - should be Ok (ahead of expected, advances expected)
		assert_eq!(
			tracker.check_processed_block(&network, 3).await,
			BlockCheckResult::Ok
		);
		assert_eq!(tracker.get_last_block("test_net").await, Some(3));
	}

	#[tokio::test]
	async fn test_out_of_order_blocks() {
		let tracker = BlockTracker::new(5);
		let network = create_test_network("test-net", "test_net", true);

		// Process blocks out of order - should detect out-of-order
		assert_eq!(
			tracker.check_processed_block(&network, 2).await,
			BlockCheckResult::Ok
		);
		assert_eq!(
			tracker.check_processed_block(&network, 1).await,
			BlockCheckResult::OutOfOrder {
				expected: 3,
				received: 1
			}
		);

		// Both blocks are recorded, but last is the higher one
		// Note: After processing block 2, expected becomes 3, so block 1 is OutOfOrder
		assert_eq!(tracker.get_last_block("test_net").await, Some(2));
	}

	#[tokio::test]
	async fn test_duplicate_block_detection() {
		let tracker = BlockTracker::new(5);
		let network = create_test_network("test-net", "test_net", true);

		assert_eq!(
			tracker.check_processed_block(&network, 1).await,
			BlockCheckResult::Ok
		);
		assert_eq!(
			tracker.check_processed_block(&network, 2).await,
			BlockCheckResult::Ok
		);

		// Block 1 is still in history; last_seen reports the newest recorded block
		assert_eq!(
			tracker.check_processed_block(&network, 1).await,
			BlockCheckResult::Duplicate { last_seen: 2 }
		);

		// A duplicate is not recorded a second time
		let history = tracker.processed_history.lock().await;
		assert_eq!(history.get(&network.slug).map(|h| h.len()), Some(2));
	}

	#[tokio::test]
	async fn test_evicted_block_is_out_of_order_not_duplicate() {
		let tracker = BlockTracker::new(3);
		let network = create_test_network("test-net", "test_net", true);

		// History ends up as [3, 4, 5] with expected next = 6
		for i in 1..=5 {
			assert_eq!(
				tracker.check_processed_block(&network, i).await,
				BlockCheckResult::Ok
			);
		}

		// Block 1 was evicted, so it can no longer be recognized as a duplicate
		assert_eq!(
			tracker.check_processed_block(&network, 1).await,
			BlockCheckResult::OutOfOrder {
				expected: 6,
				received: 1
			}
		);
	}

	#[tokio::test]
	async fn test_reset_expected_next_only_lowers_expectation() {
		let tracker = BlockTracker::new(5);
		let network = create_test_network("test-net", "test_net", true);
		let other = create_test_network("other-net", "other_net", true);

		// Block 5 seeds the expectation and advances it to 6
		assert_eq!(
			tracker.check_processed_block(&network, 5).await,
			BlockCheckResult::Ok
		);

		// Lowering to 3 lets block 3 through as in-order
		tracker.reset_expected_next(&network, 3).await;
		assert_eq!(
			tracker.check_processed_block(&network, 3).await,
			BlockCheckResult::Ok
		);

		// A higher start block must not raise the expectation (still 4)
		tracker.reset_expected_next(&network, 10).await;
		assert_eq!(
			tracker
				.expected_next
				.lock()
				.await
				.get(&network.slug)
				.copied(),
			Some(4)
		);
		assert_eq!(
			tracker.check_processed_block(&network, 4).await,
			BlockCheckResult::Ok
		);

		// On a network without any state, reset seeds the expectation
		tracker.reset_expected_next(&other, 10).await;
		assert_eq!(
			tracker.check_processed_block(&other, 9).await,
			BlockCheckResult::OutOfOrder {
				expected: 10,
				received: 9
			}
		);
	}

	#[tokio::test]
	async fn test_multiple_networks() {
		let tracker = BlockTracker::new(5);
		let network1 = create_test_network("net-1", "net_1", true);
		let network2 = create_test_network("net-2", "net_2", true);

		// Process blocks for both networks
		assert_eq!(
			tracker.check_processed_block(&network1, 1).await,
			BlockCheckResult::Ok
		);
		assert_eq!(
			tracker.check_processed_block(&network2, 100).await,
			BlockCheckResult::Ok
		);
		assert_eq!(
			tracker.check_processed_block(&network1, 2).await,
			BlockCheckResult::Ok
		);
		assert_eq!(
			tracker.check_processed_block(&network2, 101).await,
			BlockCheckResult::Ok
		);

		assert_eq!(tracker.get_last_block("net_1").await, Some(2));
		assert_eq!(tracker.get_last_block("net_2").await, Some(101));
	}

	#[tokio::test]
	async fn test_get_last_block_empty_network() {
		let tracker = BlockTracker::new(5);
		assert_eq!(tracker.get_last_block("nonexistent").await, None);
	}

	#[tokio::test]
	async fn test_check_processed_block_with_gaps() {
		let tracker = BlockTracker::new(5);
		let network = create_test_network("test-network", "test_network", true);

		// Process block 1
		assert_eq!(
			tracker.check_processed_block(&network, 1).await,
			BlockCheckResult::Ok
		);
		assert_eq!(tracker.get_last_block("test_network").await, Some(1));

		// Process block 3 (gap detection happens at service layer via detect_missing_blocks)
		// Block 3 is ahead of expected (2), so it's Ok and advances expected to 4
		assert_eq!(
			tracker.check_processed_block(&network, 3).await,
			BlockCheckResult::Ok
		);
		assert_eq!(tracker.get_last_block("test_network").await, Some(3));
	}
}