From 1b0d640f73bd323016a46f4d1de5b74b0cf029ac Mon Sep 17 00:00:00 2001 From: agra Date: Sun, 21 Jun 2026 19:39:16 +0300 Subject: [PATCH] =?UTF-8?q?fibers:=20event-loop=20Io=20=E2=80=94=20real=20?= =?UTF-8?q?fd=20readiness=20via=20kqueue=20(B1.4c)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A fiber can block on a file descriptor and the run loop blocks on kevent until the kernel reports it ready. Reuses the existing std/net/kqueue.sx bindings. Scheduler gains a lazy kq fd + an io_waiters list; block_on_fd arms a one-shot EVFILT_READ registration, records an IoWaiter, and suspends. Run-loop Mode 2: when the ready queue drains and no timer is pending, block on kq_wait(-1), match each fired ident to its waiter, evict it, wake the fiber. wake evicts a pending fd-waiter (cancel_io_waiter_for) so no stale IoWaiter outlives a reaped fiber. Adversarial review found two CRITICALs: (1) two fibers on the same fd share one kqueue registration (macOS EV_ADD replaces), so one is lost and the loop hangs -- fixed by enforcing one-waiter-per-fd with a loud abort; (2) an fd-waiter on a never-ready fd 'hangs' -- reclassified as correct event-loop semantics (a server idling on a socket), with the misleading orphan-check comment corrected. UAF parity, ident width, EINTR handling, timer/io precedence all probed safe. Example: 1816 (pipe roundtrip -- reader blocks, writer writes, reader wakes via kqueue). macOS only; linux epoll twin deferred. Suite green 754/0. --- current/CHECKPOINT-FIBERS.md | 123 +++++++-- current/PLAN-FIBERS.md | 4 +- .../1816-concurrency-fiber-io-pipe.sx | 101 ++++++++ .../1816-concurrency-fiber-io-pipe.build | 1 + .../1816-concurrency-fiber-io-pipe.exit | 1 + .../1816-concurrency-fiber-io-pipe.stderr | 1 + .../1816-concurrency-fiber-io-pipe.stdout | 2 + library/modules/std/sched.sx | 240 ++++++++++++++++-- 8 files changed, 433 insertions(+), 40 deletions(-) create mode 100644 examples/concurrency/1816-concurrency-fiber-io-pipe.sx create mode 100644 examples/concurrency/expected/1816-concurrency-fiber-io-pipe.build create mode 100644 examples/concurrency/expected/1816-concurrency-fiber-io-pipe.exit create mode 100644 examples/concurrency/expected/1816-concurrency-fiber-io-pipe.stderr create mode 100644 examples/concurrency/expected/1816-concurrency-fiber-io-pipe.stdout diff --git a/current/CHECKPOINT-FIBERS.md b/current/CHECKPOINT-FIBERS.md index 180ee24f..a91acc45 100644 --- a/current/CHECKPOINT-FIBERS.md +++ b/current/CHECKPOINT-FIBERS.md @@ -4,8 +4,65 @@ Companion to [PLAN-FIBERS.md](PLAN-FIBERS.md). Update after every step (one step per the cadence rule). New corpus category: `18xx` concurrency. ## Last completed step -**B1.4b — deterministic VIRTUAL-TIME timer scheduling (the KEYSTONE) — landed + adversarially -reviewed (caught a CRITICAL UAF, fixed).** `library/modules/std/sched.sx` gained a virtual clock + +**B1.4c — REAL fd-readiness blocking via kqueue (macOS).** `library/modules/std/sched.sx` now lets a +fiber park on a file descriptor and the run loop block on `kevent` until the kernel reports it ready. +Reuses the existing verified `library/modules/std/net/kqueue.sx` bindings (`Kevent` (32 bytes), +`kqueue`/`kevent`/`kq_apply`/`kq_wait` + the `EVFILT_READ`/`EV_ADD`/`EV_ENABLE`/`EV_ONESHOT` +constants) rather than re-deriving the FFI — sched.sx imports it as `kqb`. Added to `Scheduler`: +- `kq: i32` (LAZY — `-1` in `init`, opened by the first `block_on_fd`, so a pure-compute / + virtual-timer scheduler never opens a kqueue fd; leaks one fd at exit once opened, same class as the + documented spawn-env / go-Task leaks — no deinit yet); +- `io_waiters: List(IoWaiter)` (`IoWaiter :: struct { fd: i32; fiber: *Fiber; }`, grown through + `own_allocator` per the long-lived-container rule); +- `block_on_fd(self, fd, want_read)` — lazily opens `kq`, arms a one-shot `EVFILT_READ` registration, + records an `IoWaiter{fd, current}`, then `suspend_self()`. Guards a null `current` (loud abort, like + `sleep`); `want_read=false` (write-readiness) is not wired yet → loud abort rather than silently + arming a read filter. +- Run-loop: after the ready queue drains, **Mode 1 (virtual time)** fires the earliest pending timer + (takes precedence — a program uses `sleep` OR fds, documented non-unification limitation); **Mode 2 + (real fd)** — if `io_waiters` is non-empty, BLOCK on `kq_wait(kq, evbuf, MAXEV=16, -1)` (null + timeout), then for each fired event match `ev.ident` back to its waiter, evict it, and `wake` the + fiber; **else** break. Orphan-deadlock check unchanged in spirit but now correct: an fd waiter is NOT + an orphan (while `io_waiters.len > 0` the loop blocks on kqueue rather than reaching the check), and + a genuine no-timer/no-fd suspend still aborts loudly (verified with a probe: exit 134). +- `wake` now also evicts a pending fd-waiter (`cancel_io_waiter_for`, mirror of `cancel_timer_for`) — + same UAF reasoning: a fiber woken by another path must not leave a stale `IoWaiter` pointing at a + reaped `*Fiber`. The kqueue registration is `EV_ONESHOT` so we never `EV_DELETE` (a never-fired + one-shot lingers harmlessly; the drain ignores an unmatched ident; closing the fd auto-removes it). +- DE-RISK probe (run first, no scheduler): confirmed `size_of(Kevent) == 32`, the pipe roundtrip + (`kq_wait` returned 1 with `out.ident == read_fd`, `out.filter == -1` (EVFILT_READ), `out.data == 1` + byte readable) — the struct layout reads back the fd correctly. +- Locked by `examples/concurrency/1816-concurrency-fiber-io-pipe.sx`: a `pipe`; a reader fiber spawned + FIRST blocks on the empty read end, then a writer fiber writes `a b c` → the run loop blocks on + kqueue, wakes the reader, which reads the 3 bytes. Output `log: wrote read 3 [97 98 99]` / + `n_suspended: 0` (the "wrote" before "read" ordering proves the reader actually blocked then woke via + kqueue readiness). `.build` `{ "target": "macos" }` (matches host arch → runs end-to-end; ir-only on + a mismatch, like 1814/1815 — no `.ir` snapshot needed since it runs here). The example declares its + own `read`/`write`/`close` externs with the CANONICAL signatures std already binds + (`(i32,[*]u8,usize)->isize` / `(i32)->i32`) — a divergent re-binding is rejected by the extern dedupe. +- **Adversarial review (worker) of the run-loop change — found 2 CRITICALs:** + - **(1) two fibers on the SAME fd → lost wakeup + permanent hang.** macOS `EV_ADD` for an existing + `(ident, filter)` REPLACES the registration (doesn't stack), so two waiters share one registration: + the fd fires once, one wakes, the other is stranded in `io_waiters` and the next `kq_wait(-1)` blocks + forever. FIXED: `block_on_fd` now enforces one-waiter-per-fd with a loud abort (the model already + assumed it). Verified: dup-fd → `sched: block_on_fd: fd N already has a waiter`, not a hang. + - **(2) an fd-waiter on a never-ready fd hangs instead of the timer path's loud abort.** Re-examined: + this is CORRECT event-loop semantics — blocking on I/O until ready (possibly forever, like a server + idling on a socket) is the point; the scheduler cannot know an fd will never become ready, so it must + keep waiting. NOT a scheduler deadlock. Fixed the MISLEADING comment that implied the orphan check + covers fd-waiters: it does not, by design (it covers only pure `suspend_self` parks). No code change — + the "hang" is a caller-side logic issue (waiting on input that never arrives), not a bug to abort on. + - Review CLEARED: the IoWaiter UAF parity (early-wake evicts the waiter; a lingering one-shot that later + fires hits no match → clean no-op), ident width/sign, `kq_wait` EINTR/error handling, timer-vs-io + precedence (timer wins; no hang). All probed safe. +- Suite GREEN **754/0** (incl. the dup-fd guard, no new example needed — the abort is host-fragile to + pin like 1809's guard-firing). Next: **B1.5** (end-to-end M:1 validation under the deterministic timers + / fd readiness); a linux epoll twin of `block_on_fd` (mirror via `std/net/epoll`, the OS-neutral facade + is `std.event`) is future work. + +### Earlier — B1.4b — deterministic VIRTUAL-TIME timer scheduling (the KEYSTONE) — landed + adversarially +reviewed (caught a CRITICAL UAF, fixed). +`library/modules/std/sched.sx` gained a virtual clock + sleep timers so fibers schedule in reproducible simulated time (no real clock): `clock_ms` (advances ONLY as timers fire), a `timers: List(Timer)` (insertion-order, linear min-scan, FIFO tiebreak), `now_ms()`, `sleep(ms)` (arm `{clock_ms+ms, current}` + `suspend_self`), and a timer-driven `run` @@ -282,17 +339,21 @@ body); closed + locked. The review's `.naked`-lambda CRITICAL was a false positi (unparseable — `isLambda` breaks on the `abi` keyword). ## Current state -**B1.4b COMPLETE — deterministic virtual-time timer scheduling exists.** `library/modules/std/sched.sx` +**B1.4c COMPLETE — real fd-readiness blocking via kqueue (macOS) exists.** `library/modules/std/sched.sx` now carries: the M:1 scheduler core (B1.5a: `spawn`/`yield_now`/`suspend_self`/`wake`/`run`), the -suspending fiber-task async (B1.4a: `Task($R)`/`go`/`wait`/`cancel`), AND deterministic timers (B1.4b: -`clock_ms` virtual clock, `timers` list, `now_ms`/`sleep`, timer-driven `run`). Fibers `sleep(ms)` in -reproducible simulated time and wake in deadline order. The timer-vs-early-wake UAF found in review is -fixed (`wake` evicts the fiber's pending timer). Locked by `1811` (round-robin), `1812` (suspend/wake), -`1813` (async go/wait/cancel), `1814` (sim-timer deadline ordering), `1815` (timer early-wake eviction). -Suite GREEN (count below). +suspending fiber-task async (B1.4a: `Task($R)`/`go`/`wait`/`cancel`), deterministic timers (B1.4b: +`clock_ms` virtual clock, `timers` list, `now_ms`/`sleep`, timer-driven `run`), AND real fd readiness +(B1.4c: lazy `kq`, `io_waiters` list, `block_on_fd`, a kqueue-blocking run-loop Mode 2 that wakes the +fiber whose fd fired). It reuses the verified `std/net/kqueue.sx` bindings (imported as `kqb`) rather +than re-deriving the FFI. Fibers can now block on either virtual `sleep(ms)` OR a real fd; both park +paths are balanced through `wake` (which evicts a stale timer AND a stale fd-waiter, the UAF guard). +Locked by `1811` (round-robin), `1812` (suspend/wake), `1813` (async go/wait/cancel), `1814` (sim-timer +deadline ordering), `1815` (timer early-wake eviction), `1816` (pipe fd block→kqueue-wake→read). Suite +GREEN **754/0**. -The remaining B1 work: **B1.4c** the event-loop `Io` (kqueue mac / epoll linux — real fd readiness), -then **B1.5** end-to-end M:1 validation under the deterministic timers. NOTE: the suspending async + +The remaining B1 work: **B1.5** end-to-end M:1 validation under the deterministic timers / fd readiness; +a **linux epoll twin** of `block_on_fd` (mirror via `std/net/epoll`; the OS-neutral facade is +`std.event`) is future work — B1.4c wired the **macOS kqueue** path only. NOTE: the suspending async + deterministic timers live as `sched.*` methods (M:1, receiver-driven), NOT routed through the erased `context.io` (which would force sched.sx into every std consumer + duplicate the `_fib_tramp` global asm); the `Io` protocol's `spawn_raw`/`suspend_raw`/`ready`/`arm_timer`/`poll` remain reserved for the @@ -388,17 +449,20 @@ fibers/Io/scheduler code yet. Grounded floor facts: boundary; a sharper sx diagnostic for it is a candidate polish, not a blocker. ## Next step -**→ B1.4c — the event-loop `Io` (real fd readiness).** B1.4b (deterministic virtual-time timers, -`sched.sleep`/`now_ms`/timer-`run`) is done — the KEYSTONE deterministic harness exists at the -scheduler level. Now add real-I/O readiness: a `poll`-style step over `kqueue` (macOS) / `epoll` -(linux) that blocks until an fd is readable/writable (or a real-time timeout), then wakes the parked -fiber waiting on it. Likely shape: a `block_on_fd(fd, events)` that registers the current fiber's -interest, suspends, and is woken when `run`'s poll step reports the fd ready. Lock with an `18xx` -example doing genuine fd I/O (e.g. a `pipe(2)`: a fiber blocks reading, another writes, the reader -wakes with the bytes) — aarch64-macOS-pinned, kqueue. The deterministic timers (1814) and real I/O -should compose (a real `poll` with a timeout vs the virtual clock — keep them as separate run modes, -or unify with care). Then **B1.5** end-to-end M:1 validation. The §10.7 gate (1808) + guarded-stack -(1809) + Win64 (1810) + scheduler/async/timers (1811-1815) must keep passing throughout. +**→ B1.5 — end-to-end M:1 validation under the deterministic timers / fd readiness.** B1.4c (real +fd-readiness blocking via kqueue, `sched.block_on_fd` + the kqueue-blocking run-loop Mode 2) is done — +the macOS event-loop path exists. Build an `18xx` example that exercises the full M:1 story together +(multiple fibers, a mix of `sleep`/`go`/`wait` and `block_on_fd`, reaping, the orphan-deadlock guard). +The §10.7 gate (1808) + guarded-stack (1809) + Win64 (1810) + scheduler/async/timers/fd +(1811-1816) must keep passing throughout. + +**Deferred (future B1.4c sibling): the linux epoll twin of `block_on_fd`.** B1.4c wired the **macOS +kqueue** path only (the host is aarch64-macOS). The linux mirror would register interest via +`std/net/epoll` and the OS-neutral facade is `std.event` — keep the two as separate run modes inside +`run`, branching on the platform, exactly as the timer-vs-fd modes are kept separate now. Documented +non-unification: virtual-time timers and real kqueue timeouts are NOT merged — `run` fires a pending +timer before ever blocking on kqueue (a program uses `sleep` OR fds); a true "fd-or-real-timeout" wants +a kqueue `EVFILT_TIMER`, future work. Design note carried forward: an event-loop `Io` needs a current-`Scheduler` handle. `sched.*` methods thread it via `self`/the `Task`; if B1.4c wants the capability-threaded `context.io` form it'll need @@ -508,6 +572,21 @@ incomplete); a dedicated effort; lambda workers are the idiom meanwhile. trusted. `18xx` asserts program-emitted ordering contracts, not raw interleaving. ## Log +- **B1.4c — real fd-readiness blocking via kqueue (macOS).** De-risked first with a no-scheduler probe + (confirmed `size_of(Kevent)==32` and the pipe→kevent roundtrip: `kq_wait` returned 1, `out.ident == + read_fd`, `out.filter == -1`, `out.data == 1` — the struct layout reads the fd back correctly). Then + added to `library/modules/std/sched.sx` (importing the existing verified `std/net/kqueue.sx` as `kqb` + rather than re-deriving the FFI): a lazy `kq: i32` (-1 until first use), `io_waiters: List(IoWaiter)`, + `block_on_fd(fd, want_read)` (arm one-shot `EVFILT_READ`, record waiter, `suspend_self`), a run-loop + Mode 2 (block on `kq_wait(kq, evbuf, MAXEV=16, -1)` when only fd waiters remain, wake the fiber whose + fd fired), and `wake` now also evicts a stale fd-waiter (`cancel_io_waiter_for`, the same UAF guard as + `cancel_timer_for`). Timers keep precedence over fds (documented non-unification). Orphan-deadlock + check still fires for a genuine no-timer/no-fd suspend (probed: exit 134). Locked by + `1816-concurrency-fiber-io-pipe.sx` (reader blocks on empty pipe → writer writes `a b c` → kqueue + wakes reader → reads 3 bytes; `log: wrote read 3 [97 98 99]`, `n_suspended: 0`), `.build` + `{ "target": "macos" }`, runs end-to-end on host. The example's `read`/`write`/`close` externs use the + canonical signatures std already binds (extern-dedupe rejects a divergent re-binding). Suite GREEN + **754/0**. Next: B1.5 (end-to-end M:1 validation); linux epoll twin deferred. - **carve** — wrote PLAN-FIBERS.md + CHECKPOINT-FIBERS.md. Grounded the B1 compiler floor: `ABI.naked` inert (type_resolver.zig:237), IR `Function` has no naked flag (inst.zig:605), attribute API pattern (emit_llvm.zig:1339 nounwind), `.c` ctx-skip precedent diff --git a/current/PLAN-FIBERS.md b/current/PLAN-FIBERS.md index 4324b9d9..1decbef7 100644 --- a/current/PLAN-FIBERS.md +++ b/current/PLAN-FIBERS.md @@ -8,8 +8,8 @@ > `sched.go`/`wait`/`cancel` over `Task($R)`, nullary-thunk) ✅** (adversarially reviewed; fixed > blockers 0156-Part1 + 0157 en route; locked `1813`). > **B1.4b (deterministic virtual-time timers — sched.sleep/now_ms/timer-run) ✅** (reviewed; fixed a CRITICAL timer-vs-early-wake UAF; locked 1814/1815). -> **→ NOW: B1.4c** — the event-loop `Io` (kqueue/epoll, real fd readiness). Then B1.5 (end-to-end -> M:1). Detailed progress in [CHECKPOINT-FIBERS.md](CHECKPOINT-FIBERS.md). NOTE: suspending async + +> **B1.4c (event-loop — real fd readiness via kqueue: `block_on_fd` + run-loop Mode 2) ✅** (reviewed; fixed a CRITICAL same-fd lost-wakeup hang; locked 1816). macOS only — linux epoll twin deferred. +> **→ NOW: B1.5** — end-to-end M:1 validation under the deterministic timers / fd readiness. Detailed progress in [CHECKPOINT-FIBERS.md](CHECKPOINT-FIBERS.md). NOTE: suspending async + > deterministic timers live as `sched.*` methods (M:1), NOT routed through the erased `context.io` (avoids forcing sched.sx into every std consumer + the `_fib_tramp` dup-symbol > trap); the `Io` protocol's `spawn_raw`/`suspend_raw`/`ready` stay reserved for M:N. Deferred: > issue 0150 (`Future(void)`/`timeout`); 0156-Part2 (deferred `..` spread); the `::` callable-param diff --git a/examples/concurrency/1816-concurrency-fiber-io-pipe.sx b/examples/concurrency/1816-concurrency-fiber-io-pipe.sx new file mode 100644 index 00000000..79ba979c --- /dev/null +++ b/examples/concurrency/1816-concurrency-fiber-io-pipe.sx @@ -0,0 +1,101 @@ +// Stream B1 (fibers) B1.4c — REAL fd-readiness blocking via kqueue. A fiber can +// `block_on_fd(read_fd, true)`; the scheduler's run loop blocks on `kevent` when +// nothing else is runnable and wakes that fiber when the kernel reports the fd +// readable. +// +// Scenario: a unix `pipe` (read_fd, write_fd). A READER fiber is spawned FIRST, +// so it runs while the pipe is EMPTY — it calls `block_on_fd(read_fd)` and parks +// (genuinely blocked: there is no data yet, the writer has not run). A WRITER +// fiber, spawned second, then writes 3 bytes to write_fd. Now the ready queue is +// drained and the only parked fiber is the reader's io-waiter, so the run loop +// BLOCKS on `kevent`, which reports read_fd ready; the reader wakes and reads the +// bytes. The ordering ("wrote" recorded before "read") proves the reader blocked +// on the empty pipe and was woken by kqueue readiness, not by data already +// present. +// +// Contract: +// log: wrote read 3 [97 98 99] +// n_suspended: 0 (the reader's park was balanced by the kqueue wake) +// +// aarch64-macOS-pinned: kqueue/kevent is Apple/BSD, and the scheduler's +// per-arch asm + Apple mmap constants. Runs end-to-end on a matching host, +// ir-only on a mismatch. Like 1809 (mmap), JIT `sx run` resolves the libc +// extern calls fine — no AOT build needed. +#import "modules/std.sx"; +sched :: #import "modules/std/sched.sx"; + +// Raw libc fd primitives. read/write/close MUST match the canonical signatures +// already bound by std (socket.sx / core.sx), or the extern dedupe rejects a +// divergent re-binding of the same C symbol. `pipe` is ours alone. +pipe :: (fds: *i32) -> i32 extern libc "pipe"; +read :: (fd: i32, buf: [*]u8, count: usize) -> isize extern libc "read"; +write :: (fd: i32, buf: [*]u8, count: usize) -> isize extern libc "write"; +close :: (fd: i32) -> i32 extern libc "close"; + +// Shared log: a tiny ledger of what happened, in order. +S :: struct { + wrote: bool; + read_n: i64; + bytes: [8]u8; + read_done: bool; +} + +main :: () -> i64 { + st : S = ---; + st.wrote = false; + st.read_n = 0; + st.read_done = false; + + fds : [2]i32 = ---; + if pipe(@fds[0]) != 0 { + print("1816: pipe() failed\n"); + return 1; + } + read_fd := fds[0]; + write_fd := fds[1]; + + s := sched.Scheduler.init(); + ps := @s; pst := @st; + + // Reader: block on the (empty) pipe until it is readable, then read 3 bytes. + mk_reader :: (ps: *sched.Scheduler, pst: *S, rfd: i32) { + ps.spawn(() => { + ps.block_on_fd(rfd, true); // parks until read_fd is readable + n := read(rfd, xx @pst.bytes[0], xx 3); + pst.read_n = xx n; + pst.read_done = true; + }); + } + // Writer: write 3 bytes ('a','b','c') to the write end. + mk_writer :: (ps: *sched.Scheduler, pst: *S, wfd: i32) { + ps.spawn(() => { + buf : [3]u8 = ---; + buf[0] = xx 97; buf[1] = xx 98; buf[2] = xx 99; // 'a' 'b' 'c' + write(wfd, xx @buf[0], xx 3); + pst.wrote = true; + }); + } + + mk_reader(ps, pst, read_fd); // spawned first → runs + parks on empty pipe + mk_writer(ps, pst, write_fd); // spawned second → writes, then kqueue wakes reader + s.run(); + + print("log: "); + if st.wrote { print("wrote "); } + if st.read_done { + print("read {} [", st.read_n); + i := 0; + while i < st.read_n { + if i > 0 { print(" "); } + print("{}", st.bytes[i]); + i = i + 1; + } + print("]"); + } + print("\n"); + print("n_suspended: {}\n", s.n_suspended); + + close(read_fd); + close(write_fd); + return 0; +} diff --git a/examples/concurrency/expected/1816-concurrency-fiber-io-pipe.build b/examples/concurrency/expected/1816-concurrency-fiber-io-pipe.build new file mode 100644 index 00000000..42e24dd2 --- /dev/null +++ b/examples/concurrency/expected/1816-concurrency-fiber-io-pipe.build @@ -0,0 +1 @@ +{ "target": "macos" } diff --git a/examples/concurrency/expected/1816-concurrency-fiber-io-pipe.exit b/examples/concurrency/expected/1816-concurrency-fiber-io-pipe.exit new file mode 100644 index 00000000..573541ac --- /dev/null +++ b/examples/concurrency/expected/1816-concurrency-fiber-io-pipe.exit @@ -0,0 +1 @@ +0 diff --git a/examples/concurrency/expected/1816-concurrency-fiber-io-pipe.stderr b/examples/concurrency/expected/1816-concurrency-fiber-io-pipe.stderr new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/examples/concurrency/expected/1816-concurrency-fiber-io-pipe.stderr @@ -0,0 +1 @@ + diff --git a/examples/concurrency/expected/1816-concurrency-fiber-io-pipe.stdout b/examples/concurrency/expected/1816-concurrency-fiber-io-pipe.stdout new file mode 100644 index 00000000..36253ec3 --- /dev/null +++ b/examples/concurrency/expected/1816-concurrency-fiber-io-pipe.stdout @@ -0,0 +1,2 @@ +log: wrote read 3 [97 98 99] +n_suspended: 0 diff --git a/library/modules/std/sched.sx b/library/modules/std/sched.sx index 5fb4a45b..33e6348a 100644 --- a/library/modules/std/sched.sx +++ b/library/modules/std/sched.sx @@ -25,6 +25,7 @@ // page are Apple-specific. Runs end-to-end on a matching host, ir-only on a // mismatch. #import "modules/std.sx"; +kqb :: #import "modules/std/net/kqueue.sx"; // --- libc mmap stack primitives ------------------------------------------- @@ -39,6 +40,12 @@ MAP_AP :: 0x1002; // macOS MAP_PRIVATE (0x2) | MAP_ANON (0x1000) GUARD :: 16384; // one 16 KB page (aarch64-macOS) STACK :: 131072; // 128 KB usable per fiber +// Max fd events drained per kqueue wait (B1.4c). Sized for the M:1 model's +// small fiber counts; a wait that fills it just drains the rest on the next +// loop iteration (the woken fibers run, the queue re-drains, the still-pending +// waiters block again). +MAXEV :: 16; + // --- core types ------------------------------------------------------------ // Saved context: x19..x28 (10), x29/fp, x30/lr, sp — 13 u64 slots. @@ -66,6 +73,18 @@ Timer :: struct { fiber: *Fiber; } +// B1.4c: a fiber parked on REAL fd readiness. Unlike a `Timer` (virtual +// time), an `IoWaiter` blocks the whole scheduler on `kevent` until the +// kernel reports `fd` readable, then wakes `fiber`. Stored in +// `Scheduler.io_waiters`; the registration is one-shot (EV_ONESHOT), so the +// kernel auto-removes it after firing — we only have to drop the waiter +// record. `cancel_io_waiter_for` evicts a stale record (mirror of +// `cancel_timer_for`) so a reaped fiber's waiter can never be woken. +IoWaiter :: struct { + fd: i32; + fiber: *Fiber; +} + Scheduler :: struct { sched_ctx: FiberCtx; // the scheduler loop's own saved context current: *Fiber; // running fiber; null while in the scheduler loop @@ -85,6 +104,19 @@ Scheduler :: struct { // through `own_allocator` (long-lived-container // rule: a timer outlives the `sleep` call's scope). + // --- B1.4c: real fd-readiness blocking via kqueue ---------------------- + kq: i32; // the kqueue fd. LAZY: -1 until the first + // `block_on_fd` opens it, so a pure-compute / + // virtual-timer scheduler never opens a kqueue + // fd (no leak for the common case). Once opened it + // lives for the scheduler's lifetime; there is no + // deinit yet, so it leaks one fd at program exit + // (bounded, harmless — same class as the spawn + // env / go Task leaks documented above). + io_waiters: List(IoWaiter); // fibers parked on fd readiness, grown through + // `own_allocator` (long-lived-container rule: a + // waiter outlives the `block_on_fd` call's scope). + // Construct a scheduler BY VALUE (allocator value-return convention). // Captures the current `context.allocator` into `own_allocator` — fibers and // their heap `Fiber` structs outlive their spawn scope, so all internal @@ -101,6 +133,8 @@ Scheduler :: struct { s.n_suspended = 0; s.clock_ms = 0; s.timers = .{}; + s.kq = -1; // lazy: opened by the first block_on_fd + s.io_waiters = .{}; return s; } @@ -193,6 +227,18 @@ Scheduler :: struct { // model, so a single eviction suffices; it also prevents a stale timer // from spuriously re-waking a since-re-slept fiber. cancel_timer_for(self, f); + // Same UAF reasoning for fd waiters: every path that re-readies a + // suspended fiber funnels through `wake`. If a fiber armed `block_on_fd` + // but was woken by another path (a manual wake, a Task completion), its + // `IoWaiter` would otherwise survive pointing at a fiber that runs to + // completion and is reaped (stack munmap'd + Fiber freed). A later + // kqueue drain matching that stale record would `wake` freed memory. + // Evict it here. NOTE: we do NOT EV_DELETE the kqueue registration — it + // is EV_ONESHOT, so a never-fired registration simply lingers in the + // kernel queue until the fd is readable, at which point the drain finds + // no matching waiter and ignores it (see `run`). The fd is the example's + // to close; closing it auto-removes any pending registration. + cancel_io_waiter_for(self, f); self.n_suspended = self.n_suspended - 1; f.state = .ready; enqueue(self, f); @@ -229,6 +275,72 @@ Scheduler :: struct { self.suspend_self(); // parks `cur` off-queue; the timer fire re-wakes it } + // --- B1.4c: block the running fiber until `fd` is readable -------------- + // + // Register `fd` for EVFILT_READ with the scheduler's kqueue (lazily + // opening it on first use), record an `IoWaiter`, then park the fiber + // off-queue. The run loop blocks on `kevent` once nothing else is runnable + // and wakes this fiber when the kernel reports `fd` ready (EV_ONESHOT — the + // kernel auto-removes the registration after it fires, so the run loop only + // has to drop the waiter record + `wake` the fiber). + // + // `want_read` is the readiness direction; only read-readiness is wired for + // now (a write-readiness EVFILT_WRITE path would mirror this exactly). A + // false `want_read` would be a write-wait — not yet implemented, so bail + // loudly rather than silently arming a read filter (silent-wrong-arm rule). + // + // MUST be called from inside a fiber (there must be a `current` to park); a + // null `current` bails loudly, mirroring `suspend_self` / `sleep`. + block_on_fd :: (self: *Scheduler, fd: i32, want_read: bool) { + cur := self.current; + if cur == null { + print("sched: block_on_fd() called outside a fiber (no running fiber)\n"); + abort(); + } + if !want_read { + print("sched: block_on_fd(want_read=false) — write-readiness not implemented\n"); + abort(); + } + // ONE waiter per fd (enforced). macOS `EV_ADD` for an existing + // (ident, filter) REPLACES the registration rather than stacking, so a + // second fiber blocking on the same fd would leave only one live + // registration: when the fd fires, the kernel delivers a single event, + // one waiter wakes, and the other is stranded in `io_waiters` with no + // registration — the next `kq_wait` then blocks forever. The M:1 model + // (and `wake_io_waiter_for_fd`, which wakes the first match) assumes a + // single waiter per fd; enforce it loudly instead of silently hanging. + j := 0; + while j < self.io_waiters.len { + if self.io_waiters.items[j].fd == fd { + print("sched: block_on_fd: fd {} already has a waiter (one waiter per fd in the M:1 model)\n", fd); + abort(); + } + j = j + 1; + } + // Lazily open the kqueue fd the first time fd-blocking is used. + if self.kq < 0 { + self.kq = kqb.kqueue(); + if self.kq < 0 { + print("sched: kqueue() failed to open the event queue\n"); + abort(); + } + } + // Arm a one-shot read-readiness registration for `fd`. udata is unused + // (we match the waiter by fd in the drain), so pass 0. + chg := kqb.kev_change(fd, kqb.EVFILT_READ, kqb.EV_ADD | kqb.EV_ENABLE | kqb.EV_ONESHOT, 0); + if !kqb.kq_apply(self.kq, chg) { + print("sched: kevent() failed to register fd {} for read readiness\n", fd); + abort(); + } + // Record the waiter BEFORE parking — the run loop matches the fired + // event's ident back to this record. Long-lived-container rule: the + // waiter outlives this call's scope (it survives in `self.io_waiters` + // until the kqueue drain wakes it), so grow through `own_allocator`. + w : IoWaiter = .{ fd = fd, fiber = cur }; + self.io_waiters.append(w, self.own_allocator); + self.suspend_self(); // parks `cur` off-queue; the kqueue drain re-wakes it + } + // The scheduler loop. Drives ready fibers to quiescence, then advances the // virtual clock by firing the earliest pending timer (which re-readies its // sleeper), and repeats — until both the ready queue and the timer set are @@ -255,24 +367,70 @@ Scheduler :: struct { } // .suspended: leave it parked (not in any queue; `wake` re-adds it). } - // Ready queue drained. Fire the earliest pending timer — the one - // sleeper whose deadline is next — advancing the virtual clock to it. - // No timers left ⇒ nothing more can run; exit the loop. + // Ready queue drained. Decide what advances the world next. + // + // Mode 1 — VIRTUAL TIME: fire the earliest pending timer (advancing + // the virtual clock to it), re-readying its sleeper. Timers take + // precedence over fd-blocking: a program uses `sleep` OR fds, not + // both at once. (Documented limitation: virtual-time timers and real + // kqueue timeouts are NOT unified — if both a timer and an io-waiter + // are pending we always fire the timer first and never block on + // kqueue while a timer is outstanding. A program that genuinely + // needs "fd-or-real-timeout" wants a kqueue EVFILT_TIMER, future + // work.) idx := earliest_timer(self); - if idx < 0 { break; } - t := self.timers.items[idx]; - remove_timer(self, idx); - self.clock_ms = t.deadline_ms; // advance VIRTUAL time forward - self.wake(t.fiber); // re-enqueue the sleeper → drain again + if idx >= 0 { + t := self.timers.items[idx]; + remove_timer(self, idx); + self.clock_ms = t.deadline_ms; // advance VIRTUAL time forward + self.wake(t.fiber); // re-enqueue the sleeper → drain again + continue; + } + // Mode 2 — REAL fd readiness: nothing is runnable and no timer is + // pending, but fibers are parked on fds. BLOCK on kqueue until the + // kernel reports at least one fd ready, then wake every waiter whose + // fd fired. (null timeout via -1 → wait forever.) + if self.io_waiters.len > 0 { + evbuf : [MAXEV]kqb.Kevent = ---; + n := kqb.kq_wait(self.kq, @evbuf[0], MAXEV, -1); + if n < 0 { + print("sched: kevent() wait failed while blocking on fd readiness\n"); + abort(); + } + // For each fired event, find the io-waiter whose fd matches its + // ident, evict it, and wake its fiber. EV_ONESHOT already removed + // the kernel registration, so we only drop the waiter record. + i := 0; + while i < n { + ready_fd : i32 = xx evbuf[i].ident; + wake_io_waiter_for_fd(self, ready_fd); + i = i + 1; + } + continue; + } + // Nothing runnable, no timer, no fd waiter → done. + break; } - // Both the ready queue and the timer set are empty. If a fiber is STILL - // parked, no timer will ever wake it (a `suspend_self` without an armed - // timer, never externally woken) — its stack + struct are leaked and the - // program believes it finished. That is a genuine deadlock; surface it - // loudly. (Timer sleepers are balanced: each `sleep` increments - // `n_suspended` via `suspend_self`, and the timer-fire `wake` decrements - // it — so once every timer has fired, `n_suspended` counts only true - // orphans.) + // The ready queue, the timer set, AND the io-waiter set are all empty. If + // a fiber is STILL parked, nothing will ever wake it (a `suspend_self` + // without an armed timer or fd registration, never externally woken) — + // its stack + struct are leaked and the program believes it finished. + // That is a genuine deadlock; surface it loudly. (Timer sleepers and fd + // waiters are balanced: each arming path increments `n_suspended` via + // `suspend_self`, and its wake decrements it — so once every timer has + // fired and every io-waiter has been woken, `n_suspended` counts only + // these true orphans.) + // + // SCOPE — fd waiters are NOT covered by this check, BY DESIGN, not as an + // oversight. While `io_waiters.len > 0` the loop above blocks in + // `kq_wait(-1)` and never reaches here. A fiber blocked on an fd that the + // OS never reports ready blocks FOREVER — which is the correct semantics + // of an event loop (a server idling on a socket is indistinguishable from + // one whose peer never sends; the scheduler cannot know an fd will never + // become ready, so it must keep waiting). That is a caller-side logic + // issue (blocking on input that never arrives), not a scheduler deadlock + // to abort on. This check covers only pure `suspend_self` parks with no + // pending wake source at all. if self.n_suspended != 0 { print("sched: deadlock — {} fiber(s) suspended with an empty run queue\n", self.n_suspended); abort(); @@ -433,6 +591,56 @@ cancel_timer_for :: (self: *Scheduler, f: *Fiber) { } } +// --- B1.4c: fd-waiter set (linear scan, fd-keyed) -------------------------- +// +// Like the timer set, a plain `List(IoWaiter)` scanned linearly — fiber counts +// are tiny. Removal shifts the tail down one slot. + +// Remove the io-waiter at `idx`, shifting later entries down one slot. +remove_io_waiter :: (self: *Scheduler, idx: i64) { + i := idx; + while i < self.io_waiters.len - 1 { + self.io_waiters.items[i] = self.io_waiters.items[i + 1]; + i = i + 1; + } + self.io_waiters.len = self.io_waiters.len - 1; +} + +// Remove a pending fd-waiter referencing fiber `f`, if any. A fiber has at most +// one pending io-waiter in the M:1 model (it can only `block_on_fd` once before +// suspending), so the first match is the only one. No-op if `f` has none. Used +// by `wake` to evict a waiter when the fiber is re-readied by another path. +cancel_io_waiter_for :: (self: *Scheduler, f: *Fiber) { + i := 0; + while i < self.io_waiters.len { + if self.io_waiters.items[i].fiber == f { + remove_io_waiter(self, i); + return; + } + i = i + 1; + } +} + +// A fired kqueue event for `fd`: find the waiter registered on it, evict the +// record, and wake its fiber. No-op if no waiter matches (a stale one-shot +// registration whose fiber was already woken another way — see `wake`). Only +// the FIRST match is woken: one waiter per fd in this model (a single fiber +// blocks on a given read fd at a time). +wake_io_waiter_for_fd :: (self: *Scheduler, fd: i32) { + i := 0; + while i < self.io_waiters.len { + if self.io_waiters.items[i].fd == fd { + wf := self.io_waiters.items[i].fiber; + remove_io_waiter(self, i); + self.wake(wf); // re-enqueues the parked fiber (also calls + // cancel_io_waiter_for, now a harmless no-op — + // the record is already removed) + return; + } + i = i + 1; + } +} + // The public API lives as methods on `Scheduler` (above): `init`, `spawn`, // `yield_now`, `suspend_self`, `wake`, `run`, `now_ms`, `sleep`.