// Stream B1 (fibers) B1.5a — the M:1 cooperative fiber scheduler core. // // A `Scheduler` drives any number of `Fiber`s, each running a stackful // `body: Closure() -> void` on its own guarded `mmap` stack (the §8.1.1 guard // page turns a stack overflow into an immediate fault instead of silent // neighbor corruption). Fibers cooperate: a running fiber hands control back to // the scheduler loop via `yield_now` (re-enqueued, round-robin) or // `suspend_self` (parked off-queue until an external `wake`). When a body // returns, the fiber reaches `.done`, its stack is `munmap`'d and its heap // `Fiber` freed. // // Built on the proven primitives from examples/concurrency/1807-1809: // - `swap_context` (aarch64 `abi(.naked)`, 13-slot save area: x19..x28, fp, // lr, sp) saves the callee-saved registers + SP into `*from` and loads them // from `*to`, then `ret`s onto `to`'s stack. // - the `fib_tramp` first-entry trampoline (a naked sx fn): x19 holds the // bootstrapped `*Fiber` and x20 = `&fib_dispatch`; it moves the fiber to x0 // and `br`s through x20 to the C-ABI `fib_dispatch`, which calls the body // then switches back to the scheduler. // - guarded `mmap` stacks: `[GUARD | usable]`, low GUARD page `mprotect`'d // PROT_NONE, 16-aligned top returned as the bootstrapped SP. // // aarch64-pinned (macOS + linux): the `swap_context` asm + the 13-slot save // area are per-arch. The per-OS bits are branched at comptime — `mmap`'s // MAP_ANON flag (`MAP_AP`) and the fd-readiness backend (kqueue on darwin, // epoll on linux). Runs end-to-end on a matching aarch64 host, ir-only on an // arch mismatch. #import "modules/std.sx"; kqb :: #import "modules/std/net/kqueue.sx"; // The fd-readiness backend is per-OS: kqueue (kqb, above) on darwin, epoll on // linux. 
// The epoll import is scoped to the linux branch so darwin never pulls
// epoll's types into the concurrency examples' type tables (the same
// std-barrel-drift rule std.event.Loop follows); `block_on_fd` / the run loop
// reference `ep` only inside their own `inline if OS == .linux` arms.
inline if OS == .linux {
    ep :: #import "modules/std/net/epoll.sx";
}

// --- libc mmap stack primitives -------------------------------------------
//
// Raw libc bindings used by `boot_stack` (mmap + mprotect to build a guarded
// fiber stack) and by the reap paths (`run` / `Scheduler.deinit` munmap a
// finished fiber's region). `boot_stack` treats an `mmap` result of
// (void*)-1 (MAP_FAILED) as failure and a non-zero `mprotect` result as failure.

mmap :: (addr: *void, len: i64, prot: i32, flags: i32, fd: i32, off: i64) -> *void extern libc "mmap";
mprotect :: (addr: *void, len: i64, prot: i32) -> i32 extern libc "mprotect";
munmap :: (addr: *void, len: i64) -> i32 extern libc "munmap";

// Canonical libc `close` signature `(i32) -> i32` — must match any other
// binding in the program (the extern dedupe rejects a divergent one). Used by
// `Scheduler.deinit` to close the lazily-opened readiness-queue fd (the
// kqueue fd on darwin, the epoll fd on linux — whichever `block_on_fd`
// opened into `Scheduler.kq`).
close :: (fd: i32) -> i32 extern libc "close";

// Fatal bail-out used by every "fail loudly" path in this module.
abort :: () -> noreturn extern libc "abort";

PROT_NONE :: 0;
PROT_RW :: 3; // PROT_READ | PROT_WRITE

// Exhaustive on the SUPPORTED OSes (linux/macOS), no default case: an
// unsupported target matches no case → MAP_AP undefined → a loud compile error
// on use rather than a silent wrong flag. (The fiber runtime is aarch64-only
// anyway — the swap_context asm — so only these two platforms are wired.)
inline if OS == {
    case .linux:
        MAP_AP :: 0x22; // linux MAP_PRIVATE (0x2) | MAP_ANON (0x20)
    case .macos:
        MAP_AP :: 0x1002; // macOS MAP_PRIVATE (0x2) | MAP_ANON (0x1000)
}

// Size of the PROT_NONE guard region at the low end of every fiber stack.
// 16 KB is exactly one page on aarch64-macOS.
// NOTE(review): aarch64-linux kernels may use 4 KB, 16 KB or 64 KB pages. With
// 4 KB pages this guards 4 pages (fine); with 64 KB pages `mprotect` rounds the
// length up to a whole page, so the guard would swallow part of the usable
// region that `boot_stack` assumes starts at `region + GUARD` — confirm the
// target page size.
GUARD :: 16384; // one 16 KB page (aarch64-macOS)
STACK :: 131072; // 128 KB usable per fiber

// Max fd events drained per readiness wait (kqueue `kevent` on darwin,
// `epoll_wait` on linux) (B1.4c). Sized for the M:1 model's small fiber
// counts; a wait that fills it just drains the rest on the next loop
// iteration (the woken fibers run, the queue re-drains, the still-pending
// waiters block again).
MAXEV :: 16;

// --- core types ------------------------------------------------------------

// Saved context: x19..x28 (10), x29/fp, x30/lr, sp — 13 u64 slots.
FiberCtx :: struct {
    regs: [13]u64;
}

FiberState :: enum { ready; running; suspended; done; }

// A fiber: saved context + body + bookkeeping. `ctx` MUST stay the first field
// (the context switch and `fib_dispatch` rely on the layout).
Fiber :: struct {
    ctx: FiberCtx;
    body: Closure() -> void;
    state: FiberState;
    sched: *Scheduler;
    stack_region: *void;  // mmap base — for munmap on reap
    stack_len: i64;       // GUARD + STACK, for munmap
    id: i64;
    next: *Fiber;         // intrusive FIFO ready-queue link
}

