diff --git a/examples/concurrency/1823-concurrency-fiber-io-vtable.sx b/examples/concurrency/1823-concurrency-fiber-io-vtable.sx new file mode 100644 index 00000000..65041b18 --- /dev/null +++ b/examples/concurrency/1823-concurrency-fiber-io-vtable.sx @@ -0,0 +1,42 @@ +// Stream B2/A1 — the M:1 fiber scheduler installed AS an `Io` capability vtable, +// driven entirely through `context.io`. `impl Io for Scheduler` (sched.sx) folds +// the scheduler behind the same `Io` protocol the blocking `CBlockingIo` +// implements, so the worker below reaches real suspension/timers through the +// PROTOCOL (`context.io.spawn_raw`/`arm_timer`/`suspend_raw`/`now_ms`) rather +// than bespoke scheduler methods — the foundation for a colorblind +// `async`/`await`/`race` that runs over whichever `Io` is installed. +// +// Two workers are spawned via `context.io.spawn_raw`; each arms a virtual-time +// timer and `suspend_raw`s until it fires. They resume in DEADLINE order (10 then +// 20), deterministic on the virtual clock — proving the protocol round-trips +// spawn → arm → suspend → ready → resume against the fiber engine. Phase 0 (fibers +// inherit the spawn-time context) is what lets the worker's own `context.io` +// resolve back to this scheduler. +// +// aarch64-pinned (the scheduler's per-arch asm): runs end-to-end on a matching +// host (macOS + linux), ir-only on a mismatch. +#import "modules/std.sx"; +sched :: #import "modules/std/sched.sx"; + +// Worker entry: an sx (*void)->void fn, erased to *void by spawn_raw. +sleeper :: (arg: *void) { + n : *i64 = xx arg; + tok : ParkToken = .{ handle = null }; + context.io.arm_timer(context.io.now_ms() + n.*, tok); + context.io.suspend_raw(tok) catch {}; + print("worker(sleep {}) resumed at now_ms = {}\n", n.*, context.io.now_ms()); +} + +main :: () -> i64 { + s := sched.Scheduler.init(); + ps := @s; + d1 : i64 = 20; + d2 : i64 = 10; + push Context.{ allocator = context.allocator, data = null, io = xx s } { + context.io.spawn_raw(xx sleeper, xx @d1, .{}); + context.io.spawn_raw(xx sleeper, xx @d2, .{}); + ps.run(); + } + print("final clock: {}ms\n", ps.now_ms()); + return 0; +} diff --git a/library/modules/std/sched.sx b/library/modules/std/sched.sx index 07f40c97..97970b7a 100644 --- a/library/modules/std/sched.sx +++ b/library/modules/std/sched.sx @@ -588,6 +588,102 @@ Scheduler :: struct { } } +// --- B2/A1: the fiber scheduler AS an `Io` capability vtable ----------------- +// +// `impl Io for Scheduler` folds the M:1 fiber scheduler behind `context.io` — +// the same `Io` protocol (core.sx) the blocking `CBlockingIo` (io.sx) implements, +// so the async ergonomic layer (`async`/`await`/`cancel`/`race`) runs COLORBLIND +// over whichever impl is installed. Install it the allocator way — by value, the +// receiver borrowed into the protocol field: +// +// s := Scheduler.init(); +// push Context.{ allocator = context.allocator, data = null, io = xx s } { +// … context.io.async(…) / await / race … +// s.run(); // the explicit driver loop (stays as today) +// } +// +// Phase 0 (fibers inherit the spawn-time context) is what makes this reach a +// WORKER: a fiber spawned through `spawn_raw` runs under the pushed context, so +// its own `context.io.suspend_raw`/`ready` resolve back to THIS scheduler, not +// the blocking default. The methods are thin adapters over the existing fiber +// primitives (`spawn`/`suspend_self`/`wake`/`Timer`/`clock_ms`). +impl Io for Scheduler { + // Spawn a fiber that runs `entry(arg)`. `entry` is an sx `(*void) -> void` + // function pointer (the worker thunk the `async` layer boxes), erased to + // `*void`; `arg` is its single payload pointer. Returns the `*Fiber` as the + // opaque handle (the `Future` stashes it in `task`). The fiber inherits this + // call's context (Phase 0), so the worker sees this scheduler as `context.io`. + spawn_raw :: (self: *Scheduler, entry: *void, arg: *void, opts: SpawnOpts) -> *void { + f := self.spawn(() => { + entry_fn : (*void) -> void = xx entry; + entry_fn(arg); + }); + return xx f; + } + + // Park the running fiber until a matching `ready`. `-> !` is the cancellation + // channel the protocol mandates: a suspending impl raises `IoErr.Canceled` + // out here when the parked task was cancelled (wired in Phase 3). The M:1 + // impl does not raise yet — it just parks the current fiber. + suspend_raw :: (self: *Scheduler, park: ParkToken) -> ! { + self.suspend_self(); + } + + // Re-ready a fiber parked under `park` (its `handle` is the `*Fiber`, recorded + // by the suspending side). Funnels through `wake`, which is guarded on + // `.suspended`, so a ready of an already-running/queued fiber is a safe no-op. + ready :: (self: *Scheduler, park: ParkToken) { + if park.handle == null { return; } + f : *Fiber = xx park.handle; + self.wake(f); + } + + // Advance the world one step (the unit `run` loops over). Drain every + // currently-ready fiber, then fire the earliest pending timer (advancing the + // virtual clock). Returns the new clock after a timer fire, or -1 when nothing + // ran and no timer is pending. NOTE: fd-readiness (kqueue/epoll) blocking is + // NOT driven here yet — `run` remains the canonical driver for fd-bound work; + // `poll` covers the compute + virtual-timer path the async layer needs. + poll :: (self: *Scheduler, deadline_ms: i64) -> i64 { + while self.ready_head != null { + f := dequeue(self); + self.current = f; + f.state = .running; + swap_context(@self.sched_ctx, @f.ctx); + self.current = null; + if f.state == .done { + munmap(f.stack_region, f.stack_len); + self.own_allocator.dealloc_bytes(xx f); + } else if f.state == .ready { + enqueue(self, f); + } + } + idx := earliest_timer(self); + if idx >= 0 { + t := self.timers.items[idx]; + remove_timer(self, idx); + self.clock_ms = t.deadline_ms; + self.wake(t.fiber); + return self.clock_ms; + } + return -1; + } + + // The VIRTUAL clock (deterministic) — NOT a wall clock, so a fiber program's + // timestamps are reproducible (mirrors `now_ms` on the struct). + now_ms :: (self: *Scheduler) -> i64 { return self.clock_ms; } + + // Arm a virtual-time timer that re-readies the CURRENT fiber at `deadline_ms` + // (the caller suspends separately via `suspend_raw`). Does not suspend. Returns + // null (a timer-cancel handle is Phase 3). The fired timer wakes the fiber + // through the same `wake` path the run loop uses. + arm_timer :: (self: *Scheduler, deadline_ms: i64, park: ParkToken) -> *void { + t : Timer = .{ deadline_ms = deadline_ms, fiber = self.current }; + self.timers.append(t, self.own_allocator); + return null; + } +} + // --- the context switch (naked) + first-entry trampoline ------------------- // x0 = from, x1 = to (read straight from the ABI registers — a naked fn has no