diff --git a/examples/concurrency/1811-concurrency-fiber-scheduler.sx b/examples/concurrency/1811-concurrency-fiber-scheduler.sx
new file mode 100644
index 00000000..f2a61fed
--- /dev/null
+++ b/examples/concurrency/1811-concurrency-fiber-scheduler.sx
@@ -0,0 +1,81 @@
+// Stream B1 (fibers) B1.5a — the M:1 cooperative fiber scheduler core, in pure
+// sx over `swap_context` (proven in 1807-1809). `Scheduler` drives N fibers,
+// each running a `body: Closure() -> void` on its own guarded `mmap` stack;
+// fibers cooperate by calling `yield_now`, which round-robins control back
+// through the scheduler loop.
+//
+// Round-robin demo: 3 fibers (A=0, B=1, C=2) each append their id to a shared
+// sequence buffer, yielding between each of 3 rounds. Because the scheduler
+// re-enqueues a yielding fiber at the TAIL (FIFO), the interleaving is the
+// deterministic round-robin order:
+//
+//   round 1: A B C   (0 1 2)
+//   round 2: A B C   (0 1 2)
+//   round 3: A B C   (0 1 2)
+//
+//   → sequence: 0 1 2 0 1 2 0 1 2
+//
+// Outputs flow OUT of each fiber through pointers captured in its closure (the
+// shared `Shared` struct), since closure capture-by-value does not write back.
+// Every fiber must run to completion (checked via the printed per-fiber done flags).
+//
+// aarch64-macOS-pinned (the scheduler's asm + guard-page mmap constants are
+// per-arch / Apple-specific): runs end-to-end on a matching host, ir-only on a
+// mismatch.
+#import "modules/std.sx";
+sched :: #import "modules/std/sched.sx";
+
+Shared :: struct {
+    seq: [16]i64;   // appended interleaving sequence
+    n: i64;         // count appended
+    done: [3]i64;   // per-fiber done flag (set right before the body returns)
+}
+
+append :: (sh: *Shared, v: i64) { // no bounds check: the demo appends 9 values (3 fibers x 3 rounds) into seq[16]
+    sh.seq[sh.n] = v;
+    sh.n = sh.n + 1;
+}
+
+main :: () -> i64 {
+    sh : Shared = ---;
+    sh.n = 0; // `seq` is never pre-filled; only the first `n` entries are read back below
+    sh.done[0] = 0; sh.done[1] = 0; sh.done[2] = 0;
+
+    s := sched.Scheduler.init();
+    ps := @s;   // the fiber bodies reach the scheduler through this pointer
+    psh := @sh; // ...and the shared output record through this one
+
+    // Three DIFFERENT fiber bodies (distinct captured ids), interleaving via
+    // yield_now. Each appends its id once per round for 3 rounds.
+    spawn_worker :: (ps: *sched.Scheduler, psh: *Shared, my_id: i64) {
+        ps.spawn(() => {
+            r := 0;
+            while r < 3 {
+                append(psh, my_id);
+                if r < 2 { ps.yield_now(); } // cooperate between rounds (no yield after the last one)
+                r = r + 1;
+            }
+            psh.done[my_id] = 1;
+        });
+    }
+
+    spawn_worker(ps, psh, 0);
+    spawn_worker(ps, psh, 1);
+    spawn_worker(ps, psh, 2);
+
+    s.run(); // returns once the ready queue has drained
+
+    // Ordering contract: round-robin FIFO interleaving.
+    print("sequence:");
+    i := 0;
+    while i < sh.n {
+        print(" {}", sh.seq[i]);
+        i = i + 1;
+    }
+    print("\n");
+
+    print("spawned: {}\n", s.n_spawned);
+    print("done: {} {} {}\n", sh.done[0], sh.done[1], sh.done[2]);
+    print("all done: {}\n", sh.done[0] + sh.done[1] + sh.done[2]);
+    return 0;
+}
diff --git a/examples/concurrency/1812-concurrency-fiber-suspend-wake.sx b/examples/concurrency/1812-concurrency-fiber-suspend-wake.sx
new file mode 100644
index 00000000..a521d56e
--- /dev/null
+++ b/examples/concurrency/1812-concurrency-fiber-suspend-wake.sx
@@ -0,0 +1,64 @@
+// Stream B1 (fibers) B1.5a — fiber park/resume via `suspend_self` + `wake`,
+// the off-queue half of the M:1 scheduler that FiberIo [B1.4] builds on.
+//
+// A running fiber that has nothing to do parks itself with `suspend_self`: it
+// leaves the round-robin queue entirely (unlike `yield_now`, which re-enqueues)
+// and only runs again when another fiber (or an I/O completion) calls `wake` on
+// it. Here fiber A records 10, parks, and is resumed by fiber B to record 11:
+//
+//   A: rec 10, suspend_self ──park──┐
+//   B: rec 20, wake(A), wake(A), rec 21
+//   A: ──resume──> rec 11
+//   → log: 10 20 21 11
+//
+// `wake` is GUARDED on `.suspended`: B's SECOND `wake(A)` is spurious (A is
+// already re-queued by then). An unguarded enqueue would re-link an
+// already-listed node and corrupt the FIFO (segfault); the guard makes a
+// double/spurious wake of a LIVE fiber a safe no-op. `suspended-left: 0` confirms
+// every park was balanced by a wake (an orphaned park aborts in `run`).
+//
+// aarch64-macOS-pinned (the scheduler's per-arch asm + Apple mmap constants):
+// runs end-to-end on a matching host, ir-only on a mismatch.
+#import "modules/std.sx";
+sched :: #import "modules/std/sched.sx";
+
+// The shared state both fibers reach through (passed as `*Sh`). `parked` holds
+// the fiber-A handle that B wakes — kept here (rather than a separate
+// `**Fiber`) so the one `*Sh` carries everything the helper fns share.
+Sh :: struct { log: [16]i64; n: i64; parked: *sched.Fiber; }
+rec :: (sh: *Sh, v: i64) { sh.log[sh.n] = v; sh.n = sh.n + 1; } // no bounds check; the demo logs 4 values into log[16]
+
+main :: () -> i64 {
+    sh : Sh = ---; sh.n = 0; sh.parked = null;
+    s := sched.Scheduler.init();
+    ps := @s; psh := @sh;
+
+    // Fiber A: record 10, park, then (after wake) record 11. Store A's handle in
+    // the shared state so B can wake it.
+    mk_a :: (ps: *sched.Scheduler, psh: *Sh) {
+        psh.parked = ps.spawn(() => {
+            rec(psh, 10);
+            ps.suspend_self(); // off the ready queue until B's first `wake`
+            rec(psh, 11);
+        });
+    }
+    // Fiber B: record 20, wake A (legit) + a spurious second wake (safe no-op),
+    // record 21.
+    mk_b :: (ps: *sched.Scheduler, psh: *Sh) {
+        ps.spawn(() => {
+            rec(psh, 20);
+            ps.wake(psh.parked); // legitimate: A is parked
+            ps.wake(psh.parked); // spurious: A is now .ready/queued — must no-op
+            rec(psh, 21);
+        });
+    }
+    mk_a(ps, psh); // A is spawned first, so `parked` is set and A has parked before B runs
+    mk_b(ps, psh);
+    s.run();
+
+    print("log:");
+    i := 0; while i < sh.n { print(" {}", sh.log[i]); i = i + 1; }
+    print("\n");
+    print("suspended-left: {}\n", s.n_suspended);
+    return 0;
+}
diff --git a/examples/concurrency/1813-concurrency-fiber-async-suspend.sx b/examples/concurrency/1813-concurrency-fiber-async-suspend.sx
new file mode 100644
index 00000000..2a1a418b
--- /dev/null
+++ b/examples/concurrency/1813-concurrency-fiber-async-suspend.sx
@@ -0,0 +1,83 @@
+// Stream B1 (fibers) B1.4a — a truly-SUSPENDING fiber-task async layer
+// (`go` / `wait` / `cancel`) over the M:1 scheduler, in pure sx. In contrast
+// with 1805's `context.io.async` (which runs each worker INLINE to completion
+// before returning a `.ready` future — no interleaving), here `s.go(work)` runs
+// `work` as a REAL fiber and `t.wait()` SUSPENDS the caller until that fiber
+// finishes, so a task that yields mid-body lets a sibling task run before the
+// first completes — genuine cooperative interleaving.
+//
+// `work` is a NULLARY thunk: any inputs are captured in the lambda at the call
+// site (no `..args` pack crosses the fiber boundary — that would hit issue 0156
+// Part 2). Outputs flow OUT through pointers captured in the thunk (the shared
+// `Log` struct), since closure capture-by-value does not write back.
+//
+// What this proves:
+//   - REAL suspend + interleave: task A records 1, YIELDS; task B then records 2
+//     and completes; A resumes, records 3, completes → interleave order 1 2 3.
+//   - awaited VALUES: A returns 42, B returns 100 (recorded after both waits).
+//     → sequence: 1 2 3 42 100.
+//   - cancel rides the `!` channel (model (a), like 1806): a canceled task's
+//     `wait()` raises `.Canceled`, taken by the `or` default → -99.
+//
+// `wait` must run inside a fiber (it parks `self.current`), so the "main task"
+// is itself a `s.spawn(...)` fiber that drives the two `go` tasks.
+//
+// aarch64-macOS-pinned (the scheduler's asm + guard-page mmap constants are
+// per-arch / Apple-specific): runs end-to-end on a matching host, ir-only on a
+// mismatch.
+#import "modules/std.sx";
+sched :: #import "modules/std/sched.sx";
+
+Log :: struct { seq: [16]i64; n: i64; }
+rec :: (l: *Log, v: i64) { l.seq[l.n] = v; l.n = l.n + 1; } // no bounds check; the demo records 6 values into seq[16]
+
+main :: () -> i64 {
+    lg : Log = ---;
+    lg.n = 0;
+
+    s := sched.Scheduler.init();
+    ps := @s;
+    pl := @lg;
+
+    // The "main task" fiber: drives two real tasks, waits both, then exercises
+    // cancel. It runs as a fiber so `wait` has a `self.current` to park.
+    s.spawn(() => {
+        // Task A yields mid-body so B interleaves before A completes.
+        a := ps.go(() -> i64 => {
+            rec(pl, 1);
+            ps.yield_now(); // yield A (re-queued behind B); B runs to completion first
+            rec(pl, 3);
+            42
+        });
+        // Task B runs straight through (no yield).
+        b := ps.go(() -> i64 => {
+            rec(pl, 2);
+            100
+        });
+
+        // Wait both: `a.wait()` parks this fiber; B is done by then, so `b.wait()` does not park.
+        va := a.wait() or { -1 };
+        vb := b.wait() or { -1 };
+        rec(pl, va);
+        rec(pl, vb);
+
+        // Cancel case: cancel before the worker runs; `wait` raises .Canceled,
+        // the `or` default (-99) is taken.
+        c := ps.go(() -> i64 => 7);
+        c.cancel();
+        rec(pl, c.wait() or { -99 });
+    });
+
+    s.run();
+
+    // Interleaving + value contract: 1 2 3 42 100, then the cancel default -99.
+    print("sequence:");
+    i := 0;
+    while i < lg.n {
+        print(" {}", lg.seq[i]);
+        i = i + 1;
+    }
+    print("\n");
+    print("spawned: {}\n", s.n_spawned); // 4 = the main-task fiber + tasks A, B and C
+    return 0;
+}
diff --git a/examples/concurrency/expected/1811-concurrency-fiber-scheduler.build b/examples/concurrency/expected/1811-concurrency-fiber-scheduler.build
new file mode 100644
index 00000000..42e24dd2
--- /dev/null
+++ b/examples/concurrency/expected/1811-concurrency-fiber-scheduler.build
@@ -0,0 +1 @@
+{ "target": "macos" }
diff --git a/examples/concurrency/expected/1811-concurrency-fiber-scheduler.exit b/examples/concurrency/expected/1811-concurrency-fiber-scheduler.exit
new file mode 100644
index 00000000..573541ac
--- /dev/null
+++ b/examples/concurrency/expected/1811-concurrency-fiber-scheduler.exit
@@ -0,0 +1 @@
+0
diff --git a/examples/concurrency/expected/1811-concurrency-fiber-scheduler.stderr b/examples/concurrency/expected/1811-concurrency-fiber-scheduler.stderr
new file mode 100644
index 00000000..8b137891
--- /dev/null
+++ b/examples/concurrency/expected/1811-concurrency-fiber-scheduler.stderr
@@ -0,0 +1 @@
+
diff --git a/examples/concurrency/expected/1811-concurrency-fiber-scheduler.stdout b/examples/concurrency/expected/1811-concurrency-fiber-scheduler.stdout
new file mode 100644
index 00000000..0939ad39
--- /dev/null
+++ b/examples/concurrency/expected/1811-concurrency-fiber-scheduler.stdout
@@ -0,0 +1,4 @@
+sequence: 0 1 2 0 1 2 0 1 2
+spawned: 3
+done: 1 1 1
+all done: 3
diff --git a/examples/concurrency/expected/1812-concurrency-fiber-suspend-wake.build b/examples/concurrency/expected/1812-concurrency-fiber-suspend-wake.build
new file mode 100644
index 00000000..42e24dd2
--- /dev/null
+++ b/examples/concurrency/expected/1812-concurrency-fiber-suspend-wake.build
@@ -0,0 +1 @@
+{ "target": "macos" }
diff --git a/examples/concurrency/expected/1812-concurrency-fiber-suspend-wake.exit b/examples/concurrency/expected/1812-concurrency-fiber-suspend-wake.exit
new file mode 100644
index 
00000000..573541ac --- /dev/null +++ b/examples/concurrency/expected/1812-concurrency-fiber-suspend-wake.exit @@ -0,0 +1 @@ +0 diff --git a/examples/concurrency/expected/1812-concurrency-fiber-suspend-wake.stderr b/examples/concurrency/expected/1812-concurrency-fiber-suspend-wake.stderr new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/examples/concurrency/expected/1812-concurrency-fiber-suspend-wake.stderr @@ -0,0 +1 @@ + diff --git a/examples/concurrency/expected/1812-concurrency-fiber-suspend-wake.stdout b/examples/concurrency/expected/1812-concurrency-fiber-suspend-wake.stdout new file mode 100644 index 00000000..9229216a --- /dev/null +++ b/examples/concurrency/expected/1812-concurrency-fiber-suspend-wake.stdout @@ -0,0 +1,2 @@ +log: 10 20 21 11 +suspended-left: 0 diff --git a/examples/concurrency/expected/1813-concurrency-fiber-async-suspend.build b/examples/concurrency/expected/1813-concurrency-fiber-async-suspend.build new file mode 100644 index 00000000..42e24dd2 --- /dev/null +++ b/examples/concurrency/expected/1813-concurrency-fiber-async-suspend.build @@ -0,0 +1 @@ +{ "target": "macos" } diff --git a/examples/concurrency/expected/1813-concurrency-fiber-async-suspend.exit b/examples/concurrency/expected/1813-concurrency-fiber-async-suspend.exit new file mode 100644 index 00000000..573541ac --- /dev/null +++ b/examples/concurrency/expected/1813-concurrency-fiber-async-suspend.exit @@ -0,0 +1 @@ +0 diff --git a/examples/concurrency/expected/1813-concurrency-fiber-async-suspend.stderr b/examples/concurrency/expected/1813-concurrency-fiber-async-suspend.stderr new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/examples/concurrency/expected/1813-concurrency-fiber-async-suspend.stderr @@ -0,0 +1 @@ + diff --git a/examples/concurrency/expected/1813-concurrency-fiber-async-suspend.stdout b/examples/concurrency/expected/1813-concurrency-fiber-async-suspend.stdout new file mode 100644 index 00000000..f7e44b21 --- /dev/null +++ 
b/examples/concurrency/expected/1813-concurrency-fiber-async-suspend.stdout @@ -0,0 +1,2 @@ +sequence: 1 2 3 42 100 -99 +spawned: 4 diff --git a/issues/0155-scalar-pointer-index-llvm-panic.md b/issues/0155-scalar-pointer-index-llvm-panic.md new file mode 100644 index 00000000..e5383d67 --- /dev/null +++ b/issues/0155-scalar-pointer-index-llvm-panic.md @@ -0,0 +1,57 @@ +# issue 0155 — indexing a scalar pointer (`pc[0]`, `pc: *i64`) panics at LLVM emission + +> **OPEN.** Found incidentally during an adversarial review of the fiber +> scheduler (a review probe used `pc[0]` on a `*i64`). NOT a fibers-stream +> blocker — the scheduler uses array-field indexing (`ctx.regs[i]`) and pointer +> deref (`p.*`), never scalar-pointer indexing — so it is filed for its own fix +> session, not fixed inline. + +## Symptom + +Indexing a pointer-to-scalar value with `[i]` crashes the compiler: + +``` +thread … panic: unresolved type reached LLVM emission — a type resolution +failure was not diagnosed/aborted + src/backend/llvm/types.zig:196:28 toLLVMTypeInfo (.unresolved arm) + src/backend/llvm/types.zig:38 toLLVMType + src/ir/emit_llvm.zig:2564 toLLVMType +``` + +Observed: compiler panic (no diagnostic). Expected: either lower `pc[i]` as +`*(pc + i)` (C semantics), or emit a clean diagnostic that a bare `*T` is not +indexable (deref with `.*`, or use a slice `[]T`). A `.unresolved` TypeId +reaching LLVM emission is unconditionally a compiler bug (a resolution failure +that was neither diagnosed nor aborted). 
+ +## Reproduction + +```sx +#import "modules/std.sx"; +main :: () -> i64 { + x : i64 = 5; + pc : *i64 = @x; + return pc[0]; // panics the compiler +} +``` + +(repro: `issues/0155-scalar-pointer-index-llvm-panic.sx`) + +## Investigation prompt + +> The sx compiler panics ("unresolved type reached LLVM emission", +> `src/backend/llvm/types.zig:196`) when an index expression `pc[i]` is applied +> to a value of pointer-to-scalar type `*T` (repro: +> `issues/0155-scalar-pointer-index-llvm-panic.sx`). Trace `emitIndexGet` +> (`src/backend/llvm/ops.zig` ~1988) and the index-expr lowering in +> `src/ir/lower/` (the `.index_expr` arm): for a `*T` object, the element type +> resolves to `.unresolved` instead of `T`. Decide the intended semantics first +> (consult `specs.md` for whether a bare `*T` is indexable): if `pc[i]` should +> mean `*(pc + i)`, fix the index-expr type resolver to yield the pointee type +> `T` for a `*T` object (mirror the slice/array-pointer arm — see +> `ptrToArrayElem` / `getElementType` in `src/ir/lower/`), and verify codegen +> emits a GEP + load. If a bare `*T` is intentionally NOT indexable, emit a +> diagnostic at the lowering site ("cannot index `*T`; deref with `.*` or use a +> slice") and never let `.unresolved` reach emission. Verify: `sx run` the repro +> — expect either `5` (if indexable) or a clean compile error, never a panic. +> Then promote the repro to a regression test under `examples/`. diff --git a/issues/0155-scalar-pointer-index-llvm-panic.sx b/issues/0155-scalar-pointer-index-llvm-panic.sx new file mode 100644 index 00000000..3d957ae2 --- /dev/null +++ b/issues/0155-scalar-pointer-index-llvm-panic.sx @@ -0,0 +1,18 @@ +// issue 0155 — indexing a pointer-to-scalar (`pc[0]` where `pc: *i64`) panics +// the compiler at LLVM emission instead of either lowering `pc[i]` like C +// (`*(pc + i)`) or emitting a clean diagnostic that `*T` is not indexable. 
+// +// Observed: `thread … panic: unresolved type reached LLVM emission` +// at src/backend/llvm/types.zig:196 (the `.unresolved` arm of +// toLLVMTypeInfo), reached via emitIndexGet +// (src/backend/llvm/ops.zig ~1988) → the index expression's element +// type resolves to `.unresolved` and is never diagnosed. +// Expected: either a working scalar-pointer index (`pc[0]` == `pc.*`) or a +// proper "cannot index a *T; use a slice / deref with .*" diagnostic. +// A `.unresolved` reaching LLVM is always a compiler bug. +#import "modules/std.sx"; +main :: () -> i64 { + x : i64 = 5; + pc : *i64 = @x; + return pc[0]; // panics the compiler +} diff --git a/library/modules/std/sched.sx b/library/modules/std/sched.sx new file mode 100644 index 00000000..388b8686 --- /dev/null +++ b/library/modules/std/sched.sx @@ -0,0 +1,409 @@ +// Stream B1 (fibers) B1.5a — the M:1 cooperative fiber scheduler core. +// +// A `Scheduler` drives any number of `Fiber`s, each running a stackful +// `body: Closure() -> void` on its own guarded `mmap` stack (the §8.1.1 guard +// page turns a stack overflow into an immediate fault instead of silent +// neighbor corruption). Fibers cooperate: a running fiber hands control back to +// the scheduler loop via `yield_now` (re-enqueued, round-robin) or +// `suspend_self` (parked off-queue until an external `wake`). When a body +// returns, the fiber reaches `.done`, its stack is `munmap`'d and its heap +// `Fiber` freed. +// +// Built on the proven primitives from examples/concurrency/1807-1809: +// - `swap_context` (aarch64 `abi(.naked)`, 13-slot save area: x19..x28, fp, +// lr, sp) saves the callee-saved registers + SP into `*from` and loads them +// from `*to`, then `ret`s onto `to`'s stack. +// - the `_fib_tramp` global-asm first-entry trampoline: x19 holds the +// bootstrapped `*Fiber`; it moves it to x0 and `bl`s the exported generic +// dispatch `fib_dispatch`, which calls the body then switches back to the +// scheduler. 
+// - guarded `mmap` stacks: `[GUARD | usable]`, low GUARD page `mprotect`'d +// PROT_NONE, 16-aligned top returned as the bootstrapped SP. +// +// aarch64-macOS-pinned: the `swap_context` asm + the 13-slot save area are +// per-arch; the `mmap` flag constants (MAP_ANON = 0x1000) and the 16 KB guard +// page are Apple-specific. Runs end-to-end on a matching host, ir-only on a +// mismatch. +#import "modules/std.sx"; + +// --- libc mmap stack primitives ------------------------------------------- + +mmap :: (addr: *void, len: i64, prot: i32, flags: i32, fd: i32, off: i64) -> *void extern libc "mmap"; +mprotect :: (addr: *void, len: i64, prot: i32) -> i32 extern libc "mprotect"; +munmap :: (addr: *void, len: i64) -> i32 extern libc "munmap"; +abort :: () -> noreturn extern libc "abort"; + +PROT_NONE :: 0; +PROT_RW :: 3; // PROT_READ | PROT_WRITE +MAP_AP :: 0x1002; // macOS MAP_PRIVATE (0x2) | MAP_ANON (0x1000) +GUARD :: 16384; // one 16 KB page (aarch64-macOS) +STACK :: 131072; // 128 KB usable per fiber + +// --- core types ------------------------------------------------------------ + +// Saved context: x19..x28 (10), x29/fp, x30/lr, sp — 13 u64 slots. 
+FiberCtx :: struct {
+    // 21 u64 slots, laid out exactly as `swap_context` stores them:
+    //   [0..9]   x19..x28
+    //   [10]     x29 / fp
+    //   [11]     x30 / lr
+    //   [12]     sp
+    //   [13..20] d8..d15 — AAPCS64 makes the low 64 bits of v8..v15
+    //            callee-saved, so a fiber that keeps a float in one of them
+    //            across a switch must get it back on resume. Without these
+    //            slots another fiber's value would leak in.
+    regs: [21]u64;
+}
+
+FiberState :: enum { ready; running; suspended; done; }
+
+Fiber :: struct {
+    ctx: FiberCtx;
+    body: Closure() -> void;
+    state: FiberState;
+    sched: *Scheduler;
+    stack_region: *void;   // mmap base — for munmap on reap
+    stack_len: i64;        // GUARD + STACK, for munmap
+    id: i64;
+    next: *Fiber;          // intrusive FIFO ready-queue link
+}
+
+Scheduler :: struct {
+    sched_ctx: FiberCtx;       // the scheduler loop's own saved context
+    current: *Fiber;           // running fiber; null while in the scheduler loop
+    ready_head: *Fiber;
+    ready_tail: *Fiber;
+    own_allocator: Allocator;  // captured at init — fibers outlive their spawn scope
+    next_id: i64;
+    n_spawned: i64;
+    n_suspended: i64;          // fibers parked off-queue (suspend_self minus wake)
+
+    // Construct a scheduler BY VALUE (allocator value-return convention).
+    // Captures the current `context.allocator` into `own_allocator` — fibers and
+    // their heap `Fiber` structs outlive their spawn scope, so all internal
+    // allocation must go through this captured (long-lived) allocator, not
+    // whatever transient one happens to be current at a later call.
+    // `sched_ctx` is left unset here: the first `swap_context` in `run` saves
+    // into it before anything loads from it.
+    init :: () -> Scheduler {
+        s : Scheduler = ---;
+        s.current = null;
+        s.ready_head = null;
+        s.ready_tail = null;
+        s.own_allocator = context.allocator;
+        s.next_id = 0;
+        s.n_spawned = 0;
+        s.n_suspended = 0;
+        return s;
+    }
+
+    // Spawn a fiber running `body`. Heap-allocates the `Fiber` and a guarded
+    // stack, bootstraps the saved context (x19 = *Fiber, fp = 0, lr =
+    // trampoline, sp = stack top, d8..d15 = 0), enqueues it ready (FIFO),
+    // returns the `*Fiber`. Aborts (after printing) if the allocation fails.
+    // KNOWN LIMITATION (env leak): `body` is a fat `{fn_ptr, env}` whose env is
+    // heap-allocated at the closure-literal site. The reap path frees the Fiber
+    // struct + unmaps the stack, but sx exposes no way to free a closure's env
+    // (the scheduler can't name the env pointer), so ONE env per spawned fiber
+    // leaks until program exit. Bounded by the spawn count; under the default
+    // GPA (which frees at exit) it is invisible, but a long-running scheduler
+    // under an arena/tracking allocator accumulates one env per fiber. Freeing
+    // it needs a language affordance for closure-env ownership — deferred.
+    spawn :: (self: *Scheduler, body: Closure() -> void) -> *Fiber {
+        raw := self.own_allocator.alloc_bytes(size_of(Fiber));
+        if raw == null {
+            print("sched: out of memory allocating a Fiber\n");
+            abort();
+        }
+        f : *Fiber = xx raw;
+        f.body = body;
+        f.sched = self;
+        f.id = self.next_id;
+        f.next = null;
+        self.next_id = self.next_id + 1;
+        self.n_spawned = self.n_spawned + 1;
+
+        top := boot_stack(f, STACK);
+        f.ctx.regs[0] = xx f;            // x19 = self
+        f.ctx.regs[10] = 0;              // fp
+        f.ctx.regs[11] = xx fib_tramp;   // lr → trampoline
+        f.ctx.regs[12] = top;            // sp
+        // d8..d15 carry nothing on first entry; zero them so the first switch
+        // loads defined values rather than whatever the allocator handed back.
+        slot := 13;
+        while slot < 21 {
+            f.ctx.regs[slot] = 0;
+            slot = slot + 1;
+        }
+
+        f.state = .ready;
+        enqueue(self, f);
+        return f;
+    }
+
+    // The running fiber yields cooperatively: mark ready, switch back to the
+    // scheduler. The run loop re-enqueues it (round-robin). MUST be called from
+    // inside a fiber (there must be a running fiber to yield).
+    yield_now :: (self: *Scheduler) {
+        cur := self.current;
+        if cur == null {
+            print("sched: yield_now() called outside a fiber (no running fiber)\n");
+            abort();
+        }
+        cur.state = .ready;
+        swap_context(@cur.ctx, @self.sched_ctx);
+    }
+
+    // The running fiber parks itself: mark suspended, switch back to the
+    // scheduler. The run loop does NOT re-enqueue a suspended fiber — an
+    // external `wake` must re-add it. (Used by FiberIo to park on a blocking
+    // op until completion.) MUST be called from inside a fiber — a null
+    // `current` (called from the bare scheduler/main context) would deref null;
+    // bail loudly instead of segfaulting.
+    suspend_self :: (self: *Scheduler) {
+        cur := self.current;
+        if cur == null {
+            print("sched: suspend_self() called outside a fiber (no running fiber)\n");
+            abort();
+        }
+        cur.state = .suspended;
+        self.n_suspended = self.n_suspended + 1;
+        swap_context(@cur.ctx, @self.sched_ctx);
+    }
+
+    // Re-ready a parked (suspended) fiber and enqueue it. Called from outside
+    // the fiber (e.g. an I/O completion or another fiber) to wake it.
+    //
+    // GUARDED on `.suspended`: enqueue links `f` into the FIFO, so waking a
+    // fiber that is ALREADY queued (`.ready`) or running (`.running`) would
+    // re-link a node already in the list — nulling its `next` mid-list and
+    // cycling `ready_tail` back onto it, corrupting the queue (a spurious /
+    // double wake, or waking a yielded-not-parked fiber, would segfault). Only
+    // a genuinely parked fiber may be re-enqueued; any other wake is a no-op.
+    //
+    // A null handle is likewise a no-op (nothing to wake). `f` must otherwise
+    // be LIVE: `run` frees a fiber's struct once it reaches `.done`, so a
+    // handle kept past that point dangles and this guard cannot protect it.
+    wake :: (self: *Scheduler, f: *Fiber) {
+        if f == null { return; }
+        if f.state != .suspended { return; }
+        self.n_suspended = self.n_suspended - 1;
+        f.state = .ready;
+        enqueue(self, f);
+    }
+
+    // The scheduler loop. Runs until the ready queue drains. Each iteration:
+    // dequeue the next fiber, switch into it, and — on its switch back — reap it
+    // if done (munmap stack, free the Fiber), re-enqueue it if it yielded, or
+    // leave it parked if it suspended.
+    run :: (self: *Scheduler) {
+        while self.ready_head != null {
+            f := dequeue(self);
+            self.current = f;
+            f.state = .running;
+            swap_context(@self.sched_ctx, @f.ctx); // returns here when f yields / suspends / finishes
+            self.current = null;
+            if f.state == .done {
+                // We've switched OFF f's stack already (the final swap landed
+                // here), so the stack is free to unmap. Free the Fiber struct
+                // AFTER munmap.
+                munmap(f.stack_region, f.stack_len);
+                self.own_allocator.dealloc_bytes(xx f);
+            } else if f.state == .ready {
+                enqueue(self, f);
+            }
+            // .suspended: leave it parked (not in any queue; `wake` re-adds it).
+        }
+        // The queue drained. If any fiber is still parked, nothing will ever
+        // wake it — its stack + struct are leaked and the program believes it
+        // finished. That is a deadlock; surface it loudly rather than returning
+        // a silent success. (FiberIo, which uses suspend/wake, must balance
+        // every suspend with a wake before the queue empties.)
+        if self.n_suspended != 0 {
+            print("sched: deadlock — {} fiber(s) suspended with an empty run queue\n", self.n_suspended);
+            abort();
+        }
+    }
+}
+
+// --- the context switch (naked) + first-entry trampoline -------------------
+
+// x0 = from, x1 = to (read straight from the ABI registers — a naked fn has no
+// frame, so its params are never spilled). SP-in ≠ SP-out by design.
+// Saves x19..x28, fp, lr, sp and d8..d15 into `*from`, loads the same set from
+// `*to`, then `ret`s through the loaded lr. The byte offsets are 8 * the
+// `FiberCtx.regs` slot index; d8..d15 occupy slots 13..20 (offsets 104..160).
+swap_context :: (from: *FiberCtx, to: *FiberCtx) abi(.naked) {
+    asm volatile {
+        #string ASM
+        stp x19, x20, [x0, #0]
+        stp x21, x22, [x0, #16]
+        stp x23, x24, [x0, #32]
+        stp x25, x26, [x0, #48]
+        stp x27, x28, [x0, #64]
+        stp x29, x30, [x0, #80]
+        mov x9, sp
+        str x9, [x0, #96]
+        stp d8, d9, [x0, #104]
+        stp d10, d11, [x0, #120]
+        stp d12, d13, [x0, #136]
+        stp d14, d15, [x0, #152]
+        ldp x19, x20, [x1, #0]
+        ldp x21, x22, [x1, #16]
+        ldp x23, x24, [x1, #32]
+        ldp x25, x26, [x1, #48]
+        ldp x27, x28, [x1, #64]
+        ldp x29, x30, [x1, #80]
+        ldp d8, d9, [x1, #104]
+        ldp d10, d11, [x1, #120]
+        ldp d12, d13, [x1, #136]
+        ldp d14, d15, [x1, #152]
+        ldr x9, [x1, #96]
+        mov sp, x9
+        ret
+ASM
+    };
+}
+
+// First-entry trampoline: a fiber's bootstrapped LR points here. x19 holds the
+// `*Fiber` (preset in the saved context); move it to x0 and call the generic
+// dispatch. The dispatch never returns, so the `brk` only fires on a bug.
+asm {
+    #string T
+.global _fib_tramp
+_fib_tramp:
+    mov x0, x19
+    bl _fib_dispatch
+    brk #0
+T,
+};
+fib_tramp :: () extern;
+
+// The ONE place that runs a fiber body. Reached only from `_fib_tramp` on first
+// entry, on the fiber's own fresh stack. Runs the body, marks the fiber done,
+// and switches back to the scheduler — never returns past the final switch.
+fib_dispatch :: (self: *Fiber) export "fib_dispatch" {
+    self.body();          // run the user closure to completion on this fiber's stack
+    self.state = .done;   // tells the run loop to reap this fiber
+    swap_context(@self.ctx, @self.sched.sched_ctx);   // final hop back into the run loop
+}
+
+// --- guarded stack bootstrap ----------------------------------------------
+
+// Map one [guard | usable] region for fiber `f`, make the low GUARD bytes
+// PROT_NONE, record the mapping's base + length on `f` (the reap path needs
+// both for munmap), and return the 16-byte-aligned top of the usable part —
+// the SP the fiber is bootstrapped with. Any failure prints and aborts.
+boot_stack :: (f: *Fiber, size: i64) -> u64 {
+    span := GUARD + size;
+    base : *void = mmap(null, span, PROT_RW, MAP_AP, -1, 0);
+    // A failed mmap returns MAP_FAILED, i.e. (void*)-1 — not null. Continuing
+    // would hand the context switch a wild SP, so stop here instead.
+    if (xx base) == (xx (0 - 1)) {
+        print("sched: mmap failed for a {}-byte fiber stack\n", span);
+        abort();
+    }
+    f.stack_region = base;
+    f.stack_len = span;
+    // Arm the guard: the lowest page becomes inaccessible, so running off the
+    // end of the stack faults at the boundary. The guard is mandatory
+    // (§8.1.1) — an unguarded stack would silently trample its neighbor — so
+    // an mprotect failure is fatal rather than something to shrug off.
+    if mprotect(base, GUARD, PROT_NONE) != 0 {
+        print("sched: mprotect(PROT_NONE) failed to arm the stack guard page\n");
+        abort();
+    }
+    lo : u64 = (xx base) + GUARD;   // first usable byte, just above the guard
+    hi : u64 = lo + size;           // one past the last usable byte
+    return hi - (hi % 16);          // round down to a 16-byte boundary (AAPCS)
+}
+
+// --- intrusive FIFO ready-queue -------------------------------------------
+
+// Append `f` at the tail of the ready queue, linking through `Fiber.next`.
+enqueue :: (self: *Scheduler, f: *Fiber) {
+    f.next = null;
+    if self.ready_tail != null {
+        self.ready_tail.next = f;   // non-empty queue: hang f off the old tail
+    } else {
+        self.ready_head = f;        // empty queue: f is also the head
+    }
+    self.ready_tail = f;
+}
+
+// Detach and return the fiber at the head of the ready queue, or null when the
+// queue is empty.
+dequeue :: (self: *Scheduler) -> *Fiber {
+    head := self.ready_head;
+    if head == null { return null; }
+    rest := head.next;
+    self.ready_head = rest;
+    if rest == null { self.ready_tail = null; }   // that was the last one
+    head.next = null;
+    return head;
+}
+
+// The public API lives as methods on `Scheduler` (above): `init`, `spawn`,
+// `yield_now`, `suspend_self`, `wake`, `run`.
+
+// --- B1.4a: truly-suspending fiber-task async (`go` / `wait` / `cancel`) ----
+//
+// An async-task layer on top of the M:1 scheduler: `s.go(work)` runs `work` as
+// a REAL fiber, and `t.wait()` SUSPENDS the caller fiber until the task's fiber
+// completes — genuine interleaving, in contrast with io.sx's `context.io.async`
+// (which runs the worker inline to completion before returning). Distinct from
+// io.sx's `Future` by design: `Task` is defined here so the two modules stay
+// decoupled (no cross-import; sched.sx must keep importing only `std.sx`, since
+// a different import path re-emits the module's global `_fib_tramp` asm and
+// duplicates the symbol).
+//
+// THE NULLARY-THUNK RATIONALE. `work` is a NULLARY thunk `Closure() -> $R`, not
+// a worker-plus-`..args` pair like io.sx's `async`. A variadic pack is
+// comptime-only and segfaults if captured into a deferred closure that crosses
+// the fiber boundary (issue 0156 Part 2). 
So instead of forwarding inputs as a +// pack, the user captures any inputs in the lambda AT THE CALL SITE (where +// they're live): `s.go(() -> i64 => compute(a, b))`. Nothing variadic ever +// crosses into the fiber — the thunk is a plain `{fn_ptr, env}` fat closure. +// +// KNOWN LIMITATION (heap-Task leak): `go` heap-allocates the `Task` (it outlives +// the call — the fiber fills `value`/`state` later, after `go` has returned), but +// B1.4a never frees it. Like the closure-env leak documented on `spawn` above, +// this is bounded by the `go` count and invisible under the default GPA (frees +// at exit); a long-running scheduler under an arena/tracking allocator +// accumulates one `Task` per `go`. Freeing it safely needs join-point ownership +// tracking — deferred. +// +// WAKE-AFTER-COMPLETE ORDERING (both orderings are correct): +// - worker finishes BEFORE `wait`: the worker set `t.state = .ready` and saw +// `t.waiter == null`, so it issued no wake. `wait` sees `.ready` (not +// `.pending`), does NOT park, and returns `t.value` — no lost wakeup. +// - `wait` runs BEFORE the worker finishes: `wait` registers itself as +// `t.waiter` and parks via `suspend_self`. When the worker finishes it sees +// a non-null `t.waiter` and `wake`s it; `wait` resumes and returns the value. + +TaskState :: enum { pending; ready; canceled; } + +// The `!` channel for `wait`. Defined LOCALLY (not reusing io.sx's `IoErr`): +// `IoErr` is reachable here only as a re-export alias through std.sx, and the +// failable-type detection behind `raise` does not see through that alias to the +// underlying `error` set — so `raise error.Canceled` against `(.., !IoErr)` +// here is rejected as "not a failable function". A local `error` decl is +// recognized directly. (Same `.Canceled` contract as io.sx model (a).) 
+TaskErr :: error { Canceled }
+
+Task :: struct ($R: Type) {
+    value: R;                       // written by the worker fiber; meaningful only once `state` is `.ready`
+    state: TaskState = .pending;
+    waiter: *void = null;           // the one fiber parked in `wait` (opaque *Fiber); `wait` enforces at most one
+    sched: *Scheduler;              // owning scheduler (for park/wake in `wait`)
+    canceled: i64;                  // cooperative cancel flag (M:1: no preemption → no atomics)
+}
+
+// Spawn `work` as a fiber; return a heap `*Task` that completes when the fiber
+// finishes. Mirrors `spawn`'s alloc + null-check + abort. The Task comes from
+// raw bytes, so every field except `value` is set explicitly here (the struct's
+// field defaults are not relied on).
+go :: ufcs (self: *Scheduler, work: Closure() -> $R) -> *Task($R) {
+    raw := self.own_allocator.alloc_bytes(size_of(Task($R)));
+    if raw == null {
+        print("sched: out of memory allocating a Task\n");
+        abort();
+    }
+    t : *Task($R) = xx raw;
+    t.state = .pending;
+    t.waiter = null;
+    t.sched = self;
+    t.canceled = 0;
+    self.spawn(() => {
+        // Cooperative cancel: skip the work entirely if cancel already landed
+        // before this fiber was scheduled (saves the compute + side effects). A
+        // cancel that lands DURING `work()` still lets it finish (no preemption
+        // in the M:1 model) — cancel suppresses DELIVERY, never an in-flight run.
+        if t.canceled == 0 {
+            t.value = work();
+            t.state = .ready;
+        }
+        // Wake the awaiter only if one already parked (else `wait` will not park).
+        if t.waiter != null { self.wake(xx t.waiter); }
+    });
+    return t;
+}
+
+// Suspend the caller until the task completes; return its value (or raise
+// `.Canceled` if the task was canceled before or while waiting). MUST be called
+// from inside a fiber (so there is a `self.current` to park) — typically from a
+// fiber spawned via `s.spawn(...)`; outside a fiber `suspend_self` aborts.
+//
+// Parking is a LOOP, not a single `if`: `Scheduler.wake` is public, so the
+// parked fiber can be woken by something other than this task's worker. On such
+// a wake the task is still `.pending` and `t.value` was never written, so the
+// waiter must park again instead of returning garbage.
+//
+// `t.waiter` is cleared every time the waiter resumes, so the worker can never
+// `wake` a handle to a fiber that has since moved on (or been reaped). That
+// also makes a non-null `t.waiter` on entry mean "another fiber is already
+// parked here" — the single slot cannot hold both, and the first waiter would
+// never be woken, so that misuse aborts loudly.
+wait :: ufcs (t: *Task($R)) -> ($R, !TaskErr) {
+    if t.canceled != 0 { raise error.Canceled; }
+    while t.state == .pending {
+        if t.waiter != null {
+            print("sched: wait() on a task that already has a parked waiter\n");
+            abort();
+        }
+        t.waiter = xx t.sched.current;   // register self as the waiter
+        t.sched.suspend_self();          // park until the task's fiber wakes us
+        t.waiter = null;                 // resumed: drop the registration
+    }
+    if t.canceled != 0 or t.state == .canceled { raise error.Canceled; }
+    return t.value;
+}
+
+// Request cancellation — rides the `!` channel (model (a), like io.sx 1806). M:1
+// cooperative: the worker fiber may already have run; cancel still makes a
+// subsequent `wait` raise `.Canceled`. `cancel` does not itself wake a fiber
+// already parked in `wait`: that waiter raises once the worker's fiber next
+// finishes (or is skipped) and wakes it.
+cancel :: ufcs (t: *Task($R)) {
+    t.canceled = 1;
+    t.state = .canceled;
+}