From aae7d72a664cafbefd8a8e8c5c68e7dd2889da1c Mon Sep 17 00:00:00 2001
From: agra
Date: Sun, 28 Jun 2026 10:14:17 +0300
Subject: [PATCH] refactor: retire bespoke Task async; one stack behind context.io (Phase 5)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Converge the Io unification (PLAN-IO-UNIFY Phase 5). The bespoke fiber-task
layer in sched.sx — Task / TaskState / TaskErr / go / wait / cancel(Task), plus
Scheduler.task_allocs and its deinit bookkeeping (~130 lines) — is removed.
There is now ONE async stack: context.io.async / await / cancel / race / sleep
over the Io protocol, with the Scheduler as the fiber Io's engine + driver
(spawn / yield_now / suspend_self / wake / run / block_on_fd remain as the raw
primitives; race stays in sched.sx because it needs meta.sx's
make_enum/make_variant).

Migrated the four go/wait users to context.io:
- 1813 — interleave + cancel (sequence 1 2 3 42 100 -99)
- 1817 — m1 end-to-end (completion in deadline order, sum 123)
- 1819 — double-AWAIT loud-abort via the Future one-awaiter guard
- 1820 — deinit: dropped the go/task_allocs tasks; now exercises
  timers/io_waiters/kq cleanup (freed=2, live=3 = the documented per-spawn
  closure-env residual)

Updated readme.md (the user-facing async section documents
context.io.async / await / race / sleep) and the stale sched.go/sched.Task
comments in io.sx.

Suite 854/0; no .ir churn (Task removal touched no snapshotted IR); migrated
examples byte-identical on aarch64-macOS + aarch64-linux.

PLAN-IO-UNIFY Phases 0-5 all complete — the two parallel async stacks are now
one, behind context.io.
---
 current/PLAN-IO-UNIFY.md                      |  17 ++
 .../1805-concurrency-io-blocking-async.sx     |   8 +-
 .../1813-concurrency-fiber-async-suspend.sx   |  88 ++++----
 .../1817-concurrency-fiber-m1-end-to-end.sx   |  32 +--
 .../1819-concurrency-fiber-double-wait.sx     |  20 +-
 ...1820-concurrency-fiber-scheduler-deinit.sx |  22 +-
 .../1819-concurrency-fiber-double-wait.stdout |   2 +-
 ...-concurrency-fiber-scheduler-deinit.stdout |   4 +-
 library/modules/std/io.sx                     |  16 +-
 library/modules/std/sched.sx                  | 193 +++---------------
 readme.md                                     |  59 +++---
 11 files changed, 174 insertions(+), 287 deletions(-)

diff --git a/current/PLAN-IO-UNIFY.md b/current/PLAN-IO-UNIFY.md
index 2dad5140..bb6a6a4c 100644
--- a/current/PLAN-IO-UNIFY.md
+++ b/current/PLAN-IO-UNIFY.md
@@ -36,6 +36,23 @@ installed via `push Context { io = xx scheduler } { … s.run(); }` — exactly
 just with the scheduler now reachable as `context.io`.
 
 ## Status (2026-06-28)
