feat: impl Io for Scheduler — fiber scheduler as a context.io vtable (Phase 1)

`impl Io for Scheduler` folds the M:1 fiber scheduler behind the same `Io`
protocol (core.sx) the blocking `CBlockingIo` implements, so the async layer
can run colorblind over whichever impl is installed via
`push Context { io = xx scheduler }`. The six methods are thin adapters over
the existing fiber primitives:

- `spawn_raw(entry, arg, opts)` — spawn a fiber that calls the erased
  `(*void)->void` worker thunk `entry(arg)` (fn-ptr round-trip through
  `*void`); returns the `*Fiber` handle. The fiber inherits this context
  (Phase 0), so the worker's own `context.io` is this scheduler.
- `suspend_raw(park) -> !` — park the running fiber; the `!` is the
  cancellation channel a suspending impl raises on (wired in Phase 3).
- `ready(park)` — `wake` the fiber recorded in the token (guarded on
  `.suspended`).
- `poll(deadline_ms)` — one step of the run loop (drain ready + fire the
  earliest virtual-time timer); fd-readiness stays on `run`.
- `now_ms` — the deterministic virtual clock.
- `arm_timer(deadline_ms, park)` — arm a virtual-time timer that re-readies
  the current fiber.

Locked by examples/concurrency/1823 — two workers spawned + suspended +
resumed entirely through `context.io`, deterministic deadline order
(byte-identical aarch64-macOS host + aarch64-linux container). Full suite
green (828/0).
This commit is contained in:
agra
2026-06-27 06:49:37 +03:00
parent 2f2d7f1db7
commit 5c30bfe0c2
2 changed files with 138 additions and 0 deletions

View File

@@ -0,0 +1,42 @@
// Stream B2/A1 — the M:1 fiber scheduler installed AS an `Io` capability vtable,
// driven entirely through `context.io`. `impl Io for Scheduler` (sched.sx) folds
// the scheduler behind the same `Io` protocol the blocking `CBlockingIo`
// implements, so the worker below reaches real suspension/timers through the
// PROTOCOL (`context.io.spawn_raw`/`arm_timer`/`suspend_raw`/`now_ms`) rather
// than bespoke scheduler methods — the foundation for a colorblind
// `async`/`await`/`race` that runs over whichever `Io` is installed.
//
// Two workers are spawned via `context.io.spawn_raw`; each arms a virtual-time
// timer and `suspend_raw`s until it fires. They resume in DEADLINE order (10 then
// 20), deterministic on the virtual clock — proving the protocol round-trips
// spawn → arm → suspend → ready → resume against the fiber engine. Phase 0 (fibers
// inherit the spawn-time context) is what lets the worker's own `context.io`
// resolve back to this scheduler.
//
// aarch64-pinned (the scheduler's per-arch asm): runs end-to-end on a matching
// host (macOS + linux), ir-only on a mismatch.
#import "modules/std.sx";
sched :: #import "modules/std/sched.sx";
// Worker entry: an sx (*void)->void fn, erased to *void by spawn_raw.
sleeper :: (arg: *void) {
n : *i64 = xx arg;
tok : ParkToken = .{ handle = null };
context.io.arm_timer(context.io.now_ms() + n.*, tok);
context.io.suspend_raw(tok) catch {};
print("worker(sleep {}) resumed at now_ms = {}\n", n.*, context.io.now_ms());
}
main :: () -> i64 {
s := sched.Scheduler.init();
ps := @s;
d1 : i64 = 20;
d2 : i64 = 10;
push Context.{ allocator = context.allocator, data = null, io = xx s } {
context.io.spawn_raw(xx sleeper, xx @d1, .{});
context.io.spawn_raw(xx sleeper, xx @d2, .{});
ps.run();
}
print("final clock: {}ms\n", ps.now_ms());
return 0;
}

View File

