fibers: M:1 scheduler core + suspending fiber-task async (B1.5a, B1.4a)
library/modules/std/sched.sx: a generic Fiber + Scheduler over the proven naked swap_context on guarded mmap stacks -- init/spawn/yield_now/suspend_self/wake/run (B1.5a), then Task($R) + go/wait/cancel, a truly-suspending nullary-thunk async layer (B1.4a). go(work) runs a thunk as a real fiber; wait() parks the caller until it completes. Self-contained in sched.sx (io.sx importing it would duplicate the _fib_tramp global asm). Hardened per adversarial review: wake guarded on .suspended (FIFO corruption), suspend_self/yield_now guard a null current, loud mmap/mprotect/OOM/deadlock bails, cancel skips not-yet-run work. Closure-env + heap-Task leaks documented (bounded, default-GPA-invisible). Examples: 1811 (round-robin), 1812 (suspend/wake + spurious-wake guard), 1813 (async interleave + await-suspend + cancel). Also files issue 0155 (scalar-pointer index panics codegen -- non-blocking, found in review).
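The wake guard mentioned above can be sketched outside sx. The following is a minimal C mirror of the intrusive FIFO, written only to illustrate why `wake` checks for `.suspended` before re-linking a node. All names here (`Sched`, `park`, `queue_len`) are hypothetical stand-ins and not part of sched.sx; the real code is in the diff below.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical C mirror of the commit's intrusive ready queue. Names are
   illustrative only and are not part of sched.sx. */
typedef enum { READY, RUNNING, SUSPENDED, DONE } State;

typedef struct Fiber {
    State state;
    struct Fiber *next;   /* intrusive FIFO link */
} Fiber;

typedef struct {
    Fiber *head, *tail;
    long n_suspended;     /* parks minus wakes */
} Sched;

/* Append f at the tail. Assumes f is NOT already linked into the queue. */
static void enqueue(Sched *s, Fiber *f) {
    f->next = NULL;
    if (s->tail == NULL) { s->head = f; s->tail = f; }
    else { s->tail->next = f; s->tail = f; }
}

/* Pop the head, or return NULL when the queue is empty. */
static Fiber *dequeue(Sched *s) {
    Fiber *f = s->head;
    if (f == NULL) return NULL;
    s->head = f->next;
    if (s->head == NULL) s->tail = NULL;
    f->next = NULL;
    return f;
}

/* Park: the fiber stays off the queue and is counted as suspended. */
static void park(Sched *s, Fiber *f) {
    f->state = SUSPENDED;
    s->n_suspended++;
}

/* Guarded wake: only a parked fiber is re-linked; any other wake is a no-op. */
static void wake(Sched *s, Fiber *f) {
    if (f->state != SUSPENDED) return;
    s->n_suspended--;
    f->state = READY;
    enqueue(s, f);
}

/* Count queued nodes, giving up after `limit` hops so a cycle is detectable. */
static int queue_len(const Sched *s, int limit) {
    int n = 0;
    for (const Fiber *f = s->head; f != NULL && n < limit; f = f->next) n++;
    return n;
}
```

In this sketch, waking a parked fiber twice leaves it queued exactly once. Without the state check, the second `enqueue` would run while the fiber is already the tail, so `tail->next = f` would point the node at itself and the list would never terminate.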
81
examples/concurrency/1811-concurrency-fiber-scheduler.sx
Normal file
@@ -0,0 +1,81 @@
// Stream B1 (fibers) B1.5a — the M:1 cooperative fiber scheduler core, in pure
// sx over `swap_context` (proven in 1807-1809). `Scheduler` drives N fibers,
// each running a `body: Closure() -> void` on its own guarded `mmap` stack;
// fibers cooperate by calling `yield_now`, which round-robins control back
// through the scheduler loop.
//
// Round-robin demo: 3 fibers (A=0, B=1, C=2) each append their id to a shared
// sequence buffer, yielding between each of 3 rounds. Because the scheduler
// re-enqueues a yielding fiber at the TAIL (FIFO), the interleaving is the
// deterministic round-robin order:
//
// round 1: A B C (0 1 2)
// round 2: A B C (0 1 2)
// round 3: A B C (0 1 2)
//
// → sequence: 0 1 2 0 1 2 0 1 2
//
// Outputs flow OUT of each fiber through pointers captured in its closure (the
// shared `Shared` struct), since closure capture-by-value does not write back.
// Every fiber must reach `.done` (asserted via a per-fiber done flag).
//
// aarch64-macOS-pinned (the scheduler's asm + guard-page mmap constants are
// per-arch / Apple-specific): runs end-to-end on a matching host, ir-only on a
// mismatch.
#import "modules/std.sx";
sched :: #import "modules/std/sched.sx";

Shared :: struct {
    seq: [16]i64; // appended interleaving sequence
    n: i64; // count appended
    done: [3]i64; // per-fiber done flag (set right before the body returns)
}

append :: (sh: *Shared, v: i64) {
    sh.seq[sh.n] = v;
    sh.n = sh.n + 1;
}

main :: () -> i64 {
    sh : Shared = ---;
    sh.n = 0;
    sh.done[0] = 0; sh.done[1] = 0; sh.done[2] = 0;

    s := sched.Scheduler.init();
    ps := @s;
    psh := @sh;

    // Three DIFFERENT fiber bodies (distinct captured ids), interleaving via
    // yield_now. Each appends its id once per round for 3 rounds.
    spawn_worker :: (ps: *sched.Scheduler, psh: *Shared, my_id: i64) {
        ps.spawn(() => {
            r := 0;
            while r < 3 {
                append(psh, my_id);
                if r < 2 { ps.yield_now(); } // cooperate between rounds
                r = r + 1;
            }
            psh.done[my_id] = 1;
        });
    }

    spawn_worker(ps, psh, 0);
    spawn_worker(ps, psh, 1);
    spawn_worker(ps, psh, 2);

    s.run();

    // Ordering contract: round-robin FIFO interleaving.
    print("sequence:");
    i := 0;
    while i < sh.n {
        print(" {}", sh.seq[i]);
        i = i + 1;
    }
    print("\n");

    print("spawned: {}\n", s.n_spawned);
    print("done: {} {} {}\n", sh.done[0], sh.done[1], sh.done[2]);
    print("all done: {}\n", sh.done[0] + sh.done[1] + sh.done[2]);
    return 0;
}
64
examples/concurrency/1812-concurrency-fiber-suspend-wake.sx
Normal file
@@ -0,0 +1,64 @@
// Stream B1 (fibers) B1.5a — fiber park/resume via `suspend_self` + `wake`,
// the off-queue half of the M:1 scheduler that FiberIo [B1.4] builds on.
//
// A running fiber that has nothing to do parks itself with `suspend_self`: it
// leaves the round-robin queue entirely (unlike `yield_now`, which re-enqueues)
// and only runs again when another fiber (or an I/O completion) calls `wake` on
// it. Here fiber A records 10, parks, and is resumed by fiber B to record 11:
//
// A: rec 10, suspend_self ──park──┐
// B: rec 20, wake(A), wake(A), rec 21
// A: ──resume──> rec 11
// → log: 10 20 21 11
//
// `wake` is GUARDED on `.suspended`: B's SECOND `wake(A)` is spurious (A is
// already re-queued by then). An unguarded enqueue would re-link an
// already-listed node and corrupt the FIFO (segfault); the guard makes a
// double/spurious/stale wake a safe no-op. `suspended-left: 0` confirms every
// park was balanced by a wake (an orphaned park would abort the scheduler).
//
// aarch64-macOS-pinned (the scheduler's per-arch asm + Apple mmap constants):
// runs end-to-end on a matching host, ir-only on a mismatch.
#import "modules/std.sx";
sched :: #import "modules/std/sched.sx";

// The shared state both fibers reach through (passed as `*Sh`). `parked` holds
// the fiber-A handle that B wakes — kept here (rather than a separate
// `**Fiber`) so the one `*Sh` carries everything the helper fns share.
Sh :: struct { log: [16]i64; n: i64; parked: *sched.Fiber; }
rec :: (sh: *Sh, v: i64) { sh.log[sh.n] = v; sh.n = sh.n + 1; }

main :: () -> i64 {
    sh : Sh = ---; sh.n = 0; sh.parked = null;
    s := sched.Scheduler.init();
    ps := @s; psh := @sh;

    // Fiber A: record 10, park, then (after wake) record 11. Store A's handle in
    // the shared state so B can wake it.
    mk_a :: (ps: *sched.Scheduler, psh: *Sh) {
        psh.parked = ps.spawn(() => {
            rec(psh, 10);
            ps.suspend_self();
            rec(psh, 11);
        });
    }
    // Fiber B: record 20, wake A (legit) + a spurious second wake (safe no-op),
    // record 21.
    mk_b :: (ps: *sched.Scheduler, psh: *Sh) {
        ps.spawn(() => {
            rec(psh, 20);
            ps.wake(psh.parked); // legitimate: A is parked
            ps.wake(psh.parked); // spurious: A is now .ready/queued — must no-op
            rec(psh, 21);
        });
    }
    mk_a(ps, psh);
    mk_b(ps, psh);
    s.run();

    print("log:");
    i := 0; while i < sh.n { print(" {}", sh.log[i]); i = i + 1; }
    print("\n");
    print("suspended-left: {}\n", s.n_suspended);
    return 0;
}
83
examples/concurrency/1813-concurrency-fiber-async-suspend.sx
Normal file
@@ -0,0 +1,83 @@
// Stream B1 (fibers) B1.4a — a truly-SUSPENDING fiber-task async layer
// (`go` / `wait` / `cancel`) over the M:1 scheduler, in pure sx. In contrast
// with 1805's `context.io.async` (which runs each worker INLINE to completion
// before returning a `.ready` future — no interleaving), here `s.go(work)` runs
// `work` as a REAL fiber and `t.wait()` SUSPENDS the caller until that fiber
// finishes, so a task that yields mid-body lets a sibling task run before the
// first completes — genuine cooperative interleaving.
//
// `work` is a NULLARY thunk: any inputs are captured in the lambda at the call
// site (no `..args` pack crosses the fiber boundary — that would hit issue 0156
// Part 2). Outputs flow OUT through pointers captured in the thunk (the shared
// `Log` struct), since closure capture-by-value does not write back.
//
// What this proves:
// - REAL suspend + interleave: task A records 1, YIELDS; task B then records 2
// and completes; A resumes, records 3, completes → interleave order 1 2 3.
// - awaited VALUES: A returns 42, B returns 100 (recorded after both waits).
// → sequence: 1 2 3 42 100.
// - cancel rides the `!` channel (model (a), like 1806): a canceled task's
// `wait()` raises `.Canceled`, taken by the `or` default → -99.
//
// `wait` must run inside a fiber (it parks `self.current`), so the "main task"
// is itself a `s.spawn(...)` fiber that drives the two `go` tasks.
//
// aarch64-macOS-pinned (the scheduler's asm + guard-page mmap constants are
// per-arch / Apple-specific): runs end-to-end on a matching host, ir-only on a
// mismatch.
#import "modules/std.sx";
sched :: #import "modules/std/sched.sx";

Log :: struct { seq: [16]i64; n: i64; }
rec :: (l: *Log, v: i64) { l.seq[l.n] = v; l.n = l.n + 1; }

main :: () -> i64 {
    lg : Log = ---;
    lg.n = 0;

    s := sched.Scheduler.init();
    ps := @s;
    pl := @lg;

    // The "main task" fiber: drives two real tasks, waits both, then exercises
    // cancel. It runs as a fiber so `wait` has a `self.current` to park.
    s.spawn(() => {
        // Task A yields mid-body so B interleaves before A completes.
        a := ps.go(() -> i64 => {
            rec(pl, 1);
            ps.yield_now(); // suspend A; B (already spawned) runs to completion
            rec(pl, 3);
            42
        });
        // Task B runs straight through (no yield).
        b := ps.go(() -> i64 => {
            rec(pl, 2);
            100
        });

        // Wait both — suspends the main-task fiber until each completes.
        va := a.wait() or { -1 };
        vb := b.wait() or { -1 };
        rec(pl, va);
        rec(pl, vb);

        // Cancel case: cancel before the worker runs; `wait` raises .Canceled,
        // the `or` default (-99) is taken.
        c := ps.go(() -> i64 => 7);
        c.cancel();
        rec(pl, c.wait() or { -99 });
    });

    s.run();

    // Interleaving + value contract: 1 2 3 42 100, then the cancel default -99.
    print("sequence:");
    i := 0;
    while i < lg.n {
        print(" {}", lg.seq[i]);
        i = i + 1;
    }
    print("\n");
    print("spawned: {}\n", s.n_spawned);
    return 0;
}
@@ -0,0 +1 @@
{ "target": "macos" }
@@ -0,0 +1 @@
0
@@ -0,0 +1 @@

@@ -0,0 +1,4 @@
sequence: 0 1 2 0 1 2 0 1 2
spawned: 3
done: 1 1 1
all done: 3
@@ -0,0 +1 @@
{ "target": "macos" }
@@ -0,0 +1 @@
0
@@ -0,0 +1 @@

@@ -0,0 +1,2 @@
log: 10 20 21 11
suspended-left: 0
@@ -0,0 +1 @@
{ "target": "macos" }
@@ -0,0 +1 @@
0
@@ -0,0 +1 @@

@@ -0,0 +1,2 @@
sequence: 1 2 3 42 100 -99
spawned: 4
57
issues/0155-scalar-pointer-index-llvm-panic.md
Normal file
@@ -0,0 +1,57 @@
# issue 0155 — indexing a scalar pointer (`pc[0]`, `pc: *i64`) panics at LLVM emission

> **OPEN.** Found incidentally during an adversarial review of the fiber
> scheduler (a review probe used `pc[0]` on a `*i64`). NOT a fibers-stream
> blocker — the scheduler uses array-field indexing (`ctx.regs[i]`) and pointer
> deref (`p.*`), never scalar-pointer indexing — so it is filed for its own fix
> session, not fixed inline.

## Symptom

Indexing a pointer-to-scalar value with `[i]` crashes the compiler:

```
thread … panic: unresolved type reached LLVM emission — a type resolution
failure was not diagnosed/aborted
src/backend/llvm/types.zig:196:28 toLLVMTypeInfo (.unresolved arm)
src/backend/llvm/types.zig:38 toLLVMType
src/ir/emit_llvm.zig:2564 toLLVMType
```

Observed: compiler panic (no diagnostic). Expected: either lower `pc[i]` as
`*(pc + i)` (C semantics), or emit a clean diagnostic that a bare `*T` is not
indexable (deref with `.*`, or use a slice `[]T`). A `.unresolved` TypeId
reaching LLVM emission is unconditionally a compiler bug (a resolution failure
that was neither diagnosed nor aborted).

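For reference, the C semantics named above can be shown directly in C. This is only a sketch of the behavior the first option would mirror; the helper names `index_ptr` and `add_then_deref` are hypothetical, and whether sx should adopt this lowering is exactly the open question in this issue.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* In C, the subscript p[i] on a pointer is defined as *(p + i).
   The two helpers below are illustrative names, not part of any sx API. */
static int64_t index_ptr(const int64_t *p, ptrdiff_t i) {
    return p[i];          /* subscript form */
}

static int64_t add_then_deref(const int64_t *p, ptrdiff_t i) {
    return *(p + i);      /* pointer arithmetic followed by a load */
}
```

With `x = 5` and `pc = &x`, both helpers return 5 for index 0, which is the value the repro would yield if sx chose the indexable option.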
## Reproduction

```sx
#import "modules/std.sx";
main :: () -> i64 {
    x : i64 = 5;
    pc : *i64 = @x;
    return pc[0]; // panics the compiler
}
```

(repro: `issues/0155-scalar-pointer-index-llvm-panic.sx`)

## Investigation prompt

> The sx compiler panics ("unresolved type reached LLVM emission",
> `src/backend/llvm/types.zig:196`) when an index expression `pc[i]` is applied
> to a value of pointer-to-scalar type `*T` (repro:
> `issues/0155-scalar-pointer-index-llvm-panic.sx`). Trace `emitIndexGet`
> (`src/backend/llvm/ops.zig` ~1988) and the index-expr lowering in
> `src/ir/lower/` (the `.index_expr` arm): for a `*T` object, the element type
> resolves to `.unresolved` instead of `T`. Decide the intended semantics first
> (consult `specs.md` for whether a bare `*T` is indexable): if `pc[i]` should
> mean `*(pc + i)`, fix the index-expr type resolver to yield the pointee type
> `T` for a `*T` object (mirror the slice/array-pointer arm — see
> `ptrToArrayElem` / `getElementType` in `src/ir/lower/`), and verify codegen
> emits a GEP + load. If a bare `*T` is intentionally NOT indexable, emit a
> diagnostic at the lowering site ("cannot index `*T`; deref with `.*` or use a
> slice") and never let `.unresolved` reach emission. Verify: `sx run` the repro
> — expect either `5` (if indexable) or a clean compile error, never a panic.
> Then promote the repro to a regression test under `examples/`.
18
issues/0155-scalar-pointer-index-llvm-panic.sx
Normal file
@@ -0,0 +1,18 @@
// issue 0155 — indexing a pointer-to-scalar (`pc[0]` where `pc: *i64`) panics
// the compiler at LLVM emission instead of either lowering `pc[i]` like C
// (`*(pc + i)`) or emitting a clean diagnostic that `*T` is not indexable.
//
// Observed: `thread … panic: unresolved type reached LLVM emission`
// at src/backend/llvm/types.zig:196 (the `.unresolved` arm of
// toLLVMTypeInfo), reached via emitIndexGet
// (src/backend/llvm/ops.zig ~1988) → the index expression's element
// type resolves to `.unresolved` and is never diagnosed.
// Expected: either a working scalar-pointer index (`pc[0]` == `pc.*`) or a
// proper "cannot index a *T; use a slice / deref with .*" diagnostic.
// A `.unresolved` reaching LLVM is always a compiler bug.
#import "modules/std.sx";
main :: () -> i64 {
    x : i64 = 5;
    pc : *i64 = @x;
    return pc[0]; // panics the compiler
}
409
library/modules/std/sched.sx
Normal file
@@ -0,0 +1,409 @@
// Stream B1 (fibers) B1.5a — the M:1 cooperative fiber scheduler core.
//
// A `Scheduler` drives any number of `Fiber`s, each running a stackful
// `body: Closure() -> void` on its own guarded `mmap` stack (the §8.1.1 guard
// page turns a stack overflow into an immediate fault instead of silent
// neighbor corruption). Fibers cooperate: a running fiber hands control back to
// the scheduler loop via `yield_now` (re-enqueued, round-robin) or
// `suspend_self` (parked off-queue until an external `wake`). When a body
// returns, the fiber reaches `.done`, its stack is `munmap`'d and its heap
// `Fiber` freed.
//
// Built on the proven primitives from examples/concurrency/1807-1809:
// - `swap_context` (aarch64 `abi(.naked)`, 13-slot save area: x19..x28, fp,
// lr, sp) saves the callee-saved registers + SP into `*from` and loads them
// from `*to`, then `ret`s onto `to`'s stack.
// - the `_fib_tramp` global-asm first-entry trampoline: x19 holds the
// bootstrapped `*Fiber`; it moves it to x0 and `bl`s the exported generic
// dispatch `fib_dispatch`, which calls the body then switches back to the
// scheduler.
// - guarded `mmap` stacks: `[GUARD | usable]`, low GUARD page `mprotect`'d
// PROT_NONE, 16-aligned top returned as the bootstrapped SP.
//
// aarch64-macOS-pinned: the `swap_context` asm + the 13-slot save area are
// per-arch; the `mmap` flag constants (MAP_ANON = 0x1000) and the 16 KB guard
// page are Apple-specific. Runs end-to-end on a matching host, ir-only on a
// mismatch.
#import "modules/std.sx";

// --- libc mmap stack primitives -------------------------------------------

mmap :: (addr: *void, len: i64, prot: i32, flags: i32, fd: i32, off: i64) -> *void extern libc "mmap";
mprotect :: (addr: *void, len: i64, prot: i32) -> i32 extern libc "mprotect";
munmap :: (addr: *void, len: i64) -> i32 extern libc "munmap";
abort :: () -> noreturn extern libc "abort";

PROT_NONE :: 0;
PROT_RW :: 3; // PROT_READ | PROT_WRITE
MAP_AP :: 0x1002; // macOS MAP_PRIVATE (0x2) | MAP_ANON (0x1000)
GUARD :: 16384; // one 16 KB page (aarch64-macOS)
STACK :: 131072; // 128 KB usable per fiber

// --- core types ------------------------------------------------------------

// Saved context: x19..x28 (10), x29/fp, x30/lr, sp — 13 u64 slots.
FiberCtx :: struct { regs: [13]u64; }

FiberState :: enum { ready; running; suspended; done; }

Fiber :: struct {
    ctx: FiberCtx;
    body: Closure() -> void;
    state: FiberState;
    sched: *Scheduler;
    stack_region: *void; // mmap base — for munmap on reap
    stack_len: i64; // GUARD + STACK, for munmap
    id: i64;
    next: *Fiber; // intrusive FIFO ready-queue link
}

Scheduler :: struct {
    sched_ctx: FiberCtx; // the scheduler loop's own saved context
    current: *Fiber; // running fiber; null while in the scheduler loop
    ready_head: *Fiber;
    ready_tail: *Fiber;
    own_allocator: Allocator; // captured at init — fibers outlive their spawn scope
    next_id: i64;
    n_spawned: i64;
    n_suspended: i64; // fibers parked off-queue (suspend_self minus wake)

    // Construct a scheduler BY VALUE (allocator value-return convention).
    // Captures the current `context.allocator` into `own_allocator` — fibers and
    // their heap `Fiber` structs outlive their spawn scope, so all internal
    // allocation must go through this captured (long-lived) allocator, not
    // whatever transient one happens to be current at a later call.
    init :: () -> Scheduler {
        s : Scheduler = ---;
        s.current = null;
        s.ready_head = null;
        s.ready_tail = null;
        s.own_allocator = context.allocator;
        s.next_id = 0;
        s.n_spawned = 0;
        s.n_suspended = 0;
        return s;
    }

    // Spawn a fiber running `body`. Heap-allocates the `Fiber` and a guarded
    // stack, bootstraps the saved context (x19 = *Fiber, fp = 0, lr =
    // trampoline, sp = stack top), enqueues it ready (FIFO), returns the
    // `*Fiber`.
    // KNOWN LIMITATION (env leak): `body` is a fat `{fn_ptr, env}` whose env is
    // heap-allocated at the closure-literal site. The reap path frees the Fiber
    // struct + unmaps the stack, but sx exposes no way to free a closure's env
    // (the scheduler can't name the env pointer), so ONE env per spawned fiber
    // leaks until program exit. Bounded by the spawn count; under the default
    // GPA (which frees at exit) it is invisible, but a long-running scheduler
    // under an arena/tracking allocator accumulates one env per fiber. Freeing
    // it needs a language affordance for closure-env ownership — deferred.
    spawn :: (self: *Scheduler, body: Closure() -> void) -> *Fiber {
        raw := self.own_allocator.alloc_bytes(size_of(Fiber));
        if raw == null {
            print("sched: out of memory allocating a Fiber\n");
            abort();
        }
        f : *Fiber = xx raw;
        f.body = body;
        f.sched = self;
        f.id = self.next_id;
        f.next = null;
        self.next_id = self.next_id + 1;
        self.n_spawned = self.n_spawned + 1;

        top := boot_stack(f, STACK);
        f.ctx.regs[0] = xx f; // x19 = self
        f.ctx.regs[10] = 0; // fp
        f.ctx.regs[11] = xx fib_tramp; // lr → trampoline
        f.ctx.regs[12] = top; // sp

        f.state = .ready;
        enqueue(self, f);
        return f;
    }

    // The running fiber yields cooperatively: mark ready, switch back to the
    // scheduler. The run loop re-enqueues it (round-robin). MUST be called from
    // inside a fiber (there must be a running fiber to yield).
    yield_now :: (self: *Scheduler) {
        cur := self.current;
        if cur == null {
            print("sched: yield_now() called outside a fiber (no running fiber)\n");
            abort();
        }
        cur.state = .ready;
        swap_context(@cur.ctx, @self.sched_ctx);
    }

    // The running fiber parks itself: mark suspended, switch back to the
    // scheduler. The run loop does NOT re-enqueue a suspended fiber — an
    // external `wake` must re-add it. (Used by FiberIo to park on a blocking
    // op until completion.) MUST be called from inside a fiber — a null
    // `current` (called from the bare scheduler/main context) would deref null;
    // bail loudly instead of segfaulting.
    suspend_self :: (self: *Scheduler) {
        cur := self.current;
        if cur == null {
            print("sched: suspend_self() called outside a fiber (no running fiber)\n");
            abort();
        }
        cur.state = .suspended;
        self.n_suspended = self.n_suspended + 1;
        swap_context(@cur.ctx, @self.sched_ctx);
    }

    // Re-ready a parked (suspended) fiber and enqueue it. Called from outside
    // the fiber (e.g. an I/O completion or another fiber) to wake it.
    //
    // GUARDED on `.suspended`: enqueue links `f` into the FIFO, so waking a
    // fiber that is ALREADY queued (`.ready`) or running (`.running`) would
    // re-link a node already in the list — nulling its `next` mid-list and
    // cycling `ready_tail` back onto it, corrupting the queue (a spurious /
    // double wake, or waking a yielded-not-parked fiber, would segfault). Only
    // a genuinely parked fiber may be re-enqueued; any other wake is a no-op.
    wake :: (self: *Scheduler, f: *Fiber) {
        if f.state != .suspended { return; }
        self.n_suspended = self.n_suspended - 1;
        f.state = .ready;
        enqueue(self, f);
    }

    // The scheduler loop. Runs until the ready queue drains. Each iteration:
    // dequeue the next fiber, switch into it, and — on its switch back — reap it
    // if done (munmap stack, free the Fiber), re-enqueue it if it yielded, or
    // leave it parked if it suspended.
    run :: (self: *Scheduler) {
        while self.ready_head != null {
            f := dequeue(self);
            self.current = f;
            f.state = .running;
            swap_context(@self.sched_ctx, @f.ctx); // returns here when f yields / suspends / finishes
            self.current = null;
            if f.state == .done {
                // We've switched OFF f's stack already (the final swap landed
                // here), so the stack is free to unmap. Free the Fiber struct
                // AFTER munmap.
                munmap(f.stack_region, f.stack_len);
                self.own_allocator.dealloc_bytes(xx f);
            } else if f.state == .ready {
                enqueue(self, f);
            }
            // .suspended: leave it parked (not in any queue; `wake` re-adds it).
        }
        // The queue drained. If any fiber is still parked, nothing will ever
        // wake it — its stack + struct are leaked and the program believes it
        // finished. That is a deadlock; surface it loudly rather than returning
        // a silent success. (FiberIo, which uses suspend/wake, must balance
        // every suspend with a wake before the queue empties.)
        if self.n_suspended != 0 {
            print("sched: deadlock — {} fiber(s) suspended with an empty run queue\n", self.n_suspended);
            abort();
        }
    }
}

// --- the context switch (naked) + first-entry trampoline -------------------

// x0 = from, x1 = to (read straight from the ABI registers — a naked fn has no
// frame, so its params are never spilled). SP-in ≠ SP-out by design.
swap_context :: (from: *FiberCtx, to: *FiberCtx) abi(.naked) {
    asm volatile {
        #string ASM
        stp x19, x20, [x0, #0]
        stp x21, x22, [x0, #16]
        stp x23, x24, [x0, #32]
        stp x25, x26, [x0, #48]
        stp x27, x28, [x0, #64]
        stp x29, x30, [x0, #80]
        mov x9, sp
        str x9, [x0, #96]
        ldp x19, x20, [x1, #0]
        ldp x21, x22, [x1, #16]
        ldp x23, x24, [x1, #32]
        ldp x25, x26, [x1, #48]
        ldp x27, x28, [x1, #64]
        ldp x29, x30, [x1, #80]
        ldr x9, [x1, #96]
        mov sp, x9
        ret
        ASM
    };
}

// First-entry trampoline: a fiber's bootstrapped LR points here. x19 holds the
// `*Fiber` (preset in the saved context); move it to x0 and call the generic
// dispatch.
asm {
    #string T
    .global _fib_tramp
    _fib_tramp:
    mov x0, x19
    bl _fib_dispatch
    brk #0
    T,
};
fib_tramp :: () extern;

// The ONE place that runs a fiber body. Reached only from `_fib_tramp` on first
// entry, on the fiber's own fresh stack. Runs the body, marks the fiber done,
// and switches back to the scheduler — never returns past the final switch.
fib_dispatch :: (self: *Fiber) export "fib_dispatch" {
    self.body();
    self.state = .done;
    swap_context(@self.ctx, @self.sched.sched_ctx);
}

// --- guarded stack bootstrap ----------------------------------------------

// mmap a [guard | usable-stack] region, mprotect the low guard page PROT_NONE.
// Stores the region base + len on the fiber (for munmap on reap) and returns
// the 16-aligned stack top (the bootstrapped SP).
boot_stack :: (f: *Fiber, size: i64) -> u64 {
    total := GUARD + size;
    region : *void = mmap(null, total, PROT_RW, MAP_AP, -1, 0);
    // mmap signals failure with MAP_FAILED = (void*)-1 (NOT null). Handing a
    // wild SP to the switch would `ret` onto garbage — bail loudly instead.
    if (xx region) == (xx (0 - 1)) {
        print("sched: mmap failed for a {}-byte fiber stack\n", total);
        abort();
    }
    f.stack_region = region;
    f.stack_len = total;
    // Guard-arm: turn the low page unwritable so overflow faults at the
    // boundary. The guard is mandatory (§8.1.1); a stack handed out without it
    // would silently corrupt a neighbor on overflow, so a failed mprotect is
    // fatal, not ignorable.
    if mprotect(region, GUARD, PROT_NONE) != 0 {
        print("sched: mprotect(PROT_NONE) failed to arm the stack guard page\n");
        abort();
    }
    usable : u64 = (xx region) + GUARD;
    top : u64 = usable + size;
    return top - (top % 16); // 16-byte aligned stack top (AAPCS)
}

// --- intrusive FIFO ready-queue -------------------------------------------

enqueue :: (self: *Scheduler, f: *Fiber) {
    f.next = null;
    if self.ready_tail == null {
        self.ready_head = f;
        self.ready_tail = f;
    } else {
        self.ready_tail.next = f;
        self.ready_tail = f;
    }
}

dequeue :: (self: *Scheduler) -> *Fiber {
    f := self.ready_head;
    if f == null { return null; }
    self.ready_head = f.next;
    if self.ready_head == null { self.ready_tail = null; }
    f.next = null;
    return f;
}

// The public API lives as methods on `Scheduler` (above): `init`, `spawn`,
// `yield_now`, `suspend_self`, `wake`, `run`.

// --- B1.4a: truly-suspending fiber-task async (`go` / `wait` / `cancel`) ----
//
// An async-task layer on top of the M:1 scheduler: `s.go(work)` runs `work` as
// a REAL fiber, and `t.wait()` SUSPENDS the caller fiber until the task's fiber
// completes — genuine interleaving, in contrast with io.sx's `context.io.async`
// (which runs the worker inline to completion before returning). Distinct from
// io.sx's `Future` by design: `Task` is defined here so the two modules stay
// decoupled (no cross-import; sched.sx must keep importing only `std.sx`, since
// a different import path re-emits the module's global `_fib_tramp` asm and
// duplicates the symbol).
//
// THE NULLARY-THUNK RATIONALE. `work` is a NULLARY thunk `Closure() -> $R`, not
// a worker-plus-`..args` pair like io.sx's `async`. A variadic pack is
// comptime-only and segfaults if captured into a deferred closure that crosses
// the fiber boundary (issue 0156 Part 2). So instead of forwarding inputs as a
// pack, the user captures any inputs in the lambda AT THE CALL SITE (where
// they're live): `s.go(() -> i64 => compute(a, b))`. Nothing variadic ever
// crosses into the fiber — the thunk is a plain `{fn_ptr, env}` fat closure.
//
// KNOWN LIMITATION (heap-Task leak): `go` heap-allocates the `Task` (it outlives
// the call — the fiber fills `value`/`state` later, after `go` has returned), but
// B1.4a never frees it. Like the closure-env leak documented on `spawn` above,
// this is bounded by the `go` count and invisible under the default GPA (frees
// at exit); a long-running scheduler under an arena/tracking allocator
// accumulates one `Task` per `go`. Freeing it safely needs join-point ownership
// tracking — deferred.
//
// WAKE-AFTER-COMPLETE ORDERING (both orderings are correct):
// - worker finishes BEFORE `wait`: the worker set `t.state = .ready` and saw
// `t.waiter == null`, so it issued no wake. `wait` sees `.ready` (not
// `.pending`), does NOT park, and returns `t.value` — no lost wakeup.
// - `wait` runs BEFORE the worker finishes: `wait` registers itself as
// `t.waiter` and parks via `suspend_self`. When the worker finishes it sees
// a non-null `t.waiter` and `wake`s it; `wait` resumes and returns the value.

TaskState :: enum { pending; ready; canceled; }

// The `!` channel for `wait`. Defined LOCALLY (not reusing io.sx's `IoErr`):
// `IoErr` is reachable here only as a re-export alias through std.sx, and the
// failable-type detection behind `raise` does not see through that alias to the
// underlying `error` set — so `raise error.Canceled` against `(.., !IoErr)`
// here is rejected as "not a failable function". A local `error` decl is
// recognized directly. (Same `.Canceled` contract as io.sx model (a).)
TaskErr :: error { Canceled }

Task :: struct ($R: Type) {
    value: R;
    state: TaskState = .pending;
    waiter: *void = null; // the single parked awaiter (opaque *Fiber); M:1 → at most one
    sched: *Scheduler; // owning scheduler (for park/wake in `wait`)
    canceled: i64; // cooperative cancel flag (M:1: no preemption → no atomics)
}

// Spawn `work` as a fiber; return a heap `*Task` that completes when the fiber
// finishes. Mirrors `spawn`'s alloc + null-check + abort.
go :: ufcs (self: *Scheduler, work: Closure() -> $R) -> *Task($R) {
    raw := self.own_allocator.alloc_bytes(size_of(Task($R)));
    if raw == null {
        print("sched: out of memory allocating a Task\n");
        abort();
    }
    t : *Task($R) = xx raw;
    t.state = .pending;
    t.waiter = null;
    t.sched = self;
    t.canceled = 0;
    self.spawn(() => {
        // Cooperative cancel: skip the work entirely if cancel already landed
        // before this fiber was scheduled (saves the compute + side effects). A
        // cancel that lands DURING `work()` still lets it finish (no preemption
        // in the M:1 model) — cancel suppresses DELIVERY, never an in-flight run.
        if t.canceled == 0 {
            t.value = work();
            t.state = .ready;
        }
        // Wake the awaiter only if one already parked (else `wait` will not park).
        if t.waiter != null { self.wake(xx t.waiter); }
    });
    return t;
}

// Suspend the caller until the task completes; return its value (or raise on
// cancel). MUST be called from inside a fiber (so there is a `self.current` to
// park) — typically from a fiber spawned via `s.spawn(...)`.
wait :: ufcs (t: *Task($R)) -> ($R, !TaskErr) {
    if t.canceled != 0 { raise error.Canceled; }
    if t.state == .pending {
        t.waiter = xx t.sched.current; // register self as the waiter
        t.sched.suspend_self(); // park until the task's fiber wakes us
    }
    if t.canceled != 0 or t.state == .canceled { raise error.Canceled; }
    return t.value;
}

// Request cancellation — rides the `!` channel (model (a), like io.sx 1806). M:1
// cooperative: the worker fiber may already have run; cancel still makes a
// subsequent (or in-flight) `wait` raise `.Canceled`.
cancel :: ufcs (t: *Task($R)) {
    t.canceled = 1;
    t.state = .canceled;
}