aztec_pxe/sync/
log_service.rs

1//! Log service for tagged log retrieval and storage.
2//!
3//! Ports the TS `LogService` which manages log retrieval via the tagging
4//! protocol, supporting both public and private logs with pagination.
5
6use aztec_core::error::Error;
7use aztec_core::hash::{compute_siloed_private_log_first_field, poseidon2_hash};
8use aztec_core::tx::TxHash;
9use aztec_core::types::{AztecAddress, Fr};
10use aztec_node_client::AztecNode;
11
12use crate::stores::{CapsuleStore, RecipientTaggingStore, SenderStore, SenderTaggingStore};
13
/// Maximum number of tags per RPC request.
///
/// `load_logs_for_range` splits its tag list into chunks of at most this many
/// tags and issues one node query per chunk.
const MAX_RPC_LEN: usize = 128;

/// Window length for unfinalized tagging indexes.
///
/// For each tagging secret, `fetch_tagged_logs` scans the index range
/// `(finalized, finalized + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN]`, where
/// `finalized` is the value read from the recipient tagging store.
const UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN: u64 = 20;
19
/// A request to retrieve logs by tag.
///
/// Consumed by `LogService::bulk_retrieve_logs`, which produces one result
/// vector per request.
#[derive(Debug, Clone)]
pub struct LogRetrievalRequest {
    /// Whether the log is public or private.
    ///
    /// Note: `bulk_retrieve_logs` does not read this flag; it decides which
    /// queries to run from `contract_address` alone.
    pub is_public: bool,
    /// The tag to search for.
    ///
    /// `bulk_retrieve_logs` treats this as the unsiloed tag: it is used as-is
    /// for the public-log query and siloed with `contract_address` (when set)
    /// before the private-log query.
    pub tag: Fr,
    /// The contract address (for public logs).
    ///
    /// When `None`, no public-log query is made and the private-log query uses
    /// `tag` unchanged.
    pub contract_address: Option<AztecAddress>,
}
30
/// A retrieved tagged log entry.
///
/// Built by `LogService` from the JSON entries returned by the node.
#[derive(Debug, Clone)]
pub struct TaggedLog {
    /// The tag that matched.
    ///
    /// For private logs this is the tag that was sent to the node query; for
    /// public logs it is the tag passed to the public lookup.
    pub tag: Fr,
    /// The log data fields, parsed from the entry's `logData` (or `data`) array.
    pub data: Vec<Fr>,
    /// Block number containing the log (0 when the entry has no `blockNumber`).
    pub block_number: u64,
    /// Whether this is a public log.
    pub is_public: bool,
    /// Transaction hash that emitted the log (zero when the entry has no `txHash`).
    pub tx_hash: TxHash,
    /// Unique note hashes created by the emitting transaction.
    pub note_hashes: Vec<Fr>,
    /// First nullifier created by the emitting transaction
    /// (zero when the entry has no `firstNullifier`).
    pub first_nullifier: Fr,
}
49
/// Service for log retrieval operations using the tagging protocol.
///
/// Holds shared references to the node client and the stores it works with;
/// it owns none of them.
pub struct LogService<'a, N: AztecNode> {
    /// Node client used for the private/public log-by-tag queries.
    node: &'a N,
    /// Backing store for `get_senders`.
    sender_store: &'a SenderStore,
    #[allow(dead_code)] // Used when sender-side tag sync is wired
    sender_tagging_store: &'a SenderTaggingStore,
    /// Tracks, per tagging secret, the highest finalized index that
    /// `fetch_tagged_logs` reads and `load_logs_for_range` updates.
    recipient_tagging_store: &'a RecipientTaggingStore,
    // Stored by `new` but not read by any method in this file yet.
    #[allow(dead_code)]
    capsule_store: &'a CapsuleStore,
}
60
61impl<'a, N: AztecNode> LogService<'a, N> {
62    pub fn new(
63        node: &'a N,
64        sender_store: &'a SenderStore,
65        sender_tagging_store: &'a SenderTaggingStore,
66        recipient_tagging_store: &'a RecipientTaggingStore,
67        capsule_store: &'a CapsuleStore,
68    ) -> Self {
69        Self {
70            node,
71            sender_store,
72            sender_tagging_store,
73            recipient_tagging_store,
74            capsule_store,
75        }
76    }
77
78    /// Bulk retrieve logs by tags.
79    ///
80    /// Fetches both public and private logs for multiple tag requests,
81    /// handling pagination automatically.
82    pub async fn bulk_retrieve_logs(
83        &self,
84        requests: &[LogRetrievalRequest],
85    ) -> Result<Vec<Vec<TaggedLog>>, Error> {
86        let mut results = Vec::with_capacity(requests.len());
87        for request in requests {
88            let public_logs = if let Some(contract) = &request.contract_address {
89                self.get_public_logs_by_tag(contract, &request.tag).await?
90            } else {
91                vec![]
92            };
93            // The request tag from Noir is UNSILOED.  The node indexes
94            // private logs by the SILOED first field, so we must silo
95            // before querying.
96            let siloed_tag = if let Some(contract) = &request.contract_address {
97                compute_siloed_private_log_first_field(contract, &request.tag)
98            } else {
99                request.tag
100            };
101            let mut private_logs = self.get_private_logs_by_tags(&[siloed_tag]).await?;
102            let private_logs = private_logs.pop().unwrap_or_default();
103
104            if !public_logs.is_empty() && !private_logs.is_empty() {
105                return Err(Error::InvalidData(format!(
106                    "found both a public and private log for tag {}",
107                    request.tag
108                )));
109            }
110
111            results.push(if !public_logs.is_empty() {
112                public_logs
113            } else {
114                private_logs
115            });
116        }
117
118        Ok(results)
119    }
120
121    /// Fetch all tagged logs for a contract, handling multiple recipients and senders.
122    ///
123    /// This is the main entry point for note discovery via the tagging protocol.
124    pub async fn fetch_tagged_logs(
125        &self,
126        contract_address: &AztecAddress,
127        recipient: &AztecAddress,
128        tagging_secrets: &[Fr],
129    ) -> Result<Vec<TaggedLog>, Error> {
130        let mut all_logs = Vec::new();
131
132        for secret in tagging_secrets {
133            let finalized = self
134                .recipient_tagging_store
135                .get_highest_finalized_index(secret)
136                .await?;
137
138            // Load logs for index range: (finalized, finalized + WINDOW_LEN]
139            let from_index = finalized + 1;
140            let to_index = finalized + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN;
141
142            let logs = self
143                .load_logs_for_range(contract_address, secret, from_index, to_index)
144                .await?;
145
146            all_logs.extend(logs);
147        }
148
149        if !all_logs.is_empty() {
150            tracing::debug!(
151                contract = %contract_address,
152                recipient = %recipient,
153                count = all_logs.len(),
154                "fetched tagged logs"
155            );
156        }
157
158        Ok(all_logs)
159    }
160
161    /// Load logs for a range of tagging indexes.
162    async fn load_logs_for_range(
163        &self,
164        contract_address: &AztecAddress,
165        secret: &Fr,
166        from_index: u64,
167        to_index: u64,
168    ) -> Result<Vec<TaggedLog>, Error> {
169        // Compute siloed tags for each index in range
170        let mut tags = Vec::new();
171        for idx in from_index..=to_index {
172            let tag = compute_siloed_tag(secret, idx, contract_address);
173            tags.push(tag);
174        }
175
176        // Fetch logs in batches
177        let mut all_logs = Vec::new();
178        for (chunk_idx, chunk) in tags.chunks(MAX_RPC_LEN).enumerate() {
179            let logs = self.get_private_logs_by_tags(chunk).await?;
180            let mut highest_found_index = None;
181            for (i, tag_logs) in logs.into_iter().enumerate() {
182                let idx = from_index + (chunk_idx * MAX_RPC_LEN + i) as u64;
183                let had_logs = !tag_logs.is_empty();
184                for log in tag_logs {
185                    all_logs.push(log);
186                }
187
188                if had_logs {
189                    highest_found_index = Some(idx);
190                }
191            }
192
193            if let Some(idx) = highest_found_index {
194                self.recipient_tagging_store
195                    .update_highest_finalized_index(secret, idx)
196                    .await
197                    .ok();
198            }
199        }
200
201        Ok(all_logs)
202    }
203
204    /// Fetch private logs by siloed tags from the node.
205    async fn get_private_logs_by_tags(&self, tags: &[Fr]) -> Result<Vec<Vec<TaggedLog>>, Error> {
206        if tags.is_empty() {
207            return Ok(vec![]);
208        }
209
210        let response = self.node.get_private_logs_by_tags(tags).await?;
211
212        // Parse the response — each tag gets an array of log entries
213        let mut results = vec![Vec::new(); tags.len()];
214        if let Some(outer) = response.as_array() {
215            for (tag_idx, tag_logs) in outer.iter().enumerate().take(tags.len()) {
216                let mut logs = Vec::new();
217                if let Some(entries) = tag_logs.as_array() {
218                    for entry in entries {
219                        let data = entry
220                            .get("logData")
221                            .or_else(|| entry.get("data"))
222                            .and_then(|d| d.as_array())
223                            .map(|arr| {
224                                arr.iter()
225                                    .filter_map(|v| v.as_str().and_then(|s| Fr::from_hex(s).ok()))
226                                    .collect::<Vec<_>>()
227                            })
228                            .unwrap_or_default();
229
230                        let block_number = entry
231                            .get("blockNumber")
232                            .and_then(|v| v.as_u64())
233                            .unwrap_or(0);
234                        let tx_hash = entry
235                            .get("txHash")
236                            .and_then(|v| v.as_str())
237                            .map(TxHash::from_hex)
238                            .transpose()?
239                            .unwrap_or_else(TxHash::zero);
240                        let note_hashes = entry
241                            .get("noteHashes")
242                            .and_then(|v| v.as_array())
243                            .map(|arr| {
244                                arr.iter()
245                                    .filter_map(|v| v.as_str().and_then(|s| Fr::from_hex(s).ok()))
246                                    .collect::<Vec<_>>()
247                            })
248                            .unwrap_or_default();
249                        let first_nullifier = entry
250                            .get("firstNullifier")
251                            .and_then(|v| v.as_str())
252                            .map(Fr::from_hex)
253                            .transpose()?
254                            .unwrap_or_else(Fr::zero);
255
256                        logs.push(TaggedLog {
257                            tag: tags[tag_idx],
258                            data,
259                            block_number,
260                            is_public: false,
261                            tx_hash,
262                            note_hashes,
263                            first_nullifier,
264                        });
265                    }
266                }
267                results[tag_idx] = logs;
268            }
269        }
270
271        Ok(results)
272    }
273
274    /// Fetch public logs by tag from a specific contract.
275    async fn get_public_logs_by_tag(
276        &self,
277        contract: &AztecAddress,
278        tag: &Fr,
279    ) -> Result<Vec<TaggedLog>, Error> {
280        let response = self
281            .node
282            .get_public_logs_by_tags_from_contract(contract, &[*tag])
283            .await?;
284
285        let mut logs = Vec::new();
286        if let Some(outer) = response.as_array() {
287            for tag_logs in outer {
288                if let Some(entries) = tag_logs.as_array() {
289                    for entry in entries {
290                        let data = entry
291                            .get("logData")
292                            .or_else(|| entry.get("data"))
293                            .and_then(|d| d.as_array())
294                            .map(|arr| {
295                                arr.iter()
296                                    .filter_map(|v| v.as_str().and_then(|s| Fr::from_hex(s).ok()))
297                                    .collect::<Vec<_>>()
298                            })
299                            .unwrap_or_default();
300
301                        let block_number = entry
302                            .get("blockNumber")
303                            .and_then(|v| v.as_u64())
304                            .unwrap_or(0);
305                        let tx_hash = entry
306                            .get("txHash")
307                            .and_then(|v| v.as_str())
308                            .map(TxHash::from_hex)
309                            .transpose()?
310                            .unwrap_or_else(TxHash::zero);
311                        let note_hashes = entry
312                            .get("noteHashes")
313                            .and_then(|v| v.as_array())
314                            .map(|arr| {
315                                arr.iter()
316                                    .filter_map(|v| v.as_str().and_then(|s| Fr::from_hex(s).ok()))
317                                    .collect::<Vec<_>>()
318                            })
319                            .unwrap_or_default();
320                        let first_nullifier = entry
321                            .get("firstNullifier")
322                            .and_then(|v| v.as_str())
323                            .map(Fr::from_hex)
324                            .transpose()?
325                            .unwrap_or_else(Fr::zero);
326
327                        logs.push(TaggedLog {
328                            tag: *tag,
329                            data,
330                            block_number,
331                            is_public: true,
332                            tx_hash,
333                            note_hashes,
334                            first_nullifier,
335                        });
336                    }
337                }
338            }
339        }
340
341        Ok(logs)
342    }
343
344    /// Get all registered senders.
345    pub async fn get_senders(&self) -> Result<Vec<AztecAddress>, Error> {
346        self.sender_store.get_all().await
347    }
348}
349
350/// Compute a siloed tag from a secret and index.
351///
352/// In the full implementation, this uses ExtendedDirectionalAppTaggingSecret
353/// and Poseidon2 hashing. For now, use a simple derivation with a separator.
354fn compute_siloed_tag(secret: &Fr, index: u64, contract_address: &AztecAddress) -> Fr {
355    let tag = poseidon2_hash(&[*secret, Fr::from(index)]);
356    compute_siloed_private_log_first_field(contract_address, &tag)
357}
358
359#[cfg(test)]
360mod tests {
361    use std::sync::Arc;
362
363    use super::*;
364    use crate::stores::{
365        kv::KvStore, CapsuleStore, InMemoryKvStore, RecipientTaggingStore, SenderStore,
366        SenderTaggingStore,
367    };
368    use aztec_core::tx::{TxExecutionResult, TxHash, TxReceipt, TxStatus};
369    use aztec_core::types::{ContractInstanceWithAddress, Fr};
370    use aztec_node_client::{NodeInfo, PublicLogFilter, PublicLogsResponse};
371
    /// Test double for [`AztecNode`].
    ///
    /// `get_private_logs_by_tags` returns a clone of `private_logs` regardless
    /// of the requested tags, `get_public_logs_by_tags_from_contract` always
    /// returns an empty array, and every other method returns a fixed
    /// placeholder value.
    struct MockNode {
        /// Canned response handed back by `get_private_logs_by_tags`.
        private_logs: serde_json::Value,
    }

    // Only the two log-by-tag methods matter to the tests below; the rest are
    // stubs that exist to satisfy the trait.
    #[async_trait::async_trait]
    impl AztecNode for MockNode {
        async fn get_node_info(&self) -> Result<NodeInfo, Error> {
            Ok(NodeInfo {
                node_version: "mock".into(),
                l1_chain_id: 1,
                rollup_version: 1,
                enr: None,
                l1_contract_addresses: serde_json::Value::Null,
                protocol_contract_addresses: serde_json::Value::Null,
                real_proofs: false,
                l2_circuits_vk_tree_root: None,
                l2_protocol_contracts_hash: None,
            })
        }
        async fn get_block_number(&self) -> Result<u64, Error> {
            Ok(1)
        }
        async fn get_proven_block_number(&self) -> Result<u64, Error> {
            Ok(1)
        }
        async fn get_tx_receipt(&self, _tx_hash: &TxHash) -> Result<TxReceipt, Error> {
            Ok(TxReceipt {
                tx_hash: TxHash::zero(),
                status: TxStatus::Pending,
                execution_result: Some(TxExecutionResult::Success),
                error: None,
                transaction_fee: None,
                block_hash: None,
                block_number: None,
                epoch_number: None,
            })
        }
        async fn get_tx_effect(
            &self,
            _tx_hash: &TxHash,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_tx_by_hash(
            &self,
            _tx_hash: &TxHash,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_public_logs(
            &self,
            _filter: PublicLogFilter,
        ) -> Result<PublicLogsResponse, Error> {
            Ok(PublicLogsResponse {
                logs: vec![],
                max_logs_hit: false,
            })
        }
        async fn send_tx(&self, _tx: &serde_json::Value) -> Result<(), Error> {
            Ok(())
        }
        async fn get_contract(
            &self,
            _address: &AztecAddress,
        ) -> Result<Option<ContractInstanceWithAddress>, Error> {
            Ok(None)
        }
        async fn get_contract_class(&self, _id: &Fr) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_block_header(&self, _block_number: u64) -> Result<serde_json::Value, Error> {
            Ok(serde_json::json!({}))
        }
        async fn get_block(&self, _block_number: u64) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_note_hash_membership_witness(
            &self,
            _block_number: u64,
            _note_hash: &Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_nullifier_membership_witness(
            &self,
            _block_number: u64,
            _nullifier: &Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_low_nullifier_membership_witness(
            &self,
            _block_number: u64,
            _nullifier: &Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_public_storage_at(
            &self,
            _block_number: u64,
            _contract: &AztecAddress,
            _slot: &Fr,
        ) -> Result<Fr, Error> {
            Ok(Fr::zero())
        }
        async fn get_public_data_witness(
            &self,
            _block_number: u64,
            _leaf_slot: &Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn get_l1_to_l2_message_membership_witness(
            &self,
            _block_number: u64,
            _entry_key: &Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn simulate_public_calls(
            &self,
            _tx: &serde_json::Value,
            _skip_fee_enforcement: bool,
        ) -> Result<serde_json::Value, Error> {
            Ok(serde_json::Value::Null)
        }
        async fn is_valid_tx(
            &self,
            _tx: &serde_json::Value,
        ) -> Result<aztec_node_client::TxValidationResult, Error> {
            Ok(aztec_node_client::TxValidationResult::Valid)
        }
        // Returns the canned response; the requested tags are ignored.
        async fn get_private_logs_by_tags(&self, _tags: &[Fr]) -> Result<serde_json::Value, Error> {
            Ok(self.private_logs.clone())
        }
        // Always reports no public logs, so lookups fall through to private logs.
        async fn get_public_logs_by_tags_from_contract(
            &self,
            _contract: &AztecAddress,
            _tags: &[Fr],
        ) -> Result<serde_json::Value, Error> {
            Ok(serde_json::json!([]))
        }
        async fn register_contract_function_signatures(
            &self,
            _signatures: &[String],
        ) -> Result<(), Error> {
            Ok(())
        }
        async fn get_block_hash_membership_witness(
            &self,
            _block_number: u64,
            _block_hash: &Fr,
        ) -> Result<Option<serde_json::Value>, Error> {
            Ok(None)
        }
        async fn find_leaves_indexes(
            &self,
            _block_number: u64,
            _tree_id: &str,
            _leaves: &[Fr],
        ) -> Result<Vec<Option<u64>>, Error> {
            Ok(vec![])
        }
    }
536
537    fn make_service(
538        private_logs: serde_json::Value,
539    ) -> (
540        LogService<'static, MockNode>,
541        &'static RecipientTaggingStore,
542        Fr,
543    ) {
544        let kv: Arc<dyn KvStore> = Arc::new(InMemoryKvStore::new());
545        let secret = Fr::from(7u64);
546        let node = Box::leak(Box::new(MockNode { private_logs }));
547        let sender_store = Box::leak(Box::new(SenderStore::new(Arc::clone(&kv))));
548        let sender_tagging_store = Box::leak(Box::new(SenderTaggingStore::new(Arc::clone(&kv))));
549        let recipient_tagging_store =
550            Box::leak(Box::new(RecipientTaggingStore::new(Arc::clone(&kv))));
551        let capsule_store = Box::leak(Box::new(CapsuleStore::new(kv)));
552        let service = LogService::new(
553            node,
554            sender_store,
555            sender_tagging_store,
556            recipient_tagging_store,
557            capsule_store,
558        );
559        (service, recipient_tagging_store, secret)
560    }
561
562    #[tokio::test]
563    async fn private_logs_preserve_requested_tag_alignment() {
564        let tag_a = Fr::from(11u64);
565        let tag_b = Fr::from(12u64);
566        let (service, _, _) = make_service(serde_json::json!([
567            [{"data": ["0x01"], "blockNumber": 5}],
568            [{"data": ["0x02"], "blockNumber": 6}]
569        ]));
570
571        let logs = service
572            .get_private_logs_by_tags(&[tag_a, tag_b])
573            .await
574            .unwrap();
575        assert_eq!(logs.len(), 2);
576        assert_eq!(logs[0][0].tag, tag_a);
577        assert_eq!(logs[1][0].tag, tag_b);
578    }
579
580    #[tokio::test]
581    async fn load_logs_for_range_only_advances_index_when_logs_exist() {
582        let (service, recipient_tagging_store, secret) =
583            make_service(serde_json::json!([[], [], []]));
584        service
585            .load_logs_for_range(&AztecAddress::from(1u64), &secret, 1, 3)
586            .await
587            .unwrap();
588
589        assert_eq!(
590            recipient_tagging_store
591                .get_highest_finalized_index(&secret)
592                .await
593                .unwrap(),
594            0
595        );
596    }
597
598    #[tokio::test]
599    async fn bulk_retrieve_logs_preserves_request_order() {
600        let (service, _, _) = make_service(serde_json::json!([
601            [{"data": ["0x0a"], "blockNumber": 1}]
602        ]));
603
604        let tag_first = Fr::from(1u64);
605        let tag_second = Fr::from(2u64);
606
607        let logs = service
608            .bulk_retrieve_logs(&[
609                LogRetrievalRequest {
610                    is_public: false,
611                    tag: tag_first,
612                    contract_address: None,
613                },
614                LogRetrievalRequest {
615                    is_public: false,
616                    tag: tag_second,
617                    contract_address: None,
618                },
619            ])
620            .await
621            .unwrap();
622
623        // Both requests hit private logs; ordering is verified via the tag
624        // that get_private_logs_by_tags stamps on each TaggedLog.
625        assert_eq!(logs.len(), 2);
626        assert_eq!(logs[0].len(), 1);
627        assert_eq!(logs[0][0].tag, tag_first);
628        assert_eq!(logs[1].len(), 1);
629        assert_eq!(logs[1][0].tag, tag_second);
630    }
631}