Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 83 additions & 1 deletion src/platform_linux/laio.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,74 @@

#define LAIO_CLEANER_TERMINATE_CALLBACK NULL

/*
 * Account for one successful submission observed at queue depth `depth`.
 *
 * Bumps the histogram slot for that depth (depths beyond the last slot are
 * folded into the last slot), bumps the successful-submit counter, and raises
 * the high-water mark if `depth` exceeds it.
 */
static inline void
laio_record_submit_depth(io_process_context *pctx, uint64 depth)
{
   const uint64 last_slot = (uint64)LAIO_QD_HIST_BUCKETS - 1;
   uint64       slot      = depth;

   // Clamp so that oversized depths land in the final (overflow) slot.
   if (last_slot < slot) {
      slot = last_slot;
   }

   pctx->io_submit_hist[slot] += 1;
   pctx->io_submit_count += 1;

   if (pctx->max_observed_io_count < depth) {
      pctx->max_observed_io_count = depth;
   }
}

/*
 * Reset the submit statistics of every per-thread context in the handle.
 *
 * The submit and EAGAIN counters and the depth histogram are zeroed. The
 * high-water mark is restarted from the context's current io_count rather
 * than from zero.
 */
static void
laio_reset_stats(io_handle *ioh)
{
   laio_handle *laio = (laio_handle *)ioh;
   uint64       idx  = 0;

   while (idx < MAX_THREADS) {
      io_process_context *ctx = &laio->ctx[idx];

      memset(ctx->io_submit_hist, 0, sizeof(ctx->io_submit_hist));
      ctx->max_observed_io_count = ctx->io_count;
      ctx->io_submit_eagain      = 0;
      ctx->io_submit_count       = 0;

      idx++;
   }
}

/*
 * Print aggregated submit statistics for the whole handle to `log_handle`.
 *
 * Sums the successful-submit counter, the EAGAIN counter and the submit-depth
 * histogram over all MAX_THREADS contexts, and reports the largest
 * max_observed_io_count seen in any context. Empty histogram buckets are
 * omitted from the output.
 *
 * The per-context counters are read here without taking any lock, so the
 * numbers are a snapshot and may be slightly inconsistent if IO is still
 * being submitted while this runs.
 */
static void
laio_print_stats(io_handle *ioh, platform_log_handle *log_handle)
{
   laio_handle *io                        = (laio_handle *)ioh;
   uint64       total_submit_count        = 0;
   uint64       total_submit_eagain       = 0;
   uint64       max_observed_depth        = 0;
   uint64       hist[LAIO_QD_HIST_BUCKETS] = {0};

   for (uint64 i = 0; i < MAX_THREADS; i++) {
      io_process_context *pctx = &io->ctx[i];
      total_submit_count += pctx->io_submit_count;
      total_submit_eagain += pctx->io_submit_eagain;
      max_observed_depth =
         MAX(max_observed_depth, pctx->max_observed_io_count);
      for (uint64 bucket = 0; bucket < LAIO_QD_HIST_BUCKETS; bucket++) {
         hist[bucket] += pctx->io_submit_hist[bucket];
      }
   }

   platform_log(log_handle, "LAIO Queue Depth Histogram\n");
   platform_log(
      log_handle,
      "------------------------------------------------------------\n");
   platform_log(log_handle, "successful submits: %lu\n", total_submit_count);
   platform_log(log_handle, "EAGAIN retries: %lu\n", total_submit_eagain);
   platform_log(log_handle, "max observed depth: %lu\n", max_observed_depth);
   platform_log(log_handle, "submit-depth histogram:\n");
   for (uint64 bucket = 0; bucket < LAIO_QD_HIST_BUCKETS; bucket++) {
      if (hist[bucket] == 0) {
         continue;
      }
      if (bucket + 1 == LAIO_QD_HIST_BUCKETS) {
         // The final bucket collects every depth clamped to
         // LAIO_QD_HIST_BUCKETS - 1 by laio_record_submit_depth(), so its
         // lower bound is that index, not IO_DEFAULT_KERNEL_QUEUE_SIZE (which
         // has its own exact bucket one slot earlier).
         platform_log(log_handle,
                      " >=%-3u : %lu\n",
                      (unsigned)(LAIO_QD_HIST_BUCKETS - 1),
                      hist[bucket]);
      } else {
         platform_log(log_handle, " %-5lu: %lu\n", bucket, hist[bucket]);
      }
   }
   platform_log(
      log_handle,
      "------------------------------------------------------------\n");
}

