aztec_pxe/stores/
private_event_store.rs

1//! Private event store for discovered private event logs.
2//!
3//! Ports the TS `PrivateEventStore` with event storage, filtering by
4//! contract/event selector, block range filtering, and rollback support.
5
6use std::sync::Arc;
7
8use aztec_core::abi::EventSelector;
9use aztec_core::error::Error;
10use aztec_core::tx::TxHash;
11use aztec_core::types::{AztecAddress, Fr};
12use serde::{Deserialize, Serialize};
13
14use super::kv::KvStore;
15
16/// A stored private event with metadata.
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct StoredPrivateEvent {
19    /// The event selector identifying the event type.
20    pub event_selector: EventSelector,
21    /// Randomness used in commitment.
22    pub randomness: Fr,
23    /// The decrypted message content fields.
24    pub msg_content: Vec<Fr>,
25    /// The siloed event commitment (unique identifier).
26    pub siloed_event_commitment: Fr,
27    /// Contract address that emitted the event.
28    pub contract_address: AztecAddress,
29    /// Scopes (accounts) that decrypted this event.
30    pub scopes: Vec<AztecAddress>,
31    /// Transaction hash.
32    pub tx_hash: TxHash,
33    /// L2 block number.
34    pub l2_block_number: u64,
35    /// L2 block hash.
36    pub l2_block_hash: String,
37    /// Index of the transaction within the block.
38    pub tx_index_in_block: Option<u64>,
39    /// Index of the event within the transaction.
40    pub event_index_in_tx: Option<u64>,
41}
42
43/// Filter for querying private events.
44#[derive(Debug, Clone)]
45pub struct PrivateEventQueryFilter {
46    /// Contract address (required).
47    pub contract_address: AztecAddress,
48    /// Start block (inclusive).
49    pub from_block: Option<u64>,
50    /// End block (inclusive).
51    pub to_block: Option<u64>,
52    /// Filter by scopes.
53    pub scopes: Vec<AztecAddress>,
54    /// Filter by transaction hash.
55    pub tx_hash: Option<TxHash>,
56}
57
58/// Stores decrypted private event logs with filtering and reorg support.
59pub struct PrivateEventStore {
60    kv: Arc<dyn KvStore>,
61}
62
63impl PrivateEventStore {
64    pub fn new(kv: Arc<dyn KvStore>) -> Self {
65        Self { kv }
66    }
67
68    /// Store a private event log.
69    pub async fn store_private_event_log(
70        &self,
71        event: &StoredPrivateEvent,
72        scope: &AztecAddress,
73    ) -> Result<(), Error> {
74        let key = event_key(&event.siloed_event_commitment);
75
76        // Check if event already exists (merge scopes)
77        if let Some(existing_bytes) = self.kv.get(&key).await? {
78            let mut existing: StoredPrivateEvent = serde_json::from_slice(&existing_bytes)?;
79            if !existing.scopes.contains(scope) {
80                existing.scopes.push(*scope);
81            }
82            self.kv.put(&key, &serde_json::to_vec(&existing)?).await?;
83        } else {
84            let mut stored = event.clone();
85            if !stored.scopes.contains(scope) {
86                stored.scopes.push(*scope);
87            }
88            self.kv.put(&key, &serde_json::to_vec(&stored)?).await?;
89
90            // Add to contract+selector index
91            self.add_to_contract_selector_index(
92                &stored.contract_address,
93                &stored.event_selector,
94                &stored.siloed_event_commitment,
95            )
96            .await?;
97
98            // Add to block number index
99            self.add_to_block_index(stored.l2_block_number, &stored.siloed_event_commitment)
100                .await?;
101        }
102
103        Ok(())
104    }
105
106    /// Get private events matching a filter.
107    pub async fn get_private_events(
108        &self,
109        event_selector: &EventSelector,
110        filter: &PrivateEventQueryFilter,
111    ) -> Result<Vec<StoredPrivateEvent>, Error> {
112        let idx_key = contract_selector_index_key(&filter.contract_address, event_selector);
113        let event_ids: Vec<String> = match self.kv.get(&idx_key).await? {
114            Some(bytes) => serde_json::from_slice(&bytes)?,
115            None => return Ok(vec![]),
116        };
117
118        let mut events = Vec::new();
119        for id_str in event_ids {
120            if let Ok(id) = Fr::from_hex(&id_str) {
121                let key = event_key(&id);
122                if let Some(bytes) = self.kv.get(&key).await? {
123                    let event: StoredPrivateEvent = serde_json::from_slice(&bytes)?;
124
125                    // Block range filter
126                    if let Some(from) = filter.from_block {
127                        if event.l2_block_number < from {
128                            continue;
129                        }
130                    }
131                    if let Some(to) = filter.to_block {
132                        if event.l2_block_number > to {
133                            continue;
134                        }
135                    }
136
137                    // Scope filter
138                    if !filter.scopes.is_empty()
139                        && !event.scopes.iter().any(|s| filter.scopes.contains(s))
140                    {
141                        continue;
142                    }
143
144                    // Tx hash filter
145                    if let Some(ref tx_hash) = filter.tx_hash {
146                        if event.tx_hash != *tx_hash {
147                            continue;
148                        }
149                    }
150
151                    events.push(event);
152                }
153            }
154        }
155
156        // Sort by block_number, tx_index_in_block, event_index_in_tx
157        events.sort_by(|a, b| {
158            a.l2_block_number
159                .cmp(&b.l2_block_number)
160                .then(a.tx_index_in_block.cmp(&b.tx_index_in_block))
161                .then(a.event_index_in_tx.cmp(&b.event_index_in_tx))
162        });
163
164        Ok(events)
165    }
166
167    /// Rollback: delete events after a given block number.
168    pub async fn rollback(
169        &self,
170        block_number: u64,
171        _synced_block_number: u64,
172    ) -> Result<(), Error> {
173        let prefix = b"event_idx:block:";
174        let entries = self.kv.list_prefix(prefix).await?;
175
176        for (key, value) in &entries {
177            let key_str = String::from_utf8_lossy(key);
178            if let Some(bn_str) = key_str.strip_prefix("event_idx:block:") {
179                if let Ok(bn) = bn_str.parse::<u64>() {
180                    if bn > block_number {
181                        let event_ids: Vec<String> = serde_json::from_slice(value)?;
182                        for id_str in &event_ids {
183                            if let Ok(id) = Fr::from_hex(id_str) {
184                                let event_key = event_key(&id);
185                                // Remove from contract+selector index
186                                if let Some(event_bytes) = self.kv.get(&event_key).await? {
187                                    let event: StoredPrivateEvent =
188                                        serde_json::from_slice(&event_bytes)?;
189                                    self.remove_from_contract_selector_index(
190                                        &event.contract_address,
191                                        &event.event_selector,
192                                        &id,
193                                    )
194                                    .await?;
195                                }
196                                self.kv.delete(&event_key).await?;
197                            }
198                        }
199                        self.kv.delete(key).await?;
200                    }
201                }
202            }
203        }
204
205        Ok(())
206    }
207
208    // --- Index management ---
209
210    async fn add_to_contract_selector_index(
211        &self,
212        contract: &AztecAddress,
213        selector: &EventSelector,
214        event_id: &Fr,
215    ) -> Result<(), Error> {
216        let key = contract_selector_index_key(contract, selector);
217        let mut list: Vec<String> = match self.kv.get(&key).await? {
218            Some(bytes) => serde_json::from_slice(&bytes)?,
219            None => vec![],
220        };
221        let id_str = format!("{event_id}");
222        if !list.contains(&id_str) {
223            list.push(id_str);
224            self.kv.put(&key, &serde_json::to_vec(&list)?).await?;
225        }
226        Ok(())
227    }
228
229    async fn remove_from_contract_selector_index(
230        &self,
231        contract: &AztecAddress,
232        selector: &EventSelector,
233        event_id: &Fr,
234    ) -> Result<(), Error> {
235        let key = contract_selector_index_key(contract, selector);
236        if let Some(bytes) = self.kv.get(&key).await? {
237            let mut list: Vec<String> = serde_json::from_slice(&bytes)?;
238            let id_str = format!("{event_id}");
239            list.retain(|s| s != &id_str);
240            if list.is_empty() {
241                self.kv.delete(&key).await?;
242            } else {
243                self.kv.put(&key, &serde_json::to_vec(&list)?).await?;
244            }
245        }
246        Ok(())
247    }
248
249    async fn add_to_block_index(&self, block_number: u64, event_id: &Fr) -> Result<(), Error> {
250        let key = block_index_key(block_number);
251        let mut list: Vec<String> = match self.kv.get(&key).await? {
252            Some(bytes) => serde_json::from_slice(&bytes)?,
253            None => vec![],
254        };
255        let id_str = format!("{event_id}");
256        if !list.contains(&id_str) {
257            list.push(id_str);
258            self.kv.put(&key, &serde_json::to_vec(&list)?).await?;
259        }
260        Ok(())
261    }
262}
263
264fn event_key(siloed_event_commitment: &Fr) -> Vec<u8> {
265    format!("event:{siloed_event_commitment}").into_bytes()
266}
267
268fn contract_selector_index_key(contract: &AztecAddress, selector: &EventSelector) -> Vec<u8> {
269    format!("event_idx:cs:{contract}_{}", selector.0).into_bytes()
270}
271
272fn block_index_key(block_number: u64) -> Vec<u8> {
273    format!("event_idx:block:{block_number}").into_bytes()
274}
275
276#[cfg(test)]
277mod tests {
278    use super::*;
279    use crate::stores::InMemoryKvStore;
280
281    fn make_event(block: u64, commitment: u64) -> StoredPrivateEvent {
282        StoredPrivateEvent {
283            event_selector: EventSelector(Fr::from(0x12345678u64)),
284            randomness: Fr::from(1u64),
285            msg_content: vec![Fr::from(10u64)],
286            siloed_event_commitment: Fr::from(commitment),
287            contract_address: AztecAddress::from(1u64),
288            scopes: vec![],
289            tx_hash: TxHash::from_hex(
290                "0x0000000000000000000000000000000000000000000000000000000000000001",
291            )
292            .unwrap(),
293            l2_block_number: block,
294            l2_block_hash: format!("0x{:064x}", block),
295            tx_index_in_block: Some(0),
296            event_index_in_tx: Some(0),
297        }
298    }
299
300    #[tokio::test]
301    async fn store_and_retrieve_events() {
302        let store = PrivateEventStore::new(Arc::new(InMemoryKvStore::new()));
303        let scope = AztecAddress::from(99u64);
304        let event = make_event(1, 100);
305
306        store.store_private_event_log(&event, &scope).await.unwrap();
307
308        let selector = EventSelector(Fr::from(0x12345678u64));
309        let events = store
310            .get_private_events(
311                &selector,
312                &PrivateEventQueryFilter {
313                    contract_address: AztecAddress::from(1u64),
314                    from_block: None,
315                    to_block: None,
316                    scopes: vec![scope],
317                    tx_hash: None,
318                },
319            )
320            .await
321            .unwrap();
322        assert_eq!(events.len(), 1);
323        assert!(events[0].scopes.contains(&scope));
324    }
325
326    #[tokio::test]
327    async fn block_range_filtering() {
328        let store = PrivateEventStore::new(Arc::new(InMemoryKvStore::new()));
329        let scope = AztecAddress::from(99u64);
330
331        for i in 1..=5 {
332            let event = make_event(i, 100 + i);
333            store.store_private_event_log(&event, &scope).await.unwrap();
334        }
335
336        let selector = EventSelector(Fr::from(0x12345678u64));
337        let events = store
338            .get_private_events(
339                &selector,
340                &PrivateEventQueryFilter {
341                    contract_address: AztecAddress::from(1u64),
342                    from_block: Some(2),
343                    to_block: Some(4),
344                    scopes: vec![],
345                    tx_hash: None,
346                },
347            )
348            .await
349            .unwrap();
350        assert_eq!(events.len(), 3);
351    }
352
353    #[tokio::test]
354    async fn rollback_removes_events() {
355        let store = PrivateEventStore::new(Arc::new(InMemoryKvStore::new()));
356        let scope = AztecAddress::from(99u64);
357
358        for i in 1..=5 {
359            let event = make_event(i, 100 + i);
360            store.store_private_event_log(&event, &scope).await.unwrap();
361        }
362
363        store.rollback(3, 3).await.unwrap();
364
365        let selector = EventSelector(Fr::from(0x12345678u64));
366        let events = store
367            .get_private_events(
368                &selector,
369                &PrivateEventQueryFilter {
370                    contract_address: AztecAddress::from(1u64),
371                    from_block: None,
372                    to_block: None,
373                    scopes: vec![],
374                    tx_hash: None,
375                },
376            )
377            .await
378            .unwrap();
379        assert_eq!(events.len(), 3); // blocks 1, 2, 3 remain
380    }
381}