diff --git a/examples/concurrency/1821-concurrency-fiber-race.sx b/examples/concurrency/1821-concurrency-fiber-race.sx new file mode 100644 index 00000000..834e5825 --- /dev/null +++ b/examples/concurrency/1821-concurrency-fiber-race.sx @@ -0,0 +1,62 @@ +// Stream B2/A1 — structured first-wins `race` over the M:1 fiber scheduler. +// +// `s.race((a: ta, b: tb, c: tc))` takes a named tuple of already-spawned +// `*Task(..)` handles, SUSPENDS the calling fiber until the FIRST task is ready, +// and returns a comptime-SYNTHESIZED tagged-union (`RaceResult`) mirroring the +// tuple's labels — variant NAME = the tuple label, payload = that task's result +// type. Here the three tasks return DIFFERENT types (i64 / bool / f64), so the +// minted union is `enum { a: i64; b: bool; c: f64 }` and the winner is matched by +// label. After picking the winner, `race` CANCELS and JOINS every loser, so no +// loser fiber outlives the call (structured concurrency). +// +// Deterministic by virtual time (like 1817 — no real clock): the tasks sleep +// 10/20/30 ms, so `a` (shortest) wins at t=10. Cancellation is COOPERATIVE (M:1, +// no preemption): the losers were already parked mid-`sleep` when cancelled, so +// they cannot be preempted — `race` joins them, letting each run to its natural +// end (its value discarded) before returning. The completion log therefore shows +// all three finishing (a@10 winner, b@20, c@30 joined) and the final virtual +// clock is 30. Each loser's `canceled` flag is set and its worker `finished`. +// +// aarch64-pinned (the scheduler's per-arch asm + per-OS mmap/event constants): +// runs end-to-end on a matching host (macOS + linux), ir-only on a mismatch. +#import "modules/std.sx"; +sched :: #import "modules/std/sched.sx"; + +Log :: struct { id: [8]i64; at: [8]i64; n: i64; } +rec :: (l: *Log, id: i64, at: i64) { l.id[l.n] = id; l.at[l.n] = at; l.n = l.n + 1; } + +main :: () -> i64 { + lg : Log = ---; lg.n = 0; + s := sched.Scheduler.init(); + ps := @s; pl := @lg; + + // The coordinator runs as a fiber so `race` has a `current` to park. + s.spawn(() => { + // Three async tasks with DIFFERENT result types and sleep durations. + a := ps.go(() -> i64 => { ps.sleep(10); rec(pl, 1, ps.now_ms()); 111 }); + b := ps.go(() -> bool => { ps.sleep(20); rec(pl, 2, ps.now_ms()); true }); + c := ps.go(() -> f64 => { ps.sleep(30); rec(pl, 3, ps.now_ms()); 2.5 }); + + // Race them. `a` (sleep 10) wins; `b` and `c` are cancelled + joined. + winner := ps.race(.(a = a, b = b, c = c)); + if winner == { + case .a: (v) { print("winner: a (i64) = {}\n", v); } + case .b: (v) { print("winner: b (bool) = {}\n", v); } + case .c: (v) { print("winner: c (f64) = {}\n", v); } + } + + // The losers were cancelled (flag set) and joined (worker finished). + print("loser b: canceled={} finished={}\n", b.canceled, b.finished); + print("loser c: canceled={} finished={}\n", c.canceled, c.finished); + }); + s.run(); + + print("completion order (id @ virtual-ms):\n"); + i := 0; + while i < lg.n { + print(" task {} @ {}ms\n", lg.id[i], lg.at[i]); + i = i + 1; + } + print("final virtual clock: {}ms\n", s.now_ms()); + return 0; +} diff --git a/library/modules/std/sched.sx b/library/modules/std/sched.sx index 4426b139..886a5c02 100644 --- a/library/modules/std/sched.sx +++ b/library/modules/std/sched.sx @@ -26,6 +26,12 @@ // epoll on linux). Runs end-to-end on a matching aarch64 host, ir-only on an // arch mismatch. #import "modules/std.sx"; +// `race` synthesizes its result type (a tagged-union mirroring the input tuple's +// labels) and constructs the winner variant by runtime index — both need the +// metatype WRITE side (`make_enum`/`make_variant`/`EnumVariant`/`field_type`). +// meta.sx imports only std.sx (no naked asm, no cycle back into sched), so it is +// safe to pull in here. +#import "modules/std/meta.sx"; kqb :: #import "modules/std/net/kqueue.sx"; // The fd-readiness backend is per-OS: kqueue (kqb, above) on darwin, epoll on // linux. The epoll import is scoped to the linux branch so darwin never pulls @@ -867,6 +873,12 @@ Task :: struct ($R: Type) { waiter: *void = null; // the single parked awaiter (opaque *Fiber); M:1 → at most one sched: *Scheduler; // owning scheduler (for park/wake in `wait`) canceled: i64; // cooperative cancel flag (M:1: no preemption → no atomics) + finished: i64; // set to 1 at the very END of the worker body (after the + // work ran OR was skipped on an early cancel). Distinct from + // `state == .canceled` (which `cancel` sets IMMEDIATELY, before + // the fiber has run): a JOINER (`race`) waits on `finished` so it + // knows the worker fiber actually reached its end — no loser + // outlives the `race` call. } // Spawn `work` as a fiber; return a heap `*Task` that completes when the fiber @@ -882,6 +894,7 @@ go :: ufcs (self: *Scheduler, work: Closure() -> $R) -> *Task($R) { t.waiter = null; t.sched = self; t.canceled = 0; + t.finished = 0; // Record the heap Task so `deinit` can free it (the scheduler otherwise has // no handle on its generic Tasks). Long-lived: a Task outlives this call. self.task_allocs.append(xx t, self.own_allocator); @@ -894,7 +907,14 @@ go :: ufcs (self: *Scheduler, work: Closure() -> $R) -> *Task($R) { t.value = work(); t.state = .ready; } - // Wake the awaiter only if one already parked (else `wait` will not park). + // The worker has reached its end (ran the work, or skipped it on an early + // cancel). Mark `finished` BEFORE the wake so a joiner that checks the flag + // on resume always observes it set (a `race` JOIN distinguishes a finished + // worker from a merely-flagged-cancelled one via this). + t.finished = 1; + // Wake the awaiter only if one already parked (else `wait`/`race` will not + // park). Fires whether or not the work ran — both `wait` and a `race` join + // resume here. if t.waiter != null { self.wake(xx t.waiter); } }); return t; @@ -930,3 +950,114 @@ cancel :: ufcs (t: *Task($R)) { t.canceled = 1; t.state = .canceled; } + +// --- B2/A1: structured first-wins `race` over the M:1 Task layer ------------- +// +// `race((a: ta, b: tb, …))` starts from N already-spawned `*Task(..)` handles, +// returns when the FIRST completes, and STRUCTURALLY cancels + joins the losers +// before returning — no loser fiber outlives the call. The result is a +// comptime-synthesized tagged-union (`RaceResult`) mirroring the input tuple's +// labels: each variant's NAME is the tuple label, its payload is that task's +// result type. The tuple must be NAMED (`(a: ta, b: tb)`); a positional-tuple +// form (`._0`/`._1` variants) is future work — `field_name` yields "" for an +// unnamed element, which `make_enum` rejects as a duplicate-name collision. +// +// fa := s.go(() -> A => read_a(conn)); // *Task(A) +// fb := s.go(() -> B => read_b(conn)); // *Task(B) +// winner := s.race((a: fa, b: fb)); // RaceResult = enum { a: A; b: B } +// if winner == { case .a: (v) {…} case .b: (v) {…} } // loser cancelled+joined + +// Synthesize the race RESULT type for a named tuple `$T` of `*Task(..)` handles. +// `*Task(R)` projects to its result `R` via `field_type(pointee(field_type(T, i)), 0)`: +// `field_type(T, i)` = `*Task(R)`, `pointee` strips the pointer to `Task(R)`, and +// field 0 of `Task` is `value: R`. One nominal type per distinct `T` (type-fn +// identity), so the decl, every `make_variant(RaceResult(T), …)` call, and the +// `-> RaceResult(T)` return all name the SAME union. (The 0649 composition shape, +// with `Task` in place of the stand-in `Box`.) +RaceResult :: ($T: Type) -> Type { + vs : [field_count(T)]EnumVariant = ---; + inline for 0..field_count(T) (i) { + vs[i] = EnumVariant.{ + name = field_name(T, i), // tuple label → variant name + payload = field_type(pointee(field_type(T, i)), 0), // *Task(R) -> Task(R) -> R + }; + } + return make_enum("RaceResult", vs[0..field_count(T)]); +} + +// Structured first-wins race. Suspends the calling fiber until the FIRST task is +// `.ready`, builds a `RaceResult(T)` carrying that winner's value, then CANCELS +// and JOINS every loser before returning. +// +// MUST be called from inside a fiber (there must be a `current` to park), like +// `wait`/`sleep`; a null `current` bails loudly rather than dereferencing null. +// +// COOPERATIVE-CANCEL SEMANTIC (M:1, no preemption): a loser already past its +// work's first line cannot be preempted — `cancel` sets its flag and the JOIN +// waits for the worker to reach its natural end (the value is discarded). A loser +// that had not yet started skips its work entirely (`go`'s `if t.canceled == 0` +// guard). Either way `race` returns only once every loser's worker has `finished`, +// so no loser fiber is still live past the call (structured concurrency). +race :: ufcs (self: *Scheduler, tasks: $T) -> RaceResult(T) { + cur := self.current; + if cur == null { + print("sched: race() called outside a fiber (no running fiber)\n"); + abort(); + } + + // Phase 1 — first winner. Scan for an already-`.ready` task (lowest index + // wins on a same-tick tie → deterministic). If none, register the caller as + // the waiter on every still-`.pending` task and park. On wake DEREGISTER from + // ALL of them: a later loser completion must never wake the caller again — by + // the time it fires the caller may be running or parked on a different join, + // and a stale waiter-wake would be a spurious/lost wakeup (the queue-corruption + // hazard `wake` guards). A spurious wake with nothing ready re-registers and + // re-parks. + winner_idx : i64 = -1; + while winner_idx < 0 { + inline for 0..field_count(T) (i) { + if winner_idx < 0 and tasks[i].state == .ready { winner_idx = i; } + } + if winner_idx >= 0 { break; } + inline for 0..field_count(T) (i) { + if tasks[i].state == .pending { tasks[i].waiter = xx cur; } + } + self.suspend_self(); + inline for 0..field_count(T) (i) { + if tasks[i].waiter == (xx cur) { tasks[i].waiter = null; } + } + } + + // Phase 2 — build the winner variant. `tasks[i].value` carries the CONCRETE + // result type of variant `i` (comptime-cursor tuple indexing), so the matching + // unrolled arm constructs `RaceResult(T)`'s i-th variant directly — no nested + // `inline if` to recover the payload type. `i` is the comptime cursor; the + // runtime `if i == winner_idx` selects the one arm that fires. + result : RaceResult(T) = ---; + inline for 0..field_count(T) (i) { + if i == winner_idx { + result = make_variant(RaceResult(T), i, tasks[i].value); + } + } + + // Phase 3 — cancel + JOIN every loser, one at a time. `cancel` sets the flag; + // the join then ensures the loser's worker fiber has `finished` (not merely + // been flagged): if it has not, register as ITS sole waiter and park until the + // worker's tail wakes us (the worker sets `finished = 1` and wakes its waiter + // whether it ran the work or skipped it on an early cancel). Checking + // `finished` BEFORE parking avoids a lost wakeup (mirrors `wait` checking + // `.ready`). Only the loser being joined has a registered waiter, so no other + // task's completion can wake us mid-join. + inline for 0..field_count(T) (i) { + if i != winner_idx { + tasks[i].cancel(); + if tasks[i].finished == 0 { + tasks[i].waiter = xx cur; + self.suspend_self(); + if tasks[i].waiter == (xx cur) { tasks[i].waiter = null; } + } + } + } + + return result; +}