feat: async/await colorblind over the fiber Io (Phase 2 of Io unification)

`context.io.async(worker)` / `await` now run over the `Io` PROTOCOL, so the
same code interleaves under the fiber scheduler or runs inline under the
blocking `CBlockingIo` — one async stack, reached purely through `context.io`.

- Protocol: `suspend_raw(park: *ParkToken)` (was by-value). A suspending impl
  records the parked execution context into `park.handle` before parking, so a
  cross-context `ready(park)` knows whom to resume; `Scheduler.suspend_raw`
  writes `self.current`, `CBlockingIo` ignores it.
- io.sx async layer rewritten colorblind: `async` submits the worker through
  `io.spawn_raw` (inline under blocking, a fiber under the scheduler) and returns
  a HEAP `*Future($R)` the worker fills later; `await` suspends via `suspend_raw`
  until ready, then returns/raises. The generic worker is bridged to spawn_raw's
  raw `(*void)->void` entry via a monomorphic `ThunkBox` (a heap-boxed nullary
  completion closure) — all genericity lives in the closure env. Workers are
  nullary (inputs captured at the call site) because a variadic pack can't cross
  the fiber boundary. `CBlockingIo.spawn_raw` now runs the worker inline.
- Migrated 1805/1806 to the nullary `*Future` form; retrofit 1822/1823 to the
  `push .{ … }` partial-context literal (inherits allocator/data).
- The async machinery adds a few prelude types, shifting the type-name table —
  40 `.ir` snapshots regenerated (no behavior change; only `.exit`/`.stdout`/
  `.stderr` would signal that, and none changed).

Locked by examples/concurrency/1824 — two async tasks under the fiber Io, the
completion log proving deferral (1 2 then 10 20 then 123). Suite 829/0,
byte-identical aarch64-macOS host + aarch64-linux container.
This commit is contained in:
agra
2026-06-27 07:50:29 +03:00
parent c4977247b7
commit 967aed67d4
52 changed files with 210381 additions and 202872 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1,28 +1,29 @@
// B1.2 — the async ergonomic layer over the `Io` capability, blocking // B1.2 / B2 — the async ergonomic layer over the `Io` capability, blocking
// default. `context.io.async(worker, ..args)` runs the worker to completion // default. `context.io.async(worker)` submits a NULLARY `worker: Closure() -> $R`
// inline and returns a `.ready` Future($R); `f.await()` yields the result // and returns a `*Future($R)` handle; under the blocking `CBlockingIo` the worker
// (a value-failable `($R, !IoErr)`, handled with `or`). `context.io.now_ms()` // runs to completion inline, so the Future is born `.ready`. `f.await()` yields
// reads the monotonic clock through the same capability. // the result (a value-failable `($R, !IoErr)`, handled with `or`).
// `context.io.now_ms()` reads the clock through the same capability.
// //
// Worker form: a lambda whose params are annotated at the call site // Worker form: a nullary lambda capturing any inputs at the CALL SITE
// (`(a: i64, b: i64) -> i64 => …`); `..args` forwards the call-site // (`() -> i64 => compute(a, b)`) — the colorblind shape that also works when the
// arguments to it. // worker is deferred onto a fiber (a captured variadic pack can't cross the fiber
// boundary), mirroring `sched.go`.
#import "modules/std.sx"; #import "modules/std.sx";
main :: () { main :: () {
// Homogeneous args. // Inputs captured at the call site.
s := context.io.async((a: i64, b: i64) -> i64 => a + b, 40, 2); s := context.io.async(() -> i64 => 40 + 2);
print("sum: {}\n", s.await() or { -1 }); print("sum: {}\n", s.await() or { -1 });
// Single arg. d := context.io.async(() -> i64 => 21 * 2);
d := context.io.async((x: i64) -> i64 => x * 2, 21);
print("double: {}\n", d.await() or { -1 }); print("double: {}\n", d.await() or { -1 });
// Nullary worker — the variadic `async` binds an empty pack, so no separate // A worker that closes over a local.
// `async_void` entry is needed. base := 42;
n := context.io.async(() -> i64 => 42); n := context.io.async(() -> i64 => base);
print("nullary: {}\n", n.await() or { -1 }); print("nullary: {}\n", n.await() or { -1 });
// The Io capability also carries a monotonic clock. // The Io capability also carries a clock.
if context.io.now_ms() >= 0 { print("clock ok\n"); } if context.io.now_ms() >= 0 { print("clock ok\n"); }
} }

View File