// A pending virtual-time timer: wake `fiber` once the virtual clock reaches
// `deadline_ms`. Stored in `Scheduler.timers` (a `List`) in insertion order, so
// a linear min-scan that takes the FIRST entry at the minimum deadline gives a
// stable FIFO tiebreak for equal deadlines.
Timer :: struct {
    deadline_ms: i64;
    fiber: *Fiber;
}

// B1.4c: a fiber parked on REAL fd readiness. Unlike a `Timer` (virtual
// time), an `IoWaiter` blocks the whole scheduler on the readiness queue
// (`kevent` on darwin, `epoll_wait` on linux) until the kernel reports `fd`
// readable, then wakes `fiber`. Stored in `Scheduler.io_waiters`. The
// registration is one-shot: darwin's EV_ONESHOT is auto-removed by the kernel
// after firing; linux's EPOLLONESHOT only disables it, so the run loop also
// EPOLL_CTL_DELs it. `cancel_io_waiter_for` evicts a stale record (mirror of
// `cancel_timer_for`) so a reaped fiber's waiter can never be woken.
IoWaiter :: struct {
    fd: i32;
    fiber: *Fiber;
}

Scheduler :: struct {
    sched_ctx: FiberCtx;   // the scheduler loop's own saved context
    current: *Fiber;       // running fiber; null while in the scheduler loop
    ready_head: *Fiber;
    ready_tail: *Fiber;
    own_allocator: Allocator; // captured at init — fibers outlive their spawn scope
    next_id: i64;
    n_spawned: i64;
    n_suspended: i64;      // fibers parked off-queue (suspend_self minus wake)

    // --- B1.4b: deterministic virtual-time timer scheduling ----------------
    clock_ms: i64;         // the VIRTUAL clock (ms). Starts 0; advances ONLY
                           // when the ready queue drains and the earliest
                           // pending timer fires. No real wall clock is ever
                           // read — wake ORDER + timestamps are reproducible.
    timers: List(Timer);   // pending sleep timers, in insertion order. Grown
                           // through `own_allocator` (long-lived-container
                           // rule: a timer outlives the `sleep` call's scope).

    // --- B1.4c: real fd-readiness blocking (kqueue / epoll) ----------------
    kq: i32;               // the readiness-queue fd: kqueue on darwin, epoll
                           // on linux. LAZY: -1 until the first `block_on_fd`
                           // opens it, so a pure-compute / virtual-timer
                           // scheduler never opens one (no leak for the
                           // common case). Once opened it lives for the
                           // scheduler's lifetime; `deinit` closes it (and
                           // resets this back to -1).
    io_waiters: List(IoWaiter); // fibers parked on fd readiness, grown through
                           // `own_allocator` (long-lived-container rule: a
                           // waiter outlives the `block_on_fd` call's scope).

    // --- deinit bookkeeping: heap Tasks allocated by `go` --------------------
    task_allocs: List(*void); // every heap `*Task` from `go`, recorded so
                           // `deinit` can free them. The scheduler does not
                           // otherwise know its Tasks (they are generic
                           // `Task($R)` handed back to the caller); without
                           // this list they would leak. Grown through
                           // `own_allocator` (a Task outlives the `go` call).

    // Construct a scheduler BY VALUE (allocator value-return convention).
    // Captures the current `context.allocator` into `own_allocator` — fibers and
    // their heap `Fiber` structs outlive their spawn scope, so all internal
    // allocation must go through this captured (long-lived) allocator, not
    // whatever transient one happens to be current at a later call.
    init :: () -> Scheduler {
        // Literal init (by value). `sched_ctx` is intentionally unnamed — the
        // partial literal zero-fills it, and it is written by the first
        // `swap_context` before ever being read. `kq = -1` is the lazy sentinel
        // (opened by the first `block_on_fd`).
        return Scheduler.{
            current = null, ready_head = null, ready_tail = null,
            own_allocator = context.allocator,
            next_id = 0, n_spawned = 0, n_suspended = 0,
            clock_ms = 0, timers = .{},
            kq = -1, io_waiters = .{},
            task_allocs = .{}
        };
    }

    // Spawn a fiber running `body`. Heap-allocates the `Fiber` and a guarded
    // stack, bootstraps the saved context (x19 = *Fiber, x20 = &fib_dispatch,
    // fp = 0, lr = trampoline, sp = stack top), enqueues it ready (FIFO),
    // returns the `*Fiber`.
    // KNOWN LIMITATION (env leak): `body` is a fat `{fn_ptr, env}` whose env is
    // heap-allocated at the closure-literal site. The reap path frees the Fiber
    // struct + unmaps the stack, but sx exposes no way to free a closure's env
    // (the scheduler can't name the env pointer), so ONE env per spawned fiber
    // leaks until program exit. Bounded by the spawn count; under the default
    // GPA (which frees at exit) it is invisible, but a long-running scheduler
    // under an arena/tracking allocator accumulates one env per fiber. Freeing
    // it needs a language affordance for closure-env ownership — deferred.
    spawn :: (self: *Scheduler, body: Closure() -> void) -> *Fiber {
        raw := self.own_allocator.alloc_bytes(size_of(Fiber));
        if raw == null {
            print("sched: out of memory allocating a Fiber\n");
            abort();
        }
        f : *Fiber = xx raw;
        f.body = body;
        f.sched = self;
        f.id = self.next_id;
        f.next = null;
        self.next_id = self.next_id + 1;
        self.n_spawned = self.n_spawned + 1;

        top := boot_stack(f, STACK);
        f.ctx.regs[0] = xx f;               // x19 = self (→ x0 in the tramp)
        f.ctx.regs[1] = xx fib_dispatch;    // x20 = dispatch entry (tramp `br`s to it)
        f.ctx.regs[10] = 0;                 // fp
        f.ctx.regs[11] = xx fib_tramp;      // lr → trampoline
        f.ctx.regs[12] = top;               // sp

        f.state = .ready;
        enqueue(self, f);
        return f;
    }

    // The running fiber yields cooperatively: mark ready, switch back to the
    // scheduler. The run loop re-enqueues it (round-robin). MUST be called from
    // inside a fiber (there must be a running fiber to yield).
    yield_now :: (self: *Scheduler) {
        cur := self.current;
        if cur == null {
            print("sched: yield_now() called outside a fiber (no running fiber)\n");
            abort();
        }
        cur.state = .ready;
        swap_context(@cur.ctx, @self.sched_ctx);
    }

    // The running fiber parks itself: mark suspended, switch back to the
    // scheduler. The run loop does NOT re-enqueue a suspended fiber — an
    // external `wake` must re-add it. (Used by FiberIo to park on a blocking
    // op until completion.) MUST be called from inside a fiber — a null
    // `current` (called from the bare scheduler/main context) would deref null;
    // bail loudly instead of segfaulting.
    suspend_self :: (self: *Scheduler) {
        cur := self.current;
        if cur == null {
            print("sched: suspend_self() called outside a fiber (no running fiber)\n");
            abort();
        }
        cur.state = .suspended;
        self.n_suspended = self.n_suspended + 1;
        swap_context(@cur.ctx, @self.sched_ctx);
    }

    // Re-ready a parked (suspended) fiber and enqueue it. Called from outside
    // the fiber (e.g. an I/O completion or another fiber) to wake it.
    //
    // A null `f` is a caller bug (nothing to wake) and would otherwise deref
    // null on the state check — bail loudly, like the other entry points.
    //
    // GUARDED on `.suspended`: enqueue links `f` into the FIFO, so waking a
    // fiber that is ALREADY queued (`.ready`) or running (`.running`) would
    // re-link a node already in the list — nulling its `next` mid-list and
    // cycling `ready_tail` back onto it, corrupting the queue (a spurious /
    // double wake, or waking a yielded-not-parked fiber, would segfault). Only
    // a genuinely parked fiber may be re-enqueued; any other wake is a no-op.
    wake :: (self: *Scheduler, f: *Fiber) {
        if f == null {
            print("sched: wake() called with a null fiber\n");
            abort();
        }
        if f.state != .suspended { return; }

        // Evict any pending sleep timer for `f`. EVERY path that re-readies a
        // suspended fiber funnels through `wake` (a manual/Task wake, or the
        // timer-fire in `run` — which already removed the fired timer, so this
        // is a harmless re-scan there). Without this, a fiber that armed a
        // `sleep` timer but was woken EARLY by another path would run to
        // completion and be reaped (stack munmap'd + Fiber freed) while its
        // Timer still held a dangling `*Fiber` — a later fire would dereference
        // freed memory (use-after-free). One timer per fiber max in the M:1
        // model, so a single eviction suffices; it also prevents a stale timer
        // from spuriously re-waking a since-re-slept fiber.
        cancel_timer_for(self, f);

        // Same UAF reasoning for fd waiters: every path that re-readies a
        // suspended fiber funnels through `wake`. If a fiber armed `block_on_fd`
        // but was woken by another path (a manual wake, a Task completion), its
        // `IoWaiter` would otherwise survive pointing at a fiber that runs to
        // completion and is reaped (stack munmap'd + Fiber freed). A later
        // readiness drain matching that stale record would `wake` freed memory.
        // Evict it here. The kernel-side registration is handled per-OS inside
        // `cancel_io_waiter_for`: on darwin the EV_ONESHOT kqueue registration is
        // left to linger (a never-fired one-shot the drain ignores; the fd's
        // owner closes it, auto-removing it), but on linux the EPOLLONESHOT
        // registration stays enabled and must be `EPOLL_CTL_DEL`'d (else it could
        // fire later with no waiter and would block a re-arm of the same fd).
        cancel_io_waiter_for(self, f);

        self.n_suspended = self.n_suspended - 1;
        f.state = .ready;
        enqueue(self, f);
    }

    // Read the VIRTUAL clock — the simulated millisecond time. Advances only as
    // timers fire (in `run`), never from a real wall clock, so two runs of the
    // same fiber program observe identical timestamps. A fiber that just woke
    // from `sleep(ms)` sees `now_ms()` equal to its deadline.
    now_ms :: (self: *Scheduler) -> i64 {
        return self.clock_ms;
    }

    // Sleep the running fiber for `ms` simulated milliseconds: arm a timer at
    // `clock_ms + ms`, then park off-queue. The scheduler advances the virtual
    // clock to this deadline and wakes the fiber once the ready queue has fully
    // drained AND no earlier timer is pending (deadline order, FIFO tiebreak).
    // MUST be called from inside a fiber (there must be a `current` to park);
    // a null `current` bails loudly, mirroring `suspend_self`.
    //
    // Virtual time only moves forward: `ms >= 0` makes the deadline
    // `>= clock_ms`, so a fired timer never rewinds the clock.
    sleep :: (self: *Scheduler, ms: i64) {
        cur := self.current;
        if cur == null {
            print("sched: sleep() called outside a fiber (no running fiber)\n");
            abort();
        }
        // The virtual clock is MONOTONIC — it only advances as timers fire. A
        // negative duration would arm a deadline in the past, rewinding the
        // clock when it fired and breaking every ordering contract. Reject it
        // loudly rather than silently corrupting time. (`sleep(0)` is allowed: a
        // same-tick yield to the timer wheel.)
        if ms < 0 {
            print("sched: sleep({}) — negative duration would rewind the virtual clock\n", ms);
            abort();
        }
        t : Timer = .{ deadline_ms = self.clock_ms + ms, fiber = cur };
        // Long-lived-container rule: a timer outlives this `sleep` call's scope
        // (it survives in `self.timers` until the scheduler fires it), so grow
        // through the captured `own_allocator`, never the transient current one.
        self.timers.append(t, self.own_allocator);
        self.suspend_self(); // parks `cur` off-queue; the timer fire re-wakes it
    }

    // --- B1.4c: block the running fiber until `fd` is readable --------------
    //
    // Register `fd` for read readiness with the scheduler's readiness queue
    // (EVFILT_READ on darwin's kqueue, EPOLLIN on linux's epoll), lazily
    // opening the queue on first use, record an `IoWaiter`, then park the fiber
    // off-queue. The run loop blocks on the queue once nothing else is runnable
    // and wakes this fiber when the kernel reports `fd` ready (one-shot: darwin's
    // EV_ONESHOT auto-removes the registration on fire; on linux the run loop
    // EPOLL_CTL_DELs it).
    //
    // `want_read` is the readiness direction; only read-readiness is wired for
    // now (a write-readiness path would mirror this exactly). A false
    // `want_read` would be a write-wait — not yet implemented, so bail loudly
    // rather than silently arming a read filter (silent-wrong-arm rule).
    //
    // MUST be called from inside a fiber (there must be a `current` to park); a
    // null `current` bails loudly, mirroring `suspend_self` / `sleep`.
    block_on_fd :: (self: *Scheduler, fd: i32, want_read: bool) {
        cur := self.current;
        if cur == null {
            print("sched: block_on_fd() called outside a fiber (no running fiber)\n");
            abort();
        }
        if !want_read {
            print("sched: block_on_fd(want_read=false) — write-readiness not implemented\n");
            abort();
        }

        // ONE waiter per fd (enforced). macOS `EV_ADD` for an existing
        // (ident, filter) REPLACES the registration rather than stacking, so a
        // second fiber blocking on the same fd would leave only one live
        // registration: when the fd fires, the kernel delivers a single event,
        // one waiter wakes, and the other is stranded in `io_waiters` with no
        // registration — the next `kq_wait` then blocks forever. The M:1 model
        // (and `wake_io_waiter_for_fd`, which wakes the first match) assumes a
        // single waiter per fd; enforce it loudly instead of silently hanging.
        j := 0;
        while j < self.io_waiters.len {
            if self.io_waiters.items[j].fd == fd {
                print("sched: block_on_fd: fd {} already has a waiter (one waiter per fd in the M:1 model)\n", fd);
                abort();
            }
            j = j + 1;
        }

        // Lazily open the event-queue fd the first time fd-blocking is used:
        // kqueue on darwin, epoll on linux. `self.kq` holds whichever — it is
        // just "the readiness queue fd".
        if self.kq < 0 {
            inline if OS == {
                case .linux: self.kq = ep.ep_create();
                case .macos: self.kq = kqb.kqueue();
            }
            if self.kq < 0 {
                print("sched: failed to open the event queue\n");
                abort();
            }
        }

        // Arm a one-shot read-readiness registration for `fd`, matched back by
        // the run-loop drain (kqueue by ident; epoll stashes the fd in `data`).
        // darwin EV_ONESHOT auto-removes the registration on fire; epoll's
        // EPOLLONESHOT only DISABLES it, so the linux paths additionally
        // EPOLL_CTL_DEL on fire (run) and on early-wake (cancel_io_waiter_for).
        inline if OS == {
            case .linux: {
                if !ep.ep_ctl(self.kq, ep.EPOLL_CTL_ADD, fd, ep.EPOLLIN | ep.EPOLLONESHOT) {
                    print("sched: epoll_ctl() failed to register fd {} for read readiness\n", fd);
                    abort();
                }
            }
            case .macos: {
                chg := kqb.kev_change(fd, kqb.EVFILT_READ, kqb.EV_ADD | kqb.EV_ENABLE | kqb.EV_ONESHOT, 0);
                if !kqb.kq_apply(self.kq, chg) {
                    print("sched: kevent() failed to register fd {} for read readiness\n", fd);
                    abort();
                }
            }
        }

        // Record the waiter BEFORE parking — the run loop matches the fired
        // event's fd back to this record. Long-lived-container rule: the
        // waiter outlives this call's scope (it survives in `self.io_waiters`
        // until the readiness drain wakes it), so grow through `own_allocator`.
        w : IoWaiter = .{ fd = fd, fiber = cur };
        self.io_waiters.append(w, self.own_allocator);
        self.suspend_self(); // parks `cur` off-queue; the readiness drain re-wakes it
    }

    // The scheduler loop. Drives ready fibers to quiescence, then advances the
    // virtual clock by firing the earliest pending timer (which re-readies its
    // sleeper), and repeats — until the ready queue, the timer set and the
    // io-waiter set are all empty. Within the inner drain each iteration:
    // dequeue the next fiber, switch into it, and — on its switch back — reap
    // it if done (munmap stack, free the Fiber), re-enqueue it if it yielded,
    // or leave it parked if it suspended.
    //
    // NOT re-entrant: `run` saves its own resume point in `sched_ctx`. Calling
    // it from inside a fiber of this same scheduler would overwrite that saved
    // context (the outer loop's), so the running fiber's next switch-back would
    // land in the wrong loop — bail loudly instead.
    run :: (self: *Scheduler) {
        if self.current != null {
            print("sched: run() called from inside a fiber (the scheduler loop is not re-entrant)\n");
            abort();
        }
        while true {
            while self.ready_head != null {
                f := dequeue(self);
                self.current = f;
                f.state = .running;
                swap_context(@self.sched_ctx, @f.ctx);
                // returns here when f yields / suspends / finishes
                self.current = null;
                if f.state == .done {
                    // We've switched OFF f's stack already (the final swap landed
                    // here), so the stack is free to unmap. Free the Fiber struct
                    // AFTER munmap.
                    munmap(f.stack_region, f.stack_len);
                    self.own_allocator.dealloc_bytes(xx f);
                } else if f.state == .ready {
                    enqueue(self, f);
                }
                // .suspended: leave it parked (not in any queue; `wake` re-adds it).
            }

            // Ready queue drained. Decide what advances the world next.
            //
            // Mode 1 — VIRTUAL TIME: fire the earliest pending timer (advancing
            // the virtual clock to it), re-readying its sleeper. Timers take
            // precedence over fd-blocking: a program uses `sleep` OR fds, not
            // both at once. (Documented limitation: virtual-time timers and real
            // readiness timeouts are NOT unified — if both a timer and an
            // io-waiter are pending we always fire the timer first and never
            // block on the readiness queue while a timer is outstanding. A
            // program that genuinely needs "fd-or-real-timeout" wants a kqueue
            // EVFILT_TIMER / timerfd, future work.)
            idx := earliest_timer(self);
            if idx >= 0 {
                t := self.timers.items[idx];
                remove_timer(self, idx);
                self.clock_ms = t.deadline_ms; // advance VIRTUAL time forward
                self.wake(t.fiber);            // re-enqueue the sleeper → drain again
                continue;
            }

            // Mode 2 — REAL fd readiness: nothing is runnable and no timer is
            // pending, but fibers are parked on fds. BLOCK on the readiness
            // queue (timeout -1 = forever) until the kernel reports at least one
            // fd ready, then for each fired event match the fd back to its
            // io-waiter, evict the record, and wake the fiber.
            //
            // NOTE(review): kevent(2) / epoll_wait(2) also return -1 (errno
            // EINTR) when a signal handler interrupts the wait; the `n < 0`
            // checks below abort in that case too. TODO confirm whether the
            // backend wrappers retry on EINTR.
            if self.io_waiters.len > 0 {
                inline if OS == {
                    case .linux: {
                        evbuf : [MAXEV]ep.EpollEvent = ---;
                        n := ep.ep_wait(self.kq, .{ ptr = @evbuf[0], len = MAXEV }, MAXEV, -1);
                        if n < 0 {
                            print("sched: epoll_wait() failed while blocking on fd readiness\n");
                            abort();
                        }
                        i := 0;
                        while i < n {
                            ready_fd := ep.ev_fd(evbuf[i]);
                            wake_io_waiter_for_fd(self, ready_fd);
                            // EPOLLONESHOT only DISABLED the registration; remove it
                            // fully so the fd can be re-armed by a future block_on_fd
                            // (kqueue's EV_ONESHOT removes it for free).
                            ep.ep_ctl(self.kq, ep.EPOLL_CTL_DEL, ready_fd, 0);
                            i = i + 1;
                        }
                    }
                    case .macos: {
                        evbuf : [MAXEV]kqb.Kevent = ---;
                        n := kqb.kq_wait(self.kq, @evbuf[0], MAXEV, -1);
                        if n < 0 {
                            print("sched: kevent() wait failed while blocking on fd readiness\n");
                            abort();
                        }
                        i := 0;
                        while i < n {
                            ready_fd : i32 = xx evbuf[i].ident;
                            wake_io_waiter_for_fd(self, ready_fd);
                            i = i + 1;
                        }
                    }
                }
                continue;
            }

            // Nothing runnable, no timer, no fd waiter → done.
            break;
        }

        // The ready queue, the timer set, AND the io-waiter set are all empty. If
        // a fiber is STILL parked, nothing will ever wake it (a `suspend_self`
        // without an armed timer or fd registration, never externally woken) —
        // its stack + struct are leaked and the program believes it finished.
        // That is a genuine deadlock; surface it loudly. (Timer sleepers and fd
        // waiters are balanced: each arming path increments `n_suspended` via
        // `suspend_self`, and its wake decrements it — so once every timer has
        // fired and every io-waiter has been woken, `n_suspended` counts only
        // these true orphans.)
        //
        // SCOPE — fd waiters are NOT covered by this check, BY DESIGN, not as an
        // oversight. While `io_waiters.len > 0` the loop above blocks in the
        // readiness wait (timeout -1) and never reaches here. A fiber blocked on
        // an fd that the OS never reports ready blocks FOREVER — which is the
        // correct semantics of an event loop (a server idling on a socket is
        // indistinguishable from one whose peer never sends; the scheduler
        // cannot know an fd will never become ready, so it must keep waiting).
        // That is a caller-side logic issue (blocking on input that never
        // arrives), not a scheduler deadlock to abort on. This check covers only
        // pure `suspend_self` parks with no pending wake source at all.
        if self.n_suspended != 0 {
            print("sched: deadlock — {} fiber(s) suspended with an empty run queue\n", self.n_suspended);
            abort();
        }
    }

    // Release the scheduler's owned resources. TERMINAL: the scheduler is dead
    // after this — no scheduler-owned handle (the `*Task`s returned by `go`, a
    // `*Fiber` from `spawn`, the scheduler itself) may be used afterward; doing
    // so is a use-after-free, the universal deinit contract. Idempotent: a
    // second `deinit` is a no-op (it rests on `List.deinit` nulling `items` +
    // zeroing `len`, and on `kq`/`ready_head` being reset below).
    //
    // Call AFTER `run()` has returned, from outside any fiber: a clean `run()`
    // leaves the ready queue empty and aborts loudly on any orphaned suspend,
    // so nothing is mid-flight and every `task_allocs` entry is a COMPLETED task
    // (safe to free). Called from inside a running fiber, it would free the
    // lists/Tasks the enclosing `run()` is still using — bail loudly instead.
    // Frees, in order:
    //   1. any fibers still enqueued ready — a leak-SAFETY NET for the misuse
    //      path (`spawn`/`go` without a following `run()`, or after it returned),
    //      NOT a blessed reuse pattern: reaping a `go`'s fiber here while step (2)
    //      frees its paired `*Task` is self-consistent ONLY because the contract
    //      already forbade touching those handles post-`deinit`. A suspended
    //      (off-queue) fiber is unreachable from here, but a clean `run()` never
    //      leaves one (it aborts on an orphaned suspend);
    //   2. every heap `*Task` from `go` (recorded in `task_allocs`);
    //   3. the three `List` backings (`task_allocs`, `timers`, `io_waiters`),
    //      each grown through `own_allocator`;
    //   4. the readiness-queue fd (kqueue on darwin, epoll on linux), if
    //      `block_on_fd` ever opened it (lazy `-1` otherwise).
    //
    // NOT freed (documented language limitation, unchanged): one closure env per
    // `spawn`/`go`. The env is heap-allocated at the closure-literal site and sx
    // exposes no way to free it (the scheduler cannot name the env pointer), so
    // it leaks until program exit — bounded by the spawn/go count, invisible
    // under the default GPA. Freeing it needs a closure-env-ownership affordance.
    deinit :: (self: *Scheduler) {
        if self.current != null {
            print("sched: deinit() called from inside a running fiber\n");
            abort();
        }

        // (1) Reap leftover ready fibers: unmap the stack, free the Fiber.
        f := self.ready_head;
        while f != null {
            nxt := f.next;
            munmap(f.stack_region, f.stack_len);
            self.own_allocator.dealloc_bytes(xx f);
            f = nxt;
        }
        self.ready_head = null;
        self.ready_tail = null;

        // (2) Free every heap Task allocated by `go`.
        for self.task_allocs.items[0..self.task_allocs.len] (t) {
            self.own_allocator.dealloc_bytes(t);
        }

        // (3) Free the List backings (all grown through `own_allocator`).
        self.task_allocs.deinit(self.own_allocator);
        self.timers.deinit(self.own_allocator);
        self.io_waiters.deinit(self.own_allocator);

        // (4) Close the readiness-queue fd if it was ever opened (lazy: -1 if never used).
        if self.kq >= 0 {
            close(self.kq);
            self.kq = -1;
        }
    }
}

