1use aztec_core::error::Error;
7use aztec_core::hash::{compute_siloed_private_log_first_field, poseidon2_hash};
8use aztec_core::tx::TxHash;
9use aztec_core::types::{AztecAddress, Fr};
10use aztec_node_client::AztecNode;
11
12use crate::stores::{CapsuleStore, RecipientTaggingStore, SenderStore, SenderTaggingStore};
13
/// Maximum number of tags sent to the node in a single private-log query.
/// Longer tag lists are split into chunks of this size before being requested.
const MAX_RPC_LEN: usize = 128;

/// Number of tagging indexes scanned past the highest finalized index of a
/// secret: the scan covers `finalized + 1 ..= finalized + WINDOW_LEN`.
const UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN: u64 = 20;
19
/// A single tag lookup handed to `LogService::bulk_retrieve_logs`.
#[derive(Debug, Clone)]
pub struct LogRetrievalRequest {
    /// NOTE(review): this flag is not read anywhere in this file; presumably it
    /// states whether the caller expects a public log — confirm against callers.
    pub is_public: bool,
    /// The tag to look up.
    pub tag: Fr,
    /// Contract the lookup is scoped to. When present, public logs are queried
    /// for this contract and the tag is siloed with it before the private-log
    /// query; when absent, only private logs are queried, using `tag` unchanged.
    pub contract_address: Option<AztecAddress>,
}
30
/// A log returned by the node for a queried tag, decoded from the JSON response.
#[derive(Debug, Clone)]
pub struct TaggedLog {
    /// The tag that was sent to the node when this log was retrieved.
    pub tag: Fr,
    /// Field elements read from the entry's `logData` (or, failing that, `data`) array.
    pub data: Vec<Fr>,
    /// Value of the entry's `blockNumber`; `0` when the key is missing or not a `u64`.
    pub block_number: u64,
    /// `true` for logs obtained from the public-log query, `false` for private logs.
    pub is_public: bool,
    /// Value of the entry's `txHash`; the zero hash when the key is missing.
    pub tx_hash: TxHash,
    /// Field elements read from the entry's `noteHashes` array.
    pub note_hashes: Vec<Fr>,
    /// Value of the entry's `firstNullifier`; zero when the key is missing.
    pub first_nullifier: Fr,
}
49
/// Retrieves tagged logs from an Aztec node on behalf of the PXE.
///
/// The service only borrows its collaborators; it owns no state of its own.
pub struct LogService<'a, N: AztecNode> {
    /// Node client used for the private and public log queries.
    node: &'a N,
    /// Backing store for `get_senders`.
    sender_store: &'a SenderStore,
    // Not read by any method in this file yet, hence the lint allowance.
    #[allow(dead_code)] sender_tagging_store: &'a SenderTaggingStore,
    /// Per-secret highest index store, read before a scan and updated after it.
    recipient_tagging_store: &'a RecipientTaggingStore,
    // Not read by any method in this file yet, hence the lint allowance.
    #[allow(dead_code)]
    capsule_store: &'a CapsuleStore,
}
60
61impl<'a, N: AztecNode> LogService<'a, N> {
62 pub fn new(
63 node: &'a N,
64 sender_store: &'a SenderStore,
65 sender_tagging_store: &'a SenderTaggingStore,
66 recipient_tagging_store: &'a RecipientTaggingStore,
67 capsule_store: &'a CapsuleStore,
68 ) -> Self {
69 Self {
70 node,
71 sender_store,
72 sender_tagging_store,
73 recipient_tagging_store,
74 capsule_store,
75 }
76 }
77
78 pub async fn bulk_retrieve_logs(
83 &self,
84 requests: &[LogRetrievalRequest],
85 ) -> Result<Vec<Vec<TaggedLog>>, Error> {
86 let mut results = Vec::with_capacity(requests.len());
87 for request in requests {
88 let public_logs = if let Some(contract) = &request.contract_address {
89 self.get_public_logs_by_tag(contract, &request.tag).await?
90 } else {
91 vec![]
92 };
93 let siloed_tag = if let Some(contract) = &request.contract_address {
97 compute_siloed_private_log_first_field(contract, &request.tag)
98 } else {
99 request.tag
100 };
101 let mut private_logs = self.get_private_logs_by_tags(&[siloed_tag]).await?;
102 let private_logs = private_logs.pop().unwrap_or_default();
103
104 if !public_logs.is_empty() && !private_logs.is_empty() {
105 return Err(Error::InvalidData(format!(
106 "found both a public and private log for tag {}",
107 request.tag
108 )));
109 }
110
111 results.push(if !public_logs.is_empty() {
112 public_logs
113 } else {
114 private_logs
115 });
116 }
117
118 Ok(results)
119 }
120
121 pub async fn fetch_tagged_logs(
125 &self,
126 contract_address: &AztecAddress,
127 recipient: &AztecAddress,
128 tagging_secrets: &[Fr],
129 ) -> Result<Vec<TaggedLog>, Error> {
130 let mut all_logs = Vec::new();
131
132 for secret in tagging_secrets {
133 let finalized = self
134 .recipient_tagging_store
135 .get_highest_finalized_index(secret)
136 .await?;
137
138 let from_index = finalized + 1;
140 let to_index = finalized + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN;
141
142 let logs = self
143 .load_logs_for_range(contract_address, secret, from_index, to_index)
144 .await?;
145
146 all_logs.extend(logs);
147 }
148
149 if !all_logs.is_empty() {
150 tracing::debug!(
151 contract = %contract_address,
152 recipient = %recipient,
153 count = all_logs.len(),
154 "fetched tagged logs"
155 );
156 }
157
158 Ok(all_logs)
159 }
160
161 async fn load_logs_for_range(
163 &self,
164 contract_address: &AztecAddress,
165 secret: &Fr,
166 from_index: u64,
167 to_index: u64,
168 ) -> Result<Vec<TaggedLog>, Error> {
169 let mut tags = Vec::new();
171 for idx in from_index..=to_index {
172 let tag = compute_siloed_tag(secret, idx, contract_address);
173 tags.push(tag);
174 }
175
176 let mut all_logs = Vec::new();
178 for (chunk_idx, chunk) in tags.chunks(MAX_RPC_LEN).enumerate() {
179 let logs = self.get_private_logs_by_tags(chunk).await?;
180 let mut highest_found_index = None;
181 for (i, tag_logs) in logs.into_iter().enumerate() {
182 let idx = from_index + (chunk_idx * MAX_RPC_LEN + i) as u64;
183 let had_logs = !tag_logs.is_empty();
184 for log in tag_logs {
185 all_logs.push(log);
186 }
187
188 if had_logs {
189 highest_found_index = Some(idx);
190 }
191 }
192
193 if let Some(idx) = highest_found_index {
194 self.recipient_tagging_store
195 .update_highest_finalized_index(secret, idx)
196 .await
197 .ok();
198 }
199 }
200
201 Ok(all_logs)
202 }
203
204 async fn get_private_logs_by_tags(&self, tags: &[Fr]) -> Result<Vec<Vec<TaggedLog>>, Error> {
206 if tags.is_empty() {
207 return Ok(vec![]);
208 }
209
210 let response = self.node.get_private_logs_by_tags(tags).await?;
211
212 let mut results = vec![Vec::new(); tags.len()];
214 if let Some(outer) = response.as_array() {
215 for (tag_idx, tag_logs) in outer.iter().enumerate().take(tags.len()) {
216 let mut logs = Vec::new();
217 if let Some(entries) = tag_logs.as_array() {
218 for entry in entries {
219 let data = entry
220 .get("logData")
221 .or_else(|| entry.get("data"))
222 .and_then(|d| d.as_array())
223 .map(|arr| {
224 arr.iter()
225 .filter_map(|v| v.as_str().and_then(|s| Fr::from_hex(s).ok()))
226 .collect::<Vec<_>>()
227 })
228 .unwrap_or_default();
229
230 let block_number = entry
231 .get("blockNumber")
232 .and_then(|v| v.as_u64())
233 .unwrap_or(0);
234 let tx_hash = entry
235 .get("txHash")
236 .and_then(|v| v.as_str())
237 .map(TxHash::from_hex)
238 .transpose()?
239 .unwrap_or_else(TxHash::zero);
240 let note_hashes = entry
241 .get("noteHashes")
242 .and_then(|v| v.as_array())
243 .map(|arr| {
244 arr.iter()
245 .filter_map(|v| v.as_str().and_then(|s| Fr::from_hex(s).ok()))
246 .collect::<Vec<_>>()
247 })
248 .unwrap_or_default();
249 let first_nullifier = entry
250 .get("firstNullifier")
251 .and_then(|v| v.as_str())
252 .map(Fr::from_hex)
253 .transpose()?
254 .unwrap_or_else(Fr::zero);
255
256 logs.push(TaggedLog {
257 tag: tags[tag_idx],
258 data,
259 block_number,
260 is_public: false,
261 tx_hash,
262 note_hashes,
263 first_nullifier,
264 });
265 }
266 }
267 results[tag_idx] = logs;
268 }
269 }
270
271 Ok(results)
272 }
273
274 async fn get_public_logs_by_tag(
276 &self,
277 contract: &AztecAddress,
278 tag: &Fr,
279 ) -> Result<Vec<TaggedLog>, Error> {
280 let response = self
281 .node
282 .get_public_logs_by_tags_from_contract(contract, &[*tag])
283 .await?;
284
285 let mut logs = Vec::new();
286 if let Some(outer) = response.as_array() {
287 for tag_logs in outer {
288 if let Some(entries) = tag_logs.as_array() {
289 for entry in entries {
290 let data = entry
291 .get("logData")
292 .or_else(|| entry.get("data"))
293 .and_then(|d| d.as_array())
294 .map(|arr| {
295 arr.iter()
296 .filter_map(|v| v.as_str().and_then(|s| Fr::from_hex(s).ok()))
297 .collect::<Vec<_>>()
298 })
299 .unwrap_or_default();
300
301 let block_number = entry
302 .get("blockNumber")
303 .and_then(|v| v.as_u64())
304 .unwrap_or(0);
305 let tx_hash = entry
306 .get("txHash")
307 .and_then(|v| v.as_str())
308 .map(TxHash::from_hex)
309 .transpose()?
310 .unwrap_or_else(TxHash::zero);
311 let note_hashes = entry
312 .get("noteHashes")
313 .and_then(|v| v.as_array())
314 .map(|arr| {
315 arr.iter()
316 .filter_map(|v| v.as_str().and_then(|s| Fr::from_hex(s).ok()))
317 .collect::<Vec<_>>()
318 })
319 .unwrap_or_default();
320 let first_nullifier = entry
321 .get("firstNullifier")
322 .and_then(|v| v.as_str())
323 .map(Fr::from_hex)
324 .transpose()?
325 .unwrap_or_else(Fr::zero);
326
327 logs.push(TaggedLog {
328 tag: *tag,
329 data,
330 block_number,
331 is_public: true,
332 tx_hash,
333 note_hashes,
334 first_nullifier,
335 });
336 }
337 }
338 }
339 }
340
341 Ok(logs)
342 }
343
344 pub async fn get_senders(&self) -> Result<Vec<AztecAddress>, Error> {
346 self.sender_store.get_all().await
347 }
348}
349
350fn compute_siloed_tag(secret: &Fr, index: u64, contract_address: &AztecAddress) -> Fr {
355 let tag = poseidon2_hash(&[*secret, Fr::from(index)]);
356 compute_siloed_private_log_first_field(contract_address, &tag)
357}
358
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::stores::{
        kv::KvStore, CapsuleStore, InMemoryKvStore, RecipientTaggingStore, SenderStore,
        SenderTaggingStore,
    };
    use aztec_core::tx::{TxExecutionResult, TxHash, TxReceipt, TxStatus};
    use aztec_core::types::{ContractInstanceWithAddress, Fr};
    use aztec_node_client::{NodeInfo, PublicLogFilter, PublicLogsResponse};

    /// Node stub that answers every private-log query with the same canned
    /// JSON payload, whatever tags are requested. All other calls return
    /// fixed placeholder values.
    struct MockNode {
        private_logs: serde_json::Value,
    }

    #[async_trait::async_trait]
    impl AztecNode for MockNode {
        async fn get_node_info(&self) -> Result<NodeInfo, Error> {
            Ok(NodeInfo {
                node_version: "mock".into(),
                l1_chain_id: 1,
                rollup_version: 1,
                enr: None,
                l1_contract_addresses: serde_json::Value::Null,
                protocol_contract_addresses: serde_json::Value::Null,
                real_proofs: false,
                l2_circuits_vk_tree_root: None,
                l2_protocol_contracts_hash: None,
            })
        }
        async fn get_block_number(&self) -> Result<u64, Error> {
            Ok(1)
        }
        async fn get_proven_block_number(&self) -> Result<u64, Error> {
            Ok(1)
        }
        async fn get_tx_receipt(&self, _tx_hash: &TxHash) -> Result<TxReceipt, Error> {
            Ok(TxReceipt {
                tx_hash: TxHash::zero(),
                status: TxStatus::Pending,
                execution_result: Some(TxExecutionResult::Success),
                error: None,
                transaction_fee: None,
                block_hash: None,
                block_number: None,
                epoch_number: None,
            })
        }
        async fn get_tx_effect(
            &self,
            _tx_hash: &TxHash,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_tx_by_hash(
            &self,
            _tx_hash: &TxHash,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_public_logs(
            &self,
            _filter: PublicLogFilter,
        ) -> Result<PublicLogsResponse, Error> {
            Ok(PublicLogsResponse {
                logs: vec![],
                max_logs_hit: false,
            })
        }
        async fn send_tx(&self, _tx: &serde_json::Value) -> Result<(), Error> {
            Ok(())
        }
        async fn get_contract(
            &self,
            _address: &AztecAddress,
        ) -> Result<Option<ContractInstanceWithAddress>, Error> {
            Ok(None)
        }
        async fn get_contract_class(&self, _id: &Fr) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_block_header(&self, _block_number: u64) -> Result<serde_json::Value, Error> {
            Ok(serde_json::json!({}))
        }
        async fn get_block(&self, _block_number: u64) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_note_hash_membership_witness(
            &self,
            _block_number: u64,
            _note_hash: &Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_nullifier_membership_witness(
            &self,
            _block_number: u64,
            _nullifier: &Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_low_nullifier_membership_witness(
            &self,
            _block_number: u64,
            _nullifier: &Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_public_storage_at(
            &self,
            _block_number: u64,
            _contract: &AztecAddress,
            _slot: &Fr,
        ) -> Result<Fr, Error> {
            Ok(Fr::zero())
        }
        async fn get_public_data_witness(
            &self,
            _block_number: u64,
            _leaf_slot: &Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_l1_to_l2_message_membership_witness(
            &self,
            _block_number: u64,
            _entry_key: &Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn simulate_public_calls(
            &self,
            _tx: &serde_json::Value,
            _skip_fee_enforcement: bool,
        ) -> Result<serde_json::Value, Error> {
            Ok(serde_json::Value::Null)
        }
        async fn is_valid_tx(
            &self,
            _tx: &serde_json::Value,
        ) -> Result<aztec_node_client::TxValidationResult, Error> {
            Ok(aztec_node_client::TxValidationResult::Valid)
        }
        // The only call with configurable output: the canned payload is
        // returned as-is, independent of `_tags`.
        async fn get_private_logs_by_tags(&self, _tags: &[Fr]) -> Result<serde_json::Value, Error> {
            Ok(self.private_logs.clone())
        }
        // Always reports no public logs.
        async fn get_public_logs_by_tags_from_contract(
            &self,
            _contract: &AztecAddress,
            _tags: &[Fr],
        ) -> Result<serde_json::Value, Error> {
            Ok(serde_json::json!([]))
        }
        async fn register_contract_function_signatures(
            &self,
            _signatures: &[String],
        ) -> Result<(), Error> {
            Ok(())
        }
        async fn get_block_hash_membership_witness(
            &self,
            _block_number: u64,
            _block_hash: &Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn find_leaves_indexes(
            &self,
            _block_number: u64,
            _tree_id: &str,
            _leaves: &[Fr],
        ) -> Result<Vec<Option<u64>>, Error> {
            Ok(vec![])
        }
    }

    /// Builds a `LogService` over a `MockNode` that serves `private_logs`,
    /// with all stores sharing one in-memory KV store.
    ///
    /// Returns the service, the recipient tagging store it uses, and a fixed
    /// tagging secret. The node and stores are leaked with `Box::leak` to get
    /// the `'static` references the returned service borrows.
    fn make_service(
        private_logs: serde_json::Value,
    ) -> (
        LogService<'static, MockNode>,
        &'static RecipientTaggingStore,
        Fr,
    ) {
        let kv: Arc<dyn KvStore> = Arc::new(InMemoryKvStore::new());
        let secret = Fr::from(7u64);
        let node = Box::leak(Box::new(MockNode { private_logs }));
        let sender_store = Box::leak(Box::new(SenderStore::new(Arc::clone(&kv))));
        let sender_tagging_store = Box::leak(Box::new(SenderTaggingStore::new(Arc::clone(&kv))));
        let recipient_tagging_store =
            Box::leak(Box::new(RecipientTaggingStore::new(Arc::clone(&kv))));
        let capsule_store = Box::leak(Box::new(CapsuleStore::new(kv)));
        let service = LogService::new(
            node,
            sender_store,
            sender_tagging_store,
            recipient_tagging_store,
            capsule_store,
        );
        (service, recipient_tagging_store, secret)
    }

    /// Each result slot must carry the tag at the same position in the request.
    #[tokio::test]
    async fn private_logs_preserve_requested_tag_alignment() {
        let tag_a = Fr::from(11u64);
        let tag_b = Fr::from(12u64);
        let (service, _, _) = make_service(serde_json::json!([
            [{"data": ["0x01"], "blockNumber": 5}],
            [{"data": ["0x02"], "blockNumber": 6}]
        ]));

        let logs = service
            .get_private_logs_by_tags(&[tag_a, tag_b])
            .await
            .unwrap();
        assert_eq!(logs.len(), 2);
        assert_eq!(logs[0][0].tag, tag_a);
        assert_eq!(logs[1][0].tag, tag_b);
    }

    /// When the node returns only empty slots, the stored index must not move.
    #[tokio::test]
    async fn load_logs_for_range_only_advances_index_when_logs_exist() {
        // Three empty slots, one per index in the scanned range 1..=3.
        let (service, recipient_tagging_store, secret) =
            make_service(serde_json::json!([[], [], []]));
        service
            .load_logs_for_range(&AztecAddress::from(1u64), &secret, 1, 3)
            .await
            .unwrap();

        // Nothing was ever written for this secret, and the store reads back 0.
        assert_eq!(
            recipient_tagging_store
                .get_highest_finalized_index(&secret)
                .await
                .unwrap(),
            0
        );
    }

    /// Results must come back in request order, one entry per request.
    #[tokio::test]
    async fn bulk_retrieve_logs_preserves_request_order() {
        let (service, _, _) = make_service(serde_json::json!([
            [{"data": ["0x0a"], "blockNumber": 1}]
        ]));

        let tag_first = Fr::from(1u64);
        let tag_second = Fr::from(2u64);

        let logs = service
            .bulk_retrieve_logs(&[
                LogRetrievalRequest {
                    is_public: false,
                    tag: tag_first,
                    contract_address: None,
                },
                LogRetrievalRequest {
                    is_public: false,
                    tag: tag_second,
                    contract_address: None,
                },
            ])
            .await
            .unwrap();

        assert_eq!(logs.len(), 2);
        // The mock serves the same one-log payload for each request, and with
        // no contract address the tag is used unchanged, so every request
        // yields exactly one log stamped with its own tag.
        assert_eq!(logs[0].len(), 1);
        assert_eq!(logs[0][0].tag, tag_first);
        assert_eq!(logs[1].len(), 1);
        assert_eq!(logs[1][0].tag, tag_second);
    }
}