Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,21 @@ help:
ds4: ds4_cli.o linenoise.o $(CORE_OBJS)
$(CC) $(CFLAGS) -o $@ ds4_cli.o linenoise.o $(CORE_OBJS) $(METAL_LDLIBS)

ds4-server: ds4_server.o ds4_kvstore.o rax.o $(CORE_OBJS)
$(CC) $(CFLAGS) -o $@ ds4_server.o ds4_kvstore.o rax.o $(CORE_OBJS) $(METAL_LDLIBS)
ds4-server: ds4_server.o ds4_kvstore.o rax.o lz4.o $(CORE_OBJS)
$(CC) $(CFLAGS) -o $@ ds4_server.o ds4_kvstore.o rax.o lz4.o $(CORE_OBJS) $(METAL_LDLIBS)

ds4-bench: ds4_bench.o $(CORE_OBJS)
$(CC) $(CFLAGS) -o $@ ds4_bench.o $(CORE_OBJS) $(METAL_LDLIBS)

ds4-eval: ds4_eval.o $(CORE_OBJS)
$(CC) $(CFLAGS) -o $@ ds4_eval.o $(CORE_OBJS) $(METAL_LDLIBS)

ds4-agent: ds4_agent.o ds4_kvstore.o linenoise.o $(CORE_OBJS)
$(CC) $(CFLAGS) -o $@ ds4_agent.o ds4_kvstore.o linenoise.o $(CORE_OBJS) $(METAL_LDLIBS)
ds4-agent: ds4_agent.o ds4_kvstore.o linenoise.o rax.o lz4.o $(CORE_OBJS)
$(CC) $(CFLAGS) -o $@ ds4_agent.o ds4_kvstore.o linenoise.o rax.o lz4.o $(CORE_OBJS) $(METAL_LDLIBS)

cpu: ds4_cli_cpu.o ds4_server_cpu.o ds4_bench_cpu.o ds4_eval_cpu.o ds4_agent_cpu.o ds4_kvstore.o linenoise.o rax.o $(CPU_CORE_OBJS)
cpu: ds4_cli_cpu.o ds4_server_cpu.o ds4_bench_cpu.o ds4_eval_cpu.o ds4_agent_cpu.o ds4_kvstore.o linenoise.o rax.o lz4.o $(CPU_CORE_OBJS)
$(CC) $(CFLAGS) -o ds4 ds4_cli_cpu.o linenoise.o $(CPU_CORE_OBJS) $(LDLIBS)
$(CC) $(CFLAGS) -o ds4-server ds4_server_cpu.o ds4_kvstore.o rax.o $(CPU_CORE_OBJS) $(LDLIBS)
$(CC) $(CFLAGS) -o ds4-server ds4_server_cpu.o ds4_kvstore.o rax.o lz4.o $(CPU_CORE_OBJS) $(LDLIBS)
$(CC) $(CFLAGS) -o ds4-bench ds4_bench_cpu.o $(CPU_CORE_OBJS) $(LDLIBS)
$(CC) $(CFLAGS) -o ds4-eval ds4_eval_cpu.o $(CPU_CORE_OBJS) $(LDLIBS)
$(CC) $(CFLAGS) -o ds4-agent ds4_agent_cpu.o ds4_kvstore.o linenoise.o $(CPU_CORE_OBJS) $(LDLIBS)
Expand Down Expand Up @@ -98,7 +98,7 @@ cuda:
ds4: ds4_cli.o linenoise.o $(CORE_OBJS)
$(NVCC) $(NVCCFLAGS) -o $@ $^ $(CUDA_LDLIBS)

ds4-server: ds4_server.o ds4_kvstore.o rax.o $(CORE_OBJS)
ds4-server: ds4_server.o ds4_kvstore.o rax.o lz4.o $(CORE_OBJS)
$(NVCC) $(NVCCFLAGS) -o $@ $^ $(CUDA_LDLIBS)

ds4-bench: ds4_bench.o $(CORE_OBJS)
Expand All @@ -107,12 +107,12 @@ ds4-bench: ds4_bench.o $(CORE_OBJS)
ds4-eval: ds4_eval.o $(CORE_OBJS)
$(NVCC) $(NVCCFLAGS) -o $@ $^ $(CUDA_LDLIBS)

ds4-agent: ds4_agent.o ds4_kvstore.o linenoise.o $(CORE_OBJS)
ds4-agent: ds4_agent.o ds4_kvstore.o linenoise.o rax.o lz4.o $(CORE_OBJS)
$(NVCC) $(NVCCFLAGS) -o $@ $^ $(CUDA_LDLIBS)

