Skip to content

Commit 9f547d0

Browse files
committed
refactor: round-robin worker pool, runtime v0.13.6, oneshot safety
Worker pool:
- Round-robin dispatch across V8 threads for better parallelism
- Configurable thread count via V8_THREADS env var
- Graceful drain on SIGTERM (waits for in-flight requests)

Dependencies:
- Bump runner to 0.13.9
- Point runtime to v0.13.6 (unified pool, LRU context caching, clippy)
- Switch v8 from local path to crates.io (openworkers-v8 v145.0.2)
- Remove [patch.crates-io] override

Oneshot mode:
- Add thread-local semaphore to serialize OwnedIsolate per thread
- Prevents non-LIFO isolate drop panic under concurrent load

Tests:
- Expand warm reuse tests with version change and multi-worker scenarios
1 parent 55ff9bd commit 9f547d0

8 files changed

Lines changed: 834 additions & 331 deletions

File tree

Cargo.lock

Lines changed: 333 additions & 197 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "openworkers-runner"
3-
version = "0.13.8"
3+
version = "0.13.9"
44
edition = "2024"
55
license = "MIT"
66
default-run = "openworkers-runner"
@@ -62,7 +62,7 @@ openworkers-core = { git = "https://github.com/openworkers/openworkers-core", ta
6262
openworkers-transform = { git = "https://github.com/openworkers/openworkers-transform", tag = "v0.1.0" }
6363

6464
# Runtime backend (v8 only for now, others require older version of core)
65-
openworkers-runtime-v8 = { git = "https://github.com/openworkers/openworkers-runtime-v8", tag = "v0.13.5", optional = true, features = ["ptrcomp"] }
65+
openworkers-runtime-v8 = { git = "https://github.com/openworkers/openworkers-runtime-v8", tag = "v0.13.6", optional = true, features = ["ptrcomp"] }
6666

6767
# WASM runtime (optional)
6868
# openworkers-runtime-wasm = { path = "../openworkers-runtime-wasm", optional = true }
@@ -124,3 +124,4 @@ path = "bin/snapshot.rs"
124124
[[bench]]
125125
name = "worker_benchmark"
126126
harness = false
127+

bin/main.rs

Lines changed: 122 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,8 @@ use openworkers_runner::store::WorkerIdentifier;
1717
use sqlx::postgres::PgPoolOptions;
1818

1919
struct AppState {
20-
db: sqlx::Pool<sqlx::Postgres>,
20+
db_internal: sqlx::Pool<sqlx::Postgres>, // Runner's own queries (resolve, bindings lookup)
21+
db_worker: sqlx::Pool<sqlx::Postgres>, // Worker binding operations (KV, Database)
2122
log_tx: std::sync::mpsc::Sender<openworkers_runner::log::LogMessage>,
2223
shutdown_tx: tokio::sync::mpsc::Sender<()>,
2324
wall_clock_timeout_ms: u64,
@@ -131,14 +132,15 @@ async fn handle_request(
131132
headers.get("x-worker-name").and_then(|h| h.to_str().ok())
132133
);
133134

134-
// Acquire database connection
135-
let mut conn: sqlx::pool::PoolConnection<sqlx::Postgres> = match state.db.acquire().await {
136-
Ok(db) => db,
137-
Err(err) => {
138-
error!("Failed to acquire database connection: {}", err);
139-
return Ok(error_response(500, "Failed to acquire database connection"));
140-
}
141-
};
135+
// Acquire database connection from internal pool (for worker resolution queries)
136+
let mut conn: sqlx::pool::PoolConnection<sqlx::Postgres> =
137+
match state.db_internal.acquire().await {
138+
Ok(db) => db,
139+
Err(err) => {
140+
error!("Failed to acquire database connection: {}", err);
141+
return Ok(error_response(500, "Failed to acquire database connection"));
142+
}
143+
};
142144

143145
// Extract x-request-id header
144146
let request_id = match headers.get("x-request-id") {
@@ -440,6 +442,25 @@ async fn handle_worker_request(
440442
]);
441443
}
442444

445+
// Per-worker concurrency limit — applied before the global semaphore so a hot
446+
// worker gets 429 immediately without consuming global capacity
447+
let worker_sem = openworkers_runner::worker_pool::get_worker_semaphore(&worker.id);
448+
let _worker_permit = match tokio::time::timeout(
449+
std::time::Duration::from_millis(100),
450+
worker_sem.acquire_owned(),
451+
)
452+
.await
453+
{
454+
Ok(Ok(permit)) => permit,
455+
Ok(Err(_)) => return Ok(error_response(500, "Internal server error")),
456+
Err(_) => {
457+
return Ok(error_response(
458+
429,
459+
"Too many concurrent requests for this worker",
460+
));
461+
}
462+
};
463+
443464
let start = tokio::time::Instant::now();
444465

445466
// Collect request body (consumes req)
@@ -513,7 +534,7 @@ async fn handle_worker_request(
513534
termination_tx,
514535
state.log_tx.clone(),
515536
permit,
516-
state.db.clone(),
537+
state.db_worker.clone(),
517538
state.wall_clock_timeout_ms,
518539
span.clone(),
519540
);
@@ -652,52 +673,91 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
652673
.map(|v| !matches!(v.as_str(), "false" | "0"))
653674
.unwrap_or(true);
654675

655-
// Retry database connection with exponential backoff
656-
let mut retry_count = 0;
657-
let max_retries = 5;
658-
let pool = loop {
659-
match PgPoolOptions::new()
660-
.max_connections(20)
661-
.acquire_timeout(Duration::from_secs(5))
662-
.test_before_acquire(test_before_acquire)
663-
.connect(&db_url)
664-
.await
665-
{
666-
Ok(pool) => match sqlx::query("SELECT 1").fetch_one(&pool).await {
667-
Ok(_) => {
668-
debug!("connected to Postgres");
669-
break pool;
670-
}
676+
let internal_pool_size: u32 = std::env::var("DB_POOL_INTERNAL_SIZE")
677+
.ok()
678+
.and_then(|s| s.parse().ok())
679+
.unwrap_or(5);
680+
681+
let worker_pool_size: u32 = std::env::var("DB_POOL_WORKER_SIZE")
682+
.ok()
683+
.and_then(|s| s.parse().ok())
684+
.unwrap_or(15);
685+
686+
/// Create a PG pool with retry/backoff logic
687+
async fn create_pool(
688+
db_url: &str,
689+
max_connections: u32,
690+
test_before_acquire: bool,
691+
label: &str,
692+
) -> sqlx::Pool<sqlx::Postgres> {
693+
let mut retry_count: u32 = 0;
694+
let max_retries = 5;
695+
696+
loop {
697+
match PgPoolOptions::new()
698+
.max_connections(max_connections)
699+
.acquire_timeout(Duration::from_secs(5))
700+
.test_before_acquire(test_before_acquire)
701+
.connect(db_url)
702+
.await
703+
{
704+
Ok(pool) => match sqlx::query("SELECT 1").fetch_one(&pool).await {
705+
Ok(_) => {
706+
debug!("connected to Postgres ({})", label);
707+
return pool;
708+
}
709+
Err(e) => {
710+
retry_count += 1;
711+
error!("Database connection test failed ({}): {}", label, e);
712+
if retry_count >= max_retries {
713+
panic!(
714+
"Failed to connect to database ({}) after {} retries",
715+
label, max_retries
716+
);
717+
}
718+
let wait_time = Duration::from_secs(2u64.pow(retry_count.min(5)));
719+
warn!(
720+
"Database test query {} ({}) failed: {}. Retrying in {:?}...",
721+
retry_count, label, e, wait_time
722+
);
723+
tokio::time::sleep(wait_time).await;
724+
}
725+
},
671726
Err(e) => {
672-
error!("Database connection test failed: {}", e);
673-
if retry_count >= max_retries {
727+
retry_count += 1;
728+
if retry_count > max_retries {
674729
panic!(
675-
"Failed to connect to database after {} retries",
676-
max_retries
730+
"Failed to connect to database ({}) after {} retries: {}",
731+
label, max_retries, e
677732
);
678733
}
679-
}
680-
},
681-
Err(e) => {
682-
retry_count += 1;
683-
if retry_count > max_retries {
684-
panic!(
685-
"Failed to connect to database after {} retries: {}",
686-
max_retries, e
734+
let wait_time = Duration::from_secs(2u64.pow(retry_count.min(5)));
735+
warn!(
736+
"Database connection attempt {} ({}) failed: {}. Retrying in {:?}...",
737+
retry_count, label, e, wait_time
687738
);
739+
tokio::time::sleep(wait_time).await;
688740
}
689-
let wait_time = Duration::from_secs(2u64.pow(retry_count.min(5)));
690-
warn!(
691-
"Database connection attempt {} failed: {}. Retrying in {:?}...",
692-
retry_count, e, wait_time
693-
);
694-
tokio::time::sleep(wait_time).await;
695741
}
696742
}
697-
};
743+
}
744+
745+
let db_internal =
746+
create_pool(&db_url, internal_pool_size, test_before_acquire, "internal").await;
747+
let db_worker = create_pool(&db_url, worker_pool_size, test_before_acquire, "worker").await;
748+
749+
info!(
750+
"DB pools: internal={} worker={}",
751+
internal_pool_size, worker_pool_size
752+
);
753+
info!(
754+
"Per-worker concurrency limit: {}",
755+
openworkers_runner::worker_pool::get_max_concurrent_per_worker()
756+
);
698757

699758
// Connect to NATS with retries
700-
let mut retry_count = 0;
759+
let max_retries = 5;
760+
let mut retry_count: u32 = 0;
701761
loop {
702762
match openworkers_runner::nats::nats_connect()
703763
.await
@@ -745,16 +805,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
745805

746806
match v8_execute_mode {
747807
openworkers_runner::V8ExecuteMode::Pinned => {
748-
openworkers_runtime_v8::init_pinned_pool(pool_max_size, pool_limits);
749-
debug!(
750-
"Initialized pinned isolate pool with {} isolates max",
751-
pool_max_size
808+
openworkers_runtime_v8::init_pinned_pool(
809+
openworkers_runtime_v8::PinnedPoolConfig {
810+
max_per_thread: pool_max_size,
811+
max_per_owner: None,
812+
max_concurrent_per_isolate: 20,
813+
max_cached_contexts: 10,
814+
limits: pool_limits,
815+
},
752816
);
753-
}
754-
openworkers_runner::V8ExecuteMode::Pooled => {
755-
openworkers_runtime_v8::init_pool(pool_max_size, pool_limits);
756817
debug!(
757-
"Initialized global isolate pool with {} isolates max",
818+
"Initialized pinned isolate pool with {} isolates max per thread",
758819
pool_max_size
759820
);
760821
}
@@ -764,15 +825,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
764825
}
765826
}
766827

767-
openworkers_runner::event_scheduled::handle_scheduled(pool.clone(), log_tx.clone());
828+
openworkers_runner::event_scheduled::handle_scheduled(
829+
db_internal.clone(),
830+
db_worker.clone(),
831+
log_tx.clone(),
832+
);
768833

769834
// Shutdown signal channel
770835
let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::<()>(1);
771836
let shutdown_tx_signal = shutdown_tx.clone();
772837
let shutdown_tx_drain = shutdown_tx.clone();
773838

774839
let state = std::sync::Arc::new(AppState {
775-
db: pool,
840+
db_internal,
841+
db_worker,
776842
log_tx,
777843
shutdown_tx,
778844
wall_clock_timeout_ms,

src/event_scheduled.rs

Lines changed: 11 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -22,15 +22,16 @@ async fn handle_scheduled_task(
2222
span: tracing::Span,
2323
task_id: String,
2424
data: ScheduledData,
25-
db: sqlx::Pool<sqlx::Postgres>,
25+
db_internal: sqlx::Pool<sqlx::Postgres>,
26+
db_worker: sqlx::Pool<sqlx::Postgres>,
2627
global_log_tx: std::sync::mpsc::Sender<crate::log::LogMessage>,
2728
) {
2829
// Start metrics timer
2930
#[cfg_attr(not(feature = "telemetry"), allow(unused_mut))]
3031
let mut metrics_timer = crate::metrics::MetricsTimer::new();
3132

32-
// Acquire connection per-task to avoid holding it across iterations
33-
let mut conn = match db.acquire().await {
33+
// Acquire connection from internal pool (worker lookup is an internal query)
34+
let mut conn = match db_internal.acquire().await {
3435
Ok(c) => c,
3536
Err(err) => {
3637
tracing::error!("Failed to acquire database connection: {}", err);
@@ -68,14 +69,15 @@ async fn handle_scheduled_task(
6869
]);
6970
}
7071

71-
// Connection is dropped here, returned to pool
72+
// Connection is dropped here, returned to internal pool
7273
drop(conn);
7374

75+
// Worker execution uses the worker pool for binding operations
7476
run_scheduled(
7577
task_id,
7678
data,
7779
worker_data,
78-
db,
80+
db_worker,
7981
global_log_tx,
8082
span,
8183
metrics_timer,
@@ -177,7 +179,8 @@ fn run_scheduled(
177179
}
178180

179181
pub fn handle_scheduled(
180-
db: sqlx::Pool<sqlx::Postgres>,
182+
db_internal: sqlx::Pool<sqlx::Postgres>,
183+
db_worker: sqlx::Pool<sqlx::Postgres>,
181184
global_log_tx: std::sync::mpsc::Sender<crate::log::LogMessage>,
182185
) {
183186
std::thread::spawn(move || {
@@ -252,7 +255,8 @@ pub fn handle_scheduled(
252255
span.clone(),
253256
task_id,
254257
data,
255-
db.clone(),
258+
db_internal.clone(),
259+
db_worker.clone(),
256260
global_log_tx.clone(),
257261
)
258262
.instrument(span)

0 commit comments

Comments (0)