+- **Phase 5 — CONVERGE: retire the bespoke fiber async API. DONE. Io unification
+  COMPLETE.** The bespoke `Task` layer (`Task`/`TaskState`/`TaskErr`/`go`/`wait`/
+  `cancel(Task)` + `Scheduler.task_allocs` and its deinit handling, ~130 lines)
+  is removed from sched.sx. There is now ONE async stack: `context.io.async`/
+  `await`/`cancel`/`race`/`sleep` over the `Io` protocol, with the `Scheduler` as
+  the fiber Io's engine + driver (`spawn`/`yield_now`/`suspend_self`/`wake`/`run`/
+  `block_on_fd` stay as the raw primitives). Migrated the four `go`/`wait` users to
+  `context.io`: 1813 (interleave + cancel), 1817 (m1 end-to-end sum=123), 1819
+  (double-AWAIT loud-abort via the Future one-awaiter guard), 1820 (deinit — the
+  `go`/`task_allocs` tasks dropped; it now exercises timers/io_waiters/kq cleanup,
+  `freed=2`/`live=3`). `race` stays in sched.sx (needs meta.sx). Updated readme.md
+  (the user-facing async section now documents `context.io.async`/`await`/`race`/
+  `sleep`) and the stale `sched.go`/`sched.Task` comments in io.sx.
Suite 854/0; no + `.ir` churn (the Task removal touched no snapshotted IR); migrated examples + byte-identical on aarch64-macOS + aarch64-linux. **PLAN-IO-UNIFY Phases 0–5 all + complete — the two parallel async stacks are now one, behind `context.io`.** + - **Phase 4 — `race` over Futures via `context.io.race`. DONE.** Re-homed the proven first-wins race from `sched.race(*Task)` onto `*Future` handles + the `Io` protocol; the old Task-based `race` is REPLACED (ufcs overload-by-receiver diff --git a/examples/concurrency/1805-concurrency-io-blocking-async.sx b/examples/concurrency/1805-concurrency-io-blocking-async.sx index 21ff0d2d..a9b07395 100644 --- a/examples/concurrency/1805-concurrency-io-blocking-async.sx +++ b/examples/concurrency/1805-concurrency-io-blocking-async.sx @@ -5,10 +5,10 @@ // the result (a value-failable `($R, !IoErr)`, handled with `or`). // `context.io.now_ms()` reads the clock through the same capability. // -// Worker form: a nullary lambda capturing any inputs at the CALL SITE -// (`() -> i64 => compute(a, b)`) — the colorblind shape that also works when the -// worker is deferred onto a fiber (a captured variadic pack can't cross the fiber -// boundary), mirroring `sched.go`. +// Worker form: a nullary failable lambda capturing any inputs at the CALL SITE +// (`() -> (i64, !) => compute(a, b)`) — the colorblind shape that also works when +// the worker is deferred onto a fiber (a captured variadic pack can't cross the +// fiber boundary). #import "modules/std.sx"; main :: () { diff --git a/examples/concurrency/1813-concurrency-fiber-async-suspend.sx b/examples/concurrency/1813-concurrency-fiber-async-suspend.sx index 717f62fe..0242afc8 100644 --- a/examples/concurrency/1813-concurrency-fiber-async-suspend.sx +++ b/examples/concurrency/1813-concurrency-fiber-async-suspend.sx @@ -1,23 +1,24 @@ -// Stream B1 (fibers) B1.4a — a truly-SUSPENDING fiber-task async layer -// (`go` / `wait` / `cancel`) over the M:1 scheduler, in pure sx. 
In contrast -// with 1805's `context.io.async` (which runs each worker INLINE to completion -// before returning a `.ready` future — no interleaving), here `s.go(work)` runs -// `work` as a REAL fiber and `t.wait()` SUSPENDS the caller until that fiber -// finishes, so a task that yields mid-body lets a sibling task run before the -// first completes — genuine cooperative interleaving. +// Stream B2 — the SUSPENDING `context.io.async` layer over the M:1 fiber +// scheduler (PLAN-IO-UNIFY: the unified async stack — the bespoke `go`/`wait` was +// retired in Phase 5). In contrast with 1805's `context.io.async` UNDER THE +// BLOCKING `Io` (which runs each worker INLINE to completion — no interleaving), +// here the scheduler is installed as `context.io`, so `context.io.async(work)` +// runs `work` as a REAL fiber and `await()` SUSPENDS the caller until it finishes +// — a worker that yields mid-body lets a sibling run first (cooperative +// interleaving). // -// `work` is a NULLARY thunk: any inputs are captured in the lambda at the call +// `work` is a NULLARY worker: any inputs are captured in the lambda at the call // site (no `..args` pack crosses the fiber boundary — that would hit issue 0156 -// Part 2). Outputs flow OUT through pointers captured in the thunk (the shared +// Part 2). Outputs flow OUT through pointers captured in the worker (the shared // `Log` struct), since closure capture-by-value does not write back. // // What this proves: -// - REAL suspend + interleave: task A records 1, YIELDS; task B then records 2 -// and completes; A resumes, records 3, completes → interleave order 1 2 3. -// - awaited VALUES: A returns 42, B returns 100 (recorded after both waits). +// - REAL suspend + interleave: worker A records 1, YIELDS; worker B then records +// 2 and completes; A resumes, records 3, completes → interleave order 1 2 3. +// - awaited VALUES: A returns 42, B returns 100 (recorded after both awaits). // → sequence: 1 2 3 42 100. 
-// - cancel rides the `!` channel (model (a), like 1806): a canceled task's -// `wait()` raises `.Canceled`, taken by the `or` default → -99. +// - cancel rides the `!` channel (model (a), like 1806): a canceled worker's +// `await()` raises `.Canceled`, taken by the `or` default → -99. // // `wait` must run inside a fiber (it parks `self.current`), so the "main task" // is itself a `s.spawn(...)` fiber that drives the two `go` tasks. @@ -38,36 +39,39 @@ main :: () -> i64 { ps := @s; pl := @lg; - // The "main task" fiber: drives two real tasks, waits both, then exercises - // cancel. It runs as a fiber so `wait` has a `self.current` to park. - s.spawn(() => { - // Task A yields mid-body so B interleaves before A completes. - a := ps.go(() -> i64 => { - rec(pl, 1); - ps.yield_now(); // suspend A; B (already spawned) runs to completion - rec(pl, 3); - 42 - }); - // Task B runs straight through (no yield). - b := ps.go(() -> i64 => { - rec(pl, 2); - 100 + // The coordinator fiber: drives two async workers, awaits both, then exercises + // cancel. It runs as a fiber so `await` has a `self.current` to park. The + // scheduler is installed as `context.io`, so the unified async layer reaches it. + push .{ io = xx s } { + ps.spawn(() => { + // Worker A yields mid-body so B interleaves before A completes. + a := context.io.async(() -> (i64, !) => { + rec(pl, 1); + ps.yield_now(); // suspend A; B (already spawned) runs to completion + rec(pl, 3); + 42 + }); + // Worker B runs straight through (no yield). + b := context.io.async(() -> (i64, !) => { + rec(pl, 2); + 100 + }); + + // Await both — suspends the coordinator fiber until each completes. + va := a.await() or { -1 }; + vb := b.await() or { -1 }; + rec(pl, va); + rec(pl, vb); + + // Cancel case: cancel before the worker runs; `await` raises .Canceled + // off the sticky flag, the `or` default (-99) is taken. + c := context.io.async(() -> (i64, !) 
=> 7); + c.cancel(); + rec(pl, c.await() or { -99 }); }); - // Wait both — suspends the main-task fiber until each completes. - va := a.wait() or { -1 }; - vb := b.wait() or { -1 }; - rec(pl, va); - rec(pl, vb); - - // Cancel case: cancel before the worker runs; `wait` raises .Canceled, - // the `or` default (-99) is taken. - c := ps.go(() -> i64 => 7); - c.cancel(); - rec(pl, c.wait() or { -99 }); - }); - - s.run(); + ps.run(); + } // Interleaving + value contract: 1 2 3 42 100, then the cancel default -99. print("sequence:"); diff --git a/examples/concurrency/1817-concurrency-fiber-m1-end-to-end.sx b/examples/concurrency/1817-concurrency-fiber-m1-end-to-end.sx index 32b66ce1..236f5f3e 100644 --- a/examples/concurrency/1817-concurrency-fiber-m1-end-to-end.sx +++ b/examples/concurrency/1817-concurrency-fiber-m1-end-to-end.sx @@ -36,22 +36,26 @@ main :: () -> i64 { s := sched.Scheduler.init(); ps := @s; pl := @lg; - // The coordinator runs as a fiber so `wait` has a `current` to park. - s.spawn(() => { - // Launch three async tasks; each sleeps, logs its completion, returns. - a := ps.go(() -> i64 => { ps.sleep(30); rec(pl, 1, ps.now_ms()); 100 }); - b := ps.go(() -> i64 => { ps.sleep(10); rec(pl, 2, ps.now_ms()); 20 }); - c := ps.go(() -> i64 => { ps.sleep(20); rec(pl, 3, ps.now_ms()); 3 }); + // The coordinator runs as a fiber so `await` has a `current` to park. The + // scheduler is installed as `context.io`, so the unified async layer + // (`context.io.async`/`await`/`sleep`/`now_ms`) reaches it inside the workers. + push .{ io = xx s } { + ps.spawn(() => { + // Launch three async workers; each sleeps, logs its completion, returns. + a := context.io.async(() -> (i64, !) => { try context.io.sleep(30); rec(pl, 1, context.io.now_ms()); 100 }); + b := context.io.async(() -> (i64, !) => { try context.io.sleep(10); rec(pl, 2, context.io.now_ms()); 20 }); + c := context.io.async(() -> (i64, !) 
=> { try context.io.sleep(20); rec(pl, 3, context.io.now_ms()); 3 }); - // Await in SPAWN order; results come back correct regardless. - va := a.wait() or { -1 }; - vb := b.wait() or { -1 }; - vc := c.wait() or { -1 }; - sum := va + vb + vc; + // Await in SPAWN order; results come back correct regardless. + va := a.await() or { -1 }; + vb := b.await() or { -1 }; + vc := c.await() or { -1 }; + sum := va + vb + vc; - rec(pl, 9, sum); // sentinel row: id=9 carries the sum in `at` - }); - s.run(); + rec(pl, 9, sum); // sentinel row: id=9 carries the sum in `at` + }); + ps.run(); + } print("completion order (id @ virtual-ms):\n"); i := 0; diff --git a/examples/concurrency/1819-concurrency-fiber-double-wait.sx b/examples/concurrency/1819-concurrency-fiber-double-wait.sx index 5de08867..0f98d191 100644 --- a/examples/concurrency/1819-concurrency-fiber-double-wait.sx +++ b/examples/concurrency/1819-concurrency-fiber-double-wait.sx @@ -1,18 +1,22 @@ -// A `Task` allows ONE awaiter — a second concurrent `wait` on the same pending -// task would overwrite the single `waiter` slot, and completion would wake only +// A `Future` allows ONE awaiter — a second concurrent `await` on the same pending +// future would overwrite the single `park` slot, and completion would wake only // the second, stranding the first forever. Regression (B1.4a review, P1-c): the -// guard aborts loudly instead of silently deadlocking. +// guard aborts loudly instead of silently deadlocking. Now over the unified +// `context.io` async layer (PLAN-IO-UNIFY Phase 5 — the bespoke `Task`/`wait` is +// retired). // // aborts (exit 134) after the diagnostic — aarch64-macOS-pinned. 
#import "modules/std.sx"; sched :: #import "modules/std/sched.sx"; -S :: struct { t: *sched.Task(i64); } +S :: struct { t: *Future(i64); } main :: () -> i64 { st : S = ---; st.t = null; s := sched.Scheduler.init(); ps := @s; pst := @st; - mkprod :: (ps: *sched.Scheduler, pst: *S) { pst.t = ps.go(() -> i64 => { ps.yield_now(); 42 }); } - mkw :: (ps: *sched.Scheduler, pst: *S) { ps.spawn(() => { x := pst.t.wait() or { -1 }; print("got {}\n", x); }); } - mkprod(ps, pst); mkw(ps, pst); mkw(ps, pst); // second waiter → loud abort - s.run(); + mkprod :: (ps: *sched.Scheduler, pst: *S) { pst.t = context.io.async(() -> (i64, !) => { ps.yield_now(); 42 }); } + mkw :: (ps: *sched.Scheduler, pst: *S) { ps.spawn(() => { x := pst.t.await() or { -1 }; print("got {}\n", x); }); } + push .{ io = xx s } { + mkprod(ps, pst); mkw(ps, pst); mkw(ps, pst); // second waiter → loud abort + s.run(); + } return 0; } diff --git a/examples/concurrency/1820-concurrency-fiber-scheduler-deinit.sx b/examples/concurrency/1820-concurrency-fiber-scheduler-deinit.sx index 5abba438..0d5abd7b 100644 --- a/examples/concurrency/1820-concurrency-fiber-scheduler-deinit.sx +++ b/examples/concurrency/1820-concurrency-fiber-scheduler-deinit.sx @@ -1,23 +1,23 @@ // Stream B1 (fibers) — `Scheduler.deinit` releases the scheduler's owned heap -// + fd resources, closing the documented bounded leaks (kq fd / heap Tasks / -// List backings). Verified by a tracking `GPA`: deinit drives the live -// allocation count DOWN, and resets the kqueue fd to -1. +// + fd resources, closing the documented bounded leaks (kq fd / List backings). +// Verified by a tracking `GPA`: deinit drives the live allocation count DOWN, +// and resets the kqueue fd to -1. 
// // Scenario (one run that touches every freed resource): // - a SLEEPER fiber `sleep(5)`s → exercises the `timers` List // - a READER fiber `block_on_fd`s a pipe → exercises the kqueue fd + the // `io_waiters` List // - a WRITER fiber writes 3 bytes → makes the pipe readable -// - two `go` tasks compute 42 / 7 → exercise the heap `Task`s + -// the `task_allocs` List -// After `run()` drains all of it, `deinit()` frees: the 2 heap Tasks, the -// `timers` / `io_waiters` / `task_allocs` List backings, and CLOSES the kqueue -// fd (resetting `kq` to -1). The Fibers were already reaped during `run()`. +// After `run()` drains all of it, `deinit()` frees: the `timers` / `io_waiters` +// List backings, and CLOSES the kqueue fd (resetting `kq` to -1). The Fibers +// were already reaped during `run()`. (The unified `context.io.async` layer's +// Futures are NOT scheduler-tracked — they leak with the closure-env residual +// below; the bespoke `go`/`Task`/`task_allocs` path was retired in Phase 5.) // // WHAT IT PROVES (the contract; numbers below are the snapshot): // - `freed by deinit: N` — live allocations reclaimed by `deinit` (> 0). // - `live after deinit` — the RESIDUAL. This is NOT zero and NOT a bug: it is -// exactly the documented closure-env leak — one heap env per `spawn`/`go` +// exactly the documented closure-env leak — one heap env per `spawn` // that sx cannot free (the runtime has no name for the env pointer). deinit // reclaims everything it CAN; the env residual is a language limitation. // - `kq open after run: 1` then `kq after deinit: -1` — the lazily-opened @@ -92,10 +92,6 @@ main :: () -> i64 { mk_reader(ps, pst, read_fd); mk_writer(ps, write_fd); - // Two async tasks — heap Tasks tracked for deinit to free. 
- ps.go(() -> i64 => 42); - ps.go(() -> i64 => 7); - ps.run(); after_run = gpa.alloc_count; diff --git a/examples/concurrency/expected/1819-concurrency-fiber-double-wait.stdout b/examples/concurrency/expected/1819-concurrency-fiber-double-wait.stdout index 8fb9525a..468dbc1b 100644 --- a/examples/concurrency/expected/1819-concurrency-fiber-double-wait.stdout +++ b/examples/concurrency/expected/1819-concurrency-fiber-double-wait.stdout @@ -1 +1 @@ -sched: wait() — task already has a waiter (one awaiter per task in the M:1 model) +io: await — future already has an awaiter (one awaiter per future in the M:1 model) diff --git a/examples/concurrency/expected/1820-concurrency-fiber-scheduler-deinit.stdout b/examples/concurrency/expected/1820-concurrency-fiber-scheduler-deinit.stdout index 3ca68da7..9e61a562 100644 --- a/examples/concurrency/expected/1820-concurrency-fiber-scheduler-deinit.stdout +++ b/examples/concurrency/expected/1820-concurrency-fiber-scheduler-deinit.stdout @@ -1,5 +1,5 @@ read: 3 [97 98 99] -freed by deinit: 5 -live after deinit: 5 +freed by deinit: 2 +live after deinit: 3 kq open after run: true kq after deinit: -1 diff --git a/library/modules/std/io.sx b/library/modules/std/io.sx index 74201bcc..f3683084 100644 --- a/library/modules/std/io.sx +++ b/library/modules/std/io.sx @@ -132,14 +132,13 @@ sx_run_boxed_closure :: (arg: *void) { // `*Future($R)` handle. The worker must be nullary because under the fiber impl // the body crosses a fiber boundary, and a captured variadic pack segfaults there // (issue 0156 Part 2) — so any inputs are captured at the CALL SITE in the lambda -// (`context.io.async(() -> i64 => compute(a, b))`), exactly like `sched.go`. +// (`context.io.async(() -> (i64, !) => compute(a, b))`). // // The Future (and the completion-closure `ThunkBox`) are HEAP-allocated (not // returned by value): under the fiber impl the worker fills the Future AFTER // `async` returns, so the awaiter and the worker must share one stable object. 
-// Like `sched.go`'s Task, they currently leak (bounded by the async count; -// invisible under the default GPA). Freeing them needs join-point ownership — -// deferred. +// They currently leak (bounded by the async count; invisible under the default +// GPA). Freeing them needs join-point ownership — deferred. // // ALLOCATOR-LIFETIME CONTRACT: both are allocated from the `context.allocator` // in force at the `async` CALL, and that allocator MUST outlive the future — @@ -149,8 +148,7 @@ sx_run_boxed_closure :: (arg: *void) { // drives the worker frees the Future while it is still live (use-after-free). // The common case (the program-stable default GPA, or a scheduler set up under a // long-lived allocator) is safe. A deeper fix — `async` capturing the scheduler's -// own long-lived allocator the way `sched.go` does — needs a protocol affordance -// to reach it and is deferred to the convergence phase. +// own long-lived allocator — needs a protocol affordance to reach it; deferred. async :: ufcs (io: Io, worker: Closure() -> ($R, !)) -> *Future($R) { raw := context.allocator.alloc_bytes(size_of(Future($R))); f : *Future($R) = xx raw; @@ -201,9 +199,9 @@ await :: ufcs (f: *Future($R)) -> ($R, !IoErr) { // ONE awaiter per future (M:1): the single `park` slot records one parked // fiber, so a second concurrent `await` on the same pending future would // OVERWRITE the first awaiter's handle and orphan it forever (the worker's - // single `ready(f.park)` wakes only the last). Enforce loudly here, exactly - // as `sched.Task.wait` does — a non-null handle on a still-pending future - // means another fiber is already parked on it. (Fan-in over many futures — + // single `ready(f.park)` wakes only the last). Enforce loudly here — a + // non-null handle on a still-pending future means another fiber is already + // parked on it. (Fan-in over many futures — // `race` — registers ONE awaiter across SEPARATE futures, so it is fine.) 
if f.park.handle != null { out("io: await — future already has an awaiter (one awaiter per future in the M:1 model)\n"); diff --git a/library/modules/std/sched.sx b/library/modules/std/sched.sx index 0da30dd0..a637ce86 100644 --- a/library/modules/std/sched.sx +++ b/library/modules/std/sched.sx @@ -163,14 +163,6 @@ Scheduler :: struct { // `own_allocator` (long-lived-container rule: a // waiter outlives the `block_on_fd` call's scope). - // --- deinit bookkeeping: heap Tasks allocated by `go` -------------------- - task_allocs: List(*void); // every heap `*Task` from `go`, recorded so - // `deinit` can free them. The scheduler does not - // otherwise know its Tasks (they are generic - // `Task($R)` handed back to the caller); without - // this list they would leak. Grown through - // `own_allocator` (a Task outlives the `go` call). - // Construct a scheduler BY VALUE (allocator value-return convention). // Captures the current `context.allocator` into `own_allocator` — fibers and // their heap `Fiber` structs outlive their spawn scope, so all internal @@ -185,7 +177,7 @@ Scheduler :: struct { current = null, ready_head = null, ready_tail = null, own_allocator = context.allocator, next_id = 0, n_spawned = 0, n_suspended = 0, - clock_ms = 0, timers = .{}, kq = -1, io_waiters = .{}, task_allocs = .{} + clock_ms = 0, timers = .{}, kq = -1, io_waiters = .{} }; } @@ -272,7 +264,7 @@ Scheduler :: struct { wake :: (self: *Scheduler, f: *Fiber) { if f.state != .suspended { return; } // Evict any pending sleep timer for `f`. EVERY path that re-readies a - // suspended fiber funnels through `wake` (a manual/Task wake, or the + // suspended fiber funnels through `wake` (a manual/async wake, or the // timer-fire in `run` — which already removed the fired timer, so this // is a harmless re-scan there). 
Without this, a fiber that armed a // `sleep` timer but was woken EARLY by another path would run to @@ -284,7 +276,7 @@ Scheduler :: struct { cancel_timer_for(self, f); // Same UAF reasoning for fd waiters: every path that re-readies a // suspended fiber funnels through `wake`. If a fiber armed `block_on_fd` - // but was woken by another path (a manual wake, a Task completion), its + // but was woken by another path (a manual wake, an async completion), its // `IoWaiter` would otherwise survive pointing at a fiber that runs to // completion and is reaped (stack munmap'd + Fiber freed). A later // readiness drain matching that stale record would `wake` freed memory. @@ -543,33 +535,31 @@ Scheduler :: struct { } // Release the scheduler's owned resources. TERMINAL: the scheduler is dead - // after this — no scheduler-owned handle (the `*Task`s returned by `go`, a - // `*Fiber` from `spawn`, the scheduler itself) may be used afterward; doing - // so is a use-after-free, the universal deinit contract. Idempotent: a - // second `deinit` is a no-op (it rests on `List.deinit` nulling `items` + - // zeroing `len`, and on `kq`/`ready_head` being reset below). + // after this — no scheduler-owned handle (a `*Fiber` from `spawn`, the + // scheduler itself) may be used afterward; doing so is a use-after-free, the + // universal deinit contract. Idempotent: a second `deinit` is a no-op (it + // rests on `List.deinit` nulling `items` + zeroing `len`, and on + // `kq`/`ready_head` being reset below). // // Call AFTER `run()` has returned: a clean `run()` leaves the ready queue - // empty and aborts loudly on any orphaned suspend, so nothing is mid-flight - // and every `task_allocs` entry is a COMPLETED task (safe to free). Frees, - // in order: + // empty and aborts loudly on any orphaned suspend, so nothing is mid-flight. + // Frees, in order: // 1. 
any fibers still enqueued ready — a leak-SAFETY NET for the misuse - // path (`spawn`/`go` without a following `run()`, or after it returned), - // NOT a blessed reuse pattern: reaping a `go`'s fiber here while step (2) - // frees its paired `*Task` is self-consistent ONLY because the contract - // already forbade touching those handles post-`deinit`. A suspended - // (off-queue) fiber is unreachable from here, but a clean `run()` never - // leaves one (it aborts on an orphaned suspend); - // 2. every heap `*Task` from `go` (recorded in `task_allocs`); - // 3. the three `List` backings (`task_allocs`, `timers`, `io_waiters`), - // each grown through `own_allocator`; - // 4. the kqueue fd, if `block_on_fd` ever opened it (lazy `-1` otherwise). + // path (`spawn` without a following `run()`, or after it returned), NOT a + // blessed reuse pattern. A suspended (off-queue) fiber is unreachable + // from here, but a clean `run()` never leaves one (it aborts on an + // orphaned suspend); + // 2. the two `List` backings (`timers`, `io_waiters`), each grown through + // `own_allocator`; + // 3. the kqueue fd, if `block_on_fd` ever opened it (lazy `-1` otherwise). // // NOT freed (documented language limitation, unchanged): one closure env per - // `spawn`/`go`. The env is heap-allocated at the closure-literal site and sx + // `spawn`. The env is heap-allocated at the closure-literal site and sx // exposes no way to free it (the scheduler cannot name the env pointer), so - // it leaks until program exit — bounded by the spawn/go count, invisible - // under the default GPA. Freeing it needs a closure-env-ownership affordance. + // it leaks until program exit — bounded by the spawn count, invisible under + // the default GPA. The unified `context.io.async` layer's heap `Future`s / + // `ThunkBox`es likewise leak (they are not scheduler-tracked) — freeing both + // needs join-point / closure-env ownership affordances. 
deinit :: (self: *Scheduler) { // (1) Reap leftover ready fibers: unmap the stack, free the Fiber. f := self.ready_head; @@ -582,17 +572,11 @@ Scheduler :: struct { self.ready_head = null; self.ready_tail = null; - // (2) Free every heap Task allocated by `go`. - for self.task_allocs.items[0..self.task_allocs.len] (t) { - self.own_allocator.dealloc_bytes(t); - } - - // (3) Free the List backings (all grown through `own_allocator`). - self.task_allocs.deinit(self.own_allocator); + // (2) Free the List backings (all grown through `own_allocator`). self.timers.deinit(self.own_allocator); self.io_waiters.deinit(self.own_allocator); - // (4) Close the kqueue fd if it was ever opened (lazy: -1 if never used). + // (3) Close the kqueue fd if it was ever opened (lazy: -1 if never used). if self.kq >= 0 { close(self.kq); self.kq = -1; @@ -1039,135 +1023,6 @@ wake_io_waiter_for_fd :: (self: *Scheduler, fd: i32) { // The public API lives as methods on `Scheduler` (above): `init`, `spawn`, // `yield_now`, `suspend_self`, `wake`, `run`, `now_ms`, `sleep`. -// --- B1.4a: truly-suspending fiber-task async (`go` / `wait` / `cancel`) ---- -// -// An async-task layer on top of the M:1 scheduler: `s.go(work)` runs `work` as -// a REAL fiber, and `t.wait()` SUSPENDS the caller fiber until the task's fiber -// completes — genuine interleaving, in contrast with io.sx's `context.io.async` -// (which runs the worker inline to completion before returning). Distinct from -// io.sx's `Future` by design: `Task` is defined here so the two modules stay -// decoupled (no cross-import; sched.sx must keep importing only `std.sx`, since -// a different import path re-emits the module's global `_fib_tramp` asm and -// duplicates the symbol). -// -// THE NULLARY-THUNK RATIONALE. `work` is a NULLARY thunk `Closure() -> $R`, not -// a worker-plus-`..args` pair like io.sx's `async`. 
A variadic pack is -// comptime-only and segfaults if captured into a deferred closure that crosses -// the fiber boundary (issue 0156 Part 2). So instead of forwarding inputs as a -// pack, the user captures any inputs in the lambda AT THE CALL SITE (where -// they're live): `s.go(() -> i64 => compute(a, b))`. Nothing variadic ever -// crosses into the fiber — the thunk is a plain `{fn_ptr, env}` fat closure. -// -// KNOWN LIMITATION (heap-Task leak): `go` heap-allocates the `Task` (it outlives -// the call — the fiber fills `value`/`state` later, after `go` has returned), but -// B1.4a never frees it. Like the closure-env leak documented on `spawn` above, -// this is bounded by the `go` count and invisible under the default GPA (frees -// at exit); a long-running scheduler under an arena/tracking allocator -// accumulates one `Task` per `go`. Freeing it safely needs join-point ownership -// tracking — deferred. -// -// WAKE-AFTER-COMPLETE ORDERING (both orderings are correct): -// - worker finishes BEFORE `wait`: the worker set `t.state = .ready` and saw -// `t.waiter == null`, so it issued no wake. `wait` sees `.ready` (not -// `.pending`), does NOT park, and returns `t.value` — no lost wakeup. -// - `wait` runs BEFORE the worker finishes: `wait` registers itself as -// `t.waiter` and parks via `suspend_self`. When the worker finishes it sees -// a non-null `t.waiter` and `wake`s it; `wait` resumes and returns the value. - -TaskState :: enum { pending; ready; canceled; } - -// The `!` channel for `wait`. Defined LOCALLY (not reusing io.sx's `IoErr`): -// `IoErr` is reachable here only as a re-export alias through std.sx, and the -// failable-type detection behind `raise` does not see through that alias to the -// underlying `error` set — so `raise error.Canceled` against `(.., !IoErr)` -// here is rejected as "not a failable function". A local `error` decl is -// recognized directly. (Same `.Canceled` contract as io.sx model (a).) 
-TaskErr :: error { Canceled } - -Task :: struct ($R: Type) { - value: R; - state: TaskState = .pending; - waiter: *void = null; // the single parked awaiter (opaque *Fiber); M:1 → at most one - sched: *Scheduler; // owning scheduler (for park/wake in `wait`) - canceled: i64; // cooperative cancel flag (M:1: no preemption → no atomics) - finished: i64; // set to 1 at the very END of the worker body (after the - // work ran OR was skipped on an early cancel). Distinct from - // `state == .canceled` (which `cancel` sets IMMEDIATELY, before - // the fiber has run): a JOINER (`race`) waits on `finished` so it - // knows the worker fiber actually reached its end — no loser - // outlives the `race` call. -} - -// Spawn `work` as a fiber; return a heap `*Task` that completes when the fiber -// finishes. Mirrors `spawn`'s alloc + null-check + abort. -go :: ufcs (self: *Scheduler, work: Closure() -> $R) -> *Task($R) { - raw := self.own_allocator.alloc_bytes(size_of(Task($R))); - if raw == null { - print("sched: out of memory allocating a Task\n"); - abort(); - } - t : *Task($R) = xx raw; - t.state = .pending; - t.waiter = null; - t.sched = self; - t.canceled = 0; - t.finished = 0; - // Record the heap Task so `deinit` can free it (the scheduler otherwise has - // no handle on its generic Tasks). Long-lived: a Task outlives this call. - self.task_allocs.append(xx t, self.own_allocator); - self.spawn(() => { - // Cooperative cancel: skip the work entirely if cancel already landed - // before this fiber was scheduled (saves the compute + side effects). A - // cancel that lands DURING `work()` still lets it finish (no preemption - // in the M:1 model) — cancel suppresses DELIVERY, never an in-flight run. - if t.canceled == 0 { - t.value = work(); - t.state = .ready; - } - // The worker has reached its end (ran the work, or skipped it on an early - // cancel). 
Mark `finished` BEFORE the wake so a joiner that checks the flag - // on resume always observes it set (a `race` JOIN distinguishes a finished - // worker from a merely-flagged-cancelled one via this). - t.finished = 1; - // Wake the awaiter only if one already parked (else `wait`/`race` will not - // park). Fires whether or not the work ran — both `wait` and a `race` join - // resume here. - if t.waiter != null { self.wake(xx t.waiter); } - }); - return t; -} - -// Suspend the caller until the task completes; return its value (or raise on -// cancel). MUST be called from inside a fiber (so there is a `self.current` to -// park) — typically from a fiber spawned via `s.spawn(...)`. -wait :: ufcs (t: *Task($R)) -> ($R, !TaskErr) { - if t.canceled != 0 { raise error.Canceled; } - if t.state == .pending { - // ONE waiter per task (enforced). A `Task` holds a single `waiter` slot; - // a second concurrent `wait` on the same pending task would OVERWRITE the - // first, and completion would wake only the second — the first fiber - // would stay suspended forever (silent deadlock). The M:1 model is - // single-await per task; enforce it loudly (mirrors `block_on_fd`'s - // one-waiter-per-fd guard). A multi-waiter task would need a waiter list. - if t.waiter != null { - print("sched: wait() — task already has a waiter (one awaiter per task in the M:1 model)\n"); - abort(); - } - t.waiter = xx t.sched.current; // register self as the waiter - t.sched.suspend_self(); // park until the task's fiber wakes us - } - if t.canceled != 0 or t.state == .canceled { raise error.Canceled; } - return t.value; -} - -// Request cancellation — rides the `!` channel (model (a), like io.sx 1806). M:1 -// cooperative: the worker fiber may already have run; cancel still makes a -// subsequent (or in-flight) `wait` raise `.Canceled`. 
-cancel :: ufcs (t: *Task($R)) {
-    t.canceled = 1;
-    t.state = .canceled;
-}
-
 // --- B2/A1: structured first-wins `race` over `context.io` Futures -----------
 //
 // `context.io.race((a: fa, b: fb, …))` starts from N already-spawned `*Future(..)`
diff --git a/readme.md b/readme.md
index ee853153..e09a2ded 100644
--- a/readme.md
+++ b/readme.md
@@ -573,12 +573,15 @@ fence(.seq_cst); // standalone memory fence
 combinations are compile errors. The same operations run at compile time (`#run`)
 under single-threaded semantics.
 
-### Async / Concurrency (`modules/std/sched.sx`)
+### Async / Concurrency (`context.io`, `modules/std/sched.sx`)
 
-A pure-sx cooperative fiber runtime — **colorblind async**, with no `async` /
-`await` keywords and no function coloring. Any function can suspend; a `Scheduler`
-drives any number of stackful fibers, each on its own guard-paged stack. The
-high-level API is `go` to spawn a task and `wait` to suspend until it completes:
+A pure-sx cooperative fiber runtime — **colorblind async**, with no function
+coloring. The async API rides the `Io` capability carried implicitly in
+`context`: `context.io.async` spawns a worker, `await` suspends until it
+completes. The SAME code runs under the default blocking `Io` (workers run inline)
+or under the fiber `Scheduler` installed as `context.io` (workers are real fibers
+that interleave). A `Scheduler` drives any number of stackful fibers, each on its
+own guard-paged stack:
 
 ```sx
 #import "modules/std.sx";
@@ -588,31 +591,37 @@ main :: () {
     s := sched.Scheduler.init();
     ps := @s; // closures capture by value — capture a pointer to the scheduler
 
-    // The coordinator runs as a fiber so `wait` has a fiber to park.
-    s.spawn(() => {
-        a := ps.go(() -> i64 => { ps.sleep(30); 100 }); // launch async tasks
-        b := ps.go(() -> i64 => { ps.sleep(10); 20 });
-        c := ps.go(() -> i64 => { ps.sleep(20); 3 });
+    // Install the fiber scheduler as `context.io`; the coordinator runs as a
+    // fiber so `await` has a fiber to park.
+    push .{ io = xx s } {
+        ps.spawn(() => {
+            a := context.io.async(() -> (i64, !) => { try context.io.sleep(30); 100 });
+            b := context.io.async(() -> (i64, !) => { try context.io.sleep(10); 20 });
+            c := context.io.async(() -> (i64, !) => { try context.io.sleep(20); 3 });
 
-        sum := (a.wait() or 0) + (b.wait() or 0) + (c.wait() or 0); // 123
-        print("sum: {}\n", sum);
-    });
-
-    s.run(); // drive the scheduler until all fibers finish
+            sum := (a.await() or 0) + (b.await() or 0) + (c.await() or 0); // 123
+            print("sum: {}\n", sum);
+        });
+        ps.run(); // drive the scheduler until all fibers finish
+    }
 }
 ```
 
-Tasks complete in deadline order, not spawn or await order. The runtime offers:
+Workers complete in deadline order, not spawn or await order. The runtime offers:
 
-- **`go(work) -> *Task($R)`** / **`wait() -> (R, !TaskErr)`** / **`cancel()`** — the
-  task layer. `wait` rides the `!` error channel so a cancel surfaces as
-  `error.Canceled`.
-- **`spawn`**, **`yield_now`**, **`suspend_self`**, **`wake`** — the raw fiber
-  primitives the task layer is built on.
-- **`sleep(ms)`** / **`now_ms()`** — timer-driven suspension on a virtual clock
-  (deterministic, no real wall time).
-- **`block_on_fd(fd, want_read)`** — suspend until a file descriptor is ready,
-  backed by kqueue (darwin) or epoll (linux).
+- **`context.io.async(worker) -> *Future($R)`** / **`await() -> (R, !IoErr)`** /
+  **`cancel()`** — the async layer over the `Io` protocol. `await` rides the `!`
+  error channel; a `cancel` makes the worker abandon its body at its next suspend
+  (true cancellation) and surfaces as `error.Canceled`.
+- **`context.io.race(.(a = fa, b = fb, …))`** — structured first-wins over a named
+  tuple of `*Future`s; returns a synthesized tagged-union of the winner, cancels
+  the losers (which stop at their next suspend, so `race` returns at winner-time).
+- **`context.io.sleep(ms)`** / **`context.io.now_ms()`** — timer-driven suspension
+  on a virtual clock (deterministic, no real wall time).
+- **`Scheduler.spawn`**, **`yield_now`**, **`suspend_self`**, **`wake`**,
+  **`run`** — the raw fiber primitives + driver loop the async layer is built on.
+- **`Scheduler.block_on_fd(fd, want_read)`** — suspend until a file descriptor is
+  ready, backed by kqueue (darwin) or epoll (linux).
 
 It's an M:1 model (cooperative, no preemption — so no data races between fibers
 and no atomics needed across them), built on `abi(.naked)` context switching over