feat: structured first-wins race over the M:1 fiber scheduler

`s.race((a: ta, b: tb, …))` takes a named tuple of already-spawned
`*Task(..)` handles, suspends the calling fiber until the FIRST task is
ready, and returns a comptime-synthesized tagged-union (`RaceResult`)
mirroring the tuple's labels — variant NAME = the tuple label, payload =
that task's result type. After picking the winner it CANCELS and JOINS
every loser, so no loser fiber outlives the call (structured concurrency).

- `RaceResult($T) -> Type` projects each `*Task(R)` element to `R` via
  `field_type(pointee(field_type(T, i)), 0)` and mints the union with
  `make_enum` (the 0649 composition shape).
- `race` Phase 1 registers the caller as waiter on all pending tasks and
  parks; on wake it DEREGISTERS from every task (a later loser completion
  must never wake it again) and re-scans, lowest-index-first. Phase 2
  builds the winner variant with `make_variant`. Phase 3 cancels + joins
  each loser one at a time — only the joined loser carries a waiter, so no
  other completion can wake the caller mid-join.
- Join correctness rides a new `Task.finished` flag, set at the very end of
  the `go` worker body (after the work ran OR was skipped on an early
  cancel) and checked before parking, so a worker that finishes between the
  cancel and the park can't be lost. Cancellation is cooperative (M:1, no
  preemption): a loser parked mid-`sleep` runs to its natural end, its value
  discarded — `race` returns only once every loser has `finished`.

The tuple must be NAMED; a positional `._0`/`._1` form is future work.

Locked by examples/concurrency/1821 — three tasks (i64/bool/f64) sleeping
10/20/30ms, shortest wins, losers cancelled + joined; byte-identical on
aarch64-macOS and aarch64-linux (deterministic virtual time).
This commit is contained in:
agra
2026-06-26 18:07:14 +03:00
parent 6a97628749
commit 9099735e88
2 changed files with 194 additions and 1 deletions

View File

@@ -0,0 +1,62 @@
// Stream B2/A1 — structured first-wins `race` over the M:1 fiber scheduler.
//
// `s.race(.(a = ta, b = tb, c = tc))` takes a named tuple of already-spawned
// `*Task(..)` handles, SUSPENDS the calling fiber until the FIRST task is ready,
// and returns a comptime-SYNTHESIZED tagged-union (`RaceResult`) mirroring the
// tuple's labels — variant NAME = the tuple label, payload = that task's result
// type. Here the three tasks return DIFFERENT types (i64 / bool / f64), so the
// minted union is `enum { a: i64; b: bool; c: f64 }` and the winner is matched by
// label. After picking the winner, `race` CANCELS and JOINS every loser, so no
// loser fiber outlives the call (structured concurrency).
//
// Deterministic by virtual time (like 1817 — no real clock): the tasks sleep
// 10/20/30 ms, so `a` (shortest) wins at t=10. Cancellation is COOPERATIVE (M:1,
// no preemption): the losers were already parked mid-`sleep` when cancelled, so
// they cannot be preempted — `race` joins them, letting each run to its natural
// end (its value discarded) before returning. The completion log therefore shows
// all three finishing (a@10 winner, b@20, c@30 joined) and the final virtual
// clock is 30. Each loser's `canceled` flag is set and its worker `finished`.
//
// aarch64-pinned (the scheduler's per-arch asm + per-OS mmap/event constants):
// runs end-to-end on a matching host (macOS + linux), ir-only on a mismatch.
#import "modules/std.sx";
sched :: #import "modules/std/sched.sx";
// Fixed-capacity completion journal used by this example: `id[k]` / `at[k]` are
// the k-th recorded (task id, timestamp) pair and `n` is the number of entries
// written so far. Both arrays hold 8 slots.
Log :: struct { id: [8]i64; at: [8]i64; n: i64; }
// Append one (id, at) entry to the journal `l` at the current cursor `l.n`,
// then advance the cursor by one. No capacity check is performed here.
rec :: (l: *Log, id: i64, at: i64) {
    slot := l.n;
    l.id[slot] = id;
    l.at[slot] = at;
    l.n = slot + 1;
}
main :: () -> i64 {
    // Completion journal; only the cursor needs initialising because every
    // slot is written by `rec` before it is read back below.
    journal : Log = ---;
    journal.n = 0;

    scheduler := sched.Scheduler.init();
    // The closures below reach the scheduler and the journal through these
    // pointers.
    sp := @scheduler;
    jp := @journal;

    // `race` parks the calling fiber, so the coordinator itself runs as one.
    scheduler.spawn(() => {
        // Three tasks, each with its own result type and sleep duration.
        ta := sp.go(() -> i64 => { sp.sleep(10); rec(jp, 1, sp.now_ms()); 111 });
        tb := sp.go(() -> bool => { sp.sleep(20); rec(jp, 2, sp.now_ms()); true });
        tc := sp.go(() -> f64 => { sp.sleep(30); rec(jp, 3, sp.now_ms()); 2.5 });

        // First one ready wins; the other two are cancelled and joined before
        // `race` returns. The tuple labels become the variant names matched on.
        outcome := sp.race(.(a = ta, b = tb, c = tc));
        if outcome == {
            case .a: (v) { print("winner: a (i64) = {}\n", v); }
            case .b: (v) { print("winner: b (bool) = {}\n", v); }
            case .c: (v) { print("winner: c (f64) = {}\n", v); }
        }

        // Show the per-loser flags after the race has returned.
        print("loser b: canceled={} finished={}\n", tb.canceled, tb.finished);
        print("loser c: canceled={} finished={}\n", tc.canceled, tc.finished);
    });

    scheduler.run();

    // Dump the journal in the order the entries were recorded.
    print("completion order (id @ virtual-ms):\n");
    k := 0;
    while k < journal.n {
        print(" task {} @ {}ms\n", journal.id[k], journal.at[k]);
        k = k + 1;
    }
    print("final virtual clock: {}ms\n", scheduler.now_ms());
    return 0;
}

