1use std::collections::HashSet;
8use std::future::Future;
9use std::sync::Arc;
10
11use aztec_core::error::Error;
12use aztec_core::types::AztecAddress;
13use aztec_node_client::AztecNode;
14use tokio::sync::RwLock;
15
16use crate::stores::NoteStore;
17use crate::sync::note_service::NoteService;
18
19pub struct ContractSyncService<N: AztecNode> {
24 node: Arc<N>,
25 note_store: Arc<NoteStore>,
26 synced: RwLock<HashSet<String>>,
28 anchor_block_hash: RwLock<Option<String>>,
30}
31
32impl<N: AztecNode> ContractSyncService<N> {
33 pub fn new(node: Arc<N>, note_store: Arc<NoteStore>) -> Self {
34 Self {
35 node,
36 note_store,
37 synced: RwLock::new(HashSet::new()),
38 anchor_block_hash: RwLock::new(None),
39 }
40 }
41
42 pub async fn ensure_contract_synced<F, Fut>(
47 &self,
48 contract_address: &AztecAddress,
49 scopes: &[AztecAddress],
50 anchor_block_hash: &str,
51 utility_executor: F,
52 ) -> Result<(), Error>
53 where
54 F: Fn(AztecAddress, Vec<AztecAddress>) -> Fut,
55 Fut: Future<Output = Result<(), Error>>,
56 {
57 self.ensure_contract_synced_with(
58 contract_address,
59 scopes,
60 anchor_block_hash,
61 &utility_executor,
62 )
63 .await
64 }
65
66 pub async fn ensure_contract_synced_with<F, Fut>(
67 &self,
68 contract_address: &AztecAddress,
69 scopes: &[AztecAddress],
70 anchor_block_hash: &str,
71 utility_executor: &F,
72 ) -> Result<(), Error>
73 where
74 F: Fn(AztecAddress, Vec<AztecAddress>) -> Fut,
75 Fut: Future<Output = Result<(), Error>>,
76 {
77 {
79 let mut cached_hash = self.anchor_block_hash.write().await;
80 if cached_hash.as_deref() != Some(anchor_block_hash) {
81 *cached_hash = Some(anchor_block_hash.to_owned());
82 self.synced.write().await.clear();
83 }
84 }
85
86 let unsynced_scopes = {
88 let synced = self.synced.read().await;
89 scopes
90 .iter()
91 .filter(|scope| {
92 let key = sync_key(contract_address, scope);
93 let wildcard = sync_key_wildcard(contract_address);
94 !synced.contains(&key) && !synced.contains(&wildcard)
95 })
96 .cloned()
97 .collect::<Vec<_>>()
98 };
99
100 if unsynced_scopes.is_empty() {
101 return Ok(());
102 }
103
104 self.do_sync(contract_address, &unsynced_scopes, utility_executor)
106 .await?;
107
108 {
110 let mut synced = self.synced.write().await;
111 for scope in &unsynced_scopes {
112 synced.insert(sync_key(contract_address, scope));
113 }
114 }
115
116 Ok(())
117 }
118
119 async fn do_sync<F, Fut>(
121 &self,
122 contract_address: &AztecAddress,
123 scopes: &[AztecAddress],
124 utility_executor: &F,
125 ) -> Result<(), Error>
126 where
127 F: Fn(AztecAddress, Vec<AztecAddress>) -> Fut,
128 Fut: Future<Output = Result<(), Error>>,
129 {
130 tracing::debug!(
131 contract = %contract_address,
132 scopes = scopes.len(),
133 "syncing contract state"
134 );
135
136 let note_service = NoteService::new(&*self.node, &self.note_store);
137 let anchor_block = self.node.get_block_number().await.unwrap_or(0);
139
140 let nullified_future =
141 note_service.sync_note_nullifiers(contract_address, scopes, anchor_block);
142 let sync_state_future = utility_executor(*contract_address, scopes.to_vec());
143 let (nullified, ()) = tokio::try_join!(nullified_future, sync_state_future)?;
144
145 if nullified > 0 {
146 tracing::debug!(
147 contract = %contract_address,
148 nullified = nullified,
149 "nullified stale notes"
150 );
151 }
152
153 Ok(())
154 }
155
156 pub async fn wipe(&self) {
158 self.synced.write().await.clear();
159 *self.anchor_block_hash.write().await = None;
160 }
161}
162
163fn sync_key(contract: &AztecAddress, scope: &AztecAddress) -> String {
164 format!("{contract}:{scope}")
165}
166
167fn sync_key_wildcard(contract: &AztecAddress) -> String {
168 format!("{contract}:*")
169}
170
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;
    use crate::stores::InMemoryKvStore;

    /// Node stub: reports block number 1 and returns empty or placeholder
    /// values (or a "mock" error) for every other call.
    struct MockNode;

    #[async_trait::async_trait]
    impl AztecNode for MockNode {
        async fn get_node_info(&self) -> Result<aztec_node_client::NodeInfo, Error> {
            Ok(aztec_node_client::NodeInfo {
                node_version: "mock".into(),
                l1_chain_id: 1,
                rollup_version: 1,
                enr: None,
                l1_contract_addresses: serde_json::Value::Null,
                protocol_contract_addresses: serde_json::Value::Null,
                real_proofs: false,
                l2_circuits_vk_tree_root: None,
                l2_protocol_contracts_hash: None,
            })
        }
        async fn get_block_number(&self) -> Result<u64, Error> {
            Ok(1)
        }
        async fn get_proven_block_number(&self) -> Result<u64, Error> {
            Ok(1)
        }
        async fn get_tx_receipt(
            &self,
            _: &aztec_core::tx::TxHash,
        ) -> Result<aztec_core::tx::TxReceipt, Error> {
            Err(Error::InvalidData("mock".into()))
        }
        async fn get_tx_effect(
            &self,
            _: &aztec_core::tx::TxHash,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_tx_by_hash(
            &self,
            _: &aztec_core::tx::TxHash,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_public_logs(
            &self,
            _: aztec_node_client::PublicLogFilter,
        ) -> Result<aztec_node_client::PublicLogsResponse, Error> {
            Ok(aztec_node_client::PublicLogsResponse {
                logs: vec![],
                max_logs_hit: false,
            })
        }
        async fn send_tx(&self, _: &serde_json::Value) -> Result<(), Error> {
            Err(Error::InvalidData("mock".into()))
        }
        async fn get_contract(
            &self,
            _: &AztecAddress,
        ) -> Result<Option<aztec_core::types::ContractInstanceWithAddress>, Error> {
            Ok(None)
        }
        async fn get_contract_class(
            &self,
            _: &aztec_core::types::Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_block_header(&self, _: u64) -> Result<serde_json::Value, Error> {
            Ok(serde_json::json!({}))
        }
        async fn get_block(&self, _: u64) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_note_hash_membership_witness(
            &self,
            _: u64,
            _: &aztec_core::types::Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_nullifier_membership_witness(
            &self,
            _: u64,
            _: &aztec_core::types::Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_low_nullifier_membership_witness(
            &self,
            _: u64,
            _: &aztec_core::types::Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_public_storage_at(
            &self,
            _: u64,
            _: &AztecAddress,
            _: &aztec_core::types::Fr,
        ) -> Result<aztec_core::types::Fr, Error> {
            Ok(aztec_core::types::Fr::zero())
        }
        async fn get_public_data_witness(
            &self,
            _: u64,
            _: &aztec_core::types::Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_l1_to_l2_message_membership_witness(
            &self,
            _: u64,
            _: &aztec_core::types::Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn simulate_public_calls(
            &self,
            _: &serde_json::Value,
            _: bool,
        ) -> Result<serde_json::Value, Error> {
            Ok(serde_json::Value::Null)
        }
        async fn is_valid_tx(
            &self,
            _: &serde_json::Value,
        ) -> Result<aztec_node_client::TxValidationResult, Error> {
            Ok(aztec_node_client::TxValidationResult::Valid)
        }
        async fn get_private_logs_by_tags(
            &self,
            _: &[aztec_core::types::Fr],
        ) -> Result<serde_json::Value, Error> {
            Ok(serde_json::json!([]))
        }
        async fn get_public_logs_by_tags_from_contract(
            &self,
            _: &AztecAddress,
            _: &[aztec_core::types::Fr],
        ) -> Result<serde_json::Value, Error> {
            Ok(serde_json::json!([]))
        }
        async fn register_contract_function_signatures(&self, _: &[String]) -> Result<(), Error> {
            Ok(())
        }
        async fn get_block_hash_membership_witness(
            &self,
            _: u64,
            _: &aztec_core::types::Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn find_leaves_indexes(
            &self,
            _: u64,
            _: &str,
            _: &[aztec_core::types::Fr],
        ) -> Result<Vec<Option<u64>>, Error> {
            Ok(vec![])
        }
    }

    /// Builds a service backed by the mock node and an in-memory note store.
    fn new_service() -> ContractSyncService<MockNode> {
        let kv = Arc::new(InMemoryKvStore::new());
        let note_store = Arc::new(NoteStore::new(kv));
        ContractSyncService::new(Arc::new(MockNode), note_store)
    }

    /// Returns an executor that always succeeds and bumps `calls` each time it
    /// is invoked, so tests can observe whether a sync actually ran.
    fn counting_executor(
        calls: &AtomicUsize,
    ) -> impl Fn(AztecAddress, Vec<AztecAddress>) -> std::future::Ready<Result<(), Error>> + '_
    {
        move |_contract, _scopes| {
            calls.fetch_add(1, Ordering::SeqCst);
            std::future::ready(Ok(()))
        }
    }

    #[tokio::test]
    async fn sync_is_idempotent() {
        let service = new_service();
        let contract = AztecAddress::from(1u64);
        let scope = AztecAddress::from(99u64);

        let calls = AtomicUsize::new(0);
        let executor = counting_executor(&calls);

        service
            .ensure_contract_synced(&contract, &[scope], "block_hash_1", &executor)
            .await
            .unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 1);

        // Same contract, scope and anchor: the cached result must be reused.
        service
            .ensure_contract_synced(&contract, &[scope], "block_hash_1", &executor)
            .await
            .unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn sync_cache_clears_on_new_block() {
        let service = new_service();
        let contract = AztecAddress::from(1u64);
        let scope = AztecAddress::from(99u64);

        let calls = AtomicUsize::new(0);
        let executor = counting_executor(&calls);

        service
            .ensure_contract_synced(&contract, &[scope], "block_hash_1", &executor)
            .await
            .unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 1);

        // A different anchor hash invalidates the cache and forces a resync.
        service
            .ensure_contract_synced(&contract, &[scope], "block_hash_2", &executor)
            .await
            .unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn wipe_forces_resync() {
        let service = new_service();
        let contract = AztecAddress::from(1u64);
        let scope = AztecAddress::from(99u64);

        let calls = AtomicUsize::new(0);
        let executor = counting_executor(&calls);

        service
            .ensure_contract_synced(&contract, &[scope], "block_hash_1", &executor)
            .await
            .unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 1);

        service.wipe().await;

        // Even with the same anchor, a wiped cache must sync again.
        service
            .ensure_contract_synced(&contract, &[scope], "block_hash_1", &executor)
            .await
            .unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }
}