diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ac810cdf..e9b1b27c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,3 +29,61 @@ jobs: if: success() || failure() run: odin check examples/complete -vet --strict-style && odin check examples/client -vet --strict-style timeout-minutes: 1 + + build: + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + opt: [none, minimal, size, speed, aggressive] + pkg: + - . + + runs-on: ${{ matrix.os }} + + steps: + - uses: actions/checkout@v4 + - uses: laytan/setup-odin@v2 + with: + release: nightly + + - name: Build ${{ matrix.pkg }} + shell: bash + run: | + if [ "${{ matrix.opt }}" = "none" ]; then + odin build ./${{ matrix.pkg }}/ -build-mode:lib -vet -strict-style -o:none -debug + else + odin build ./${{ matrix.pkg }}/ -build-mode:lib -vet -strict-style -o:${{ matrix.opt }} + fi + + test: + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + opt: [none, minimal, size, speed, aggressive] + pkg: + - internal/mpsc + + runs-on: ${{ matrix.os }} + + steps: + - uses: actions/checkout@v4 + - uses: laytan/setup-odin@v2 + with: + release: nightly + + - name: Test ${{ matrix.pkg }} + shell: bash + run: | + # On Windows, limit test threads to 1 if the package uses concurrent operations. + # Currently internal/mpsc tests are single-threaded; flag kept for future test additions. + THREAD_FLAGS="" + if [ "${{ runner.os }}" = "Windows" ]; then + THREAD_FLAGS="-define:ODIN_TEST_THREADS=1" + fi + if [ "${{ matrix.opt }}" = "none" ]; then + odin test ./${{ matrix.pkg }}/ -vet -strict-style -disallow-do -o:none -debug $THREAD_FLAGS -define:ODIN_TEST_FANCY=false + else + odin test ./${{ matrix.pkg }}/ -vet -strict-style -disallow-do -o:${{ matrix.opt }} $THREAD_FLAGS -define:ODIN_TEST_FANCY=false + fi diff --git a/.gitignore b/.gitignore index 5d51389b..a52aa77e 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,7 @@ minimal complete readme routing + +# Local build scripts (not part of the submodule). +build_and_test_debug.sh +build_and_test.sh diff --git a/examples/async/doc.odin b/examples/async/doc.odin new file mode 100644 index 00000000..14ffa10c --- /dev/null +++ b/examples/async/doc.odin @@ -0,0 +1,53 @@ +/* +Async handler examples for odin-http. + +Async handlers let the handler return immediately and get called again with the result. + +The same proc handles both calls. res.work_data == nil on the first call: + + my_handler :: proc(h: ^http.Handler, req: ^http.Request, res: ^http.Response) { + if res.work_data == nil { + // first call: allocate work struct, mark_async, start background work, return + } else { + // second call: work is done, send response, free work struct + defer { res.work_data = nil } + } + } + +The first call runs inside nbio.tick() — return quickly, the event loop is blocked +until you do. The second call runs after tick() returns, when the server loop +calls the second parts of pending async handlers. + +Allocate the work struct in the first call, store it via mark_async (saved in +res.work_data), read it in the second call, free before returning. + +Examples +-------- +ping_pong.odin no thread; body callback calls mark_async + resume inline +without_body_async.odin no body needed; spawns a thread in the first call +with_body_async.odin reads body first; body callback spawns the thread + +Note: these use thread.create per request to keep the flow easy to follow. +In production, use a worker pool. Only the completion code calls http.resume(res). + +API +--- +mark_async(h, res, work) + Tells the server this request is going async. + Call it before starting background work. + +cancel_async(res) + Call it to undo mark_async when background work fails to start. + +resume(res) + Schedules the second handler call. Call it from the background thread when work + is done, exactly once. Don't touch res after this. + +The background thread owns the work struct and may call resume once. From a +background thread: +- don't read or write res fields - res purpose just carry "async" info between calls +- don't call any http.* proc except resume +- don't allocate from context.temp_allocator (it's the per-connection arena, not thread-safe) + +*/ +package async_examples diff --git a/examples/async/ping_pong.odin b/examples/async/ping_pong.odin new file mode 100644 index 00000000..78b2e15f --- /dev/null +++ b/examples/async/ping_pong.odin @@ -0,0 +1,46 @@ +package async_examples + +import http "../.." + +// No background thread. Body callback calls mark_async + resume directly on the IO thread, +// inside nbio.tick(). The second handler call happens after tick() returns. + +Ping_Pong_Work :: struct { + body: string, +} + +ping_pong_handler :: proc(h: ^http.Handler, req: ^http.Request, res: ^http.Response) { + if res.work_data == nil { + // body callback only gets user_data (res), not h + res.async_handler = h + http.body(req, -1, res, ping_pong_callback) + return + } + + work := (^Ping_Pong_Work)(res.work_data) + defer {res.work_data = nil} + + if work.body == "ping" { + http.respond_plain(res, "pong") + } else { + http.respond(res, http.Status.Unprocessable_Content) + } +} + +// runs on the IO thread inside nbio.tick(); temp_allocator is already the connection arena +ping_pong_callback :: proc(user_data: rawptr, body: http.Body, err: http.Body_Error) { + res := (^http.Response)(user_data) + if err != nil { + http.respond(res, http.body_error_status(err)) + return + } + + work := new(Ping_Pong_Work, context.temp_allocator) + work.body = string(body) + + // mark_async before resume — same rule as the threaded patterns + http.mark_async(res.async_handler, res, work) + + // schedules the second handler call — it runs after tick() returns + http.resume(res) +} diff --git a/examples/async/with_body_async.odin b/examples/async/with_body_async.odin new file mode 100644 index 00000000..0ae0daa6 --- /dev/null +++ b/examples/async/with_body_async.odin @@ -0,0 +1,81 @@ +package async_examples + +import http "../.." +import "core:mem" +import "core:thread" +import "core:time" + +Body_Context :: struct { + alloc: mem.Allocator, +} + +Body_Work :: struct { + alloc: mem.Allocator, + thread: ^thread.Thread, + body: string, + result: string, +} + +body_handler :: proc(h: ^http.Handler, req: ^http.Request, res: ^http.Response) { + if res.work_data == nil { + // body callback only gets user_data (res), not h + res.async_handler = h + http.body(req, -1, res, body_callback) + return + } + + work := (^Body_Work)(res.work_data) + defer { + thread.join(work.thread) // the thread is already done — it called resume before we got here + thread.destroy(work.thread) + free(work, work.alloc) + res.work_data = nil + } + + http.respond_plain(res, work.result) +} + +// runs on the IO thread inside nbio.tick(), after the full body is received +body_callback :: proc(user_data: rawptr, body: http.Body, err: http.Body_Error) { + res := (^http.Response)(user_data) + ctx := (^Body_Context)(res.async_handler.user_data) + + if err != nil { + http.respond(res, http.body_error_status(err)) + return + } + + work := new(Body_Work, ctx.alloc) + work.alloc = ctx.alloc + work.body = string(body) + + // mark_async before thread.start, same as the direct pattern + http.mark_async(res.async_handler, res, work) + + t := thread.create(body_background_proc) + if t == nil { + // both required: cancel_async tells the server, respond tells the client + http.cancel_async(res) + free(work, ctx.alloc) + http.respond(res, http.Status.Internal_Server_Error) + return + } + t.data = res + work.thread = t + thread.start(t) +} + +body_background_proc :: proc(t: ^thread.Thread) { + res := (^http.Response)(t.data) + work := (^Body_Work)(res.work_data) + + // context.temp_allocator is the connection's arena — not ours to use from a background thread + old_temp := context.temp_allocator + defer {context.temp_allocator = old_temp} + + time.sleep(10 * time.Millisecond) + + // write result before calling resume — don't touch res after that + work.result = work.body + http.resume(res) +} diff --git a/examples/async/without_body_async.odin b/examples/async/without_body_async.odin new file mode 100644 index 00000000..e747b86c --- /dev/null +++ b/examples/async/without_body_async.odin @@ -0,0 +1,66 @@ +package async_examples + +import http "../.." +import "core:mem" +import "core:thread" +import "core:time" + +Without_Body_Context :: struct { + alloc: mem.Allocator, +} + +Without_Body_Work :: struct { + alloc: mem.Allocator, + thread: ^thread.Thread, + result: string, +} + +without_body_handler :: proc(h: ^http.Handler, req: ^http.Request, res: ^http.Response) { + ctx := (^Without_Body_Context)(h.user_data) + + if res.work_data == nil { + work := new(Without_Body_Work, ctx.alloc) + work.alloc = ctx.alloc + + // mark_async before thread.start — resume must not schedule the second call before mark_async runs + http.mark_async(h, res, work) + + t := thread.create(without_body_background_proc) + if t == nil { + // both required: cancel_async tells the server, respond tells the client + http.cancel_async(res) + free(work, ctx.alloc) + http.respond(res, http.Status.Internal_Server_Error) + return + } + t.data = res + work.thread = t + thread.start(t) + return + } + + work := (^Without_Body_Work)(res.work_data) + defer { + thread.join(work.thread) // the thread is already done — it called resume before we got here + thread.destroy(work.thread) + free(work, work.alloc) + res.work_data = nil + } + + http.respond_plain(res, work.result) +} + +without_body_background_proc :: proc(t: ^thread.Thread) { + res := (^http.Response)(t.data) + work := (^Without_Body_Work)(res.work_data) + + // context.temp_allocator is the connection's arena — not ours to use from a background thread + old_temp := context.temp_allocator + defer {context.temp_allocator = old_temp} + + time.sleep(10 * time.Millisecond) + + // write result before calling resume — don't touch res after that + work.result = "hello from background" + http.resume(res) +} diff --git a/internal/mpsc/edge_test.odin b/internal/mpsc/edge_test.odin new file mode 100644 index 00000000..49d8d473 --- /dev/null +++ b/internal/mpsc/edge_test.odin @@ -0,0 +1,142 @@ +//+test +package mpsc + +import "core:testing" +import "core:thread" + +// ---------------------------------------------------------------------------- +// Edge cases and stress tests +// ---------------------------------------------------------------------------- + +// test_stub_recycling_explicit exercises the stub-recycling path in pop. +// That path runs when exactly one item remains (head == tail, next == nil). +// Each push/pop cycle of a single item triggers it. +@(test) +test_stub_recycling_explicit :: proc(t: ^testing.T) { + q: Queue(_Test_Msg) + init(&q) + for i in 0 ..< 5 { + msg := _Test_Msg { + data = i, + } + msg_opt: Maybe(^_Test_Msg) = &msg + push(&q, &msg_opt) + got := pop(&q) + testing.expectf(t, got != nil && got.data == i, "round %d: pop should return the pushed message", i) + testing.expectf(t, length(&q) == 0, "round %d: length should be 0 after pop", i) + } +} + +// test_pop_exhausts_queue: push N, pop all — queue empty after. +@(test) +test_pop_exhausts_queue :: proc(t: ^testing.T) { + q: Queue(_Test_Msg) + init(&q) + + N :: 50 + msgs: [N]_Test_Msg + for i in 0 ..< N { + msgs[i].data = i + msg_opt: Maybe(^_Test_Msg) = &msgs[i] + push(&q, &msg_opt) + } + + count := 0 + for length(&q) > 0 || count < N { + if pop(&q) != nil { + count += 1 + } + if length(&q) == 0 && count == N { + break + } + } + + testing.expect(t, count == N, "should pop all pushed messages") + testing.expect(t, length(&q) == 0, "length zero after full pop") +} + +// _Stress_Ctx passes queue and message slice to each producer thread. +@(private) +_Stress_Ctx :: struct { + q: ^Queue(_Test_Msg), + msgs: []_Test_Msg, +} + +_STRESS_PRODUCERS :: 10 +_STRESS_ITEMS_PER_PROD :: 1000 + +// test_concurrent_push_stress: _STRESS_PRODUCERS threads push _STRESS_ITEMS_PER_PROD each. +// Main thread consumes all. No messages lost; length zero after. +@(test) +test_concurrent_push_stress :: proc(t: ^testing.T) { + q: Queue(_Test_Msg) + init(&q) + + total :: _STRESS_PRODUCERS * _STRESS_ITEMS_PER_PROD + + msgs := make([]_Test_Msg, total) + defer delete(msgs) + + ctxs := make([]_Stress_Ctx, _STRESS_PRODUCERS) + defer delete(ctxs) + + for i in 0 ..< _STRESS_PRODUCERS { + ctxs[i] = _Stress_Ctx { + q = &q, + msgs = msgs[i * _STRESS_ITEMS_PER_PROD:(i + 1) * _STRESS_ITEMS_PER_PROD], + } + } + + threads := make([dynamic]^thread.Thread, 0, _STRESS_PRODUCERS) + defer delete(threads) + + for i in 0 ..< _STRESS_PRODUCERS { + th := thread.create_and_start_with_poly_data(&ctxs[i], proc(ctx: ^_Stress_Ctx) { + for j in 0 ..< len(ctx.msgs) { + msg_opt: Maybe(^_Test_Msg) = &ctx.msgs[j] + push(ctx.q, &msg_opt) + } + }) + append(&threads, th) + } + + // Consume all. + received := 0 + for received < total { + if pop(&q) != nil { + received += 1 + } + } + + for th in threads { + thread.join(th) + thread.destroy(th) + } + + testing.expect(t, received == total, "should receive all pushed messages") + testing.expect(t, length(&q) == 0, "length zero after full pop") +} + +// test_length_consistency: after stress, pop count and length must agree. +@(test) +test_length_consistency :: proc(t: ^testing.T) { + q: Queue(_Test_Msg) + init(&q) + + N :: 200 + msgs: [N]_Test_Msg + for i in 0 ..< N { + msg_opt: Maybe(^_Test_Msg) = &msgs[i] + push(&q, &msg_opt) + } + + testing.expect(t, length(&q) == N, "length should equal number of pushes") + + count := 0 + for pop(&q) != nil { + count += 1 + } + + testing.expect(t, count == N, "should pop exactly N messages") + testing.expect(t, length(&q) == 0, "length zero after pop") +} diff --git a/internal/mpsc/queue.odin b/internal/mpsc/queue.odin new file mode 100644 index 00000000..45831e07 --- /dev/null +++ b/internal/mpsc/queue.odin @@ -0,0 +1,104 @@ +package mpsc + +import "base:intrinsics" +import list "core:container/intrusive/list" + +// Queue is a lock-free multi-producer, single-consumer intrusive queue. +// Based on Dmitry Vyukov’s MPSC algorithm. +// See: https://int08h.com/post/ode-to-a-vyukov-queue/ +// +// "Intrusive" means the link field lives inside T, not in a separate wrapper. +// T must have exactly: node: list.Node (no extra allocation per item). +// Do not copy Queue after init — it contains its own internal dummy node. +Queue :: struct($T: typeid) { + head: ^list.Node, // producers only + tail: ^list.Node, // consumer only + stub: list.Node, // dummy node used when queue is empty + len: int, // atomic item count +} + +// init must be called once before any push or pop. +init :: proc(q: ^Queue($T)) where intrinsics.type_has_field(T, "node"), + intrinsics.type_field_type(T, "node") == list.Node { + q.stub.next = nil + q.head = &q.stub + q.tail = &q.stub + q.len = 0 +} + +// push can be called safely from any number of threads at the same time. +// On success the passed Maybe is cleared (the queue now owns the message). +// Passing nil does nothing and returns false. +push :: proc(q: ^Queue($T), msg: ^Maybe(^T)) -> bool where intrinsics.type_has_field(T, "node"), + intrinsics.type_field_type(T, "node") == list.Node { + if msg == nil || msg^ == nil { + return false + } + + ptr := (msg^).? + node := &ptr.node + + intrinsics.atomic_store(&node.next, nil) + prev := intrinsics.atomic_exchange(&q.head, node) + intrinsics.atomic_store(&prev.next, node) + intrinsics.atomic_add(&q.len, 1) + + msg^ = nil + return true +} + +// pop can only be called from the single consumer thread. +// +// It returns nil in two cases: +// 1. The queue is really empty. +// 2. A short "stall" — a producer has started pushing but hasn’t finished linking yet. +// +// During a stall the length may still show > 0. Just call pop again — it will succeed soon. +pop :: proc(q: ^Queue($T)) -> ^T where intrinsics.type_has_field(T, "node"), + intrinsics.type_field_type(T, "node") == list.Node { + tail := q.tail + next := intrinsics.atomic_load(&tail.next) + + if tail == &q.stub { + if next == nil { + return nil + } + q.tail = next + tail = next + next = intrinsics.atomic_load(&tail.next) + } + + if next != nil { + q.tail = next + intrinsics.atomic_sub(&q.len, 1) + return container_of(tail, T, "node") + } + + // Possible stall or last item + head := intrinsics.atomic_load(&q.head) + if tail != head { + return nil // stall + } + + // Only one item left — reuse the dummy stub node + q.stub.next = nil + prev := intrinsics.atomic_exchange(&q.head, &q.stub) + intrinsics.atomic_store(&prev.next, &q.stub) + + next = intrinsics.atomic_load(&tail.next) + if next != nil { + q.tail = next + intrinsics.atomic_sub(&q.len, 1) + return container_of(tail, T, "node") + } + + return nil +} + +// length returns an approximate number of items. +// It may be non-zero while pop still returns nil (during a short stall). +// Use it only for logging or heuristics. +length :: proc(q: ^Queue($T)) -> int where intrinsics.type_has_field(T, "node"), + intrinsics.type_field_type(T, "node") == list.Node { + return intrinsics.atomic_load(&q.len) +} diff --git a/internal/mpsc/queue_test.odin b/internal/mpsc/queue_test.odin new file mode 100644 index 00000000..89662acf --- /dev/null +++ b/internal/mpsc/queue_test.odin @@ -0,0 +1,124 @@ +//+test +package mpsc + +import list "core:container/intrusive/list" +import "core:testing" + +// _Test_Msg is the message type used in all mpsc tests. +_Test_Msg :: struct { + node: list.Node, + data: int, +} + +// ---------------------------------------------------------------------------- +// Unit tests +// ---------------------------------------------------------------------------- + +@(test) +test_init :: proc(t: ^testing.T) { + q: Queue(_Test_Msg) + init(&q) + testing.expect(t, q.head == &q.stub, "head should point to stub after init") + testing.expect(t, q.tail == &q.stub, "tail should point to stub after init") + testing.expect(t, length(&q) == 0, "length should be 0 after init") +} + +@(test) +test_pop_empty :: proc(t: ^testing.T) { + q: Queue(_Test_Msg) + init(&q) + got := pop(&q) + testing.expect(t, got == nil, "pop on empty queue should return nil") +} + +@(test) +test_push_pop_one :: proc(t: ^testing.T) { + q: Queue(_Test_Msg) + init(&q) + msg := _Test_Msg { + data = 42, + } + msg_opt: Maybe(^_Test_Msg) = &msg + push(&q, &msg_opt) + testing.expect(t, length(&q) == 1, "length should be 1 after push") + got := pop(&q) + testing.expect(t, got != nil && got.data == 42, "pop should return the pushed message") + testing.expect(t, length(&q) == 0, "length should be 0 after pop") + got2 := pop(&q) + testing.expect(t, got2 == nil, "second pop should return nil") +} + +@(test) +test_fifo_order :: proc(t: ^testing.T) { + q: Queue(_Test_Msg) + init(&q) + a := _Test_Msg { + data = 1, + } + b := _Test_Msg { + data = 2, + } + c := _Test_Msg { + data = 3, + } + a_opt: Maybe(^_Test_Msg) = &a + push(&q, &a_opt) + b_opt: Maybe(^_Test_Msg) = &b + push(&q, &b_opt) + c_opt: Maybe(^_Test_Msg) = &c + push(&q, &c_opt) + testing.expect(t, length(&q) == 3, "length should be 3 after 3 pushes") + g1 := pop(&q) + g2 := pop(&q) + g3 := pop(&q) + g4 := pop(&q) + testing.expect(t, g1 != nil && g1.data == 1, "first pop should return 1") + testing.expect(t, g2 != nil && g2.data == 2, "second pop should return 2") + testing.expect(t, g3 != nil && g3.data == 3, "third pop should return 3") + testing.expect(t, g4 == nil, "fourth pop should return nil") + testing.expect(t, length(&q) == 0, "length zero after pop") +} + +@(test) +test_push_pop_interleaved :: proc(t: ^testing.T) { + q: Queue(_Test_Msg) + init(&q) + a := _Test_Msg { + data = 10, + } + b := _Test_Msg { + data = 20, + } + a_opt: Maybe(^_Test_Msg) = &a + push(&q, &a_opt) + g1 := pop(&q) + b_opt: Maybe(^_Test_Msg) = &b + push(&q, &b_opt) + g2 := pop(&q) + g3 := pop(&q) + testing.expect(t, g1 != nil && g1.data == 10, "first interleaved pop should return 10") + testing.expect(t, g2 != nil && g2.data == 20, "second interleaved pop should return 20") + testing.expect(t, g3 == nil, "third interleaved pop should return nil") +} + +// ---------------------------------------------------------------------------- +// Example +// ---------------------------------------------------------------------------- + +@(private) +_example_basic_usage :: proc() -> bool { + q: Queue(_Test_Msg) + init(&q) + msg := _Test_Msg { + data = 99, + } + msg_opt: Maybe(^_Test_Msg) = &msg + push(&q, &msg_opt) + got := pop(&q) + return got != nil && got.data == 99 +} + +@(test) +test_example_basic_usage :: proc(t: ^testing.T) { + testing.expect(t, _example_basic_usage(), "basic usage example should work") +} diff --git a/response.odin b/response.odin index e22f82c9..90d48856 100644 --- a/response.odin +++ b/response.odin @@ -1,6 +1,7 @@ package http import "core:bytes" +import list "core:container/intrusive/list" import "core:io" import "core:log" import "core:mem/virtual" @@ -9,6 +10,12 @@ import "core:slice" import "core:strconv" Response :: struct { + // ------ async "header" + using node: list.Node, + async_handler: ^Handler, // handler to resume + work_data: rawptr, // non-nil means async is pending + // ------ + // Add your headers and cookies here directly. headers: Headers, cookies: [dynamic]Cookie, @@ -26,10 +33,12 @@ Response :: struct { _buf: bytes.Buffer, _heading_written: bool, } +// mpsc.Queue assumes node is at offset 0 +#assert(offset_of(Response, node) == 0, "Response.node must remain the first field — required by mpsc.Queue") response_init :: proc(r: ^Response, allocator := context.allocator) { - r.status = .Not_Found - r.cookies.allocator = allocator + r.status = .Not_Found + r.cookies.allocator = allocator r._buf.buf.allocator = allocator headers_init(&r.headers, allocator) @@ -59,7 +68,7 @@ If, after calling, you want to change the status code, use the `response_status` For bodies where you do not know the size or want an `io.Writer`, use the `response_writer_init` procedure to create a writer. */ -body_set :: proc{ +body_set :: proc { body_set_str, body_set_bytes, } @@ -68,7 +77,7 @@ body_set :: proc{ Sets the status code with the safety of being able to do this after writing (part of) the body. */ response_status :: proc(r: ^Response, status: Status) { - if r.status == status { return } + if r.status == status {return} r.status = status @@ -114,14 +123,23 @@ response_writer_init :: proc(rw: ^Response_Writer, r: ^Response, buffer: []byte) _response_write_heading(r, -1) rw.buf = slice.into_dynamic(buffer) - rw.r = r - - rw.w = io.Stream{ - procedure = proc(stream_data: rawptr, mode: io.Stream_Mode, p: []byte, offset: i64, whence: io.Seek_From) -> (n: i64, err: io.Error) { + rw.r = r + + rw.w = io.Stream { + procedure = proc( + stream_data: rawptr, + mode: io.Stream_Mode, + p: []byte, + offset: i64, + whence: io.Seek_From, + ) -> ( + n: i64, + err: io.Error, + ) { ws :: bytes.buffer_write_string write_chunk :: proc(b: ^bytes.Buffer, chunk: []byte) { plen := i64(len(chunk)) - if plen == 0 { return } + if plen == 0 {return} log.debugf("response_writer chunk of size: %i", plen) @@ -213,16 +231,16 @@ You can pass `content_length < 0` to omit the content-length header, note that t required on most responses, but there are things like transfer-encodings that could leave it out. */ _response_write_heading :: proc(r: ^Response, content_length: int) { - if r._heading_written { return } + if r._heading_written {return} r._heading_written = true - ws :: bytes.buffer_write_string + ws :: bytes.buffer_write_string conn := r._conn - b := &r._buf + b := &r._buf - MIN :: len("HTTP/1.1 200 \r\ndate: \r\ncontent-length: 1000\r\n") + DATE_LENGTH + MIN :: len("HTTP/1.1 200 \r\ndate: \r\ncontent-length: 1000\r\n") + DATE_LENGTH AVG_HEADER_SIZE :: 20 - reserve_size := MIN + content_length + (AVG_HEADER_SIZE * headers_count(r.headers)) + reserve_size := MIN + content_length + (AVG_HEADER_SIZE * headers_count(r.headers)) bytes.buffer_grow(&r._buf, reserve_size) // According to RFC 7230 3.1.2 the reason phrase is insignificant, @@ -246,11 +264,9 @@ _response_write_heading :: proc(r: ^Response, content_length: int) { ws(b, "\r\n") } - if ( - content_length > -1 && - !headers_has_unsafe(r.headers, "content-length") && - response_needs_content_length(r, conn) \ - ) { + if (content_length > -1 && + !headers_has_unsafe(r.headers, "content-length") && + response_needs_content_length(r, conn)) { if content_length == 0 { ws(b, "content-length: 0\r\n") } else { @@ -286,6 +302,12 @@ _response_write_heading :: proc(r: ^Response, content_length: int) { // Closes the connection or starts the handling of the next request. @(private) response_send :: proc(r: ^Response, conn: ^Connection, loc := #caller_location) { + // If the connection is already closing or closed, just clean up. + if conn.state >= .Closing || conn.state == .Will_Close { + clean_request_loop(conn) + return + } + assert(!r.sent, "response has already been sent", loc) r.sent = true @@ -326,7 +348,7 @@ response_send_got_body :: proc(r: ^Response, will_close: bool) { conn := r._conn if will_close { - if !connection_set_state(r._conn, .Will_Close) { return } + if !connection_set_state(r._conn, .Will_Close) {return} } if bytes.buffer_length(&r._buf) == 0 { @@ -341,8 +363,10 @@ response_send_got_body :: proc(r: ^Response, will_close: bool) { @(private) on_response_sent :: proc(op: ^nbio.Operation, conn: ^Connection) { if op.send.err != nil { - log.errorf("could not send response: %v", op.send.err) - if !connection_set_state(conn, .Will_Close) { return } + if conn.state < .Closing { + log.warnf("could not send response: %v", op.send.err) + } + if !connection_set_state(conn, .Will_Close) {return} } clean_request_loop(conn) @@ -351,6 +375,10 @@ on_response_sent :: proc(op: ^nbio.Operation, conn: ^Connection) { // Response has been sent, clean up and close/handle next. @(private) clean_request_loop :: proc(conn: ^Connection, close: Maybe(bool) = nil) { + if conn.loop.res.work_data != nil { + cancel_async(&conn.loop.res) + } + context.temp_allocator = virtual.arena_allocator(&conn.temp_allocator) // blocks, size, used := allocator_free_all(&conn.temp_allocator) @@ -368,7 +396,7 @@ clean_request_loop :: proc(conn: ^Connection, close: Maybe(bool) = nil) { if c, ok := close.?; (ok && c) || conn.state == .Will_Close { connection_close(conn) } else { - if !connection_set_state(conn, .Idle) { return } + if !connection_set_state(conn, .Idle) {return} conn_handle_req(conn, context.temp_allocator) } } diff --git a/resume.odin b/resume.odin new file mode 100644 index 00000000..afb70dd3 --- /dev/null +++ b/resume.odin @@ -0,0 +1,68 @@ +package http + +import "base:intrinsics" +import "core:log" +import nbio "core:nbio" +import mpsc "internal/mpsc" + +// Tells the server this request is going async. +// Call from your handler or body callback on the IO thread, before starting background work. +// Pass h — the second call needs the right handler pointer, especially in a middleware chain +// work_data is available in the second call via res.work_data; use 1 if you don't need it. +mark_async :: proc(h: ^Handler, res: ^Response, work_data: rawptr = rawptr(uintptr(1))) { + if res == nil || res._conn == nil || res._conn.owning_thread == nil { + log.error("mark_async: invalid response or connection state") + return + } + + if atomic_load(&res._conn.server.closing) { + log.warn("mark_async: server is closing, ignoring") + return + } + + if h != nil { + res.async_handler = h + } else if res.async_handler == nil { + // h must be set for the second call to reach the right handler + assert(false, "mark_async: h is nil and res.async_handler not set. Always pass h in middleware.") + res.async_handler = &res._conn.server.handler // fallback + } + + res.work_data = work_data + intrinsics.atomic_add(&res._conn.owning_thread.async_pending, 1) + log.debugf("mark_async: pending count is %d", intrinsics.atomic_load(&res._conn.owning_thread.async_pending)) +} + +// Undoes mark_async when background work fails to start. +cancel_async :: proc(res: ^Response) { + if res == nil || res._conn == nil || res._conn.owning_thread == nil { + log.error("cancel_async: invalid response or connection state") + return + } + + if res.work_data == nil { + log.error("cancel_async: response is not async, nothing to undo") + return + } + + intrinsics.atomic_add(&res._conn.owning_thread.async_pending, -1) + log.debugf("cancel_async: pending count is %d", intrinsics.atomic_load(&res._conn.owning_thread.async_pending)) + res.work_data = nil + res.async_handler = nil +} + +// Schedules the second handler call. Call from the background thread when work is done. +// Don't touch res after this. +resume :: proc(res: ^Response) { + if res == nil || res._conn == nil || res._conn.owning_thread == nil { + log.error("resume: invalid response or connection state") + return + } + + td := res._conn.owning_thread + event_loop := td.event_loop + msg: Maybe(^Response) = res + if mpsc.push(&td.resume_queue, &msg) { + nbio.wake_up(event_loop) + } +} diff --git a/scanner.odin b/scanner.odin index 63286304..9c4cfbcb 100644 --- a/scanner.odin +++ b/scanner.odin @@ -147,6 +147,7 @@ scanner_scan :: proc( s.consecutive_empty_reads = 0 s.callback = nil s.user_data = nil + callback(user_data, string(token), s._err) return } diff --git a/server.odin b/server.odin index be92dd2e..b695e363 100644 --- a/server.odin +++ b/server.odin @@ -1,5 +1,6 @@ package http +import "base:intrinsics" import "base:runtime" import "core:bufio" @@ -16,15 +17,16 @@ import "core:slice" import "core:sync" import "core:thread" import "core:time" +import mpsc "internal/mpsc" Server_Opts :: struct { // Whether the server should accept every request that sends a "Expect: 100-continue" header automatically. // Defaults to true. - auto_expect_continue: bool, + auto_expect_continue: bool, // When this is true, any HEAD request is automatically redirected to the handler as a GET request. // Then, when the response is sent, the body is removed from the response. // Defaults to true. - redirect_head_to_get: bool, + redirect_head_to_get: bool, // Limit the maximum number of bytes to read for the request line (first line of request containing the URI). // The HTTP spec does not specify any limits but in practice it is safer. // RFC 7230 3.1.1 says: @@ -32,13 +34,13 @@ Server_Opts :: struct { // practice. It is RECOMMENDED that all HTTP senders and recipients // support, at a minimum, request-line lengths of 8000 octets. // defaults to 8000. - limit_request_line: int, + limit_request_line: int, // Limit the length of the headers. // The HTTP spec does not specify any limits but in practice it is safer. // defaults to 8000. - limit_headers: int, + limit_headers: int, // The thread count to use, defaults to your core count - 1. - thread_count: int, + thread_count: int, // // The initial size of the temp_allocator for each connection, defaults to 256KiB and doubles // // each time it needs to grow. @@ -51,10 +53,10 @@ Server_Opts :: struct { } Default_Server_Opts := Server_Opts { - auto_expect_continue = true, - redirect_head_to_get = true, - limit_request_line = 8000, - limit_headers = 8000, + auto_expect_continue = true, + redirect_head_to_get = true, + limit_request_line = 8000, + limit_headers = 8000, // initial_temp_block_cap = 256 * mem.Kilobyte, // max_free_blocks_queued = 64, } @@ -75,7 +77,6 @@ Server :: struct { tcp_sock: net.TCP_Socket, conn_allocator: mem.Allocator, handler: Handler, - threads: []Server_Thread, // Once the server starts closing/shutdown this is set to true, all threads will check it // and start their thread local shutdown procedure. @@ -93,11 +94,13 @@ Server :: struct { } Server_Thread :: struct { - thread: ^thread.Thread, - event_loop: ^nbio.Event_Loop, - conns: map[net.TCP_Socket]^Connection, - state: Server_State, - accept: ^nbio.Operation, + thread: ^thread.Thread, + event_loop: ^nbio.Event_Loop, + conns: map[net.TCP_Socket]^Connection, + state: Server_State, + accept: ^nbio.Operation, + resume_queue: mpsc.Queue(Response), // async resume queue — consumer: this io thread + async_pending: int, // atomic; counts requests between mark_async and resume // free_temp_blocks: map[int]queue.Queue(^Block), // free_temp_blocks_count: int, @@ -120,7 +123,9 @@ listen :: proc( s: ^Server, endpoint: net.Endpoint = Default_Endpoint, opts: Server_Opts = Default_Server_Opts, -) -> (err: net.Network_Error) { +) -> ( + err: net.Network_Error, +) { s.opts = opts s.conn_allocator = context.allocator // initial_block_cap = int(s.opts.initial_temp_block_cap) @@ -140,7 +145,7 @@ listen :: proc( } serve :: proc(s: ^Server, h: Handler) -> (err: net.Network_Error) { - if atomic_load(&s.closing) { return } + if atomic_load(&s.closing) {return} s.handler = h if s.opts.thread_count == 0 { @@ -165,7 +170,7 @@ serve :: proc(s: ^Server, h: Handler) -> (err: net.Network_Error) { net.shutdown(s.tcp_sock, .Both) net.close(s.tcp_sock) - for t in s.threads[1:] { thread.destroy(t.thread) } + for t in s.threads[1:] {thread.destroy(t.thread)} delete(s.threads) return nil @@ -176,13 +181,16 @@ listen_and_serve :: proc( h: Handler, endpoint: net.Endpoint = Default_Endpoint, opts: Server_Opts = Default_Server_Opts, -) -> (err: net.Network_Error) { +) -> ( + err: net.Network_Error, +) { listen(s, endpoint, opts) or_return return serve(s, h) } _server_thread_init :: proc(s: ^Server, ttd: ^Server_Thread) { td = ttd + mpsc.init(&td.resume_queue) td.conns = make(map[net.TCP_Socket]^Connection) // td.free_temp_blocks = make(map[int]queue.Queue(^Block)) @@ -201,16 +209,70 @@ _server_thread_init :: proc(s: ^Server, ttd: ^Server_Thread) { log.debug("starting event loop") td.state = .Serving + shutdown_start: time.Tick for { - if atomic_load(&s.closing) { _server_thread_shutdown(s) } - if td.state == .Closed { break } - if td.state == .Cleaning { continue } + if atomic_load(&s.closing) { + if intrinsics.atomic_load(&td.async_pending) == 0 { + _server_thread_shutdown(s) + break + } + + if shutdown_start == {} { + shutdown_start = time.tick_now() + } + + if time.tick_since(shutdown_start) > 5 * time.Second { + log.warnf( + "shutdown: %d async requests still pending after 5s timeout — force closing", + td.async_pending, + ) + _server_thread_shutdown(s) + break + } + } + + if td.state == .Closed {break} - err := nbio.tick() + if td.state == .Cleaning {continue} + + tick_ms := -1 if !atomic_load(&s.closing) else 1 * time.Millisecond + err := nbio.tick(tick_ms) if err != nil { log.errorf("non-blocking io tick error: %v", err) break } + + // call the handler (second part) for any async completions + for { + res := mpsc.pop(&td.resume_queue) + if res == nil { + stall := false + for _ in 0 ..< 3 { + res = mpsc.pop(&td.resume_queue) + if res != nil { + stall = true + break + } + } + if !stall { + break + } + } + + // Use the connection arena for the handler. + old_temp := context.temp_allocator + context.temp_allocator = virtual.arena_allocator(&res._conn.temp_allocator) + + // Call the original handler to avoid running middleware twice. + h := res.async_handler if res.async_handler != nil else &res._conn.server.handler + h.handle(h, &res._conn.loop.req, res) + + intrinsics.atomic_add(&td.async_pending, -1) + context.temp_allocator = old_temp + + res.work_data = nil + res.async_handler = nil + } } log.debug("event loop end") @@ -248,24 +310,20 @@ _server_thread_shutdown :: proc(s: ^Server, loc := #caller_location) { td.state = .Closing defer delete(td.conns) - // defer { - // blocks: int - // for _, &bucket in td.free_temp_blocks { - // for block in queue.pop_front_safe(&bucket) { - // blocks += 1 - // free(block) - // } - // queue.destroy(&bucket) - // } - // delete(td.free_temp_blocks) - // log.infof("had %i temp blocks to spare", blocks) - // } + + // cancel any requests still going async — they won't complete during shutdown + for _, conn in td.conns { + if conn.loop.res.work_data != nil { + log.warnf("shutdown: force canceling async request on connection %i", conn.socket) + cancel_async(&conn.loop.res) + } + } for { for sock, conn in td.conns { #partial switch conn.state { case .Active: - log.infof("shutdown: connection %i still active", sock) + log.debugf("shutdown: connection %i active, waiting for response send", sock) case .New, .Idle, .Pending: log.infof("shutdown: closing connection %i", sock) connection_close(conn) @@ -280,8 +338,8 @@ _server_thread_shutdown :: proc(s: ^Server, loc := #caller_location) { break } - err := nbio.tick() - fmt.assertf(err == nil, "IO tick error during shutdown: %v") + err := nbio.tick(1 * time.Millisecond) + fmt.assertf(err == nil, "IO tick error during shutdown: %v", err) } td.state = .Cleaning @@ -366,6 +424,7 @@ Connection :: struct { scanner: Scanner, temp_allocator: virtual.Arena, loop: Loop, + owning_thread: ^Server_Thread, // set once in on_accept, never changes } // Loop/request cycle state. @@ -395,20 +454,28 @@ connection_close :: proc(c: ^Connection, loc := #caller_location) { // to process the closing and receive any remaining data. net.shutdown(c.socket, net.Shutdown_Manner.Send) - nbio.timeout_poly(Conn_Close_Delay, c, proc(_: ^nbio.Operation, c: ^Connection) { - nbio.close_poly(c.socket, c, proc(_: ^nbio.Operation, c: ^Connection) { - log.debugf("closed connection: %i", c.socket) - - c.state = .Closed - - // allocator_destroy(&c.temp_allocator) - virtual.arena_destroy(&c.temp_allocator) - - scanner_destroy(&c.scanner) - delete_key(&td.conns, c.socket) - free(c, c.server.conn_allocator) - }) - }) + nbio.timeout_poly( + Conn_Close_Delay, + c, + proc(_: ^nbio.Operation, c: ^Connection) { + nbio.close_poly( + c.socket, + c, + proc(_: ^nbio.Operation, c: ^Connection) { + log.debugf("closed connection: %i", c.socket) + + c.state = .Closed + + // allocator_destroy(&c.temp_allocator) + virtual.arena_destroy(&c.temp_allocator) + + scanner_destroy(&c.scanner) + delete_key(&td.conns, c.socket) + free(c, c.server.conn_allocator) + }, + ) + }, + ) } @(private) @@ -425,17 +492,27 @@ on_accept :: proc(op: ^nbio.Operation, server: ^Server) { return } - fmt.panicf("accept error: %v", op.accept.err) + if !atomic_load(&server.closing) { + fmt.panicf("accept error: %v", op.accept.err) + } + return } // Accept next connection. td.accept = nbio.accept_poly(server.tcp_sock, server, on_accept) c := new(Connection, server.conn_allocator) + if c == nil { + log.error("on_accept: failed to allocate connection") + net.close(op.accept.client) + return + } + c.state = .New c.server = server c.socket = op.accept.client c.loop.req.client = op.accept.client_endpoint + c.owning_thread = td td.conns[c.socket] = c @@ -459,10 +536,12 @@ conn_handle_reqs :: proc(c: ^Connection) { @(private) conn_handle_req :: proc(c: ^Connection, allocator := context.temp_allocator) { + if atomic_load(&c.server.closing) {return} + on_rline1 :: proc(loop: rawptr, token: string, err: bufio.Scanner_Error) { l := cast(^Loop)loop - if !connection_set_state(l.conn, .Active) { return } + if !connection_set_state(l.conn, .Active) {return} if err != nil { if err == .EOF { @@ -631,7 +710,7 @@ server_date_start :: proc(s: ^Server) { // Updates the time and schedules itself for after a second. @(private) server_date_update :: proc(_: ^nbio.Operation, s: ^Server) { - if atomic_load(&s.closing) { return } + if atomic_load(&s.closing) {return} nbio.timeout_poly(time.Second, s, server_date_update)