Add a virtual clock + sleep timers to the M:1 scheduler so fibers
schedule in reproducible simulated time. Scheduler gains clock_ms (the
virtual clock, advances only as timers fire), a timers list, now_ms(),
sleep(ms) (arm {clock_ms+ms, current} + suspend), and a timer-driven
run (drain ready -> fire earliest timer -> advance clock -> wake ->
repeat; the orphan-suspend deadlock check is preserved for a genuine
no-timer park). Wakes fire in deadline order with a FIFO tiebreak.
Adversarial review found a use-after-free: a fiber woken early (manual
or Task wake) before its sleep timer fired was reaped while its Timer
kept a dangling *Fiber, so a later fire dereferenced freed memory.
Fixed: wake evicts the fiber's pending timer (cancel_timer_for) -- every
re-ready path funnels through wake, so no stale timer outlives its fiber.
Examples: 1814 (sim-timer deadline ordering), 1815 (early-wake timer
eviction regression). Suite green 753/0.
540 lines
24 KiB
Plaintext
540 lines
24 KiB
Plaintext
// Stream B1 (fibers) B1.5a — the M:1 cooperative fiber scheduler core.
|
|
//
|
|
// A `Scheduler` drives any number of `Fiber`s, each running a stackful
|
|
// `body: Closure() -> void` on its own guarded `mmap` stack (the §8.1.1 guard
|
|
// page turns a stack overflow into an immediate fault instead of silent
|
|
// neighbor corruption). Fibers cooperate: a running fiber hands control back to
|
|
// the scheduler loop via `yield_now` (re-enqueued, round-robin) or
|
|
// `suspend_self` (parked off-queue until an external `wake`). When a body
|
|
// returns, the fiber reaches `.done`, its stack is `munmap`'d and its heap
|
|
// `Fiber` freed.
|
|
//
|
|
// Built on the proven primitives from examples/concurrency/1807-1809:
|
|
// - `swap_context` (aarch64 `abi(.naked)`, 13-slot save area: x19..x28, fp,
|
|
// lr, sp) saves the callee-saved registers + SP into `*from` and loads them
|
|
// from `*to`, then `ret`s onto `to`'s stack.
|
|
// - the `_fib_tramp` global-asm first-entry trampoline: x19 holds the
|
|
// bootstrapped `*Fiber`; it moves it to x0 and `bl`s the exported generic
|
|
// dispatch `fib_dispatch`, which calls the body then switches back to the
|
|
// scheduler.
|
|
// - guarded `mmap` stacks: `[GUARD | usable]`, low GUARD page `mprotect`'d
|
|
// PROT_NONE, 16-aligned top returned as the bootstrapped SP.
|
|
//
|
|
// aarch64-macOS-pinned: the `swap_context` asm + the 13-slot save area are
|
|
// per-arch; the `mmap` flag constants (MAP_ANON = 0x1000) and the 16 KB guard
|
|
// page are Apple-specific. Runs end-to-end on a matching host, ir-only on a
|
|
// mismatch.
|
|
#import "modules/std.sx";
|
|
|
|
// --- libc mmap stack primitives -------------------------------------------
|
|
|
|
mmap :: (addr: *void, len: i64, prot: i32, flags: i32, fd: i32, off: i64) -> *void extern libc "mmap";
|
|
mprotect :: (addr: *void, len: i64, prot: i32) -> i32 extern libc "mprotect";
|
|
munmap :: (addr: *void, len: i64) -> i32 extern libc "munmap";
|
|
abort :: () -> noreturn extern libc "abort";
|
|
|
|
PROT_NONE :: 0;
|
|
PROT_RW :: 3; // PROT_READ | PROT_WRITE
|
|
MAP_AP :: 0x1002; // macOS MAP_PRIVATE (0x2) | MAP_ANON (0x1000)
|
|
GUARD :: 16384; // one 16 KB page (aarch64-macOS)
|
|
STACK :: 131072; // 128 KB usable per fiber
|
|
|
|
// --- core types ------------------------------------------------------------
|
|
|
|
// Saved context: x19..x28 (10), x29/fp, x30/lr, sp — 13 u64 slots.
|
|
FiberCtx :: struct { regs: [13]u64; }
|
|
|
|
FiberState :: enum { ready; running; suspended; done; }
|
|
|
|
Fiber :: struct {
|
|
ctx: FiberCtx;
|
|
body: Closure() -> void;
|
|
state: FiberState;
|
|
sched: *Scheduler;
|
|
stack_region: *void; // mmap base — for munmap on reap
|
|
stack_len: i64; // GUARD + STACK, for munmap
|
|
id: i64;
|
|
next: *Fiber; // intrusive FIFO ready-queue link
|
|
}
|
|
|
|
// A pending virtual-time timer: wake `fiber` once the virtual clock reaches
|
|
// `deadline_ms`. Stored in `Scheduler.timers` (a `List`) in insertion order, so
|
|
// a linear min-scan that takes the FIRST entry at the minimum deadline gives a
|
|
// stable FIFO tiebreak for equal deadlines.
|
|
Timer :: struct {
|
|
deadline_ms: i64;
|
|
fiber: *Fiber;
|
|
}
|
|
|
|
Scheduler :: struct {
|
|
sched_ctx: FiberCtx; // the scheduler loop's own saved context
|
|
current: *Fiber; // running fiber; null while in the scheduler loop
|
|
ready_head: *Fiber;
|
|
ready_tail: *Fiber;
|
|
own_allocator: Allocator; // captured at init — fibers outlive their spawn scope
|
|
next_id: i64;
|
|
n_spawned: i64;
|
|
n_suspended: i64; // fibers parked off-queue (suspend_self minus wake)
|
|
|
|
// --- B1.4b: deterministic virtual-time timer scheduling ----------------
|
|
clock_ms: i64; // the VIRTUAL clock (ms). Starts 0; advances ONLY
|
|
// when the ready queue drains and the earliest
|
|
// pending timer fires. No real wall clock is ever
|
|
// read — wake ORDER + timestamps are reproducible.
|
|
timers: List(Timer); // pending sleep timers, in insertion order. Grown
|
|
// through `own_allocator` (long-lived-container
|
|
// rule: a timer outlives the `sleep` call's scope).
|
|
|
|
// Construct a scheduler BY VALUE (allocator value-return convention).
|
|
// Captures the current `context.allocator` into `own_allocator` — fibers and
|
|
// their heap `Fiber` structs outlive their spawn scope, so all internal
|
|
// allocation must go through this captured (long-lived) allocator, not
|
|
// whatever transient one happens to be current at a later call.
|
|
init :: () -> Scheduler {
|
|
s : Scheduler = ---;
|
|
s.current = null;
|
|
s.ready_head = null;
|
|
s.ready_tail = null;
|
|
s.own_allocator = context.allocator;
|
|
s.next_id = 0;
|
|
s.n_spawned = 0;
|
|
s.n_suspended = 0;
|
|
s.clock_ms = 0;
|
|
s.timers = .{};
|
|
return s;
|
|
}
|
|
|
|
// Spawn a fiber running `body`. Heap-allocates the `Fiber` and a guarded
|
|
// stack, bootstraps the saved context (x19 = *Fiber, fp = 0, lr =
|
|
// trampoline, sp = stack top), enqueues it ready (FIFO), returns the
|
|
// `*Fiber`.
|
|
// KNOWN LIMITATION (env leak): `body` is a fat `{fn_ptr, env}` whose env is
|
|
// heap-allocated at the closure-literal site. The reap path frees the Fiber
|
|
// struct + unmaps the stack, but sx exposes no way to free a closure's env
|
|
// (the scheduler can't name the env pointer), so ONE env per spawned fiber
|
|
// leaks until program exit. Bounded by the spawn count; under the default
|
|
// GPA (which frees at exit) it is invisible, but a long-running scheduler
|
|
// under an arena/tracking allocator accumulates one env per fiber. Freeing
|
|
// it needs a language affordance for closure-env ownership — deferred.
|
|
spawn :: (self: *Scheduler, body: Closure() -> void) -> *Fiber {
|
|
raw := self.own_allocator.alloc_bytes(size_of(Fiber));
|
|
if raw == null {
|
|
print("sched: out of memory allocating a Fiber\n");
|
|
abort();
|
|
}
|
|
f : *Fiber = xx raw;
|
|
f.body = body;
|
|
f.sched = self;
|
|
f.id = self.next_id;
|
|
f.next = null;
|
|
self.next_id = self.next_id + 1;
|
|
self.n_spawned = self.n_spawned + 1;
|
|
|
|
top := boot_stack(f, STACK);
|
|
f.ctx.regs[0] = xx f; // x19 = self
|
|
f.ctx.regs[10] = 0; // fp
|
|
f.ctx.regs[11] = xx fib_tramp; // lr → trampoline
|
|
f.ctx.regs[12] = top; // sp
|
|
|
|
f.state = .ready;
|
|
enqueue(self, f);
|
|
return f;
|
|
}
|
|
|
|
// The running fiber yields cooperatively: mark ready, switch back to the
|
|
// scheduler. The run loop re-enqueues it (round-robin). MUST be called from
|
|
// inside a fiber (there must be a running fiber to yield).
|
|
yield_now :: (self: *Scheduler) {
|
|
cur := self.current;
|
|
if cur == null {
|
|
print("sched: yield_now() called outside a fiber (no running fiber)\n");
|
|
abort();
|
|
}
|
|
cur.state = .ready;
|
|
swap_context(@cur.ctx, @self.sched_ctx);
|
|
}
|
|
|
|
// The running fiber parks itself: mark suspended, switch back to the
|
|
// scheduler. The run loop does NOT re-enqueue a suspended fiber — an
|
|
// external `wake` must re-add it. (Used by FiberIo to park on a blocking
|
|
// op until completion.) MUST be called from inside a fiber — a null
|
|
// `current` (called from the bare scheduler/main context) would deref null;
|
|
// bail loudly instead of segfaulting.
|
|
suspend_self :: (self: *Scheduler) {
|
|
cur := self.current;
|
|
if cur == null {
|
|
print("sched: suspend_self() called outside a fiber (no running fiber)\n");
|
|
abort();
|
|
}
|
|
cur.state = .suspended;
|
|
self.n_suspended = self.n_suspended + 1;
|
|
swap_context(@cur.ctx, @self.sched_ctx);
|
|
}
|
|
|
|
// Re-ready a parked (suspended) fiber and enqueue it. Called from outside
|
|
// the fiber (e.g. an I/O completion or another fiber) to wake it.
|
|
//
|
|
// GUARDED on `.suspended`: enqueue links `f` into the FIFO, so waking a
|
|
// fiber that is ALREADY queued (`.ready`) or running (`.running`) would
|
|
// re-link a node already in the list — nulling its `next` mid-list and
|
|
// cycling `ready_tail` back onto it, corrupting the queue (a spurious /
|
|
// double wake, or waking a yielded-not-parked fiber, would segfault). Only
|
|
// a genuinely parked fiber may be re-enqueued; any other wake is a no-op.
|
|
wake :: (self: *Scheduler, f: *Fiber) {
|
|
if f.state != .suspended { return; }
|
|
// Evict any pending sleep timer for `f`. EVERY path that re-readies a
|
|
// suspended fiber funnels through `wake` (a manual/Task wake, or the
|
|
// timer-fire in `run` — which already removed the fired timer, so this
|
|
// is a harmless re-scan there). Without this, a fiber that armed a
|
|
// `sleep` timer but was woken EARLY by another path would run to
|
|
// completion and be reaped (stack munmap'd + Fiber freed) while its
|
|
// Timer still held a dangling `*Fiber` — a later fire would dereference
|
|
// freed memory (use-after-free). One timer per fiber max in the M:1
|
|
// model, so a single eviction suffices; it also prevents a stale timer
|
|
// from spuriously re-waking a since-re-slept fiber.
|
|
cancel_timer_for(self, f);
|
|
self.n_suspended = self.n_suspended - 1;
|
|
f.state = .ready;
|
|
enqueue(self, f);
|
|
}
|
|
|
|
// Read the VIRTUAL clock — the simulated millisecond time. Advances only as
|
|
// timers fire (in `run`), never from a real wall clock, so two runs of the
|
|
// same fiber program observe identical timestamps. A fiber that just woke
|
|
// from `sleep(ms)` sees `now_ms()` equal to its deadline.
|
|
now_ms :: (self: *Scheduler) -> i64 {
|
|
return self.clock_ms;
|
|
}
|
|
|
|
// Sleep the running fiber for `ms` simulated milliseconds: arm a timer at
|
|
// `clock_ms + ms`, then park off-queue. The scheduler advances the virtual
|
|
// clock to this deadline and wakes the fiber once the ready queue has fully
|
|
// drained AND no earlier timer is pending (deadline order, FIFO tiebreak).
|
|
// MUST be called from inside a fiber (there must be a `current` to park);
|
|
// a null `current` bails loudly, mirroring `suspend_self`.
|
|
//
|
|
// Virtual time only moves forward: `ms >= 0` makes the deadline
|
|
// `>= clock_ms`, so a fired timer never rewinds the clock.
|
|
sleep :: (self: *Scheduler, ms: i64) {
|
|
cur := self.current;
|
|
if cur == null {
|
|
print("sched: sleep() called outside a fiber (no running fiber)\n");
|
|
abort();
|
|
}
|
|
t : Timer = .{ deadline_ms = self.clock_ms + ms, fiber = cur };
|
|
// Long-lived-container rule: a timer outlives this `sleep` call's scope
|
|
// (it survives in `self.timers` until the scheduler fires it), so grow
|
|
// through the captured `own_allocator`, never the transient current one.
|
|
self.timers.append(t, self.own_allocator);
|
|
self.suspend_self(); // parks `cur` off-queue; the timer fire re-wakes it
|
|
}
|
|
|
|
// The scheduler loop. Drives ready fibers to quiescence, then advances the
|
|
// virtual clock by firing the earliest pending timer (which re-readies its
|
|
// sleeper), and repeats — until both the ready queue and the timer set are
|
|
// empty. Within the inner drain each iteration: dequeue the next fiber,
|
|
// switch into it, and — on its switch back — reap it if done (munmap stack,
|
|
// free the Fiber), re-enqueue it if it yielded, or leave it parked if it
|
|
// suspended.
|
|
run :: (self: *Scheduler) {
|
|
while true {
|
|
while self.ready_head != null {
|
|
f := dequeue(self);
|
|
self.current = f;
|
|
f.state = .running;
|
|
swap_context(@self.sched_ctx, @f.ctx); // returns here when f yields / suspends / finishes
|
|
self.current = null;
|
|
if f.state == .done {
|
|
// We've switched OFF f's stack already (the final swap landed
|
|
// here), so the stack is free to unmap. Free the Fiber struct
|
|
// AFTER munmap.
|
|
munmap(f.stack_region, f.stack_len);
|
|
self.own_allocator.dealloc_bytes(xx f);
|
|
} else if f.state == .ready {
|
|
enqueue(self, f);
|
|
}
|
|
// .suspended: leave it parked (not in any queue; `wake` re-adds it).
|
|
}
|
|
// Ready queue drained. Fire the earliest pending timer — the one
|
|
// sleeper whose deadline is next — advancing the virtual clock to it.
|
|
// No timers left ⇒ nothing more can run; exit the loop.
|
|
idx := earliest_timer(self);
|
|
if idx < 0 { break; }
|
|
t := self.timers.items[idx];
|
|
remove_timer(self, idx);
|
|
self.clock_ms = t.deadline_ms; // advance VIRTUAL time forward
|
|
self.wake(t.fiber); // re-enqueue the sleeper → drain again
|
|
}
|
|
// Both the ready queue and the timer set are empty. If a fiber is STILL
|
|
// parked, no timer will ever wake it (a `suspend_self` without an armed
|
|
// timer, never externally woken) — its stack + struct are leaked and the
|
|
// program believes it finished. That is a genuine deadlock; surface it
|
|
// loudly. (Timer sleepers are balanced: each `sleep` increments
|
|
// `n_suspended` via `suspend_self`, and the timer-fire `wake` decrements
|
|
// it — so once every timer has fired, `n_suspended` counts only true
|
|
// orphans.)
|
|
if self.n_suspended != 0 {
|
|
print("sched: deadlock — {} fiber(s) suspended with an empty run queue\n", self.n_suspended);
|
|
abort();
|
|
}
|
|
}
|
|
}
|
|
|
|
// --- the context switch (naked) + first-entry trampoline -------------------
|
|
|
|
// x0 = from, x1 = to (read straight from the ABI registers — a naked fn has no
|
|
// frame, so its params are never spilled). SP-in ≠ SP-out by design.
|
|
swap_context :: (from: *FiberCtx, to: *FiberCtx) abi(.naked) {
|
|
asm volatile {
|
|
#string ASM
|
|
stp x19, x20, [x0, #0]
|
|
stp x21, x22, [x0, #16]
|
|
stp x23, x24, [x0, #32]
|
|
stp x25, x26, [x0, #48]
|
|
stp x27, x28, [x0, #64]
|
|
stp x29, x30, [x0, #80]
|
|
mov x9, sp
|
|
str x9, [x0, #96]
|
|
ldp x19, x20, [x1, #0]
|
|
ldp x21, x22, [x1, #16]
|
|
ldp x23, x24, [x1, #32]
|
|
ldp x25, x26, [x1, #48]
|
|
ldp x27, x28, [x1, #64]
|
|
ldp x29, x30, [x1, #80]
|
|
ldr x9, [x1, #96]
|
|
mov sp, x9
|
|
ret
|
|
ASM
|
|
};
|
|
}
|
|
|
|
// First-entry trampoline: a fiber's bootstrapped LR points here. x19 holds the
|
|
// `*Fiber` (preset in the saved context); move it to x0 and call the generic
|
|
// dispatch.
|
|
asm {
|
|
#string T
|
|
.global _fib_tramp
|
|
_fib_tramp:
|
|
mov x0, x19
|
|
bl _fib_dispatch
|
|
brk #0
|
|
T,
|
|
};
|
|
fib_tramp :: () extern;
|
|
|
|
// The ONE place that runs a fiber body. Reached only from `_fib_tramp` on first
|
|
// entry, on the fiber's own fresh stack. Runs the body, marks the fiber done,
|
|
// and switches back to the scheduler — never returns past the final switch.
|
|
fib_dispatch :: (self: *Fiber) export "fib_dispatch" {
|
|
self.body();
|
|
self.state = .done;
|
|
swap_context(@self.ctx, @self.sched.sched_ctx);
|
|
}
|
|
|
|
// --- guarded stack bootstrap ----------------------------------------------
|
|
|
|
// mmap a [guard | usable-stack] region, mprotect the low guard page PROT_NONE.
|
|
// Stores the region base + len on the fiber (for munmap on reap) and returns
|
|
// the 16-aligned stack top (the bootstrapped SP).
|
|
boot_stack :: (f: *Fiber, size: i64) -> u64 {
|
|
total := GUARD + size;
|
|
region : *void = mmap(null, total, PROT_RW, MAP_AP, -1, 0);
|
|
// mmap signals failure with MAP_FAILED = (void*)-1 (NOT null). Handing a
|
|
// wild SP to the switch would `ret` onto garbage — bail loudly instead.
|
|
if (xx region) == (xx (0 - 1)) {
|
|
print("sched: mmap failed for a {}-byte fiber stack\n", total);
|
|
abort();
|
|
}
|
|
f.stack_region = region;
|
|
f.stack_len = total;
|
|
// Guard-arm: turn the low page unwritable so overflow faults at the
|
|
// boundary. The guard is mandatory (§8.1.1); a stack handed out without it
|
|
// would silently corrupt a neighbor on overflow, so a failed mprotect is
|
|
// fatal, not ignorable.
|
|
if mprotect(region, GUARD, PROT_NONE) != 0 {
|
|
print("sched: mprotect(PROT_NONE) failed to arm the stack guard page\n");
|
|
abort();
|
|
}
|
|
usable : u64 = (xx region) + GUARD;
|
|
top : u64 = usable + size;
|
|
return top - (top % 16); // 16-byte aligned stack top (AAPCS)
|
|
}
|
|
|
|
// --- intrusive FIFO ready-queue -------------------------------------------
|
|
|
|
enqueue :: (self: *Scheduler, f: *Fiber) {
|
|
f.next = null;
|
|
if self.ready_tail == null {
|
|
self.ready_head = f;
|
|
self.ready_tail = f;
|
|
} else {
|
|
self.ready_tail.next = f;
|
|
self.ready_tail = f;
|
|
}
|
|
}
|
|
|
|
dequeue :: (self: *Scheduler) -> *Fiber {
|
|
f := self.ready_head;
|
|
if f == null { return null; }
|
|
self.ready_head = f.next;
|
|
if self.ready_head == null { self.ready_tail = null; }
|
|
f.next = null;
|
|
return f;
|
|
}
|
|
|
|
// --- virtual-time timer set (linear min-scan, FIFO tiebreak) ---------------
|
|
//
|
|
// The timer set is a plain `List(Timer)` kept in INSERTION order. Fiber counts
|
|
// are tiny, so a linear scan for the minimum deadline is ideal — no heap to
|
|
// maintain — and "first entry at the minimum" naturally gives FIFO ordering for
|
|
// equal deadlines (the earlier-inserted timer is visited first, so it wins the
|
|
// tie). Removal shifts the tail down by one to preserve that insertion order for
|
|
// the remaining entries.
|
|
|
|
// Index of the earliest-deadline pending timer, or -1 if none. On a deadline
|
|
// tie the lowest index (earliest inserted) wins → deterministic FIFO wake order.
|
|
earliest_timer :: (self: *Scheduler) -> i64 {
|
|
if self.timers.len == 0 { return -1; }
|
|
best := 0;
|
|
i := 1;
|
|
while i < self.timers.len {
|
|
// Strict `<` so equal deadlines do NOT displace the earlier (lower)
|
|
// index — that is the FIFO tiebreak.
|
|
if self.timers.items[i].deadline_ms < self.timers.items[best].deadline_ms {
|
|
best = i;
|
|
}
|
|
i = i + 1;
|
|
}
|
|
return best;
|
|
}
|
|
|
|
// Remove the timer at `idx`, shifting every later entry down one slot so the
|
|
// remaining timers keep their insertion order (preserving the FIFO tiebreak).
|
|
remove_timer :: (self: *Scheduler, idx: i64) {
|
|
i := idx;
|
|
while i < self.timers.len - 1 {
|
|
self.timers.items[i] = self.timers.items[i + 1];
|
|
i = i + 1;
|
|
}
|
|
self.timers.len = self.timers.len - 1;
|
|
}
|
|
|
|
// Remove a pending sleep timer referencing fiber `f`, if any. A fiber has at
|
|
// most one pending timer in the M:1 model (it can only `sleep` once before
|
|
// suspending), so the first match is the only one. No-op if `f` has none.
|
|
cancel_timer_for :: (self: *Scheduler, f: *Fiber) {
|
|
i := 0;
|
|
while i < self.timers.len {
|
|
if self.timers.items[i].fiber == f {
|
|
remove_timer(self, i);
|
|
return;
|
|
}
|
|
i = i + 1;
|
|
}
|
|
}
|
|
|
|
// The public API lives as methods on `Scheduler` (above): `init`, `spawn`,
|
|
// `yield_now`, `suspend_self`, `wake`, `run`, `now_ms`, `sleep`.
|
|
|
|
// --- B1.4a: truly-suspending fiber-task async (`go` / `wait` / `cancel`) ----
|
|
//
|
|
// An async-task layer on top of the M:1 scheduler: `s.go(work)` runs `work` as
|
|
// a REAL fiber, and `t.wait()` SUSPENDS the caller fiber until the task's fiber
|
|
// completes — genuine interleaving, in contrast with io.sx's `context.io.async`
|
|
// (which runs the worker inline to completion before returning). Distinct from
|
|
// io.sx's `Future` by design: `Task` is defined here so the two modules stay
|
|
// decoupled (no cross-import; sched.sx must keep importing only `std.sx`, since
|
|
// a different import path re-emits the module's global `_fib_tramp` asm and
|
|
// duplicates the symbol).
|
|
//
|
|
// THE NULLARY-THUNK RATIONALE. `work` is a NULLARY thunk `Closure() -> $R`, not
|
|
// a worker-plus-`..args` pair like io.sx's `async`. A variadic pack is
|
|
// comptime-only and segfaults if captured into a deferred closure that crosses
|
|
// the fiber boundary (issue 0156 Part 2). So instead of forwarding inputs as a
|
|
// pack, the user captures any inputs in the lambda AT THE CALL SITE (where
|
|
// they're live): `s.go(() -> i64 => compute(a, b))`. Nothing variadic ever
|
|
// crosses into the fiber — the thunk is a plain `{fn_ptr, env}` fat closure.
|
|
//
|
|
// KNOWN LIMITATION (heap-Task leak): `go` heap-allocates the `Task` (it outlives
|
|
// the call — the fiber fills `value`/`state` later, after `go` has returned), but
|
|
// B1.4a never frees it. Like the closure-env leak documented on `spawn` above,
|
|
// this is bounded by the `go` count and invisible under the default GPA (frees
|
|
// at exit); a long-running scheduler under an arena/tracking allocator
|
|
// accumulates one `Task` per `go`. Freeing it safely needs join-point ownership
|
|
// tracking — deferred.
|
|
//
|
|
// WAKE-AFTER-COMPLETE ORDERING (both orderings are correct):
|
|
// - worker finishes BEFORE `wait`: the worker set `t.state = .ready` and saw
|
|
// `t.waiter == null`, so it issued no wake. `wait` sees `.ready` (not
|
|
// `.pending`), does NOT park, and returns `t.value` — no lost wakeup.
|
|
// - `wait` runs BEFORE the worker finishes: `wait` registers itself as
|
|
// `t.waiter` and parks via `suspend_self`. When the worker finishes it sees
|
|
// a non-null `t.waiter` and `wake`s it; `wait` resumes and returns the value.
|
|
|
|
TaskState :: enum { pending; ready; canceled; }
|
|
|
|
// The `!` channel for `wait`. Defined LOCALLY (not reusing io.sx's `IoErr`):
|
|
// `IoErr` is reachable here only as a re-export alias through std.sx, and the
|
|
// failable-type detection behind `raise` does not see through that alias to the
|
|
// underlying `error` set — so `raise error.Canceled` against `(.., !IoErr)`
|
|
// here is rejected as "not a failable function". A local `error` decl is
|
|
// recognized directly. (Same `.Canceled` contract as io.sx model (a).)
|
|
TaskErr :: error { Canceled }
|
|
|
|
Task :: struct ($R: Type) {
|
|
value: R;
|
|
state: TaskState = .pending;
|
|
waiter: *void = null; // the single parked awaiter (opaque *Fiber); M:1 → at most one
|
|
sched: *Scheduler; // owning scheduler (for park/wake in `wait`)
|
|
canceled: i64; // cooperative cancel flag (M:1: no preemption → no atomics)
|
|
}
|
|
|
|
// Spawn `work` as a fiber; return a heap `*Task` that completes when the fiber
|
|
// finishes. Mirrors `spawn`'s alloc + null-check + abort.
|
|
go :: ufcs (self: *Scheduler, work: Closure() -> $R) -> *Task($R) {
|
|
raw := self.own_allocator.alloc_bytes(size_of(Task($R)));
|
|
if raw == null {
|
|
print("sched: out of memory allocating a Task\n");
|
|
abort();
|
|
}
|
|
t : *Task($R) = xx raw;
|
|
t.state = .pending;
|
|
t.waiter = null;
|
|
t.sched = self;
|
|
t.canceled = 0;
|
|
self.spawn(() => {
|
|
// Cooperative cancel: skip the work entirely if cancel already landed
|
|
// before this fiber was scheduled (saves the compute + side effects). A
|
|
// cancel that lands DURING `work()` still lets it finish (no preemption
|
|
// in the M:1 model) — cancel suppresses DELIVERY, never an in-flight run.
|
|
if t.canceled == 0 {
|
|
t.value = work();
|
|
t.state = .ready;
|
|
}
|
|
// Wake the awaiter only if one already parked (else `wait` will not park).
|
|
if t.waiter != null { self.wake(xx t.waiter); }
|
|
});
|
|
return t;
|
|
}
|
|
|
|
// Suspend the caller until the task completes; return its value (or raise on
|
|
// cancel). MUST be called from inside a fiber (so there is a `self.current` to
|
|
// park) — typically from a fiber spawned via `s.spawn(...)`.
|
|
wait :: ufcs (t: *Task($R)) -> ($R, !TaskErr) {
|
|
if t.canceled != 0 { raise error.Canceled; }
|
|
if t.state == .pending {
|
|
t.waiter = xx t.sched.current; // register self as the waiter
|
|
t.sched.suspend_self(); // park until the task's fiber wakes us
|
|
}
|
|
if t.canceled != 0 or t.state == .canceled { raise error.Canceled; }
|
|
return t.value;
|
|
}
|
|
|
|
// Request cancellation — rides the `!` channel (model (a), like io.sx 1806). M:1
|
|
// cooperative: the worker fiber may already have run; cancel still makes a
|
|
// subsequent (or in-flight) `wait` raise `.Canceled`.
|
|
cancel :: ufcs (t: *Task($R)) {
|
|
t.canceled = 1;
|
|
t.state = .canceled;
|
|
}
|