cpu: ds4_cli_cpu.o ds4_server_cpu.o ds4_bench_cpu.o ds4_eval_cpu.o ds4_agent_cpu.o ds4_kvstore.o linenoise.o rax.o $(CPU_CORE_OBJS)
cpu: ds4_cli_cpu.o ds4_server_cpu.o ds4_bench_cpu.o ds4_eval_cpu.o ds4_agent_cpu.o ds4_kvstore.o linenoise.o rax.o lz4.o $(CPU_CORE_OBJS)
$(CC) $(CFLAGS) -o ds4 ds4_cli_cpu.o linenoise.o $(CPU_CORE_OBJS) $(LDLIBS)
$(CC) $(CFLAGS) -o ds4-server ds4_server_cpu.o ds4_kvstore.o rax.o $(CPU_CORE_OBJS) $(LDLIBS)
$(CC) $(CFLAGS) -o ds4-server ds4_server_cpu.o ds4_kvstore.o rax.o lz4.o $(CPU_CORE_OBJS) $(LDLIBS)
$(CC) $(CFLAGS) -o ds4-bench ds4_bench_cpu.o $(CPU_CORE_OBJS) $(LDLIBS)
$(CC) $(CFLAGS) -o ds4-eval ds4_eval_cpu.o $(CPU_CORE_OBJS) $(LDLIBS)
$(CC) $(CFLAGS) -o ds4-agent ds4_agent_cpu.o ds4_kvstore.o linenoise.o $(CPU_CORE_OBJS) $(LDLIBS)
Expand All @@ -139,10 +139,10 @@ ds4_eval.o: ds4_eval.c ds4.h
ds4_agent.o: ds4_agent.c ds4.h ds4_kvstore.h linenoise.h
$(CC) $(CFLAGS) -c -o $@ ds4_agent.c

ds4_kvstore.o: ds4_kvstore.c ds4_kvstore.h ds4.h
ds4_kvstore.o: ds4_kvstore.c ds4_kvstore.h ds4.h lz4.h
$(CC) $(CFLAGS) -c -o $@ ds4_kvstore.c

ds4_test.o: tests/ds4_test.c ds4_server.c ds4.h ds4_kvstore.h rax.h
ds4_test.o: tests/ds4_test.c ds4_server.c ds4.h ds4_kvstore.h rax.h lz4.h
$(CC) $(CFLAGS) -Wno-unused-function -c -o $@ tests/ds4_test.c

tests/cuda_long_context_smoke.o: tests/cuda_long_context_smoke.c ds4_gpu.h
Expand All @@ -151,6 +151,9 @@ tests/cuda_long_context_smoke.o: tests/cuda_long_context_smoke.c ds4_gpu.h
rax.o: rax.c rax.h rax_malloc.h
$(CC) $(CFLAGS) -c -o $@ rax.c

lz4.o: lz4.c lz4.h
$(CC) $(CFLAGS) -Wno-unused-function -c -o $@ lz4.c

linenoise.o: linenoise.c linenoise.h
$(CC) $(CFLAGS) -c -o $@ linenoise.c

Expand Down Expand Up @@ -181,11 +184,11 @@ ds4_cuda.o: ds4_cuda.cu ds4_gpu.h ds4_iq2_tables_cuda.inc
tests/cuda_long_context_smoke: tests/cuda_long_context_smoke.o ds4_cuda.o
$(NVCC) $(NVCCFLAGS) -o $@ $^ $(CUDA_LDLIBS)

ds4_test: ds4_test.o ds4_kvstore.o rax.o $(CORE_OBJS)
ds4_test: ds4_test.o ds4_kvstore.o rax.o lz4.o $(CORE_OBJS)
ifeq ($(UNAME_S),Darwin)
$(CC) $(CFLAGS) -o $@ ds4_test.o ds4_kvstore.o rax.o $(CORE_OBJS) $(METAL_LDLIBS)
$(CC) $(CFLAGS) -o $@ ds4_test.o ds4_kvstore.o rax.o lz4.o $(CORE_OBJS) $(METAL_LDLIBS)
else
$(NVCC) $(NVCCFLAGS) -o $@ ds4_test.o ds4_kvstore.o rax.o $(CORE_OBJS) $(CUDA_LDLIBS)
$(NVCC) $(NVCCFLAGS) -o $@ ds4_test.o ds4_kvstore.o rax.o lz4.o $(CORE_OBJS) $(CUDA_LDLIBS)
endif

test: ds4_test
Expand Down
86 changes: 74 additions & 12 deletions ds4_agent.c
Original file line number Diff line number Diff line change
Expand Up @@ -2244,13 +2244,30 @@ static bool agent_kv_load_path(agent_worker *w, const char *path,
}

