Skip to content

Commit db724ff

Browse files
committed
Add workers commands with versioned deployments
- workers list/get/create/delete - workers deploy with versioning (v1, v2, ...) - Backend trait abstracts API vs DB - Supports .js, .ts, .wasm files
1 parent c978735 commit db724ff

9 files changed

Lines changed: 1452 additions & 13 deletions

File tree

Cargo.lock

Lines changed: 726 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,9 @@ dirs = "6"
1616
thiserror = "2"
1717
colored = "3"
1818
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
19-
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono"] }
19+
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uuid"] }
2020
chrono = { version = "0.4", features = ["serde"] }
21+
uuid = { version = "1", features = ["serde"] }
22+
reqwest = { version = "0.12", features = ["json"] }
23+
sha2 = "0.10"
24+
hex = "0.4"

README.md

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,41 @@ ow infra db status
5858
ow infra db baseline
5959
```
6060

61+
### Workers
62+
63+
Works with both `api` and `db` aliases.
64+
65+
```bash
66+
# List workers
67+
ow workers list
68+
ow workers ls
69+
70+
# Create a worker
71+
ow workers create my-api -d "My API worker"
72+
73+
# Get worker details
74+
ow workers get my-api
75+
76+
# Deploy code to a worker
77+
ow workers deploy my-api ./src/index.ts -m "Initial deploy"
78+
ow workers deploy my-api ./src/index.ts -m "Fix bug" # v2, v3, ...
79+
80+
# Delete a worker
81+
ow workers delete my-api
82+
ow workers rm my-api
83+
```
84+
85+
Supported file types: `.js`, `.ts`, `.wasm`
86+
6187
### Using Aliases
6288

6389
```bash
6490
# Use default alias
65-
ow db status
91+
ow workers list
6692

