vex_anchor/
celestia.rs

1//! Celestia Data Availability anchor backend
2//!
3//! Posts Merkle roots to a Celestia node as namespace-keyed blobs.
4//! Provides DA guarantees via Celestia's light client network.
5
6use async_trait::async_trait;
7use base64::{engine::general_purpose::STANDARD, Engine};
8use chrono::Utc;
9use serde::{Deserialize, Serialize};
10use vex_core::Hash;
11
12use crate::backend::{AnchorBackend, AnchorMetadata, AnchorReceipt};
13use crate::error::AnchorError;
14
/// Base64-encoded Celestia namespace (v0 format) that VEX blobs are submitted
/// under and later queried by.
///
/// NOTE(review): a Celestia v0 namespace is presumably 29 bytes, i.e. 40 base64
/// characters; this literal is shorter than that — confirm the node accepts it.
const VEX_NAMESPACE_B64: &str = "AAAAAAAAAAAAAAAAAAAVEX=";
17
/// JSON-RPC response envelope decoded from the node's reply to `blob.Submit`.
#[derive(Deserialize)]
struct BlobSubmitResponse {
    /// The `result` member; `anchor` interprets it as the height of the submitted blob.
    result: Option<u64>,
    /// The `error` member, present when the node rejected the request.
    error: Option<CelestiaError>,
}
23
/// Error object carried in the `error` member of a JSON-RPC response.
///
/// `anchor` formats both fields into an `AnchorError::Network` message.
#[derive(Deserialize)]
struct CelestiaError {
    /// Numeric error code reported by the node.
    code: i64,
    /// Human-readable error description reported by the node.
    message: String,
}
29
/// Generic JSON-RPC response envelope whose `result` is kept as untyped JSON.
///
/// Despite the name, `verify` uses this to decode the `blob.GetAll` response,
/// where `result` is expected to be an array of blob objects.
#[derive(Deserialize)]
struct HeaderResponse {
    /// The raw `result` member, if the response contained one.
    result: Option<serde_json::Value>,
}
34
35#[derive(Serialize)]
36struct Blob {
37    namespace: String,
38    data: String,
39    #[serde(rename = "shareVersion")]
40    share_version: u32,
41}
42
43/// Celestia DA anchor backend
44///
45/// Submits Merkle roots as named blobs to a Celestia node.
46/// The blob height is stored as the `anchor_id` for later verification.
47#[derive(Debug, Clone)]
48pub struct CelestiaAnchor {
49    node_url: String,
50    auth_token: Option<String>,
51    client: reqwest::Client,
52}
53
54impl CelestiaAnchor {
55    /// Create a new Celestia anchor backend
56    pub fn new(node_url: impl Into<String>, auth_token: Option<String>) -> Self {
57        let client = reqwest::Client::builder()
58            .timeout(std::time::Duration::from_secs(60))
59            .user_agent("vex-anchor/0.1.5")
60            .build()
61            .expect("Failed to build Celestia HTTP client");
62
63        Self {
64            node_url: node_url.into(),
65            auth_token,
66            client,
67        }
68    }
69
70    fn post(&self, url: &str) -> reqwest::RequestBuilder {
71        let req = self.client.post(url);
72        if let Some(ref token) = self.auth_token {
73            req.bearer_auth(token)
74        } else {
75            req
76        }
77    }
78}
79
80#[async_trait]
81impl AnchorBackend for CelestiaAnchor {
82    async fn anchor(
83        &self,
84        root: &Hash,
85        metadata: AnchorMetadata,
86    ) -> Result<AnchorReceipt, AnchorError> {
87        let blob_payload = serde_json::json!({
88            "vex_root": root.to_hex(),
89            "tenant_id": metadata.tenant_id,
90            "event_count": metadata.event_count,
91            "timestamp": metadata.timestamp.to_rfc3339(),
92        });
93        let blob_data = STANDARD.encode(blob_payload.to_string().as_bytes());
94
95        let blob = Blob {
96            namespace: VEX_NAMESPACE_B64.to_string(),
97            data: blob_data,
98            share_version: 0,
99        };
100
101        let req = serde_json::json!({
102            "id": 1,
103            "jsonrpc": "2.0",
104            "method": "blob.Submit",
105            "params": [[blob], {"gas_price": -1.0}]
106        });
107
108        let url = format!("{}/", self.node_url);
109        let resp = self
110            .post(&url)
111            .json(&req)
112            .send()
113            .await
114            .map_err(|e| AnchorError::Network(e.to_string()))?;
115
116        if !resp.status().is_success() {
117            return Err(AnchorError::Network(format!(
118                "Celestia node returned HTTP {}",
119                resp.status()
120            )));
121        }
122
123        let body: BlobSubmitResponse = resp
124            .json()
125            .await
126            .map_err(|e| AnchorError::Network(e.to_string()))?;
127
128        if let Some(err) = body.error {
129            return Err(AnchorError::Network(format!(
130                "Celestia error {}: {}",
131                err.code, err.message
132            )));
133        }
134
135        let height = body.result.unwrap_or(0);
136        let anchor_id = format!("celestia://height:{}", height);
137        let proof = serde_json::json!({
138            "height": height,
139            "namespace": VEX_NAMESPACE_B64,
140            "root_hash": root.to_hex()
141        })
142        .to_string();
143
144        Ok(AnchorReceipt {
145            backend: self.name().to_string(),
146            root_hash: root.to_hex(),
147            anchor_id,
148            anchored_at: Utc::now(),
149            proof: Some(proof),
150            metadata,
151        })
152    }
153
154    async fn verify(&self, receipt: &AnchorReceipt) -> Result<bool, AnchorError> {
155        let Some(ref proof_str) = receipt.proof else {
156            return Ok(false);
157        };
158
159        let proof: serde_json::Value = serde_json::from_str(proof_str)
160            .map_err(|e| AnchorError::VerificationFailed(format!("Invalid proof JSON: {}", e)))?;
161
162        let height = proof["height"].as_u64().unwrap_or(0);
163        if height == 0 {
164            return Ok(false);
165        }
166
167        let req = serde_json::json!({
168            "id": 1,
169            "jsonrpc": "2.0",
170            "method": "blob.GetAll",
171            "params": [height, [VEX_NAMESPACE_B64]]
172        });
173
174        let url = format!("{}/", self.node_url);
175        let resp = self
176            .post(&url)
177            .json(&req)
178            .send()
179            .await
180            .map_err(|e| AnchorError::VerificationFailed(e.to_string()))?;
181
182        if !resp.status().is_success() {
183            return Ok(false);
184        }
185
186        let body: HeaderResponse = resp
187            .json()
188            .await
189            .map_err(|e| AnchorError::VerificationFailed(e.to_string()))?;
190
191        if let Some(result) = body.result {
192            if let Some(blobs) = result.as_array() {
193                return Ok(blobs.iter().any(|b| {
194                    b.get("data")
195                        .and_then(|d| d.as_str())
196                        .and_then(|d| STANDARD.decode(d).ok())
197                        .and_then(|bytes| String::from_utf8(bytes).ok())
198                        .is_some_and(|s| s.contains(&receipt.root_hash))
199                }));
200            }
201        }
202
203        Ok(false)
204    }
205
206    fn name(&self) -> &str {
207        "celestia"
208    }
209
210    async fn is_healthy(&self) -> bool {
211        let req = serde_json::json!({
212            "id": 1,
213            "jsonrpc": "2.0",
214            "method": "header.NetworkHead",
215            "params": []
216        });
217
218        let url = format!("{}/", self.node_url);
219        self.post(&url)
220            .json(&req)
221            .send()
222            .await
223            .ok()
224            .map(|r| r.status().is_success())
225            .unwrap_or(false)
226    }
227}
228
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a receipt for the Celestia backend that carries no proof.
    fn receipt_without_proof() -> AnchorReceipt {
        AnchorReceipt {
            backend: "celestia".to_string(),
            root_hash: "abc123".to_string(),
            anchor_id: "celestia://height:0".to_string(),
            anchored_at: Utc::now(),
            proof: None,
            metadata: AnchorMetadata::new("test-tenant", 1),
        }
    }

    /// The backend reports the fixed name `celestia`.
    #[test]
    fn test_celestia_anchor_name() {
        let backend = CelestiaAnchor::new("http://localhost:26658", None);
        assert_eq!(backend.name(), "celestia");
    }

    /// A receipt built without a proof exposes `None` as its proof.
    ///
    /// NOTE(review): despite its name this test only inspects the receipt; it
    /// does not call `verify`.
    #[test]
    fn test_celestia_verify_missing_proof() {
        let receipt = receipt_without_proof();
        assert!(receipt.proof.is_none());
    }
}