Files
sx/library/modules/std/net/kqueue.sx
agra e57a27205e feat: std.http pooled handler dispatch (PLAN-HTTPZ S7b)
thread_pool_count = 0 (default) keeps handlers inline on the loop
thread — the measured fast path (BENCH-HTTPZ.md). N > 0 dispatches
each parsed request to a std.thread Pool of N workers, completing the
httpz two-pool shape: the connection freezes as CONN_HANDLING (no
reads, growth, eviction, or recycling — the worker borrows views into
its read buffer), the worker runs the handler under a per-job arena
and serializes into job-owned bytes, the completion queues under the
PoolState mutex, and the loop wakes through the new std.event wake
channel (kqueue EVFILT_USER + EV_CLEAR; the epoll twin maps to
eventfd), attaches the response, compacts the buffer, and resumes
keep-alive/pipeline handling. A full backlog sheds with 503. Stale
completions (generation mismatch after close) are dropped. Pool mode
requires the server's constructing allocator to be thread-safe
(GPA/malloc), documented on the knob.

PoolState lives behind a heap pointer (it embeds a Mutex and is shared
with workers; the Server struct itself is returned by value).
serialize_response/run_handler_job share one serialize_bytes.

examples/1633 gains the pooled section (GET, body echo, 404 across
worker threads) plus the loop-wake path exercised end to end; AOT run
five times. examples/1632 unchanged but the Event struct gains `user`.
2026-06-12 22:31:27 +03:00

86 lines
2.9 KiB
Plaintext

