Skip to content

Commit e2beffa

Browse files
Replace JsInstance checkout/return with worker and FIFO queue
1 parent 10a4779 commit e2beffa

4 files changed

Lines changed: 650 additions & 275 deletions

File tree

crates/client-api/src/routes/database.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ fn map_reducer_error(e: ReducerCallError, reducer: &str) -> (StatusCode, String)
9898
log::debug!("Attempt to call non-existent reducer {reducer}");
9999
StatusCode::NOT_FOUND
100100
}
101+
ReducerCallError::WorkerError(_) => StatusCode::INTERNAL_SERVER_ERROR,
101102
ReducerCallError::LifecycleReducer(lifecycle) => {
102103
log::debug!("Attempt to call {lifecycle:?} lifecycle reducer {reducer}");
103104
StatusCode::BAD_REQUEST

crates/core/src/host/module_host.rs

Lines changed: 145 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::error::DBError;
1111
use crate::estimation::{check_row_limit, estimate_rows_scanned};
1212
use crate::hash::Hash;
1313
use crate::host::host_controller::CallProcedureReturn;
14-
use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams};
14+
use crate::host::scheduler::{CallScheduledFunctionError, CallScheduledFunctionResult, ScheduledFunctionParams};
1515
use crate::host::v8::JsInstance;
1616
pub use crate::host::wasm_common::module_host_actor::{InstanceCommon, WasmInstance};
1717
use crate::host::wasmtime::ModuleInstance;
@@ -357,7 +357,9 @@ struct WasmtimeModuleHost {
357357
}
358358

359359
struct V8ModuleHost {
360-
instance_manager: ModuleInstanceManager<super::v8::JsModule>,
360+
module: super::v8::JsModule,
361+
instance_lane: super::v8::JsInstanceLane,
362+
procedure_instances: ModuleInstanceManager<super::v8::JsModule>,
361363
}
362364

363365
/// A module; used as a bound on `InstanceManager`.
@@ -844,6 +846,23 @@ impl<M: GenericModule> ModuleInstanceManager<M> {
844846
}
845847
}
846848

