1use std::sync::Arc;
8
9use aztec_core::error::Error;
10use aztec_node_client::AztecNode;
11use tokio::sync::RwLock;
12
13use crate::stores::anchor_block_store::AnchorBlockHeader;
14use crate::stores::{AnchorBlockStore, NoteStore, PrivateEventStore};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub enum SyncChainTip {
23 Proposed,
25 Proven,
27}
28
29#[derive(Debug, Clone)]
31pub struct BlockSyncConfig {
32 pub sync_chain_tip: SyncChainTip,
34}
35
36impl Default for BlockSyncConfig {
37 fn default() -> Self {
38 Self {
39 sync_chain_tip: SyncChainTip::Proposed,
40 }
41 }
42}
43
44pub struct BlockStateSynchronizer {
55 anchor_block_store: Arc<AnchorBlockStore>,
56 note_store: Arc<NoteStore>,
57 private_event_store: Arc<PrivateEventStore>,
58 config: BlockSyncConfig,
59 syncing: RwLock<bool>,
61 anchor_changed: RwLock<bool>,
64}
65
66impl BlockStateSynchronizer {
67 pub fn new(
68 anchor_block_store: Arc<AnchorBlockStore>,
69 note_store: Arc<NoteStore>,
70 private_event_store: Arc<PrivateEventStore>,
71 config: BlockSyncConfig,
72 ) -> Self {
73 Self {
74 anchor_block_store,
75 note_store,
76 private_event_store,
77 config,
78 syncing: RwLock::new(false),
79 anchor_changed: RwLock::new(false),
80 }
81 }
82
83 pub async fn sync<N: AztecNode>(&self, node: &N) -> Result<(), Error> {
92 {
94 let mut syncing = self.syncing.write().await;
95 if *syncing {
96 drop(syncing);
98 loop {
99 let s = self.syncing.read().await;
100 if !*s {
101 break;
102 }
103 drop(s);
104 tokio::task::yield_now().await;
105 }
106 return Ok(());
107 }
108 *syncing = true;
109 }
110
111 let result = self.do_sync(node).await;
112
113 *self.syncing.write().await = false;
114
115 result
116 }
117
118 async fn do_sync<N: AztecNode>(&self, node: &N) -> Result<(), Error> {
120 let current_anchor = self.anchor_block_store.get_block_header().await?;
122 if current_anchor.is_none() {
123 let genesis_header = node.get_block_header(0).await?;
125 let anchor = AnchorBlockHeader::from_header_json(genesis_header);
126 self.update_anchor_block_header(&anchor).await?;
127 }
128
129 let latest_block_number = match self.config.sync_chain_tip {
131 SyncChainTip::Proven => node.get_proven_block_number().await?,
132 SyncChainTip::Proposed => node.get_block_number().await?,
133 };
134
135 let current_anchor = self.anchor_block_store.get_block_header().await?;
136 let current_anchor_block_number =
137 current_anchor.as_ref().map(|a| a.block_number).unwrap_or(0);
138
139 if latest_block_number < current_anchor_block_number && current_anchor_block_number > 0 {
141 tracing::warn!(
142 current_anchor = current_anchor_block_number,
143 node_latest = latest_block_number,
144 "detected chain reorg (block number behind) — rolling back"
145 );
146 self.handle_reorg(node, latest_block_number, current_anchor_block_number)
147 .await?;
148 } else if latest_block_number >= current_anchor_block_number
149 && current_anchor_block_number > 0
150 {
151 let stored_hash = current_anchor
157 .as_ref()
158 .map(|a| a.block_hash.as_str())
159 .unwrap_or("");
160
161 if !stored_hash.is_empty() && stored_hash != "0x0" {
162 let remote_header = node.get_block_header(current_anchor_block_number).await?;
163 let remote_hash = remote_header
164 .pointer("/blockHash")
165 .or_else(|| remote_header.get("blockHash"))
166 .and_then(|v| v.as_str())
167 .unwrap_or("");
168
169 if !remote_hash.is_empty() && remote_hash != stored_hash {
170 tracing::warn!(
171 current_anchor = current_anchor_block_number,
172 stored_hash = stored_hash,
173 remote_hash = remote_hash,
174 "detected same-height chain reorg — rolling back"
175 );
176 let rollback_to = current_anchor_block_number.saturating_sub(1);
178 self.handle_reorg(node, rollback_to, current_anchor_block_number)
179 .await?;
180 }
182 }
183
184 let anchor_after = self.anchor_block_store.get_block_number().await?;
186 if latest_block_number > anchor_after {
187 let new_header = node.get_block_header(latest_block_number).await?;
188 let anchor = AnchorBlockHeader::from_header_json(new_header);
189 self.update_anchor_block_header(&anchor).await?;
190 }
191 }
192
193 Ok(())
194 }
195
196 async fn handle_reorg<N: AztecNode>(
203 &self,
204 node: &N,
205 new_block_number: u64,
206 old_block_number: u64,
207 ) -> Result<(), Error> {
208 tracing::warn!(
209 "pruning data after block {new_block_number} due to reorg \
210 (was synced to block {old_block_number})"
211 );
212
213 let new_header = node.get_block_header(new_block_number).await?;
215 let anchor = AnchorBlockHeader::from_header_json(new_header);
216
217 self.note_store
219 .rollback(new_block_number, old_block_number)
220 .await?;
221
222 self.private_event_store
223 .rollback(new_block_number, old_block_number)
224 .await?;
225
226 self.update_anchor_block_header(&anchor).await?;
228
229 tracing::info!(
230 "reorg handled: rolled back from block {old_block_number} to {new_block_number}"
231 );
232
233 Ok(())
234 }
235
236 async fn update_anchor_block_header(&self, header: &AnchorBlockHeader) -> Result<(), Error> {
238 self.anchor_block_store.set_header(header).await?;
239 *self.anchor_changed.write().await = true;
240 tracing::debug!(
241 block_number = header.block_number,
242 "updated anchor block header"
243 );
244 Ok(())
245 }
246
247 pub async fn take_anchor_changed(&self) -> bool {
253 let mut changed = self.anchor_changed.write().await;
254 let was_changed = *changed;
255 *changed = false;
256 was_changed
257 }
258
259 pub fn anchor_block_store(&self) -> &AnchorBlockStore {
261 &self.anchor_block_store
262 }
263
264 pub async fn get_anchor_block_header(&self) -> Result<Option<AnchorBlockHeader>, Error> {
266 self.anchor_block_store.get_block_header().await
267 }
268
269 pub async fn get_anchor_block_number(&self) -> Result<u64, Error> {
271 self.anchor_block_store.get_block_number().await
272 }
273}
274
275#[cfg(test)]
276mod tests {
277 use super::*;
278 use crate::stores::InMemoryKvStore;
279 use std::sync::atomic::{AtomicU64, Ordering};
280
281 struct MockNode {
282 block_number: AtomicU64,
283 }
284
285 impl MockNode {
286 fn new(block: u64) -> Self {
287 Self {
288 block_number: AtomicU64::new(block),
289 }
290 }
291
292 fn set_block_number(&self, n: u64) {
293 self.block_number.store(n, Ordering::SeqCst);
294 }
295 }
296
297 #[async_trait::async_trait]
298 impl AztecNode for MockNode {
299 async fn get_node_info(&self) -> Result<aztec_node_client::NodeInfo, Error> {
300 Ok(aztec_node_client::NodeInfo {
301 node_version: "mock".into(),
302 l1_chain_id: 1,
303 rollup_version: 1,
304 enr: None,
305 l1_contract_addresses: serde_json::Value::Null,
306 protocol_contract_addresses: serde_json::Value::Null,
307 real_proofs: false,
308 l2_circuits_vk_tree_root: None,
309 l2_protocol_contracts_hash: None,
310 })
311 }
312 async fn get_block_number(&self) -> Result<u64, Error> {
313 Ok(self.block_number.load(Ordering::SeqCst))
314 }
315 async fn get_proven_block_number(&self) -> Result<u64, Error> {
316 Ok(self.block_number.load(Ordering::SeqCst))
317 }
318 async fn get_tx_receipt(
319 &self,
320 _: &aztec_core::tx::TxHash,
321 ) -> Result<aztec_core::tx::TxReceipt, Error> {
322 Err(Error::InvalidData("mock".into()))
323 }
324 async fn get_tx_effect(
325 &self,
326 _: &aztec_core::tx::TxHash,
327 ) -> Result<Option<serde_json::Value>, Error> {
328 Ok(None)
329 }
330 async fn get_tx_by_hash(
331 &self,
332 _: &aztec_core::tx::TxHash,
333 ) -> Result<Option<serde_json::Value>, Error> {
334 Ok(None)
335 }
336 async fn get_public_logs(
337 &self,
338 _: aztec_node_client::PublicLogFilter,
339 ) -> Result<aztec_node_client::PublicLogsResponse, Error> {
340 Ok(aztec_node_client::PublicLogsResponse {
341 logs: vec![],
342 max_logs_hit: false,
343 })
344 }
345 async fn send_tx(&self, _: &serde_json::Value) -> Result<(), Error> {
346 Err(Error::InvalidData("mock".into()))
347 }
348 async fn get_contract(
349 &self,
350 _: &aztec_core::types::AztecAddress,
351 ) -> Result<Option<aztec_core::types::ContractInstanceWithAddress>, Error> {
352 Ok(None)
353 }
354 async fn get_contract_class(
355 &self,
356 _: &aztec_core::types::Fr,
357 ) -> Result<Option<serde_json::Value>, Error> {
358 Ok(None)
359 }
360 async fn get_block_header(&self, block_number: u64) -> Result<serde_json::Value, Error> {
361 let bn = if block_number == 0 {
362 self.block_number.load(Ordering::SeqCst)
363 } else {
364 block_number
365 };
366 Ok(serde_json::json!({
367 "globalVariables": {"blockNumber": bn},
368 "blockHash": format!("0x{:064x}", bn)
369 }))
370 }
371 async fn get_block(&self, _: u64) -> Result<Option<serde_json::Value>, Error> {
372 Ok(None)
373 }
374 async fn get_note_hash_membership_witness(
375 &self,
376 _: u64,
377 _: &aztec_core::types::Fr,
378 ) -> Result<Option<serde_json::Value>, Error> {
379 Ok(None)
380 }
381 async fn get_nullifier_membership_witness(
382 &self,
383 _: u64,
384 _: &aztec_core::types::Fr,
385 ) -> Result<Option<serde_json::Value>, Error> {
386 Ok(None)
387 }
388 async fn get_low_nullifier_membership_witness(
389 &self,
390 _: u64,
391 _: &aztec_core::types::Fr,
392 ) -> Result<Option<serde_json::Value>, Error> {
393 Ok(None)
394 }
395 async fn get_public_storage_at(
396 &self,
397 _: u64,
398 _: &aztec_core::types::AztecAddress,
399 _: &aztec_core::types::Fr,
400 ) -> Result<aztec_core::types::Fr, Error> {
401 Ok(aztec_core::types::Fr::zero())
402 }
403 async fn get_public_data_witness(
404 &self,
405 _: u64,
406 _: &aztec_core::types::Fr,
407 ) -> Result<Option<serde_json::Value>, Error> {
408 Ok(None)
409 }
410 async fn get_l1_to_l2_message_membership_witness(
411 &self,
412 _: u64,
413 _: &aztec_core::types::Fr,
414 ) -> Result<Option<serde_json::Value>, Error> {
415 Ok(None)
416 }
417 async fn simulate_public_calls(
418 &self,
419 _: &serde_json::Value,
420 _: bool,
421 ) -> Result<serde_json::Value, Error> {
422 Ok(serde_json::Value::Null)
423 }
424 async fn is_valid_tx(
425 &self,
426 _: &serde_json::Value,
427 ) -> Result<aztec_node_client::TxValidationResult, Error> {
428 Ok(aztec_node_client::TxValidationResult::Valid)
429 }
430 async fn get_private_logs_by_tags(
431 &self,
432 _: &[aztec_core::types::Fr],
433 ) -> Result<serde_json::Value, Error> {
434 Ok(serde_json::json!([]))
435 }
436 async fn get_public_logs_by_tags_from_contract(
437 &self,
438 _: &aztec_core::types::AztecAddress,
439 _: &[aztec_core::types::Fr],
440 ) -> Result<serde_json::Value, Error> {
441 Ok(serde_json::json!([]))
442 }
443 async fn register_contract_function_signatures(&self, _: &[String]) -> Result<(), Error> {
444 Ok(())
445 }
446 async fn get_block_hash_membership_witness(
447 &self,
448 _: u64,
449 _: &aztec_core::types::Fr,
450 ) -> Result<Option<serde_json::Value>, Error> {
451 Ok(None)
452 }
453 async fn find_leaves_indexes(
454 &self,
455 _: u64,
456 _: &str,
457 _: &[aztec_core::types::Fr],
458 ) -> Result<Vec<Option<u64>>, Error> {
459 Ok(vec![])
460 }
461 }
462
463 fn make_synchronizer() -> (BlockStateSynchronizer, Arc<AnchorBlockStore>) {
464 let kv: Arc<dyn crate::stores::kv::KvStore> = Arc::new(InMemoryKvStore::new());
465 let anchor_store = Arc::new(AnchorBlockStore::new(Arc::clone(&kv)));
466 let note_store = Arc::new(NoteStore::new(Arc::clone(&kv)));
467 let event_store = Arc::new(PrivateEventStore::new(Arc::clone(&kv)));
468 let sync = BlockStateSynchronizer::new(
469 Arc::clone(&anchor_store),
470 note_store,
471 event_store,
472 BlockSyncConfig::default(),
473 );
474 (sync, anchor_store)
475 }
476
477 #[tokio::test]
478 async fn first_sync_sets_anchor() {
479 let (sync, anchor_store) = make_synchronizer();
480 let node = MockNode::new(5);
481
482 sync.sync(&node).await.unwrap();
483
484 let anchor = anchor_store.get_block_header().await.unwrap().unwrap();
485 assert_eq!(anchor.block_number, 5);
486 }
487
488 #[tokio::test]
489 async fn sync_advances_anchor() {
490 let (sync, anchor_store) = make_synchronizer();
491 let node = MockNode::new(5);
492
493 sync.sync(&node).await.unwrap();
494 assert_eq!(
495 anchor_store
496 .get_block_header()
497 .await
498 .unwrap()
499 .unwrap()
500 .block_number,
501 5
502 );
503
504 node.set_block_number(10);
505 sync.sync(&node).await.unwrap();
506 assert_eq!(
507 anchor_store
508 .get_block_header()
509 .await
510 .unwrap()
511 .unwrap()
512 .block_number,
513 10
514 );
515 }
516
517 #[tokio::test]
518 async fn sync_detects_reorg_and_rolls_back() {
519 let (sync, anchor_store) = make_synchronizer();
520 let node = MockNode::new(10);
521
522 sync.sync(&node).await.unwrap();
523 assert_eq!(
524 anchor_store
525 .get_block_header()
526 .await
527 .unwrap()
528 .unwrap()
529 .block_number,
530 10
531 );
532
533 node.set_block_number(7);
535 sync.sync(&node).await.unwrap();
536 assert_eq!(
537 anchor_store
538 .get_block_header()
539 .await
540 .unwrap()
541 .unwrap()
542 .block_number,
543 7
544 );
545 }
546
547 #[tokio::test]
548 async fn take_anchor_changed_flag() {
549 let (sync, _) = make_synchronizer();
550 let node = MockNode::new(5);
551
552 sync.sync(&node).await.unwrap();
553 assert!(sync.take_anchor_changed().await);
554 assert!(!sync.take_anchor_changed().await); }
556
557 #[tokio::test]
558 async fn no_update_when_same_block() {
559 let (sync, _) = make_synchronizer();
560 let node = MockNode::new(5);
561
562 sync.sync(&node).await.unwrap();
563 assert!(sync.take_anchor_changed().await);
564
565 sync.sync(&node).await.unwrap();
567 assert!(!sync.take_anchor_changed().await);
568 }
569}