feat: structured first-wins race over the M:1 fiber scheduler
`s.race((a: ta, b: tb, …))` takes a named tuple of already-spawned `*Task(..)` handles, suspends the calling fiber until the FIRST task is ready, and returns a comptime-synthesized tagged-union (`RaceResult`) mirroring the tuple's labels — variant NAME = the tuple label, payload = that task's result type. After picking the winner it CANCELS and JOINS every loser, so no loser fiber outlives the call (structured concurrency). - `RaceResult($T) -> Type` projects each `*Task(R)` element to `R` via `field_type(pointee(field_type(T, i)), 0)` and mints the union with `make_enum` (the 0649 composition shape). - `race` Phase 1 registers the caller as waiter on all pending tasks and parks; on wake it DEREGISTERS from every task (a later loser completion must never wake it again) and re-scans, lowest-index-first. Phase 2 builds the winner variant with `make_variant`. Phase 3 cancels + joins each loser one at a time — only the joined loser carries a waiter, so no other completion can wake the caller mid-join. - Join correctness rides a new `Task.finished` flag, set at the very end of the `go` worker body (after the work ran OR was skipped on an early cancel) and checked before parking, so a worker that finishes between the cancel and the park can't be lost. Cancellation is cooperative (M:1, no preemption): a loser parked mid-`sleep` runs to its natural end, its value discarded — `race` returns only once every loser has `finished`. The tuple must be NAMED; a positional `._0`/`._1` form is future work. Locked by examples/concurrency/1821 — three tasks (i64/bool/f64) sleeping 10/20/30ms, shortest wins, losers cancelled + joined; byte-identical on aarch64-macOS and aarch64-linux (deterministic virtual time).
This commit is contained in:
62
examples/concurrency/1821-concurrency-fiber-race.sx
Normal file
62
examples/concurrency/1821-concurrency-fiber-race.sx
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
// Stream B2/A1 — structured first-wins `race` over the M:1 fiber scheduler.
|
||||||
|
//
|
||||||
|
// `s.race((a: ta, b: tb, c: tc))` takes a named tuple of already-spawned
|
||||||
|
// `*Task(..)` handles, SUSPENDS the calling fiber until the FIRST task is ready,
|
||||||
|
// and returns a comptime-SYNTHESIZED tagged-union (`RaceResult`) mirroring the
|
||||||
|
// tuple's labels — variant NAME = the tuple label, payload = that task's result
|
||||||
|
// type. Here the three tasks return DIFFERENT types (i64 / bool / f64), so the
|
||||||
|
// minted union is `enum { a: i64; b: bool; c: f64 }` and the winner is matched by
|
||||||
|
// label. After picking the winner, `race` CANCELS and JOINS every loser, so no
|
||||||
|
// loser fiber outlives the call (structured concurrency).
|
||||||
|
//
|
||||||
|
// Deterministic by virtual time (like 1817 — no real clock): the tasks sleep
|
||||||
|
// 10/20/30 ms, so `a` (shortest) wins at t=10. Cancellation is COOPERATIVE (M:1,
|
||||||
|
// no preemption): the losers were already parked mid-`sleep` when cancelled, so
|
||||||
|
// they cannot be preempted — `race` joins them, letting each run to its natural
|
||||||
|
// end (its value discarded) before returning. The completion log therefore shows
|
||||||
|
// all three finishing (a@10 winner, b@20, c@30 joined) and the final virtual
|
||||||
|
// clock is 30. Each loser's `canceled` flag is set and its worker `finished`.
|
||||||
|
//
|
||||||
|
// aarch64-pinned (the scheduler's per-arch asm + per-OS mmap/event constants):
|
||||||
|
// runs end-to-end on a matching host (macOS + linux), ir-only on a mismatch.
|
||||||
|
#import "modules/std.sx";
|
||||||
|
sched :: #import "modules/std/sched.sx";
|
||||||
|
|
||||||
|
Log :: struct { id: [8]i64; at: [8]i64; n: i64; }
|
||||||
|
rec :: (l: *Log, id: i64, at: i64) { l.id[l.n] = id; l.at[l.n] = at; l.n = l.n + 1; }
|
||||||
|
|
||||||
|
main :: () -> i64 {
|
||||||
|
lg : Log = ---; lg.n = 0;
|
||||||
|
s := sched.Scheduler.init();
|
||||||
|
ps := @s; pl := @lg;
|
||||||
|
|
||||||
|
// The coordinator runs as a fiber so `race` has a `current` to park.
|
||||||
|
s.spawn(() => {
|
||||||
|
// Three async tasks with DIFFERENT result types and sleep durations.
|
||||||
|
a := ps.go(() -> i64 => { ps.sleep(10); rec(pl, 1, ps.now_ms()); 111 });
|
||||||
|
b := ps.go(() -> bool => { ps.sleep(20); rec(pl, 2, ps.now_ms()); true });
|
||||||
|
c := ps.go(() -> f64 => { ps.sleep(30); rec(pl, 3, ps.now_ms()); 2.5 });
|
||||||
|
|
||||||
|
// Race them. `a` (sleep 10) wins; `b` and `c` are cancelled + joined.
|
||||||
|
winner := ps.race(.(a = a, b = b, c = c));
|
||||||
|
if winner == {
|
||||||
|
case .a: (v) { print("winner: a (i64) = {}\n", v); }
|
||||||
|
case .b: (v) { print("winner: b (bool) = {}\n", v); }
|
||||||
|
case .c: (v) { print("winner: c (f64) = {}\n", v); }
|
||||||
|
}
|
||||||
|
|
||||||
|
// The losers were cancelled (flag set) and joined (worker finished).
|
||||||
|
print("loser b: canceled={} finished={}\n", b.canceled, b.finished);
|
||||||
|
print("loser c: canceled={} finished={}\n", c.canceled, c.finished);
|
||||||
|
});
|
||||||
|
s.run();
|
||||||
|
|
||||||
|
print("completion order (id @ virtual-ms):\n");
|
||||||
|
i := 0;
|
||||||
|
while i < lg.n {
|
||||||
|
print(" task {} @ {}ms\n", lg.id[i], lg.at[i]);
|
||||||
|
i = i + 1;
|
||||||
|
}
|
||||||
|
print("final virtual clock: {}ms\n", s.now_ms());
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
@@ -26,6 +26,12 @@
|
|||||||
// epoll on linux). Runs end-to-end on a matching aarch64 host, ir-only on an
|
// epoll on linux). Runs end-to-end on a matching aarch64 host, ir-only on an
|
||||||
// arch mismatch.
|
// arch mismatch.
|
||||||
#import "modules/std.sx";
|
#import "modules/std.sx";
|
||||||
|
// `race` synthesizes its result type (a tagged-union mirroring the input tuple's
|
||||||
|
// labels) and constructs the winner variant by runtime index — both need the
|
||||||
|
// metatype WRITE side (`make_enum`/`make_variant`/`EnumVariant`/`field_type`).
|
||||||
|
// meta.sx imports only std.sx (no naked asm, no cycle back into sched), so it is
|
||||||
|
// safe to pull in here.
|
||||||
|
#import "modules/std/meta.sx";
|
||||||
kqb :: #import "modules/std/net/kqueue.sx";
|
kqb :: #import "modules/std/net/kqueue.sx";
|
||||||
// The fd-readiness backend is per-OS: kqueue (kqb, above) on darwin, epoll on
|
// The fd-readiness backend is per-OS: kqueue (kqb, above) on darwin, epoll on
|
||||||
// linux. The epoll import is scoped to the linux branch so darwin never pulls
|
// linux. The epoll import is scoped to the linux branch so darwin never pulls
|
||||||
@@ -867,6 +873,12 @@ Task :: struct ($R: Type) {
|
|||||||
waiter: *void = null; // the single parked awaiter (opaque *Fiber); M:1 → at most one
|
waiter: *void = null; // the single parked awaiter (opaque *Fiber); M:1 → at most one
|
||||||
sched: *Scheduler; // owning scheduler (for park/wake in `wait`)
|
sched: *Scheduler; // owning scheduler (for park/wake in `wait`)
|
||||||
canceled: i64; // cooperative cancel flag (M:1: no preemption → no atomics)
|
canceled: i64; // cooperative cancel flag (M:1: no preemption → no atomics)
|
||||||
|
finished: i64; // set to 1 at the very END of the worker body (after the
|
||||||
|
// work ran OR was skipped on an early cancel). Distinct from
|
||||||
|
// `state == .canceled` (which `cancel` sets IMMEDIATELY, before
|
||||||
|
// the fiber has run): a JOINER (`race`) waits on `finished` so it
|
||||||
|
// knows the worker fiber actually reached its end — no loser
|
||||||
|
// outlives the `race` call.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Spawn `work` as a fiber; return a heap `*Task` that completes when the fiber
|
// Spawn `work` as a fiber; return a heap `*Task` that completes when the fiber
|
||||||
@@ -882,6 +894,7 @@ go :: ufcs (self: *Scheduler, work: Closure() -> $R) -> *Task($R) {
|
|||||||
t.waiter = null;
|
t.waiter = null;
|
||||||
t.sched = self;
|
t.sched = self;
|
||||||
t.canceled = 0;
|
t.canceled = 0;
|
||||||
|
t.finished = 0;
|
||||||
// Record the heap Task so `deinit` can free it (the scheduler otherwise has
|
// Record the heap Task so `deinit` can free it (the scheduler otherwise has
|
||||||
// no handle on its generic Tasks). Long-lived: a Task outlives this call.
|
// no handle on its generic Tasks). Long-lived: a Task outlives this call.
|
||||||
self.task_allocs.append(xx t, self.own_allocator);
|
self.task_allocs.append(xx t, self.own_allocator);
|
||||||
@@ -894,7 +907,14 @@ go :: ufcs (self: *Scheduler, work: Closure() -> $R) -> *Task($R) {
|
|||||||
t.value = work();
|
t.value = work();
|
||||||
t.state = .ready;
|
t.state = .ready;
|
||||||
}
|
}
|
||||||
// Wake the awaiter only if one already parked (else `wait` will not park).
|
// The worker has reached its end (ran the work, or skipped it on an early
|
||||||
|
// cancel). Mark `finished` BEFORE the wake so a joiner that checks the flag
|
||||||
|
// on resume always observes it set (a `race` JOIN distinguishes a finished
|
||||||
|
// worker from a merely-flagged-cancelled one via this).
|
||||||
|
t.finished = 1;
|
||||||
|
// Wake the awaiter only if one already parked (else `wait`/`race` will not
|
||||||
|
// park). Fires whether or not the work ran — both `wait` and a `race` join
|
||||||
|
// resume here.
|
||||||
if t.waiter != null { self.wake(xx t.waiter); }
|
if t.waiter != null { self.wake(xx t.waiter); }
|
||||||
});
|
});
|
||||||
return t;
|
return t;
|
||||||
@@ -930,3 +950,114 @@ cancel :: ufcs (t: *Task($R)) {
|
|||||||
t.canceled = 1;
|
t.canceled = 1;
|
||||||
t.state = .canceled;
|
t.state = .canceled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- B2/A1: structured first-wins `race` over the M:1 Task layer -------------
|
||||||
|
//
|
||||||
|
// `race((a: ta, b: tb, …))` starts from N already-spawned `*Task(..)` handles,
|
||||||
|
// returns when the FIRST completes, and STRUCTURALLY cancels + joins the losers
|
||||||
|
// before returning — no loser fiber outlives the call. The result is a
|
||||||
|
// comptime-synthesized tagged-union (`RaceResult`) mirroring the input tuple's
|
||||||
|
// labels: each variant's NAME is the tuple label, its payload is that task's
|
||||||
|
// result type. The tuple must be NAMED (`(a: ta, b: tb)`); a positional-tuple
|
||||||
|
// form (`._0`/`._1` variants) is future work — `field_name` yields "" for an
|
||||||
|
// unnamed element, which `make_enum` rejects as a duplicate-name collision.
|
||||||
|
//
|
||||||
|
// fa := s.go(() -> A => read_a(conn)); // *Task(A)
|
||||||
|
// fb := s.go(() -> B => read_b(conn)); // *Task(B)
|
||||||
|
// winner := s.race((a: fa, b: fb)); // RaceResult = enum { a: A; b: B }
|
||||||
|
// if winner == { case .a: (v) {…} case .b: (v) {…} } // loser cancelled+joined
|
||||||
|
|
||||||
|
// Synthesize the race RESULT type for a named tuple `$T` of `*Task(..)` handles.
|
||||||
|
// `*Task(R)` projects to its result `R` via `field_type(pointee(field_type(T, i)), 0)`:
|
||||||
|
// `field_type(T, i)` = `*Task(R)`, `pointee` strips the pointer to `Task(R)`, and
|
||||||
|
// field 0 of `Task` is `value: R`. One nominal type per distinct `T` (type-fn
|
||||||
|
// identity), so the decl, every `make_variant(RaceResult(T), …)` call, and the
|
||||||
|
// `-> RaceResult(T)` return all name the SAME union. (The 0649 composition shape,
|
||||||
|
// with `Task` in place of the stand-in `Box`.)
|
||||||
|
RaceResult :: ($T: Type) -> Type {
|
||||||
|
vs : [field_count(T)]EnumVariant = ---;
|
||||||
|
inline for 0..field_count(T) (i) {
|
||||||
|
vs[i] = EnumVariant.{
|
||||||
|
name = field_name(T, i), // tuple label → variant name
|
||||||
|
payload = field_type(pointee(field_type(T, i)), 0), // *Task(R) -> Task(R) -> R
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return make_enum("RaceResult", vs[0..field_count(T)]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Structured first-wins race. Suspends the calling fiber until the FIRST task is
|
||||||
|
// `.ready`, builds a `RaceResult(T)` carrying that winner's value, then CANCELS
|
||||||
|
// and JOINS every loser before returning.
|
||||||
|
//
|
||||||
|
// MUST be called from inside a fiber (there must be a `current` to park), like
|
||||||
|
// `wait`/`sleep`; a null `current` bails loudly rather than dereferencing null.
|
||||||
|
//
|
||||||
|
// COOPERATIVE-CANCEL SEMANTIC (M:1, no preemption): a loser already past its
|
||||||
|
// work's first line cannot be preempted — `cancel` sets its flag and the JOIN
|
||||||
|
// waits for the worker to reach its natural end (the value is discarded). A loser
|
||||||
|
// that had not yet started skips its work entirely (`go`'s `if t.canceled == 0`
|
||||||
|
// guard). Either way `race` returns only once every loser's worker has `finished`,
|
||||||
|
// so no loser fiber is still live past the call (structured concurrency).
|
||||||
|
race :: ufcs (self: *Scheduler, tasks: $T) -> RaceResult(T) {
|
||||||
|
cur := self.current;
|
||||||
|
if cur == null {
|
||||||
|
print("sched: race() called outside a fiber (no running fiber)\n");
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Phase 1 — first winner. Scan for an already-`.ready` task (lowest index
|
||||||
|
// wins on a same-tick tie → deterministic). If none, register the caller as
|
||||||
|
// the waiter on every still-`.pending` task and park. On wake DEREGISTER from
|
||||||
|
// ALL of them: a later loser completion must never wake the caller again — by
|
||||||
|
// the time it fires the caller may be running or parked on a different join,
|
||||||
|
// and a stale waiter-wake would be a spurious/lost wakeup (the queue-corruption
|
||||||
|
// hazard `wake` guards). A spurious wake with nothing ready re-registers and
|
||||||
|
// re-parks.
|
||||||
|
winner_idx : i64 = -1;
|
||||||
|
while winner_idx < 0 {
|
||||||
|
inline for 0..field_count(T) (i) {
|
||||||
|
if winner_idx < 0 and tasks[i].state == .ready { winner_idx = i; }
|
||||||
|
}
|
||||||
|
if winner_idx >= 0 { break; }
|
||||||
|
inline for 0..field_count(T) (i) {
|
||||||
|
if tasks[i].state == .pending { tasks[i].waiter = xx cur; }
|
||||||
|
}
|
||||||
|
self.suspend_self();
|
||||||
|
inline for 0..field_count(T) (i) {
|
||||||
|
if tasks[i].waiter == (xx cur) { tasks[i].waiter = null; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Phase 2 — build the winner variant. `tasks[i].value` carries the CONCRETE
|
||||||
|
// result type of variant `i` (comptime-cursor tuple indexing), so the matching
|
||||||
|
// unrolled arm constructs `RaceResult(T)`'s i-th variant directly — no nested
|
||||||
|
// `inline if` to recover the payload type. `i` is the comptime cursor; the
|
||||||
|
// runtime `if i == winner_idx` selects the one arm that fires.
|
||||||
|
result : RaceResult(T) = ---;
|
||||||
|
inline for 0..field_count(T) (i) {
|
||||||
|
if i == winner_idx {
|
||||||
|
result = make_variant(RaceResult(T), i, tasks[i].value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Phase 3 — cancel + JOIN every loser, one at a time. `cancel` sets the flag;
|
||||||
|
// the join then ensures the loser's worker fiber has `finished` (not merely
|
||||||
|
// been flagged): if it has not, register as ITS sole waiter and park until the
|
||||||
|
// worker's tail wakes us (the worker sets `finished = 1` and wakes its waiter
|
||||||
|
// whether it ran the work or skipped it on an early cancel). Checking
|
||||||
|
// `finished` BEFORE parking avoids a lost wakeup (mirrors `wait` checking
|
||||||
|
// `.ready`). Only the loser being joined has a registered waiter, so no other
|
||||||
|
// task's completion can wake us mid-join.
|
||||||
|
inline for 0..field_count(T) (i) {
|
||||||
|
if i != winner_idx {
|
||||||
|
tasks[i].cancel();
|
||||||
|
if tasks[i].finished == 0 {
|
||||||
|
tasks[i].waiter = xx cur;
|
||||||
|
self.suspend_self();
|
||||||
|
if tasks[i].waiter == (xx cur) { tasks[i].waiter = null; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user