thread_pool_count = 0 (default) keeps handlers inline on the loop thread — the measured fast path (BENCH-HTTPZ.md). N > 0 dispatches each parsed request to a std.thread Pool of N workers, completing the httpz two-pool shape: the connection freezes as CONN_HANDLING (no reads, growth, eviction, or recycling — the worker borrows views into its read buffer), the worker runs the handler under a per-job arena and serializes into job-owned bytes, the completion queues under the PoolState mutex, and the loop wakes through the new std.event wake channel (kqueue EVFILT_USER + EV_CLEAR; the epoll twin maps to eventfd), attaches the response, compacts the buffer, and resumes keep-alive/pipeline handling. A full backlog sheds with 503. Stale completions (generation mismatch after close) are dropped. Pool mode requires the server's constructing allocator to be thread-safe (GPA/malloc), documented on the knob. PoolState lives behind a heap pointer (it embeds a Mutex and is shared with workers; the Server struct itself is returned by value). serialize_response/run_handler_job share one serialize_bytes. examples/1633 gains the pooled section (GET, body echo, 404 across worker threads) plus the loop-wake path exercised end to end; AOT run five times. examples/1632 unchanged but the Event struct gains `user`.
140 lines
5.4 KiB
Plaintext
140 lines
5.4 KiB
Plaintext
// std.event — the OS-neutral readiness Loop (PLAN-HTTPZ S5).
|
|
//
|
|
// One Loop multiplexes any number of fds without ever blocking on a
|
|
// single one: register interest with an opaque `udata` word, then
|
|
// `wait` yields normalized Events for whatever became ready. Idle
|
|
// registrations cost nothing — the substrate an httpz-shaped server
|
|
// worker stands on.
|
|
//
|
|
// Backend: kqueue (std/net/kqueue) on darwin. The epoll twin
|
|
// (std/net/epoll, PLAN-HTTPZ S4) slots in behind this same surface
|
|
// when the linux target lands; callers never see the backend.
|
|
//
|
|
// Interest is per direction: read and write are registered and removed
|
|
// independently (mirroring kqueue filters; the epoll backend will
|
|
// compose its event mask internally). The typical server pattern:
|
|
// read interest for a connection's whole life, write interest only
|
|
// while a partial response is pending.
|
|
//
|
|
// Deadlines: the loop deliberately has no timer registrations —
|
|
// httpz-style timeout bookkeeping (request/keepalive eviction) is
|
|
// deadline math the caller does with `deadline_in`/`expired` between
|
|
// waits, passing the nearest deadline as `wait`'s timeout.
|
|
|
|
#import "modules/std.sx";
|
|
kqb :: #import "modules/std/net/kqueue.sx";
|
|
timp :: #import "modules/std/time.sx";
|
|
|
|
EventErr :: error {
|
|
Init, // the kernel queue could not be created
|
|
Register, // an interest change was refused
|
|
Wait, // the wait itself failed (not a timeout)
|
|
}
|
|
|
|
// A normalized readiness report for one registered fd.
|
|
// readable/writable — which direction is ready;
|
|
// eof — the peer finished writing (drain pending bytes, then close);
|
|
// err — the registration itself failed asynchronously;
|
|
// user — a cross-thread wake() (see add_wake), no fd attached;
|
|
// nbytes — bytes readable / writable-buffer space (backend estimate);
|
|
// udata — the word given at registration, verbatim.
|
|
Event :: struct {
|
|
fd: i32 = -1;
|
|
udata: usize = 0;
|
|
readable: bool = false;
|
|
writable: bool = false;
|
|
eof: bool = false;
|
|
err: bool = false;
|
|
user: bool = false; // a wake() delivery, not fd readiness
|
|
nbytes: i64 = 0;
|
|
}
|
|
|
|
Loop :: struct {
|
|
kq: i32 = -1;
|
|
|
|
init :: () -> (Loop, !EventErr) {
|
|
q := kqb.kqueue();
|
|
if q < 0 { raise error.Init; }
|
|
return Loop.{ kq = q };
|
|
}
|
|
|
|
close :: (self: *Loop) {
|
|
if self.kq >= 0 { socket.close(self.kq); }
|
|
self.kq = -1;
|
|
}
|
|
|
|
add_read :: (self: *Loop, fd: i32, udata: usize) -> !EventErr {
|
|
if !kqb.kq_apply(self.kq, kqb.kev_change(fd, kqb.EVFILT_READ, kqb.EV_ADD, udata)) { raise error.Register; }
|
|
return;
|
|
}
|
|
del_read :: (self: *Loop, fd: i32) {
|
|
kqb.kq_apply(self.kq, kqb.kev_change(fd, kqb.EVFILT_READ, kqb.EV_DELETE, 0));
|
|
}
|
|
add_write :: (self: *Loop, fd: i32, udata: usize) -> !EventErr {
|
|
if !kqb.kq_apply(self.kq, kqb.kev_change(fd, kqb.EVFILT_WRITE, kqb.EV_ADD, udata)) { raise error.Register; }
|
|
return;
|
|
}
|
|
del_write :: (self: *Loop, fd: i32) {
|
|
kqb.kq_apply(self.kq, kqb.kev_change(fd, kqb.EVFILT_WRITE, kqb.EV_DELETE, 0));
|
|
}
|
|
|
|
// Register the loop's wake channel: wake() from ANY thread makes
|
|
// wait() return an Event carrying `udata` with `.user` set. EV_CLEAR
|
|
// auto-resets, so one registration serves the loop's lifetime.
|
|
// (kqueue EVFILT_USER here; the epoll twin maps to eventfd.)
|
|
add_wake :: (self: *Loop, udata: usize) -> !EventErr {
|
|
ch : kqb.Kevent = .{ ident = 0, filter = kqb.EVFILT_USER, flags = kqb.EV_ADD | kqb.EV_CLEAR, udata = udata };
|
|
if !kqb.kq_apply(self.kq, ch) { raise error.Register; }
|
|
return;
|
|
}
|
|
|
|
// Thread-safe: kevent change submission is safe from any thread.
|
|
wake :: (self: *Loop) {
|
|
ch : kqb.Kevent = .{ ident = 0, filter = kqb.EVFILT_USER, fflags = kqb.NOTE_TRIGGER };
|
|
kqb.kq_apply(self.kq, ch);
|
|
}
|
|
|
|
// Fill `out` with ready events, waiting at most `timeout_ms`
|
|
// (negative = forever). Returns the count; 0 is a timeout.
|
|
wait :: (self: *Loop, out: []Event, timeout_ms: i64) -> (i64, !EventErr) {
|
|
raw : [64]kqb.Kevent = ---;
|
|
cap : i64 = 64;
|
|
if xx out.len < cap { cap = xx out.len; }
|
|
n := kqb.kq_wait(self.kq, @raw[0], xx cap, timeout_ms);
|
|
if n < 0 { raise error.Wait; }
|
|
i := 0;
|
|
while i < n {
|
|
ev := raw[i];
|
|
e : Event = .{ fd = xx ev.ident, udata = ev.udata, nbytes = ev.data };
|
|
if ev.filter == kqb.EVFILT_READ { e.readable = true; }
|
|
if ev.filter == kqb.EVFILT_WRITE { e.writable = true; }
|
|
if ev.filter == kqb.EVFILT_USER { e.user = true; }
|
|
if (ev.flags & kqb.EV_EOF) != 0 { e.eof = true; }
|
|
if (ev.flags & kqb.EV_ERROR) != 0 { e.err = true; }
|
|
out[i] = e;
|
|
i += 1;
|
|
}
|
|
return xx n;
|
|
}
|
|
}
|
|
|
|
// ── deadline helpers (monotonic, std.time) ───────────────────────────
|
|
|
|
// The absolute monotonic instant `ms` from now.
|
|
deadline_in :: (ms: i64) -> i64 {
|
|
return timp.mono_ms() + ms;
|
|
}
|
|
|
|
// True once `deadline` has passed.
|
|
expired :: (deadline: i64) -> bool {
|
|
return timp.mono_ms() >= deadline;
|
|
}
|
|
|
|
// Milliseconds until `deadline`, floored at 0 — the value to hand
|
|
// `wait` so the loop wakes exactly when the nearest deadline fires.
|
|
remaining_ms :: (deadline: i64) -> i64 {
|
|
left := deadline - timp.mono_ms();
|
|
if left < 0 { return 0; }
|
|
return left;
|
|
}
|