// std.event — the OS-neutral readiness Loop (PLAN-HTTPZ S5). // // One Loop multiplexes any number of fds without ever blocking on a // single one: register interest with an opaque `udata` word, then // `wait` yields normalized Events for whatever became ready. Idle // registrations cost nothing — the substrate an httpz-shaped server // worker stands on. // // Backend: kqueue (std/net/kqueue) on darwin. The epoll twin // (std/net/epoll, PLAN-HTTPZ S4) slots in behind this same surface // when the linux target lands; callers never see the backend. // // Interest is per direction: read and write are registered and removed // independently (mirroring kqueue filters; the epoll backend will // compose its event mask internally). The typical server pattern: // read interest for a connection's whole life, write interest only // while a partial response is pending. // // Deadlines: the loop deliberately has no timer registrations — // httpz-style timeout bookkeeping (request/keepalive eviction) is // deadline math the caller does with `deadline_in`/`expired` between // waits, passing the nearest deadline as `wait`'s timeout. #import "modules/std.sx"; kqb :: #import "modules/std/net/kqueue.sx"; timp :: #import "modules/std/time.sx"; EventErr :: error { Init, // the kernel queue could not be created Register, // an interest change was refused Wait, // the wait itself failed (not a timeout) } // A normalized readiness report for one registered fd. // readable/writable — which direction is ready; // eof — the peer finished writing (drain pending bytes, then close); // err — the registration itself failed asynchronously; // nbytes — bytes readable / writable-buffer space (backend estimate); // udata — the word given at registration, verbatim. Event :: struct { fd: i32 = -1; udata: usize = 0; readable: bool = false; writable: bool = false; eof: bool = false; err: bool = false; nbytes: i64 = 0; } Loop :: struct { kq: i32 = -1; init :: () -> (Loop, !EventErr) { q := kqb.kqueue(); if q < 0 { raise error.Init; } return Loop.{ kq = q }; } close :: (self: *Loop) { if self.kq >= 0 { socket.close(self.kq); } self.kq = -1; } add_read :: (self: *Loop, fd: i32, udata: usize) -> !EventErr { if !kqb.kq_apply(self.kq, kqb.kev_change(fd, kqb.EVFILT_READ, kqb.EV_ADD, udata)) { raise error.Register; } return; } del_read :: (self: *Loop, fd: i32) { kqb.kq_apply(self.kq, kqb.kev_change(fd, kqb.EVFILT_READ, kqb.EV_DELETE, 0)); } add_write :: (self: *Loop, fd: i32, udata: usize) -> !EventErr { if !kqb.kq_apply(self.kq, kqb.kev_change(fd, kqb.EVFILT_WRITE, kqb.EV_ADD, udata)) { raise error.Register; } return; } del_write :: (self: *Loop, fd: i32) { kqb.kq_apply(self.kq, kqb.kev_change(fd, kqb.EVFILT_WRITE, kqb.EV_DELETE, 0)); } // Fill `out` with ready events, waiting at most `timeout_ms` // (negative = forever). Returns the count; 0 is a timeout. wait :: (self: *Loop, out: []Event, timeout_ms: i64) -> (i64, !EventErr) { raw : [64]kqb.Kevent = ---; cap : i64 = 64; if xx out.len < cap { cap = xx out.len; } n := kqb.kq_wait(self.kq, @raw[0], xx cap, timeout_ms); if n < 0 { raise error.Wait; } i := 0; while i < n { ev := raw[i]; e : Event = .{ fd = xx ev.ident, udata = ev.udata, nbytes = ev.data }; if ev.filter == kqb.EVFILT_READ { e.readable = true; } if ev.filter == kqb.EVFILT_WRITE { e.writable = true; } if (ev.flags & kqb.EV_EOF) != 0 { e.eof = true; } if (ev.flags & kqb.EV_ERROR) != 0 { e.err = true; } out[i] = e; i += 1; } return xx n; } } // ── deadline helpers (monotonic, std.time) ─────────────────────────── // The absolute monotonic instant `ms` from now. deadline_in :: (ms: i64) -> i64 { return timp.mono_ms() + ms; } // True once `deadline` has passed. expired :: (deadline: i64) -> bool { return timp.mono_ms() >= deadline; } // Milliseconds until `deadline`, floored at 0 — the value to hand // `wait` so the loop wakes exactly when the nearest deadline fires. remaining_ms :: (deadline: i64) -> i64 { left := deadline - timp.mono_ms(); if left < 0 { return 0; } return left; }