Skip to content

Commit 9c39e36

Browse files
committed
Stream new logs from NATS
1 parent d747c30 commit 9c39e36

3 files changed

Lines changed: 110 additions & 63 deletions

File tree

src/db.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use chrono::{DateTime, Utc};
22
use serde::Serialize;
33
use sqlx::{postgres::PgPoolOptions, PgPool};
4+
use std::str::FromStr;
45
use uuid::Uuid;
56

67
#[derive(Clone, Debug, PartialEq, sqlx::Type, Serialize)]
@@ -14,6 +15,22 @@ pub enum LogLevel {
1415
Trace,
1516
}
1617

18+
impl FromStr for LogLevel {
19+
type Err = ();
20+
21+
fn from_str(s: &str) -> Result<Self, Self::Err> {
22+
match s {
23+
"error" => Ok(LogLevel::Error),
24+
"warn" => Ok(LogLevel::Warn),
25+
"info" => Ok(LogLevel::Info),
26+
"log" => Ok(LogLevel::Log),
27+
"debug" => Ok(LogLevel::Debug),
28+
"trace" => Ok(LogLevel::Trace),
29+
_ => Ok(LogLevel::Info), // Default to Info for unknown levels
30+
}
31+
}
32+
}
33+
1734
#[derive(Debug, Clone, Serialize)]
1835
pub struct LogEntry {
1936
pub date: DateTime<Utc>,

src/main.rs

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,10 @@ mod routes;
55
use actix_web::{middleware, web, App, HttpServer};
66
use chrono::Utc;
77
use futures::StreamExt;
8-
use std::sync::Arc;
9-
use tokio::sync::broadcast;
108
use uuid::Uuid;
119

1210
use db::{create_pool, insert_log, LogEntry, LogLevel};
13-
use routes::{get_worker_logs, health, stream_worker_logs, AppState};
11+
use routes::{health, stream_worker_logs, AppState};
1412

1513
#[actix_web::main]
1614
async fn main() -> std::io::Result<()> {
@@ -22,21 +20,18 @@ async fn main() -> std::io::Result<()> {
2220
// Connect to database
2321
let pool = create_pool().await.expect("Failed to connect to database");
2422

25-
// Create broadcast channel for SSE
26-
let (tx, _rx) = broadcast::channel::<LogEntry>(100);
27-
let broadcaster = Arc::new(tx);
23+
// Connect to NATS
24+
let nats_client = nats::nats_connect().await;
2825

29-
// Spawn NATS subscriber task
26+
// Spawn NATS subscriber task for database persistence
3027
let pool_clone = pool.clone();
31-
let broadcaster_clone = broadcaster.clone();
28+
let nats_clone = nats_client.clone();
3229

3330
tokio::spawn(async move {
34-
log::info!("Starting NATS subscriber...");
35-
36-
let nc = nats::nats_connect().await;
31+
log::info!("Starting NATS subscriber for database persistence...");
3732

3833
// Subscribe to all worker logs: *.console.*
39-
let mut sub = nc
34+
let mut sub = nats_clone
4035
.subscribe("*.console.*")
4136
.await
4237
.expect("Failed to subscribe to NATS");
@@ -63,18 +58,7 @@ async fn main() -> std::io::Result<()> {
6358
}
6459
};
6560

66-
let level = match level_str {
67-
"error" => LogLevel::Error,
68-
"warn" => LogLevel::Warn,
69-
"info" => LogLevel::Info,
70-
"log" => LogLevel::Log,
71-
"debug" => LogLevel::Debug,
72-
"trace" => LogLevel::Trace,
73-
_ => {
74-
log::warn!("Unknown log level: {}", level_str);
75-
LogLevel::Info
76-
}
77-
};
61+
let level = level_str.parse().unwrap_or(LogLevel::Info);
7862

7963
let message = match String::from_utf8(msg.payload.to_vec()) {
8064
Ok(m) => m,
@@ -92,12 +76,9 @@ async fn main() -> std::io::Result<()> {
9276
};
9377

9478
// Insert into database
95-
if let Err(e) = insert_log(&pool_clone, log_entry.clone()).await {
79+
if let Err(e) = insert_log(&pool_clone, log_entry).await {
9680
log::error!("Failed to insert log: {:?}", e);
9781
}
98-
99-
// Broadcast to SSE subscribers
100-
let _ = broadcaster_clone.send(log_entry);
10182
}
10283

10384
log::warn!("NATS subscriber stopped");
@@ -115,11 +96,10 @@ async fn main() -> std::io::Result<()> {
11596
App::new()
11697
.app_data(web::Data::new(AppState {
11798
pool: pool.clone(),
118-
broadcaster: broadcaster.clone(),
99+
nats_client: nats_client.clone(),
119100
}))
120101
.wrap(middleware::Logger::default())
121102
.service(health)
122-
.service(get_worker_logs)
123103
.service(stream_worker_logs)
124104
})
125105
.bind(("0.0.0.0", port))?

src/routes.rs

Lines changed: 83 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,113 @@
11
use actix_web::{get, web, HttpResponse, Responder};
22
use actix_web_lab::sse::{self, Sse};
3+
use futures::StreamExt;
34
use sqlx::PgPool;
4-
use std::sync::Arc;
5-
use tokio::sync::broadcast;
65
use uuid::Uuid;
76

87
use crate::db::{get_logs, LogEntry};
98

109
pub struct AppState {
1110
pub pool: PgPool,
12-
pub broadcaster: Arc<broadcast::Sender<LogEntry>>,
11+
pub nats_client: async_nats::Client,
1312
}
1413

15-
#[get("/health")]
14+
// Helper struct for creating SSE log events from raw data
// (live NATS messages that have not been persisted as a `LogEntry` yet).
struct LogEventData {
    // Event timestamp, milliseconds since the Unix epoch
    // (produced via `Utc::now().timestamp_millis()` at the NATS receive site).
    date: i64,
    // Log level as a plain string, taken verbatim from the NATS subject
    // segment `{worker_id}.console.{level}`.
    level: String,
    // Raw log message payload (UTF-8 decoded NATS message body).
    message: String,
}
20+
21+
impl Into<sse::Data> for LogEntry {
22+
fn into(self) -> sse::Data {
23+
sse::Data::new_json(serde_json::json!({
24+
"date": self.date.timestamp_millis(),
25+
"level": format!("{:?}", self.level).to_lowercase(),
26+
"message": self.message
27+
}))
28+
.unwrap()
29+
.event("log")
30+
}
31+
}
32+
33+
impl Into<sse::Data> for LogEventData {
34+
fn into(self) -> sse::Data {
35+
sse::Data::new_json(serde_json::json!({
36+
"date": self.date,
37+
"level": self.level,
38+
"message": self.message
39+
}))
40+
.unwrap()
41+
.event("log")
42+
}
43+
}
44+
45+
#[get("/api/v1/health")]
1646
pub async fn health() -> impl Responder {
1747
HttpResponse::Ok().json(serde_json::json!({
1848
"status": "ok",
1949
"service": "openworkers-logs"
2050
}))
2151
}
2252

23-
#[get("/logs/{worker_id}")]
24-
pub async fn get_worker_logs(
53+
#[get("/api/v1/workers/{worker_id}/logs")]
54+
pub async fn stream_worker_logs(
2555
worker_id: web::Path<Uuid>,
2656
data: web::Data<AppState>,
2757
) -> impl Responder {
28-
match get_logs(&data.pool, *worker_id, 100).await {
29-
Ok(logs) => HttpResponse::Ok().json(logs),
58+
let worker_id = *worker_id;
59+
60+
// Fetch last 10 logs from database
61+
let historical_logs = match get_logs(&data.pool, worker_id, 10).await {
62+
Ok(logs) => logs,
3063
Err(e) => {
31-
log::error!("Failed to fetch logs: {:?}", e);
32-
HttpResponse::InternalServerError().json(serde_json::json!({
33-
"error": "Failed to fetch logs"
34-
}))
64+
log::error!("Failed to fetch historical logs: {:?}", e);
65+
vec![]
3566
}
67+
};
68+
69+
// Subscribe to NATS for this specific worker's logs
70+
let subject = format!("{}.console.>", worker_id);
71+
let nats_sub = data.nats_client.subscribe(subject).await.ok();
72+
73+
if nats_sub.is_none() {
74+
log::error!("Failed to subscribe to NATS");
3675
}
37-
}
3876

39-
#[get("/logs/{worker_id}/stream")]
40-
pub async fn stream_worker_logs(
41-
worker_id: web::Path<Uuid>,
42-
data: web::Data<AppState>,
43-
) -> impl Responder {
44-
let worker_id = *worker_id;
45-
let mut rx = data.broadcaster.subscribe();
77+
let mut nats_sub = nats_sub.unwrap();
78+
79+
let mut id_counter: u64 = 0;
4680

4781
let stream = async_stream::stream! {
48-
while let Ok(log) = rx.recv().await {
49-
// Only send logs for this worker
50-
if log.worker_id == worker_id {
51-
let event = sse::Event::Data(
52-
sse::Data::new_json(serde_json::json!({
53-
"date": log.date,
54-
"message": log.message,
55-
"level": log.level
56-
}))
57-
.unwrap()
58-
);
59-
yield Ok::<_, actix_web::Error>(event);
60-
}
82+
// First, yield historical logs in reverse order (oldest first)
83+
for log in historical_logs.into_iter().rev() {
84+
let mut data: sse::Data = log.into();
85+
data.set_id(format!("{}", id_counter));
86+
id_counter += 1;
87+
yield Ok::<_, actix_web::Error>(sse::Event::Data(data));
88+
}
89+
90+
// Then stream new logs in real-time from NATS
91+
while let Some(msg) = nats_sub.next().await {
92+
// Parse level from subject: {worker_id}.console.{level}
93+
let level_str = msg.subject.split('.').nth(2).unwrap_or("info");
94+
95+
let message = match String::from_utf8(msg.payload.to_vec()) {
96+
Ok(m) => m,
97+
Err(_) => continue,
98+
};
99+
100+
let data = LogEventData {
101+
date: chrono::Utc::now().timestamp_millis(),
102+
level: level_str.to_string(),
103+
message,
104+
};
105+
106+
let mut data: sse::Data = data.into();
107+
data.set_id(format!("{}", id_counter));
108+
id_counter += 1;
109+
110+
yield Ok::<_, actix_web::Error>(sse::Event::Data(data));
61111
}
62112
};
63113

0 commit comments

Comments
 (0)