849+
fn new_empty(module: M, database_identity: Identity) -> Self {
850+
let host_type = module.host_type();
851+
let create_instance_time_metric = CreateInstanceTimeMetric {
852+
metric: WORKER_METRICS
853+
.module_create_instance_time_seconds
854+
.with_label_values(&database_identity, &host_type),
855+
host_type,
856+
database_identity,
857+
};
858+
859+
Self {
860+
instances: Mutex::new(VecDeque::new()),
861+
module,
862+
create_instance_time_metric,
863+
}
864+
}
865+
847866
async fn with_instance<R>(&self, f: impl AsyncFnOnce(M::Instance) -> (R, M::Instance)) -> R {
848867
let inst = self.get_instance().await;
849868
let (res, inst) = f(inst).await;
@@ -937,6 +956,8 @@ pub enum ReducerCallError {
937956
Args(#[from] InvalidReducerArguments),
938957
#[error(transparent)]
939958
NoSuchModule(#[from] NoSuchModule),
959+
#[error("The reducer worker encountered a fatal error: {0}")]
960+
WorkerError(String),
940961
#[error("no such reducer")]
941962
NoSuchReducer,
942963
#[error("no such scheduled reducer")]
@@ -1074,8 +1095,13 @@ impl ModuleHost {
10741095
}
10751096
ModuleWithInstance::Js { module, init_inst } => {
10761097
info = module.info();
1077-
let instance_manager = ModuleInstanceManager::new(module, init_inst, database_identity);
1078-
Arc::new(ModuleHostInner::Js(V8ModuleHost { instance_manager }))
1098+
let instance_lane = super::v8::JsInstanceLane::new(module.clone(), init_inst);
1099+
let procedure_instances = ModuleInstanceManager::new_empty(module.clone(), database_identity);
1100+
Arc::new(ModuleHostInner::Js(V8ModuleHost {
1101+
module,
1102+
instance_lane,
1103+
procedure_instances,
1104+
}))
10791105
}
10801106
};
10811107
let on_panic = Arc::new(on_panic);
@@ -1143,18 +1169,13 @@ impl ModuleHost {
11431169
})
11441170
.await
11451171
}
1146-
ModuleHostInner::Js(V8ModuleHost { instance_manager }) => {
1147-
instance_manager
1148-
.with_instance(async |mut inst| {
1149-
let res = inst
1150-
.run_on_thread(async move || {
1151-
drop(timer_guard);
1152-
f().await
1153-
})
1154-
.await;
1155-
(res, inst)
1172+
ModuleHostInner::Js(V8ModuleHost { instance_lane, .. }) => {
1173+
instance_lane
1174+
.run_on_thread(async move || {
1175+
drop(timer_guard);
1176+
f().await
11561177
})
1157-
.await
1178+
.await?
11581179
}
11591180
})
11601181
}
@@ -1193,7 +1214,7 @@ impl ModuleHost {
11931214
arg: A,
11941215
timer: impl FnOnce(&str) -> Guard,
11951216
work_wasm: impl AsyncFnOnce(Guard, &SingleCoreExecutor, Box<ModuleInstance>, A) -> (R, Box<ModuleInstance>),
1196-
work_js: impl AsyncFnOnce(Guard, &mut JsInstance, A) -> R,
1217+
work_js: impl AsyncFnOnce(Guard, &super::v8::JsInstanceLane, A) -> R,
11971218
) -> Result<R, NoSuchModule> {
11981219
self.guard_closed()?;
11991220
let timer_guard = timer(label);
@@ -1220,11 +1241,7 @@ impl ModuleHost {
12201241
.with_instance(async |inst| work_wasm(timer_guard, executor, inst, arg).await)
12211242
.await
12221243
}
1223-
ModuleHostInner::Js(V8ModuleHost { instance_manager }) => {
1224-
instance_manager
1225-
.with_instance(async |mut inst| (work_js(timer_guard, &mut inst, arg).await, inst))
1226-
.await
1227-
}
1244+
ModuleHostInner::Js(V8ModuleHost { instance_lane, .. }) => work_js(timer_guard, instance_lane, arg).await,
12281245
})
12291246
}
12301247

@@ -1237,7 +1254,7 @@ impl ModuleHost {
12371254
label: &str,
12381255
arg: A,
12391256
wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static,
1240-
js: impl AsyncFnOnce(A, &mut JsInstance) -> R,
1257+
js: impl AsyncFnOnce(A, &super::v8::JsInstanceLane) -> R,
12411258
) -> Result<R, NoSuchModule>
12421259
where
12431260
R: Send + 'static,
@@ -1269,6 +1286,45 @@ impl ModuleHost {
12691286
.await
12701287
}
12711288

1289+
async fn with_js_pooled_instance<R>(
1290+
&self,
1291+
label: &str,
1292+
f: impl AsyncFnOnce(&JsInstance) -> R,
1293+
) -> Result<R, NoSuchModule> {
1294+
self.guard_closed()?;
1295+
let timer_guard = self.start_call_timer(label);
1296+
1297+
scopeguard::defer_on_unwind!({
1298+
log::warn!("pooled JS instance operation {label} panicked");
1299+
(self.on_panic)();
1300+
});
1301+
1302+
Ok(match &*self.inner {
1303+
ModuleHostInner::Wasm(_) => unreachable!("WASM should not use the pooled JS instance path"),
1304+
ModuleHostInner::Js(V8ModuleHost {
1305+
procedure_instances, ..
1306+
}) => {
1307+
procedure_instances
1308+
.with_instance(async |inst| {
1309+
drop(timer_guard);
1310+
let res = f(&inst).await;
1311+
(res, inst)
1312+
})
1313+
.await
1314+
}
1315+
})
1316+
}
1317+
1318+
async fn call_view_command(&self, label: &str, cmd: ViewCommand) -> Result<ViewCommandResult, ViewCallError> {
1319+
self.call(
1320+
label,
1321+
cmd,
1322+
async |cmd, inst| Ok(inst.call_view(cmd)),
1323+
async |cmd, inst| inst.call_view(cmd).await,
1324+
)
1325+
.await?
1326+
}
1327+
12721328
pub async fn disconnect_client(&self, client_id: ClientActorId) {
12731329
log::trace!("disconnecting client {client_id}");
12741330
if let Err(e) = self
@@ -1536,14 +1592,13 @@ impl ModuleHost {
15361592
args,
15371593
};
15381594

1539-
Ok(self
1540-
.call(
1541-
&reducer_def.name,
1542-
call_reducer_params,
1543-
async |p, inst| inst.call_reducer(p),
1544-
async |p, inst| inst.call_reducer(p).await,
1545-
)
1546-
.await?)
1595+
self.call(
1596+
&reducer_def.name,
1597+
call_reducer_params,
1598+
async |p, inst| Ok(inst.call_reducer(p)),
1599+
async |p, inst| inst.call_reducer(p).await,
1600+
)
1601+
.await?
15471602
}
15481603

