From 074935129eee78cf1cc4ee1ce21e00892070cc90 Mon Sep 17 00:00:00 2001 From: Rob Johnson Date: Tue, 19 May 2026 12:42:07 -0700 Subject: [PATCH 1/4] Add LAIO queue depth instrumentation Extracted from d932efbda8af9dfc72d0518d3a2443167fcb721d. --- src/platform_linux/laio.c | 79 ++++++++++++++++++++++++++++++++ src/platform_linux/laio.h | 8 +++- src/platform_linux/platform_io.h | 20 ++++++++ 3 files changed, 106 insertions(+), 1 deletion(-) diff --git a/src/platform_linux/laio.c b/src/platform_linux/laio.c index 721e5cd8..8d1d224b 100644 --- a/src/platform_linux/laio.c +++ b/src/platform_linux/laio.c @@ -38,6 +38,72 @@ #define LAIO_CLEANER_TERMINATE_CALLBACK NULL +static inline void +laio_record_submit_depth(io_process_context *pctx, uint64 depth) +{ + uint64 bucket = MIN(depth, (uint64)LAIO_QD_HIST_BUCKETS - 1); + pctx->io_submit_hist[bucket]++; + pctx->io_submit_count++; + pctx->max_observed_io_count = MAX(pctx->max_observed_io_count, depth); +} + +static void +laio_reset_stats(io_handle *ioh) +{ + laio_handle *io = (laio_handle *)ioh; + + for (uint64 i = 0; i < MAX_THREADS; i++) { + io_process_context *pctx = &io->ctx[i]; + pctx->io_submit_count = 0; + pctx->io_submit_eagain = 0; + pctx->max_observed_io_count = pctx->io_count; + memset(pctx->io_submit_hist, 0, sizeof(pctx->io_submit_hist)); + } +} + +static void +laio_print_stats(io_handle *ioh, platform_log_handle *log_handle) +{ + laio_handle *io = (laio_handle *)ioh; + uint64 total_submit_count = 0; + uint64 total_submit_eagain = 0; + uint64 max_observed_depth = 0; + uint64 hist[LAIO_QD_HIST_BUCKETS] = {0}; + + for (uint64 i = 0; i < MAX_THREADS; i++) { + io_process_context *pctx = &io->ctx[i]; + total_submit_count += pctx->io_submit_count; + total_submit_eagain += pctx->io_submit_eagain; + max_observed_depth = MAX(max_observed_depth, pctx->max_observed_io_count); + for (uint64 bucket = 0; bucket < LAIO_QD_HIST_BUCKETS; bucket++) { + hist[bucket] += pctx->io_submit_hist[bucket]; + } + } + + platform_log(log_handle, "LAIO Queue Depth Histogram\n"); + platform_log(log_handle, + "------------------------------------------------------------\n"); + platform_log(log_handle, "successful submits: %lu\n", total_submit_count); + platform_log(log_handle, "EAGAIN retries: %lu\n", total_submit_eagain); + platform_log(log_handle, "max observed depth: %lu\n", max_observed_depth); + platform_log(log_handle, "submit-depth histogram:\n"); + for (uint64 bucket = 0; bucket < LAIO_QD_HIST_BUCKETS; bucket++) { + if (hist[bucket] == 0) { + continue; + } + if (bucket + 1 == LAIO_QD_HIST_BUCKETS) { + platform_log(log_handle, + " >=%-3u : %lu\n", + IO_DEFAULT_KERNEL_QUEUE_SIZE, + hist[bucket]); + } else { + platform_log(log_handle, " %-5lu: %lu\n", bucket, hist[bucket]); + } + } + platform_log(log_handle, + "------------------------------------------------------------\n"); +} + /* * Context management */ @@ -149,6 +215,13 @@ laio_get_thread_context(laio_handle *io) __sync_lock_release(&io->ctx[pid].lock); return &io->ctx[pid]; } + io->ctx[pid].io_count = 0; + io->ctx[pid].io_submit_count = 0; + io->ctx[pid].io_submit_eagain = 0; + io->ctx[pid].max_observed_io_count = 0; + memset(io->ctx[pid].io_submit_hist, + 0, + sizeof(io->ctx[pid].io_submit_hist)); int status = io_setup(io->cfg->kernel_queue_size, &io->ctx[pid].ctx); if (status != 0) { platform_error_log( @@ -312,6 +385,7 @@ laio_async_run(io_async_state *gios) // Successfully submitted, which means that our state was stored on the // kernel's wait queue for this io, which means we have "given away" // our state and therefore must not touch it again before returning. + laio_record_submit_depth(ios->pctx, ios->pctx->io_count); if (queue != NULL) { async_wait_queue_unlock(queue); } @@ -340,6 +414,7 @@ laio_async_run(io_async_state *gios) } else if (queue != NULL) { // Transient failure to submit, so we still own our state. Wait to try // again. + ios->pctx->io_submit_eagain++; async_wait_queue_append( queue, &ios->waiter_node, ios->callback, ios->callback_arg); async_yield_after(ios, async_wait_queue_unlock(queue)); @@ -348,7 +423,9 @@ laio_async_run(io_async_state *gios) // Transient failure to submit, so we still own our state, but we were // trying optimistically to submit w/o locking our wait queue. So try // again with lock held. + ios->pctx->io_submit_eagain++; queue = &ios->pctx->submit_waiters; + continue; } } @@ -545,6 +622,8 @@ static io_ops laio_ops = { .async_state_init = laio_async_state_init, .cleanup = laio_cleanup, .wait_all = laio_wait_all, + .print_stats = laio_print_stats, + .reset_stats = laio_reset_stats, }; /* diff --git a/src/platform_linux/laio.h b/src/platform_linux/laio.h index 2bbb7d4f..ae97acd6 100644 --- a/src/platform_linux/laio.h +++ b/src/platform_linux/laio.h @@ -21,10 +21,16 @@ typedef enum process_context_state { PROCESS_CONTEXT_STATE_SHUTTING_DOWN, } process_context_state; +#define LAIO_QD_HIST_BUCKETS (IO_DEFAULT_KERNEL_QUEUE_SIZE + 2) + typedef struct io_process_context { process_context_state state; uint64 lock; uint64 io_count; // inflight ios + uint64 io_submit_count; + uint64 io_submit_eagain; + uint64 io_submit_hist[LAIO_QD_HIST_BUCKETS]; + uint64 max_observed_io_count; io_context_t ctx; pthread_t io_cleaner; async_wait_queue submit_waiters; @@ -50,4 +56,4 @@ laio_handle_create(io_config *cfg, platform_heap_id hid); // The IO system must be quiesced before calling this function. void -laio_handle_destroy(io_handle *ioh); \ No newline at end of file +laio_handle_destroy(io_handle *ioh); diff --git a/src/platform_linux/platform_io.h b/src/platform_linux/platform_io.h index 0be2afab..7c9ca592 100644 --- a/src/platform_linux/platform_io.h +++ b/src/platform_linux/platform_io.h @@ -81,6 +81,8 @@ typedef void (*io_wait_all_fn)(io_handle *io); typedef void (*io_register_thread_fn)(io_handle *io); typedef void (*io_deregister_thread_fn)(io_handle *io); typedef bool32 (*io_max_latency_elapsed_fn)(io_handle *io, timestamp ts); +typedef void (*io_print_stats_fn)(io_handle *io, platform_log_handle *log); +typedef void (*io_reset_stats_fn)(io_handle *io); typedef void *(*io_get_context_fn)(io_handle *io); @@ -96,6 +98,8 @@ typedef struct io_ops { io_register_thread_fn register_thread; io_deregister_thread_fn deregister_thread; io_max_latency_elapsed_fn max_latency_elapsed; + io_print_stats_fn print_stats; + io_reset_stats_fn reset_stats; io_get_context_fn get_context; } io_ops; @@ -225,6 +229,22 @@ io_max_latency_elapsed(io_handle *io, timestamp ts) return TRUE; } +static inline void +io_print_stats(io_handle *io, platform_log_handle *log_handle) +{ + if (io->ops->print_stats) { + io->ops->print_stats(io, log_handle); + } +} + +static inline void +io_reset_stats(io_handle *io) +{ + if (io->ops->reset_stats) { + io->ops->reset_stats(io); + } +} + /* *----------------------------------------------------------------------------- * io_config_init -- From 6c0860d121857f3208fee765a71865e63f44e35d Mon Sep 17 00:00:00 2001 From: Rob Johnson Date: Sun, 24 May 2026 18:08:56 -0700 Subject: [PATCH 2/4] Print LAIO queue depth stats in scan benchmark Extracted from the scan_benchmark add commits that followed d932efbd. --- tests/functional/scan_benchmark.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/functional/scan_benchmark.c b/tests/functional/scan_benchmark.c index 452653ac..d4c39ec9 100644 --- a/tests/functional/scan_benchmark.c +++ b/tests/functional/scan_benchmark.c @@ -500,6 +500,7 @@ scan_benchmark_run_scan(const splinterdb_config *cfg, } splinterdb_stats_reset(kvs); + io_reset_stats((io_handle *)splinterdb_get_io_handle(kvs)); splinterdb_iterator *iter = NULL; timestamp start_time = platform_get_timestamp(); @@ -562,6 +563,8 @@ scan_benchmark_run_scan(const splinterdb_config *cfg, if (print_lookup_stats) { splinterdb_stats_print_lookup(kvs); } + io_print_stats((io_handle *)splinterdb_get_io_handle(kvs), + Platform_default_log_handle); splinterdb_iterator_deinit(iter); splinterdb_close(&kvs); @@ -586,6 +589,7 @@ scan_benchmark_run_repeated_scans(const splinterdb_config *cfg, } splinterdb_stats_reset(kvs); + io_reset_stats((io_handle *)splinterdb_get_io_handle(kvs)); uint64 effective_scan_length = scan_length == 0 ? expected_records : scan_length; @@ -729,6 +733,8 @@ scan_benchmark_run_repeated_scans(const splinterdb_config *cfg, if (print_lookup_stats) { splinterdb_stats_print_lookup(kvs); } + io_print_stats((io_handle *)splinterdb_get_io_handle(kvs), + Platform_default_log_handle); splinterdb_close(&kvs); return rc; From 51da21ed67217289cc22c7d3caf7a73cbce89a83 Mon Sep 17 00:00:00 2001 From: Rob Johnson Date: Wed, 27 May 2026 16:35:15 -0700 Subject: [PATCH 3/4] Formatting Signed-off-by: Rob Johnson --- src/platform_linux/laio.c | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/platform_linux/laio.c b/src/platform_linux/laio.c index 8d1d224b..0e907b07 100644 --- a/src/platform_linux/laio.c +++ b/src/platform_linux/laio.c @@ -53,9 +53,9 @@ laio_reset_stats(io_handle *ioh) laio_handle *io = (laio_handle *)ioh; for (uint64 i = 0; i < MAX_THREADS; i++) { - io_process_context *pctx = &io->ctx[i]; - pctx->io_submit_count = 0; - pctx->io_submit_eagain = 0; + io_process_context *pctx = &io->ctx[i]; + pctx->io_submit_count = 0; + pctx->io_submit_eagain = 0; pctx->max_observed_io_count = pctx->io_count; memset(pctx->io_submit_hist, 0, sizeof(pctx->io_submit_hist)); } @@ -64,10 +64,10 @@ laio_reset_stats(io_handle *ioh) static void laio_print_stats(io_handle *ioh, platform_log_handle *log_handle) { - laio_handle *io = (laio_handle *)ioh; - uint64 total_submit_count = 0; - uint64 total_submit_eagain = 0; - uint64 max_observed_depth = 0; + laio_handle *io = (laio_handle *)ioh; + uint64 total_submit_count = 0; + uint64 total_submit_eagain = 0; + uint64 max_observed_depth = 0; uint64 hist[LAIO_QD_HIST_BUCKETS] = {0}; for (uint64 i = 0; i < MAX_THREADS; i++) { @@ -81,8 +81,9 @@ laio_print_stats(io_handle *ioh, platform_log_handle *log_handle) } platform_log(log_handle, "LAIO Queue Depth Histogram\n"); - platform_log(log_handle, - "------------------------------------------------------------\n"); + platform_log( + log_handle, + "------------------------------------------------------------\n"); platform_log(log_handle, "successful submits: %lu\n", total_submit_count); platform_log(log_handle, "EAGAIN retries: %lu\n", total_submit_eagain); platform_log(log_handle, "max observed depth: %lu\n", max_observed_depth); @@ -100,8 +101,9 @@ laio_print_stats(io_handle *ioh, platform_log_handle *log_handle) platform_log(log_handle, " %-5lu: %lu\n", bucket, hist[bucket]); } } - platform_log(log_handle, - "------------------------------------------------------------\n"); + platform_log( + log_handle, + "------------------------------------------------------------\n"); } /* @@ -219,9 +221,8 @@ laio_get_thread_context(laio_handle *io) io->ctx[pid].io_submit_count = 0; io->ctx[pid].io_submit_eagain = 0; io->ctx[pid].max_observed_io_count = 0; - memset(io->ctx[pid].io_submit_hist, - 0, - sizeof(io->ctx[pid].io_submit_hist)); + memset( + io->ctx[pid].io_submit_hist, 0, sizeof(io->ctx[pid].io_submit_hist)); int status = io_setup(io->cfg->kernel_queue_size, &io->ctx[pid].ctx); if (status != 0) { platform_error_log( From 23d498b0c4e807c8d2f1bb1f22bcecaba8febd38 Mon Sep 17 00:00:00 2001 From: Rob Johnson Date: Wed, 27 May 2026 17:18:51 -0700 Subject: [PATCH 4/4] fix use-after-free Signed-off-by: Rob Johnson --- src/platform_linux/laio.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/platform_linux/laio.c b/src/platform_linux/laio.c index 0e907b07..901e981b 100644 --- a/src/platform_linux/laio.c +++ b/src/platform_linux/laio.c @@ -380,13 +380,15 @@ laio_async_run(io_async_state *gios) async_wait_queue_lock(queue); } - submit_status = io_submit(ios->pctx->ctx, 1, ios->reqs); + io_process_context *pctx = ios->pctx; + + submit_status = io_submit(pctx->ctx, 1, ios->reqs); if (submit_status == 1) { // Successfully submitted, which means that our state was stored on the // kernel's wait queue for this io, which means we have "given away" // our state and therefore must not touch it again before returning. - laio_record_submit_depth(ios->pctx, ios->pctx->io_count); + laio_record_submit_depth(pctx, pctx->io_count); if (queue != NULL) { async_wait_queue_unlock(queue); }