fibers: event-loop Io — real fd readiness via kqueue (B1.4c)

A fiber can block on a file descriptor and the run loop blocks on
kevent until the kernel reports it ready. Reuses the existing
std/net/kqueue.sx bindings. Scheduler gains a lazy kq fd + an
io_waiters list; block_on_fd arms a one-shot EVFILT_READ registration,
records an IoWaiter, and suspends. Run-loop Mode 2: when the ready
queue drains and no timer is pending, block on kq_wait(-1), match each
fired ident to its waiter, evict it, wake the fiber. wake evicts a
pending fd-waiter (cancel_io_waiter_for) so no stale IoWaiter outlives
a reaped fiber.

Adversarial review found two CRITICALs: (1) two fibers on the same fd
share one kqueue registration (macOS EV_ADD replaces), so one is lost
and the loop hangs -- fixed by enforcing one-waiter-per-fd with a loud
abort; (2) an fd-waiter on a never-ready fd 'hangs' -- reclassified as
correct event-loop semantics (a server idling on a socket), with the
misleading orphan-check comment corrected. UAF parity, ident width,
EINTR handling, timer/io precedence all probed safe.

Example: 1816 (pipe roundtrip -- reader blocks, writer writes, reader
wakes via kqueue). macOS only; linux epoll twin deferred. Suite green 754/0.
This commit is contained in:
agra
2026-06-21 19:39:16 +03:00
parent 62ffea0663
commit 1b0d640f73
8 changed files with 433 additions and 40 deletions

View File

