vex_persist/
queue.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::SqlitePool;
5use uuid::Uuid;
6use vex_queue::{
7    backend::QueueError,
8    job::{JobEntry, JobStatus},
9    QueueBackend,
10};
11
12/// Durable queue backend using SQLite
13pub struct SqliteQueueBackend {
14    pool: SqlitePool,
15}
16
17impl SqliteQueueBackend {
18    pub fn new(pool: SqlitePool) -> Self {
19        Self { pool }
20    }
21}
22
23#[async_trait]
24impl QueueBackend for SqliteQueueBackend {
25    async fn enqueue(
26        &self,
27        tenant_id: &str,
28        job_type: &str,
29        payload: Value,
30        delay_secs: Option<u64>,
31    ) -> Result<Uuid, QueueError> {
32        let id = Uuid::new_v4();
33        let run_at = if let Some(delay) = delay_secs {
34            Utc::now() + chrono::Duration::seconds(delay as i64)
35        } else {
36            Utc::now()
37        };
38
39        sqlx::query(
40            "INSERT INTO jobs (id, tenant_id, job_type, payload, status, run_at) VALUES (?, ?, ?, ?, 'pending', ?)"
41        )
42        .bind(id.to_string())
43        .bind(tenant_id)
44        .bind(job_type)
45        .bind(payload)
46        .bind(run_at)
47        .execute(&self.pool)
48        .await
49        .map_err(|e| QueueError::Backend(e.to_string()))?;
50
51        Ok(id)
52    }
53
54    async fn dequeue(&self) -> Result<Option<JobEntry>, QueueError> {
55        let worker_id = Uuid::new_v4().to_string();
56
57        let row = sqlx::query(
58            r#"
59            UPDATE jobs
60            SET status = 'processing', 
61                locked_at = CURRENT_TIMESTAMP, 
62                locked_by = ?
63            WHERE id = (
64                SELECT id FROM jobs
65                WHERE status = 'pending' AND run_at <= CURRENT_TIMESTAMP
66                ORDER BY priority DESC, created_at ASC
67                LIMIT 1
68            )
69            RETURNING id, tenant_id, job_type, payload, run_at, created_at, retries, last_error
70            "#,
71        )
72        .bind(worker_id)
73        .fetch_optional(&self.pool)
74        .await
75        .map_err(|e| QueueError::Backend(e.to_string()))?;
76
77        if let Some(row) = row {
78            use chrono::NaiveDateTime;
79            use sqlx::Row;
80
81            let id_str: String = row
82                .try_get("id")
83                .map_err(|e| QueueError::Backend(e.to_string()))?;
84            let id =
85                Uuid::parse_str(&id_str).map_err(|_| QueueError::Backend("Invalid UUID".into()))?;
86            let tenant_id: String = row
87                .try_get("tenant_id")
88                .map_err(|e| QueueError::Backend(e.to_string()))?;
89            let job_type: String = row
90                .try_get("job_type")
91                .map_err(|e| QueueError::Backend(e.to_string()))?;
92            let payload: Value = row
93                .try_get("payload")
94                .map_err(|e| QueueError::Backend(e.to_string()))?;
95
96            let run_at_naive: NaiveDateTime = row
97                .try_get("run_at")
98                .map_err(|e| QueueError::Backend(e.to_string()))?;
99            let created_at_naive: NaiveDateTime = row
100                .try_get("created_at")
101                .map_err(|e| QueueError::Backend(e.to_string()))?;
102
103            let retries: i64 = row.try_get("retries").unwrap_or(0);
104            let last_error: Option<String> = row.try_get("last_error").ok();
105
106            Ok(Some(JobEntry {
107                id,
108                tenant_id,
109                job_type,
110                payload,
111                status: JobStatus::Running,
112                created_at: created_at_naive.and_utc(),
113                run_at: run_at_naive.and_utc(),
114                attempts: retries as u32,
115                last_error,
116                result: None, // populated after completion via set_result
117            }))
118        } else {
119            Ok(None)
120        }
121    }
122
123    async fn update_status(
124        &self,
125        id: Uuid,
126        status: JobStatus,
127        error: Option<String>,
128        delay_secs: Option<u64>,
129    ) -> Result<(), QueueError> {
130        let status_str = match status {
131            JobStatus::Completed => "completed",
132            JobStatus::Failed(_) => "failed",
133            JobStatus::Running => "processing",
134            JobStatus::Pending => "pending",
135            JobStatus::DeadLetter => "dead_letter",
136        };
137
138        if let JobStatus::Failed(_) = status {
139            let delay = delay_secs.unwrap_or(60);
140            sqlx::query(
141                r#"
142                UPDATE jobs 
143                SET status = 'pending', last_error = ?, locked_at = NULL, locked_by = NULL, 
144                    retries = retries + 1, run_at = datetime('now', '+' || ? || ' seconds')
145                WHERE id = ?
146                "#,
147            )
148            .bind(error)
149            .bind(delay as i64)
150            .bind(id.to_string())
151            .execute(&self.pool)
152            .await
153            .map_err(|e| QueueError::Backend(e.to_string()))?;
154        } else {
155            sqlx::query(
156                r#"
157                UPDATE jobs 
158                SET status = ?, last_error = ?, locked_at = NULL, locked_by = NULL
159                WHERE id = ?
160                "#,
161            )
162            .bind(status_str)
163            .bind(error)
164            .bind(id.to_string())
165            .execute(&self.pool)
166            .await
167            .map_err(|e| QueueError::Backend(e.to_string()))?;
168        }
169
170        Ok(())
171    }
172
173    async fn get_status(&self, tenant_id: &str, id: Uuid) -> Result<JobStatus, QueueError> {
174        use sqlx::Row;
175        let row = sqlx::query("SELECT status, retries FROM jobs WHERE id = ? AND tenant_id = ?")
176            .bind(id.to_string())
177            .bind(tenant_id)
178            .fetch_optional(&self.pool)
179            .await
180            .map_err(|e| QueueError::Backend(e.to_string()))?;
181
182        match row {
183            Some(r) => {
184                let status_str: String = r
185                    .try_get("status")
186                    .map_err(|e| QueueError::Backend(e.to_string()))?;
187                let retries: i64 = r.try_get("retries").unwrap_or(0);
188
189                match status_str.as_str() {
190                    "pending" => Ok(JobStatus::Pending),
191                    "processing" => Ok(JobStatus::Running),
192                    "completed" => Ok(JobStatus::Completed),
193                    "failed" => Ok(JobStatus::Failed(retries as u32)),
194                    "dead_letter" => Ok(JobStatus::DeadLetter),
195                    _ => Err(QueueError::Backend("Invalid status in DB".into())),
196                }
197            }
198            None => Err(QueueError::NotFound),
199        }
200    }
201
202    async fn get_job(&self, tenant_id: &str, id: Uuid) -> Result<JobEntry, QueueError> {
203        use chrono::NaiveDateTime;
204        use sqlx::Row;
205
206        let row = sqlx::query(
207            "SELECT id, tenant_id, job_type, payload, status, created_at, run_at, retries, last_error, result FROM jobs WHERE id = ? AND tenant_id = ?"
208        )
209        .bind(id.to_string())
210        .bind(tenant_id)
211        .fetch_optional(&self.pool)
212        .await
213        .map_err(|e| QueueError::Backend(e.to_string()))?
214        .ok_or(QueueError::NotFound)?;
215
216        let id_str: String = row
217            .try_get("id")
218            .map_err(|e| QueueError::Backend(e.to_string()))?;
219        let job_id =
220            Uuid::parse_str(&id_str).map_err(|_| QueueError::Backend("Invalid UUID".into()))?;
221        let tenant_id: String = row
222            .try_get("tenant_id")
223            .map_err(|e| QueueError::Backend(e.to_string()))?;
224        let job_type: String = row
225            .try_get("job_type")
226            .map_err(|e| QueueError::Backend(e.to_string()))?;
227        let payload: Value = row
228            .try_get("payload")
229            .map_err(|e| QueueError::Backend(e.to_string()))?;
230        let status_str: String = row
231            .try_get("status")
232            .map_err(|e| QueueError::Backend(e.to_string()))?;
233        let retries: i64 = row.try_get("retries").unwrap_or(0);
234        let last_error: Option<String> = row.try_get("last_error").ok().flatten();
235        let result: Option<Value> = row
236            .try_get::<Option<String>, _>("result")
237            .ok()
238            .flatten()
239            .and_then(|s| serde_json::from_str(&s).ok());
240        let run_at_naive: NaiveDateTime = row
241            .try_get("run_at")
242            .map_err(|e| QueueError::Backend(e.to_string()))?;
243        let created_at_naive: NaiveDateTime = row
244            .try_get("created_at")
245            .map_err(|e| QueueError::Backend(e.to_string()))?;
246
247        let status = match status_str.as_str() {
248            "pending" => JobStatus::Pending,
249            "processing" | "running" => JobStatus::Running,
250            "completed" => JobStatus::Completed,
251            "failed" => JobStatus::Failed(retries as u32),
252            "dead_letter" => JobStatus::DeadLetter,
253            _ => JobStatus::Pending,
254        };
255
256        Ok(JobEntry {
257            id: job_id,
258            tenant_id,
259            job_type,
260            payload,
261            status,
262            created_at: created_at_naive.and_utc(),
263            run_at: run_at_naive.and_utc(),
264            attempts: retries as u32,
265            last_error,
266            result,
267        })
268    }
269
270    async fn set_result(&self, id: Uuid, result: Value) -> Result<(), QueueError> {
271        let result_str =
272            serde_json::to_string(&result).map_err(|e| QueueError::Backend(e.to_string()))?;
273        sqlx::query("UPDATE jobs SET result = ?, status = 'completed' WHERE id = ?")
274            .bind(result_str)
275            .bind(id.to_string())
276            .execute(&self.pool)
277            .await
278            .map_err(|e| QueueError::Backend(e.to_string()))?;
279        Ok(())
280    }
281}