1use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use uuid::Uuid;
7
8use crate::backend::{StorageBackend, StorageError, StorageExt};
9use vex_core::{Hash, MerkleTree};
10
11use vex_core::audit::{ActorType, AuditEvent, AuditEventType, HashParams};
12
/// Persisted bookkeeping for one tenant's audit hash chain.
///
/// Stored under the tenant's `chain_state` key. When nothing is stored yet the
/// derived `Default` is used: no last hash and sequence `0`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct ChainState {
    /// Hash of the most recently logged event; `None` before the first event.
    last_hash: Option<Hash>,
    /// Sequence number handed to the next logged event (incremented after each `log`).
    sequence: u64,
}
21
/// Tenant-scoped audit log persisted through a [`StorageBackend`].
///
/// Every storage key produced by the store starts with `prefix`, followed by
/// `tenant:<tenant_id>:`.
#[derive(Debug)]
pub struct AuditStore<B: StorageBackend + ?Sized> {
    /// Shared handle to the backend used for all reads and writes.
    backend: Arc<B>,
    /// Namespace prepended to every storage key (`"audit:"` when built via `new`).
    prefix: String,
}
32
33impl<B: StorageBackend + ?Sized> AuditStore<B> {
34 pub fn new(backend: Arc<B>) -> Self {
36 Self {
37 backend,
38 prefix: "audit:".to_string(),
39 }
40 }
41
42 fn event_key(&self, tenant_id: &str, id: Uuid) -> String {
43 format!("{}tenant:{}:event:{}", self.prefix, tenant_id, id)
44 }
45
46 fn chain_key(&self, tenant_id: &str) -> String {
47 format!("{}tenant:{}:chain", self.prefix, tenant_id)
48 }
49
50 fn chain_state_key(&self, tenant_id: &str) -> String {
51 format!("{}tenant:{}:chain_state", self.prefix, tenant_id)
52 }
53
54 async fn get_chain_state(&self, tenant_id: &str) -> Result<ChainState, StorageError> {
56 self.backend
57 .get(&self.chain_state_key(tenant_id))
58 .await
59 .map(|opt| opt.unwrap_or_default())
60 }
61
62 async fn set_chain_state(
64 &self,
65 tenant_id: &str,
66 state: &ChainState,
67 ) -> Result<(), StorageError> {
68 self.backend
69 .set(&self.chain_state_key(tenant_id), state)
70 .await
71 }
72
73 pub async fn log(
77 &self,
78 tenant_id: &str,
79 event_type: AuditEventType,
80 actor: ActorType,
81 agent_id: Option<Uuid>,
82 data: serde_json::Value,
83 ) -> Result<AuditEvent, StorageError> {
84 let actor = actor.pseudonymize();
86
87 let mut chain_state = self.get_chain_state(tenant_id).await?;
89 let seq = chain_state.sequence;
90
91 let mut event = match &chain_state.last_hash {
92 Some(prev) => AuditEvent::chained(event_type, agent_id, data, prev.clone(), seq),
93 None => AuditEvent::new(event_type, agent_id, data, seq),
94 };
95
96 event.actor = actor;
98
99 event.hash = AuditEvent::compute_hash(HashParams {
101 event_type: &event.event_type,
102 timestamp: event.timestamp,
103 sequence_number: event.sequence_number,
104 data: &event.data,
105 actor: &event.actor,
106 rationale: &event.rationale,
107 policy_version: &event.policy_version,
108 data_provenance_hash: &event.data_provenance_hash,
109 human_review_required: event.human_review_required,
110 approval_count: event.approval_signatures.len(),
111 });
112
113 if let Some(prev) = &event.previous_hash {
114 event.hash = AuditEvent::compute_chained_hash(&event.hash, prev, event.sequence_number);
115 }
116
117 self.backend
119 .set(&self.event_key(tenant_id, event.id), &event)
120 .await?;
121
122 let mut chain: Vec<Uuid> = self
124 .backend
125 .get(&self.chain_key(tenant_id))
126 .await?
127 .unwrap_or_default();
128 chain.push(event.id);
129 self.backend.set(&self.chain_key(tenant_id), &chain).await?;
130
131 chain_state.last_hash = Some(event.hash.clone());
133 chain_state.sequence += 1;
134 self.set_chain_state(tenant_id, &chain_state).await?;
135
136 Ok(event)
137 }
138
139 pub async fn get(&self, tenant_id: &str, id: Uuid) -> Result<Option<AuditEvent>, StorageError> {
141 self.backend.get(&self.event_key(tenant_id, id)).await
142 }
143
144 pub async fn get_chain(&self, tenant_id: &str) -> Result<Vec<AuditEvent>, StorageError> {
146 let chain: Vec<Uuid> = self
147 .backend
148 .get(&self.chain_key(tenant_id))
149 .await?
150 .unwrap_or_default();
151
152 let mut events = Vec::new();
153 for id in chain {
154 if let Some(event) = self.get(tenant_id, id).await? {
155 events.push(event);
156 }
157 }
158 Ok(events)
159 }
160
161 pub async fn build_merkle_tree(&self, tenant_id: &str) -> Result<MerkleTree, StorageError> {
163 let events = self.get_chain(tenant_id).await?;
164 let leaves: Vec<(String, Hash)> = events
165 .iter()
166 .map(|e| (e.id.to_string(), e.hash.clone()))
167 .collect();
168 Ok(MerkleTree::from_leaves(leaves))
169 }
170
171 pub async fn verify_chain(&self, tenant_id: &str) -> Result<bool, StorageError> {
173 let events = self.get_chain(tenant_id).await?;
174
175 for (i, event) in events.iter().enumerate() {
176 if i == 0 {
177 if event.previous_hash.is_some() {
179 tracing::warn!("Chain integrity failed: first event has previous_hash");
180 return Ok(false);
181 }
182 } else {
183 match (&event.previous_hash, events.get(i - 1)) {
185 (Some(prev_hash), Some(prev_event)) => {
186 let expected = &prev_event.hash;
187
188 if prev_hash != expected {
189 tracing::warn!(
190 "Chain integrity failed at event {}: expected prev_hash {:?}, got {:?}",
191 event.id, expected.to_hex(), prev_hash.to_hex()
192 );
193 return Ok(false);
194 }
195 }
196 (None, _) => {
197 tracing::warn!(
198 "Chain integrity failed: event {} has no previous_hash",
199 event.id
200 );
201 return Ok(false);
202 }
203 (_, None) => {
204 tracing::warn!(
205 "Chain integrity failed: previous event not found for {}",
206 event.id
207 );
208 return Ok(false);
209 }
210 }
211 }
212 }
213
214 tracing::info!(
215 "Chain integrity verified for tenant {}: {} events",
216 tenant_id,
217 events.len()
218 );
219 Ok(true)
220 }
221
222 pub async fn export(&self, tenant_id: &str) -> Result<AuditExport, StorageError> {
224 let events = self.get_chain(tenant_id).await?;
225 let merkle_tree = self.build_merkle_tree(tenant_id).await?;
226
227 Ok(AuditExport {
228 events,
229 merkle_root: merkle_tree.root_hash().map(|h| h.to_string()),
230 exported_at: Utc::now(),
231 verified: self.verify_chain(tenant_id).await.unwrap_or(false),
232 })
233 }
234}
235
/// Snapshot of one tenant's audit chain, ready to be rendered into external
/// log formats (OCSF, Splunk HEC, Datadog, JSON Lines).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditExport {
    /// Exported events, in chain order.
    pub events: Vec<AuditEvent>,
    /// String form of the Merkle root over the events; `None` when the tree has no root.
    pub merkle_root: Option<String>,
    /// UTC time at which the export was assembled.
    pub exported_at: DateTime<Utc>,
    /// Outcome of the chain verification performed at export time.
    pub verified: bool,
}
244
245impl AuditExport {
246 pub fn to_ocsf(&self) -> Vec<serde_json::Value> {
250 self.events
251 .iter()
252 .map(|event| {
253 serde_json::json!({
254 "class_uid": 2004, "class_name": "Detection Finding",
257 "category_uid": 2, "category_name": "Findings",
259 "severity_id": 1, "activity_id": 1, "activity_name": "Create",
262 "status_id": 1, "time": event.timestamp.timestamp(),
264 "timezone_offset": 0,
265
266 "finding_info": {
268 "uid": event.id.to_string(),
269 "title": format!("{:?}", event.event_type),
270 "desc": event.rationale.clone().unwrap_or_default(),
271 "created_time": event.timestamp.timestamp(),
272 },
273
274 "actor": {
276 "type_uid": match &event.actor {
277 ActorType::Bot(_) => 2,
278 ActorType::Human(_) => 1,
279 ActorType::System => 0,
280 },
281 "type": match &event.actor {
282 ActorType::Bot(id) => format!("Bot:{}", id),
283 ActorType::Human(name) => format!("Human:{}", name),
284 ActorType::System => "System".to_string(),
285 },
286 },
287
288 "unmapped": {
290 "vex_event_type": format!("{:?}", event.event_type),
291 "vex_hash": event.hash.to_hex(),
292 "vex_sequence": event.sequence_number,
293 "vex_policy_version": event.policy_version.clone(),
294 "vex_data_provenance": event.data_provenance_hash.as_ref().map(|h| h.to_hex()),
295 "vex_human_review_required": event.human_review_required,
296 "vex_merkle_root": self.merkle_root.clone(),
297 },
298
299 "metadata": {
301 "version": "1.7.0",
302 "product": {
303 "name": "VEX Protocol",
304 "vendor_name": "ProvnAI",
305 "version": env!("CARGO_PKG_VERSION"),
306 },
307 },
308 })
309 })
310 .collect()
311 }
312
313 pub fn to_splunk_hec(&self, index: &str, source: &str) -> Vec<serde_json::Value> {
317 self.events
318 .iter()
319 .map(|event| {
320 serde_json::json!({
321 "time": event.timestamp.timestamp_millis() as f64 / 1000.0,
323 "host": "vex-protocol",
324 "source": source,
325 "sourcetype": "vex:audit:json",
326 "index": index,
327
328 "event": {
330 "id": event.id.to_string(),
331 "type": format!("{:?}", event.event_type),
332 "timestamp": event.timestamp.to_rfc3339(),
333 "agent_id": event.agent_id.map(|id| id.to_string()),
334 "data": AuditEvent::sanitize_data(event.data.clone()),
335 "hash": event.hash.to_hex(),
336 "sequence": event.sequence_number,
337 "actor": match &event.actor {
339 ActorType::Bot(id) => serde_json::json!({"type": "bot", "id": id.to_string()}),
340 ActorType::Human(name) => serde_json::json!({"type": "human", "name": name}),
341 ActorType::System => serde_json::json!({"type": "system"}),
342 },
343 "rationale": event.rationale.clone(),
344 "policy_version": event.policy_version.clone(),
345 "human_review_required": event.human_review_required,
346 },
347
348 "fields": {
350 "event_type": format!("{:?}", event.event_type),
351 "merkle_root": self.merkle_root.clone(),
352 "verified": self.verified,
353 },
354 })
355 })
356 .collect()
357 }
358
359 pub fn to_datadog(&self, service: &str, env: &str) -> Vec<serde_json::Value> {
363 self.events
364 .iter()
365 .map(|event| {
366 serde_json::json!({
367 "ddsource": "vex-protocol",
369 "ddtags": format!("env:{},service:{}", env, service),
370 "hostname": "vex-audit",
371 "service": service,
372 "status": "info",
373
374 "timestamp": event.timestamp.to_rfc3339(),
376
377 "message": format!(
379 "[{}] {} - seq:{} hash:{}",
380 format!("{:?}", event.event_type),
381 event.rationale.clone().unwrap_or_else(|| "No rationale".to_string()),
382 event.sequence_number,
383 &event.hash.to_hex()[..16]
384 ),
385
386 "event": {
388 "id": event.id.to_string(),
389 "type": format!("{:?}", event.event_type),
390 "agent_id": event.agent_id.map(|id| id.to_string()),
391 "sequence": event.sequence_number,
392 "hash": event.hash.to_hex(),
393 },
394
395 "usr": match &event.actor {
397 ActorType::Human(name) => serde_json::json!({"name": name}),
398 ActorType::Bot(id) => serde_json::json!({"id": id.to_string(), "type": "bot"}),
399 ActorType::System => serde_json::json!({"type": "system"}),
400 },
401
402 "vex": {
404 "merkle_root": self.merkle_root.clone(),
405 "verified": self.verified,
406 "policy_version": event.policy_version.clone(),
407 "human_review_required": event.human_review_required,
408 "data_provenance_hash": event.data_provenance_hash.as_ref().map(|h| h.to_hex()),
409 },
410 })
411 })
412 .collect()
413 }
414
415 pub fn to_jsonl(&self) -> String {
418 self.events
419 .iter()
420 .filter_map(|e| serde_json::to_string(e).ok())
421 .collect::<Vec<_>>()
422 .join("\n")
423 }
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::MemoryBackend;

    /// Two tenants sharing one backend must end up with separate chains:
    /// one event each, different event ids, and different Merkle roots.
    #[tokio::test]
    async fn test_audit_store_isolation() {
        let store = AuditStore::new(Arc::new(MemoryBackend::new()));
        let (first_tenant, second_tenant) = ("tenant-1", "tenant-2");

        // Log one event per tenant, each with a different event type.
        for (tenant, event_type) in [
            (first_tenant, AuditEventType::AgentCreated),
            (second_tenant, AuditEventType::AgentExecuted),
        ] {
            store
                .log(
                    tenant,
                    event_type,
                    ActorType::System,
                    None,
                    serde_json::json!({}),
                )
                .await
                .unwrap();
        }

        let first_chain = store.get_chain(first_tenant).await.unwrap();
        let second_chain = store.get_chain(second_tenant).await.unwrap();

        assert_eq!(first_chain.len(), 1);
        assert_eq!(second_chain.len(), 1);
        assert_ne!(first_chain[0].id, second_chain[0].id);

        let first_tree = store.build_merkle_tree(first_tenant).await.unwrap();
        let second_tree = store.build_merkle_tree(second_tenant).await.unwrap();
        let first_root = first_tree.root_hash().cloned();
        let second_root = second_tree.root_hash().cloned();
        assert_ne!(first_root, second_root);
    }
}