feat: std.http pooled handler dispatch (PLAN-HTTPZ S7b)

thread_pool_count = 0 (default) keeps handlers inline on the loop
thread — the measured fast path (BENCH-HTTPZ.md). N > 0 dispatches
each parsed request to a std.thread Pool of N workers, completing the
httpz two-pool shape: the connection freezes as CONN_HANDLING (no
reads, growth, eviction, or recycling — the worker borrows views into
its read buffer), the worker runs the handler under a per-job arena
and serializes into job-owned bytes, the completion queues under the
PoolState mutex, and the loop wakes through the new std.event wake
channel (kqueue EVFILT_USER + EV_CLEAR; the epoll twin maps to
eventfd), attaches the response, compacts the buffer, and resumes
keep-alive/pipeline handling. A full backlog sheds with 503. Stale
completions (generation mismatch after close) are dropped. Pool mode
requires the server's constructing allocator to be thread-safe
(GPA/malloc), documented on the knob.

PoolState lives behind a heap pointer (it embeds a Mutex and is shared
with workers; the Server struct itself is returned by value).
serialize_response/run_handler_job share one serialize_bytes.

examples/1633 gains the pooled section (GET, body echo, 404 across
worker threads) plus the loop-wake path exercised end to end; AOT run
five times. examples/1632 unchanged but the Event struct gains `user`.
This commit is contained in:
agra
2026-06-12 22:31:27 +03:00
parent 7f23bb7530
commit e57a27205e
42 changed files with 95852 additions and 81382 deletions

View File