// --- the context switch (naked) + first-entry trampoline -------------------

// x0 = from, x1 = to (read straight from the ABI registers — a naked fn has no
// frame, so its params are never spilled).
swap_context :: (from: *FiberCtx, to: *FiberCtx) abi(.naked) { asm volatile { #string ASM stp x19, x20, [x0, #0] stp x21, x22, [x0, #16] stp x23, x24, [x0, #32] stp x25, x26, [x0, #48] stp x27, x28, [x0, #64] stp x29, x30, [x0, #80] mov x9, sp str x9, [x0, #96] ldp x19, x20, [x1, #0] ldp x21, x22, [x1, #16] ldp x23, x24, [x1, #32] ldp x25, x26, [x1, #48] ldp x27, x28, [x1, #64] ldp x29, x30, [x1, #80] ldr x9, [x1, #96] mov sp, x9 ret ASM }; } // First-entry trampoline: a fiber's bootstrapped LR points here, with x19 = // `*Fiber` and x20 = `&fib_dispatch` (both preset in the saved context by // `spawn`, both callee-saved so `swap_context` restores them on first entry). // Move the fiber to x0 and tail-branch to dispatch via the REGISTER (x20) — so // there is no hand-written global-asm symbol and nothing here needs per-OS // symbol naming (`_fib_tramp` on darwin vs `fib_tramp` on linux) or a `bl` to a // named export. As a naked sx fn `fib_tramp`'s own symbol is emitted with the // platform-correct name automatically, so `spawn`'s `xx fib_tramp` resolves on // every target. This register-indirect bootstrap replaced an OS-conditional // global `asm` block (a top-level `asm` wrapped in an `inline if` is dropped in // this module's context — see issues/0193) and sidesteps the hand-written // symbol entirely, which is cleaner regardless. fib_tramp :: () abi(.naked) { asm volatile { #string T mov x0, x19 br x20 T }; } // The ONE place that runs a fiber body. Reached only from `fib_tramp` on first // entry, on the fiber's own fresh stack. Runs the body, marks the fiber done, // and switches back to the scheduler — never returns past the final switch. // // `export "fib_dispatch"` is MANDATORY, not decorative: it pins this fn to the // **C ABI** (first real arg `self` in x0). The trampoline hands the fiber over // in x0 (`mov x0, x19; br x20`), which is exactly C-ABI. 
Drop the export and the // fn reverts to sx's INTERNAL calling convention, which reserves x0 for the // implicit `context` pointer and shifts `self` to x1 — so the trampoline's x0 // would land in the context slot and `self` would be read from a garbage x1. On // first entry that garbage happens to alias `&fiber.ctx == self` (left in x1 by // the scheduler's prior `swap_context`), so the body runs once; but inside it // the closure loads `[Fiber+8] == regs[1] == &fib_dispatch` as its "first // capture" and re-invokes `fib_dispatch` forever → stack overflow → bus error // (issue 0193 Bug A, observed only on the go/wait/sleep capstone 1817). // // One consequence of the C-ABI boundary: an exported fn has no implicit // `context` param, so `self.body()` runs under the static `__sx_default_context` // — NOT whatever `push Context { allocator = ... }` was in force at the // `run()` call site. Fiber bodies do not inherit a caller-scoped allocator; a // body that needs one must capture it explicitly (the long-lived-container rule). fib_dispatch :: (self: *Fiber) export "fib_dispatch" { self.body(); self.state = .done; swap_context(@self.ctx, @self.sched.sched_ctx); } // --- guarded stack bootstrap ---------------------------------------------- // mmap a [guard | usable-stack] region, mprotect the low guard page PROT_NONE. // Stores the region base + len on the fiber (for munmap on reap) and returns // the 16-aligned stack top (the bootstrapped SP). boot_stack :: (f: *Fiber, size: i64) -> u64 { total := GUARD + size; region : *void = mmap(null, total, PROT_RW, MAP_AP, -1, 0); // mmap signals failure with MAP_FAILED = (void*)-1 (NOT null). Handing a // wild SP to the switch would `ret` onto garbage — bail loudly instead. if (xx region) == (xx (0 - 1)) { print("sched: mmap failed for a {}-byte fiber stack\n", total); abort(); } f.stack_region = region; f.stack_len = total; // Guard-arm: turn the low page unwritable so overflow faults at the // boundary. 
The guard is mandatory (§8.1.1); a stack handed out without it // would silently corrupt a neighbor on overflow, so a failed mprotect is // fatal, not ignorable. if mprotect(region, GUARD, PROT_NONE) != 0 { print("sched: mprotect(PROT_NONE) failed to arm the stack guard page\n"); abort(); } usable : u64 = (xx region) + GUARD; top : u64 = usable + size; return top - (top % 16); // 16-byte aligned stack top (AAPCS) } // --- intrusive FIFO ready-queue ------------------------------------------- enqueue :: (self: *Scheduler, f: *Fiber) { f.next = null; if self.ready_tail == null { self.ready_head = f; self.ready_tail = f; } else { self.ready_tail.next = f; self.ready_tail = f; } } dequeue :: (self: *Scheduler) -> *Fiber { f := self.ready_head; if f == null { return null; } self.ready_head = f.next; if self.ready_head == null { self.ready_tail = null; } f.next = null; return f; } // --- virtual-time timer set (linear min-scan, FIFO tiebreak) --------------- // // The timer set is a plain `List(Timer)` kept in INSERTION order. Fiber counts // are tiny, so a linear scan for the minimum deadline is ideal — no heap to // maintain — and "first entry at the minimum" naturally gives FIFO ordering for // equal deadlines (the earlier-inserted timer is visited first, so it wins the // tie). Removal shifts the tail down by one to preserve that insertion order for // the remaining entries. // Index of the earliest-deadline pending timer, or -1 if none. On a deadline // tie the lowest index (earliest inserted) wins → deterministic FIFO wake order. earliest_timer :: (self: *Scheduler) -> i64 { if self.timers.len == 0 { return -1; } best := 0; i := 1; while i < self.timers.len { // Strict `<` so equal deadlines do NOT displace the earlier (lower) // index — that is the FIFO tiebreak. 
if self.timers.items[i].deadline_ms < self.timers.items[best].deadline_ms { best = i; } i = i + 1; } return best; } // Remove the timer at `idx`, shifting every later entry down one slot so the // remaining timers keep their insertion order (preserving the FIFO tiebreak). remove_timer :: (self: *Scheduler, idx: i64) { i := idx; while i < self.timers.len - 1 { self.timers.items[i] = self.timers.items[i + 1]; i = i + 1; } self.timers.len = self.timers.len - 1; } // Remove a pending sleep timer referencing fiber `f`, if any. A fiber has at // most one pending timer in the M:1 model (it can only `sleep` once before // suspending), so the first match is the only one. No-op if `f` has none. cancel_timer_for :: (self: *Scheduler, f: *Fiber) { i := 0; while i < self.timers.len { if self.timers.items[i].fiber == f { remove_timer(self, i); return; } i = i + 1; } } // --- B1.4c: fd-waiter set (linear scan, fd-keyed) -------------------------- // // Like the timer set, a plain `List(IoWaiter)` scanned linearly — fiber counts // are tiny. Removal shifts the tail down one slot. // Remove the io-waiter at `idx`, shifting later entries down one slot. remove_io_waiter :: (self: *Scheduler, idx: i64) { i := idx; while i < self.io_waiters.len - 1 { self.io_waiters.items[i] = self.io_waiters.items[i + 1]; i = i + 1; } self.io_waiters.len = self.io_waiters.len - 1; } // Remove a pending fd-waiter referencing fiber `f`, if any. A fiber has at most // one pending io-waiter in the M:1 model (it can only `block_on_fd` once before // suspending), so the first match is the only one. No-op if `f` has none. Used // by `wake` to evict a waiter when the fiber is re-readied by another path. cancel_io_waiter_for :: (self: *Scheduler, f: *Fiber) { i := 0; while i < self.io_waiters.len { if self.io_waiters.items[i].fiber == f { // Early-wake: the fiber is re-readied by another path while its fd // registration is still armed. 
kqueue's EV_ONESHOT lingers // harmlessly (a never-fired one-shot the drain ignores); epoll's // EPOLLONESHOT registration stays enabled — it could fire later with // no waiter, and blocks a re-arm of the same fd — so remove it. inline if OS == { case .linux: { fd := self.io_waiters.items[i].fd; remove_io_waiter(self, i); if self.kq >= 0 { ep.ep_ctl(self.kq, ep.EPOLL_CTL_DEL, fd, 0); } } case .macos: remove_io_waiter(self, i); } return; } i = i + 1; } } // A fired kqueue event for `fd`: find the waiter registered on it, evict the // record, and wake its fiber. No-op if no waiter matches (a stale one-shot // registration whose fiber was already woken another way — see `wake`). Only // the FIRST match is woken: one waiter per fd in this model (a single fiber // blocks on a given read fd at a time). wake_io_waiter_for_fd :: (self: *Scheduler, fd: i32) { i := 0; while i < self.io_waiters.len { if self.io_waiters.items[i].fd == fd { wf := self.io_waiters.items[i].fiber; remove_io_waiter(self, i); self.wake(wf); // re-enqueues the parked fiber (also calls // cancel_io_waiter_for, now a harmless no-op — // the record is already removed) return; } i = i + 1; } } // The public API lives as methods on `Scheduler` (above): `init`, `spawn`, // `yield_now`, `suspend_self`, `wake`, `run`, `now_ms`, `sleep`. // --- B1.4a: truly-suspending fiber-task async (`go` / `wait` / `cancel`) ---- // // An async-task layer on top of the M:1 scheduler: `s.go(work)` runs `work` as // a REAL fiber, and `t.wait()` SUSPENDS the caller fiber until the task's fiber // completes — genuine interleaving, in contrast with io.sx's `context.io.async` // (which runs the worker inline to completion before returning). Distinct from // io.sx's `Future` by design: `Task` is defined here so the two modules stay // decoupled (no cross-import; sched.sx must keep importing only `std.sx`, since // a different import path re-emits the module's global `_fib_tramp` asm and // duplicates the symbol). 
//
// THE NULLARY-THUNK RATIONALE. `work` is a NULLARY thunk `Closure() -> $R`, not
// a worker-plus-`..args` pair like io.sx's `async`. A variadic pack is
// comptime-only and segfaults if captured into a deferred closure that crosses
// the fiber boundary (issue 0156 Part 2). So instead of forwarding inputs as a
// pack, the user captures any inputs in the lambda AT THE CALL SITE (where
// they're live): `s.go(() -> i64 => compute(a, b))`. Nothing variadic ever
// crosses into the fiber — the thunk is a plain `{fn_ptr, env}` fat closure.
//
// KNOWN LIMITATION (Task lifetime): `go` heap-allocates the `Task` (it outlives
// the call — the fiber fills `value`/`state` later, after `go` has returned) and
// records it in `self.task_allocs` so `Scheduler.deinit` can free it. Nothing
// frees a Task earlier, so a long-running scheduler accumulates one `Task` per
// `go` until `deinit`, even after the task has been awaited. Freeing a Task
// sooner safely needs join-point ownership tracking — deferred.
//
// WAKE-AFTER-COMPLETE ORDERING (both orderings are correct):
// - worker finishes BEFORE `wait`: the worker set `t.state = .ready` and saw
//   `t.waiter == null`, so it issued no wake. `wait` sees `.ready` (not
//   `.pending`), does NOT park, and returns `t.value` — no lost wakeup.
// - `wait` runs BEFORE the worker finishes: `wait` registers itself as
//   `t.waiter` and parks via `suspend_self`. When the worker finishes it sees
//   a non-null `t.waiter`, clears the slot and `wake`s that fiber; `wait`
//   resumes and returns the value.
//
// CANCEL: `cancel` only sets flags — it never wakes a parked `wait` itself. A
// `wait` that parked before the cancel resumes when the worker fiber finishes
// (right away if the worker had not started yet, since it then skips `work()`)
// and raises `.Canceled`. `.canceled` is sticky: a worker that was mid-`work()`
// when the cancel landed does not overwrite it with `.ready`.

