feat: impl Io for Scheduler — fiber scheduler as a context.io vtable (Phase 1)
`impl Io for Scheduler` folds the M:1 fiber scheduler behind the same `Io`
protocol (core.sx) the blocking `CBlockingIo` implements, so the async layer
can run colorblind over whichever impl is installed via
`push Context { io = xx scheduler }`. The six methods are thin adapters over
the existing fiber primitives:
- `spawn_raw(entry, arg, opts)` — spawn a fiber that calls the erased
`(*void)->void` worker thunk `entry(arg)` (fn-ptr round-trip through
`*void`); returns the `*Fiber` handle. The fiber inherits this context
(Phase 0), so the worker's own `context.io` is this scheduler.
- `suspend_raw(park) -> !` — park the running fiber; the `!` is the
cancellation channel a suspending impl raises on (wired in Phase 3).
- `ready(park)` — `wake` the fiber recorded in the token (guarded on
`.suspended`).
- `poll(deadline_ms)` — one step of the run loop (drain ready + fire the
earliest virtual-time timer); fd-readiness stays on `run`.
- `now_ms` — the deterministic virtual clock.
- `arm_timer(deadline_ms, park)` — arm a virtual-time timer that re-readies
the current fiber.
Locked by examples/concurrency/1823 — two workers spawned + suspended +
resumed entirely through `context.io`, deterministic deadline order
(byte-identical aarch64-macOS host + aarch64-linux container). Full suite
green (828/0).
This commit is contained in:
42
examples/concurrency/1823-concurrency-fiber-io-vtable.sx
Normal file
42
examples/concurrency/1823-concurrency-fiber-io-vtable.sx
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
// Stream B2/A1 — the M:1 fiber scheduler installed AS an `Io` capability vtable,
|
||||||
|
// driven entirely through `context.io`. `impl Io for Scheduler` (sched.sx) folds
|
||||||
|
// the scheduler behind the same `Io` protocol the blocking `CBlockingIo`
|
||||||
|
// implements, so the worker below reaches real suspension/timers through the
|
||||||
|
// PROTOCOL (`context.io.spawn_raw`/`arm_timer`/`suspend_raw`/`now_ms`) rather
|
||||||
|
// than bespoke scheduler methods — the foundation for a colorblind
|
||||||
|
// `async`/`await`/`race` that runs over whichever `Io` is installed.
|
||||||
|
//
|
||||||
|
// Two workers are spawned via `context.io.spawn_raw`; each arms a virtual-time
|
||||||
|
// timer and `suspend_raw`s until it fires. They resume in DEADLINE order (10 then
|
||||||
|
// 20), deterministic on the virtual clock — proving the protocol round-trips
|
||||||
|
// spawn → arm → suspend → ready → resume against the fiber engine. Phase 0 (fibers
|
||||||
|
// inherit the spawn-time context) is what lets the worker's own `context.io`
|
||||||
|
// resolve back to this scheduler.
|
||||||
|
//
|
||||||
|
// aarch64-pinned (the scheduler's per-arch asm): runs end-to-end on a matching
|
||||||
|
// host (macOS + linux), ir-only on a mismatch.
|
||||||
|
#import "modules/std.sx";
|
||||||
|
sched :: #import "modules/std/sched.sx";
|
||||||
|
|
||||||
|
// Worker entry: an sx (*void)->void fn, erased to *void by spawn_raw.
|
||||||
|
sleeper :: (arg: *void) {
|
||||||
|
n : *i64 = xx arg;
|
||||||
|
tok : ParkToken = .{ handle = null };
|
||||||
|
context.io.arm_timer(context.io.now_ms() + n.*, tok);
|
||||||
|
context.io.suspend_raw(tok) catch {};
|
||||||
|
print("worker(sleep {}) resumed at now_ms = {}\n", n.*, context.io.now_ms());
|
||||||
|
}
|
||||||
|
|
||||||
|
main :: () -> i64 {
|
||||||
|
s := sched.Scheduler.init();
|
||||||
|
ps := @s;
|
||||||
|
d1 : i64 = 20;
|
||||||
|
d2 : i64 = 10;
|
||||||
|
push Context.{ allocator = context.allocator, data = null, io = xx s } {
|
||||||
|
context.io.spawn_raw(xx sleeper, xx @d1, .{});
|
||||||
|
context.io.spawn_raw(xx sleeper, xx @d2, .{});
|
||||||
|
ps.run();
|
||||||
|
}
|
||||||
|
print("final clock: {}ms\n", ps.now_ms());
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
@@ -588,6 +588,102 @@ Scheduler :: struct {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- B2/A1: the fiber scheduler AS an `Io` capability vtable -----------------
|
||||||
|
//
|
||||||
|
// `impl Io for Scheduler` folds the M:1 fiber scheduler behind `context.io` —
|
||||||
|
// the same `Io` protocol (core.sx) the blocking `CBlockingIo` (io.sx) implements,
|
||||||
|
// so the async ergonomic layer (`async`/`await`/`cancel`/`race`) runs COLORBLIND
|
||||||
|
// over whichever impl is installed. Install it the allocator way — by value, the
|
||||||
|
// receiver borrowed into the protocol field:
|
||||||
|
//
|
||||||
|
// s := Scheduler.init();
|
||||||
|
// push Context.{ allocator = context.allocator, data = null, io = xx s } {
|
||||||
|
// … context.io.async(…) / await / race …
|
||||||
|
// s.run(); // the explicit driver loop (stays as today)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// Phase 0 (fibers inherit the spawn-time context) is what makes this reach a
|
||||||
|
// WORKER: a fiber spawned through `spawn_raw` runs under the pushed context, so
|
||||||
|
// its own `context.io.suspend_raw`/`ready` resolve back to THIS scheduler, not
|
||||||
|
// the blocking default. The methods are thin adapters over the existing fiber
|
||||||
|
// primitives (`spawn`/`suspend_self`/`wake`/`Timer`/`clock_ms`).
|
||||||
|
impl Io for Scheduler {
|
||||||
|
// Spawn a fiber that runs `entry(arg)`. `entry` is an sx `(*void) -> void`
|
||||||
|
// function pointer (the worker thunk the `async` layer boxes), erased to
|
||||||
|
// `*void`; `arg` is its single payload pointer. Returns the `*Fiber` as the
|
||||||
|
// opaque handle (the `Future` stashes it in `task`). The fiber inherits this
|
||||||
|
// call's context (Phase 0), so the worker sees this scheduler as `context.io`.
|
||||||
|
spawn_raw :: (self: *Scheduler, entry: *void, arg: *void, opts: SpawnOpts) -> *void {
|
||||||
|
f := self.spawn(() => {
|
||||||
|
entry_fn : (*void) -> void = xx entry;
|
||||||
|
entry_fn(arg);
|
||||||
|
});
|
||||||
|
return xx f;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Park the running fiber until a matching `ready`. `-> !` is the cancellation
|
||||||
|
// channel the protocol mandates: a suspending impl raises `IoErr.Canceled`
|
||||||
|
// out here when the parked task was cancelled (wired in Phase 3). The M:1
|
||||||
|
// impl does not raise yet — it just parks the current fiber.
|
||||||
|
suspend_raw :: (self: *Scheduler, park: ParkToken) -> ! {
|
||||||
|
self.suspend_self();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-ready a fiber parked under `park` (its `handle` is the `*Fiber`, recorded
|
||||||
|
// by the suspending side). Funnels through `wake`, which is guarded on
|
||||||
|
// `.suspended`, so a ready of an already-running/queued fiber is a safe no-op.
|
||||||
|
ready :: (self: *Scheduler, park: ParkToken) {
|
||||||
|
if park.handle == null { return; }
|
||||||
|
f : *Fiber = xx park.handle;
|
||||||
|
self.wake(f);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Advance the world one step (the unit `run` loops over). Drain every
|
||||||
|
// currently-ready fiber, then fire the earliest pending timer (advancing the
|
||||||
|
// virtual clock). Returns the new clock after a timer fire, or -1 when nothing
|
||||||
|
// ran and no timer is pending. NOTE: fd-readiness (kqueue/epoll) blocking is
|
||||||
|
// NOT driven here yet — `run` remains the canonical driver for fd-bound work;
|
||||||
|
// `poll` covers the compute + virtual-timer path the async layer needs.
|
||||||
|
poll :: (self: *Scheduler, deadline_ms: i64) -> i64 {
|
||||||
|
while self.ready_head != null {
|
||||||
|
f := dequeue(self);
|
||||||
|
self.current = f;
|
||||||
|
f.state = .running;
|
||||||
|
swap_context(@self.sched_ctx, @f.ctx);
|
||||||
|
self.current = null;
|
||||||
|
if f.state == .done {
|
||||||
|
munmap(f.stack_region, f.stack_len);
|
||||||
|
self.own_allocator.dealloc_bytes(xx f);
|
||||||
|
} else if f.state == .ready {
|
||||||
|
enqueue(self, f);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
idx := earliest_timer(self);
|
||||||
|
if idx >= 0 {
|
||||||
|
t := self.timers.items[idx];
|
||||||
|
remove_timer(self, idx);
|
||||||
|
self.clock_ms = t.deadline_ms;
|
||||||
|
self.wake(t.fiber);
|
||||||
|
return self.clock_ms;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The VIRTUAL clock (deterministic) — NOT a wall clock, so a fiber program's
|
||||||
|
// timestamps are reproducible (mirrors `now_ms` on the struct).
|
||||||
|
now_ms :: (self: *Scheduler) -> i64 { return self.clock_ms; }
|
||||||
|
|
||||||
|
// Arm a virtual-time timer that re-readies the CURRENT fiber at `deadline_ms`
|
||||||
|
// (the caller suspends separately via `suspend_raw`). Does not suspend. Returns
|
||||||
|
// null (a timer-cancel handle is Phase 3). The fired timer wakes the fiber
|
||||||
|
// through the same `wake` path the run loop uses.
|
||||||
|
arm_timer :: (self: *Scheduler, deadline_ms: i64, park: ParkToken) -> *void {
|
||||||
|
t : Timer = .{ deadline_ms = deadline_ms, fiber = self.current };
|
||||||
|
self.timers.append(t, self.own_allocator);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// --- the context switch (naked) + first-entry trampoline -------------------
|
// --- the context switch (naked) + first-entry trampoline -------------------
|
||||||
|
|
||||||
// x0 = from, x1 = to (read straight from the ABI registers — a naked fn has no
|
// x0 = from, x1 = to (read straight from the ABI registers — a naked fn has no
|
||||||
|
|||||||
Reference in New Issue
Block a user