1use async_trait::async_trait;
7use base64::{engine::general_purpose::STANDARD, Engine};
8use chrono::Utc;
9use serde::{Deserialize, Serialize};
10use vex_core::Hash;
11
12use crate::backend::{AnchorBackend, AnchorMetadata, AnchorReceipt};
13use crate::error::AnchorError;
14
/// Base64 namespace string sent as the `namespace` of every submitted blob
/// and used as the namespace filter when blobs are fetched back for
/// verification. It is also embedded verbatim in the receipt proof.
///
/// NOTE(review): confirm this literal is valid padded base64 and decodes to a
/// namespace the target Celestia node accepts.
const VEX_NAMESPACE_B64: &str = "AAAAAAAAAAAAAAAAAAAVEX=";
17
/// JSON-RPC response envelope decoded from the reply to `blob.Submit`.
#[derive(Deserialize)]
struct BlobSubmitResponse {
    /// Value of the `result` member; the caller treats it as the block height
    /// at which the blob was included.
    result: Option<u64>,
    /// Value of the `error` member, present when the node rejected the call.
    error: Option<CelestiaError>,
}
23
/// Error object carried in the `error` member of a JSON-RPC response.
#[derive(Deserialize)]
struct CelestiaError {
    /// Numeric error code reported by the node.
    code: i64,
    /// Human-readable error description reported by the node.
    message: String,
}
29
/// JSON-RPC response envelope whose `result` is kept as untyped JSON.
///
/// Despite the name, within this file it is used to decode the reply to
/// `blob.GetAll` during verification. Only `result` is captured; any `error`
/// member in the response is ignored by this type.
#[derive(Deserialize)]
struct HeaderResponse {
    /// Raw `result` payload, inspected by the caller as an array of blobs.
    result: Option<serde_json::Value>,
}
34
/// Blob object serialized into the parameters of a `blob.Submit` call.
#[derive(Serialize)]
struct Blob {
    /// Base64 namespace string the blob is published under.
    namespace: String,
    /// Base64-encoded blob contents.
    data: String,
    /// Share version, serialized under the key `shareVersion`.
    // NOTE(review): confirm the node expects `shareVersion` rather than
    // `share_version` as the wire name for this field.
    #[serde(rename = "shareVersion")]
    share_version: u32,
}
42
43#[derive(Debug, Clone)]
48pub struct CelestiaAnchor {
49 node_url: String,
50 auth_token: Option<String>,
51 client: reqwest::Client,
52}
53
54impl CelestiaAnchor {
55 pub fn new(node_url: impl Into<String>, auth_token: Option<String>) -> Self {
57 let client = reqwest::Client::builder()
58 .timeout(std::time::Duration::from_secs(60))
59 .user_agent("vex-anchor/0.1.5")
60 .build()
61 .expect("Failed to build Celestia HTTP client");
62
63 Self {
64 node_url: node_url.into(),
65 auth_token,
66 client,
67 }
68 }
69
70 fn post(&self, url: &str) -> reqwest::RequestBuilder {
71 let req = self.client.post(url);
72 if let Some(ref token) = self.auth_token {
73 req.bearer_auth(token)
74 } else {
75 req
76 }
77 }
78}
79
80#[async_trait]
81impl AnchorBackend for CelestiaAnchor {
82 async fn anchor(
83 &self,
84 root: &Hash,
85 metadata: AnchorMetadata,
86 ) -> Result<AnchorReceipt, AnchorError> {
87 let blob_payload = serde_json::json!({
88 "vex_root": root.to_hex(),
89 "tenant_id": metadata.tenant_id,
90 "event_count": metadata.event_count,
91 "timestamp": metadata.timestamp.to_rfc3339(),
92 });
93 let blob_data = STANDARD.encode(blob_payload.to_string().as_bytes());
94
95 let blob = Blob {
96 namespace: VEX_NAMESPACE_B64.to_string(),
97 data: blob_data,
98 share_version: 0,
99 };
100
101 let req = serde_json::json!({
102 "id": 1,
103 "jsonrpc": "2.0",
104 "method": "blob.Submit",
105 "params": [[blob], {"gas_price": -1.0}]
106 });
107
108 let url = format!("{}/", self.node_url);
109 let resp = self
110 .post(&url)
111 .json(&req)
112 .send()
113 .await
114 .map_err(|e| AnchorError::Network(e.to_string()))?;
115
116 if !resp.status().is_success() {
117 return Err(AnchorError::Network(format!(
118 "Celestia node returned HTTP {}",
119 resp.status()
120 )));
121 }
122
123 let body: BlobSubmitResponse = resp
124 .json()
125 .await
126 .map_err(|e| AnchorError::Network(e.to_string()))?;
127
128 if let Some(err) = body.error {
129 return Err(AnchorError::Network(format!(
130 "Celestia error {}: {}",
131 err.code, err.message
132 )));
133 }
134
135 let height = body.result.unwrap_or(0);
136 let anchor_id = format!("celestia://height:{}", height);
137 let proof = serde_json::json!({
138 "height": height,
139 "namespace": VEX_NAMESPACE_B64,
140 "root_hash": root.to_hex()
141 })
142 .to_string();
143
144 Ok(AnchorReceipt {
145 backend: self.name().to_string(),
146 root_hash: root.to_hex(),
147 anchor_id,
148 anchored_at: Utc::now(),
149 proof: Some(proof),
150 metadata,
151 })
152 }
153
154 async fn verify(&self, receipt: &AnchorReceipt) -> Result<bool, AnchorError> {
155 let Some(ref proof_str) = receipt.proof else {
156 return Ok(false);
157 };
158
159 let proof: serde_json::Value = serde_json::from_str(proof_str)
160 .map_err(|e| AnchorError::VerificationFailed(format!("Invalid proof JSON: {}", e)))?;
161
162 let height = proof["height"].as_u64().unwrap_or(0);
163 if height == 0 {
164 return Ok(false);
165 }
166
167 let req = serde_json::json!({
168 "id": 1,
169 "jsonrpc": "2.0",
170 "method": "blob.GetAll",
171 "params": [height, [VEX_NAMESPACE_B64]]
172 });
173
174 let url = format!("{}/", self.node_url);
175 let resp = self
176 .post(&url)
177 .json(&req)
178 .send()
179 .await
180 .map_err(|e| AnchorError::VerificationFailed(e.to_string()))?;
181
182 if !resp.status().is_success() {
183 return Ok(false);
184 }
185
186 let body: HeaderResponse = resp
187 .json()
188 .await
189 .map_err(|e| AnchorError::VerificationFailed(e.to_string()))?;
190
191 if let Some(result) = body.result {
192 if let Some(blobs) = result.as_array() {
193 return Ok(blobs.iter().any(|b| {
194 b.get("data")
195 .and_then(|d| d.as_str())
196 .and_then(|d| STANDARD.decode(d).ok())
197 .and_then(|bytes| String::from_utf8(bytes).ok())
198 .is_some_and(|s| s.contains(&receipt.root_hash))
199 }));
200 }
201 }
202
203 Ok(false)
204 }
205
206 fn name(&self) -> &str {
207 "celestia"
208 }
209
210 async fn is_healthy(&self) -> bool {
211 let req = serde_json::json!({
212 "id": 1,
213 "jsonrpc": "2.0",
214 "method": "header.NetworkHead",
215 "params": []
216 });
217
218 let url = format!("{}/", self.node_url);
219 self.post(&url)
220 .json(&req)
221 .send()
222 .await
223 .ok()
224 .map(|r| r.status().is_success())
225 .unwrap_or(false)
226 }
227}
228
#[cfg(test)]
mod tests {
    use super::*;