6793
# Use specific alias (mc-style, alias as first argument)
94+
ow prod workers list
6895
ow infra db migrate
69-
ow prod db status
7096
```
7197

7298
## Config File Format

src/backend/api.rs

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
use super::{Backend, BackendError, CreateWorkerInput, DeployInput, Deployment, Worker};
2+
use reqwest::Client;
3+
4+
pub struct ApiBackend {
5+
client: Client,
6+
base_url: String,
7+
token: Option<String>,
8+
}
9+
10+
impl ApiBackend {
11+
pub fn new(base_url: String, token: Option<String>) -> Self {
12+
Self {
13+
client: Client::new(),
14+
base_url,
15+
token,
16+
}
17+
}
18+
19+
fn request(&self, method: reqwest::Method, path: &str) -> reqwest::RequestBuilder {
20+
let url = format!("{}{}", self.base_url, path);
21+
let mut req = self.client.request(method, &url);
22+
23+
if let Some(token) = &self.token {
24+
req = req.bearer_auth(token);
25+
}
26+
27+
req
28+
}
29+
}
30+
31+
impl Backend for ApiBackend {
32+
async fn list_workers(&self) -> Result<Vec<Worker>, BackendError> {
33+
let response = self
34+
.request(reqwest::Method::GET, "/workers")
35+
.send()
36+
.await?;
37+
38+
if response.status() == reqwest::StatusCode::UNAUTHORIZED {
39+
return Err(BackendError::Unauthorized);
40+
}
41+
42+
if !response.status().is_success() {
43+
let text = response.text().await.unwrap_or_default();
44+
return Err(BackendError::Api(text));
45+
}
46+
47+
let workers: Vec<Worker> = response.json().await?;
48+
Ok(workers)
49+
}
50+
51+
async fn get_worker(&self, name: &str) -> Result<Worker, BackendError> {
52+
let response = self
53+
.request(reqwest::Method::GET, &format!("/workers/{}", name))
54+
.send()
55+
.await?;
56+
57+
if response.status() == reqwest::StatusCode::NOT_FOUND {
58+
return Err(BackendError::NotFound(format!(
59+
"Worker '{}' not found",
60+
name
61+
)));
62+
}
63+
64+
if response.status() == reqwest::StatusCode::UNAUTHORIZED {
65+
return Err(BackendError::Unauthorized);
66+
}
67+
68+
if !response.status().is_success() {
69+
let text = response.text().await.unwrap_or_default();
70+
return Err(BackendError::Api(text));
71+
}
72+
73+
let worker: Worker = response.json().await?;
74+
Ok(worker)
75+
}
76+
77+
async fn create_worker(&self, input: CreateWorkerInput) -> Result<Worker, BackendError> {
78+
let response = self
79+
.request(reqwest::Method::POST, "/workers")
80+
.json(&input)
81+
.send()
82+
.await?;
83+
84+
if response.status() == reqwest::StatusCode::UNAUTHORIZED {
85+
return Err(BackendError::Unauthorized);
86+
}
87+
88+
if !response.status().is_success() {
89+
let text = response.text().await.unwrap_or_default();
90+
return Err(BackendError::Api(text));
91+
}
92+
93+
let worker: Worker = response.json().await?;
94+
Ok(worker)
95+
}
96+
97+
async fn delete_worker(&self, name: &str) -> Result<(), BackendError> {
98+
let response = self
99+
.request(reqwest::Method::DELETE, &format!("/workers/{}", name))
100+
.send()
101+
.await?;
102+
103+
if response.status() == reqwest::StatusCode::NOT_FOUND {
104+
return Err(BackendError::NotFound(format!(
105+
"Worker '{}' not found",
106+
name
107+
)));
108+
}
109+
110+
if response.status() == reqwest::StatusCode::UNAUTHORIZED {
111+
return Err(BackendError::Unauthorized);
112+
}
113+
114+
if !response.status().is_success() {
115+
let text = response.text().await.unwrap_or_default();
116+
return Err(BackendError::Api(text));
117+
}
118+
119+
Ok(())
120+
}
121+
122+
async fn deploy_worker(
123+
&self,
124+
name: &str,
125+
input: DeployInput,
126+
) -> Result<Deployment, BackendError> {
127+
let response = self
128+
.request(reqwest::Method::POST, &format!("/workers/{}/deploy", name))
129+
.json(&input)
130+
.send()
131+
.await?;
132+
133+
if response.status() == reqwest::StatusCode::NOT_FOUND {
134+
return Err(BackendError::NotFound(format!(
135+
"Worker '{}' not found",
136+
name
137+
)));
138+
}
139+
140+
if response.status() == reqwest::StatusCode::UNAUTHORIZED {
141+
return Err(BackendError::Unauthorized);
142+
}
143+
144+
if !response.status().is_success() {
145+
let text = response.text().await.unwrap_or_default();
146+
return Err(BackendError::Api(text));
147+
}
148+
149+
let deployment: Deployment = response.json().await?;
150+
Ok(deployment)
151+
}
152+
}

src/backend/db.rs

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
use super::{Backend, BackendError, CreateWorkerInput, DeployInput, Deployment, Worker};
2+
use sha2::{Digest, Sha256};
3+
use sqlx::{PgPool, Row};
4+
5+
pub struct DbBackend {
6+
pool: PgPool,
7+
}
8+
9+
impl DbBackend {
10+
pub fn new(pool: PgPool) -> Self {
11+
Self { pool }
12+
}
13+
}
14+
15+
impl Backend for DbBackend {
16+
async fn list_workers(&self) -> Result<Vec<Worker>, BackendError> {
17+
let rows = sqlx::query(
18+
r#"
19+
SELECT id, name, "desc", current_version, created_at, updated_at
20+
FROM workers
21+
ORDER BY name
22+
"#,
23+
)
24+
.fetch_all(&self.pool)
25+
.await?;
26+
27+
let workers = rows
28+
.iter()
29+
.map(|row| Worker {
30+
id: row.get::<uuid::Uuid, _>("id").to_string(),
31+
name: row.get("name"),
32+
description: row.get("desc"),
33+
current_version: row.get("current_version"),
34+
created_at: row.get("created_at"),
35+
updated_at: row.get("updated_at"),
36+
})
37+
.collect();
38+
39+
Ok(workers)
40+
}
41+
42+
async fn get_worker(&self, name: &str) -> Result<Worker, BackendError> {
43+
let row = sqlx::query(
44+
r#"
45+
SELECT id, name, "desc", current_version, created_at, updated_at
46+
FROM workers
47+
WHERE name = $1
48+
"#,
49+
)
50+
.bind(name)
51+
.fetch_optional(&self.pool)
52+
.await?
53+
.ok_or_else(|| BackendError::NotFound(format!("Worker '{}' not found", name)))?;
54+
55+
Ok(Worker {
56+
id: row.get::<uuid::Uuid, _>("id").to_string(),
57+
name: row.get("name"),
58+
description: row.get("desc"),
59+
current_version: row.get("current_version"),
60+
created_at: row.get("created_at"),
61+
updated_at: row.get("updated_at"),
62+
})
63+
}
64+
65+
async fn create_worker(&self, input: CreateWorkerInput) -> Result<Worker, BackendError> {
66+
// For CLI/admin mode, we need a user_id
67+
// For now, get or create an "admin" user
68+
let user_id: uuid::Uuid = sqlx::query_scalar(
69+
r#"
70+
INSERT INTO users (id, username, created_at, updated_at)
71+
VALUES (gen_random_uuid(), 'cli-admin', now(), now())
72+
ON CONFLICT (username) DO UPDATE SET username = users.username
73+
RETURNING id
74+
"#,
75+
)
76+
.fetch_one(&self.pool)
77+
.await?;
78+
79+
let row = sqlx::query(
80+
r#"
81+
INSERT INTO workers (name, "desc", user_id)
82+
VALUES ($1, $2, $3)
83+
RETURNING id, name, "desc", current_version, created_at, updated_at
84+
"#,
85+
)
86+
.bind(&input.name)
87+
.bind(&input.description)
88+
.bind(user_id)
89+
.fetch_one(&self.pool)
90+
.await?;
91+
92+
Ok(Worker {
93+
id: row.get::<uuid::Uuid, _>("id").to_string(),
94+
name: row.get("name"),
95+
description: row.get("desc"),
96+
current_version: row.get("current_version"),
97+
created_at: row.get("created_at"),
98+
updated_at: row.get("updated_at"),
99+
})
100+
}
101+
102+
async fn delete_worker(&self, name: &str) -> Result<(), BackendError> {
103+
let result = sqlx::query("DELETE FROM workers WHERE name = $1")
104+
.bind(name)
105+
.execute(&self.pool)
106+
.await?;
107+
108+
if result.rows_affected() == 0 {
109+
return Err(BackendError::NotFound(format!(
110+
"Worker '{}' not found",
111+
name
112+
)));
113+
}
114+
115+
Ok(())
116+
}
117+
118+
async fn deploy_worker(
119+
&self,
120+
name: &str,
121+
input: DeployInput,
122+
) -> Result<Deployment, BackendError> {
123+
// Get worker ID
124+
let worker_id: uuid::Uuid = sqlx::query_scalar("SELECT id FROM workers WHERE name = $1")
125+
.bind(name)
126+
.fetch_optional(&self.pool)
127+
.await?
128+
.ok_or_else(|| BackendError::NotFound(format!("Worker '{}' not found", name)))?;
129+
130+
// Calculate hash
131+
let mut hasher = Sha256::new();
132+
hasher.update(&input.code);
133+
let hash = hex::encode(hasher.finalize());
134+
135+
// Get next version
136+
let current_version: Option<i32> =
137+
sqlx::query_scalar("SELECT MAX(version) FROM worker_deployments WHERE worker_id = $1")
138+
.bind(worker_id)
139+
.fetch_one(&self.pool)
140+
.await?;
141+
142+
let next_version = current_version.unwrap_or(0) + 1;
143+
144+
// Insert deployment
145+
let row = sqlx::query(
146+
r#"
147+
INSERT INTO worker_deployments (worker_id, version, hash, code_type, code, message)
148+
VALUES ($1, $2, $3, $4::enum_code_type, $5, $6)
149+
RETURNING worker_id, version, hash, code_type::text, deployed_at, message
150+
"#,
151+
)
152+
.bind(worker_id)
153+
.bind(next_version)
154+
.bind(&hash)
155+
.bind(&input.code_type)
156+
.bind(&input.code)
157+
.bind(&input.message)
158+
.fetch_one(&self.pool)
159+
.await?;
160+
161+
// Update worker's current_version
162+
sqlx::query("UPDATE workers SET current_version = $1 WHERE id = $2")
163+
.bind(next_version)
164+
.bind(worker_id)
165+
.execute(&self.pool)
166+
.await?;
167+
168+
Ok(Deployment {
169+
worker_id: row.get::<uuid::Uuid, _>("worker_id").to_string(),
170+
version: row.get("version"),
171+
hash: row.get("hash"),
172+
code_type: row.get("code_type"),
173+
deployed_at: row.get("deployed_at"),
174+
message: row.get("message"),
175+
})
176+
}
177+
}

0 commit comments

Comments
 (0)