TaskState :: enum {
    pending;
    ready;
    canceled;
}

// The `!` channel for `wait`. Defined LOCALLY (not reusing io.sx's `IoErr`):
// `IoErr` is reachable here only as a re-export alias through std.sx, and the
// failable-type detection behind `raise` does not see through that alias to the
// underlying `error` set — so `raise error.Canceled` against `(.., !IoErr)`
// here is rejected as "not a failable function". A local `error` decl is
// recognized directly. (Same `.Canceled` contract as io.sx model (a).)
TaskErr :: error { Canceled }

Task :: struct ($R: Type) {
    value: R;                     // written by the worker fiber; never set if canceled before it ran
    state: TaskState = .pending;
    waiter: *void = null;         // the single parked awaiter (opaque *Fiber); M:1 → at most one
    sched: *Scheduler;            // owning scheduler (for park/wake in `wait`)
    canceled: i64;                // cooperative cancel flag (M:1: no preemption → no atomics)
}

// Spawn `work` as a fiber; return a heap `*Task` that completes when the fiber
// finishes. Mirrors `spawn`'s alloc + null-check + abort. The Task is recorded
// in `self.task_allocs` and freed by `deinit` (see KNOWN LIMITATION above).
go :: ufcs (self: *Scheduler, work: Closure() -> $R) -> *Task($R) {
    raw := self.own_allocator.alloc_bytes(size_of(Task($R)));
    if raw == null {
        print("sched: out of memory allocating a Task\n");
        abort();
    }
    t : *Task($R) = xx raw;
    // `value` is left unset on purpose: only the worker writes it, and `wait`
    // never returns it unless the worker completed without a cancel.
    t.state = .pending;
    t.waiter = null;
    t.sched = self;
    t.canceled = 0;
    // Record the heap Task so `deinit` can free it (the scheduler otherwise has
    // no handle on its generic Tasks). Long-lived: a Task outlives this call.
    self.task_allocs.append(xx t, self.own_allocator);
    self.spawn(() => {
        // Cooperative cancel: skip the work entirely if cancel already landed
        // before this fiber was scheduled (saves the compute + side effects). A
        // cancel that lands DURING `work()` still lets it finish (no preemption
        // in the M:1 model) — cancel suppresses DELIVERY, never an in-flight run.
        if t.canceled == 0 {
            t.value = work();
            // `work()` may yield/suspend, so a cancel can land while it runs.
            // Keep `cancel`'s sticky `.canceled` instead of clobbering it.
            if t.canceled == 0 { t.state = .ready; }
        }
        // Wake the awaiter only if one already parked (else `wait` will not park).
        // Clear the slot first so the Task never keeps a pointer to a fiber
        // that is freed once it finishes.
        if t.waiter != null {
            parked := t.waiter;
            t.waiter = null;
            self.wake(xx parked);
        }
    });
    return t;
}

