1#![allow(clippy::result_large_err)]
8
9use std::{collections::HashMap, marker::PhantomData, path::Path};
10
11use async_trait::async_trait;
12
13use crate::{
14 models::{ConfigLoader, Monitor, Network, Trigger, SCRIPT_LANGUAGE_EXTENSIONS},
15 repositories::{
16 error::RepositoryError,
17 network::{NetworkRepository, NetworkRepositoryTrait, NetworkService},
18 trigger::{TriggerRepository, TriggerRepositoryTrait, TriggerService},
19 },
20};
21
22#[derive(Clone)]
24pub struct MonitorRepository<
25 N: NetworkRepositoryTrait + Send + 'static,
26 T: TriggerRepositoryTrait + Send + 'static,
27> {
28 pub monitors: HashMap<String, Monitor>,
30 _network_repository: PhantomData<N>,
31 _trigger_repository: PhantomData<T>,
32}
33
34impl<
35 N: NetworkRepositoryTrait + Send + Sync + 'static,
36 T: TriggerRepositoryTrait + Send + Sync + 'static,
37 > MonitorRepository<N, T>
38{
39 pub async fn new(
44 path: Option<&Path>,
45 network_service: Option<NetworkService<N>>,
46 trigger_service: Option<TriggerService<T>>,
47 ) -> Result<Self, RepositoryError> {
48 let monitors = Self::load_all(path, network_service, trigger_service).await?;
49 Ok(MonitorRepository {
50 monitors,
51 _network_repository: PhantomData,
52 _trigger_repository: PhantomData,
53 })
54 }
55
56 pub fn new_with_monitors(monitors: HashMap<String, Monitor>) -> Self {
58 MonitorRepository {
59 monitors,
60 _network_repository: PhantomData,
61 _trigger_repository: PhantomData,
62 }
63 }
64
65 fn validate_monitor_signatures(
67 monitor_name: &str,
68 monitor: &Monitor,
69 networks: &HashMap<String, Network>,
70 validation_errors: &mut Vec<String>,
71 ) {
72 for network_slug in &monitor.networks {
73 let Some(network) = networks.get(network_slug) else {
74 continue; };
76
77 let rules = network.network_type.signature_rules();
78 if !rules.requires_parentheses {
79 continue;
80 }
81
82 let has_parens = |sig: &str| {
83 let sig = sig.trim();
84 matches!((sig.find('('), sig.rfind(')')), (Some(open), Some(close)) if open < close && close == sig.len() - 1)
85 };
86
87 for func in &monitor.match_conditions.functions {
89 if !has_parens(&func.signature) {
90 validation_errors.push(format!(
91 "Monitor '{}' has invalid function signature '{}' for {} network '{}' \
92 (expected format: 'functionName(type1,type2)')",
93 monitor_name, func.signature, network.network_type, network_slug
94 ));
95 }
96 }
97
98 for event in &monitor.match_conditions.events {
100 if !has_parens(&event.signature) {
101 validation_errors.push(format!(
102 "Monitor '{}' has invalid event signature '{}' for {} network '{}' \
103 (expected format: 'EventName(type1,type2)')",
104 monitor_name, event.signature, network.network_type, network_slug
105 ));
106 }
107 }
108 }
109 }
110
111 pub fn validate_monitor_references(
113 monitors: &HashMap<String, Monitor>,
114 triggers: &HashMap<String, Trigger>,
115 networks: &HashMap<String, Network>,
116 ) -> Result<(), RepositoryError> {
117 let mut validation_errors = Vec::new();
118 let mut metadata = HashMap::new();
119
120 for (monitor_name, monitor) in monitors {
121 for trigger_id in &monitor.triggers {
123 if !triggers.contains_key(trigger_id) {
124 validation_errors.push(format!(
125 "Monitor '{}' references non-existent trigger '{}'",
126 monitor_name, trigger_id
127 ));
128 metadata.insert(
129 format!("monitor_{}_invalid_trigger", monitor_name),
130 trigger_id.clone(),
131 );
132 }
133 }
134
135 for network_slug in &monitor.networks {
137 if !networks.contains_key(network_slug) {
138 validation_errors.push(format!(
139 "Monitor '{}' references non-existent network '{}'",
140 monitor_name, network_slug
141 ));
142 metadata.insert(
143 format!("monitor_{}_invalid_network", monitor_name),
144 network_slug.clone(),
145 );
146 }
147 }
148
149 Self::validate_monitor_signatures(
151 monitor_name,
152 monitor,
153 networks,
154 &mut validation_errors,
155 );
156
157 for condition in &monitor.trigger_conditions {
159 let script_path = Path::new(&condition.script_path);
160 if !script_path.exists() {
161 validation_errors.push(format!(
162 "Monitor '{}' has a custom filter script that does not exist: {}",
163 monitor_name, condition.script_path
164 ));
165 }
166
167 let expected_extension = match SCRIPT_LANGUAGE_EXTENSIONS
169 .iter()
170 .find(|(lang, _)| *lang == &condition.language)
171 .map(|(_, ext)| *ext)
172 {
173 Some(ext) => ext,
174 None => {
175 validation_errors.push(format!(
176 "Monitor '{}' uses unsupported script language {:?}",
177 monitor_name, condition.language
178 ));
179 continue;
180 }
181 };
182
183 match script_path.extension().and_then(|ext| ext.to_str()) {
184 Some(ext) if ext == expected_extension => (), _ => validation_errors.push(format!(
186 "Monitor '{}' has a custom filter script with invalid extension - must be \
187 .{} for {:?} language: {}",
188 monitor_name, expected_extension, condition.language, condition.script_path
189 )),
190 }
191
192 if condition.timeout_ms == 0 {
193 validation_errors.push(format!(
194 "Monitor '{}' should have a custom filter timeout_ms greater than 0",
195 monitor_name
196 ));
197 }
198 }
199 }
200
201 if !validation_errors.is_empty() {
202 return Err(RepositoryError::validation_error(
203 format!(
204 "Configuration validation failed:\n{}",
205 validation_errors.join("\n"),
206 ),
207 None,
208 Some(metadata),
209 ));
210 }
211
212 Ok(())
213 }
214}
215
216#[async_trait]
221pub trait MonitorRepositoryTrait<
222 N: NetworkRepositoryTrait + Send + 'static,
223 T: TriggerRepositoryTrait + Send + 'static,
224>: Clone + Send
225{
226 async fn new(
228 path: Option<&Path>,
229 network_service: Option<NetworkService<N>>,
230 trigger_service: Option<TriggerService<T>>,
231 ) -> Result<Self, RepositoryError>
232 where
233 Self: Sized;
234
235 async fn load_all(
241 path: Option<&Path>,
242 network_service: Option<NetworkService<N>>,
243 trigger_service: Option<TriggerService<T>>,
244 ) -> Result<HashMap<String, Monitor>, RepositoryError>;
245
246 async fn load_from_path(
250 &self,
251 path: Option<&Path>,
252 network_service: Option<NetworkService<N>>,
253 trigger_service: Option<TriggerService<T>>,
254 ) -> Result<Monitor, RepositoryError>;
255
256 fn get(&self, monitor_id: &str) -> Option<Monitor>;
260
261 fn get_all(&self) -> HashMap<String, Monitor>;
265}
266
267#[async_trait]
268impl<
269 N: NetworkRepositoryTrait + Send + Sync + 'static,
270 T: TriggerRepositoryTrait + Send + Sync + 'static,
271 > MonitorRepositoryTrait<N, T> for MonitorRepository<N, T>
272{
273 async fn new(
274 path: Option<&Path>,
275 network_service: Option<NetworkService<N>>,
276 trigger_service: Option<TriggerService<T>>,
277 ) -> Result<Self, RepositoryError> {
278 MonitorRepository::new(path, network_service, trigger_service).await
279 }
280
281 async fn load_all(
282 path: Option<&Path>,
283 network_service: Option<NetworkService<N>>,
284 trigger_service: Option<TriggerService<T>>,
285 ) -> Result<HashMap<String, Monitor>, RepositoryError> {
286 let monitors = Monitor::load_all(path).await.map_err(|e| {
287 RepositoryError::load_error(
288 "Failed to load monitors",
289 Some(Box::new(e)),
290 Some(HashMap::from([(
291 "path".to_string(),
292 path.map_or_else(|| "default".to_string(), |p| p.display().to_string()),
293 )])),
294 )
295 })?;
296
297 let networks = match network_service {
298 Some(service) => service.get_all(),
299 None => {
300 NetworkRepository::new(None)
301 .await
302 .map_err(|e| {
303 RepositoryError::load_error(
304 "Failed to load networks for monitor validation",
305 Some(Box::new(e)),
306 None,
307 )
308 })?
309 .networks
310 }
311 };
312
313 let triggers = match trigger_service {
314 Some(service) => service.get_all(),
315 None => {
316 TriggerRepository::new(None)
317 .await
318 .map_err(|e| {
319 RepositoryError::load_error(
320 "Failed to load triggers for monitor validation",
321 Some(Box::new(e)),
322 None,
323 )
324 })?
325 .triggers
326 }
327 };
328
329 Self::validate_monitor_references(&monitors, &triggers, &networks)?;
330 Ok(monitors)
331 }
332
333 async fn load_from_path(
337 &self,
338 path: Option<&Path>,
339 network_service: Option<NetworkService<N>>,
340 trigger_service: Option<TriggerService<T>>,
341 ) -> Result<Monitor, RepositoryError> {
342 match path {
343 Some(path) => {
344 let monitor = Monitor::load_from_path(path).await.map_err(|e| {
345 RepositoryError::load_error(
346 "Failed to load monitors",
347 Some(Box::new(e)),
348 Some(HashMap::from([(
349 "path".to_string(),
350 path.display().to_string(),
351 )])),
352 )
353 })?;
354
355 let networks = match network_service {
356 Some(service) => service.get_all(),
357 None => NetworkRepository::new(None).await?.networks,
358 };
359
360 let triggers = match trigger_service {
361 Some(service) => service.get_all(),
362 None => TriggerRepository::new(None).await?.triggers,
363 };
364 let monitors = HashMap::from([(monitor.name.clone(), monitor)]);
365 Self::validate_monitor_references(&monitors, &triggers, &networks)?;
366 match monitors.values().next() {
367 Some(monitor) => Ok(monitor.clone()),
368 None => Err(RepositoryError::load_error("No monitors found", None, None)),
369 }
370 }
371 None => Err(RepositoryError::load_error(
372 "Failed to load monitors",
373 None,
374 None,
375 )),
376 }
377 }
378
379 fn get(&self, monitor_id: &str) -> Option<Monitor> {
380 self.monitors.get(monitor_id).cloned()
381 }
382
383 fn get_all(&self) -> HashMap<String, Monitor> {
384 self.monitors.clone()
385 }
386}
387
388#[derive(Clone)]
394pub struct MonitorService<
395 M: MonitorRepositoryTrait<N, T> + Send,
396 N: NetworkRepositoryTrait + Send + Sync + 'static,
397 T: TriggerRepositoryTrait + Send + Sync + 'static,
398> {
399 repository: M,
400 _network_repository: PhantomData<N>,
401 _trigger_repository: PhantomData<T>,
402}
403
404impl<
405 M: MonitorRepositoryTrait<N, T> + Send,
406 N: NetworkRepositoryTrait + Send + Sync + 'static,
407 T: TriggerRepositoryTrait + Send + Sync + 'static,
408 > MonitorService<M, N, T>
409{
410 pub async fn new(
415 path: Option<&Path>,
416 network_service: Option<NetworkService<N>>,
417 trigger_service: Option<TriggerService<T>>,
418 ) -> Result<MonitorService<M, N, T>, RepositoryError> {
419 let repository = M::new(path, network_service, trigger_service).await?;
420 Ok(MonitorService {
421 repository,
422 _network_repository: PhantomData,
423 _trigger_repository: PhantomData,
424 })
425 }
426
427 pub async fn new_with_path(
431 path: Option<&Path>,
432 ) -> Result<MonitorService<M, N, T>, RepositoryError> {
433 let repository = M::new(path, None, None).await?;
434 Ok(MonitorService {
435 repository,
436 _network_repository: PhantomData,
437 _trigger_repository: PhantomData,
438 })
439 }
440
441 pub fn new_with_repository(repository: M) -> Result<Self, RepositoryError> {
445 Ok(MonitorService {
446 repository,
447 _network_repository: PhantomData,
448 _trigger_repository: PhantomData,
449 })
450 }
451
452 pub fn get(&self, monitor_id: &str) -> Option<Monitor> {
456 self.repository.get(monitor_id)
457 }
458
459 pub fn get_all(&self) -> HashMap<String, Monitor> {
463 self.repository.get_all()
464 }
465
466 pub async fn load_from_path(
470 &self,
471 path: Option<&Path>,
472 network_service: Option<NetworkService<N>>,
473 trigger_service: Option<TriggerService<T>>,
474 ) -> Result<Monitor, RepositoryError> {
475 self.repository
476 .load_from_path(path, network_service, trigger_service)
477 .await
478 }
479}
480
481#[cfg(test)]
482mod tests {
483 use super::*;
484 use crate::{models::ScriptLanguage, utils::tests::builders::evm::monitor::MonitorBuilder};
485 use std::fs;
486 use tempfile::TempDir;
487
488 #[test]
489 fn test_validate_custom_trigger_conditions() {
490 let temp_dir = TempDir::new().unwrap();
491 let script_path = temp_dir.path().join("test_script.py");
492 fs::write(&script_path, "print('test')").unwrap();
493
494 let mut monitors = HashMap::new();
495 let triggers = HashMap::new();
496 let networks = HashMap::new();
497
498 let monitor = MonitorBuilder::new()
500 .name("test_monitor")
501 .networks(vec![])
502 .trigger_condition(
503 script_path.to_str().unwrap(),
504 1000,
505 ScriptLanguage::Python,
506 None,
507 )
508 .build();
509 monitors.insert("test_monitor".to_string(), monitor);
510
511 let result =
512 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
513 &monitors, &triggers, &networks,
514 );
515 assert!(result.is_ok());
516
517 let monitor_bad_path = MonitorBuilder::new()
519 .name("test_monitor_bad_path")
520 .trigger_condition("non_existent_script.py", 1000, ScriptLanguage::Python, None)
521 .build();
522 monitors.insert("test_monitor_bad_path".to_string(), monitor_bad_path);
523
524 let err =
525 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
526 &monitors, &triggers, &networks,
527 )
528 .unwrap_err();
529 assert!(err.to_string().contains("does not exist"));
530
531 let wrong_ext_path = temp_dir.path().join("test_script.js");
533 fs::write(&wrong_ext_path, "print('test')").unwrap();
534
535 let monitor_wrong_ext = MonitorBuilder::new()
536 .name("test_monitor_wrong_ext")
537 .trigger_condition(
538 wrong_ext_path.to_str().unwrap(),
539 1000,
540 ScriptLanguage::Python,
541 None,
542 )
543 .build();
544 monitors.clear();
545 monitors.insert("test_monitor_wrong_ext".to_string(), monitor_wrong_ext);
546
547 let err =
548 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
549 &monitors, &triggers, &networks,
550 )
551 .unwrap_err();
552 assert!(err.to_string().contains(
553 "Monitor 'test_monitor_wrong_ext' has a custom filter script with invalid extension - \
554 must be .py for Python language"
555 ));
556
557 let monitor_zero_timeout = MonitorBuilder::new()
559 .name("test_monitor_zero_timeout")
560 .trigger_condition(
561 script_path.to_str().unwrap(),
562 0,
563 ScriptLanguage::Python,
564 None,
565 )
566 .build();
567 monitors.clear();
568 monitors.insert(
569 "test_monitor_zero_timeout".to_string(),
570 monitor_zero_timeout,
571 );
572
573 let err =
574 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
575 &monitors, &triggers, &networks,
576 )
577 .unwrap_err();
578 assert!(err.to_string().contains("timeout_ms greater than 0"));
579 }
580
581 #[tokio::test]
582 async fn test_load_error_messages() {
583 let invalid_path = Path::new("/non/existent/path");
585 let result = MonitorRepository::<NetworkRepository, TriggerRepository>::load_all(
586 Some(invalid_path),
587 None,
588 None,
589 )
590 .await;
591
592 assert!(result.is_err());
593 let err = result.unwrap_err();
594 match err {
595 RepositoryError::LoadError(message) => {
596 assert!(message.to_string().contains("Failed to load monitors"));
597 }
598 _ => panic!("Expected RepositoryError::LoadError"),
599 }
600 }
601
602 #[test]
603 fn test_network_validation_error() {
604 let mut monitors = HashMap::new();
606 let monitor = MonitorBuilder::new()
607 .name("test_monitor")
608 .networks(vec!["non_existent_network".to_string()])
609 .build();
610 monitors.insert("test_monitor".to_string(), monitor);
611
612 let networks = HashMap::new();
614 let triggers = HashMap::new();
615
616 let result =
618 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
619 &monitors, &triggers, &networks,
620 );
621
622 assert!(result.is_err());
623 let err = result.unwrap_err();
624 assert!(err.to_string().contains("references non-existent network"));
625 }
626
627 #[test]
628 fn test_trigger_validation_error() {
629 let mut monitors = HashMap::new();
631 let monitor = MonitorBuilder::new()
632 .name("test_monitor")
633 .triggers(vec!["non_existent_trigger".to_string()])
634 .build();
635 monitors.insert("test_monitor".to_string(), monitor);
636
637 let networks = HashMap::new();
639 let triggers = HashMap::new();
640
641 let result =
643 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
644 &monitors, &triggers, &networks,
645 );
646
647 assert!(result.is_err());
648 let err = result.unwrap_err();
649 assert!(err.to_string().contains("references non-existent trigger"));
650 }
651
652 #[tokio::test]
653 async fn test_load_from_path_error_handling() {
654 let temp_dir = TempDir::new().unwrap();
656 let invalid_path = temp_dir.path().join("non_existent_monitor.json");
657
658 let repository =
660 MonitorRepository::<NetworkRepository, TriggerRepository>::new_with_monitors(
661 HashMap::new(),
662 );
663
664 let result = repository
666 .load_from_path(Some(&invalid_path), None, None)
667 .await;
668
669 assert!(result.is_err());
671 let err = result.unwrap_err();
672 match err {
673 RepositoryError::LoadError(message) => {
674 assert!(message.to_string().contains("Failed to load monitors"));
675 assert!(message
677 .to_string()
678 .contains(&invalid_path.display().to_string()));
679 }
680 _ => panic!("Expected RepositoryError::LoadError"),
681 }
682 }
683
684 #[test]
685 fn test_signature_validation_with_network_types() {
686 use crate::models::{BlockChainType, EventCondition, FunctionCondition, MatchConditions};
687 use crate::utils::tests::builders::network::NetworkBuilder;
688
689 let mut networks = HashMap::new();
691
692 networks.insert(
694 "ethereum_mainnet".to_string(),
695 NetworkBuilder::new()
696 .name("Ethereum Mainnet")
697 .slug("ethereum_mainnet")
698 .network_type(BlockChainType::EVM)
699 .chain_id(1)
700 .build(),
701 );
702
703 networks.insert(
705 "mainnet_beta".to_string(),
706 NetworkBuilder::new()
707 .name("Solana Mainnet Beta")
708 .slug("mainnet_beta")
709 .network_type(BlockChainType::Solana)
710 .build(),
711 );
712
713 networks.insert(
715 "solana_devnet".to_string(),
716 NetworkBuilder::new()
717 .name("Solana Devnet")
718 .slug("solana_devnet")
719 .network_type(BlockChainType::Solana)
720 .build(),
721 );
722
723 let triggers = HashMap::new();
724 let mut monitors = HashMap::new();
725
726 let evm_monitor_invalid = MonitorBuilder::new()
728 .name("evm_monitor_invalid")
729 .networks(vec!["ethereum_mainnet".to_string()])
730 .match_conditions(MatchConditions {
731 functions: vec![FunctionCondition {
732 signature: "transfer".to_string(), expression: None,
734 }],
735 events: vec![EventCondition {
736 signature: "Transfer".to_string(), expression: None,
738 }],
739 transactions: vec![],
740 })
741 .build();
742 monitors.insert("evm_monitor_invalid".to_string(), evm_monitor_invalid);
743
744 let result =
745 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
746 &monitors, &triggers, &networks,
747 );
748
749 assert!(result.is_err());
750 let err = result.unwrap_err();
751 assert!(err
752 .to_string()
753 .contains("invalid function signature 'transfer' for EVM network"));
754 assert!(err
755 .to_string()
756 .contains("invalid event signature 'Transfer' for EVM network"));
757
758 monitors.clear();
760 let solana_monitor_valid = MonitorBuilder::new()
761 .name("solana_monitor_valid")
762 .networks(vec!["mainnet_beta".to_string()]) .match_conditions(MatchConditions {
764 functions: vec![FunctionCondition {
765 signature: "transfer".to_string(), expression: None,
767 }],
768 events: vec![EventCondition {
769 signature: "TransferEvent".to_string(), expression: None,
771 }],
772 transactions: vec![],
773 })
774 .build();
775 monitors.insert("solana_monitor_valid".to_string(), solana_monitor_valid);
776
777 let result =
778 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
779 &monitors, &triggers, &networks,
780 );
781
782 assert!(result.is_ok());
784
785 monitors.clear();
787 let evm_monitor_valid = MonitorBuilder::new()
788 .name("evm_monitor_valid")
789 .networks(vec!["ethereum_mainnet".to_string()])
790 .match_conditions(MatchConditions {
791 functions: vec![FunctionCondition {
792 signature: "transfer(address,uint256)".to_string(), expression: None,
794 }],
795 events: vec![EventCondition {
796 signature: "Transfer(address,address,uint256)".to_string(), expression: None,
798 }],
799 transactions: vec![],
800 })
801 .build();
802 monitors.insert("evm_monitor_valid".to_string(), evm_monitor_valid);
803
804 let result =
805 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
806 &monitors, &triggers, &networks,
807 );
808
809 assert!(result.is_ok());
811
812 monitors.clear();
814 let mixed_monitor = MonitorBuilder::new()
815 .name("mixed_monitor")
816 .networks(vec![
817 "ethereum_mainnet".to_string(),
818 "mainnet_beta".to_string(),
819 ])
820 .match_conditions(MatchConditions {
821 functions: vec![FunctionCondition {
822 signature: "transfer".to_string(), expression: None,
824 }],
825 events: vec![],
826 transactions: vec![],
827 })
828 .build();
829 monitors.insert("mixed_monitor".to_string(), mixed_monitor);
830
831 let result =
832 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
833 &monitors, &triggers, &networks,
834 );
835
836 assert!(result.is_err());
838 let err = result.unwrap_err();
839 assert!(err
840 .to_string()
841 .contains("invalid function signature 'transfer' for EVM network 'ethereum_mainnet'"));
842 }
843}