vex_persist/
audit_store.rs

1//! Audit log storage with Merkle verification
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use uuid::Uuid;
7
8use crate::backend::{StorageBackend, StorageError, StorageExt};
9use vex_core::{Hash, MerkleTree};
10
11use vex_core::audit::{ActorType, AuditEvent, AuditEventType, HashParams};
12
13/// Per-tenant chain state for proper multi-tenancy isolation
14#[derive(Debug, Clone, Serialize, Deserialize, Default)]
15struct ChainState {
16    /// Last event hash for this tenant
17    last_hash: Option<Hash>,
18    /// Monotonic sequence counter for this tenant
19    sequence: u64,
20}
21
22/// Audit store for compliance logging
23///
24/// # Multi-Tenancy
25/// Chain state (hash and sequence) is now stored per-tenant in the backend,
26/// ensuring tenant isolation and preventing cross-tenant chain corruption.
27#[derive(Debug)]
28pub struct AuditStore<B: StorageBackend + ?Sized> {
29    backend: Arc<B>,
30    prefix: String,
31}
32
33impl<B: StorageBackend + ?Sized> AuditStore<B> {
34    /// Create a new audit store
35    pub fn new(backend: Arc<B>) -> Self {
36        Self {
37            backend,
38            prefix: "audit:".to_string(),
39        }
40    }
41
42    fn event_key(&self, tenant_id: &str, id: Uuid) -> String {
43        format!("{}tenant:{}:event:{}", self.prefix, tenant_id, id)
44    }
45
46    fn chain_key(&self, tenant_id: &str) -> String {
47        format!("{}tenant:{}:chain", self.prefix, tenant_id)
48    }
49
50    fn chain_state_key(&self, tenant_id: &str) -> String {
51        format!("{}tenant:{}:chain_state", self.prefix, tenant_id)
52    }
53
54    /// Get per-tenant chain state from storage
55    async fn get_chain_state(&self, tenant_id: &str) -> Result<ChainState, StorageError> {
56        self.backend
57            .get(&self.chain_state_key(tenant_id))
58            .await
59            .map(|opt| opt.unwrap_or_default())
60    }
61
62    /// Update per-tenant chain state in storage
63    async fn set_chain_state(
64        &self,
65        tenant_id: &str,
66        state: &ChainState,
67    ) -> Result<(), StorageError> {
68        self.backend
69            .set(&self.chain_state_key(tenant_id), state)
70            .await
71    }
72
73    /// Log an audit event (automatically chained with sequence number)
74    ///
75    /// Chain state is stored per-tenant to ensure proper isolation.
76    pub async fn log(
77        &self,
78        tenant_id: &str,
79        event_type: AuditEventType,
80        actor: ActorType,
81        agent_id: Option<Uuid>,
82        data: serde_json::Value,
83    ) -> Result<AuditEvent, StorageError> {
84        // Pseudonymize actor to protect PII (Centralized in vex-core)
85        let actor = actor.pseudonymize();
86
87        // Get per-tenant chain state
88        let mut chain_state = self.get_chain_state(tenant_id).await?;
89        let seq = chain_state.sequence;
90
91        let mut event = match &chain_state.last_hash {
92            Some(prev) => AuditEvent::chained(event_type, agent_id, data, prev.clone(), seq),
93            None => AuditEvent::new(event_type, agent_id, data, seq),
94        };
95
96        // Set actor after creation to override default system actor
97        event.actor = actor;
98
99        // Rehash after setting actor
100        event.hash = AuditEvent::compute_hash(HashParams {
101            event_type: &event.event_type,
102            timestamp: event.timestamp,
103            sequence_number: event.sequence_number,
104            data: &event.data,
105            actor: &event.actor,
106            rationale: &event.rationale,
107            policy_version: &event.policy_version,
108            data_provenance_hash: &event.data_provenance_hash,
109            human_review_required: event.human_review_required,
110            approval_count: event.approval_signatures.len(),
111        });
112
113        if let Some(prev) = &event.previous_hash {
114            event.hash = AuditEvent::compute_chained_hash(&event.hash, prev, event.sequence_number);
115        }
116
117        // Store event
118        self.backend
119            .set(&self.event_key(tenant_id, event.id), &event)
120            .await?;
121
122        // Update chain index
123        let mut chain: Vec<Uuid> = self
124            .backend
125            .get(&self.chain_key(tenant_id))
126            .await?
127            .unwrap_or_default();
128        chain.push(event.id);
129        self.backend.set(&self.chain_key(tenant_id), &chain).await?;
130
131        // Update per-tenant chain state
132        chain_state.last_hash = Some(event.hash.clone());
133        chain_state.sequence += 1;
134        self.set_chain_state(tenant_id, &chain_state).await?;
135
136        Ok(event)
137    }
138
139    /// Get event by ID
140    pub async fn get(&self, tenant_id: &str, id: Uuid) -> Result<Option<AuditEvent>, StorageError> {
141        self.backend.get(&self.event_key(tenant_id, id)).await
142    }
143
144    /// Get all events in chain order
145    pub async fn get_chain(&self, tenant_id: &str) -> Result<Vec<AuditEvent>, StorageError> {
146        let chain: Vec<Uuid> = self
147            .backend
148            .get(&self.chain_key(tenant_id))
149            .await?
150            .unwrap_or_default();
151
152        let mut events = Vec::new();
153        for id in chain {
154            if let Some(event) = self.get(tenant_id, id).await? {
155                events.push(event);
156            }
157        }
158        Ok(events)
159    }
160
161    /// Build Merkle tree of all events for a tenant
162    pub async fn build_merkle_tree(&self, tenant_id: &str) -> Result<MerkleTree, StorageError> {
163        let events = self.get_chain(tenant_id).await?;
164        let leaves: Vec<(String, Hash)> = events
165            .iter()
166            .map(|e| (e.id.to_string(), e.hash.clone()))
167            .collect();
168        Ok(MerkleTree::from_leaves(leaves))
169    }
170
171    /// Verify chain integrity for a tenant
172    pub async fn verify_chain(&self, tenant_id: &str) -> Result<bool, StorageError> {
173        let events = self.get_chain(tenant_id).await?;
174
175        for (i, event) in events.iter().enumerate() {
176            if i == 0 {
177                // First event should have no previous hash
178                if event.previous_hash.is_some() {
179                    tracing::warn!("Chain integrity failed: first event has previous_hash");
180                    return Ok(false);
181                }
182            } else {
183                // Check chain link - verify prev_hash matches previous event's hash
184                match (&event.previous_hash, events.get(i - 1)) {
185                    (Some(prev_hash), Some(prev_event)) => {
186                        let expected = &prev_event.hash;
187
188                        if prev_hash != expected {
189                            tracing::warn!(
190                                "Chain integrity failed at event {}: expected prev_hash {:?}, got {:?}",
191                                event.id, expected.to_hex(), prev_hash.to_hex()
192                            );
193                            return Ok(false);
194                        }
195                    }
196                    (None, _) => {
197                        tracing::warn!(
198                            "Chain integrity failed: event {} has no previous_hash",
199                            event.id
200                        );
201                        return Ok(false);
202                    }
203                    (_, None) => {
204                        tracing::warn!(
205                            "Chain integrity failed: previous event not found for {}",
206                            event.id
207                        );
208                        return Ok(false);
209                    }
210                }
211            }
212        }
213
214        tracing::info!(
215            "Chain integrity verified for tenant {}: {} events",
216            tenant_id,
217            events.len()
218        );
219        Ok(true)
220    }
221
222    /// Export audit trail for compliance for a tenant
223    pub async fn export(&self, tenant_id: &str) -> Result<AuditExport, StorageError> {
224        let events = self.get_chain(tenant_id).await?;
225        let merkle_tree = self.build_merkle_tree(tenant_id).await?;
226
227        Ok(AuditExport {
228            events,
229            merkle_root: merkle_tree.root_hash().map(|h| h.to_string()),
230            exported_at: Utc::now(),
231            verified: self.verify_chain(tenant_id).await.unwrap_or(false),
232        })
233    }
234}
235
236/// Audit export for compliance reporting
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct AuditExport {
239    pub events: Vec<AuditEvent>,
240    pub merkle_root: Option<String>,
241    pub exported_at: DateTime<Utc>,
242    pub verified: bool,
243}
244
245impl AuditExport {
246    /// Export to OCSF v1.7.0 format (Open Cybersecurity Schema Framework)
247    /// Uses Detection Finding class (class_uid: 2004) for AI agent events
248    /// See: https://schema.ocsf.io/1.7.0/classes/detection_finding
249    pub fn to_ocsf(&self) -> Vec<serde_json::Value> {
250        self.events
251            .iter()
252            .map(|event| {
253                serde_json::json!({
254                    // OCSF Base Event attributes
255                    "class_uid": 2004, // Detection Finding
256                    "class_name": "Detection Finding",
257                    "category_uid": 2, // Findings
258                    "category_name": "Findings",
259                    "severity_id": 1, // Informational
260                    "activity_id": 1, // Create
261                    "activity_name": "Create",
262                    "status_id": 1, // Success
263                    "time": event.timestamp.timestamp(),
264                    "timezone_offset": 0,
265
266                    // Finding-specific attributes
267                    "finding_info": {
268                        "uid": event.id.to_string(),
269                        "title": format!("{:?}", event.event_type),
270                        "desc": event.rationale.clone().unwrap_or_default(),
271                        "created_time": event.timestamp.timestamp(),
272                    },
273
274                    // Actor information (ISO 42001 A.6.2.8)
275                    "actor": {
276                        "type_uid": match &event.actor {
277                            ActorType::Bot(_) => 2,
278                            ActorType::Human(_) => 1,
279                            ActorType::System => 0,
280                        },
281                        "type": match &event.actor {
282                            ActorType::Bot(id) => format!("Bot:{}", id),
283                            ActorType::Human(name) => format!("Human:{}", name),
284                            ActorType::System => "System".to_string(),
285                        },
286                    },
287
288                    // VEX-specific extensions
289                    "unmapped": {
290                        "vex_event_type": format!("{:?}", event.event_type),
291                        "vex_hash": event.hash.to_hex(),
292                        "vex_sequence": event.sequence_number,
293                        "vex_policy_version": event.policy_version.clone(),
294                        "vex_data_provenance": event.data_provenance_hash.as_ref().map(|h| h.to_hex()),
295                        "vex_human_review_required": event.human_review_required,
296                        "vex_merkle_root": self.merkle_root.clone(),
297                    },
298
299                    // Metadata
300                    "metadata": {
301                        "version": "1.7.0",
302                        "product": {
303                            "name": "VEX Protocol",
304                            "vendor_name": "ProvnAI",
305                            "version": env!("CARGO_PKG_VERSION"),
306                        },
307                    },
308                })
309            })
310            .collect()
311    }
312
313    /// Export to Splunk HEC format (HTTP Event Collector)
314    /// Uses epoch timestamps and proper metadata placement
315    /// See: https://docs.splunk.com/Documentation/Splunk/latest/Data/FormateventsforHTTPEventCollector
316    pub fn to_splunk_hec(&self, index: &str, source: &str) -> Vec<serde_json::Value> {
317        self.events
318            .iter()
319            .map(|event| {
320                serde_json::json!({
321                    // Splunk metadata (top-level)
322                    "time": event.timestamp.timestamp_millis() as f64 / 1000.0,
323                    "host": "vex-protocol",
324                    "source": source,
325                    "sourcetype": "vex:audit:json",
326                    "index": index,
327
328                    // Event data (sanitized for external export - HIGH-2 fix)
329                    "event": {
330                        "id": event.id.to_string(),
331                        "type": format!("{:?}", event.event_type),
332                        "timestamp": event.timestamp.to_rfc3339(),
333                        "agent_id": event.agent_id.map(|id| id.to_string()),
334                        "data": AuditEvent::sanitize_data(event.data.clone()),
335                        "hash": event.hash.to_hex(),
336                        "sequence": event.sequence_number,
337                        // ISO 42001 fields
338                        "actor": match &event.actor {
339                            ActorType::Bot(id) => serde_json::json!({"type": "bot", "id": id.to_string()}),
340                            ActorType::Human(name) => serde_json::json!({"type": "human", "name": name}),
341                            ActorType::System => serde_json::json!({"type": "system"}),
342                        },
343                        "rationale": event.rationale.clone(),
344                        "policy_version": event.policy_version.clone(),
345                        "human_review_required": event.human_review_required,
346                    },
347
348                    // Indexed fields (for fast searching)
349                    "fields": {
350                        "event_type": format!("{:?}", event.event_type),
351                        "merkle_root": self.merkle_root.clone(),
352                        "verified": self.verified,
353                    },
354                })
355            })
356            .collect()
357    }
358
359    /// Export to Datadog logs format
360    /// Uses reserved attributes for proper log correlation
361    /// See: https://docs.datadoghq.com/logs/log_configuration/attributes_naming_convention
362    pub fn to_datadog(&self, service: &str, env: &str) -> Vec<serde_json::Value> {
363        self.events
364            .iter()
365            .map(|event| {
366                serde_json::json!({
367                    // Datadog reserved attributes
368                    "ddsource": "vex-protocol",
369                    "ddtags": format!("env:{},service:{}", env, service),
370                    "hostname": "vex-audit",
371                    "service": service,
372                    "status": "info",
373
374                    // Timestamp in ISO8601
375                    "timestamp": event.timestamp.to_rfc3339(),
376
377                    // Message for log stream
378                    "message": format!(
379                        "[{}] {} - seq:{} hash:{}",
380                        format!("{:?}", event.event_type),
381                        event.rationale.clone().unwrap_or_else(|| "No rationale".to_string()),
382                        event.sequence_number,
383                        &event.hash.to_hex()[..16]
384                    ),
385
386                    // Structured data
387                    "event": {
388                        "id": event.id.to_string(),
389                        "type": format!("{:?}", event.event_type),
390                        "agent_id": event.agent_id.map(|id| id.to_string()),
391                        "sequence": event.sequence_number,
392                        "hash": event.hash.to_hex(),
393                    },
394
395                    // Actor attribution
396                    "usr": match &event.actor {
397                        ActorType::Human(name) => serde_json::json!({"name": name}),
398                        ActorType::Bot(id) => serde_json::json!({"id": id.to_string(), "type": "bot"}),
399                        ActorType::System => serde_json::json!({"type": "system"}),
400                    },
401
402                    // VEX custom attributes
403                    "vex": {
404                        "merkle_root": self.merkle_root.clone(),
405                        "verified": self.verified,
406                        "policy_version": event.policy_version.clone(),
407                        "human_review_required": event.human_review_required,
408                        "data_provenance_hash": event.data_provenance_hash.as_ref().map(|h| h.to_hex()),
409                    },
410                })
411            })
412            .collect()
413    }
414
415    /// Export all events to JSON Lines format (one JSON per line)
416    /// Compatible with most log ingestion systems
417    pub fn to_jsonl(&self) -> String {
418        self.events
419            .iter()
420            .filter_map(|e| serde_json::to_string(e).ok())
421            .collect::<Vec<_>>()
422            .join("\n")
423    }
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::MemoryBackend;

    /// Log `count` system events for `tenant` with trivial payloads.
    async fn log_events(store: &AuditStore<MemoryBackend>, tenant: &str, count: usize) {
        for i in 0..count {
            store
                .log(
                    tenant,
                    AuditEventType::AgentExecuted,
                    ActorType::System,
                    None,
                    serde_json::json!({ "n": i }),
                )
                .await
                .unwrap();
        }
    }

    #[tokio::test]
    async fn test_audit_store_isolation() {
        let backend = Arc::new(MemoryBackend::new());
        let store = AuditStore::new(backend);
        let t1 = "tenant-1";
        let t2 = "tenant-2";

        // Log to tenant 1
        store
            .log(
                t1,
                AuditEventType::AgentCreated,
                ActorType::System,
                None,
                serde_json::json!({}),
            )
            .await
            .unwrap();

        // Log to tenant 2
        store
            .log(
                t2,
                AuditEventType::AgentExecuted,
                ActorType::System,
                None,
                serde_json::json!({}),
            )
            .await
            .unwrap();

        // Verify isolation
        let chain1 = store.get_chain(t1).await.unwrap();
        let chain2 = store.get_chain(t2).await.unwrap();

        assert_eq!(chain1.len(), 1);
        assert_eq!(chain2.len(), 1);
        assert_ne!(chain1[0].id, chain2[0].id);

        let root1 = store
            .build_merkle_tree(t1)
            .await
            .unwrap()
            .root_hash()
            .cloned();
        let root2 = store
            .build_merkle_tree(t2)
            .await
            .unwrap()
            .root_hash()
            .cloned();
        assert_ne!(root1, root2);
    }

    #[tokio::test]
    async fn test_chain_links_and_verifies() {
        let store = AuditStore::new(Arc::new(MemoryBackend::new()));
        let tenant = "tenant-chain";
        log_events(&store, tenant, 3).await;

        let chain = store.get_chain(tenant).await.unwrap();
        assert_eq!(chain.len(), 3);
        assert!(chain[0].previous_hash.is_none());
        for (i, event) in chain.iter().enumerate() {
            assert_eq!(event.sequence_number, i as u64);
        }
        for pair in chain.windows(2) {
            assert_eq!(pair[1].previous_hash.as_ref(), Some(&pair[0].hash));
        }

        assert!(store.verify_chain(tenant).await.unwrap());
    }

    #[tokio::test]
    async fn test_export_matches_store() {
        let store = AuditStore::new(Arc::new(MemoryBackend::new()));
        let tenant = "tenant-export";
        log_events(&store, tenant, 2).await;

        let export = store.export(tenant).await.unwrap();
        assert_eq!(export.events.len(), 2);
        assert!(export.verified);

        let expected_root = store
            .build_merkle_tree(tenant)
            .await
            .unwrap()
            .root_hash()
            .map(|h| h.to_string());
        assert!(export.merkle_root.is_some());
        assert_eq!(export.merkle_root, expected_root);

        assert_eq!(export.to_jsonl().lines().count(), 2);
        assert_eq!(export.to_ocsf().len(), 2);

        // A tenant with no events exports an empty, trivially verified trail.
        let empty = store.export("tenant-empty").await.unwrap();
        assert!(empty.events.is_empty());
        assert!(empty.verified);
    }
}