@@ -7,11 +7,11 @@
main :: () { main :: () {
// Not canceled → await yields the value. // Not canceled → await yields the value.
ok := context.io.async((n: i64) -> i64 => n, 7); ok := context.io.async(() -> i64 => 7);
print("ok: {}\n", ok.await() or { -1 }); print("ok: {}\n", ok.await() or { -1 });
// Canceled → await raises .Canceled → the `or` default is taken. // Canceled → await raises .Canceled → the `or` default is taken.
c := context.io.async((n: i64) -> i64 => n, 7); c := context.io.async(() -> i64 => 7);
c.cancel(); c.cancel();
print("canceled: {}\n", c.await() or { -99 }); print("canceled: {}\n", c.await() or { -99 });
} }

View File

@@ -23,7 +23,7 @@ main :: () -> i64 {
s := sched.Scheduler.init(); s := sched.Scheduler.init();
ps := @s; ps := @s;
print("outside: marker id = {}\n", mk.id); print("outside: marker id = {}\n", mk.id);
push Context.{ allocator = context.allocator, data = xx @mk, io = context.io } { push .{ data = xx @mk } {
ps.spawn(() => { ps.spawn(() => {
m : *Marker = xx context.data; // inherited from the spawn-time context m : *Marker = xx context.data; // inherited from the spawn-time context
print("inside fiber: context.data marker id = {}\n", m.id); print("inside fiber: context.data marker id = {}\n", m.id);

View File

@@ -23,7 +23,7 @@ sleeper :: (arg: *void) {
n : *i64 = xx arg; n : *i64 = xx arg;
tok : ParkToken = .{ handle = null }; tok : ParkToken = .{ handle = null };
context.io.arm_timer(context.io.now_ms() + n.*, tok); context.io.arm_timer(context.io.now_ms() + n.*, tok);
context.io.suspend_raw(tok) catch {}; context.io.suspend_raw(@tok) catch {};
print("worker(sleep {}) resumed at now_ms = {}\n", n.*, context.io.now_ms()); print("worker(sleep {}) resumed at now_ms = {}\n", n.*, context.io.now_ms());
} }
@@ -32,7 +32,7 @@ main :: () -> i64 {
ps := @s; ps := @s;
d1 : i64 = 20; d1 : i64 = 20;
d2 : i64 = 10; d2 : i64 = 10;
push Context.{ allocator = context.allocator, data = null, io = xx s } { push .{ io = xx s } {
context.io.spawn_raw(xx sleeper, xx @d1, .{}); context.io.spawn_raw(xx sleeper, xx @d1, .{});
context.io.spawn_raw(xx sleeper, xx @d2, .{}); context.io.spawn_raw(xx sleeper, xx @d2, .{});
ps.run(); ps.run();

View File

@@ -0,0 +1,42 @@
// Stream B2 — `async`/`await` (the io.sx ergonomic layer) running COLORBLIND over
// the fiber `Io` scheduler. The SAME `context.io.async(worker)` that runs inline
// under the blocking `CBlockingIo` (1805) here spawns the worker as a real fiber
// and returns a PENDING `*Future`; `await` suspends the calling fiber until the
// worker completes. No bespoke `go`/`wait` — this is the unified async stack
// (io.sx async over the `Io` protocol), reaching the fiber scheduler purely
// through `context.io`.
//
// The completion log makes the deferral visible: the coordinator records 1,2
// BEFORE either worker runs (async only SPAWNS them), then `await` parks it while
// the workers run (10,20), then it resumes and sums (123). Deterministic.
//
// aarch64-pinned (the scheduler's per-arch asm): runs end-to-end on a matching
// host (macOS + linux), ir-only on a mismatch.
#import "modules/std.sx";
sched :: #import "modules/std/sched.sx";
Log :: struct { seq: [8]i64; n: i64; }
rec :: (l: *Log, v: i64) { l.seq[l.n] = v; l.n = l.n + 1; }
main :: () -> i64 {
lg : Log = ---; lg.n = 0;
s := sched.Scheduler.init();
ps := @s; pl := @lg;
push .{ io = xx s } {
ps.spawn(() => {
rec(pl, 1); // coordinator starts
a := context.io.async(() -> i64 => { rec(pl, 10); 100 }); // worker A — deferred
b := context.io.async(() -> i64 => { rec(pl, 20); 23 }); // worker B — deferred
rec(pl, 2); // both spawned, neither has run
va := a.await() or { -1 }; // park; A runs, wakes us
vb := b.await() or { -1 };
rec(pl, va + vb); // 123
});
ps.run();
}
print("sequence:");
i := 0;
while i < lg.n { print(" {}", lg.seq[i]); i = i + 1; }
print("\n");
return 0;
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1 @@
{ "target": "macos" }

View File

@@ -0,0 +1 @@
sequence: 1 2 10 20 123

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -110,7 +110,12 @@ ParkToken :: struct {
Io :: protocol #inline { Io :: protocol #inline {
spawn_raw :: (self: *Self, entry: *void, arg: *void, opts: SpawnOpts) -> *void; spawn_raw :: (self: *Self, entry: *void, arg: *void, opts: SpawnOpts) -> *void;
suspend_raw :: (self: *Self, park: ParkToken) -> !; // `park` is IN/OUT: a suspending impl records the parked execution context
// (e.g. the awaiter's fiber) into `park.handle` before parking, so a later
// `ready(park)` — called from a DIFFERENT context (the worker that completes
// the awaited task) — knows which context to resume. Passed by pointer for
// exactly that write-back. `ready`/`arm_timer` read the recorded handle.
suspend_raw :: (self: *Self, park: *ParkToken) -> !;
ready :: (self: *Self, park: ParkToken); ready :: (self: *Self, park: ParkToken);
poll :: (self: *Self, deadline_ms: i64) -> i64; poll :: (self: *Self, deadline_ms: i64) -> i64;
now_ms :: (self: *Self) -> i64; now_ms :: (self: *Self) -> i64;

View File

@@ -49,13 +49,19 @@ impl Io for CBlockingIo {
// is here for the protocol shape the scheduler [B1.3] will use; the // is here for the protocol shape the scheduler [B1.3] will use; the
// blocking impl never routes through it, so it is a no-op handle. // blocking impl never routes through it, so it is a no-op handle.
spawn_raw :: (self: *CBlockingIo, entry: *void, arg: *void, opts: SpawnOpts) -> *void { spawn_raw :: (self: *CBlockingIo, entry: *void, arg: *void, opts: SpawnOpts) -> *void {
// The blocking model has no scheduler: run the worker thunk INLINE to
// completion right here, so the `async` free-fn's Future is born `.ready`.
// (A suspending impl — the fiber scheduler — instead defers `entry` onto a
// fiber.) Same `(*void)->void` erased-thunk contract `spawn_raw` mandates.
entry_fn : (*void) -> void = xx entry;
entry_fn(arg);
return null; return null;
} }
// Blocking never suspends — a suspend at the bottom of the M:1 stack // Blocking never suspends — a suspend at the bottom of the M:1 stack
// would deadlock. No-op (returns success). The `!` is part of the // would deadlock. No-op (returns success). The `!` is part of the
// protocol contract (a suspending impl raises `.Canceled` out here), // protocol contract (a suspending impl raises `.Canceled` out here),
// so the conforming blocking impl keeps it even though it never raises. // so the conforming blocking impl keeps it even though it never raises.
suspend_raw :: (self: *CBlockingIo, park: ParkToken) -> ! { suspend_raw :: (self: *CBlockingIo, park: *ParkToken) -> ! {
return; return;
} }
ready :: (self: *CBlockingIo, park: ParkToken) {} ready :: (self: *CBlockingIo, park: ParkToken) {}
@@ -89,31 +95,74 @@ Future :: struct ($R: Type) {
} }
// --- The async ergonomic layer (generic free-fns over the protocol) --- // --- The async ergonomic layer (generic free-fns over the protocol) ---
// `async(io, worker, ..args)` — submit `worker(..args)`. Blocking: runs
// the worker to completion inline, Future born `.ready`. The worker is a
// `Closure(..$args) -> $R` (a lambda whose params are annotated at the
// call site); `..$args` forwards the call-site arguments to it.
// //
// NOTE on construction shape: the Future is built with `= ---` + per-field // COLORBLIND over the `Io` impl: `async` always submits the worker through
// assignment, NOT a `return Future.{...}` struct-literal. A struct-literal // `io.spawn_raw`, so the SAME code runs the worker inline under `CBlockingIo`
// in `return` position trips a generic-instantiation gap for the `Atomic` // (Future born `.ready`) or as a real fiber under the scheduler (Future born
// field; the `= ---` (uninit) + field-assign form is the verified idiom. // `.pending`, completed later — `await` suspends until then). The only protocol-
async :: ufcs (io: Io, worker: Closure(..$args) -> $R, ..$args) -> Future($R) { // level value `spawn_raw` accepts is a raw `(*void)->void` entry + a `*void`
f : Future($R) = ---; // arg, so the generic worker is bridged via a MONOMORPHIC boxed-closure thunk
f.value = worker(..args); // (`sx_run_boxed_closure`): all the generic-ness lives in the closure's env, and
f.state = .ready; // the thunk is one fixed `Closure()->void` invoker — no per-instantiation entry.
// The one fixed entry `spawn_raw` ever calls: cast the arg back to the heap-boxed
// completion closure and run it. Monomorphic (over `Closure()->void`), so a
// single top-level symbol serves every `async($R)` instantiation.
// The heap box the bridge carries: a struct holding the nullary completion
// closure. A struct field is the one position a `Closure() -> void` type parses
// in (a bare alias / `size_of(Closure()->void)` trips the parser), and it gives
// the bridge a concrete `*ThunkBox` to `size_of`/cast/call through.
ThunkBox :: struct { run: Closure() -> void; }
sx_run_boxed_closure :: (arg: *void) {
b : *ThunkBox = xx arg;
b.run();
}
// `async(io, worker)` — submit a NULLARY `worker: Closure() -> $R` and get a
// `*Future($R)` handle. The worker must be nullary because under the fiber impl
// the body crosses a fiber boundary, and a captured variadic pack segfaults there
// (issue 0156 Part 2) — so any inputs are captured at the CALL SITE in the lambda
// (`context.io.async(() -> i64 => compute(a, b))`), exactly like `sched.go`.
//
// The Future is HEAP-allocated (not returned by value): under the fiber impl the
// worker fills it AFTER `async` returns, so the awaiter and the worker must share
// one stable object. Like `sched.go`'s Task, it currently leaks (bounded by the
// async count; invisible under the default GPA). Freeing it needs join-point
// ownership — deferred.
async :: ufcs (io: Io, worker: Closure() -> $R) -> *Future($R) {
raw := context.allocator.alloc_bytes(size_of(Future($R)));
f : *Future($R) = xx raw;
f.state = .pending;
f.park = .{ handle = null };
f.task = null;
f.canceled = Atomic(bool).init(false); f.canceled = Atomic(bool).init(false);
// The completion closure: run the worker, publish the result, wake any parked
// awaiter. Heap-boxed so it survives until the worker actually runs (deferred
// under the fiber impl). It captures `f` + `worker`; nothing variadic crosses.
braw := context.allocator.alloc_bytes(size_of(ThunkBox));
b : *ThunkBox = xx braw;
b.run = () => {
f.value = worker();
f.state = .ready;
context.io.ready(f.park); // no-op if no awaiter parked yet
};
f.task = io.spawn_raw(xx sx_run_boxed_closure, xx b, .{});
return f; return f;
} }
// (A nullary worker needs no separate entry: the variadic `async` above binds // `await(f)` — value-carrying failable. Suspends the caller until `f` completes
// `..$args` to the empty pack, so `context.io.async(() -> $R => …)` calls // (no-op under the blocking impl, where it is already `.ready`), then `.ready` →
// `worker()` and returns `Future($R)`. Locked by examples/1805.) // the result; `.failed`/`.canceled` → raise. Under the fiber impl the caller is a
// fiber; `suspend_raw` records it into `f.park` so the worker's `ready(f.park)`
// `await(f)` — value-carrying failable. `.ready` → the result; `.failed` // resumes it. Re-checks state after the wake (the worker set `.ready` before
// / `.canceled` → raise the stored / cancellation error. // waking). A worker that finished BEFORE `await` leaves `.ready`, so no park, no
// lost wakeup.
await :: ufcs (f: *Future($R)) -> $R !IoErr { await :: ufcs (f: *Future($R)) -> $R !IoErr {
if f.canceled.load(.acquire) { raise error.Canceled; }
if f.state == .pending {
context.io.suspend_raw(@f.park) catch {}; // Phase 3 propagates Canceled
}
if f.canceled.load(.acquire) { raise error.Canceled; } if f.canceled.load(.acquire) { raise error.Canceled; }
if f.state == .canceled { raise error.Canceled; } if f.state == .canceled { raise error.Canceled; }
if f.state == .failed { raise error.Failed; } if f.state == .failed { raise error.Failed; }

View File

@@ -637,7 +637,10 @@ impl Io for Scheduler {
// channel the protocol mandates: a suspending impl raises `IoErr.Canceled` // channel the protocol mandates: a suspending impl raises `IoErr.Canceled`
// out here when the parked task was cancelled (wired in Phase 3). The M:1 // out here when the parked task was cancelled (wired in Phase 3). The M:1
// impl does not raise yet — it just parks the current fiber. // impl does not raise yet — it just parks the current fiber.
suspend_raw :: (self: *Scheduler, park: ParkToken) -> ! { suspend_raw :: (self: *Scheduler, park: *ParkToken) -> ! {
// Record the parking fiber so a cross-fiber `ready(park)` (the worker that
// completes the awaited task) can find and wake it.
park.handle = xx self.current;
self.suspend_self(); self.suspend_self();
} }