View File

@@ -26,6 +26,12 @@
// epoll on linux). Runs end-to-end on a matching aarch64 host, ir-only on an // epoll on linux). Runs end-to-end on a matching aarch64 host, ir-only on an
// arch mismatch. // arch mismatch.
#import "modules/std.sx"; #import "modules/std.sx";
// `race` synthesizes its result type (a tagged-union mirroring the input tuple's
// labels) and constructs the winner variant by runtime index — both need the
// metatype WRITE side (`make_enum`/`make_variant`/`EnumVariant`/`field_type`).
// meta.sx imports only std.sx (no naked asm, no cycle back into sched), so it is
// safe to pull in here.
#import "modules/std/meta.sx";
kqb :: #import "modules/std/net/kqueue.sx"; kqb :: #import "modules/std/net/kqueue.sx";
// The fd-readiness backend is per-OS: kqueue (kqb, above) on darwin, epoll on // The fd-readiness backend is per-OS: kqueue (kqb, above) on darwin, epoll on
// linux. The epoll import is scoped to the linux branch so darwin never pulls // linux. The epoll import is scoped to the linux branch so darwin never pulls
@@ -867,6 +873,12 @@ Task :: struct ($R: Type) {
waiter: *void = null; // the single parked awaiter (opaque *Fiber); M:1 → at most one waiter: *void = null; // the single parked awaiter (opaque *Fiber); M:1 → at most one
sched: *Scheduler; // owning scheduler (for park/wake in `wait`) sched: *Scheduler; // owning scheduler (for park/wake in `wait`)
canceled: i64; // cooperative cancel flag (M:1: no preemption → no atomics) canceled: i64; // cooperative cancel flag (M:1: no preemption → no atomics)
finished: i64; // set to 1 at the very END of the worker body (after the
// work ran OR was skipped on an early cancel). Distinct from
// `state == .canceled` (which `cancel` sets IMMEDIATELY, before
// the fiber has run): a JOINER (`race`) waits on `finished` so it
// knows the worker fiber actually reached its end — no loser
// outlives the `race` call.
} }
// Spawn `work` as a fiber; return a heap `*Task` that completes when the fiber // Spawn `work` as a fiber; return a heap `*Task` that completes when the fiber
@@ -882,6 +894,7 @@ go :: ufcs (self: *Scheduler, work: Closure() -> $R) -> *Task($R) {
t.waiter = null; t.waiter = null;
t.sched = self; t.sched = self;
t.canceled = 0; t.canceled = 0;
t.finished = 0;
// Record the heap Task so `deinit` can free it (the scheduler otherwise has // Record the heap Task so `deinit` can free it (the scheduler otherwise has
// no handle on its generic Tasks). Long-lived: a Task outlives this call. // no handle on its generic Tasks). Long-lived: a Task outlives this call.
self.task_allocs.append(xx t, self.own_allocator); self.task_allocs.append(xx t, self.own_allocator);
@@ -894,7 +907,14 @@ go :: ufcs (self: *Scheduler, work: Closure() -> $R) -> *Task($R) {
t.value = work(); t.value = work();
t.state = .ready; t.state = .ready;
} }
// Wake the awaiter only if one already parked (else `wait` will not park). // The worker has reached its end (ran the work, or skipped it on an early
// cancel). Mark `finished` BEFORE the wake so a joiner that checks the flag
// on resume always observes it set (a `race` JOIN distinguishes a finished
// worker from a merely-flagged-cancelled one via this).
t.finished = 1;
// Wake the awaiter only if one already parked (else `wait`/`race` will not
// park). Fires whether or not the work ran — both `wait` and a `race` join
// resume here.
if t.waiter != null { self.wake(xx t.waiter); } if t.waiter != null { self.wake(xx t.waiter); }
}); });
return t; return t;
@@ -930,3 +950,114 @@ cancel :: ufcs (t: *Task($R)) {
t.canceled = 1; t.canceled = 1;
t.state = .canceled; t.state = .canceled;
} }
// --- B2/A1: structured first-wins `race` over the M:1 Task layer -------------
//
// `race((a: ta, b: tb, …))` starts from N already-spawned `*Task(..)` handles,
// returns when the FIRST completes, and STRUCTURALLY cancels + joins the losers
// before returning — no loser fiber outlives the call. The result is a
// comptime-synthesized tagged-union (`RaceResult`) mirroring the input tuple's
// labels: each variant's NAME is the tuple label, its payload is that task's
// result type. The tuple must be NAMED (`(a: ta, b: tb)`); a positional-tuple
// form (`._0`/`._1` variants) is future work — `field_name` yields "" for an
// unnamed element, which `make_enum` rejects as a duplicate-name collision.
//
// fa := s.go(() -> A => read_a(conn)); // *Task(A)
// fb := s.go(() -> B => read_b(conn)); // *Task(B)
// winner := s.race(.(a = fa, b = fb)); // RaceResult = enum { a: A; b: B }
// if winner == { case .a: (v) {…} case .b: (v) {…} } // loser cancelled+joined
// Synthesize the race RESULT type for a named tuple `$T` of `*Task(..)` handles.
//
// For each element `i` of `T` one `EnumVariant` is produced:
//   - name    = `field_name(T, i)`, i.e. the tuple label;
//   - payload = `field_type(pointee(field_type(T, i)), 0)`: `field_type(T, i)` is
//     the element type `*Task(R)`, `pointee` strips the pointer to `Task(R)`, and
//     field 0 of that struct is taken as the result type `R`.
// The variants are then handed to `make_enum` under the name "RaceResult".
//
// One nominal type per distinct `T` (type-fn identity), so the decl, every
// `make_variant(RaceResult(T), …)` call, and the `-> RaceResult(T)` return all
// name the SAME union. (The 0649 composition shape, with `Task` in place of the
// stand-in `Box`.)
//
// NOTE(review): the payload projection depends on the result being field 0 of
// `Task` (`value: R`); the full `Task` declaration is not visible from here —
// confirm, and keep `value` first if `Task` is ever reordered.
RaceResult :: ($T: Type) -> Type {
// One slot per tuple element; left uninitialised because the loop below writes
// every index before the slice is read.
vs : [field_count(T)]EnumVariant = ---;
inline for 0..field_count(T) (i) {
vs[i] = EnumVariant.{
name = field_name(T, i), // tuple label → variant name
payload = field_type(pointee(field_type(T, i)), 0), // *Task(R) -> Task(R) -> R
};
}
// Pass the whole array as a slice to the enum constructor.
return make_enum("RaceResult", vs[0..field_count(T)]);
}
// Structured first-wins race. Suspends the calling fiber until the FIRST task in
// `tasks` is `.ready`, builds a `RaceResult(T)` carrying that winner's value, then
// CANCELS and JOINS every other task before returning.
//
// Parameters:
//   self  — the scheduler whose `current` fiber is the caller.
//   tasks — a named tuple of `*Task(..)` handles; labels become variant names.
// Returns: the `RaceResult(T)` variant whose index matches the winning element.
//
// MUST be called from inside a fiber (there must be a `current` to park), like
// `wait`/`sleep`; a null `current` prints a diagnostic and aborts rather than
// dereferencing null.
//
// COOPERATIVE-CANCEL SEMANTIC (M:1, no preemption): a loser that has already
// started cannot be preempted — `cancel` sets its flag and the JOIN waits for the
// worker to reach its natural end (the value is discarded). A loser that had not
// yet started skips its work entirely (`go`'s `if t.canceled == 0` guard). Either
// way `race` returns only once every loser's worker has `finished`, so no loser
// fiber is still live past the call (structured concurrency).
race :: ufcs (self: *Scheduler, tasks: $T) -> RaceResult(T) {
    cur := self.current;
    if cur == null {
        print("sched: race() called outside a fiber (no running fiber)\n");
        abort();
    }

    // Phase 1 — first winner. Scan for an already-`.ready` task (lowest index
    // wins on a same-tick tie → deterministic). If none, register the caller as
    // the waiter on every still-`.pending` task and park. On wake DEREGISTER from
    // ALL of them: a later loser completion must never wake the caller again,
    // because by then the caller may be running or parked on a different join.
    // A wake with nothing ready simply re-registers and re-parks.
    //
    // NOTE(review): if no task is `.ready` and none is `.pending` (every task
    // already cancelled elsewhere, or an empty tuple) nothing is registered
    // before the park below, so no waker is visible from here — confirm callers
    // cannot reach that state.
    winner_idx : i64 = -1;
    while winner_idx < 0 {
        inline for 0..field_count(T) (i) {
            if winner_idx < 0 and tasks[i].state == .ready { winner_idx = i; }
        }
        if winner_idx >= 0 { break; }
        inline for 0..field_count(T) (i) {
            if tasks[i].state == .pending { tasks[i].waiter = xx cur; }
        }
        self.suspend_self();
        inline for 0..field_count(T) (i) {
            if tasks[i].waiter == (xx cur) { tasks[i].waiter = null; }
        }
    }

    // Phase 2 — build the winner variant. `tasks[i].value` carries the CONCRETE
    // result type of variant `i` (comptime-cursor tuple indexing), so the matching
    // unrolled arm constructs `RaceResult(T)`'s i-th variant directly. `i` is the
    // comptime cursor; the runtime `if i == winner_idx` selects the one arm that
    // fires.
    result : RaceResult(T) = ---;
    inline for 0..field_count(T) (i) {
        if i == winner_idx {
            result = make_variant(RaceResult(T), i, tasks[i].value);
        }
    }

    // Phase 3 — cancel + JOIN every loser, one at a time. `cancel` sets the flag;
    // the join then waits until the loser's worker has actually `finished` (not
    // merely been flagged). `finished` is checked BEFORE parking so a worker that
    // already ended is never waited on, and it is RE-CHECKED after every wake:
    // like Phase 1, the join must not treat "I was woken" as "the condition
    // holds". A wake that arrives while `finished` is still 0 re-registers and
    // re-parks instead of letting `race` return with a live loser fiber.
    // Only the loser being joined has a registered waiter at any time.
    inline for 0..field_count(T) (i) {
        if i != winner_idx {
            tasks[i].cancel();
            while tasks[i].finished == 0 {
                tasks[i].waiter = xx cur;
                self.suspend_self();
                if tasks[i].waiter == (xx cur) { tasks[i].waiter = null; }
            }
        }
    }
    return result;
}