refactor: retire bespoke Task async; one stack behind context.io (Phase 5)

Converge the Io unification (PLAN-IO-UNIFY Phase 5). The bespoke fiber-task layer
in sched.sx — Task / TaskState / TaskErr / go / wait / cancel(Task), plus
Scheduler.task_allocs and its deinit bookkeeping (~130 lines) — is removed. There
is now ONE async stack: context.io.async / await / cancel / race / sleep over the
Io protocol, with the Scheduler as the fiber Io's engine + driver (spawn /
yield_now / suspend_self / wake / run / block_on_fd remain as the raw primitives;
race stays in sched.sx because it needs meta.sx's make_enum/make_variant).

Migrated the four go/wait users to context.io:
- 1813 — interleave + cancel (sequence 1 2 3 42 100 -99)
- 1817 — m1 end-to-end (completion in deadline order, sum 123)
- 1819 — double-AWAIT loud-abort via the Future one-awaiter guard
- 1820 — deinit: dropped the go/task_allocs tasks; now exercises timers/io_waiters/
  kq cleanup (freed=2, live=3 = the documented per-spawn closure-env residual)

Updated readme.md (the user-facing async section documents context.io.async /
await / race / sleep) and the stale sched.go/sched.Task comments in io.sx.

Suite 854/0; no .ir churn (Task removal touched no snapshotted IR); migrated
examples byte-identical on aarch64-macOS + aarch64-linux. PLAN-IO-UNIFY Phases 0-5
all complete — the two parallel async stacks are now one, behind context.io.
This commit is contained in:
agra
2026-06-28 10:14:17 +03:00
parent 97b0abef66
commit aae7d72a66
11 changed files with 174 additions and 287 deletions

View File

