Skip to content

Commit 02c4ab1

Browse files
committed
Add websocket feature
1 parent 5638de0 commit 02c4ab1

6 files changed

Lines changed: 146 additions & 11 deletions

File tree

Cargo.lock

Lines changed: 16 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
[package]
22
name = "openworkers-logs"
3-
version = "0.1.3"
3+
version = "0.1.4"
44
edition = "2024"
55

6+
[features]
7+
websocket = ["dep:actix-ws"]
8+
69
[dependencies]
710
actix-web = "4.12.1"
811
actix-web-lab = "0.24.3"
12+
actix-ws = { version = "0.3", optional = true }
913
tokio = { version = "1.43.0", features = ["full"] }
1014
sqlx = { version = "0.8.3", features = ["runtime-tokio", "postgres", "uuid", "chrono"] }
1115
async-nats = "0.45.0"

Dockerfile.multi

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ RUN --mount=type=cache,id=cargo-$TARGETPLATFORM,sharing=locked,target=$CARGO_HOM
4646
--mount=type=cache,id=cargo-$TARGETPLATFORM,sharing=locked,target=$CARGO_HOME/registry \
4747
--mount=type=cache,id=cargo-$TARGETPLATFORM,sharing=locked,target=/build/target \
4848
case "$TARGETPLATFORM" in \
49-
"linux/arm64") cargo chef cook --release --target aarch64-unknown-linux-gnu --recipe-path recipe.json ;; \
50-
"linux/amd64") cargo chef cook --release --target x86_64-unknown-linux-gnu --recipe-path recipe.json ;; \
49+
"linux/arm64") cargo chef cook --release --features websocket --target aarch64-unknown-linux-gnu --recipe-path recipe.json ;; \
50+
"linux/amd64") cargo chef cook --release --features websocket --target x86_64-unknown-linux-gnu --recipe-path recipe.json ;; \
5151
*) echo "Unsupported platform: $TARGETPLATFORM" && exit 1 ;; \
5252
esac
5353

@@ -58,9 +58,9 @@ RUN --mount=type=cache,id=cargo-$TARGETPLATFORM,sharing=locked,target=$CARGO_HOM
5858
--mount=type=cache,id=cargo-$TARGETPLATFORM,sharing=locked,target=$CARGO_HOME/registry \
5959
--mount=type=cache,id=cargo-$TARGETPLATFORM,sharing=locked,target=/build/target \
6060
case "$TARGETPLATFORM" in \
61-
"linux/arm64") cargo build --release --target aarch64-unknown-linux-gnu && \
61+
"linux/arm64") cargo build --release --features websocket --target aarch64-unknown-linux-gnu && \
6262
cp /build/target/aarch64-unknown-linux-gnu/release/openworkers-logs /build/output ;; \
63-
"linux/amd64") cargo build --release --target x86_64-unknown-linux-gnu && \
63+
"linux/amd64") cargo build --release --features websocket --target x86_64-unknown-linux-gnu && \
6464
cp /build/target/x86_64-unknown-linux-gnu/release/openworkers-logs /build/output ;; \
6565
*) echo "Unsupported platform: $TARGETPLATFORM" && exit 1 ;; \
6666
esac

src/main.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use futures::StreamExt;
88
use uuid::Uuid;
99

1010
use db::{LogEntry, LogLevel, create_pool, insert_log};
11+
#[cfg(feature = "websocket")]
12+
use routes::ws_worker_logs;
1113
use routes::{AppState, health, stream_worker_logs};
1214

