1use futures::future::BoxFuture;
19use std::{collections::HashMap, error::Error, sync::Arc};
20use tokio::sync::{watch, Mutex};
21
22use crate::{
23 models::{
24 BlockChainType, BlockType, ContractSpec, Monitor, MonitorMatch, Network, ProcessedBlock,
25 ScriptLanguage, TriggerConditions,
26 },
27 repositories::{
28 MonitorRepositoryTrait, MonitorService, NetworkRepositoryTrait, NetworkService,
29 TriggerRepositoryTrait, TriggerService,
30 },
31 services::{
32 blockchain::{BlockChainClient, BlockFilterFactory, ClientPoolTrait},
33 filter::{evm_helpers, handle_match, stellar_helpers, FilterService},
34 notification::NotificationService,
35 trigger::{
36 ScriptError, ScriptExecutorFactory, TriggerError, TriggerExecutionService,
37 TriggerExecutionServiceTrait,
38 },
39 },
40 utils::normalize_string,
41};
42
43pub type Result<T> = std::result::Result<T, Box<dyn Error>>;
45
46type ServiceResult<M, N, T> = Result<(
47 Arc<FilterService>,
48 Arc<TriggerExecutionService<T>>,
49 Vec<Monitor>,
50 HashMap<String, Network>,
51 Arc<Mutex<MonitorService<M, N, T>>>,
52 Arc<Mutex<NetworkService<N>>>,
53 Arc<Mutex<TriggerService<T>>>,
54)>;
55
56pub async fn initialize_services<M, N, T>(
70 monitor_service: Option<MonitorService<M, N, T>>,
71 network_service: Option<NetworkService<N>>,
72 trigger_service: Option<TriggerService<T>>,
73) -> ServiceResult<M, N, T>
74where
75 M: MonitorRepositoryTrait<N, T> + Send + Sync + 'static,
76 N: NetworkRepositoryTrait + Send + Sync + 'static,
77 T: TriggerRepositoryTrait + Send + Sync + 'static,
78{
79 let network_service = match network_service {
80 Some(service) => service,
81 None => {
82 let repository = N::new(None).await?;
83 NetworkService::<N>::new_with_repository(repository)?
84 }
85 };
86
87 let trigger_service = match trigger_service {
88 Some(service) => service,
89 None => {
90 let repository = T::new(None).await?;
91 TriggerService::<T>::new_with_repository(repository)?
92 }
93 };
94
95 let monitor_service = match monitor_service {
96 Some(service) => service,
97 None => {
98 let repository = M::new(
99 None,
100 Some(network_service.clone()),
101 Some(trigger_service.clone()),
102 )
103 .await?;
104 MonitorService::<M, N, T>::new_with_repository(repository)?
105 }
106 };
107
108 let notification_service = NotificationService::new();
109
110 let filter_service = Arc::new(FilterService::new());
111 let trigger_execution_service = Arc::new(TriggerExecutionService::new(
112 trigger_service.clone(),
113 notification_service,
114 ));
115
116 let monitors = monitor_service.get_all();
117 let active_monitors = filter_active_monitors(monitors);
118 let networks = network_service.get_all();
119
120 Ok((
121 filter_service,
122 trigger_execution_service,
123 active_monitors,
124 networks,
125 Arc::new(Mutex::new(monitor_service)),
126 Arc::new(Mutex::new(network_service)),
127 Arc::new(Mutex::new(trigger_service)),
128 ))
129}
130
131pub fn create_block_handler<P: ClientPoolTrait + 'static>(
142 shutdown_tx: watch::Sender<bool>,
143 filter_service: Arc<FilterService>,
144 active_monitors: Vec<Monitor>,
145 client_pools: Arc<P>,
146 contract_specs: Vec<(String, ContractSpec)>,
147) -> Arc<impl Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync> {
148 Arc::new(
149 move |block: BlockType, network: Network| -> BoxFuture<'static, ProcessedBlock> {
150 let filter_service = filter_service.clone();
151 let active_monitors = active_monitors.clone();
152 let client_pools = client_pools.clone();
153 let shutdown_tx = shutdown_tx.clone();
154 let contract_specs = contract_specs.clone();
155 Box::pin(async move {
156 let applicable_monitors = filter_network_monitors(&active_monitors, &network.slug);
157
158 let mut processed_block = ProcessedBlock {
159 block_number: block.number().unwrap_or(0),
160 network_slug: network.slug.clone(),
161 processing_results: Vec::new(),
162 };
163
164 if !applicable_monitors.is_empty() {
165 let mut shutdown_rx = shutdown_tx.subscribe();
166
167 let matches = match network.network_type {
168 BlockChainType::EVM => match client_pools.get_evm_client(&network).await {
169 Ok(client) => {
170 process_block(
171 client.as_ref(),
172 &network,
173 &block,
174 &applicable_monitors,
175 Some(&contract_specs),
176 &filter_service,
177 &mut shutdown_rx,
178 )
179 .await
180 }
181 Err(_) => None,
182 },
183 BlockChainType::Stellar => {
184 match client_pools.get_stellar_client(&network).await {
185 Ok(client) => {
186 process_block(
187 client.as_ref(),
188 &network,
189 &block,
190 &applicable_monitors,
191 Some(&contract_specs),
192 &filter_service,
193 &mut shutdown_rx,
194 )
195 .await
196 }
197 Err(_) => None,
198 }
199 }
200 BlockChainType::Midnight => {
201 match client_pools.get_midnight_client(&network).await {
202 Ok(client) => {
203 process_block(
204 client.as_ref(),
205 &network,
206 &block,
207 &applicable_monitors,
208 Some(&contract_specs),
209 &filter_service,
210 &mut shutdown_rx,
211 )
212 .await
213 }
214 Err(_) => None,
215 }
216 }
217 BlockChainType::Solana => {
218 match client_pools.get_solana_client(&network).await {
219 Ok(client) => {
220 process_block(
221 client.as_ref(),
222 &network,
223 &block,
224 &applicable_monitors,
225 Some(&contract_specs),
226 &filter_service,
227 &mut shutdown_rx,
228 )
229 .await
230 }
231 Err(e) => {
232 tracing::error!(error = %e, "Failed to get Solana client");
233 None
234 }
235 }
236 }
237 };
238
239 processed_block.processing_results = matches.unwrap_or_default();
240 }
241
242 processed_block
243 })
244 },
245 )
246}
247
248pub async fn process_block<T>(
258 client: &T,
259 network: &Network,
260 block: &BlockType,
261 applicable_monitors: &[Monitor],
262 contract_specs: Option<&[(String, ContractSpec)]>,
263 filter_service: &FilterService,
264 shutdown_rx: &mut watch::Receiver<bool>,
265) -> Option<Vec<MonitorMatch>>
266where
267 T: BlockChainClient + BlockFilterFactory<T>,
268{
269 tokio::select! {
270 result = filter_service.filter_block(client, network, block, applicable_monitors, contract_specs) => {
271 result.ok()
272 }
273 _ = shutdown_rx.changed() => {
274 tracing::info!("Shutting down block processing task");
275 None
276 }
277 }
278}
279
280pub async fn get_contract_specs<P: ClientPoolTrait + 'static>(
289 client_pool: &Arc<P>,
290 network_monitors: &[(Network, Vec<Monitor>)],
291) -> Vec<(String, ContractSpec)> {
292 let mut all_specs = Vec::new();
293
294 for (network, monitors) in network_monitors {
295 for monitor in monitors {
296 let specs = match network.network_type {
297 BlockChainType::Stellar => {
298 let mut contract_specs = Vec::new();
299 let mut addresses_without_specs = Vec::new();
300 for monitored_addr in &monitor.addresses {
302 if let Some(spec) = &monitored_addr.contract_spec {
303 let parsed_spec = match spec {
304 ContractSpec::Stellar(spec) => spec,
305 _ => {
306 tracing::warn!(
307 "Skipping non-Stellar contract spec for address {}",
308 monitored_addr.address
309 );
310 continue;
311 }
312 };
313
314 contract_specs.push((
315 stellar_helpers::normalize_address(&monitored_addr.address),
316 ContractSpec::Stellar(parsed_spec.clone()),
317 ))
318 } else {
319 addresses_without_specs.push(monitored_addr.address.clone());
320 }
321 }
322
323 if !addresses_without_specs.is_empty() {
325 let client: Arc<P::StellarClient> =
327 match client_pool.get_stellar_client(network).await {
328 Ok(client) => client,
329 Err(_) => {
330 tracing::warn!("Failed to get stellar client");
331 continue;
332 }
333 };
334
335 let chain_specs = futures::future::join_all(
336 addresses_without_specs.iter().map(|address| {
337 let client = client.clone();
338 async move {
339 let spec = client.get_contract_spec(address).await;
340 (address.clone(), spec)
341 }
342 }),
343 )
344 .await
345 .into_iter()
346 .filter_map(|(addr, spec)| match spec {
347 Ok(s) => Some((addr, s)),
348 Err(e) => {
349 tracing::warn!(
350 "Failed to fetch contract spec for address {}: {:?}",
351 addr,
352 e
353 );
354 None
355 }
356 })
357 .collect::<Vec<_>>();
358
359 contract_specs.extend(chain_specs);
360 }
361 contract_specs
362 }
363 BlockChainType::EVM => {
364 let mut contract_specs = Vec::new();
365 for monitored_addr in &monitor.addresses {
367 if let Some(spec) = &monitored_addr.contract_spec {
368 let parsed_spec = match spec {
369 ContractSpec::EVM(spec) => spec,
370 _ => {
371 tracing::warn!(
372 "Skipping non-EVM contract spec for address {}",
373 monitored_addr.address
374 );
375 continue;
376 }
377 };
378
379 contract_specs.push((
380 format!(
381 "0x{}",
382 evm_helpers::normalize_address(&monitored_addr.address)
383 ),
384 ContractSpec::EVM(parsed_spec.clone()),
385 ))
386 }
387 }
388 contract_specs
389 }
390 _ => {
391 vec![]
392 }
393 };
394 all_specs.extend(specs);
395 }
396 }
397 all_specs
398}
399
400pub fn create_trigger_handler<S: TriggerExecutionServiceTrait + Send + Sync + 'static>(
410 shutdown_tx: watch::Sender<bool>,
411 trigger_service: Arc<S>,
412 active_monitors_trigger_scripts: HashMap<String, (ScriptLanguage, String)>,
413) -> Arc<impl Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync> {
414 Arc::new(move |block: &ProcessedBlock| {
415 let mut shutdown_rx = shutdown_tx.subscribe();
416 let trigger_service = trigger_service.clone();
417 let trigger_scripts = active_monitors_trigger_scripts.clone();
418 let block = block.clone();
419
420 tokio::spawn(async move {
421 tokio::select! {
422 _ = async {
423 if block.processing_results.is_empty() {
424 return;
425 }
426 let filtered_matches = run_trigger_filters(&block.processing_results, &block.network_slug, &trigger_scripts).await;
427 for monitor_match in &filtered_matches {
428 if let Err(e) = handle_match(monitor_match.clone(), &*trigger_service, &trigger_scripts).await {
429 TriggerError::execution_error(e.to_string(), Some(e.into()), None);
430 }
431 }
432 } => {}
433 _ = shutdown_rx.changed() => {
434 tracing::info!("Shutting down trigger handling task");
435 }
436 }
437 })
438 })
439}
440
441pub fn has_active_monitors(monitors: &[Monitor], network_slug: &String) -> bool {
450 monitors
451 .iter()
452 .any(|m| m.networks.contains(network_slug) && !m.paused)
453}
454
455fn filter_active_monitors(monitors: HashMap<String, Monitor>) -> Vec<Monitor> {
463 monitors
464 .into_values()
465 .filter(|m| !m.paused)
466 .collect::<Vec<_>>()
467}
468
469fn filter_network_monitors(monitors: &[Monitor], network_slug: &String) -> Vec<Monitor> {
478 monitors
479 .iter()
480 .filter(|m| m.networks.contains(network_slug))
481 .cloned()
482 .collect()
483}
484
485async fn execute_trigger_condition(
486 trigger_condition: &TriggerConditions,
487 monitor_match: &MonitorMatch,
488 script_content: &(ScriptLanguage, String),
489) -> bool {
490 let executor = ScriptExecutorFactory::create(&script_content.0, &script_content.1);
491
492 let result = executor
493 .execute(
494 monitor_match.clone(),
495 &trigger_condition.timeout_ms,
496 trigger_condition.arguments.as_deref(),
497 false,
498 )
499 .await;
500
501 match result {
502 Ok(true) => true,
503 Err(e) => {
504 ScriptError::execution_error(e.to_string(), None, None);
505 false
506 }
507 _ => false,
508 }
509}
510
511async fn run_trigger_filters(
512 matches: &[MonitorMatch],
513 _network: &str,
514 trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
515) -> Vec<MonitorMatch> {
516 let mut filtered_matches = vec![];
517
518 for monitor_match in matches {
519 let mut is_filtered = false;
520 let trigger_conditions = match monitor_match {
521 MonitorMatch::EVM(evm_match) => &evm_match.monitor.trigger_conditions,
522 MonitorMatch::Stellar(stellar_match) => &stellar_match.monitor.trigger_conditions,
523 MonitorMatch::Midnight(midnight_match) => &midnight_match.monitor.trigger_conditions,
524 MonitorMatch::Solana(solana_match) => &solana_match.monitor.trigger_conditions,
525 };
526
527 for trigger_condition in trigger_conditions {
528 let monitor_name = match monitor_match {
529 MonitorMatch::EVM(evm_match) => evm_match.monitor.name.clone(),
530 MonitorMatch::Stellar(stellar_match) => stellar_match.monitor.name.clone(),
531 MonitorMatch::Midnight(midnight_match) => midnight_match.monitor.name.clone(),
532 MonitorMatch::Solana(solana_match) => solana_match.monitor.name.clone(),
533 };
534
535 let script_content = trigger_scripts
536 .get(&format!(
537 "{}|{}",
538 normalize_string(&monitor_name),
539 trigger_condition.script_path
540 ))
541 .ok_or_else(|| {
542 ScriptError::execution_error("Script content not found".to_string(), None, None)
543 });
544 if let Ok(script_content) = script_content {
545 if execute_trigger_condition(trigger_condition, monitor_match, script_content).await
546 {
547 is_filtered = true;
548 break;
549 }
550 }
551 }
552 if !is_filtered {
553 filtered_matches.push(monitor_match.clone());
554 }
555 }
556
557 filtered_matches
558}
559
560#[cfg(test)]
561mod tests {
562 use super::*;
563 use crate::{
564 models::{
565 EVMMonitorMatch, EVMReceiptLog, EVMTransaction, EVMTransactionReceipt, MatchConditions,
566 Monitor, MonitorMatch, ScriptLanguage, SolanaBlock, SolanaMonitorMatch,
567 SolanaTransaction, SolanaTransactionInfo, StellarBlock, StellarMonitorMatch,
568 StellarTransaction, StellarTransactionInfo, TriggerConditions,
569 },
570 utils::tests::{builders::evm::monitor::MonitorBuilder, evm::receipt::ReceiptBuilder},
571 };
572 use alloy::{
573 consensus::{transaction::Recovered, Signed, TxEnvelope},
574 primitives::{Address, Bytes, TxKind, B256, U256},
575 };
576 use std::io::Write;
577 use tempfile::NamedTempFile;
578
579 fn create_temp_script(content: &str) -> NamedTempFile {
581 let mut file = NamedTempFile::new().unwrap();
582 file.write_all(content.as_bytes()).unwrap();
583 file
584 }
585 fn create_test_monitor(
586 name: &str,
587 networks: Vec<&str>,
588 paused: bool,
589 script_path: Option<&str>,
590 ) -> Monitor {
591 let mut builder = MonitorBuilder::new()
592 .name(name)
593 .networks(networks.into_iter().map(|s| s.to_string()).collect())
594 .paused(paused);
595
596 if let Some(path) = script_path {
597 builder = builder.trigger_condition(path, 1000, ScriptLanguage::Python, None);
598 }
599
600 builder.build()
601 }
602
603 fn create_test_evm_transaction_receipt() -> EVMTransactionReceipt {
604 ReceiptBuilder::new().build()
605 }
606
607 fn create_test_evm_logs() -> Vec<EVMReceiptLog> {
608 ReceiptBuilder::new().build().logs.clone()
609 }
610
611 fn create_test_evm_transaction() -> EVMTransaction {
612 let tx = alloy::consensus::TxLegacy {
613 chain_id: None,
614 nonce: 0,
615 gas_price: 0,
616 gas_limit: 0,
617 to: TxKind::Call(Address::ZERO),
618 value: U256::ZERO,
619 input: Bytes::default(),
620 };
621
622 let signature =
623 alloy::signers::Signature::from_scalars_and_parity(B256::ZERO, B256::ZERO, false);
624
625 let hash = B256::ZERO;
626
627 EVMTransaction::from(alloy::rpc::types::Transaction {
628 inner: Recovered::new_unchecked(
629 TxEnvelope::Legacy(Signed::new_unchecked(tx, signature, hash)),
630 Address::ZERO,
631 ),
632 block_hash: None,
633 block_number: None,
634 transaction_index: None,
635 effective_gas_price: None,
636 })
637 }
638
639 fn create_test_stellar_transaction() -> StellarTransaction {
640 StellarTransaction::from({
641 StellarTransactionInfo {
642 ..Default::default()
643 }
644 })
645 }
646
647 fn create_test_stellar_block() -> StellarBlock {
648 StellarBlock::default()
649 }
650
651 fn create_test_solana_transaction() -> SolanaTransaction {
652 SolanaTransaction::from(SolanaTransactionInfo {
653 signature: "5wHu1qwD7q5ifaN5nwdcDqNFF53GJqa7nLp2BLPASe7FPYoWZL3YBrJmVL6nrMtwKjNFin1F"
654 .to_string(),
655 slot: 123456789,
656 block_time: Some(1234567890),
657 transaction: Default::default(),
658 meta: None,
659 })
660 }
661
662 fn create_test_solana_block() -> SolanaBlock {
663 SolanaBlock::default()
664 }
665
666 fn create_mock_monitor_match_from_path(
667 blockchain_type: BlockChainType,
668 script_path: Option<&str>,
669 ) -> MonitorMatch {
670 match blockchain_type {
671 BlockChainType::EVM => MonitorMatch::EVM(Box::new(EVMMonitorMatch {
672 monitor: create_test_monitor("test", vec![], false, script_path),
673 transaction: create_test_evm_transaction(),
674 receipt: Some(create_test_evm_transaction_receipt()),
675 logs: Some(create_test_evm_logs()),
676 network_slug: "ethereum_mainnet".to_string(),
677 matched_on: MatchConditions {
678 functions: vec![],
679 events: vec![],
680 transactions: vec![],
681 },
682 matched_on_args: None,
683 })),
684 BlockChainType::Stellar => MonitorMatch::Stellar(Box::new(StellarMonitorMatch {
685 monitor: create_test_monitor("test", vec![], false, script_path),
686 transaction: create_test_stellar_transaction(),
687 ledger: create_test_stellar_block(),
688 network_slug: "stellar_mainnet".to_string(),
689 matched_on: MatchConditions {
690 functions: vec![],
691 events: vec![],
692 transactions: vec![],
693 },
694 matched_on_args: None,
695 })),
696 BlockChainType::Solana => MonitorMatch::Solana(Box::new(SolanaMonitorMatch {
697 monitor: create_test_monitor("test", vec![], false, script_path),
698 transaction: create_test_solana_transaction(),
699 block: create_test_solana_block(),
700 network_slug: "solana_mainnet".to_string(),
701 matched_on: MatchConditions {
702 functions: vec![],
703 events: vec![],
704 transactions: vec![],
705 },
706 matched_on_args: None,
707 })),
708 BlockChainType::Midnight => unimplemented!(),
709 }
710 }
711
712 fn create_mock_monitor_match_from_monitor(
713 blockchain_type: BlockChainType,
714 monitor: Monitor,
715 ) -> MonitorMatch {
716 match blockchain_type {
717 BlockChainType::EVM => MonitorMatch::EVM(Box::new(EVMMonitorMatch {
718 monitor,
719 transaction: create_test_evm_transaction(),
720 receipt: Some(create_test_evm_transaction_receipt()),
721 logs: Some(create_test_evm_logs()),
722 network_slug: "ethereum_mainnet".to_string(),
723 matched_on: MatchConditions {
724 functions: vec![],
725 events: vec![],
726 transactions: vec![],
727 },
728 matched_on_args: None,
729 })),
730 BlockChainType::Stellar => MonitorMatch::Stellar(Box::new(StellarMonitorMatch {
731 monitor,
732 transaction: create_test_stellar_transaction(),
733 ledger: create_test_stellar_block(),
734 network_slug: "stellar_mainnet".to_string(),
735 matched_on: MatchConditions {
736 functions: vec![],
737 events: vec![],
738 transactions: vec![],
739 },
740 matched_on_args: None,
741 })),
742 BlockChainType::Solana => MonitorMatch::Solana(Box::new(SolanaMonitorMatch {
743 monitor,
744 transaction: create_test_solana_transaction(),
745 block: create_test_solana_block(),
746 network_slug: "solana_mainnet".to_string(),
747 matched_on: MatchConditions {
748 functions: vec![],
749 events: vec![],
750 transactions: vec![],
751 },
752 matched_on_args: None,
753 })),
754 BlockChainType::Midnight => unimplemented!(),
755 }
756 }
757
758 fn matches_equal(a: &MonitorMatch, b: &MonitorMatch) -> bool {
759 match (a, b) {
760 (MonitorMatch::EVM(a), MonitorMatch::EVM(b)) => a.monitor.name == b.monitor.name,
761 (MonitorMatch::Stellar(a), MonitorMatch::Stellar(b)) => {
762 a.monitor.name == b.monitor.name
763 }
764 (MonitorMatch::Solana(a), MonitorMatch::Solana(b)) => a.monitor.name == b.monitor.name,
765 _ => false,
766 }
767 }
768
769 #[test]
770 fn test_has_active_monitors() {
771 let monitors = vec![
772 create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
773 create_test_monitor("2", vec!["ethereum_sepolia"], false, None),
774 create_test_monitor(
775 "3",
776 vec!["ethereum_mainnet", "ethereum_sepolia"],
777 false,
778 None,
779 ),
780 create_test_monitor("4", vec!["stellar_mainnet"], true, None),
781 ];
782
783 assert!(has_active_monitors(
784 &monitors,
785 &"ethereum_mainnet".to_string()
786 ));
787 assert!(has_active_monitors(
788 &monitors,
789 &"ethereum_sepolia".to_string()
790 ));
791 assert!(!has_active_monitors(
792 &monitors,
793 &"midnight_mainnet".to_string()
794 ));
795 assert!(!has_active_monitors(
796 &monitors,
797 &"stellar_mainnet".to_string()
798 ));
799 }
800
801 #[test]
802 fn test_filter_active_monitors() {
803 let mut monitors = HashMap::new();
804 monitors.insert(
805 "1".to_string(),
806 create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
807 );
808 monitors.insert(
809 "2".to_string(),
810 create_test_monitor("2", vec!["stellar_mainnet"], true, None),
811 );
812 monitors.insert(
813 "3".to_string(),
814 create_test_monitor("3", vec!["ethereum_mainnet"], false, None),
815 );
816
817 let active_monitors = filter_active_monitors(monitors);
818 assert_eq!(active_monitors.len(), 2);
819 assert!(active_monitors.iter().all(|m| !m.paused));
820 }
821
822 #[test]
823 fn test_filter_network_monitors() {
824 let monitors = vec![
825 create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
826 create_test_monitor("2", vec!["stellar_mainnet"], true, None),
827 create_test_monitor(
828 "3",
829 vec!["ethereum_mainnet", "stellar_mainnet"],
830 false,
831 None,
832 ),
833 ];
834
835 let eth_monitors = filter_network_monitors(&monitors, &"ethereum_mainnet".to_string());
836 assert_eq!(eth_monitors.len(), 2);
837 assert!(eth_monitors
838 .iter()
839 .all(|m| m.networks.contains(&"ethereum_mainnet".to_string())));
840
841 let stellar_monitors = filter_network_monitors(&monitors, &"stellar_mainnet".to_string());
842 assert_eq!(stellar_monitors.len(), 2);
843 assert!(stellar_monitors
844 .iter()
845 .all(|m| m.networks.contains(&"stellar_mainnet".to_string())));
846
847 let midnight_monitors = filter_network_monitors(&monitors, &"midnight_mainnet".to_string());
848 assert!(midnight_monitors.is_empty());
849 }
850
851 #[tokio::test]
852 async fn test_run_trigger_filters_empty_matches() {
853 let matches: Vec<MonitorMatch> = vec![];
855
856 let mut trigger_scripts = HashMap::new();
858 trigger_scripts.insert(
859 "monitor_test-test.py".to_string(), (
861 ScriptLanguage::Python,
862 r#"
863import sys
864import json
865
866input_data = sys.stdin.read()
867data = json.loads(input_data)
868print(False)
869 "#
870 .to_string(),
871 ),
872 );
873
874 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
876 assert!(filtered.is_empty());
877 }
878
879 #[tokio::test]
880 async fn test_run_trigger_filters_true_condition() {
881 let script_content = r#"
882import sys
883import json
884
885input_json = sys.argv[1]
886data = json.loads(input_json)
887print("debugging...")
888def test():
889 return True
890result = test()
891print(result)
892"#;
893 let temp_file = create_temp_script(script_content);
894 let mut trigger_scripts = HashMap::new();
895 trigger_scripts.insert(
896 format!("test-{}", temp_file.path().to_str().unwrap()),
897 (ScriptLanguage::Python, script_content.to_string()),
898 );
899 let match_item = create_mock_monitor_match_from_path(
900 BlockChainType::EVM,
901 Some(temp_file.path().to_str().unwrap()),
902 );
903 let matches = vec![match_item.clone()];
904
905 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
906 assert_eq!(filtered.len(), 1);
907 assert!(matches_equal(&filtered[0], &match_item));
908 }
909
910 #[tokio::test]
911 async fn test_run_trigger_filters_false_condition() {
912 let script_content = r#"
913import sys
914import json
915
916input_data = sys.stdin.read()
917data = json.loads(input_data)
918print("debugging...")
919def test():
920 return False
921result = test()
922print(result)
923"#;
924 let temp_file = create_temp_script(script_content);
925 let mut trigger_scripts = HashMap::new();
926 trigger_scripts.insert(
927 format!("test-{}", temp_file.path().to_str().unwrap()),
928 (ScriptLanguage::Python, script_content.to_string()),
929 );
930 let match_item = create_mock_monitor_match_from_path(
931 BlockChainType::EVM,
932 Some(temp_file.path().to_str().unwrap()),
933 );
934 let matches = vec![match_item.clone()];
935
936 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
937 assert_eq!(filtered.len(), 1);
938 }
939
940 #[tokio::test]
941 async fn test_execute_trigger_condition_returns_false() {
942 let script_content = r#"print(False) # Script returns false"#;
943 let temp_file = create_temp_script(script_content);
944 let trigger_condition = TriggerConditions {
945 language: ScriptLanguage::Python,
946 script_path: temp_file.path().to_str().unwrap().to_string(),
947 timeout_ms: 1000,
948 arguments: None,
949 };
950 let match_item = create_mock_monitor_match_from_path(
951 BlockChainType::EVM,
952 Some(temp_file.path().to_str().unwrap()),
953 );
954 let script_content = (ScriptLanguage::Python, script_content.to_string());
955
956 let result =
957 execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
958 assert!(!result); }
960
961 #[tokio::test]
962 async fn test_execute_trigger_condition_script_error() {
963 let script_content = r#"raise Exception("Test error") # Raise an error"#;
964 let temp_file = create_temp_script(script_content);
965 let trigger_condition = TriggerConditions {
966 language: ScriptLanguage::Python,
967 script_path: temp_file.path().to_str().unwrap().to_string(),
968 timeout_ms: 1000,
969 arguments: None,
970 };
971 let match_item = create_mock_monitor_match_from_path(
972 BlockChainType::EVM,
973 Some(temp_file.path().to_str().unwrap()),
974 );
975 let script_content = (ScriptLanguage::Python, script_content.to_string());
976
977 let result =
978 execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
979 assert!(!result); }
981
982 #[tokio::test]
983 async fn test_execute_trigger_condition_invalid_script() {
984 let trigger_condition = TriggerConditions {
985 language: ScriptLanguage::Python,
986 script_path: "non_existent_script.py".to_string(),
987 timeout_ms: 1000,
988 arguments: None,
989 };
990 let match_item = create_mock_monitor_match_from_path(
991 BlockChainType::EVM,
992 Some("non_existent_script.py"),
993 );
994 let script_content = (ScriptLanguage::Python, "invalid script content".to_string());
995
996 let result =
997 execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
998 assert!(!result); }
1000
1001 #[tokio::test]
1002 async fn test_run_trigger_filters_multiple_conditions_keep_match() {
1003 let monitor = MonitorBuilder::new()
1005 .name("monitor_test")
1006 .networks(vec!["ethereum_mainnet".to_string()])
1007 .trigger_condition("test1.py", 1000, ScriptLanguage::Python, None)
1008 .trigger_condition("test2.py", 1000, ScriptLanguage::Python, None)
1009 .build();
1010
1011 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1013
1014 let mut trigger_scripts = HashMap::new();
1015 trigger_scripts.insert(
1016 "monitor_test|test1.py".to_string(),
1017 (
1018 ScriptLanguage::Python,
1019 r#"
1020import sys
1021import json
1022
1023input_data = sys.stdin.read()
1024data = json.loads(input_data)
1025print(True)
1026"#
1027 .to_string(),
1028 ),
1029 );
1030 trigger_scripts.insert(
1031 "monitor_test|test2.py".to_string(),
1032 (
1033 ScriptLanguage::Python,
1034 r#"
1035import sys
1036import json
1037input_data = sys.stdin.read()
1038data = json.loads(input_data)
1039print(True)
1040"#
1041 .to_string(),
1042 ),
1043 );
1044
1045 let matches = vec![match_item.clone()];
1047 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1048
1049 assert_eq!(filtered.len(), 0);
1050 }
1051
1052 #[tokio::test]
1053 async fn test_run_trigger_filters_condition_two_combinations_exclude_match() {
1054 let monitor = MonitorBuilder::new()
1055 .name("monitor_test")
1056 .networks(vec!["ethereum_mainnet".to_string()])
1057 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1058 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1059 .build();
1060
1061 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1062
1063 let mut trigger_scripts = HashMap::new();
1065 trigger_scripts.insert(
1066 "monitor_test|condition1.py".to_string(),
1067 (ScriptLanguage::Python, "print(True)".to_string()),
1068 );
1069 trigger_scripts.insert(
1070 "monitor_test|condition2.py".to_string(),
1071 (ScriptLanguage::Python, "print(False)".to_string()),
1072 );
1073
1074 let matches = vec![match_item.clone()];
1075 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1076 assert_eq!(filtered.len(), 0);
1077 }
1078
1079 #[tokio::test]
1080 async fn test_run_trigger_filters_condition_two_combinations_keep_match() {
1081 let monitor = MonitorBuilder::new()
1082 .name("monitor_test")
1083 .networks(vec!["ethereum_mainnet".to_string()])
1084 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1085 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1086 .build();
1087
1088 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1089
1090 let mut trigger_scripts = HashMap::new();
1091 trigger_scripts.insert(
1092 "monitor_test|condition1.py".to_string(),
1093 (ScriptLanguage::Python, "print(False)".to_string()),
1094 );
1095 trigger_scripts.insert(
1096 "monitor_test|condition2.py".to_string(),
1097 (ScriptLanguage::Python, "print(False)".to_string()),
1098 );
1099
1100 let matches = vec![match_item.clone()];
1101 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1102 assert_eq!(filtered.len(), 1);
1103 }
1104
1105 #[tokio::test]
1106 async fn test_run_trigger_filters_condition_two_combinations_exclude_match_last_condition() {
1107 let monitor = MonitorBuilder::new()
1108 .name("monitor_test")
1109 .networks(vec!["ethereum_mainnet".to_string()])
1110 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1111 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1112 .build();
1113
1114 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1115
1116 let mut trigger_scripts = HashMap::new();
1117 trigger_scripts.insert(
1118 "monitor_test|condition1.py".to_string(),
1119 (ScriptLanguage::Python, "print(False)".to_string()),
1120 );
1121 trigger_scripts.insert(
1122 "monitor_test|condition2.py".to_string(),
1123 (ScriptLanguage::Python, "print(True)".to_string()),
1124 );
1125
1126 let matches = vec![match_item.clone()];
1127 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1128 assert_eq!(filtered.len(), 0);
1129 }
1130
1131 #[tokio::test]
1132 async fn test_run_trigger_filters_condition_three_combinations_exclude_match() {
1133 let monitor = MonitorBuilder::new()
1134 .name("monitor_test")
1135 .networks(vec!["ethereum_mainnet".to_string()])
1136 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1137 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1138 .trigger_condition("condition3.py", 1000, ScriptLanguage::Python, None)
1139 .build();
1140
1141 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1142
1143 let mut trigger_scripts = HashMap::new();
1144 trigger_scripts.insert(
1145 "monitor_test|condition1.py".to_string(),
1146 (ScriptLanguage::Python, "print(False)".to_string()),
1147 );
1148 trigger_scripts.insert(
1149 "monitor_test|condition2.py".to_string(),
1150 (ScriptLanguage::Python, "print(False)".to_string()),
1151 );
1152 trigger_scripts.insert(
1153 "monitor_test|condition3.py".to_string(),
1154 (ScriptLanguage::Python, "print(True)".to_string()),
1155 );
1156
1157 let matches = vec![match_item.clone()];
1158 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1159 assert_eq!(filtered.len(), 0);
1160 }
1161
1162 #[tokio::test]
1163 async fn test_run_trigger_filters_condition_three_combinations_keep_match() {
1164 let monitor = MonitorBuilder::new()
1165 .name("monitor_test")
1166 .networks(vec!["ethereum_mainnet".to_string()])
1167 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1168 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1169 .trigger_condition("condition3.py", 1000, ScriptLanguage::Python, None)
1170 .build();
1171
1172 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1173
1174 let mut trigger_scripts = HashMap::new();
1175 trigger_scripts.insert(
1176 "monitor_test|condition1.py".to_string(),
1177 (ScriptLanguage::Python, "print(False)".to_string()),
1178 );
1179 trigger_scripts.insert(
1180 "monitor_test|condition2.py".to_string(),
1181 (ScriptLanguage::Python, "print(False)".to_string()),
1182 );
1183 trigger_scripts.insert(
1184 "monitor_test|condition3.py".to_string(),
1185 (ScriptLanguage::Python, "print(False)".to_string()),
1186 );
1187
1188 let matches = vec![match_item.clone()];
1189 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1190 assert_eq!(filtered.len(), 1);
1191 }
1192
1193 #[tokio::test]
1195 async fn test_run_trigger_filters_stellar_empty_matches() {
1196 let matches: Vec<MonitorMatch> = vec![];
1197 let mut trigger_scripts = HashMap::new();
1198 trigger_scripts.insert(
1199 "monitor_test|test.py".to_string(),
1200 (
1201 ScriptLanguage::Python,
1202 r#"
1203import sys
1204import json
1205
1206input_data = sys.stdin.read()
1207data = json.loads(input_data)
1208print(False)
1209"#
1210 .to_string(),
1211 ),
1212 );
1213
1214 let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1215 assert!(filtered.is_empty());
1216 }
1217
1218 #[tokio::test]
1219 async fn test_run_trigger_filters_stellar_true_condition() {
1220 let script_content = r#"
1221import sys
1222import json
1223
1224input_json = sys.argv[1]
1225data = json.loads(input_json)
1226print("debugging...")
1227def test():
1228 return True
1229result = test()
1230print(result)
1231"#;
1232 let temp_file = create_temp_script(script_content);
1233 let mut trigger_scripts = HashMap::new();
1234 trigger_scripts.insert(
1235 format!("test|{}", temp_file.path().to_str().unwrap()),
1236 (ScriptLanguage::Python, script_content.to_string()),
1237 );
1238 let match_item = create_mock_monitor_match_from_path(
1239 BlockChainType::Stellar,
1240 Some(temp_file.path().to_str().unwrap()),
1241 );
1242 let matches = vec![match_item.clone()];
1243
1244 let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1245 assert_eq!(filtered.len(), 1);
1246 assert!(matches_equal(&filtered[0], &match_item));
1247 }
1248
1249 #[tokio::test]
1250 async fn test_run_trigger_filters_stellar_multiple_conditions() {
1251 let monitor = MonitorBuilder::new()
1252 .name("monitor_test")
1253 .networks(vec!["stellar_mainnet".to_string()])
1254 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1255 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1256 .build();
1257
1258 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::Stellar, monitor);
1259
1260 let mut trigger_scripts = HashMap::new();
1261 trigger_scripts.insert(
1262 "monitor_test|condition1.py".to_string(),
1263 (ScriptLanguage::Python, "print(False)".to_string()),
1264 );
1265 trigger_scripts.insert(
1266 "monitor_test|condition2.py".to_string(),
1267 (ScriptLanguage::Python, "print(True)".to_string()),
1268 );
1269
1270 let matches = vec![match_item.clone()];
1271 let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1272 assert_eq!(filtered.len(), 0); }
1274
1275 #[tokio::test]
1277 async fn test_run_trigger_filters_solana_empty_matches() {
1278 let matches: Vec<MonitorMatch> = vec![];
1279 let mut trigger_scripts = HashMap::new();
1280 trigger_scripts.insert(
1281 "monitor_test|test.py".to_string(),
1282 (
1283 ScriptLanguage::Python,
1284 r#"
1285import sys
1286import json
1287
1288input_data = sys.stdin.read()
1289data = json.loads(input_data)
1290print(False)
1291"#
1292 .to_string(),
1293 ),
1294 );
1295
1296 let filtered = run_trigger_filters(&matches, "solana_mainnet", &trigger_scripts).await;
1297 assert!(filtered.is_empty());
1298 }
1299
1300 #[tokio::test]
1301 async fn test_run_trigger_filters_solana_true_condition() {
1302 let script_content = r#"
1303import sys
1304import json
1305
1306input_json = sys.argv[1]
1307data = json.loads(input_json)
1308print("debugging...")
1309def test():
1310 return True
1311result = test()
1312print(result)
1313"#;
1314 let temp_file = create_temp_script(script_content);
1315 let mut trigger_scripts = HashMap::new();
1316 trigger_scripts.insert(
1317 format!("test|{}", temp_file.path().to_str().unwrap()),
1318 (ScriptLanguage::Python, script_content.to_string()),
1319 );
1320 let match_item = create_mock_monitor_match_from_path(
1321 BlockChainType::Solana,
1322 Some(temp_file.path().to_str().unwrap()),
1323 );
1324 let matches = vec![match_item.clone()];
1325
1326 let filtered = run_trigger_filters(&matches, "solana_mainnet", &trigger_scripts).await;
1327 assert_eq!(filtered.len(), 1);
1328 assert!(matches_equal(&filtered[0], &match_item));
1329 }
1330
1331 #[tokio::test]
1332 async fn test_run_trigger_filters_solana_multiple_conditions() {
1333 let monitor = MonitorBuilder::new()
1334 .name("monitor_test")
1335 .networks(vec!["solana_mainnet".to_string()])
1336 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1337 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1338 .build();
1339
1340 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::Solana, monitor);
1341
1342 let mut trigger_scripts = HashMap::new();
1343 trigger_scripts.insert(
1344 "monitor_test|condition1.py".to_string(),
1345 (ScriptLanguage::Python, "print(False)".to_string()),
1346 );
1347 trigger_scripts.insert(
1348 "monitor_test|condition2.py".to_string(),
1349 (ScriptLanguage::Python, "print(True)".to_string()),
1350 );
1351
1352 let matches = vec![match_item.clone()];
1353 let filtered = run_trigger_filters(&matches, "solana_mainnet", &trigger_scripts).await;
1354 assert_eq!(filtered.len(), 0); }
1356
1357 #[tokio::test]
1358 async fn test_run_trigger_filters_solana_false_condition() {
1359 let script_content = r#"
1360import sys
1361import json
1362
1363input_data = sys.stdin.read()
1364data = json.loads(input_data)
1365print("debugging...")
1366def test():
1367 return False
1368result = test()
1369print(result)
1370"#;
1371 let temp_file = create_temp_script(script_content);
1372 let mut trigger_scripts = HashMap::new();
1373 trigger_scripts.insert(
1374 format!("test|{}", temp_file.path().to_str().unwrap()),
1375 (ScriptLanguage::Python, script_content.to_string()),
1376 );
1377 let match_item = create_mock_monitor_match_from_path(
1378 BlockChainType::Solana,
1379 Some(temp_file.path().to_str().unwrap()),
1380 );
1381 let matches = vec![match_item.clone()];
1382
1383 let filtered = run_trigger_filters(&matches, "solana_mainnet", &trigger_scripts).await;
1384 assert_eq!(filtered.len(), 1);
1385 }
1386
1387 #[tokio::test]
1388 async fn test_run_trigger_filters_solana_multiple_conditions_keep_match() {
1389 let monitor = MonitorBuilder::new()
1390 .name("monitor_test")
1391 .networks(vec!["solana_mainnet".to_string()])
1392 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1393 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1394 .build();
1395
1396 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::Solana, monitor);
1397
1398 let mut trigger_scripts = HashMap::new();
1399 trigger_scripts.insert(
1400 "monitor_test|condition1.py".to_string(),
1401 (ScriptLanguage::Python, "print(False)".to_string()),
1402 );
1403 trigger_scripts.insert(
1404 "monitor_test|condition2.py".to_string(),
1405 (ScriptLanguage::Python, "print(False)".to_string()),
1406 );
1407
1408 let matches = vec![match_item.clone()];
1409 let filtered = run_trigger_filters(&matches, "solana_mainnet", &trigger_scripts).await;
1410 assert_eq!(filtered.len(), 1); }
1412}