/*
* Context management
*/
Expand Down Expand Up @@ -149,6 +217,12 @@ laio_get_thread_context(laio_handle *io)
__sync_lock_release(&io->ctx[pid].lock);
return &io->ctx[pid];
}
io->ctx[pid].io_count = 0;
io->ctx[pid].io_submit_count = 0;
io->ctx[pid].io_submit_eagain = 0;
io->ctx[pid].max_observed_io_count = 0;
memset(
io->ctx[pid].io_submit_hist, 0, sizeof(io->ctx[pid].io_submit_hist));
int status = io_setup(io->cfg->kernel_queue_size, &io->ctx[pid].ctx);
if (status != 0) {
platform_error_log(
Expand Down Expand Up @@ -306,12 +380,15 @@ laio_async_run(io_async_state *gios)
async_wait_queue_lock(queue);
}

submit_status = io_submit(ios->pctx->ctx, 1, ios->reqs);
io_process_context *pctx = ios->pctx;

submit_status = io_submit(pctx->ctx, 1, ios->reqs);

if (submit_status == 1) {
// Successfully submitted, which means that our state was stored on the
// kernel's wait queue for this io, which means we have "given away"
// our state and therefore must not touch it again before returning.
laio_record_submit_depth(pctx, pctx->io_count);
if (queue != NULL) {
async_wait_queue_unlock(queue);
}
Expand Down Expand Up @@ -340,6 +417,7 @@ laio_async_run(io_async_state *gios)
} else if (queue != NULL) {
// Transient failure to submit, so we still own our state. Wait to try
// again.
ios->pctx->io_submit_eagain++;
async_wait_queue_append(
queue, &ios->waiter_node, ios->callback, ios->callback_arg);
async_yield_after(ios, async_wait_queue_unlock(queue));
Expand All @@ -348,7 +426,9 @@ laio_async_run(io_async_state *gios)
// Transient failure to submit, so we still own our state, but we were
// trying optimistically to submit w/o locking our wait queue. So try
// again with lock held.
ios->pctx->io_submit_eagain++;
queue = &ios->pctx->submit_waiters;
continue;
}
}

Expand Down Expand Up @@ -545,6 +625,8 @@ static io_ops laio_ops = {
.async_state_init = laio_async_state_init,
.cleanup = laio_cleanup,
.wait_all = laio_wait_all,
.print_stats = laio_print_stats,
.reset_stats = laio_reset_stats,
};

/*
Expand Down
8 changes: 7 additions & 1 deletion src/platform_linux/laio.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@ typedef enum process_context_state {
PROCESS_CONTEXT_STATE_SHUTTING_DOWN,
} process_context_state;

/*
 * Number of slots in the per-context submit-depth histogram
 * (io_process_context.io_submit_hist). Slot i counts submissions recorded at
 * depth i; laio_record_submit_depth() clamps larger depths into the last
 * slot, so the last slot acts as an overflow bucket.
 */
#define LAIO_QD_HIST_BUCKETS (IO_DEFAULT_KERNEL_QUEUE_SIZE + 2)