1315
#[actix_web::main]
@@ -93,14 +95,19 @@ async fn main() -> std::io::Result<()> {
9395
log::info!("Starting HTTP server on 0.0.0.0:{}", port);
9496

9597
HttpServer::new(move || {
96-
App::new()
98+
let app = App::new()
9799
.app_data(web::Data::new(AppState {
98100
pool: pool.clone(),
99101
nats_client: nats_client.clone(),
100102
}))
101103
.wrap(middleware::Logger::default())
102104
.service(health)
103-
.service(stream_worker_logs)
105+
.service(stream_worker_logs);
106+
107+
#[cfg(feature = "websocket")]
108+
let app = app.service(ws_worker_logs);
109+
110+
app
104111
})
105112
.bind(("0.0.0.0", port))?
106113
.run()

src/nats.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use base64::engine::general_purpose::STANDARD;
21
use base64::engine::Engine;
2+
use base64::engine::general_purpose::STANDARD;
33

44
pub async fn nats_connect() -> async_nats::Client {
55
let nats_servers = std::env::var("NATS_SERVERS").expect("NATS_SERVERS must be set");

src/routes.rs

Lines changed: 111 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
1-
use actix_web::{get, web, HttpResponse, Responder};
1+
use actix_web::{HttpResponse, Responder, get, web};
22
use actix_web_lab::sse::{self, Sse};
33
use futures::StreamExt;
44
use sqlx::PgPool;
55
use uuid::Uuid;
66

7-
use crate::db::{get_logs, LogEntry};
7+
#[cfg(feature = "websocket")]
8+
use actix_web::HttpRequest;
9+
#[cfg(feature = "websocket")]
10+
use actix_ws::Message;
11+
12+
use crate::db::{LogEntry, get_logs};
813

914
pub struct AppState {
1015
pub pool: PgPool,
@@ -113,3 +118,107 @@ pub async fn stream_worker_logs(
113118

114119
Sse::from_stream(stream).with_keep_alive(std::time::Duration::from_secs(5))
115120
}
121+
122+
#[cfg(feature = "websocket")]
123+
#[get("/api/v1/workers/{worker_id}/ws-logs")]
124+
pub async fn ws_worker_logs(
125+
req: HttpRequest,
126+
body: web::Payload,
127+
worker_id: web::Path<Uuid>,
128+
data: web::Data<AppState>,
129+
) -> Result<HttpResponse, actix_web::Error> {
130+
let worker_id = *worker_id;
131+
132+
let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
133+
134+
// Fetch last 10 logs from database
135+
let historical_logs = match get_logs(&data.pool, worker_id, 10).await {
136+
Ok(logs) => logs,
137+
Err(e) => {
138+
log::error!("Failed to fetch historical logs: {:?}", e);
139+
vec![]
140+
}
141+
};
142+
143+
// Subscribe to NATS for this specific worker's logs
144+
let subject = format!("{}.console.>", worker_id);
145+
let nats_sub = match data.nats_client.subscribe(subject).await {
146+
Ok(sub) => sub,
147+
Err(e) => {
148+
log::error!("Failed to subscribe to NATS: {:?}", e);
149+
return Ok(response);
150+
}
151+
};
152+
153+
// Spawn WebSocket handler task
154+
actix_web::rt::spawn(async move {
155+
let mut nats_sub = nats_sub;
156+
157+
// Send historical logs first (oldest first)
158+
for log_entry in historical_logs.into_iter().rev() {
159+
let json = serde_json::json!({
160+
"date": log_entry.date.timestamp_millis(),
161+
"level": format!("{:?}", log_entry.level).to_lowercase(),
162+
"message": log_entry.message
163+
});
164+
165+
if session.text(json.to_string()).await.is_err() {
166+
return;
167+
}
168+
}
169+
170+
// Handle incoming messages and NATS subscription concurrently
171+
loop {
172+
tokio::select! {
173+
// Handle incoming WebSocket messages (ping/pong, close)
174+
Some(msg) = msg_stream.next() => {
175+
match msg {
176+
Ok(Message::Ping(bytes)) => {
177+
if session.pong(&bytes).await.is_err() {
178+
break;
179+
}
180+
}
181+
Ok(Message::Close(_)) => {
182+
break;
183+
}
184+
Err(_) => {
185+
break;
186+
}
187+
_ => {}
188+
}
189+
}
190+
191+
// Stream logs from NATS
192+
Some(nats_msg) = nats_sub.next() => {
193+
let level_str = nats_msg.subject.split('.').nth(2).unwrap_or("info");
194+
195+
let message = match String::from_utf8(nats_msg.payload.to_vec()) {
196+
Ok(m) => m,
197+
Err(_) => continue,
198+
};
199+
200+
let json = serde_json::json!({
201+
"date": chrono::Utc::now().timestamp_millis(),
202+
"level": level_str,
203+
"message": message
204+
});
205+
206+
if session.text(json.to_string()).await.is_err() {
207+
break;
208+
}
209+
}
210+
211+
// Send ping every 30 seconds to keep connection alive
212+
_ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
213+
if session.ping(b"").await.is_err() {
214+
break;
215+
}
216+
}
217+
}
218+
}
219+
220+
let _ = session.close(None).await;
221+
});
222+
223+
Ok(response)
224+
}

0 commit comments

Comments
 (0)