aztec_pxe/sync/
block_state_synchronizer.rs

1//! Block state synchronizer with reorg handling.
2//!
3//! Ports the TS `BlockSynchronizer` which handles block header updates,
4//! chain reorganization detection, and coordinated rollback of NoteStore
5//! and PrivateEventStore.
6
7use std::sync::Arc;
8
9use aztec_core::error::Error;
10use aztec_node_client::AztecNode;
11use tokio::sync::RwLock;
12
13use crate::stores::anchor_block_store::AnchorBlockHeader;
14use crate::stores::{AnchorBlockStore, NoteStore, PrivateEventStore};
15
/// Which chain tip the block synchronizer should track.
///
/// Only `Proposed` and `Proven` are supported. Upstream does not expose
/// separate "checkpointed" or "finalized" block numbers through the node API,
/// so those modes are intentionally excluded to avoid misleading configuration.
///
/// The variant selects which node RPC is used to read the latest block number
/// during sync (`get_block_number` vs `get_proven_block_number`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncChainTip {
    /// Track the latest proposed (tip) block.
    Proposed,
    /// Track the latest proven block.
    Proven,
}
28
/// Configuration for the block state synchronizer.
///
/// Currently only selects which chain tip to track; see [`SyncChainTip`].
#[derive(Debug, Clone)]
pub struct BlockSyncConfig {
    /// Which chain tip to sync to (proposed tip by default; see `Default`).
    pub sync_chain_tip: SyncChainTip,
}
35
36impl Default for BlockSyncConfig {
37    fn default() -> Self {
38        Self {
39            sync_chain_tip: SyncChainTip::Proposed,
40        }
41    }
42}
43
/// Block state synchronizer that manages block header tracking and reorg handling.
///
/// Matches the TS `BlockSynchronizer`:
/// - Syncs the anchor block header from the node
/// - Detects chain reorganizations (when the node's block number < our anchor)
/// - Rolls back NoteStore and PrivateEventStore on reorg
/// - Signals ContractSyncService cache invalidation
///
/// The synchronizer ensures that all PXE state is consistent with the
/// node's view of the chain.
pub struct BlockStateSynchronizer {
    /// Persistent store for the current anchor (last-synced) block header.
    anchor_block_store: Arc<AnchorBlockStore>,
    /// Rolled back on reorg (un-nullify / delete orphaned notes).
    note_store: Arc<NoteStore>,
    /// Rolled back on reorg (delete events from orphaned blocks).
    private_event_store: Arc<PrivateEventStore>,
    config: BlockSyncConfig,
    /// Flag indicating a sync is in progress (prevents concurrent syncs).
    syncing: RwLock<bool>,
    /// Callback-style flag: set to true when the anchor block changes,
    /// so that callers (e.g., EmbeddedPxe) can wipe the ContractSyncService.
    anchor_changed: RwLock<bool>,
}
65
66impl BlockStateSynchronizer {
67    pub fn new(
68        anchor_block_store: Arc<AnchorBlockStore>,
69        note_store: Arc<NoteStore>,
70        private_event_store: Arc<PrivateEventStore>,
71        config: BlockSyncConfig,
72    ) -> Self {
73        Self {
74            anchor_block_store,
75            note_store,
76            private_event_store,
77            config,
78            syncing: RwLock::new(false),
79            anchor_changed: RwLock::new(false),
80        }
81    }
82
83    /// Sync the PXE with the node's current state.
84    ///
85    /// This is the main entry point called before transaction simulation
86    /// or event retrieval. It:
87    /// 1. Fetches the latest block header from the node
88    /// 2. Detects if a reorg has occurred
89    /// 3. Handles rollback if needed
90    /// 4. Updates the anchor block header
91    pub async fn sync<N: AztecNode>(&self, node: &N) -> Result<(), Error> {
92        // Prevent concurrent syncs
93        {
94            let mut syncing = self.syncing.write().await;
95            if *syncing {
96                // Wait for the current sync to finish by polling
97                drop(syncing);
98                loop {
99                    let s = self.syncing.read().await;
100                    if !*s {
101                        break;
102                    }
103                    drop(s);
104                    tokio::task::yield_now().await;
105                }
106                return Ok(());
107            }
108            *syncing = true;
109        }
110
111        let result = self.do_sync(node).await;
112
113        *self.syncing.write().await = false;
114
115        result
116    }
117
118    /// Internal sync implementation.
119    async fn do_sync<N: AztecNode>(&self, node: &N) -> Result<(), Error> {
120        // Ensure we have an initial anchor block header
121        let current_anchor = self.anchor_block_store.get_block_header().await?;
122        if current_anchor.is_none() {
123            // First sync: fetch block 0 (genesis) header as initial anchor
124            let genesis_header = node.get_block_header(0).await?;
125            let anchor = AnchorBlockHeader::from_header_json(genesis_header);
126            self.update_anchor_block_header(&anchor).await?;
127        }
128
129        // Fetch the latest block number from the node based on sync config
130        let latest_block_number = match self.config.sync_chain_tip {
131            SyncChainTip::Proven => node.get_proven_block_number().await?,
132            SyncChainTip::Proposed => node.get_block_number().await?,
133        };
134
135        let current_anchor = self.anchor_block_store.get_block_header().await?;
136        let current_anchor_block_number =
137            current_anchor.as_ref().map(|a| a.block_number).unwrap_or(0);
138
139        // Check for reorg: if the node's latest block is behind our anchor
140        if latest_block_number < current_anchor_block_number && current_anchor_block_number > 0 {
141            tracing::warn!(
142                current_anchor = current_anchor_block_number,
143                node_latest = latest_block_number,
144                "detected chain reorg (block number behind) — rolling back"
145            );
146            self.handle_reorg(node, latest_block_number, current_anchor_block_number)
147                .await?;
148        } else if latest_block_number >= current_anchor_block_number
149            && current_anchor_block_number > 0
150        {
151            // Same or higher block number — check for same-height reorg by
152            // comparing the block hash at our current anchor height.
153            // Upstream uses the block stream's chain-pruned event keyed by
154            // both number and hash; we approximate by fetching the header at
155            // the anchor height and comparing hashes.
156            let stored_hash = current_anchor
157                .as_ref()
158                .map(|a| a.block_hash.as_str())
159                .unwrap_or("");
160
161            if !stored_hash.is_empty() && stored_hash != "0x0" {
162                let remote_header = node.get_block_header(current_anchor_block_number).await?;
163                let remote_hash = remote_header
164                    .pointer("/blockHash")
165                    .or_else(|| remote_header.get("blockHash"))
166                    .and_then(|v| v.as_str())
167                    .unwrap_or("");
168
169                if !remote_hash.is_empty() && remote_hash != stored_hash {
170                    tracing::warn!(
171                        current_anchor = current_anchor_block_number,
172                        stored_hash = stored_hash,
173                        remote_hash = remote_hash,
174                        "detected same-height chain reorg — rolling back"
175                    );
176                    // Roll back to the block before our anchor, then advance
177                    let rollback_to = current_anchor_block_number.saturating_sub(1);
178                    self.handle_reorg(node, rollback_to, current_anchor_block_number)
179                        .await?;
180                    // Fall through to advance below
181                }
182            }
183
184            // Re-read anchor after potential reorg handling
185            let anchor_after = self.anchor_block_store.get_block_number().await?;
186            if latest_block_number > anchor_after {
187                let new_header = node.get_block_header(latest_block_number).await?;
188                let anchor = AnchorBlockHeader::from_header_json(new_header);
189                self.update_anchor_block_header(&anchor).await?;
190            }
191        }
192
193        Ok(())
194    }
195
196    /// Handle a chain reorganization.
197    ///
198    /// Matching TS `chain-pruned` event handler:
199    /// 1. Roll back NoteStore (un-nullify orphaned notes, delete new notes)
200    /// 2. Roll back PrivateEventStore (delete events from orphaned blocks)
201    /// 3. Update anchor block header to the new tip
202    async fn handle_reorg<N: AztecNode>(
203        &self,
204        node: &N,
205        new_block_number: u64,
206        old_block_number: u64,
207    ) -> Result<(), Error> {
208        tracing::warn!(
209            "pruning data after block {new_block_number} due to reorg \
210             (was synced to block {old_block_number})"
211        );
212
213        // Fetch the new anchor block header
214        let new_header = node.get_block_header(new_block_number).await?;
215        let anchor = AnchorBlockHeader::from_header_json(new_header);
216
217        // Roll back stores atomically (best-effort atomicity via sequential ops)
218        self.note_store
219            .rollback(new_block_number, old_block_number)
220            .await?;
221
222        self.private_event_store
223            .rollback(new_block_number, old_block_number)
224            .await?;
225
226        // Update anchor
227        self.update_anchor_block_header(&anchor).await?;
228
229        tracing::info!(
230            "reorg handled: rolled back from block {old_block_number} to {new_block_number}"
231        );
232
233        Ok(())
234    }
235
236    /// Update the anchor block header and signal that it changed.
237    async fn update_anchor_block_header(&self, header: &AnchorBlockHeader) -> Result<(), Error> {
238        self.anchor_block_store.set_header(header).await?;
239        *self.anchor_changed.write().await = true;
240        tracing::debug!(
241            block_number = header.block_number,
242            "updated anchor block header"
243        );
244        Ok(())
245    }
246
247    /// Check and consume the anchor-changed flag.
248    ///
249    /// Returns `true` if the anchor block has changed since the last call.
250    /// The flag is reset to `false` after reading.
251    /// This is used by EmbeddedPxe to know when to wipe the ContractSyncService cache.
252    pub async fn take_anchor_changed(&self) -> bool {
253        let mut changed = self.anchor_changed.write().await;
254        let was_changed = *changed;
255        *changed = false;
256        was_changed
257    }
258
259    /// Get a reference to the anchor block store.
260    pub fn anchor_block_store(&self) -> &AnchorBlockStore {
261        &self.anchor_block_store
262    }
263
264    /// Get the current anchor block header.
265    pub async fn get_anchor_block_header(&self) -> Result<Option<AnchorBlockHeader>, Error> {
266        self.anchor_block_store.get_block_header().await
267    }
268
269    /// Get the current anchor block number.
270    pub async fn get_anchor_block_number(&self) -> Result<u64, Error> {
271        self.anchor_block_store.get_block_number().await
272    }
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stores::InMemoryKvStore;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Minimal `AztecNode` mock whose chain tip is a mutable atomic so
    /// tests can simulate block advancement and reorgs.
    struct MockNode {
        block_number: AtomicU64,
    }

    impl MockNode {
        fn new(block: u64) -> Self {
            Self {
                block_number: AtomicU64::new(block),
            }
        }

        // Move the mock chain tip (advance or rewind for reorg tests).
        fn set_block_number(&self, n: u64) {
            self.block_number.store(n, Ordering::SeqCst);
        }
    }

    #[async_trait::async_trait]
    impl AztecNode for MockNode {
        async fn get_node_info(&self) -> Result<aztec_node_client::NodeInfo, Error> {
            Ok(aztec_node_client::NodeInfo {
                node_version: "mock".into(),
                l1_chain_id: 1,
                rollup_version: 1,
                enr: None,
                l1_contract_addresses: serde_json::Value::Null,
                protocol_contract_addresses: serde_json::Value::Null,
                real_proofs: false,
                l2_circuits_vk_tree_root: None,
                l2_protocol_contracts_hash: None,
            })
        }
        async fn get_block_number(&self) -> Result<u64, Error> {
            Ok(self.block_number.load(Ordering::SeqCst))
        }
        async fn get_proven_block_number(&self) -> Result<u64, Error> {
            // Proven tip tracks the same counter as the proposed tip here.
            Ok(self.block_number.load(Ordering::SeqCst))
        }
        async fn get_tx_receipt(
            &self,
            _: &aztec_core::tx::TxHash,
        ) -> Result<aztec_core::tx::TxReceipt, Error> {
            Err(Error::InvalidData("mock".into()))
        }
        async fn get_tx_effect(
            &self,
            _: &aztec_core::tx::TxHash,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_tx_by_hash(
            &self,
            _: &aztec_core::tx::TxHash,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_public_logs(
            &self,
            _: aztec_node_client::PublicLogFilter,
        ) -> Result<aztec_node_client::PublicLogsResponse, Error> {
            Ok(aztec_node_client::PublicLogsResponse {
                logs: vec![],
                max_logs_hit: false,
            })
        }
        async fn send_tx(&self, _: &serde_json::Value) -> Result<(), Error> {
            Err(Error::InvalidData("mock".into()))
        }
        async fn get_contract(
            &self,
            _: &aztec_core::types::AztecAddress,
        ) -> Result<Option<aztec_core::types::ContractInstanceWithAddress>, Error> {
            Ok(None)
        }
        async fn get_contract_class(
            &self,
            _: &aztec_core::types::Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        // NOTE(review): a request for block 0 (genesis) returns a header for
        // the *current* tip, not a block-0 header. This makes the
        // synchronizer's very first sync anchor directly on the latest block
        // in these tests — confirm whether a real node's genesis header also
        // reports a nonzero block number before relying on this behavior.
        async fn get_block_header(&self, block_number: u64) -> Result<serde_json::Value, Error> {
            let bn = if block_number == 0 {
                self.block_number.load(Ordering::SeqCst)
            } else {
                block_number
            };
            Ok(serde_json::json!({
                "globalVariables": {"blockNumber": bn},
                "blockHash": format!("0x{:064x}", bn)
            }))
        }
        async fn get_block(&self, _: u64) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_note_hash_membership_witness(
            &self,
            _: u64,
            _: &aztec_core::types::Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_nullifier_membership_witness(
            &self,
            _: u64,
            _: &aztec_core::types::Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_low_nullifier_membership_witness(
            &self,
            _: u64,
            _: &aztec_core::types::Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_public_storage_at(
            &self,
            _: u64,
            _: &aztec_core::types::AztecAddress,
            _: &aztec_core::types::Fr,
        ) -> Result<aztec_core::types::Fr, Error> {
            Ok(aztec_core::types::Fr::zero())
        }
        async fn get_public_data_witness(
            &self,
            _: u64,
            _: &aztec_core::types::Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_l1_to_l2_message_membership_witness(
            &self,
            _: u64,
            _: &aztec_core::types::Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn simulate_public_calls(
            &self,
            _: &serde_json::Value,
            _: bool,
        ) -> Result<serde_json::Value, Error> {
            Ok(serde_json::Value::Null)
        }
        async fn is_valid_tx(
            &self,
            _: &serde_json::Value,
        ) -> Result<aztec_node_client::TxValidationResult, Error> {
            Ok(aztec_node_client::TxValidationResult::Valid)
        }
        async fn get_private_logs_by_tags(
            &self,
            _: &[aztec_core::types::Fr],
        ) -> Result<serde_json::Value, Error> {
            Ok(serde_json::json!([]))
        }
        async fn get_public_logs_by_tags_from_contract(
            &self,
            _: &aztec_core::types::AztecAddress,
            _: &[aztec_core::types::Fr],
        ) -> Result<serde_json::Value, Error> {
            Ok(serde_json::json!([]))
        }
        async fn register_contract_function_signatures(&self, _: &[String]) -> Result<(), Error> {
            Ok(())
        }
        async fn get_block_hash_membership_witness(
            &self,
            _: u64,
            _: &aztec_core::types::Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn find_leaves_indexes(
            &self,
            _: u64,
            _: &str,
            _: &[aztec_core::types::Fr],
        ) -> Result<Vec<Option<u64>>, Error> {
            Ok(vec![])
        }
    }

    /// Build a synchronizer over a fresh in-memory KV store, returning the
    /// anchor store too so tests can inspect the persisted anchor directly.
    fn make_synchronizer() -> (BlockStateSynchronizer, Arc<AnchorBlockStore>) {
        let kv: Arc<dyn crate::stores::kv::KvStore> = Arc::new(InMemoryKvStore::new());
        let anchor_store = Arc::new(AnchorBlockStore::new(Arc::clone(&kv)));
        let note_store = Arc::new(NoteStore::new(Arc::clone(&kv)));
        let event_store = Arc::new(PrivateEventStore::new(Arc::clone(&kv)));
        let sync = BlockStateSynchronizer::new(
            Arc::clone(&anchor_store),
            note_store,
            event_store,
            BlockSyncConfig::default(),
        );
        (sync, anchor_store)
    }

    // First sync with no stored anchor should persist the node's header.
    // (Relies on the mock's block-0 quirk above: the "genesis" fetch
    // returns the tip header, so the anchor lands on block 5.)
    #[tokio::test]
    async fn first_sync_sets_anchor() {
        let (sync, anchor_store) = make_synchronizer();
        let node = MockNode::new(5);

        sync.sync(&node).await.unwrap();

        let anchor = anchor_store.get_block_header().await.unwrap().unwrap();
        assert_eq!(anchor.block_number, 5);
    }

    // A later sync at a higher tip must move the anchor forward.
    #[tokio::test]
    async fn sync_advances_anchor() {
        let (sync, anchor_store) = make_synchronizer();
        let node = MockNode::new(5);

        sync.sync(&node).await.unwrap();
        assert_eq!(
            anchor_store
                .get_block_header()
                .await
                .unwrap()
                .unwrap()
                .block_number,
            5
        );

        node.set_block_number(10);
        sync.sync(&node).await.unwrap();
        assert_eq!(
            anchor_store
                .get_block_header()
                .await
                .unwrap()
                .unwrap()
                .block_number,
            10
        );
    }

    // A tip behind the anchor signals a reorg; the anchor must roll back.
    #[tokio::test]
    async fn sync_detects_reorg_and_rolls_back() {
        let (sync, anchor_store) = make_synchronizer();
        let node = MockNode::new(10);

        sync.sync(&node).await.unwrap();
        assert_eq!(
            anchor_store
                .get_block_header()
                .await
                .unwrap()
                .unwrap()
                .block_number,
            10
        );

        // Simulate reorg: node goes back to block 7
        node.set_block_number(7);
        sync.sync(&node).await.unwrap();
        assert_eq!(
            anchor_store
                .get_block_header()
                .await
                .unwrap()
                .unwrap()
                .block_number,
            7
        );
    }

    // The anchor-changed flag is one-shot: true after a change, then false.
    #[tokio::test]
    async fn take_anchor_changed_flag() {
        let (sync, _) = make_synchronizer();
        let node = MockNode::new(5);

        sync.sync(&node).await.unwrap();
        assert!(sync.take_anchor_changed().await);
        assert!(!sync.take_anchor_changed().await); // consumed
    }

    // Re-syncing at the same height must not re-set the changed flag.
    #[tokio::test]
    async fn no_update_when_same_block() {
        let (sync, _) = make_synchronizer();
        let node = MockNode::new(5);

        sync.sync(&node).await.unwrap();
        assert!(sync.take_anchor_changed().await);

        // Same block number: no change
        sync.sync(&node).await.unwrap();
        assert!(!sync.take_anchor_changed().await);
    }
}