1use std::sync::Arc;
7
8use aztec_core::abi::EventSelector;
9use aztec_core::error::Error;
10use aztec_core::tx::TxHash;
11use aztec_core::types::{AztecAddress, Fr};
12use serde::{Deserialize, Serialize};
13
14use super::kv::KvStore;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct StoredPrivateEvent {
19 pub event_selector: EventSelector,
21 pub randomness: Fr,
23 pub msg_content: Vec<Fr>,
25 pub siloed_event_commitment: Fr,
27 pub contract_address: AztecAddress,
29 pub scopes: Vec<AztecAddress>,
31 pub tx_hash: TxHash,
33 pub l2_block_number: u64,
35 pub l2_block_hash: String,
37 pub tx_index_in_block: Option<u64>,
39 pub event_index_in_tx: Option<u64>,
41}
42
43#[derive(Debug, Clone)]
45pub struct PrivateEventQueryFilter {
46 pub contract_address: AztecAddress,
48 pub from_block: Option<u64>,
50 pub to_block: Option<u64>,
52 pub scopes: Vec<AztecAddress>,
54 pub tx_hash: Option<TxHash>,
56}
57
58pub struct PrivateEventStore {
60 kv: Arc<dyn KvStore>,
61}
62
63impl PrivateEventStore {
64 pub fn new(kv: Arc<dyn KvStore>) -> Self {
65 Self { kv }
66 }
67
68 pub async fn store_private_event_log(
70 &self,
71 event: &StoredPrivateEvent,
72 scope: &AztecAddress,
73 ) -> Result<(), Error> {
74 let key = event_key(&event.siloed_event_commitment);
75
76 if let Some(existing_bytes) = self.kv.get(&key).await? {
78 let mut existing: StoredPrivateEvent = serde_json::from_slice(&existing_bytes)?;
79 if !existing.scopes.contains(scope) {
80 existing.scopes.push(*scope);
81 }
82 self.kv.put(&key, &serde_json::to_vec(&existing)?).await?;
83 } else {
84 let mut stored = event.clone();
85 if !stored.scopes.contains(scope) {
86 stored.scopes.push(*scope);
87 }
88 self.kv.put(&key, &serde_json::to_vec(&stored)?).await?;
89
90 self.add_to_contract_selector_index(
92 &stored.contract_address,
93 &stored.event_selector,
94 &stored.siloed_event_commitment,
95 )
96 .await?;
97
98 self.add_to_block_index(stored.l2_block_number, &stored.siloed_event_commitment)
100 .await?;
101 }
102
103 Ok(())
104 }
105
106 pub async fn get_private_events(
108 &self,
109 event_selector: &EventSelector,
110 filter: &PrivateEventQueryFilter,
111 ) -> Result<Vec<StoredPrivateEvent>, Error> {
112 let idx_key = contract_selector_index_key(&filter.contract_address, event_selector);
113 let event_ids: Vec<String> = match self.kv.get(&idx_key).await? {
114 Some(bytes) => serde_json::from_slice(&bytes)?,
115 None => return Ok(vec![]),
116 };
117
118 let mut events = Vec::new();
119 for id_str in event_ids {
120 if let Ok(id) = Fr::from_hex(&id_str) {
121 let key = event_key(&id);
122 if let Some(bytes) = self.kv.get(&key).await? {
123 let event: StoredPrivateEvent = serde_json::from_slice(&bytes)?;
124
125 if let Some(from) = filter.from_block {
127 if event.l2_block_number < from {
128 continue;
129 }
130 }
131 if let Some(to) = filter.to_block {
132 if event.l2_block_number > to {
133 continue;
134 }
135 }
136
137 if !filter.scopes.is_empty()
139 && !event.scopes.iter().any(|s| filter.scopes.contains(s))
140 {
141 continue;
142 }
143
144 if let Some(ref tx_hash) = filter.tx_hash {
146 if event.tx_hash != *tx_hash {
147 continue;
148 }
149 }
150
151 events.push(event);
152 }
153 }
154 }
155
156 events.sort_by(|a, b| {
158 a.l2_block_number
159 .cmp(&b.l2_block_number)
160 .then(a.tx_index_in_block.cmp(&b.tx_index_in_block))
161 .then(a.event_index_in_tx.cmp(&b.event_index_in_tx))
162 });
163
164 Ok(events)
165 }
166
167 pub async fn rollback(
169 &self,
170 block_number: u64,
171 _synced_block_number: u64,
172 ) -> Result<(), Error> {
173 let prefix = b"event_idx:block:";
174 let entries = self.kv.list_prefix(prefix).await?;
175
176 for (key, value) in &entries {
177 let key_str = String::from_utf8_lossy(key);
178 if let Some(bn_str) = key_str.strip_prefix("event_idx:block:") {
179 if let Ok(bn) = bn_str.parse::<u64>() {
180 if bn > block_number {
181 let event_ids: Vec<String> = serde_json::from_slice(value)?;
182 for id_str in &event_ids {
183 if let Ok(id) = Fr::from_hex(id_str) {
184 let event_key = event_key(&id);
185 if let Some(event_bytes) = self.kv.get(&event_key).await? {
187 let event: StoredPrivateEvent =
188 serde_json::from_slice(&event_bytes)?;
189 self.remove_from_contract_selector_index(
190 &event.contract_address,
191 &event.event_selector,
192 &id,
193 )
194 .await?;
195 }
196 self.kv.delete(&event_key).await?;
197 }
198 }
199 self.kv.delete(key).await?;
200 }
201 }
202 }
203 }
204
205 Ok(())
206 }
207
208 async fn add_to_contract_selector_index(
211 &self,
212 contract: &AztecAddress,
213 selector: &EventSelector,
214 event_id: &Fr,
215 ) -> Result<(), Error> {
216 let key = contract_selector_index_key(contract, selector);
217 let mut list: Vec<String> = match self.kv.get(&key).await? {
218 Some(bytes) => serde_json::from_slice(&bytes)?,
219 None => vec![],
220 };
221 let id_str = format!("{event_id}");
222 if !list.contains(&id_str) {
223 list.push(id_str);
224 self.kv.put(&key, &serde_json::to_vec(&list)?).await?;
225 }
226 Ok(())
227 }
228
229 async fn remove_from_contract_selector_index(
230 &self,
231 contract: &AztecAddress,
232 selector: &EventSelector,
233 event_id: &Fr,
234 ) -> Result<(), Error> {
235 let key = contract_selector_index_key(contract, selector);
236 if let Some(bytes) = self.kv.get(&key).await? {
237 let mut list: Vec<String> = serde_json::from_slice(&bytes)?;
238 let id_str = format!("{event_id}");
239 list.retain(|s| s != &id_str);
240 if list.is_empty() {
241 self.kv.delete(&key).await?;
242 } else {
243 self.kv.put(&key, &serde_json::to_vec(&list)?).await?;
244 }
245 }
246 Ok(())
247 }
248
249 async fn add_to_block_index(&self, block_number: u64, event_id: &Fr) -> Result<(), Error> {
250 let key = block_index_key(block_number);
251 let mut list: Vec<String> = match self.kv.get(&key).await? {
252 Some(bytes) => serde_json::from_slice(&bytes)?,
253 None => vec![],
254 };
255 let id_str = format!("{event_id}");
256 if !list.contains(&id_str) {
257 list.push(id_str);
258 self.kv.put(&key, &serde_json::to_vec(&list)?).await?;
259 }
260 Ok(())
261 }
262}
263
264fn event_key(siloed_event_commitment: &Fr) -> Vec<u8> {
265 format!("event:{siloed_event_commitment}").into_bytes()
266}
267
268fn contract_selector_index_key(contract: &AztecAddress, selector: &EventSelector) -> Vec<u8> {
269 format!("event_idx:cs:{contract}_{}", selector.0).into_bytes()
270}
271
272fn block_index_key(block_number: u64) -> Vec<u8> {
273 format!("event_idx:block:{block_number}").into_bytes()
274}
275
276#[cfg(test)]
277mod tests {
278 use super::*;
279 use crate::stores::InMemoryKvStore;
280
281 fn make_event(block: u64, commitment: u64) -> StoredPrivateEvent {
282 StoredPrivateEvent {
283 event_selector: EventSelector(Fr::from(0x12345678u64)),
284 randomness: Fr::from(1u64),
285 msg_content: vec![Fr::from(10u64)],
286 siloed_event_commitment: Fr::from(commitment),
287 contract_address: AztecAddress::from(1u64),
288 scopes: vec![],
289 tx_hash: TxHash::from_hex(
290 "0x0000000000000000000000000000000000000000000000000000000000000001",
291 )
292 .unwrap(),
293 l2_block_number: block,
294 l2_block_hash: format!("0x{:064x}", block),
295 tx_index_in_block: Some(0),
296 event_index_in_tx: Some(0),
297 }
298 }
299
300 #[tokio::test]
301 async fn store_and_retrieve_events() {
302 let store = PrivateEventStore::new(Arc::new(InMemoryKvStore::new()));
303 let scope = AztecAddress::from(99u64);
304 let event = make_event(1, 100);
305
306 store.store_private_event_log(&event, &scope).await.unwrap();
307
308 let selector = EventSelector(Fr::from(0x12345678u64));
309 let events = store
310 .get_private_events(
311 &selector,
312 &PrivateEventQueryFilter {
313 contract_address: AztecAddress::from(1u64),
314 from_block: None,
315 to_block: None,
316 scopes: vec![scope],
317 tx_hash: None,
318 },
319 )
320 .await
321 .unwrap();
322 assert_eq!(events.len(), 1);
323 assert!(events[0].scopes.contains(&scope));
324 }
325
326 #[tokio::test]
327 async fn block_range_filtering() {
328 let store = PrivateEventStore::new(Arc::new(InMemoryKvStore::new()));
329 let scope = AztecAddress::from(99u64);
330
331 for i in 1..=5 {
332 let event = make_event(i, 100 + i);
333 store.store_private_event_log(&event, &scope).await.unwrap();
334 }
335
336 let selector = EventSelector(Fr::from(0x12345678u64));
337 let events = store
338 .get_private_events(
339 &selector,
340 &PrivateEventQueryFilter {
341 contract_address: AztecAddress::from(1u64),
342 from_block: Some(2),
343 to_block: Some(4),
344 scopes: vec![],
345 tx_hash: None,
346 },
347 )
348 .await
349 .unwrap();
350 assert_eq!(events.len(), 3);
351 }
352
353 #[tokio::test]
354 async fn rollback_removes_events() {
355 let store = PrivateEventStore::new(Arc::new(InMemoryKvStore::new()));
356 let scope = AztecAddress::from(99u64);
357
358 for i in 1..=5 {
359 let event = make_event(i, 100 + i);
360 store.store_private_event_log(&event, &scope).await.unwrap();
361 }
362
363 store.rollback(3, 3).await.unwrap();
364
365 let selector = EventSelector(Fr::from(0x12345678u64));
366 let events = store
367 .get_private_events(
368 &selector,
369 &PrivateEventQueryFilter {
370 contract_address: AztecAddress::from(1u64),
371 from_block: None,
372 to_block: None,
373 scopes: vec![],
374 tx_hash: None,
375 },
376 )
377 .await
378 .unwrap();
379 assert_eq!(events.len(), 3); }
381}