feat: race over Futures via context.io.race (PLAN-IO-UNIFY Phase 4)

Re-home the proven first-wins race from sched.race(*Task) onto *Future handles
+ the Io protocol; the old Task-based race is REPLACED (ufcs overload-by-receiver
is rejected, and only 1821 used it).

- Protocol: add Io.current_park() -> ParkToken — the running fiber as a token,
  captured WITHOUT parking — so race can register the SAME coordinator across N
  futures' park slots, then park once via suspend_raw; any completion readies it.
  Scheduler returns {self.current} (bails outside a fiber); CBlockingIo returns
  {null} (race never parks there — futures are born .ready).
- race :: ufcs (io: Io, futures: $T) -> RaceResult(T), kept in sched.sx (it needs
  meta.sx's make_enum/make_variant; pulling that into the io.sx prelude part-file
  would cycle). Winner scan -> register/park/deregister -> make_variant the winner
  -> Phase-3 cancel each still-.pending loser (no join). RaceResult reused
  unchanged (*Future(R) projects field 0 'value' -> R).
- TRUE-cancel: parked losers stop at their next suspend (timers evicted by cancel's
  wake), so race returns at WINNER-time, not slowest-loser-time.
- Adversarial review fixes: (1) an all-failing/all-cancelling racer set no longer
  deadlock-aborts the scheduler — race bails loudly ('all futures settled without
  a winner') when nothing is .ready and nothing is still .pending; (2) only
  .pending losers are cancelled, so a loser that already .failed keeps its real
  outcome label instead of being stomped to .canceled.

Re-point 1821 to context.io.async + context.io.race (winner a=111, losers
.canceled, completion log only 'task 1 @ 10ms', final clock 10ms — was 30 under
the old cooperative join). New 1826 locks the failing-loser case. Byte-identical
on aarch64-macOS + aarch64-linux. Suite 853/0; .ir churn is the current_park
vtable method.
This commit is contained in:
agra
2026-06-28 09:50:10 +03:00
parent 8bacb2b01c
commit 97b0abef66
51 changed files with 58153 additions and 57125 deletions

View File

@@ -765,6 +765,20 @@ impl Io for Scheduler {
self.timers.append(t, self.own_allocator);
return null;
}
// The running fiber as a `ParkToken`, for fan-in registration (`race`): the
// caller stamps this handle into several futures' `park` slots so ANY of
// their completions `ready`s it, then parks once via `suspend_raw`. MUST be
// called from inside a fiber (race parks `current`); a null current would
// register a `null` waiter no completion can wake — bail loudly, mirroring
// `suspend_self` / `sleep` / `arm_timer`.
current_park :: (self: *Scheduler) -> ParkToken {
if self.current == null {
print("sched: current_park() called outside a fiber (no running fiber)\n");
abort();
}
return .{ handle = xx self.current };
}
}
// --- the context switch (naked) + first-entry trampoline -------------------
@@ -1154,112 +1168,120 @@ cancel :: ufcs (t: *Task($R)) {
t.state = .canceled;
}
// --- B2/A1: structured first-wins `race` over the M:1 Task layer -------------
// --- B2/A1: structured first-wins `race` over `context.io` Futures -----------
//
// `race((a: ta, b: tb, …))` starts from N already-spawned `*Task(..)` handles,
// returns when the FIRST completes, and STRUCTURALLY cancels + joins the losers
// before returning — no loser fiber outlives the call. The result is a
// comptime-synthesized tagged-union (`RaceResult`) mirroring the input tuple's
// labels: each variant's NAME is the tuple label, its payload is that task's
// result type. The tuple must be NAMED (`(a: ta, b: tb)`); a positional-tuple
// form (`._0`/`._1` variants) is future work — `field_name` yields "" for an
// unnamed element, which `make_enum` rejects as a duplicate-name collision.
// `context.io.race((a: fa, b: fb, …))` starts from N already-spawned `*Future(..)`
// handles (from `context.io.async`), returns when the FIRST is `.ready`, and
// CANCELS every loser before returning — with Phase-3 TRUE cancellation each loser
// stops at its next suspend, so `race` returns at WINNER-time, not slowest-loser-
// time. The result is a comptime-synthesized tagged-union (`RaceResult`) mirroring
// the input tuple's labels: each variant's NAME is the tuple label, its payload is
// that future's result type. The tuple must be NAMED (`(a: fa, b: fb)`); a
// positional-tuple form (`._0`/`._1`) is future work — `field_name` yields "" for
// an unnamed element, which `make_enum` rejects as a duplicate-name collision.
//
// fa := s.go(() -> A => read_a(conn)); // *Task(A)
// fb := s.go(() -> B => read_b(conn)); // *Task(B)
// winner := s.race((a: fa, b: fb)); // RaceResult = enum { a: A; b: B }
// if winner == { case .a: (v) {…} case .b: (v) {…} } // loser cancelled+joined
// It runs over the `Io` PROTOCOL (`current_park`/`suspend_raw`/`ready`), so it is
// colorblind: under the fiber scheduler it really parks/wakes; under the blocking
// `CBlockingIo` every future is born `.ready`, so the winner scan returns
// immediately and it never parks. Lives here (not io.sx) because the `RaceResult`
// synthesis needs the metatype WRITE side (`make_enum`/`make_variant`), and
// meta.sx imports only std.sx — pulling it into the io.sx prelude part-file would
// cycle.
//
// fa := context.io.async(() -> (A, !) => read_a(conn)); // *Future(A)
// fb := context.io.async(() -> (B, !) => read_b(conn)); // *Future(B)
// winner := context.io.race(.(a = fa, b = fb)); // enum { a: A; b: B }
// if winner == { case .a: (v) {…} case .b: (v) {…} } // losers cancelled
// Synthesize the race RESULT type for a named tuple `$T` of `*Task(..)` handles.
// `*Task(R)` projects to its result `R` via `field_type(pointee(field_type(T, i)), 0)`:
// `field_type(T, i)` = `*Task(R)`, `pointee` strips the pointer to `Task(R)`, and
// field 0 of `Task` is `value: R`. One nominal type per distinct `T` (type-fn
// identity), so the decl, every `make_variant(RaceResult(T), …)` call, and the
// `-> RaceResult(T)` return all name the SAME union. (The 0649 composition shape,
// with `Task` in place of the stand-in `Box`.)
// Synthesize the race RESULT type for a named tuple `$T` of `*Future(..)` handles.
// `*Future(R)` projects to its result `R` via `field_type(pointee(field_type(T, i)), 0)`:
// `field_type(T, i)` = `*Future(R)`, `pointee` strips the pointer to `Future(R)`,
// and field 0 of `Future` is `value: R` (the `Value :: R` type member is not a
// data field). One nominal type per distinct `T` (type-fn identity), so the decl,
// every `make_variant(RaceResult(T), …)` call, and the `-> RaceResult(T)` return
// all name the SAME union.
RaceResult :: ($T: Type) -> Type {
vs : [field_count(T)]EnumVariant = ---;
inline for 0..field_count(T) (i) {
vs[i] = EnumVariant.{
name = field_name(T, i), // tuple label → variant name
payload = field_type(pointee(field_type(T, i)), 0), // *Task(R) -> Task(R) -> R
payload = field_type(pointee(field_type(T, i)), 0), // *Future(R) -> Future(R) -> R
};
}
return make_enum("RaceResult", vs[0..field_count(T)]);
}
// Structured first-wins race. Suspends the calling fiber until the FIRST task is
// `.ready`, builds a `RaceResult(T)` carrying that winner's value, then CANCELS
// and JOINS every loser before returning.
// Structured first-wins race over the `Io` protocol. Suspends the calling fiber
// until the FIRST future is `.ready`, builds a `RaceResult(T)` carrying that
// winner's value, then CANCELS every loser and returns immediately.
//
// MUST be called from inside a fiber (there must be a `current` to park), like
// `wait`/`sleep`; a null `current` bails loudly rather than dereferencing null.
// MUST be called from inside a fiber under a suspending `Io` (there must be a
// `current` to park) — `current_park` bails loudly on a null current. Under
// `CBlockingIo` the futures are already `.ready`, so the winner scan returns
// without ever calling `current_park`/`suspend_raw`.
//
// COOPERATIVE-CANCEL SEMANTIC (M:1, no preemption): a loser already past its
// work's first line cannot be preempted — `cancel` sets its flag and the JOIN
// waits for the worker to reach its natural end (the value is discarded). A loser
// that had not yet started skips its work entirely (`go`'s `if t.canceled == 0`
// guard). Either way `race` returns only once every loser's worker has `finished`,
// so no loser fiber is still live past the call (structured concurrency).
race :: ufcs (self: *Scheduler, tasks: $T) -> RaceResult(T) {
cur := self.current;
if cur == null {
print("sched: race() called outside a fiber (no running fiber)\n");
abort();
}
// Phase 1 — first winner. Scan for an already-`.ready` task (lowest index
// wins on a same-tick tie → deterministic). If none, register the caller as
// the waiter on every still-`.pending` task and park. On wake DEREGISTER from
// ALL of them: a later loser completion must never wake the caller again — by
// the time it fires the caller may be running or parked on a different join,
// and a stale waiter-wake would be a spurious/lost wakeup (the queue-corruption
// hazard `wake` guards). A spurious wake with nothing ready re-registers and
// re-parks.
// TRUE-CANCEL SEMANTIC (Phase 3): each loser was parked mid-suspend when cancelled;
// `cancel(f)` flips its sticky flag and wakes its worker fiber, whose next
// `suspend_raw` raises `Canceled` and unwinds the body — its post-suspend work
// never runs. `race` does NOT join the losers (they unwind on their own next turn),
// so it returns at winner-time. The winner's value is taken from `f.value`.
race :: ufcs (io: Io, futures: $T) -> RaceResult(T) {
// Phase 1 — first winner. Scan for an already-`.ready` future (lowest index
// wins on a same-tick tie → deterministic). If none, register THIS coordinator
// (`current_park`) as the waiter on every still-`.pending` future's `park`
// slot and park once via `suspend_raw`; any completion `ready`s us. On wake
// DEREGISTER from ALL of them (clear our handle): a later loser completion must
// never `ready` a coordinator that has since moved on (the spurious/lost-wakeup
// hazard `wake` guards). A spurious wake with nothing ready re-registers + re-parks.
winner_idx : i64 = -1;
while winner_idx < 0 {
inline for 0..field_count(T) (i) {
if winner_idx < 0 and tasks[i].state == .ready { winner_idx = i; }
if winner_idx < 0 and futures[i].state == .ready { winner_idx = i; }
}
if winner_idx >= 0 { break; }
me := io.current_park();
any_pending := false;
inline for 0..field_count(T) (i) {
if tasks[i].state == .pending { tasks[i].waiter = xx cur; }
if futures[i].state == .pending { futures[i].park.handle = me.handle; any_pending = true; }
}
self.suspend_self();
// No `.ready` winner and nothing still `.pending` → every racer settled
// `.failed`/`.canceled` with no success. `race` is first-SUCCESS-wins and
// its `RaceResult` carries only success values, so there is no winner to
// return. Parking here would deadlock (no future can ever `ready` us);
// bail loudly with a specific message instead. (A recoverable all-fail —
// a failable `race` that raises — is a deliberate future refinement.)
if !any_pending {
print("sched: race — all futures settled without a winner (all failed/canceled)\n");
abort();
}
// The coordinator is the user's fiber (no cancel flag), so `suspend_raw`
// never raises here; the `catch {}` just discards the `!` for the type.
pk : ParkToken = .{ handle = null };
io.suspend_raw(@pk) catch {};
inline for 0..field_count(T) (i) {
if tasks[i].waiter == (xx cur) { tasks[i].waiter = null; }
if futures[i].park.handle == me.handle { futures[i].park.handle = null; }
}
}
// Phase 2 — build the winner variant. `tasks[i].value` carries the CONCRETE
// Phase 2 — build the winner variant. `futures[i].value` carries the CONCRETE
// result type of variant `i` (comptime-cursor tuple indexing), so the matching
// unrolled arm constructs `RaceResult(T)`'s i-th variant directly — no nested
// `inline if` to recover the payload type. `i` is the comptime cursor; the
// runtime `if i == winner_idx` selects the one arm that fires.
// unrolled arm constructs `RaceResult(T)`'s i-th variant directly. `i` is the
// comptime cursor; the runtime `if i == winner_idx` selects the one arm.
result : RaceResult(T) = ---;
inline for 0..field_count(T) (i) {
if i == winner_idx {
result = make_variant(RaceResult(T), i, tasks[i].value);
result = make_variant(RaceResult(T), i, futures[i].value);
}
}
// Phase 3 — cancel + JOIN every loser, one at a time. `cancel` sets the flag;
// the join then ensures the loser's worker fiber has `finished` (not merely
// been flagged): if it has not, register as ITS sole waiter and park until the
// worker's tail wakes us (the worker sets `finished = 1` and wakes its waiter
// whether it ran the work or skipped it on an early cancel). Checking
// `finished` BEFORE parking avoids a lost wakeup (mirrors `wait` checking
// `.ready`). Only the loser being joined has a registered waiter, so no other
// task's completion can wake us mid-join.
// Phase 3 — cancel every still-IN-FLIGHT loser. With true cancellation a
// parked loser's next `suspend_raw` raises `Canceled` and unwinds its body;
// its `park` was cleared above, so its completion `ready`s nobody. No join —
// `race` returns now. Only `.pending` losers are cancelled: a loser that
// already settled (`.ready`/`.failed`) is done — cancelling it would do
// nothing useful and would stomp its real outcome label to `.canceled`.
inline for 0..field_count(T) (i) {
if i != winner_idx {
tasks[i].cancel();
if tasks[i].finished == 0 {
tasks[i].waiter = xx cur;
self.suspend_self();
if tasks[i].waiter == (xx cur) { tasks[i].waiter = null; }
}
}
if i != winner_idx and futures[i].state == .pending { futures[i].cancel(); }
}
return result;