// aztec_pxe/sync/contract_sync.rs

//! Contract sync service for note discovery.
//!
//! Ports the TS `ContractSyncService`, which ensures a contract's private state
//! is synchronized at the PXE level. Runs the `sync_state` utility function and
//! syncs note nullifiers.
6
7use std::collections::HashSet;
8use std::future::Future;
9use std::sync::Arc;
10
11use aztec_core::error::Error;
12use aztec_core::types::AztecAddress;
13use aztec_node_client::AztecNode;
14use tokio::sync::RwLock;
15
16use crate::stores::NoteStore;
17use crate::sync::note_service::NoteService;
18
19/// Service that ensures contracts' private state is synced before execution.
20///
21/// Maintains a cache of already-synced contracts to avoid redundant sync
22/// operations within a transaction.
23pub struct ContractSyncService<N: AztecNode> {
24    node: Arc<N>,
25    note_store: Arc<NoteStore>,
26    /// Set of (contract, scope) pairs that have been synced.
27    synced: RwLock<HashSet<String>>,
28    /// Current anchor block hash — cache is cleared when this changes.
29    anchor_block_hash: RwLock<Option<String>>,
30}
31
32impl<N: AztecNode> ContractSyncService<N> {
33    pub fn new(node: Arc<N>, note_store: Arc<NoteStore>) -> Self {
34        Self {
35            node,
36            note_store,
37            synced: RwLock::new(HashSet::new()),
38            anchor_block_hash: RwLock::new(None),
39        }
40    }
41
42    /// Ensure a contract's private state is synchronized.
43    ///
44    /// If the contract has already been synced for the given scopes in the
45    /// current anchor block, this is a no-op.
46    pub async fn ensure_contract_synced<F, Fut>(
47        &self,
48        contract_address: &AztecAddress,
49        scopes: &[AztecAddress],
50        anchor_block_hash: &str,
51        utility_executor: F,
52    ) -> Result<(), Error>
53    where
54        F: Fn(AztecAddress, Vec<AztecAddress>) -> Fut,
55        Fut: Future<Output = Result<(), Error>>,
56    {
57        self.ensure_contract_synced_with(
58            contract_address,
59            scopes,
60            anchor_block_hash,
61            &utility_executor,
62        )
63        .await
64    }
65
66    pub async fn ensure_contract_synced_with<F, Fut>(
67        &self,
68        contract_address: &AztecAddress,
69        scopes: &[AztecAddress],
70        anchor_block_hash: &str,
71        utility_executor: &F,
72    ) -> Result<(), Error>
73    where
74        F: Fn(AztecAddress, Vec<AztecAddress>) -> Fut,
75        Fut: Future<Output = Result<(), Error>>,
76    {
77        // Check if anchor block changed — clear cache
78        {
79            let mut cached_hash = self.anchor_block_hash.write().await;
80            if cached_hash.as_deref() != Some(anchor_block_hash) {
81                *cached_hash = Some(anchor_block_hash.to_owned());
82                self.synced.write().await.clear();
83            }
84        }
85
86        // Check if already synced for these scopes
87        let unsynced_scopes = {
88            let synced = self.synced.read().await;
89            scopes
90                .iter()
91                .filter(|scope| {
92                    let key = sync_key(contract_address, scope);
93                    let wildcard = sync_key_wildcard(contract_address);
94                    !synced.contains(&key) && !synced.contains(&wildcard)
95                })
96                .cloned()
97                .collect::<Vec<_>>()
98        };
99
100        if unsynced_scopes.is_empty() {
101            return Ok(());
102        }
103
104        // Do the sync
105        self.do_sync(contract_address, &unsynced_scopes, utility_executor)
106            .await?;
107
108        // Mark as synced
109        {
110            let mut synced = self.synced.write().await;
111            for scope in &unsynced_scopes {
112                synced.insert(sync_key(contract_address, scope));
113            }
114        }
115
116        Ok(())
117    }
118
119    /// Perform the actual sync: run sync_state and sync nullifiers in parallel.
120    async fn do_sync<F, Fut>(
121        &self,
122        contract_address: &AztecAddress,
123        scopes: &[AztecAddress],
124        utility_executor: &F,
125    ) -> Result<(), Error>
126    where
127        F: Fn(AztecAddress, Vec<AztecAddress>) -> Fut,
128        Fut: Future<Output = Result<(), Error>>,
129    {
130        tracing::debug!(
131            contract = %contract_address,
132            scopes = scopes.len(),
133            "syncing contract state"
134        );
135
136        let note_service = NoteService::new(&*self.node, &self.note_store);
137        // Use the latest block from the node for nullifier lookups
138        let anchor_block = self.node.get_block_number().await.unwrap_or(0);
139
140        let nullified_future =
141            note_service.sync_note_nullifiers(contract_address, scopes, anchor_block);
142        let sync_state_future = utility_executor(*contract_address, scopes.to_vec());
143        let (nullified, ()) = tokio::try_join!(nullified_future, sync_state_future)?;
144
145        if nullified > 0 {
146            tracing::debug!(
147                contract = %contract_address,
148                nullified = nullified,
149                "nullified stale notes"
150            );
151        }
152
153        Ok(())
154    }
155
156    /// Clear the sync cache (e.g., on reorg or anchor block change).
157    pub async fn wipe(&self) {
158        self.synced.write().await.clear();
159        *self.anchor_block_hash.write().await = None;
160    }
161}
162
163fn sync_key(contract: &AztecAddress, scope: &AztecAddress) -> String {
164    format!("{contract}:{scope}")
165}
166
167fn sync_key_wildcard(contract: &AztecAddress) -> String {
168    format!("{contract}:*")
169}
170
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use aztec_core::tx::{TxHash, TxReceipt};
    use aztec_core::types::{ContractInstanceWithAddress, Fr};
    use aztec_node_client::{NodeInfo, PublicLogFilter, PublicLogsResponse, TxValidationResult};
    use serde_json::{json, Value};

    use super::*;
    use crate::stores::InMemoryKvStore;

    /// Minimal node mock: every query returns an empty or fixed answer, and
    /// the operations that cannot be faked return an `InvalidData("mock")` error.
    struct MockNode;

    #[async_trait::async_trait]
    impl AztecNode for MockNode {
        async fn get_node_info(&self) -> Result<NodeInfo, Error> {
            Ok(NodeInfo {
                node_version: "mock".into(),
                l1_chain_id: 1,
                rollup_version: 1,
                enr: None,
                l1_contract_addresses: Value::Null,
                protocol_contract_addresses: Value::Null,
                real_proofs: false,
                l2_circuits_vk_tree_root: None,
                l2_protocol_contracts_hash: None,
            })
        }
        async fn get_block_number(&self) -> Result<u64, Error> {
            Ok(1)
        }
        async fn get_proven_block_number(&self) -> Result<u64, Error> {
            Ok(1)
        }
        async fn get_tx_receipt(&self, _: &TxHash) -> Result<TxReceipt, Error> {
            Err(Error::InvalidData("mock".into()))
        }
        async fn get_tx_effect(&self, _: &TxHash) -> Result<Option<Value>, Error> {
            Ok(None)
        }
        async fn get_tx_by_hash(&self, _: &TxHash) -> Result<Option<Value>, Error> {
            Ok(None)
        }
        async fn get_public_logs(&self, _: PublicLogFilter) -> Result<PublicLogsResponse, Error> {
            Ok(PublicLogsResponse {
                logs: vec![],
                max_logs_hit: false,
            })
        }
        async fn send_tx(&self, _: &Value) -> Result<(), Error> {
            Err(Error::InvalidData("mock".into()))
        }
        async fn get_contract(
            &self,
            _: &AztecAddress,
        ) -> Result<Option<ContractInstanceWithAddress>, Error> {
            Ok(None)
        }
        async fn get_contract_class(&self, _: &Fr) -> Result<Option<Value>, Error> {
            Ok(None)
        }
        async fn get_block_header(&self, _: u64) -> Result<Value, Error> {
            Ok(json!({}))
        }
        async fn get_block(&self, _: u64) -> Result<Option<Value>, Error> {
            Ok(None)
        }
        async fn get_note_hash_membership_witness(
            &self,
            _: u64,
            _: &Fr,
        ) -> Result<Option<Value>, Error> {
            Ok(None)
        }
        async fn get_nullifier_membership_witness(
            &self,
            _: u64,
            _: &Fr,
        ) -> Result<Option<Value>, Error> {
            Ok(None)
        }
        async fn get_low_nullifier_membership_witness(
            &self,
            _: u64,
            _: &Fr,
        ) -> Result<Option<Value>, Error> {
            Ok(None)
        }
        async fn get_public_storage_at(
            &self,
            _: u64,
            _: &AztecAddress,
            _: &Fr,
        ) -> Result<Fr, Error> {
            Ok(Fr::zero())
        }
        async fn get_public_data_witness(&self, _: u64, _: &Fr) -> Result<Option<Value>, Error> {
            Ok(None)
        }
        async fn get_l1_to_l2_message_membership_witness(
            &self,
            _: u64,
            _: &Fr,
        ) -> Result<Option<Value>, Error> {
            Ok(None)
        }
        async fn simulate_public_calls(&self, _: &Value, _: bool) -> Result<Value, Error> {
            Ok(Value::Null)
        }
        async fn is_valid_tx(&self, _: &Value) -> Result<TxValidationResult, Error> {
            Ok(TxValidationResult::Valid)
        }
        async fn get_private_logs_by_tags(&self, _: &[Fr]) -> Result<Value, Error> {
            Ok(json!([]))
        }
        async fn get_public_logs_by_tags_from_contract(
            &self,
            _: &AztecAddress,
            _: &[Fr],
        ) -> Result<Value, Error> {
            Ok(json!([]))
        }
        async fn register_contract_function_signatures(&self, _: &[String]) -> Result<(), Error> {
            Ok(())
        }
        async fn get_block_hash_membership_witness(
            &self,
            _: u64,
            _: &Fr,
        ) -> Result<Option<Value>, Error> {
            Ok(None)
        }
        async fn find_leaves_indexes(
            &self,
            _: u64,
            _: &str,
            _: &[Fr],
        ) -> Result<Vec<Option<u64>>, Error> {
            Ok(vec![])
        }
    }

    /// Build a service backed by the mock node and an in-memory note store.
    fn new_service() -> ContractSyncService<MockNode> {
        let kv = Arc::new(InMemoryKvStore::new());
        let note_store = Arc::new(NoteStore::new(kv));
        ContractSyncService::new(Arc::new(MockNode), note_store)
    }

    /// Request a sync whose executor only bumps `calls`, so a test can tell
    /// whether the sync actually ran or was served from the cache.
    async fn sync_counted(
        service: &ContractSyncService<MockNode>,
        contract: &AztecAddress,
        scopes: &[AztecAddress],
        anchor_block_hash: &str,
        calls: &AtomicUsize,
    ) {
        service
            .ensure_contract_synced(
                contract,
                scopes,
                anchor_block_hash,
                |_contract, _scopes| async move {
                    calls.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                },
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn sync_is_idempotent() {
        let service = new_service();
        let contract = AztecAddress::from(1u64);
        let scope = AztecAddress::from(99u64);
        let calls = AtomicUsize::new(0);

        // First sync runs the executor.
        sync_counted(&service, &contract, &[scope], "block_hash_1", &calls).await;
        assert_eq!(calls.load(Ordering::SeqCst), 1);

        // Second sync with the same params is served from the cache.
        sync_counted(&service, &contract, &[scope], "block_hash_1", &calls).await;
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn sync_cache_clears_on_new_block() {
        let service = new_service();
        let contract = AztecAddress::from(1u64);
        let scope = AztecAddress::from(99u64);
        let calls = AtomicUsize::new(0);

        sync_counted(&service, &contract, &[scope], "block_hash_1", &calls).await;
        assert_eq!(calls.load(Ordering::SeqCst), 1);

        // A new block hash clears the cache, so the executor runs again.
        sync_counted(&service, &contract, &[scope], "block_hash_2", &calls).await;
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }
}
401}