openzeppelin_monitor/services/blockwatcher/
tracker.rs1use async_trait::async_trait;
14use std::{
15 collections::{HashMap, HashSet, VecDeque},
16 sync::Arc,
17};
18use tokio::sync::Mutex;
19
20use crate::models::{BlockType, Network};
21
/// Outcome of checking a processed block number against a network's tracked state.
///
/// Every payload is a plain `u64`, so equality is total and the type derives
/// `Eq` in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockCheckResult {
    /// The block was neither a duplicate nor below the expected next block.
    Ok,
    /// The block number is already present in the tracked history.
    Duplicate {
        /// Block number most recently appended to the history.
        last_seen: u64,
    },
    /// The block number is lower than the next block the tracker expected.
    OutOfOrder {
        /// Block number the tracker expected to see next.
        expected: u64,
        /// Block number that was actually received.
        received: u64,
    },
}
32
33#[async_trait]
37pub trait BlockTrackerTrait {
38 fn new(history_size: usize) -> Self;
39 async fn get_last_block(&self, network_slug: &str) -> Option<u64>;
40 async fn detect_missing_blocks(
45 &self,
46 network: &Network,
47 fetched_blocks: &[BlockType],
48 ) -> Vec<u64>;
49 async fn check_processed_block(&self, network: &Network, block_number: u64)
54 -> BlockCheckResult;
55
56 async fn reset_expected_next(&self, network: &Network, start_block: u64);
60}
61
62#[derive(Clone)]
68pub struct BlockTracker {
69 processed_history: Arc<Mutex<HashMap<String, VecDeque<u64>>>>,
72 expected_next: Arc<Mutex<HashMap<String, u64>>>,
75 history_size: usize,
77}
78
79#[async_trait]
80impl BlockTrackerTrait for BlockTracker {
81 fn new(history_size: usize) -> Self {
92 Self {
93 processed_history: Arc::new(Mutex::new(HashMap::new())),
94 expected_next: Arc::new(Mutex::new(HashMap::new())),
95 history_size,
96 }
97 }
98
99 async fn get_last_block(&self, network_slug: &str) -> Option<u64> {
110 self.processed_history
111 .lock()
112 .await
113 .get(network_slug)
114 .and_then(|history| history.iter().max().copied())
115 }
116
117 async fn detect_missing_blocks(
118 &self,
119 _network: &Network,
120 fetched_blocks: &[BlockType],
121 ) -> Vec<u64> {
122 let fetched_block_numbers: HashSet<u64> = fetched_blocks
124 .iter()
125 .filter_map(|block| block.number())
126 .collect();
127
128 if fetched_block_numbers.is_empty() {
129 return Vec::new();
130 }
131
132 let first = *fetched_block_numbers
134 .iter()
135 .min()
136 .expect("fetched_block_numbers is guaranteed to be non-empty");
137 let last = *fetched_block_numbers
138 .iter()
139 .max()
140 .expect("fetched_block_numbers is guaranteed to be non-empty");
141
142 let missed_blocks: Vec<u64> = (first..=last)
147 .filter(|&num| !fetched_block_numbers.contains(&num))
148 .collect();
149
150 missed_blocks
151 }
152
153 async fn check_processed_block(
154 &self,
155 network: &Network,
156 block_number: u64,
157 ) -> BlockCheckResult {
158 let mut processed_history = self.processed_history.lock().await;
159 let mut expected_next = self.expected_next.lock().await;
160
161 let network_history = processed_history
162 .entry(network.slug.clone())
163 .or_insert_with(|| VecDeque::with_capacity(self.history_size));
164
165 let expected = expected_next
166 .entry(network.slug.clone())
167 .or_insert(block_number);
168
169 if network_history.contains(&block_number) {
171 let last_seen = *network_history.back().unwrap_or(&block_number);
172 return BlockCheckResult::Duplicate { last_seen };
173 }
174
175 let result = if block_number < *expected {
177 BlockCheckResult::OutOfOrder {
178 expected: *expected,
179 received: block_number,
180 }
181 } else {
182 BlockCheckResult::Ok
183 };
184
185 network_history.push_back(block_number);
187
188 if block_number >= *expected {
192 *expected = block_number + 1;
193 }
194
195 while network_history.len() > self.history_size {
197 network_history.pop_front();
198 }
199
200 result
201 }
202
203 async fn reset_expected_next(&self, network: &Network, start_block: u64) {
204 let mut expected_next = self.expected_next.lock().await;
205 let entry = expected_next.entry(network.slug.clone());
206
207 match entry {
210 std::collections::hash_map::Entry::Occupied(mut e) => {
211 if *e.get() > start_block {
212 *e.get_mut() = start_block;
213 }
214 }
215 std::collections::hash_map::Entry::Vacant(e) => {
216 e.insert(start_block);
217 }
218 }
219 }
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::tests::network::NetworkBuilder;

    /// Builds a network fixture with the given name, slug and block-storage flag.
    fn create_test_network(name: &str, slug: &str, store_blocks: bool) -> Network {
        NetworkBuilder::new()
            .name(name)
            .slug(slug)
            .store_blocks(store_blocks)
            .build()
    }

    /// Feeds `blocks` to the tracker in order and asserts each one is accepted.
    async fn assert_accepted(tracker: &BlockTracker, network: &Network, blocks: &[u64]) {
        for &block in blocks {
            let outcome = tracker.check_processed_block(network, block).await;
            assert_eq!(outcome, BlockCheckResult::Ok);
        }
    }

    /// Consecutive blocks are all accepted and the last one is reported.
    #[tokio::test]
    async fn test_normal_block_sequence() {
        let tracker = BlockTracker::new(5);
        let network = create_test_network("test-net", "test_net", true);

        assert_accepted(&tracker, &network, &[1, 2, 3]).await;

        assert_eq!(tracker.get_last_block("test_net").await, Some(3));
    }

    /// The history never grows beyond the configured size; oldest entries go first.
    #[tokio::test]
    async fn test_history_size_limit() {
        let tracker = BlockTracker::new(3);
        let network = create_test_network("test-net", "test_net", true);

        assert_accepted(&tracker, &network, &[1, 2, 3, 4, 5]).await;

        let history = tracker.processed_history.lock().await;
        let network_history = history
            .get(&network.slug)
            .expect("Network history should exist");

        assert_eq!(network_history.len(), 3);
        assert_eq!(network_history.front(), Some(&3));
        assert_eq!(network_history.back(), Some(&5));
    }

    /// Each accepted block is visible through `get_last_block` right away.
    #[tokio::test]
    async fn test_check_processed_block_maintains_history() {
        let tracker = BlockTracker::new(5);
        let network = create_test_network("test-net", "test_net", true);

        assert_accepted(&tracker, &network, &[1]).await;
        assert_eq!(tracker.get_last_block("test_net").await, Some(1));

        assert_accepted(&tracker, &network, &[3]).await;
        assert_eq!(tracker.get_last_block("test_net").await, Some(3));
    }

    /// A block below the expected next block is flagged as out of order.
    #[tokio::test]
    async fn test_out_of_order_blocks() {
        let tracker = BlockTracker::new(5);
        let network = create_test_network("test-net", "test_net", true);

        assert_accepted(&tracker, &network, &[2]).await;

        let outcome = tracker.check_processed_block(&network, 1).await;
        assert_eq!(
            outcome,
            BlockCheckResult::OutOfOrder {
                expected: 3,
                received: 1
            }
        );

        // The out-of-order block does not displace the highest block seen.
        assert_eq!(tracker.get_last_block("test_net").await, Some(2));
    }

    /// State for different network slugs is tracked independently.
    #[tokio::test]
    async fn test_multiple_networks() {
        let tracker = BlockTracker::new(5);
        let network1 = create_test_network("net-1", "net_1", true);
        let network2 = create_test_network("net-2", "net_2", true);

        // Interleave the two networks to show they do not affect each other.
        assert_accepted(&tracker, &network1, &[1]).await;
        assert_accepted(&tracker, &network2, &[100]).await;
        assert_accepted(&tracker, &network1, &[2]).await;
        assert_accepted(&tracker, &network2, &[101]).await;

        assert_eq!(tracker.get_last_block("net_1").await, Some(2));
        assert_eq!(tracker.get_last_block("net_2").await, Some(101));
    }

    /// A slug that was never recorded has no last block.
    #[tokio::test]
    async fn test_get_last_block_empty_network() {
        let tracker = BlockTracker::new(5);
        assert_eq!(tracker.get_last_block("nonexistent").await, None);
    }

    /// Skipping ahead (a gap in block numbers) is still accepted as `Ok`.
    #[tokio::test]
    async fn test_check_processed_block_with_gaps() {
        let tracker = BlockTracker::new(5);
        let network = create_test_network("test-network", "test_network", true);

        assert_accepted(&tracker, &network, &[1]).await;
        assert_eq!(tracker.get_last_block("test_network").await, Some(1));

        assert_accepted(&tracker, &network, &[3]).await;
        assert_eq!(tracker.get_last_block("test_network").await, Some(3));
    }
}