// std/net/kqueue — raw kqueue/kevent bindings (PLAN-HTTPZ S3).
// darwin-only by definition; the linux twin is std/net/epoll (S4) and
// the OS-neutral Loop facade over both is std.event (S5). Import this
// module explicitly — it deliberately does not ride the std.sx barrel.
//
// One kernel queue multiplexes readiness for any number of fds: a
// registered (ident, filter) pair reports through `kevent` when ready,
// and an idle registration costs nothing — the head-of-line-free
// substrate httpz workers stand on.
libc :: #library "c";
// Mirror of darwin's 64-bit `struct kevent` — 32 bytes:
// uintptr_t ident; int16_t filter; uint16_t flags; uint32_t fflags;
// intptr_t data; void *udata;
// Field order and widths follow the C declaration above so the struct
// can be passed directly to the foreign `kevent` call.
// Every field defaults to zero, so a literal only needs to name the
// fields it cares about.
Kevent :: struct {
// Identifier of the event source; the fd for READ/WRITE filters.
ident: usize = 0;
// Which filter this entry refers to (one of the EVFILT_* constants).
filter: i16 = 0;
// Action/state bits (the EV_* constants).
flags: u16 = 0;
// Filter-specific flags (e.g. NOTE_TRIGGER for EVFILT_USER).
fflags: u32 = 0;
// Filter-specific data word (C `intptr_t data`).
data: i64 = 0;
// Opaque caller word handed back verbatim with each event
// (connection pointer/index in a loop).
udata: usize = 0;
}
// Timeout argument for `kevent` — same layout as std.time's Timespec,
// declared locally so the module stands alone.
// Both fields default to zero.
KqTimespec :: struct {
// Whole seconds.
sec: i64 = 0;
// Additional nanoseconds (kq_wait fills this from the millisecond
// remainder, so it stays below one second there).
nsec: i64 = 0;
}
// Foreign binding to libc `kqueue`: returns the i32 descriptor of a
// new kernel queue, which is what the `kq` parameters below expect.
kqueue :: () -> i32 #foreign libc;
// Foreign binding to libc `kevent`.
//   changelist/nchanges — registrations to apply (null/0 for none).
//   eventlist/nevents   — buffer and capacity for returned events
//                         (null/0 to collect nothing).
//   timeout             — maximum wait; this module passes null to
//                         wait without a limit.
// Callers in this module treat a negative return as failure and a
// non-negative return as the number of events delivered.
kevent :: (kq: i32, changelist: *Kevent, nchanges: i32, eventlist: *Kevent, nevents: i32, timeout: *KqTimespec) -> i32 #foreign libc;
// Filters (darwin) — values for `Kevent.filter`. The numbers follow
// darwin's <sys/event.h>; see kevent(2) for the full semantics.
// Readiness to read on `ident`.
EVFILT_READ :i16: -1;
// Readiness to write on `ident`.
EVFILT_WRITE :i16: -2;
// Timer events.
EVFILT_TIMER :i16: -7;
// User-triggered events (no fd behind them).
EVFILT_USER :i16: -10;
// EVFILT_USER fflags — values for `Kevent.fflags`.
// Fires a registered EVFILT_USER event.
NOTE_TRIGGER :u32: 0x01000000;
// Action/state flags (darwin) — bit values for `Kevent.flags`.
// Register the (ident, filter) pair.
EV_ADD :u16: 0x0001;
// Remove the registration.
EV_DELETE :u16: 0x0002;
// Allow a registration to report events.
EV_ENABLE :u16: 0x0004;
// Keep the registration but stop it reporting.
EV_DISABLE :u16: 0x0008;
// Deliver a single event, then drop the registration.
EV_ONESHOT :u16: 0x0010;
// Reset the event's state once it has been retrieved.
EV_CLEAR :u16: 0x0020;
// Set on a returned entry that reports an error.
EV_ERROR :u16: 0x4000;
// Set on a returned entry to signal an end-of-file condition.
EV_EOF :u16: 0x8000;
// Build the Kevent describing one (fd, filter) registration change.
// `fd` is cast into `ident`; `filter`, `flags` and `udata` are stored
// as given. `fflags` and `data` are left at their zero defaults.
kev_change :: (fd: i32, filter: i16, flags: u16, udata: usize) -> Kevent {
    entry := Kevent.{
        ident = xx fd,
        filter = filter,
        flags = flags,
        udata = udata
    };
    return entry;
}
// Submit a single changelist entry to `kq` without collecting events:
// the event list is null with zero capacity and no timeout is given.
// Returns true when kevent reports a non-negative result.
kq_apply :: (kq: i32, change: Kevent) -> bool {
    // Local copy whose address serves as the one-element changelist.
    entry := change;
    rc := kevent(kq, @entry, 1, null, 0, null);
    return rc >= 0;
}
// Collect ready events from `kq` into `events`, which has room for
// `cap` entries. A negative `timeout_ms` blocks with no time limit;
// otherwise the wait is bounded by `timeout_ms` milliseconds.
// Returns the number of events written (0 means the wait timed out),
// or -1 when kevent fails with anything other than EINTR.
// Note: an EINTR retry reissues the same timeout value; time already
// spent waiting before the interruption is not subtracted.
kq_wait :: (kq: i32, events: *Kevent, cap: i32, timeout_ms: i64) -> i32 {
    // Split milliseconds into whole seconds plus leftover nanoseconds.
    whole_sec := timeout_ms / 1000;
    leftover_ms := timeout_ms % 1000;
    limit : KqTimespec = .{ sec = whole_sec, nsec = leftover_ms * 1000000 };
    while true {
        // A null timeout pointer means "no limit"; `limit` is only
        // handed to kevent for non-negative timeouts.
        got := if timeout_ms < 0
            then kevent(kq, null, 0, events, cap, null)
            else kevent(kq, null, 0, events, cap, @limit);
        if got >= 0 { return got; }
        // errno 4 is EINTR: loop and issue the call again. Any other
        // errno is reported to the caller as -1.
        err := errno_slot_kq().*;
        if err != 4 { return -1; }
    }
    return -1;
}
// errno accessor, bound locally (the std.socket accessor is
// module-scoped there). Binds the libc symbol `__error`, which returns
// a pointer to the errno value; kq_wait dereferences it to tell EINTR
// apart from other kevent failures.
errno_slot_kq :: () -> *i32 #foreign libc "__error";