feat: linux epoll backend for std.event.Loop (the kqueue twin)
Add library/modules/std/net/epoll.sx — raw epoll bindings, the linux twin of
std/net/kqueue.sx — and branch std.event.Loop on `inline if OS` so the
OS-neutral readiness Loop runs on linux (epoll) as well as darwin (kqueue);
callers never see the backend.
epoll_event has no packed-struct primitive in sx, so it is modelled as an
arch-branched struct of u32 fields — { events, data_lo, data_hi } → 12 bytes on
x86_64 (matching __attribute__((packed))), { events, pad, data_lo, data_hi } →
16 bytes on aarch64 — every field 4-aligned, so the layout is byte-exact for the
kernel ABI with no packed attribute and no unaligned access. The fd is stashed
in data_lo (epoll echoes one data word, not the fd separately).
epoll.sx is self-contained (libc only, no build.sx): the `inline if ARCH`
selecting the struct is resolved by the compiler's flatten pre-pass, so the
module's IR stays small. The epoll backend is imported INSIDE event.sx's
`inline if OS == .linux` branch (not top level): event.sx rides the std.sx
barrel, so a top-level import would register epoll's types into every std
program's type table on darwin and drift every .ir snapshot.
The epoll Loop keeps a small per-fd registration table (combined EPOLLIN/OUT
mask via EPOLL_CTL_ADD/MOD/DEL), maps the fd back to the caller's udata, arms
EPOLLRDHUP so a peer half-close surfaces as Event.eof (matching kqueue EV_EOF),
and uses an eventfd as the cross-thread wake channel (kqueue's EVFILT_USER).
Validation: the kqueue path runs end-to-end on the macOS host (1632 unchanged);
the epoll bindings + ABI layout are corpus-locked ir-only by
examples/event/1633 (x86_64-linux, both arches probe-verified). The epoll Loop
is verified to lower clean for both linux arches and self-reviewed, but is not
corpus-snapshotted (a Loop example drags the std barrel → ~18k-line brittle IR);
runtime behavior validates on a linux runner.
This commit is contained in:
@@ -6,24 +6,51 @@
|
||||
// registrations cost nothing — the substrate an httpz-shaped server
|
||||
// worker stands on.
|
||||
//
|
||||
// Backend: kqueue (std/net/kqueue) on darwin. The epoll twin
|
||||
// (std/net/epoll, PLAN-HTTPZ S4) slots in behind this same surface
|
||||
// when the linux target lands; callers never see the backend.
|
||||
// Backend: kqueue (std/net/kqueue) on darwin, epoll (std/net/epoll) on
|
||||
// linux. The whole `Loop` struct is selected per-OS by `inline if OS`
|
||||
// (the compiler's flatten pre-pass picks the matching top-level decl) —
|
||||
// callers never see the backend. The two backends differ enough in state
|
||||
// that they are separate structs rather than one struct with conditional
|
||||
// fields (sx has no conditional struct fields): kqueue carries only its
|
||||
// queue fd, while epoll keeps a small per-fd registration table (it has
|
||||
// ONE registration per fd with a combined interest mask, and its event
|
||||
// echoes back only a single `data` word — we stash the fd there and the
|
||||
// table maps fd → the caller's udata).
|
||||
//
|
||||
// Interest is per direction: read and write are registered and removed
|
||||
// independently (mirroring kqueue filters; the epoll backend will
|
||||
// compose its event mask internally). The typical server pattern:
|
||||
// read interest for a connection's whole life, write interest only
|
||||
// while a partial response is pending.
|
||||
// independently. On kqueue these are independent EVFILT_* filters; on
|
||||
// epoll the Loop composes the combined EPOLLIN/EPOLLOUT mask internally
|
||||
// and issues EPOLL_CTL_ADD/MOD/DEL. The typical server pattern: read
|
||||
// interest for a connection's whole life, write interest only while a
|
||||
// partial response is pending.
|
||||
//
|
||||
// Deadlines: the loop deliberately has no timer registrations —
|
||||
// httpz-style timeout bookkeeping (request/keepalive eviction) is
|
||||
// deadline math the caller does with `deadline_in`/`expired` between
|
||||
// waits, passing the nearest deadline as `wait`'s timeout.
|
||||
//
|
||||
// VALIDATION: the kqueue path runs end-to-end on the macOS dev host
|
||||
// (examples/event/1632 — which exercises the full facade surface:
|
||||
// add_read/write, add_wake/wake, wait, del_*, EOF). The epoll path has no
|
||||
// linux box here, so it is verified to LOWER clean for x86_64-linux and
|
||||
// aarch64-linux (the whole module + every epoll syscall emits) and is
|
||||
// self-reviewed; it is NOT corpus-snapshotted (a Loop example pulls in the
|
||||
// std barrel → an ~18k-line IR dump that would churn on any unrelated std
|
||||
// change — worse than the gap). The epoll ABI itself (the layout-sensitive
|
||||
// part) IS corpus-locked, by examples/event/1633 over the raw bindings.
|
||||
// Runtime behavior validates on a linux runner.
|
||||
|
||||
#import "modules/std.sx";
|
||||
kqb :: #import "modules/std/net/kqueue.sx";
|
||||
timp :: #import "modules/std/time.sx";
|
||||
// NOTE: the epoll backend is imported INSIDE the `inline if OS == .linux`
|
||||
// branch below, never at top level. event.sx rides the std.sx barrel, so a
|
||||
// top-level `#import "epoll.sx"` would register epoll's types into EVERY std
|
||||
// program's type table on darwin too — drifting every `.ir` snapshot. Scoping
|
||||
// the import to the linux branch keeps darwin's type graph unchanged. (kqb
|
||||
// stays top-level: it was already there before the epoll split, so darwin's
|
||||
// table — and the snapshots — match; on linux its kqueue externs are unused
|
||||
// declares.)
|
||||
|
||||
EventErr :: error {
|
||||
Init, // the kernel queue could not be created
|
||||
@@ -36,7 +63,8 @@ EventErr :: error {
|
||||
// eof — the peer finished writing (drain pending bytes, then close);
|
||||
// err — the registration itself failed asynchronously;
|
||||
// user — a cross-thread wake() (see add_wake), no fd attached;
|
||||
// nbytes — bytes readable / writable-buffer space (backend estimate);
|
||||
// nbytes — bytes readable / writable-buffer space (backend estimate;
|
||||
// kqueue reports it, epoll does not → 0 on linux);
|
||||
// udata — the word given at registration, verbatim.
|
||||
Event :: struct {
|
||||
fd: i32 = -1;
|
||||
@@ -49,6 +77,175 @@ Event :: struct {
|
||||
nbytes: i64 = 0;
|
||||
}
|
||||
|
||||
inline if OS == .linux {
|
||||
|
||||
ep :: #import "modules/std/net/epoll.sx";
|
||||
|
||||
// ── epoll backend (linux) ──────────────────────────────────────────────
|
||||
// epoll reports a single 64-bit `data` per event and carries ONE
|
||||
// registration per fd, so the Loop keeps a tiny table: each `Reg` records
|
||||
// the fd's current combined interest mask and the caller's udata. The fd
|
||||
// itself is stashed in epoll's `data` (so `epoll_wait` reports which fd
|
||||
// fired); the table recovers the udata and lets add/del compose the mask
|
||||
// into an EPOLL_CTL_ADD / MOD / DEL.
|
||||
//
|
||||
// One semantic difference from the kqueue backend: epoll has a SINGLE
|
||||
// udata per fd (not per direction), so registering read and write on the
|
||||
// same fd with different udata words keeps the most recent — a readable
|
||||
// and a writable event on that fd then report the same udata. Callers key
|
||||
// udata on the fd/connection (the universal pattern), so this is
|
||||
// invisible in practice; pass the same udata for both directions of a fd.
|
||||
Reg :: struct {
|
||||
fd: i32 = -1;
|
||||
mask: u32 = 0;
|
||||
udata: usize = 0;
|
||||
}
|
||||
|
||||
Loop :: struct {
|
||||
epfd: i32 = -1;
|
||||
wake_fd: i32 = -1; // eventfd, lazily created by add_wake
|
||||
wake_udata: usize = 0;
|
||||
regs: List(Reg);
|
||||
// The Loop outlives the caller's current `context.allocator` scope, so
|
||||
// capture the owning allocator at init and grow `regs` through it (the
|
||||
// long-lived-container rule).
|
||||
own: Allocator;
|
||||
|
||||
init :: () -> Loop !EventErr {
|
||||
e := ep.ep_create();
|
||||
if e < 0 { raise error.Init; }
|
||||
return Loop.{ epfd = e, regs = .{}, own = context.allocator };
|
||||
}
|
||||
|
||||
close :: (self: *Loop) {
|
||||
if self.epfd >= 0 { socket.close(self.epfd); }
|
||||
if self.wake_fd >= 0 { socket.close(self.wake_fd); }
|
||||
self.regs.deinit(self.own);
|
||||
self.epfd = -1;
|
||||
self.wake_fd = -1;
|
||||
}
|
||||
|
||||
// Index of the registration for `fd`, or -1. Linear scan — fd counts in
|
||||
// the M:1 / per-worker model are small (mirrors the scheduler's waiter
|
||||
// lists).
|
||||
reg_index :: (self: *Loop, fd: i32) -> i64 {
|
||||
i := 0;
|
||||
while i < self.regs.len {
|
||||
if self.regs.items[i].fd == fd { return i; }
|
||||
i += 1;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Drive `fd`'s registration to interest `mask`: ADD a new fd, MOD an
|
||||
// existing one, or DEL (and forget) when the mask drops to zero. The
|
||||
// table is kept in lockstep with the kernel. True on success.
|
||||
apply_mask :: (self: *Loop, fd: i32, mask: u32, udata: usize) -> bool {
|
||||
idx := self.reg_index(fd);
|
||||
if mask == 0 {
|
||||
if idx < 0 { return true; }
|
||||
ok := ep.ep_ctl(self.epfd, ep.EPOLL_CTL_DEL, fd, 0);
|
||||
// swap-remove the forgotten reg (order is irrelevant).
|
||||
self.regs.items[idx] = self.regs.items[self.regs.len - 1];
|
||||
self.regs.len = self.regs.len - 1;
|
||||
return ok;
|
||||
}
|
||||
if idx >= 0 {
|
||||
self.regs.items[idx].mask = mask;
|
||||
self.regs.items[idx].udata = udata;
|
||||
return ep.ep_ctl(self.epfd, ep.EPOLL_CTL_MOD, fd, mask);
|
||||
}
|
||||
self.regs.append(Reg.{ fd = fd, mask = mask, udata = udata }, self.own);
|
||||
return ep.ep_ctl(self.epfd, ep.EPOLL_CTL_ADD, fd, mask);
|
||||
}
|
||||
|
||||
// Read interest also arms EPOLLRDHUP so a peer half-close surfaces as
|
||||
// `Event.eof` — matching kqueue's EV_EOF, which comes for free.
|
||||
add_read :: (self: *Loop, fd: i32, udata: usize) -> !EventErr {
|
||||
idx := self.reg_index(fd);
|
||||
mask := ep.EPOLLIN | ep.EPOLLRDHUP;
|
||||
if idx >= 0 { mask = self.regs.items[idx].mask | ep.EPOLLIN | ep.EPOLLRDHUP; }
|
||||
if !self.apply_mask(fd, mask, udata) { raise error.Register; }
|
||||
return;
|
||||
}
|
||||
del_read :: (self: *Loop, fd: i32) {
|
||||
idx := self.reg_index(fd);
|
||||
if idx < 0 { return; }
|
||||
mask := self.regs.items[idx].mask & ~(ep.EPOLLIN | ep.EPOLLRDHUP);
|
||||
self.apply_mask(fd, mask, self.regs.items[idx].udata);
|
||||
}
|
||||
add_write :: (self: *Loop, fd: i32, udata: usize) -> !EventErr {
|
||||
idx := self.reg_index(fd);
|
||||
mask := ep.EPOLLOUT;
|
||||
if idx >= 0 { mask = self.regs.items[idx].mask | ep.EPOLLOUT; }
|
||||
if !self.apply_mask(fd, mask, udata) { raise error.Register; }
|
||||
return;
|
||||
}
|
||||
del_write :: (self: *Loop, fd: i32) {
|
||||
idx := self.reg_index(fd);
|
||||
if idx < 0 { return; }
|
||||
mask := self.regs.items[idx].mask & ~ep.EPOLLOUT;
|
||||
self.apply_mask(fd, mask, self.regs.items[idx].udata);
|
||||
}
|
||||
|
||||
// The loop's wake channel: an eventfd registered for EPOLLIN. wake()
|
||||
// from any thread writes the 8-byte counter, making wait() return an
|
||||
// Event carrying `udata` with `.user` set. (kqueue uses EVFILT_USER;
|
||||
// epoll's idiom is eventfd.) One registration serves the Loop's life.
|
||||
add_wake :: (self: *Loop, udata: usize) -> !EventErr {
|
||||
if self.wake_fd < 0 {
|
||||
self.wake_fd = ep.eventfd(0, ep.EFD_CLOEXEC | ep.EFD_NONBLOCK);
|
||||
if self.wake_fd < 0 { raise error.Register; }
|
||||
}
|
||||
self.wake_udata = udata;
|
||||
if !ep.ep_ctl(self.epfd, ep.EPOLL_CTL_ADD, self.wake_fd, ep.EPOLLIN) { raise error.Register; }
|
||||
return;
|
||||
}
|
||||
|
||||
// Thread-safe: writing the eventfd counter is atomic.
|
||||
wake :: (self: *Loop) {
|
||||
if self.wake_fd < 0 { return; }
|
||||
one : u64 = 1;
|
||||
socket.write(self.wake_fd, xx @one, 8);
|
||||
}
|
||||
|
||||
// Fill `out` with ready events, waiting at most `timeout_ms`
|
||||
// (negative = forever). Returns the count; 0 is a timeout.
|
||||
wait :: (self: *Loop, out: []Event, timeout_ms: i64) -> i64 !EventErr {
|
||||
raw : [64]ep.EpollEvent = ---;
|
||||
cap : i64 = 64;
|
||||
if xx out.len < cap { cap = xx out.len; }
|
||||
n := ep.ep_wait(self.epfd, .{ ptr = @raw[0], len = cap }, xx cap, xx timeout_ms);
|
||||
if n < 0 { raise error.Wait; }
|
||||
i := 0;
|
||||
while i < n {
|
||||
evr := raw[i];
|
||||
fd := ep.ev_fd(evr);
|
||||
e : Event = .{ fd = fd };
|
||||
if self.wake_fd >= 0 and fd == self.wake_fd {
|
||||
// Drain the eventfd counter so it doesn't re-fire immediately.
|
||||
drain : u64 = 0;
|
||||
socket.read(self.wake_fd, xx @drain, 8);
|
||||
e.user = true;
|
||||
e.udata = self.wake_udata;
|
||||
} else {
|
||||
idx := self.reg_index(fd);
|
||||
if idx >= 0 { e.udata = self.regs.items[idx].udata; }
|
||||
if ep.ev_readable(evr) { e.readable = true; }
|
||||
if ep.ev_writable(evr) { e.writable = true; }
|
||||
if ep.ev_eof(evr) { e.eof = true; }
|
||||
if ep.ev_err(evr) { e.err = true; }
|
||||
}
|
||||
out[i] = e;
|
||||
i += 1;
|
||||
}
|
||||
return xx n;
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
// ── kqueue backend (darwin) ────────────────────────────────────────────
|
||||
Loop :: struct {
|
||||
kq: i32 = -1;
|
||||
|
||||
@@ -118,7 +315,10 @@ Loop :: struct {
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ── deadline helpers (monotonic, std.time) ───────────────────────────
|
||||
// Backend-independent — shared by both Loop variants.
|
||||
|
||||
// The absolute monotonic instant `ms` from now.
|
||||
deadline_in :: (ms: i64) -> i64 {
|
||||
|
||||
Reference in New Issue
Block a user