// Suspend the caller until the task completes; return its value (or raise
// `.Canceled` if the task was canceled before or while it was awaited). MUST be
// called from inside a fiber (so there is a `self.current` to park) — typically
// from a fiber spawned via `s.spawn(...)`.
wait :: ufcs (t: *Task($R)) -> $R !TaskErr {
    if t.canceled != 0 { raise error.Canceled; }
    if t.state == .pending {
        // ONE waiter per task (enforced). A `Task` holds a single `waiter` slot;
        // a second concurrent `wait` on the same pending task would OVERWRITE the
        // first, and completion would wake only the second — the first fiber
        // would stay suspended forever (silent deadlock). The M:1 model is
        // single-await per task; enforce it loudly (mirrors `block_on_fd`'s
        // one-waiter-per-fd guard). A multi-waiter task would need a waiter list.
        if t.waiter != null {
            print("sched: wait() — task already has a waiter (one awaiter per task in the M:1 model)\n");
            abort();
        }
        t.waiter = xx t.sched.current; // register self as the waiter
        t.sched.suspend_self();        // park until the task's fiber wakes us
    }
    // Re-check after parking: a cancel may have landed while we were suspended.
    if t.canceled != 0 or t.state == .canceled { raise error.Canceled; }
    return t.value;
}

// Request cancellation — rides the `!` channel (model (a), like io.sx 1806). M:1
// cooperative: the worker fiber may already have run; cancel still makes a
// subsequent (or in-flight) `wait` raise `.Canceled`. It does not wake a parked
// `wait`: that waiter resumes when the worker fiber finishes.
cancel :: ufcs (t: *Task($R)) {
    t.canceled = 1;
    t.state = .canceled;
}