@@ -132,14 +132,13 @@ sx_run_boxed_closure :: (arg: *void) {
// `*Future($R)` handle. The worker must be nullary because under the fiber impl
// the body crosses a fiber boundary, and a captured variadic pack segfaults there
// (issue 0156 Part 2) — so any inputs are captured at the CALL SITE in the lambda
// (`context.io.async(() -> i64 => compute(a, b))`), exactly like `sched.go`.
// (`context.io.async(() -> (i64, !) => compute(a, b))`).
//
// The Future (and the completion-closure `ThunkBox`) are HEAP-allocated (not
// returned by value): under the fiber impl the worker fills the Future AFTER
// `async` returns, so the awaiter and the worker must share one stable object.
// Like `sched.go`'s Task, they currently leak (bounded by the async count;
// invisible under the default GPA). Freeing them needs join-point ownership —
// deferred.
// They currently leak (bounded by the async count; invisible under the default
// GPA). Freeing them needs join-point ownership — deferred.
//
// ALLOCATOR-LIFETIME CONTRACT: both are allocated from the `context.allocator`
// in force at the `async` CALL, and that allocator MUST outlive the future —
@@ -149,8 +148,7 @@ sx_run_boxed_closure :: (arg: *void) {
// drives the worker frees the Future while it is still live (use-after-free).
// The common case (the program-stable default GPA, or a scheduler set up under a
// long-lived allocator) is safe. A deeper fix — `async` capturing the scheduler's
// own long-lived allocator the way `sched.go` does — needs a protocol affordance
// to reach it and is deferred to the convergence phase.
// own long-lived allocator — needs a protocol affordance to reach it; deferred.
async :: ufcs (io: Io, worker: Closure() -> ($R, !)) -> *Future($R) {
raw := context.allocator.alloc_bytes(size_of(Future($R)));
f : *Future($R) = xx raw;
@@ -201,9 +199,9 @@ await :: ufcs (f: *Future($R)) -> ($R, !IoErr) {
// ONE awaiter per future (M:1): the single `park` slot records one parked
// fiber, so a second concurrent `await` on the same pending future would
// OVERWRITE the first awaiter's handle and orphan it forever (the worker's
// single `ready(f.park)` wakes only the last). Enforce loudly here, exactly
// as `sched.Task.wait` does — a non-null handle on a still-pending future
// means another fiber is already parked on it. (Fan-in over many futures —
// single `ready(f.park)` wakes only the last). Enforce loudly here — a
// non-null handle on a still-pending future means another fiber is already
// parked on it. (Fan-in over many futures —
// `race` — registers ONE awaiter across SEPARATE futures, so it is fine.)
if f.park.handle != null {
out("io: await — future already has an awaiter (one awaiter per future in the M:1 model)\n");

View File

@@ -163,14 +163,6 @@ Scheduler :: struct {
// `own_allocator` (long-lived-container rule: a
// waiter outlives the `block_on_fd` call's scope).
// --- deinit bookkeeping: heap Tasks allocated by `go` --------------------
task_allocs: List(*void); // every heap `*Task` from `go`, recorded so
// `deinit` can free them. The scheduler does not
// otherwise know its Tasks (they are generic
// `Task($R)` handed back to the caller); without
// this list they would leak. Grown through
// `own_allocator` (a Task outlives the `go` call).
// Construct a scheduler BY VALUE (allocator value-return convention).
// Captures the current `context.allocator` into `own_allocator` — fibers and
// their heap `Fiber` structs outlive their spawn scope, so all internal
@@ -185,7 +177,7 @@ Scheduler :: struct {
current = null, ready_head = null, ready_tail = null,
own_allocator = context.allocator,
next_id = 0, n_spawned = 0, n_suspended = 0,
clock_ms = 0, timers = .{}, kq = -1, io_waiters = .{}, task_allocs = .{}
clock_ms = 0, timers = .{}, kq = -1, io_waiters = .{}
};
}
@@ -272,7 +264,7 @@ Scheduler :: struct {
wake :: (self: *Scheduler, f: *Fiber) {
if f.state != .suspended { return; }
// Evict any pending sleep timer for `f`. EVERY path that re-readies a
// suspended fiber funnels through `wake` (a manual/Task wake, or the
// suspended fiber funnels through `wake` (a manual/async wake, or the
// timer-fire in `run` — which already removed the fired timer, so this
// is a harmless re-scan there). Without this, a fiber that armed a
// `sleep` timer but was woken EARLY by another path would run to
@@ -284,7 +276,7 @@ Scheduler :: struct {
cancel_timer_for(self, f);
// Same UAF reasoning for fd waiters: every path that re-readies a
// suspended fiber funnels through `wake`. If a fiber armed `block_on_fd`
// but was woken by another path (a manual wake, a Task completion), its
// but was woken by another path (a manual wake, an async completion), its
// `IoWaiter` would otherwise survive pointing at a fiber that runs to
// completion and is reaped (stack munmap'd + Fiber freed). A later
// readiness drain matching that stale record would `wake` freed memory.
@@ -543,33 +535,31 @@ Scheduler :: struct {
}
// Release the scheduler's owned resources. TERMINAL: the scheduler is dead
// after this — no scheduler-owned handle (the `*Task`s returned by `go`, a
// `*Fiber` from `spawn`, the scheduler itself) may be used afterward; doing
// so is a use-after-free, the universal deinit contract. Idempotent: a
// second `deinit` is a no-op (it rests on `List.deinit` nulling `items` +
// zeroing `len`, and on `kq`/`ready_head` being reset below).
// after this — no scheduler-owned handle (a `*Fiber` from `spawn`, the
// scheduler itself) may be used afterward; doing so is a use-after-free, the
// universal deinit contract. Idempotent: a second `deinit` is a no-op (it
// rests on `List.deinit` nulling `items` + zeroing `len`, and on
// `kq`/`ready_head` being reset below).
//
// Call AFTER `run()` has returned: a clean `run()` leaves the ready queue
// empty and aborts loudly on any orphaned suspend, so nothing is mid-flight
// and every `task_allocs` entry is a COMPLETED task (safe to free). Frees,
// in order:
// empty and aborts loudly on any orphaned suspend, so nothing is mid-flight.
// Frees, in order:
// 1. any fibers still enqueued ready — a leak-SAFETY NET for the misuse
// path (`spawn`/`go` without a following `run()`, or after it returned),
// NOT a blessed reuse pattern: reaping a `go`'s fiber here while step (2)
// frees its paired `*Task` is self-consistent ONLY because the contract
// already forbade touching those handles post-`deinit`. A suspended
// (off-queue) fiber is unreachable from here, but a clean `run()` never
// leaves one (it aborts on an orphaned suspend);
// 2. every heap `*Task` from `go` (recorded in `task_allocs`);
// 3. the three `List` backings (`task_allocs`, `timers`, `io_waiters`),
// each grown through `own_allocator`;
// 4. the kqueue fd, if `block_on_fd` ever opened it (lazy `-1` otherwise).
// path (`spawn` without a following `run()`, or after it returned), NOT a
// blessed reuse pattern. A suspended (off-queue) fiber is unreachable
// from here, but a clean `run()` never leaves one (it aborts on an
// orphaned suspend);
// 2. the two `List` backings (`timers`, `io_waiters`), each grown through
// `own_allocator`;
// 3. the kqueue fd, if `block_on_fd` ever opened it (lazy `-1` otherwise).
//
// NOT freed (documented language limitation, unchanged): one closure env per
// `spawn`/`go`. The env is heap-allocated at the closure-literal site and sx
// `spawn`. The env is heap-allocated at the closure-literal site and sx
// exposes no way to free it (the scheduler cannot name the env pointer), so
// it leaks until program exit — bounded by the spawn/go count, invisible
// under the default GPA. Freeing it needs a closure-env-ownership affordance.
// it leaks until program exit — bounded by the spawn count, invisible under
// the default GPA. The unified `context.io.async` layer's heap `Future`s /
// `ThunkBox`es likewise leak (they are not scheduler-tracked) — freeing both
// needs join-point / closure-env ownership affordances.
deinit :: (self: *Scheduler) {
// (1) Reap leftover ready fibers: unmap the stack, free the Fiber.
f := self.ready_head;
@@ -582,17 +572,11 @@ Scheduler :: struct {
self.ready_head = null;
self.ready_tail = null;
// (2) Free every heap Task allocated by `go`.
for self.task_allocs.items[0..self.task_allocs.len] (t) {
self.own_allocator.dealloc_bytes(t);
}
// (3) Free the List backings (all grown through `own_allocator`).
self.task_allocs.deinit(self.own_allocator);
// (2) Free the List backings (all grown through `own_allocator`).
self.timers.deinit(self.own_allocator);
self.io_waiters.deinit(self.own_allocator);
// (4) Close the kqueue fd if it was ever opened (lazy: -1 if never used).
// (3) Close the kqueue fd if it was ever opened (lazy: -1 if never used).
if self.kq >= 0 {
close(self.kq);
self.kq = -1;
@@ -1039,135 +1023,6 @@ wake_io_waiter_for_fd :: (self: *Scheduler, fd: i32) {
// The public API lives as methods on `Scheduler` (above): `init`, `spawn`,
// `yield_now`, `suspend_self`, `wake`, `run`, `now_ms`, `sleep`.
// --- B1.4a: truly-suspending fiber-task async (`go` / `wait` / `cancel`) ----
//
// An async-task layer on top of the M:1 scheduler: `s.go(work)` runs `work` as
// a REAL fiber, and `t.wait()` SUSPENDS the caller fiber until the task's fiber
// completes — genuine interleaving, in contrast with io.sx's `context.io.async`
// (which runs the worker inline to completion before returning). Distinct from
// io.sx's `Future` by design: `Task` is defined here so the two modules stay
// decoupled (no cross-import; sched.sx must keep importing only `std.sx`, since
// a different import path re-emits the module's global `_fib_tramp` asm and
// duplicates the symbol).
//
// THE NULLARY-THUNK RATIONALE. `work` is a NULLARY thunk `Closure() -> $R`, not
// a worker-plus-`..args` pair like io.sx's `async`. A variadic pack is
// comptime-only and segfaults if captured into a deferred closure that crosses
// the fiber boundary (issue 0156 Part 2). So instead of forwarding inputs as a
// pack, the user captures any inputs in the lambda AT THE CALL SITE (where
// they're live): `s.go(() -> i64 => compute(a, b))`. Nothing variadic ever
// crosses into the fiber — the thunk is a plain `{fn_ptr, env}` fat closure.
//
// KNOWN LIMITATION (heap-Task leak): `go` heap-allocates the `Task` (it outlives
// the call — the fiber fills `value`/`state` later, after `go` has returned), but
// B1.4a never frees it. Like the closure-env leak documented on `spawn` above,
// this is bounded by the `go` count and invisible under the default GPA (frees
// at exit); a long-running scheduler under an arena/tracking allocator
// accumulates one `Task` per `go`. Freeing it safely needs join-point ownership
// tracking — deferred.
//
// WAKE-AFTER-COMPLETE ORDERING (both orderings are correct):
// - worker finishes BEFORE `wait`: the worker set `t.state = .ready` and saw
// `t.waiter == null`, so it issued no wake. `wait` sees `.ready` (not
// `.pending`), does NOT park, and returns `t.value` — no lost wakeup.
// - `wait` runs BEFORE the worker finishes: `wait` registers itself as
// `t.waiter` and parks via `suspend_self`. When the worker finishes it sees
// a non-null `t.waiter` and `wake`s it; `wait` resumes and returns the value.
TaskState :: enum { pending; ready; canceled; }
// The `!` channel for `wait`. Defined LOCALLY (not reusing io.sx's `IoErr`):
// `IoErr` is reachable here only as a re-export alias through std.sx, and the
// failable-type detection behind `raise` does not see through that alias to the
// underlying `error` set — so `raise error.Canceled` against `(.., !IoErr)`
// here is rejected as "not a failable function". A local `error` decl is
// recognized directly. (Same `.Canceled` contract as io.sx model (a).)
TaskErr :: error { Canceled }
Task :: struct ($R: Type) {
value: R;
state: TaskState = .pending;
waiter: *void = null; // the single parked awaiter (opaque *Fiber); M:1 → at most one
sched: *Scheduler; // owning scheduler (for park/wake in `wait`)
canceled: i64; // cooperative cancel flag (M:1: no preemption → no atomics)
finished: i64; // set to 1 at the very END of the worker body (after the
// work ran OR was skipped on an early cancel). Distinct from
// `state == .canceled` (which `cancel` sets IMMEDIATELY, before
// the fiber has run): a JOINER (`race`) waits on `finished` so it
// knows the worker fiber actually reached its end — no loser
// outlives the `race` call.
}
// Spawn `work` as a fiber; return a heap `*Task` that completes when the fiber
// finishes. Mirrors `spawn`'s alloc + null-check + abort.
go :: ufcs (self: *Scheduler, work: Closure() -> $R) -> *Task($R) {
raw := self.own_allocator.alloc_bytes(size_of(Task($R)));
if raw == null {
print("sched: out of memory allocating a Task\n");
abort();
}
t : *Task($R) = xx raw;
t.state = .pending;
t.waiter = null;
t.sched = self;
t.canceled = 0;
t.finished = 0;
// Record the heap Task so `deinit` can free it (the scheduler otherwise has
// no handle on its generic Tasks). Long-lived: a Task outlives this call.
self.task_allocs.append(xx t, self.own_allocator);
self.spawn(() => {
// Cooperative cancel: skip the work entirely if cancel already landed
// before this fiber was scheduled (saves the compute + side effects). A
// cancel that lands DURING `work()` still lets it finish (no preemption
// in the M:1 model) — cancel suppresses DELIVERY, never an in-flight run.
if t.canceled == 0 {
t.value = work();
t.state = .ready;
}
// The worker has reached its end (ran the work, or skipped it on an early
// cancel). Mark `finished` BEFORE the wake so a joiner that checks the flag
// on resume always observes it set (a `race` JOIN distinguishes a finished
// worker from a merely-flagged-cancelled one via this).
t.finished = 1;
// Wake the awaiter only if one already parked (else `wait`/`race` will not
// park). Fires whether or not the work ran — both `wait` and a `race` join
// resume here.
if t.waiter != null { self.wake(xx t.waiter); }
});
return t;
}
// Suspend the caller until the task completes; return its value (or raise on
// cancel). MUST be called from inside a fiber (so there is a `self.current` to
// park) — typically from a fiber spawned via `s.spawn(...)`.
wait :: ufcs (t: *Task($R)) -> ($R, !TaskErr) {
if t.canceled != 0 { raise error.Canceled; }
if t.state == .pending {
// ONE waiter per task (enforced). A `Task` holds a single `waiter` slot;
// a second concurrent `wait` on the same pending task would OVERWRITE the
// first, and completion would wake only the second — the first fiber
// would stay suspended forever (silent deadlock). The M:1 model is
// single-await per task; enforce it loudly (mirrors `block_on_fd`'s
// one-waiter-per-fd guard). A multi-waiter task would need a waiter list.
if t.waiter != null {
print("sched: wait() — task already has a waiter (one awaiter per task in the M:1 model)\n");
abort();
}
t.waiter = xx t.sched.current; // register self as the waiter
t.sched.suspend_self(); // park until the task's fiber wakes us
}
if t.canceled != 0 or t.state == .canceled { raise error.Canceled; }
return t.value;
}
// Request cancellation — rides the `!` channel (model (a), like io.sx 1806). M:1
// cooperative: the worker fiber may already have run; cancel still makes a
// subsequent (or in-flight) `wait` raise `.Canceled`.
cancel :: ufcs (t: *Task($R)) {
t.canceled = 1;
t.state = .canceled;
}
// --- B2/A1: structured first-wins `race` over `context.io` Futures -----------
//
// `context.io.race((a: fa, b: fb, …))` starts from N already-spawned `*Future(..)`