typedef struct io_process_context {
process_context_state state;
uint64 lock;
uint64 io_count; // inflight ios
uint64 io_submit_count;
uint64 io_submit_eagain;
uint64 io_submit_hist[LAIO_QD_HIST_BUCKETS];
uint64 max_observed_io_count;
io_context_t ctx;
pthread_t io_cleaner;
async_wait_queue submit_waiters;
Expand All @@ -50,4 +56,4 @@ laio_handle_create(io_config *cfg, platform_heap_id hid);

// The IO system must be quiesced before calling this function.
void
laio_handle_destroy(io_handle *ioh);
laio_handle_destroy(io_handle *ioh);
20 changes: 20 additions & 0 deletions src/platform_linux/platform_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ typedef void (*io_wait_all_fn)(io_handle *io);
typedef void (*io_register_thread_fn)(io_handle *io);
typedef void (*io_deregister_thread_fn)(io_handle *io);
typedef bool32 (*io_max_latency_elapsed_fn)(io_handle *io, timestamp ts);
// Optional backend hook: print the backend's IO statistics to `log`.
typedef void (*io_print_stats_fn)(io_handle *io, platform_log_handle *log);
// Optional backend hook: reset the backend's IO statistics.
typedef void (*io_reset_stats_fn)(io_handle *io);

typedef void *(*io_get_context_fn)(io_handle *io);

Expand All @@ -96,6 +98,8 @@ typedef struct io_ops {
io_register_thread_fn register_thread;
io_deregister_thread_fn deregister_thread;
io_max_latency_elapsed_fn max_latency_elapsed;
io_print_stats_fn print_stats;
io_reset_stats_fn reset_stats;
io_get_context_fn get_context;
} io_ops;

Expand Down Expand Up @@ -225,6 +229,22 @@ io_max_latency_elapsed(io_handle *io, timestamp ts)
return TRUE;
}

/*
 * Ask the IO backend to print its statistics to `log_handle`.
 * Does nothing when the backend does not provide a print_stats hook.
 */
static inline void
io_print_stats(io_handle *io, platform_log_handle *log_handle)
{
   if (!io->ops->print_stats) {
      return;
   }
   io->ops->print_stats(io, log_handle);
}

/*
 * Ask the IO backend to reset its statistics.
 * Does nothing when the backend does not provide a reset_stats hook.
 */
static inline void
io_reset_stats(io_handle *io)
{
   if (!io->ops->reset_stats) {
      return;
   }
   io->ops->reset_stats(io);
}

/*
*-----------------------------------------------------------------------------
* io_config_init --
Expand Down
6 changes: 6 additions & 0 deletions tests/functional/scan_benchmark.c
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ scan_benchmark_run_scan(const splinterdb_config *cfg,
}

splinterdb_stats_reset(kvs);
io_reset_stats((io_handle *)splinterdb_get_io_handle(kvs));

splinterdb_iterator *iter = NULL;
timestamp start_time = platform_get_timestamp();
Expand Down Expand Up @@ -562,6 +563,8 @@ scan_benchmark_run_scan(const splinterdb_config *cfg,
if (print_lookup_stats) {
splinterdb_stats_print_lookup(kvs);
}
io_print_stats((io_handle *)splinterdb_get_io_handle(kvs),
Platform_default_log_handle);

splinterdb_iterator_deinit(iter);
splinterdb_close(&kvs);
Expand All @@ -586,6 +589,7 @@ scan_benchmark_run_repeated_scans(const splinterdb_config *cfg,
}

splinterdb_stats_reset(kvs);
io_reset_stats((io_handle *)splinterdb_get_io_handle(kvs));

uint64 effective_scan_length =
scan_length == 0 ? expected_records : scan_length;
Expand Down Expand Up @@ -729,6 +733,8 @@ scan_benchmark_run_repeated_scans(const splinterdb_config *cfg,
if (print_lookup_stats) {
splinterdb_stats_print_lookup(kvs);
}
io_print_stats((io_handle *)splinterdb_get_io_handle(kvs),
Platform_default_log_handle);

splinterdb_close(&kvs);
return rc;
Expand Down
Loading