feat: impl Io for Scheduler — fiber scheduler as a context.io vtable (Phase 1)
`impl Io for Scheduler` folds the M:1 fiber scheduler behind the same `Io`
protocol (core.sx) the blocking `CBlockingIo` implements, so the async layer
can run colorblind over whichever impl is installed via
`push Context { io = xx scheduler }`. The six methods are thin adapters over
the existing fiber primitives:
- `spawn_raw(entry, arg, opts)` — spawn a fiber that calls the erased
`(*void)->void` worker thunk `entry(arg)` (fn-ptr round-trip through
`*void`); returns the `*Fiber` handle. The fiber inherits this context
(Phase 0), so the worker's own `context.io` is this scheduler.
- `suspend_raw(park) -> !` — park the running fiber; the `!` is the
cancellation channel a suspending impl raises on (wired in Phase 3).
- `ready(park)` — `wake` the fiber recorded in the token (guarded on
`.suspended`).
- `poll(deadline_ms)` — one step of the run loop (drain ready + fire the
earliest virtual-time timer); fd-readiness stays on `run`.
- `now_ms` — the deterministic virtual clock.
- `arm_timer(deadline_ms, park)` — arm a virtual-time timer that re-readies
the current fiber.
Locked by examples/concurrency/1823 — two workers spawned + suspended +
resumed entirely through `context.io`, deterministic deadline order
(byte-identical aarch64-macOS host + aarch64-linux container). Full suite
green (828/0).
This commit is contained in:
42
examples/concurrency/1823-concurrency-fiber-io-vtable.sx
Normal file
42
examples/concurrency/1823-concurrency-fiber-io-vtable.sx
Normal file
@@ -0,0 +1,42 @@
|
||||
// Stream B2/A1 — the M:1 fiber scheduler installed AS an `Io` capability vtable,
|
||||
// driven entirely through `context.io`. `impl Io for Scheduler` (sched.sx) folds
|
||||
// the scheduler behind the same `Io` protocol the blocking `CBlockingIo`
|
||||
// implements, so the worker below reaches real suspension/timers through the
|
||||
// PROTOCOL (`context.io.spawn_raw`/`arm_timer`/`suspend_raw`/`now_ms`) rather
|
||||
// than bespoke scheduler methods — the foundation for a colorblind
|
||||
// `async`/`await`/`race` that runs over whichever `Io` is installed.
|
||||
//
|
||||
// Two workers are spawned via `context.io.spawn_raw`; each arms a virtual-time
|
||||
// timer and `suspend_raw`s until it fires. They resume in DEADLINE order (10 then
|
||||
// 20), deterministic on the virtual clock — proving the protocol round-trips
|
||||
// spawn → arm → suspend → ready → resume against the fiber engine. Phase 0 (fibers
|
||||
// inherit the spawn-time context) is what lets the worker's own `context.io`
|
||||
// resolve back to this scheduler.
|
||||
//
|
||||
// aarch64-pinned (the scheduler's per-arch asm): runs end-to-end on a matching
|
||||
// host (macOS + linux), ir-only on a mismatch.
|
||||
#import "modules/std.sx";
|
||||
sched :: #import "modules/std/sched.sx";
|
||||
|
||||
// Worker entry: an sx (*void)->void fn, erased to *void by spawn_raw.
|
||||
sleeper :: (arg: *void) {
|
||||
n : *i64 = xx arg;
|
||||
tok : ParkToken = .{ handle = null };
|
||||
context.io.arm_timer(context.io.now_ms() + n.*, tok);
|
||||
context.io.suspend_raw(tok) catch {};
|
||||
print("worker(sleep {}) resumed at now_ms = {}\n", n.*, context.io.now_ms());
|
||||
}
|
||||
|
||||
main :: () -> i64 {
|
||||
s := sched.Scheduler.init();
|
||||
ps := @s;
|
||||
d1 : i64 = 20;
|
||||
d2 : i64 = 10;
|
||||
push Context.{ allocator = context.allocator, data = null, io = xx s } {
|
||||
context.io.spawn_raw(xx sleeper, xx @d1, .{});
|
||||
context.io.spawn_raw(xx sleeper, xx @d2, .{});
|
||||
ps.run();
|
||||
}
|
||||
print("final clock: {}ms\n", ps.now_ms());
|
||||
return 0;
|
||||
}
|
||||
@@ -588,6 +588,102 @@ Scheduler :: struct {
|
||||
}
|
||||
}
|
||||
|
||||
// --- B2/A1: the fiber scheduler AS an `Io` capability vtable -----------------
|
||||
//
|
||||
// `impl Io for Scheduler` folds the M:1 fiber scheduler behind `context.io` —
|
||||
// the same `Io` protocol (core.sx) the blocking `CBlockingIo` (io.sx) implements,
|
||||
// so the async ergonomic layer (`async`/`await`/`cancel`/`race`) runs COLORBLIND
|
||||
// over whichever impl is installed. Install it the allocator way — by value, the
|
||||
// receiver borrowed into the protocol field:
|
||||
//
|
||||
// s := Scheduler.init();
|
||||
// push Context.{ allocator = context.allocator, data = null, io = xx s } {
|
||||
// … context.io.async(…) / await / race …
|
||||
// s.run(); // the explicit driver loop (stays as today)
|
||||
// }
|
||||
//
|
||||
// Phase 0 (fibers inherit the spawn-time context) is what makes this reach a
|
||||
// WORKER: a fiber spawned through `spawn_raw` runs under the pushed context, so
|
||||
// its own `context.io.suspend_raw`/`ready` resolve back to THIS scheduler, not
|
||||
// the blocking default. The methods are thin adapters over the existing fiber
|
||||
// primitives (`spawn`/`suspend_self`/`wake`/`Timer`/`clock_ms`).
|
||||
impl Io for Scheduler {
|
||||
// Spawn a fiber that runs `entry(arg)`. `entry` is an sx `(*void) -> void`
|
||||
// function pointer (the worker thunk the `async` layer boxes), erased to
|
||||
// `*void`; `arg` is its single payload pointer. Returns the `*Fiber` as the
|
||||
// opaque handle (the `Future` stashes it in `task`). The fiber inherits this
|
||||
// call's context (Phase 0), so the worker sees this scheduler as `context.io`.
|
||||
spawn_raw :: (self: *Scheduler, entry: *void, arg: *void, opts: SpawnOpts) -> *void {
|
||||
f := self.spawn(() => {
|
||||
entry_fn : (*void) -> void = xx entry;
|
||||
entry_fn(arg);
|
||||
});
|
||||
return xx f;
|
||||
}
|
||||
|
||||
// Park the running fiber until a matching `ready`. `-> !` is the cancellation
|
||||
// channel the protocol mandates: a suspending impl raises `IoErr.Canceled`
|
||||
// out here when the parked task was cancelled (wired in Phase 3). The M:1
|
||||
// impl does not raise yet — it just parks the current fiber.
|
||||
suspend_raw :: (self: *Scheduler, park: ParkToken) -> ! {
|
||||
self.suspend_self();
|
||||
}
|
||||
|
||||
// Re-ready a fiber parked under `park` (its `handle` is the `*Fiber`, recorded
|
||||
// by the suspending side). Funnels through `wake`, which is guarded on
|
||||
// `.suspended`, so a ready of an already-running/queued fiber is a safe no-op.
|
||||
ready :: (self: *Scheduler, park: ParkToken) {
|
||||
if park.handle == null { return; }
|
||||
f : *Fiber = xx park.handle;
|
||||
self.wake(f);
|
||||
}
|
||||
|
||||
// Advance the world one step (the unit `run` loops over). Drain every
|
||||
// currently-ready fiber, then fire the earliest pending timer (advancing the
|
||||
// virtual clock). Returns the new clock after a timer fire, or -1 when nothing
|
||||
// ran and no timer is pending. NOTE: fd-readiness (kqueue/epoll) blocking is
|
||||
// NOT driven here yet — `run` remains the canonical driver for fd-bound work;
|
||||
// `poll` covers the compute + virtual-timer path the async layer needs.
|
||||
poll :: (self: *Scheduler, deadline_ms: i64) -> i64 {
|
||||
while self.ready_head != null {
|
||||
f := dequeue(self);
|
||||
self.current = f;
|
||||
f.state = .running;
|
||||
swap_context(@self.sched_ctx, @f.ctx);
|
||||
self.current = null;
|
||||
if f.state == .done {
|
||||
munmap(f.stack_region, f.stack_len);
|
||||
self.own_allocator.dealloc_bytes(xx f);
|
||||
} else if f.state == .ready {
|
||||
enqueue(self, f);
|
||||
}
|
||||
}
|
||||
idx := earliest_timer(self);
|
||||
if idx >= 0 {
|
||||
t := self.timers.items[idx];
|
||||
remove_timer(self, idx);
|
||||
self.clock_ms = t.deadline_ms;
|
||||
self.wake(t.fiber);
|
||||
return self.clock_ms;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
// The VIRTUAL clock (deterministic) — NOT a wall clock, so a fiber program's
|
||||
// timestamps are reproducible (mirrors `now_ms` on the struct).
|
||||
now_ms :: (self: *Scheduler) -> i64 { return self.clock_ms; }
|
||||
|
||||
// Arm a virtual-time timer that re-readies the CURRENT fiber at `deadline_ms`
|
||||
// (the caller suspends separately via `suspend_raw`). Does not suspend. Returns
|
||||
// null (a timer-cancel handle is Phase 3). The fired timer wakes the fiber
|
||||
// through the same `wake` path the run loop uses.
|
||||
arm_timer :: (self: *Scheduler, deadline_ms: i64, park: ParkToken) -> *void {
|
||||
t : Timer = .{ deadline_ms = deadline_ms, fiber = self.current };
|
||||
self.timers.append(t, self.own_allocator);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// --- the context switch (naked) + first-entry trampoline -------------------
|
||||
|
||||
// x0 = from, x1 = to (read straight from the ABI registers — a naked fn has no
|
||||
|
||||
Reference in New Issue
Block a user