1use async_trait::async_trait;
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::SqlitePool;
5use uuid::Uuid;
6use vex_queue::{
7 backend::QueueError,
8 job::{JobEntry, JobStatus},
9 QueueBackend,
10};
11
12pub struct SqliteQueueBackend {
14 pool: SqlitePool,
15}
16
17impl SqliteQueueBackend {
18 pub fn new(pool: SqlitePool) -> Self {
19 Self { pool }
20 }
21}
22
23#[async_trait]
24impl QueueBackend for SqliteQueueBackend {
25 async fn enqueue(
26 &self,
27 tenant_id: &str,
28 job_type: &str,
29 payload: Value,
30 delay_secs: Option<u64>,
31 ) -> Result<Uuid, QueueError> {
32 let id = Uuid::new_v4();
33 let run_at = if let Some(delay) = delay_secs {
34 Utc::now() + chrono::Duration::seconds(delay as i64)
35 } else {
36 Utc::now()
37 };
38
39 sqlx::query(
40 "INSERT INTO jobs (id, tenant_id, job_type, payload, status, run_at) VALUES (?, ?, ?, ?, 'pending', ?)"
41 )
42 .bind(id.to_string())
43 .bind(tenant_id)
44 .bind(job_type)
45 .bind(payload)
46 .bind(run_at)
47 .execute(&self.pool)
48 .await
49 .map_err(|e| QueueError::Backend(e.to_string()))?;
50
51 Ok(id)
52 }
53
54 async fn dequeue(&self) -> Result<Option<JobEntry>, QueueError> {
55 let worker_id = Uuid::new_v4().to_string();
56
57 let row = sqlx::query(
58 r#"
59 UPDATE jobs
60 SET status = 'processing',
61 locked_at = CURRENT_TIMESTAMP,
62 locked_by = ?
63 WHERE id = (
64 SELECT id FROM jobs
65 WHERE status = 'pending' AND run_at <= CURRENT_TIMESTAMP
66 ORDER BY priority DESC, created_at ASC
67 LIMIT 1
68 )
69 RETURNING id, tenant_id, job_type, payload, run_at, created_at, retries, last_error
70 "#,
71 )
72 .bind(worker_id)
73 .fetch_optional(&self.pool)
74 .await
75 .map_err(|e| QueueError::Backend(e.to_string()))?;
76
77 if let Some(row) = row {
78 use chrono::NaiveDateTime;
79 use sqlx::Row;
80
81 let id_str: String = row
82 .try_get("id")
83 .map_err(|e| QueueError::Backend(e.to_string()))?;
84 let id =
85 Uuid::parse_str(&id_str).map_err(|_| QueueError::Backend("Invalid UUID".into()))?;
86 let tenant_id: String = row
87 .try_get("tenant_id")
88 .map_err(|e| QueueError::Backend(e.to_string()))?;
89 let job_type: String = row
90 .try_get("job_type")
91 .map_err(|e| QueueError::Backend(e.to_string()))?;
92 let payload: Value = row
93 .try_get("payload")
94 .map_err(|e| QueueError::Backend(e.to_string()))?;
95
96 let run_at_naive: NaiveDateTime = row
97 .try_get("run_at")
98 .map_err(|e| QueueError::Backend(e.to_string()))?;
99 let created_at_naive: NaiveDateTime = row
100 .try_get("created_at")
101 .map_err(|e| QueueError::Backend(e.to_string()))?;
102
103 let retries: i64 = row.try_get("retries").unwrap_or(0);
104 let last_error: Option<String> = row.try_get("last_error").ok();
105
106 Ok(Some(JobEntry {
107 id,
108 tenant_id,
109 job_type,
110 payload,
111 status: JobStatus::Running,
112 created_at: created_at_naive.and_utc(),
113 run_at: run_at_naive.and_utc(),
114 attempts: retries as u32,
115 last_error,
116 result: None, }))
118 } else {
119 Ok(None)
120 }
121 }
122
123 async fn update_status(
124 &self,
125 id: Uuid,
126 status: JobStatus,
127 error: Option<String>,
128 delay_secs: Option<u64>,
129 ) -> Result<(), QueueError> {
130 let status_str = match status {
131 JobStatus::Completed => "completed",
132 JobStatus::Failed(_) => "failed",
133 JobStatus::Running => "processing",
134 JobStatus::Pending => "pending",
135 JobStatus::DeadLetter => "dead_letter",
136 };
137
138 if let JobStatus::Failed(_) = status {
139 let delay = delay_secs.unwrap_or(60);
140 sqlx::query(
141 r#"
142 UPDATE jobs
143 SET status = 'pending', last_error = ?, locked_at = NULL, locked_by = NULL,
144 retries = retries + 1, run_at = datetime('now', '+' || ? || ' seconds')
145 WHERE id = ?
146 "#,
147 )
148 .bind(error)
149 .bind(delay as i64)
150 .bind(id.to_string())
151 .execute(&self.pool)
152 .await
153 .map_err(|e| QueueError::Backend(e.to_string()))?;
154 } else {
155 sqlx::query(
156 r#"
157 UPDATE jobs
158 SET status = ?, last_error = ?, locked_at = NULL, locked_by = NULL
159 WHERE id = ?
160 "#,
161 )
162 .bind(status_str)
163 .bind(error)
164 .bind(id.to_string())
165 .execute(&self.pool)
166 .await
167 .map_err(|e| QueueError::Backend(e.to_string()))?;
168 }
169
170 Ok(())
171 }
172
173 async fn get_status(&self, tenant_id: &str, id: Uuid) -> Result<JobStatus, QueueError> {
174 use sqlx::Row;
175 let row = sqlx::query("SELECT status, retries FROM jobs WHERE id = ? AND tenant_id = ?")
176 .bind(id.to_string())
177 .bind(tenant_id)
178 .fetch_optional(&self.pool)
179 .await
180 .map_err(|e| QueueError::Backend(e.to_string()))?;
181
182 match row {
183 Some(r) => {
184 let status_str: String = r
185 .try_get("status")
186 .map_err(|e| QueueError::Backend(e.to_string()))?;
187 let retries: i64 = r.try_get("retries").unwrap_or(0);
188
189 match status_str.as_str() {
190 "pending" => Ok(JobStatus::Pending),
191 "processing" => Ok(JobStatus::Running),
192 "completed" => Ok(JobStatus::Completed),
193 "failed" => Ok(JobStatus::Failed(retries as u32)),
194 "dead_letter" => Ok(JobStatus::DeadLetter),
195 _ => Err(QueueError::Backend("Invalid status in DB".into())),
196 }
197 }
198 None => Err(QueueError::NotFound),
199 }
200 }
201
202 async fn get_job(&self, tenant_id: &str, id: Uuid) -> Result<JobEntry, QueueError> {
203 use chrono::NaiveDateTime;
204 use sqlx::Row;
205
206 let row = sqlx::query(
207 "SELECT id, tenant_id, job_type, payload, status, created_at, run_at, retries, last_error, result FROM jobs WHERE id = ? AND tenant_id = ?"
208 )
209 .bind(id.to_string())
210 .bind(tenant_id)
211 .fetch_optional(&self.pool)
212 .await
213 .map_err(|e| QueueError::Backend(e.to_string()))?
214 .ok_or(QueueError::NotFound)?;
215
216 let id_str: String = row
217 .try_get("id")
218 .map_err(|e| QueueError::Backend(e.to_string()))?;
219 let job_id =
220 Uuid::parse_str(&id_str).map_err(|_| QueueError::Backend("Invalid UUID".into()))?;
221 let tenant_id: String = row
222 .try_get("tenant_id")
223 .map_err(|e| QueueError::Backend(e.to_string()))?;
224 let job_type: String = row
225 .try_get("job_type")
226 .map_err(|e| QueueError::Backend(e.to_string()))?;
227 let payload: Value = row
228 .try_get("payload")
229 .map_err(|e| QueueError::Backend(e.to_string()))?;
230 let status_str: String = row
231 .try_get("status")
232 .map_err(|e| QueueError::Backend(e.to_string()))?;
233 let retries: i64 = row.try_get("retries").unwrap_or(0);
234 let last_error: Option<String> = row.try_get("last_error").ok().flatten();
235 let result: Option<Value> = row
236 .try_get::<Option<String>, _>("result")
237 .ok()
238 .flatten()
239 .and_then(|s| serde_json::from_str(&s).ok());
240 let run_at_naive: NaiveDateTime = row
241 .try_get("run_at")
242 .map_err(|e| QueueError::Backend(e.to_string()))?;
243 let created_at_naive: NaiveDateTime = row
244 .try_get("created_at")
245 .map_err(|e| QueueError::Backend(e.to_string()))?;
246
247 let status = match status_str.as_str() {
248 "pending" => JobStatus::Pending,
249 "processing" | "running" => JobStatus::Running,
250 "completed" => JobStatus::Completed,
251 "failed" => JobStatus::Failed(retries as u32),
252 "dead_letter" => JobStatus::DeadLetter,
253 _ => JobStatus::Pending,
254 };
255
256 Ok(JobEntry {
257 id: job_id,
258 tenant_id,
259 job_type,
260 payload,
261 status,
262 created_at: created_at_naive.and_utc(),
263 run_at: run_at_naive.and_utc(),
264 attempts: retries as u32,
265 last_error,
266 result,
267 })
268 }
269
270 async fn set_result(&self, id: Uuid, result: Value) -> Result<(), QueueError> {
271 let result_str =
272 serde_json::to_string(&result).map_err(|e| QueueError::Backend(e.to_string()))?;
273 sqlx::query("UPDATE jobs SET result = ?, status = 'completed' WHERE id = ?")
274 .bind(result_str)
275 .bind(id.to_string())
276 .execute(&self.pool)
277 .await
278 .map_err(|e| QueueError::Backend(e.to_string()))?;
279 Ok(())
280 }
281}