Files
sx/library/modules/std/event.sx
agra e52b6c9eae docs: record epoll Loop runtime validation on real Linux (Apple container)
The std.event.Loop epoll backend is now runtime-validated, not just
lower-verified: a static aarch64-linux build of the 1632-equivalent Loop test
(plus the eventfd wake path) runs 6/6 green inside an Apple `container` Linux VM
(kernel 6.18 aarch64) — add_read, idle-timeout, readable+fd+udata, the MOD-mask
add_write path, the eventfd wake channel, and EPOLLRDHUP/HUP eof all behave
identically to kqueue (lone difference: nbytes is 0 on epoll). Update the
event.sx VALIDATION note (with the re-run recipe) and the fibers checkpoint;
the epoll deliverable is complete.
2026-06-26 09:53:10 +03:00

345 lines
15 KiB
Plaintext

// std.event — the OS-neutral readiness Loop (PLAN-HTTPZ S5).
//
// One Loop multiplexes any number of fds without ever blocking on a
// single one: register interest with an opaque `udata` word, then
// `wait` yields normalized Events for whatever became ready. Idle
// registrations cost nothing — the substrate an httpz-shaped server
// worker stands on.
//
// Backend: kqueue (std/net/kqueue) on darwin, epoll (std/net/epoll) on
// linux. The whole `Loop` struct is selected per-OS by `inline if OS`
// (the compiler's flatten pre-pass picks the matching top-level decl) —
// callers never see the backend. The two backends differ enough in state
// that they are separate structs rather than one struct with conditional
// fields (sx has no conditional struct fields): kqueue carries only its
// queue fd, while epoll keeps a small per-fd registration table (it has
// ONE registration per fd with a combined interest mask, and its event
// echoes back only a single `data` word — we stash the fd there and the
// table maps fd → the caller's udata).
//
// Interest is per direction: read and write are registered and removed
// independently. On kqueue these are independent EVFILT_* filters; on
// epoll the Loop composes the combined EPOLLIN/EPOLLOUT mask internally
// and issues EPOLL_CTL_ADD/MOD/DEL. The typical server pattern: read
// interest for a connection's whole life, write interest only while a
// partial response is pending.
//
// Deadlines: the loop deliberately has no timer registrations —
// httpz-style timeout bookkeeping (request/keepalive eviction) is
// deadline math the caller does with `deadline_in`/`expired` between
// waits, passing the nearest deadline as `wait`'s timeout.
//
// VALIDATION: the kqueue path runs end-to-end on the macOS dev host
// (examples/event/1632 — full facade surface: add_read/write, add_wake/wake,
// wait, del_*, EOF). The epoll path is RUNTIME-VALIDATED on real Linux: a
// static aarch64-linux build of the 1632-equivalent test (plus the eventfd
// wake path) runs 6/6 green inside an Apple `container` Linux VM (kernel
// 6.18 aarch64) — add_read, idle-timeout, readable+fd+udata, the MOD-mask
// add_write path, the eventfd wake channel, and EPOLLRDHUP/EPOLLHUP eof all
// behave identically to kqueue. The one intentional backend difference is
// `nbytes` (kqueue reports the pending byte count; epoll reports 0). Re-run:
// sx build --target aarch64-linux --self-contained -o /tmp/ev <test>.sx
// container run --rm -v "$PWD/.sx-tmp:/work" alpine /work/ev
// It is NOT corpus-snapshotted (a Loop example pulls in the std barrel → an
// ~18k-line IR dump that churns on any unrelated std change, and the corpus
// runner is host-based, not container-aware). The epoll ABI itself (the
// layout-sensitive part) IS corpus-locked by examples/event/1633.
#import "modules/std.sx";
kqb :: #import "modules/std/net/kqueue.sx";
timp :: #import "modules/std/time.sx";
// NOTE: the epoll backend is imported INSIDE the `inline if OS == .linux`
// branch below, never at top level. event.sx rides the std.sx barrel, so a
// top-level `#import "epoll.sx"` would register epoll's types into EVERY std
// program's type table on darwin too — drifting every `.ir` snapshot. Scoping
// the import to the linux branch keeps darwin's type graph unchanged. (kqb
// stays top-level: it was already there before the epoll split, so darwin's
// table — and the snapshots — match; on linux its kqueue externs are unused
// declares.)
// Error set for the Loop API. Within this file `Init` is raised by
// `Loop.init`, `Register` by `add_read` / `add_write` / `add_wake`, and
// `Wait` by `wait`.
EventErr :: error {
Init, // the kernel queue could not be created
Register, // an interest change was refused
Wait, // the wait itself failed (not a timeout)
}
// A normalized readiness report for one registered fd.
// readable/writable — which direction is ready;
// eof — the peer finished writing (drain pending bytes, then close);
// err — the registration itself failed asynchronously;
// user — a cross-thread wake() (see add_wake), no fd attached;
// nbytes — bytes readable / writable-buffer space (backend estimate;
// kqueue reports it, epoll does not → 0 on linux);
// udata — the word given at registration, verbatim.
// One entry of the `out` slice filled by `Loop.wait`. Every field has a
// default, so a backend only assigns the fields that apply to the event.
Event :: struct {
fd: i32 = -1; // fd the event refers to; -1 is the "unset" default
udata: usize = 0; // caller's word from registration (or from add_wake)
readable: bool = false; // the read direction is ready
writable: bool = false; // the write direction is ready
eof: bool = false; // peer finished writing (EV_EOF on kqueue, `ep.ev_eof` on epoll)
err: bool = false; // an error condition was reported for the registration
user: bool = false; // a wake() delivery, not fd readiness
nbytes: i64 = 0; // kqueue: the kevent `data` field; the epoll backend never sets it
}
inline if OS == .linux {
ep :: #import "modules/std/net/epoll.sx";
// ── epoll backend (linux) ──────────────────────────────────────────────
// epoll reports a single 64-bit `data` per event and carries ONE
// registration per fd, so the Loop keeps a tiny table: each `Reg` records
// the fd's current combined interest mask and the caller's udata. The fd
// itself is stashed in epoll's `data` (so `epoll_wait` reports which fd
// fired); the table recovers the udata and lets add/del compose the mask
// into an EPOLL_CTL_ADD / MOD / DEL.
//
// One semantic difference from the kqueue backend: epoll has a SINGLE
// udata per fd (not per direction), so registering read and write on the
// same fd with different udata words keeps the most recent — a readable
// and a writable event on that fd then report the same udata. Callers key
// udata on the fd/connection (the universal pattern), so this is
// invisible in practice; pass the same udata for both directions of a fd.
// One row of the epoll Loop's registration table (`Loop.regs`).
Reg :: struct {
fd: i32 = -1; // registered fd — the key `reg_index` searches for
mask: u32 = 0; // combined EPOLL* interest mask recorded for this fd
udata: usize = 0; // caller's word, copied into Event.udata by `wait`
}
Loop :: struct {
epfd: i32 = -1;
wake_fd: i32 = -1; // eventfd, lazily created by add_wake
wake_udata: usize = 0;
regs: List(Reg);
// The Loop outlives the caller's current `context.allocator` scope, so
// capture the owning allocator at init and grow `regs` through it (the
// long-lived-container rule).
own: Allocator;
init :: () -> Loop !EventErr {
    // Create the kernel epoll instance; a negative result is reported as Init.
    handle := ep.ep_create();
    if handle < 0 { raise error.Init; }
    // Start with an empty registration table and remember the allocator that
    // is current right now, so `regs` is grown and freed through it later.
    fresh : Loop = .{ epfd = handle, regs = .{}, own = context.allocator };
    return fresh;
}
close :: (self: *Loop) {
    // Release the epoll instance if one is open, then mark it closed.
    if self.epfd >= 0 { socket.close(self.epfd); }
    self.epfd = -1;
    // Same for the wake eventfd, which only exists once add_wake created it.
    if self.wake_fd >= 0 { socket.close(self.wake_fd); }
    self.wake_fd = -1;
    // Free the registration table through the allocator captured at init.
    self.regs.deinit(self.own);
}
// Index of the registration for `fd`, or -1. Linear scan — fd counts in
// the M:1 / per-worker model are small (mirrors the scheduler's waiter
// lists).
reg_index :: (self: *Loop, fd: i32) -> i64 {
    // Single-exit forward scan: `hit` stays -1 until the first row whose fd
    // matches is seen, which also ends the loop.
    hit : i64 = -1;
    at := 0;
    while hit < 0 and at < self.regs.len {
        if self.regs.items[at].fd == fd { hit = at; }
        at += 1;
    }
    return hit;
}
// Drive `fd`'s registration to interest `mask`: ADD a new fd, MOD an
// existing one, or DEL (and forget) when the mask drops to zero. The
// table is kept in lockstep with the kernel. True on success.
apply_mask :: (self: *Loop, fd: i32, mask: u32, udata: usize) -> bool {
    // Parameters:
    //   fd    — the descriptor whose registration is being driven;
    //   mask  — the complete desired interest mask (0 = unregister);
    //   udata — the word to report for this fd from now on.
    // Returns true when the kernel accepted the change (or there was nothing
    // to do), false when ep_ctl refused it.
    //
    // The kernel is asked FIRST and the table is only updated once it agreed.
    // Recording the change before the call would leave a phantom row behind a
    // failed ADD (every later add_* on that fd number would then issue MOD
    // against an fd epoll does not know), and a mask/udata the kernel never
    // accepted behind a failed MOD.
    idx := self.reg_index(fd);
    if mask == 0 {
        // Nothing registered: dropping the last interest is a no-op.
        if idx < 0 { return true; }
        ok := ep.ep_ctl(self.epfd, ep.EPOLL_CTL_DEL, fd, 0);
        // Forget the row whether or not DEL succeeded: the caller is done
        // with this fd, and a stale row is worse than a missing one.
        // Swap-remove (order is irrelevant).
        self.regs.items[idx] = self.regs.items[self.regs.len - 1];
        self.regs.len = self.regs.len - 1;
        return ok;
    }
    if idx >= 0 {
        // Known fd: change the kernel mask, then commit mask + udata.
        if !ep.ep_ctl(self.epfd, ep.EPOLL_CTL_MOD, fd, mask) { return false; }
        self.regs.items[idx].mask = mask;
        self.regs.items[idx].udata = udata;
        return true;
    }
    // New fd: register with the kernel, then record the row.
    if !ep.ep_ctl(self.epfd, ep.EPOLL_CTL_ADD, fd, mask) { return false; }
    self.regs.append(Reg.{ fd = fd, mask = mask, udata = udata }, self.own);
    return true;
}
// Read interest also arms EPOLLRDHUP so a peer half-close surfaces as
// `Event.eof` — matching kqueue's EV_EOF, which comes for free.
add_read :: (self: *Loop, fd: i32, udata: usize) -> !EventErr {
    // Read interest is EPOLLIN plus EPOLLRDHUP; when the fd already has a
    // registration, its existing bits are kept and these are OR-ed in.
    want := ep.EPOLLIN | ep.EPOLLRDHUP;
    at := self.reg_index(fd);
    if at >= 0 { want = want | self.regs.items[at].mask; }
    accepted := self.apply_mask(fd, want, udata);
    if !accepted { raise error.Register; }
    return;
}
del_read :: (self: *Loop, fd: i32) {
    // Unknown fd: nothing to remove. Otherwise clear the read bits and
    // re-apply what is left (apply_mask unregisters the fd when that is 0).
    // Best-effort: the result of apply_mask is not reported.
    at := self.reg_index(fd);
    if at >= 0 {
        reg := self.regs.items[at];
        self.apply_mask(fd, reg.mask & ~(ep.EPOLLIN | ep.EPOLLRDHUP), reg.udata);
    }
}
add_write :: (self: *Loop, fd: i32, udata: usize) -> !EventErr {
    // Write interest is EPOLLOUT, OR-ed onto whatever the fd already has.
    want := ep.EPOLLOUT;
    at := self.reg_index(fd);
    if at >= 0 { want = want | self.regs.items[at].mask; }
    accepted := self.apply_mask(fd, want, udata);
    if !accepted { raise error.Register; }
    return;
}
del_write :: (self: *Loop, fd: i32) {
    // Unknown fd: nothing to remove. Otherwise clear EPOLLOUT and re-apply
    // the remaining mask with the udata already on record. Best-effort: the
    // result of apply_mask is not reported.
    at := self.reg_index(fd);
    if at >= 0 {
        reg := self.regs.items[at];
        self.apply_mask(fd, reg.mask & ~ep.EPOLLOUT, reg.udata);
    }
}
// The loop's wake channel: an eventfd registered for EPOLLIN. wake()
// from any thread writes the 8-byte counter, making wait() return an
// Event carrying `udata` with `.user` set. (kqueue uses EVFILT_USER;
// epoll's idiom is eventfd.) One registration serves the Loop's life.
add_wake :: (self: *Loop, udata: usize) -> !EventErr {
    // Parameters: udata — the word wake events will carry in Event.udata.
    // Raises Register when the eventfd cannot be created or registered.
    //
    // The eventfd is created AND registered exactly once. Issuing
    // EPOLL_CTL_ADD again for an fd that is already in the set is refused by
    // the kernel (EEXIST), so a repeated add_wake must not re-register; it
    // only replaces the udata.
    if self.wake_fd < 0 {
        wfd := ep.eventfd(0, ep.EFD_CLOEXEC | ep.EFD_NONBLOCK);
        if wfd < 0 { raise error.Register; }
        if !ep.ep_ctl(self.epfd, ep.EPOLL_CTL_ADD, wfd, ep.EPOLLIN) {
            // Do not keep an unregistered eventfd around: close it so the
            // Loop stays in its "no wake channel" state and a retry starts
            // from scratch.
            socket.close(wfd);
            raise error.Register;
        }
        // Publish the fd only after it is live in the epoll set.
        self.wake_fd = wfd;
    }
    self.wake_udata = udata;
    return;
}
// Thread-safe: writing the eventfd counter is atomic.
wake :: (self: *Loop) {
    // No wake channel yet (add_wake never ran): nothing to signal.
    if self.wake_fd >= 0 {
        // Write an 8-byte 1 to the eventfd.
        tick : u64 = 1;
        socket.write(self.wake_fd, xx @tick, 8);
    }
}
// Fill `out` with ready events, waiting at most `timeout_ms`
// (negative = forever). Returns the count; 0 is a timeout.
wait :: (self: *Loop, out: []Event, timeout_ms: i64) -> i64 !EventErr {
    // Parameters:
    //   out        — destination for normalized events; at most
    //                min(out.len, 64) are produced per call;
    //   timeout_ms — upper bound on the wait (negative = forever).
    // Returns the number of entries written to `out`; raises Wait when the
    // underlying ep_wait reports a negative count.
    raw : [64]ep.EpollEvent = ---;
    cap : i64 = 64;
    if xx out.len < cap { cap = xx out.len; }
    // epoll_wait(2) rejects maxevents <= 0 with EINVAL, which would surface
    // as error.Wait. With no room to report anything there is nothing to
    // wait for, so an empty `out` yields 0 events instead of an error.
    if cap <= 0 { return 0; }
    n := ep.ep_wait(self.epfd, .{ ptr = @raw[0], len = cap }, xx cap, xx timeout_ms);
    if n < 0 { raise error.Wait; }
    i := 0;
    while i < n {
        evr := raw[i];
        fd := ep.ev_fd(evr);
        e : Event = .{ fd = fd };
        if self.wake_fd >= 0 and fd == self.wake_fd {
            // Drain the eventfd counter so it doesn't re-fire immediately.
            drain : u64 = 0;
            socket.read(self.wake_fd, xx @drain, 8);
            // A wake has no fd attached: report -1 rather than leaking the
            // Loop's private eventfd to the caller.
            e.fd = -1;
            e.user = true;
            e.udata = self.wake_udata;
        } else {
            // Recover the caller's udata from the table; an fd with no row
            // keeps the default udata of 0.
            idx := self.reg_index(fd);
            if idx >= 0 { e.udata = self.regs.items[idx].udata; }
            if ep.ev_readable(evr) { e.readable = true; }
            if ep.ev_writable(evr) { e.writable = true; }
            if ep.ev_eof(evr) { e.eof = true; }
            if ep.ev_err(evr) { e.err = true; }
        }
        out[i] = e;
        i += 1;
    }
    return xx n;
}
}
} else {
// ── kqueue backend (darwin) ────────────────────────────────────────────
Loop :: struct {
kq: i32 = -1;
init :: () -> Loop !EventErr {
    // Open the kernel queue; a negative descriptor is reported as Init.
    handle := kqb.kqueue();
    if handle < 0 { raise error.Init; }
    fresh : Loop = .{ kq = handle };
    return fresh;
}
close :: (self: *Loop) {
    // Detach the descriptor from the Loop, then close it if it was open.
    old := self.kq;
    self.kq = -1;
    if old >= 0 { socket.close(old); }
}
add_read :: (self: *Loop, fd: i32, udata: usize) -> !EventErr {
    // EV_ADD on the read filter for `fd`, tagged with the caller's udata.
    change := kqb.kev_change(fd, kqb.EVFILT_READ, kqb.EV_ADD, udata);
    accepted := kqb.kq_apply(self.kq, change);
    if !accepted { raise error.Register; }
    return;
}
del_read :: (self: *Loop, fd: i32) {
    // EV_DELETE on the read filter; best-effort, the result is not reported.
    change := kqb.kev_change(fd, kqb.EVFILT_READ, kqb.EV_DELETE, 0);
    kqb.kq_apply(self.kq, change);
}
add_write :: (self: *Loop, fd: i32, udata: usize) -> !EventErr {
    // EV_ADD on the write filter for `fd`, tagged with the caller's udata.
    change := kqb.kev_change(fd, kqb.EVFILT_WRITE, kqb.EV_ADD, udata);
    accepted := kqb.kq_apply(self.kq, change);
    if !accepted { raise error.Register; }
    return;
}
del_write :: (self: *Loop, fd: i32) {
    // EV_DELETE on the write filter; best-effort, the result is not reported.
    change := kqb.kev_change(fd, kqb.EVFILT_WRITE, kqb.EV_DELETE, 0);
    kqb.kq_apply(self.kq, change);
}
// Register the loop's wake channel: wake() from ANY thread makes
// wait() return an Event carrying `udata` with `.user` set. EV_CLEAR
// auto-resets, so one registration serves the loop's lifetime.
// (kqueue EVFILT_USER here; the epoll twin maps to eventfd.)
add_wake :: (self: *Loop, udata: usize) -> !EventErr {
    // A user-filter registration on ident 0, added with EV_CLEAR and tagged
    // with the caller's udata.
    reg : kqb.Kevent = .{ ident = 0, filter = kqb.EVFILT_USER, flags = kqb.EV_ADD | kqb.EV_CLEAR, udata = udata };
    accepted := kqb.kq_apply(self.kq, reg);
    if !accepted { raise error.Register; }
    return;
}
// Thread-safe: kevent change submission is safe from any thread.
wake :: (self: *Loop) {
    // Submit NOTE_TRIGGER for the user filter on ident 0 — the same ident
    // add_wake registers. Best-effort: the result is not reported.
    trigger : kqb.Kevent = .{ ident = 0, filter = kqb.EVFILT_USER, fflags = kqb.NOTE_TRIGGER };
    kqb.kq_apply(self.kq, trigger);
}
// Fill `out` with ready events, waiting at most `timeout_ms`
// (negative = forever). Returns the count; 0 is a timeout.
wait :: (self: *Loop, out: []Event, timeout_ms: i64) -> i64 !EventErr {
    // Parameters:
    //   out        — destination for normalized events; at most
    //                min(out.len, 64) are produced per call;
    //   timeout_ms — upper bound on the wait (negative = forever).
    // Returns the number of entries written to `out`; raises Wait when the
    // underlying kq_wait reports a negative count.
    raw : [64]kqb.Kevent = ---;
    cap : i64 = 64;
    if xx out.len < cap { cap = xx out.len; }
    n := kqb.kq_wait(self.kq, @raw[0], xx cap, timeout_ms);
    if n < 0 { raise error.Wait; }
    i := 0;
    while i < n {
        ev := raw[i];
        e : Event = .{ fd = xx ev.ident, udata = ev.udata, nbytes = ev.data };
        if ev.filter == kqb.EVFILT_READ { e.readable = true; }
        if ev.filter == kqb.EVFILT_WRITE { e.writable = true; }
        if ev.filter == kqb.EVFILT_USER {
            e.user = true;
            // The wake channel is registered on ident 0, which is not a file
            // descriptor. Copying it into `fd` would make a wake look like an
            // event on fd 0 (stdin), so report "no fd" instead.
            e.fd = -1;
        }
        if (ev.flags & kqb.EV_EOF) != 0 { e.eof = true; }
        if (ev.flags & kqb.EV_ERROR) != 0 { e.err = true; }
        out[i] = e;
        i += 1;
    }
    return xx n;
}
}
}
// ── deadline helpers (monotonic, std.time) ───────────────────────────
// Backend-independent — shared by both Loop variants.
// The absolute monotonic instant `ms` from now.
deadline_in :: (ms: i64) -> i64 {
    // Sample the monotonic clock once and offset it by `ms`.
    now := timp.mono_ms();
    return now + ms;
}
// True once `deadline` has passed.
expired :: (deadline: i64) -> bool {
    // The deadline counts as passed from the exact millisecond it names.
    now := timp.mono_ms();
    return now >= deadline;
}
// Milliseconds until `deadline`, floored at 0 — the value to hand
// `wait` so the loop wakes exactly when the nearest deadline fires.
remaining_ms :: (deadline: i64) -> i64 {
    // Distance from now to the deadline; a deadline already behind us
    // yields 0 rather than a negative timeout.
    now := timp.mono_ms();
    gap := deadline - now;
    if gap >= 0 { return gap; }
    return 0;
}