    /// The backend reports the fixed identifier used in receipts.
    #[test]
    fn test_celestia_anchor_name() {
        let anchor = CelestiaAnchor::new("http://localhost:26658", None);
        assert_eq!(anchor.name(), "celestia");
    }

    /// A receipt without a proof must be rejected by `verify`.
    ///
    /// `verify` returns before its first `.await` in this case, so a single
    /// poll with a no-op waker is enough and no async runtime or network
    /// access is needed.
    #[test]
    fn test_celestia_verify_missing_proof() {
        use std::sync::Arc;
        use std::task::{Context, Poll, Wake, Waker};

        struct NoopWake;

        impl Wake for NoopWake {
            fn wake(self: Arc<Self>) {}
        }

        let anchor = CelestiaAnchor::new("http://localhost:26658", None);
        let receipt = AnchorReceipt {
            backend: "celestia".to_string(),
            root_hash: "abc123".to_string(),
            anchor_id: "celestia://height:0".to_string(),
            anchored_at: Utc::now(),
            proof: None,
            metadata: AnchorMetadata::new("test-tenant", 1),
        };

        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        let mut fut = anchor.verify(&receipt);
        let outcome = std::future::Future::poll(fut.as_mut(), &mut cx);

        assert!(matches!(outcome, Poll::Ready(Ok(false))));
    }
}
252}