@@ -35,6 +35,7 @@ EventErr :: error {
// readable/writable — which direction is ready;
// eof — the peer finished writing (drain pending bytes, then close);
// err — the registration itself failed asynchronously;
// user — a cross-thread wake() (see add_wake), no fd attached;
// nbytes — bytes readable / writable-buffer space (backend estimate);
// udata — the word given at registration, verbatim.
Event :: struct {
@@ -44,6 +45,7 @@ Event :: struct {
writable: bool = false;
eof: bool = false;
err: bool = false;
user: bool = false; // a wake() delivery, not fd readiness
nbytes: i64 = 0;
}
@@ -76,6 +78,22 @@ Loop :: struct {
kqb.kq_apply(self.kq, kqb.kev_change(fd, kqb.EVFILT_WRITE, kqb.EV_DELETE, 0));
}
// Register the loop's wake channel: wake() from ANY thread makes
// wait() return an Event carrying `udata` with `.user` set. EV_CLEAR
// auto-resets, so one registration serves the loop's lifetime.
// (kqueue EVFILT_USER here; the epoll twin maps to eventfd.)
add_wake :: (self: *Loop, udata: usize) -> !EventErr {
ch : kqb.Kevent = .{ ident = 0, filter = kqb.EVFILT_USER, flags = kqb.EV_ADD | kqb.EV_CLEAR, udata = udata };
if !kqb.kq_apply(self.kq, ch) { raise error.Register; }
return;
}
// Thread-safe: kevent change submission is safe from any thread.
wake :: (self: *Loop) {
ch : kqb.Kevent = .{ ident = 0, filter = kqb.EVFILT_USER, fflags = kqb.NOTE_TRIGGER };
kqb.kq_apply(self.kq, ch);
}
// Fill `out` with ready events, waiting at most `timeout_ms`
// (negative = forever). Returns the count; 0 is a timeout.
wait :: (self: *Loop, out: []Event, timeout_ms: i64) -> (i64, !EventErr) {
@@ -90,6 +108,7 @@ Loop :: struct {
e : Event = .{ fd = xx ev.ident, udata = ev.udata, nbytes = ev.data };
if ev.filter == kqb.EVFILT_READ { e.readable = true; }
if ev.filter == kqb.EVFILT_WRITE { e.writable = true; }
if ev.filter == kqb.EVFILT_USER { e.user = true; }
if (ev.flags & kqb.EV_EOF) != 0 { e.eof = true; }
if (ev.flags & kqb.EV_ERROR) != 0 { e.err = true; }
out[i] = e;

View File

@@ -41,6 +41,14 @@ Config :: struct {
timeout_request_ms: i64 = 5000; // deliver a full request, or evicted
timeout_keepalive_ms: i64 = 5000;// idle between requests, or evicted
request_count: i64 = 100; // requests per connection, then close
// Handler dispatch. 0 (default) runs handlers INLINE on the loop
// thread — measured ~6x faster for fast handlers (BENCH-HTTPZ.md).
// N > 0 dispatches each parsed request to a thread pool of N
// workers (slow handlers stop stalling the loop, httpz's shape);
// requires the Server's constructing allocator to be THREAD-SAFE
// (GPA/malloc — never an Arena).
thread_pool_count: i64 = 0;
thread_pool_backlog: i64 = 500; // queued jobs beyond running ones; full = 503 shed
}
READ_BUF_INITIAL :: 16384;
@@ -127,6 +135,9 @@ CONN_FREE :u8: 0;
CONN_READING :u8: 1; // awaiting a complete request (deadline: request)
CONN_WRITING :u8: 2; // response partially sent (deadline: request)
CONN_KEEPALIVE :u8: 3; // between requests (deadline: keepalive)
CONN_HANDLING :u8: 4; // request dispatched to the pool: the loop must
// not read, evict, or recycle the slot (the
// worker holds views into its read buffer)
Conn :: struct {
fd: i32 = -1;
@@ -141,10 +152,103 @@ Conn :: struct {
served: i64 = 0;
close_after: bool = false;
write_armed: bool = false;
gen: i64 = 0; // bumped on close; a stale pool completion is dropped
}
// The listener's udata; connection udata is the slot index.
LISTENER_UDATA :usize: 0xFFFFFFFF;
// The pool wake channel's udata.
WAKE_UDATA :usize: 0xFFFFFFFE;
// Serialized response bytes (shared by the inline and pooled paths).
OutBytes :: struct {
buf: [*]u8 = null;
len: i64 = 0;
}
// One finished pool job, queued for the loop thread to attach.
Completion :: struct {
slot: i64 = 0;
gen: i64 = 0;
out: OutBytes = .{};
close_after: bool = false;
consumed: i64 = 0; // request bytes to drop from the read buffer
}
// Heap-resident pool machinery: embeds a Mutex and is shared with
// worker threads, so it must never move (the Server struct itself is
// returned by value from init and may be copied).
PoolState :: struct {
pool: *thread.Pool = null;
mu: thread.Mutex = .{};
done: [*]Completion = null;
done_cap: i64 = 0;
done_len: i64 = 0;
loop_copy: event.Loop = .{}; // Loop is an fd wrapper; copy-safe
}
// What a worker needs to run one handler. Allocated (and freed) per
// dispatch from the server's thread-safe allocator; `req`'s views point
// into the connection's read buffer, untouched while CONN_HANDLING.
HandlerJob :: struct {
handler: (*Request, *Response, usize) -> void;
app_ctx: usize = 0;
ps: *PoolState = null;
alloc: Allocator;
req: Request = .{};
slot: i64 = 0;
gen: i64 = 0;
keep: bool = false;
consumed: i64 = 0;
}
// The pool task: fabricated per-job arena over the (thread-safe) server
// allocator; serialize into job-owned bytes; queue the completion; wake
// the loop.
run_handler_job :: (arg: usize) {
job : *HandlerJob = xx arg;
resp : Response = .{};
arena := Arena.init(job.alloc, 65536);
ob : OutBytes = .{};
push Context.{ allocator = xx arena } {
h := job.handler;
h(@job.req, @resp, job.app_ctx);
ob = serialize_bytes(@resp, job.keep, job.alloc);
}
arena.deinit();
ps := job.ps;
done : Completion = .{
slot = job.slot, gen = job.gen, out = ob,
close_after = !job.keep, consumed = job.consumed,
};
ps.mu.lock();
if ps.done_len < ps.done_cap {
ps.done[ps.done_len] = done;
ps.done_len += 1;
}
ps.mu.unlock();
ps.loop_copy.wake();
a := job.alloc;
a.dealloc_bytes(xx job);
}
// Build the response bytes: status line, lengths, connection mode,
// extra headers, body — one allocation from `alloc`.
serialize_bytes :: (resp: *Response, keep: bool, alloc: Allocator) -> OutBytes {
head := concat("HTTP/1.1 ", concat(int_to_string(resp.status), concat(" ", reason_for(resp.status))));
head = concat(head, concat("\r\nContent-Length: ", int_to_string(resp.body.len)));
head = concat(head, concat("\r\nContent-Type: ", resp.content_type));
head = concat(head, if keep then "\r\nConnection: keep-alive\r\n" else "\r\nConnection: close\r\n");
if resp.extra_headers.len > 0 { head = concat(head, resp.extra_headers); }
head = concat(head, "\r\n");
total : i64 = xx (head.len + resp.body.len);
buf : [*]u8 = xx alloc.alloc_bytes(total);
memcpy(buf, head.ptr, head.len);
if resp.body.len > 0 { memcpy(@buf[head.len], resp.body.ptr, resp.body.len); }
return OutBytes.{ buf = buf, len = total };
}
Server :: struct {
cfg: Config;
@@ -157,6 +261,7 @@ Server :: struct {
// config), since the server owns the call site.
handler: (*Request, *Response, usize) -> void;
ctx: usize = 0;
ps: *PoolState = null; // non-null iff cfg.thread_pool_count > 0
init :: (cfg: Config, handler: (*Request, *Response, usize) -> void, ctx: usize) -> (Server, !HttpErr) {
lfd := socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
@@ -184,13 +289,35 @@ Server :: struct {
slots[i] = Conn.{};
i += 1;
}
ps : *PoolState = null;
if cfg.thread_pool_count > 0 {
ps = xx oa.alloc_bytes(size_of(PoolState));
ps.* = PoolState.{};
if !ps.mu.setup() { socket.close(lfd); raise error.Loop; }
// max completions in flight = queued + running
ps.done_cap = cfg.thread_pool_backlog + cfg.thread_pool_count;
ps.done = xx oa.alloc_bytes(ps.done_cap * size_of(Completion));
ps.done_len = 0;
ps.loop_copy = lp;
wre := false;
lp.add_wake(WAKE_UDATA) catch { wre = true; };
if wre { socket.close(lfd); raise error.Loop; }
pool, pce := thread.Pool.create(cfg.thread_pool_count, cfg.thread_pool_backlog);
if pce { socket.close(lfd); raise error.Loop; }
ps.pool = pool;
}
return Server.{
cfg = cfg, loop = lp, lfd = lfd, conns = slots,
own_alloc = oa, handler = handler, ctx = ctx,
own_alloc = oa, handler = handler, ctx = ctx, ps = ps,
};
}
close :: (self: *Server) {
if self.ps != null {
self.ps.pool.shutdown();
}
i : i64 = 0;
while i < self.cfg.max_conn {
if self.conns[i].state != CONN_FREE { self.conn_close(i); }
@@ -226,6 +353,7 @@ Server :: struct {
// read_buf stays allocated — reused by the next connection here.
c.fd = -1;
c.state = CONN_FREE;
c.gen += 1;
c.read_len = 0;
c.out_len = 0;
c.out_sent = 0;
@@ -243,7 +371,7 @@ Server :: struct {
i : i64 = 0;
while i < self.cfg.max_conn {
c := self.conns[i];
if c.state != CONN_FREE {
if c.state != CONN_FREE and c.state != CONN_HANDLING {
left := event.remaining_ms(c.deadline);
if left < wait_ms { wait_ms = left; }
}
@@ -262,9 +390,14 @@ Server :: struct {
self.accept_ready();
continue;
}
if ev.user or ev.udata == WAKE_UDATA {
self.drain_completions();
continue;
}
slot : i64 = xx ev.udata;
c := @self.conns[slot];
if c.state == CONN_FREE or c.fd != ev.fd { continue; } // stale event for a recycled slot
if c.state == CONN_HANDLING { continue; } // buffer frozen until the worker finishes
if ev.writable and c.state == CONN_WRITING {
self.write_more(slot);
continue;
@@ -275,10 +408,12 @@ Server :: struct {
}
// Deadline eviction — after I/O, so a request that just arrived
// under the wire is served, not evicted.
// under the wire is served, not evicted. A HANDLING conn is the
// worker's: never evicted (its read buffer is borrowed).
i = 0;
while i < self.cfg.max_conn {
if self.conns[i].state != CONN_FREE and event.expired(self.conns[i].deadline) {
st := self.conns[i].state;
if st != CONN_FREE and st != CONN_HANDLING and event.expired(self.conns[i].deadline) {
self.conn_close(i);
}
i += 1;
@@ -442,6 +577,27 @@ Server :: struct {
c.served += 1;
keep := req.keep_alive and c.served < self.cfg.request_count;
// Pooled dispatch: freeze this connection (CONN_HANDLING — no
// reads, no eviction, no recycling, so the job's request views
// into the read buffer stay valid), hand the job to a worker,
// and return; the completion re-enters via drain_completions.
// A full backlog sheds with 503 (httpz backpressure).
if self.ps != null {
job : *HandlerJob = xx self.own_alloc.alloc_bytes(size_of(HandlerJob));
job.* = HandlerJob.{
handler = self.handler, app_ctx = self.ctx, ps = self.ps,
alloc = self.own_alloc, req = req,
slot = slot, gen = c.gen, keep = keep, consumed = total,
};
c.state = CONN_HANDLING;
if !self.ps.pool.submit(run_handler_job, xx job) {
self.own_alloc.dealloc_bytes(xx job);
c.state = CONN_READING;
self.respond_error_close(slot, 503);
}
return false;
}
// Dispatch under a per-request arena: everything the handler
// (and serialization) allocates through the implicit context
// dies with the request — response bytes survive because
@@ -478,22 +634,60 @@ Server :: struct {
// start sending — try_serve_one compacts the read buffer between
// serialization and the first write (see the ordering note there).
serialize_response :: (self: *Server, slot: i64, resp: *Response, keep: bool) {
head := concat("HTTP/1.1 ", concat(int_to_string(resp.status), concat(" ", reason_for(resp.status))));
head = concat(head, concat("\r\nContent-Length: ", int_to_string(resp.body.len)));
head = concat(head, concat("\r\nContent-Type: ", resp.content_type));
head = concat(head, if keep then "\r\nConnection: keep-alive\r\n" else "\r\nConnection: close\r\n");
if resp.extra_headers.len > 0 { head = concat(head, resp.extra_headers); }
head = concat(head, "\r\n");
ob := serialize_bytes(resp, keep, self.own_alloc);
c := @self.conns[slot];
c.out_len = xx (head.len + resp.body.len);
c.out_buf = xx self.own_alloc.alloc_bytes(xx c.out_len);
memcpy(c.out_buf, head.ptr, head.len);
if resp.body.len > 0 { memcpy(@c.out_buf[head.len], resp.body.ptr, resp.body.len); }
c.out_buf = ob.buf;
c.out_len = ob.len;
c.out_sent = 0;
c.close_after = !keep;
}
// Attach every queued pool completion to its connection: drop the
// served request bytes, take the response, start writing. A
// completion whose generation no longer matches (the conn was
// closed by Server.close) just frees its bytes.
drain_completions :: (self: *Server) {
if self.ps == null { return; }
ps := self.ps;
ps.mu.lock();
n := ps.done_len;
ps.done_len = 0;
// copy out under the lock; the list is small (<= backlog + workers)
batch : [*]Completion = xx context.allocator.alloc_bytes(if n > 0 then n * size_of(Completion) else 8);
i : i64 = 0;
while i < n {
batch[i] = ps.done[i];
i += 1;
}
ps.mu.unlock();
i = 0;
while i < n {
done := batch[i];
i += 1;
c := @self.conns[done.slot];
if c.state != CONN_HANDLING or c.gen != done.gen {
self.own_alloc.dealloc_bytes(xx done.out.buf);
continue;
}
c.out_buf = done.out.buf;
c.out_len = done.out.len;
c.out_sent = 0;
c.close_after = done.close_after;
// drop the served bytes (dst < src: forward copy is safe)
rest := c.read_len - done.consumed;
m : i64 = 0;
while m < rest {
c.read_buf[m] = c.read_buf[done.consumed + m];
m += 1;
}
c.read_len = rest;
c.state = CONN_READING;
c.deadline = event.deadline_in(self.cfg.timeout_request_ms);
self.write_more(done.slot);
}
}
write_more :: (self: *Server, slot: i64) {
c := @self.conns[slot];
while c.out_sent < c.out_len {

View File

@@ -39,6 +39,10 @@ kevent :: (kq: i32, changelist: *Kevent, nchanges: i32, eventlist: *Kevent, neve
EVFILT_READ :i16: -1;
EVFILT_WRITE :i16: -2;
EVFILT_TIMER :i16: -7;
EVFILT_USER :i16: -10;
// EVFILT_USER fflags
NOTE_TRIGGER :u32: 0x01000000;
// Action/state flags (darwin)
EV_ADD :u16: 0x0001;