@@ -25,6 +25,7 @@
// page are Apple-specific. Runs end-to-end on a matching host, ir-only on a
// mismatch.
#import "modules/std.sx";
kqb :: #import "modules/std/net/kqueue.sx";
// --- libc mmap stack primitives -------------------------------------------
@@ -39,6 +40,12 @@ MAP_AP :: 0x1002; // macOS MAP_PRIVATE (0x2) | MAP_ANON (0x1000)
GUARD :: 16384; // one 16 KB page (aarch64-macOS)
STACK :: 131072; // 128 KB usable per fiber
// Max fd events drained per kqueue wait (B1.4c). Sized for the M:1 model's
// small fiber counts; a wait that fills it just drains the rest on the next
// loop iteration (the woken fibers run, the queue re-drains, the still-pending
// waiters block again).
MAXEV :: 16;
// --- core types ------------------------------------------------------------
// Saved context: x19..x28 (10), x29/fp, x30/lr, sp — 13 u64 slots.
@@ -66,6 +73,18 @@ Timer :: struct {
fiber: *Fiber;
}
// B1.4c: a fiber parked on REAL fd readiness. Unlike a `Timer` (virtual
// time), an `IoWaiter` blocks the whole scheduler on `kevent` until the
// kernel reports `fd` readable, then wakes `fiber`. Stored in
// `Scheduler.io_waiters`; the registration is one-shot (EV_ONESHOT), so the
// kernel auto-removes it after firing — we only have to drop the waiter
// record. `cancel_io_waiter_for` evicts a stale record (mirror of
// `cancel_timer_for`) so a reaped fiber's waiter can never be woken.
IoWaiter :: struct {
fd: i32;
fiber: *Fiber;
}
Scheduler :: struct {
sched_ctx: FiberCtx; // the scheduler loop's own saved context
current: *Fiber; // running fiber; null while in the scheduler loop
@@ -85,6 +104,19 @@ Scheduler :: struct {
// through `own_allocator` (long-lived-container
// rule: a timer outlives the `sleep` call's scope).
// --- B1.4c: real fd-readiness blocking via kqueue ----------------------
kq: i32; // the kqueue fd. LAZY: -1 until the first
// `block_on_fd` opens it, so a pure-compute /
// virtual-timer scheduler never opens a kqueue
// fd (no leak for the common case). Once opened it
// lives for the scheduler's lifetime; there is no
// deinit yet, so it leaks one fd at program exit
// (bounded, harmless — same class as the spawn
// env / go Task leaks documented above).
io_waiters: List(IoWaiter); // fibers parked on fd readiness, grown through
// `own_allocator` (long-lived-container rule: a
// waiter outlives the `block_on_fd` call's scope).
// Construct a scheduler BY VALUE (allocator value-return convention).
// Captures the current `context.allocator` into `own_allocator` — fibers and
// their heap `Fiber` structs outlive their spawn scope, so all internal
@@ -101,6 +133,8 @@ Scheduler :: struct {
s.n_suspended = 0;
s.clock_ms = 0;
s.timers = .{};
s.kq = -1; // lazy: opened by the first block_on_fd
s.io_waiters = .{};
return s;
}
@@ -193,6 +227,18 @@ Scheduler :: struct {
// model, so a single eviction suffices; it also prevents a stale timer
// from spuriously re-waking a since-re-slept fiber.
cancel_timer_for(self, f);
// Same UAF reasoning for fd waiters: every path that re-readies a
// suspended fiber funnels through `wake`. If a fiber armed `block_on_fd`
// but was woken by another path (a manual wake, a Task completion), its
// `IoWaiter` would otherwise survive pointing at a fiber that runs to
// completion and is reaped (stack munmap'd + Fiber freed). A later
// kqueue drain matching that stale record would `wake` freed memory.
// Evict it here. NOTE: we do NOT EV_DELETE the kqueue registration — it
// is EV_ONESHOT, so a never-fired registration simply lingers in the
// kernel queue until the fd is readable, at which point the drain finds
// no matching waiter and ignores it (see `run`). The fd is the example's
// to close; closing it auto-removes any pending registration.
cancel_io_waiter_for(self, f);
self.n_suspended = self.n_suspended - 1;
f.state = .ready;
enqueue(self, f);
@@ -229,6 +275,72 @@ Scheduler :: struct {
self.suspend_self(); // parks `cur` off-queue; the timer fire re-wakes it
}
// --- B1.4c: block the running fiber until `fd` is readable --------------
//
// Register `fd` for EVFILT_READ with the scheduler's kqueue (lazily
// opening it on first use), record an `IoWaiter`, then park the fiber
// off-queue. The run loop blocks on `kevent` once nothing else is runnable
// and wakes this fiber when the kernel reports `fd` ready (EV_ONESHOT — the
// kernel auto-removes the registration after it fires, so the run loop only
// has to drop the waiter record + `wake` the fiber).
//
// `want_read` is the readiness direction; only read-readiness is wired for
// now (a write-readiness EVFILT_WRITE path would mirror this exactly). A
// false `want_read` would be a write-wait — not yet implemented, so bail
// loudly rather than silently arming a read filter (silent-wrong-arm rule).
//
// MUST be called from inside a fiber (there must be a `current` to park); a
// null `current` bails loudly, mirroring `suspend_self` / `sleep`.
block_on_fd :: (self: *Scheduler, fd: i32, want_read: bool) {
cur := self.current;
if cur == null {
print("sched: block_on_fd() called outside a fiber (no running fiber)\n");
abort();
}
if !want_read {
print("sched: block_on_fd(want_read=false) — write-readiness not implemented\n");
abort();
}
// ONE waiter per fd (enforced). macOS `EV_ADD` for an existing
// (ident, filter) REPLACES the registration rather than stacking, so a
// second fiber blocking on the same fd would leave only one live
// registration: when the fd fires, the kernel delivers a single event,
// one waiter wakes, and the other is stranded in `io_waiters` with no
// registration — the next `kq_wait` then blocks forever. The M:1 model
// (and `wake_io_waiter_for_fd`, which wakes the first match) assumes a
// single waiter per fd; enforce it loudly instead of silently hanging.
j := 0;
while j < self.io_waiters.len {
if self.io_waiters.items[j].fd == fd {
print("sched: block_on_fd: fd {} already has a waiter (one waiter per fd in the M:1 model)\n", fd);
abort();
}
j = j + 1;
}
// Lazily open the kqueue fd the first time fd-blocking is used.
if self.kq < 0 {
self.kq = kqb.kqueue();
if self.kq < 0 {
print("sched: kqueue() failed to open the event queue\n");
abort();
}
}
// Arm a one-shot read-readiness registration for `fd`. udata is unused
// (we match the waiter by fd in the drain), so pass 0.
chg := kqb.kev_change(fd, kqb.EVFILT_READ, kqb.EV_ADD | kqb.EV_ENABLE | kqb.EV_ONESHOT, 0);
if !kqb.kq_apply(self.kq, chg) {
print("sched: kevent() failed to register fd {} for read readiness\n", fd);
abort();
}
// Record the waiter BEFORE parking — the run loop matches the fired
// event's ident back to this record. Long-lived-container rule: the
// waiter outlives this call's scope (it survives in `self.io_waiters`
// until the kqueue drain wakes it), so grow through `own_allocator`.
w : IoWaiter = .{ fd = fd, fiber = cur };
self.io_waiters.append(w, self.own_allocator);
self.suspend_self(); // parks `cur` off-queue; the kqueue drain re-wakes it
}
// The scheduler loop. Drives ready fibers to quiescence, then advances the
// virtual clock by firing the earliest pending timer (which re-readies its
// sleeper), and repeats — until both the ready queue and the timer set are
@@ -255,24 +367,70 @@ Scheduler :: struct {
}
// .suspended: leave it parked (not in any queue; `wake` re-adds it).
}
// Ready queue drained. Fire the earliest pending timer — the one
// sleeper whose deadline is next — advancing the virtual clock to it.
// No timers left ⇒ nothing more can run; exit the loop.
// Ready queue drained. Decide what advances the world next.
//
// Mode 1 — VIRTUAL TIME: fire the earliest pending timer (advancing
// the virtual clock to it), re-readying its sleeper. Timers take
// precedence over fd-blocking: a program uses `sleep` OR fds, not
// both at once. (Documented limitation: virtual-time timers and real
// kqueue timeouts are NOT unified — if both a timer and an io-waiter
// are pending we always fire the timer first and never block on
// kqueue while a timer is outstanding. A program that genuinely
// needs "fd-or-real-timeout" wants a kqueue EVFILT_TIMER, future
// work.)
idx := earliest_timer(self);
if idx < 0 { break; }
t := self.timers.items[idx];
remove_timer(self, idx);
self.clock_ms = t.deadline_ms; // advance VIRTUAL time forward
self.wake(t.fiber); // re-enqueue the sleeper → drain again
if idx >= 0 {
t := self.timers.items[idx];
remove_timer(self, idx);
self.clock_ms = t.deadline_ms; // advance VIRTUAL time forward
self.wake(t.fiber); // re-enqueue the sleeper → drain again
continue;
}
// Mode 2 — REAL fd readiness: nothing is runnable and no timer is
// pending, but fibers are parked on fds. BLOCK on kqueue until the
// kernel reports at least one fd ready, then wake every waiter whose
// fd fired. (null timeout via -1 → wait forever.)
if self.io_waiters.len > 0 {
evbuf : [MAXEV]kqb.Kevent = ---;
n := kqb.kq_wait(self.kq, @evbuf[0], MAXEV, -1);
if n < 0 {
print("sched: kevent() wait failed while blocking on fd readiness\n");
abort();
}
// For each fired event, find the io-waiter whose fd matches its
// ident, evict it, and wake its fiber. EV_ONESHOT already removed
// the kernel registration, so we only drop the waiter record.
i := 0;
while i < n {
ready_fd : i32 = xx evbuf[i].ident;
wake_io_waiter_for_fd(self, ready_fd);
i = i + 1;
}
continue;
}
// Nothing runnable, no timer, no fd waiter → done.
break;
}
// Both the ready queue and the timer set are empty. If a fiber is STILL
// parked, no timer will ever wake it (a `suspend_self` without an armed
// timer, never externally woken) — its stack + struct are leaked and the
// program believes it finished. That is a genuine deadlock; surface it
// loudly. (Timer sleepers are balanced: each `sleep` increments
// `n_suspended` via `suspend_self`, and the timer-fire `wake` decrements
// it — so once every timer has fired, `n_suspended` counts only true
// orphans.)
// The ready queue, the timer set, AND the io-waiter set are all empty. If
// a fiber is STILL parked, nothing will ever wake it (a `suspend_self`
// without an armed timer or fd registration, never externally woken) —
// its stack + struct are leaked and the program believes it finished.
// That is a genuine deadlock; surface it loudly. (Timer sleepers and fd
// waiters are balanced: each arming path increments `n_suspended` via
// `suspend_self`, and its wake decrements it — so once every timer has
// fired and every io-waiter has been woken, `n_suspended` counts only
// these true orphans.)
//
// SCOPE — fd waiters are NOT covered by this check, BY DESIGN, not as an
// oversight. While `io_waiters.len > 0` the loop above blocks in
// `kq_wait(-1)` and never reaches here. A fiber blocked on an fd that the
// OS never reports ready blocks FOREVER — which is the correct semantics
// of an event loop (a server idling on a socket is indistinguishable from
// one whose peer never sends; the scheduler cannot know an fd will never
// become ready, so it must keep waiting). That is a caller-side logic
// issue (blocking on input that never arrives), not a scheduler deadlock
// to abort on. This check covers only pure `suspend_self` parks with no
// pending wake source at all.
if self.n_suspended != 0 {
print("sched: deadlock — {} fiber(s) suspended with an empty run queue\n", self.n_suspended);
abort();
@@ -433,6 +591,56 @@ cancel_timer_for :: (self: *Scheduler, f: *Fiber) {
}
}
// --- B1.4c: fd-waiter set (linear scan, fd-keyed) --------------------------
//
// Like the timer set, a plain `List(IoWaiter)` scanned linearly — fiber counts
// are tiny. Removal shifts the tail down one slot.
// Remove the io-waiter at `idx`, shifting later entries down one slot.
remove_io_waiter :: (self: *Scheduler, idx: i64) {
i := idx;
while i < self.io_waiters.len - 1 {
self.io_waiters.items[i] = self.io_waiters.items[i + 1];
i = i + 1;
}
self.io_waiters.len = self.io_waiters.len - 1;
}
// Remove a pending fd-waiter referencing fiber `f`, if any. A fiber has at most
// one pending io-waiter in the M:1 model (it can only `block_on_fd` once before
// suspending), so the first match is the only one. No-op if `f` has none. Used
// by `wake` to evict a waiter when the fiber is re-readied by another path.
cancel_io_waiter_for :: (self: *Scheduler, f: *Fiber) {
i := 0;
while i < self.io_waiters.len {
if self.io_waiters.items[i].fiber == f {
remove_io_waiter(self, i);
return;
}
i = i + 1;
}
}
// A fired kqueue event for `fd`: find the waiter registered on it, evict the
// record, and wake its fiber. No-op if no waiter matches (a stale one-shot
// registration whose fiber was already woken another way — see `wake`). Only
// the FIRST match is woken: one waiter per fd in this model (a single fiber
// blocks on a given read fd at a time).
wake_io_waiter_for_fd :: (self: *Scheduler, fd: i32) {
i := 0;
while i < self.io_waiters.len {
if self.io_waiters.items[i].fd == fd {
wf := self.io_waiters.items[i].fiber;
remove_io_waiter(self, i);
self.wake(wf); // re-enqueues the parked fiber (also calls
// cancel_io_waiter_for, now a harmless no-op —
// the record is already removed)
return;
}
i = i + 1;
}
}
// The public API lives as methods on `Scheduler` (above): `init`, `spawn`,
// `yield_now`, `suspend_self`, `wake`, `run`, `now_ms`, `sleep`.