@@ -588,6 +588,102 @@ Scheduler :: struct {
}
}
// --- B2/A1: the fiber scheduler AS an `Io` capability vtable -----------------
//
// `impl Io for Scheduler` folds the M:1 fiber scheduler behind `context.io` —
// the same `Io` protocol (core.sx) the blocking `CBlockingIo` (io.sx) implements,
// so the async ergonomic layer (`async`/`await`/`cancel`/`race`) runs COLORBLIND
// over whichever impl is installed. Install it the allocator way — by value, the
// receiver borrowed into the protocol field:
//
// s := Scheduler.init();
// push Context.{ allocator = context.allocator, data = null, io = xx s } {
// … context.io.async(…) / await / race …
// s.run(); // the explicit driver loop (stays as today)
// }
//
// Phase 0 (fibers inherit the spawn-time context) is what makes this reach a
// WORKER: a fiber spawned through `spawn_raw` runs under the pushed context, so
// its own `context.io.suspend_raw`/`ready` resolve back to THIS scheduler, not
// the blocking default. The methods are thin adapters over the existing fiber
// primitives (`spawn`/`suspend_self`/`wake`/`Timer`/`clock_ms`).
impl Io for Scheduler {
// Spawn a fiber that runs `entry(arg)`. `entry` is an sx `(*void) -> void`
// function pointer (the worker thunk the `async` layer boxes), erased to
// `*void`; `arg` is its single payload pointer. Returns the `*Fiber` as the
// opaque handle (the `Future` stashes it in `task`). The fiber inherits this
// call's context (Phase 0), so the worker sees this scheduler as `context.io`.
spawn_raw :: (self: *Scheduler, entry: *void, arg: *void, opts: SpawnOpts) -> *void {
f := self.spawn(() => {
entry_fn : (*void) -> void = xx entry;
entry_fn(arg);
});
return xx f;
}
// Park the running fiber until a matching `ready`. `-> !` is the cancellation
// channel the protocol mandates: a suspending impl raises `IoErr.Canceled`
// out here when the parked task was cancelled (wired in Phase 3). The M:1
// impl does not raise yet — it just parks the current fiber.
suspend_raw :: (self: *Scheduler, park: ParkToken) -> ! {
self.suspend_self();
}
// Re-ready a fiber parked under `park` (its `handle` is the `*Fiber`, recorded
// by the suspending side). Funnels through `wake`, which is guarded on
// `.suspended`, so a ready of an already-running/queued fiber is a safe no-op.
ready :: (self: *Scheduler, park: ParkToken) {
if park.handle == null { return; }
f : *Fiber = xx park.handle;
self.wake(f);
}
// Advance the world one step (the unit `run` loops over). Drain every
// currently-ready fiber, then fire the earliest pending timer (advancing the
// virtual clock). Returns the new clock after a timer fire, or -1 when nothing
// ran and no timer is pending. NOTE: fd-readiness (kqueue/epoll) blocking is
// NOT driven here yet — `run` remains the canonical driver for fd-bound work;
// `poll` covers the compute + virtual-timer path the async layer needs.
poll :: (self: *Scheduler, deadline_ms: i64) -> i64 {
while self.ready_head != null {
f := dequeue(self);
self.current = f;
f.state = .running;
swap_context(@self.sched_ctx, @f.ctx);
self.current = null;
if f.state == .done {
munmap(f.stack_region, f.stack_len);
self.own_allocator.dealloc_bytes(xx f);
} else if f.state == .ready {
enqueue(self, f);
}
}
idx := earliest_timer(self);
if idx >= 0 {
t := self.timers.items[idx];
remove_timer(self, idx);
self.clock_ms = t.deadline_ms;
self.wake(t.fiber);
return self.clock_ms;
}
return -1;
}
// The VIRTUAL clock (deterministic) — NOT a wall clock, so a fiber program's
// timestamps are reproducible (mirrors `now_ms` on the struct).
now_ms :: (self: *Scheduler) -> i64 { return self.clock_ms; }
// Arm a virtual-time timer that re-readies the CURRENT fiber at `deadline_ms`
// (the caller suspends separately via `suspend_raw`). Does not suspend. Returns
// null (a timer-cancel handle is Phase 3). The fired timer wakes the fiber
// through the same `wake` path the run loop uses.
arm_timer :: (self: *Scheduler, deadline_ms: i64, park: ParkToken) -> *void {
t : Timer = .{ deadline_ms = deadline_ms, fiber = self.current };
self.timers.append(t, self.own_allocator);
return null;
}
}
// --- the context switch (naked) + first-entry trampoline -------------------
// x0 = from, x1 = to (read straight from the ABI registers — a naked fn has no