15491604
pub async fn call_reducer(
@@ -1611,12 +1666,7 @@ impl ModuleHost {
16111666
};
16121667

16131668
let res = self
1614-
.call(
1615-
"call_view_add_single_subscription",
1616-
cmd,
1617-
async |cmd, inst| inst.call_view(cmd),
1618-
async |cmd, inst| inst.call_view(cmd).await,
1619-
)
1669+
.call_view_command("call_view_add_single_subscription", cmd)
16201670
.await
16211671
//TODO: handle error better
16221672
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1644,12 +1694,7 @@ impl ModuleHost {
16441694
};
16451695

16461696
let res = self
1647-
.call(
1648-
"call_view_add_multi_subscription",
1649-
cmd,
1650-
async |cmd, inst| inst.call_view(cmd),
1651-
async |cmd, inst| inst.call_view(cmd).await,
1652-
)
1697+
.call_view_command("call_view_add_multi_subscription", cmd)
16531698
.await
16541699
//TODO: handle error better
16551700
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1677,12 +1722,7 @@ impl ModuleHost {
16771722
};
16781723

16791724
let res = self
1680-
.call(
1681-
"call_view_remove_v2_subscription",
1682-
cmd,
1683-
async |cmd, inst| inst.call_view(cmd),
1684-
async |cmd, inst| inst.call_view(cmd).await,
1685-
)
1725+
.call_view_command("call_view_remove_v2_subscription", cmd)
16861726
.await
16871727
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
16881728

@@ -1709,12 +1749,7 @@ impl ModuleHost {
17091749
};
17101750

17111751
let res = self
1712-
.call(
1713-
"call_view_add_multi_subscription",
1714-
cmd,
1715-
async |cmd, inst| inst.call_view(cmd),
1716-
async |cmd, inst| inst.call_view(cmd).await,
1717-
)
1752+
.call_view_command("call_view_add_multi_subscription", cmd)
17181753
.await
17191754
//TODO: handle error better
17201755
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1742,12 +1777,7 @@ impl ModuleHost {
17421777
};
17431778

17441779
let res = self
1745-
.call(
1746-
"call_view_add_legacy_subscription",
1747-
cmd,
1748-
async |cmd, inst| inst.call_view(cmd),
1749-
async |cmd, inst| inst.call_view(cmd).await,
1750-
)
1780+
.call_view_command("call_view_add_legacy_subscription", cmd)
17511781
.await
17521782
//TODO: handle error better
17531783
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1776,12 +1806,7 @@ impl ModuleHost {
17761806
};
17771807

17781808
let res = self
1779-
.call(
1780-
"call_view_sql",
1781-
cmd,
1782-
async |cmd, inst| inst.call_view(cmd),
1783-
async |cmd, inst| inst.call_view(cmd).await,
1784-
)
1809+
.call_view_command("call_view_sql", cmd)
17851810
.await
17861811
//TODO: handle error better
17871812
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1885,26 +1910,63 @@ impl ModuleHost {
18851910
name: &str,
18861911
params: CallProcedureParams,
18871912
) -> Result<CallProcedureReturn, NoSuchModule> {
1888-
self.call(
1889-
name,
1890-
params,
1891-
async move |params, inst| inst.call_procedure(params).await,
1892-
async move |params, inst| inst.call_procedure(params).await,
1893-
)
1894-
.await
1913+
match &*self.inner {
1914+
ModuleHostInner::Wasm(_) => {
1915+
self.call(
1916+
name,
1917+
params,
1918+
async move |params, inst| inst.call_procedure(params).await,
1919+
async move |_params, _inst| unreachable!("JS procedure lane is not used for WASM modules"),
1920+
)
1921+
.await
1922+
}
1923+
ModuleHostInner::Js(_) => {
1924+
self.with_js_pooled_instance(name, async move |inst| inst.call_procedure(params).await)
1925+
.await
1926+
}
1927+
}
18951928
}
18961929

18971930
pub(super) async fn call_scheduled_function(
18981931
&self,
18991932
params: ScheduledFunctionParams,
1900-
) -> Result<CallScheduledFunctionResult, NoSuchModule> {
1901-
self.call(
1902-
"unknown scheduled function",
1903-
params,
1904-
async move |params, inst| inst.call_scheduled_function(params).await,
1905-
async move |params, inst| inst.call_scheduled_function(params).await,
1906-
)
1907-
.await
1933+
) -> Result<CallScheduledFunctionResult, CallScheduledFunctionError> {
1934+
match &*self.inner {
1935+
ModuleHostInner::Wasm(_) => {
1936+
self.call(
1937+
"unknown scheduled function",
1938+
params,
1939+
async move |params, inst| Ok(inst.call_scheduled_function(params).await),
1940+
async move |_params, _inst| unreachable!("JS scheduled-function lane is not used for WASM modules"),
1941+
)
1942+
.await?
1943+
}
1944+
ModuleHostInner::Js(V8ModuleHost { module, .. }) => {
1945+
let use_procedure_lane =
1946+
match params.uses_procedure_lane(&self.info, module.replica_ctx().relational_db.as_ref()) {
1947+
Ok(use_procedure_lane) => use_procedure_lane,
1948+
Err(err) => {
1949+
log::error!("failed to classify scheduled JS function; routing to procedure lane: {err:#}");
1950+
true
1951+
}
1952+
};
1953+
if use_procedure_lane {
1954+
Ok(self
1955+
.with_js_pooled_instance("unknown scheduled function", async move |inst| {
1956+
inst.call_scheduled_function(params).await
1957+
})
1958+
.await?)
1959+
} else {
1960+
self.call(
1961+
"unknown scheduled function",
1962+
params,
1963+
async move |params, inst| Ok(inst.call_scheduled_function(params).await),
1964+
async move |params, inst| inst.call_scheduled_function(params).await,
1965+
)
1966+
.await?
1967+
}
1968+
}
1969+
}
19081970
}
19091971

19101972
/// Materializes the views return by the `view_collector`, if not already materialized,
@@ -2467,14 +2529,14 @@ impl ModuleHost {
24672529
pub(crate) fn replica_ctx(&self) -> &ReplicaContext {
24682530
match &*self.inner {
24692531
ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.replica_ctx(),
2470-
ModuleHostInner::Js(js) => js.instance_manager.module.replica_ctx(),
2532+
ModuleHostInner::Js(js) => js.module.replica_ctx(),
24712533
}
24722534
}
24732535

24742536
fn scheduler(&self) -> &Scheduler {
24752537
match &*self.inner {
24762538
ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.scheduler(),
2477-
ModuleHostInner::Js(js) => js.instance_manager.module.scheduler(),
2539+
ModuleHostInner::Js(js) => js.module.scheduler(),
24782540
}
24792541
}
24802542
}

0 commit comments

Comments
 (0)