char load_err[160] = {0};
if (ok &&
ds4_session_load_payload(w->session, fp, hdr.payload_bytes,
load_err, sizeof(load_err)) != 0)
{
snprintf(err, err_len, "%s", load_err[0] ? load_err : "failed to load KV payload");
ds4_session_invalidate(w->session);
ok = false;
if (ok) {
int rc = 1;
if (hdr.codec == DS4_KVSTORE_CODEC_LZ4) {
uint64_t uncompressed_total = 0;
const int n_workers = ds4_kvstore_default_options().compression_threads;
FILE *cr = kv_lz4_reader_open(fp, hdr.payload_bytes, hdr.chunk_size,
n_workers > 0 ? n_workers : 1,
&uncompressed_total);
if (!cr) {
snprintf(load_err, sizeof(load_err), "failed to open lz4 reader");
} else {
rc = ds4_session_load_payload(w->session, cr, uncompressed_total,
load_err, sizeof(load_err));
fclose(cr);
}
} else {
rc = ds4_session_load_payload(w->session, fp, hdr.payload_bytes,
load_err, sizeof(load_err));
}
if (rc != 0) {
snprintf(err, err_len, "%s", load_err[0] ? load_err : "failed to load KV payload");
ds4_session_invalidate(w->session);
ok = false;
}
}
fclose(fp);

Expand Down Expand Up @@ -2333,22 +2350,67 @@ static bool agent_kv_save_path(agent_worker *w, const char *path,

const uint64_t now = (uint64_t)time(NULL);
uint8_t h[DS4_KVSTORE_FIXED_HEADER];
/* Stamp NONE / 0 for codec and chunk_size now; the actual values are
* patched back after the payload region is written below. */
ds4_kvstore_fill_header(h, (uint8_t)quant_bits,
ds4_kvstore_reason_code(reason),
0, (uint32_t)tokens->len, 0,
0, DS4_KVSTORE_CODEC_NONE, 0,
(uint32_t)tokens->len, 0,
(uint32_t)ds4_session_ctx(w->session),
now, now, payload_bytes);
uint8_t tb[4];
ds4_kvstore_le_put32(tb, (uint32_t)text_len);

char save_err[160] = {0};
errno = 0;
const int n_workers = ds4_kvstore_default_options().compression_threads;
const uint32_t chunk_bytes = DS4_KVSTORE_DEFAULT_CHUNK_BYTES;
uint8_t codec = DS4_KVSTORE_CODEC_NONE;
uint64_t on_disk_payload = 0;
bool ok = fwrite(h, 1, sizeof(h), fp) == sizeof(h) &&
fwrite(tb, 1, sizeof(tb), fp) == sizeof(tb) &&
fwrite(text, 1, text_len, fp) == text_len &&
ds4_session_save_payload(w->session, fp,
save_err, sizeof(save_err)) == 0 &&
fflush(fp) == 0;
fwrite(text, 1, text_len, fp) == text_len;
if (ok) {
const off_t payload_start = ftello(fp);
if (payload_start < 0) {
ok = false;
} else if (n_workers == 0) {
ok = ds4_session_save_payload(w->session, fp,
save_err, sizeof(save_err)) == 0;
} else {
FILE *cw = kv_lz4_writer_open(fp, chunk_bytes, n_workers);
if (!cw) {
ok = false;
} else {
ok = ds4_session_save_payload(w->session, cw,
save_err, sizeof(save_err)) == 0;
if (fclose(cw) != 0) ok = false;
if (ok) codec = DS4_KVSTORE_CODEC_LZ4;
}
}
const off_t payload_end = ftello(fp);
if (ok && payload_end < 0) ok = false;
if (ok) on_disk_payload = (uint64_t)(payload_end - payload_start);
}
if (ok) ok = fflush(fp) == 0;
/* Patch codec/chunk_size/payload_bytes in the header now that we know
* the real on-disk size. Mirror of the kvstore store path. */
if (ok) {
uint8_t patched[DS4_KVSTORE_FIXED_HEADER];
ds4_kvstore_fill_header(patched, (uint8_t)quant_bits,
ds4_kvstore_reason_code(reason),
0, codec,
codec == DS4_KVSTORE_CODEC_LZ4 ? chunk_bytes : 0,
(uint32_t)tokens->len, 0,
(uint32_t)ds4_session_ctx(w->session),
now, now, on_disk_payload);
if (fseeko(fp, 0, SEEK_SET) != 0 ||
fwrite(patched, 1, sizeof(patched), fp) != sizeof(patched) ||
fflush(fp) != 0)
{
ok = false;
}
}
int saved_errno = errno;
if (fclose(fp) != 0) {
if (!saved_errno) saved_errno = errno;
Expand Down
Loading