// std.io — the `Io` capability's default impl + the async ergonomic layer. // // `Io` itself (the protocol) lives in std/core.sx next to `Allocator`, so // the compiler-coupled `Context` field + the `__sx_default_context` // materializers can reference it. This file carries the parts that are // pure library sx: the stateless blocking impl (`CBlockingIo`, the mirror // of `CAllocator`) + the generic free-fns layered over the protocol // (`async` / `await` / `cancel` + the `Future($R)` type). // // Consumers reach these through std.sx (`Future` / `async` / `await` / // `cancel` / `CBlockingIo` re-exports), never by importing this file // directly. // // BLOCKING SEMANTICS (B1.2): the M:1 default has no scheduler and no // suspension. `async(worker, ..args)` runs the worker to COMPLETION // inline, so the returned `Future` is born `.ready` and `await` yields // immediately. `spawn_raw`/`suspend_raw`/`ready`/`poll`/`arm_timer` are // trivial no-ops/0 — they exist for the fiber scheduler [B1.3+]. // `now_ms` returns a real monotonic clock. Fully deterministic/testable. // // Worker form (B1.2): a `Closure(..$args) -> $R` whose params are // annotated at the call site (a lambda `(a: i64) -> i64 => ...`). // Named-fn workers need a `::` callable-parameter language feature that // does not exist yet and are DEFERRED. #import "modules/std/core.sx"; #import "modules/std/atomic.sx"; time :: #import "modules/std/time.sx"; // Loud-bail for the one-awaiter-per-future invariant (mirrors sched.sx). io_abort :: () -> noreturn extern libc "abort"; // --- IoErr: the error channel async rides (cancellation = model (a)) --- // // A canceled future raises `.Canceled` out of `await`; a failed task // raises `.Failed`. The `(T, !IoErr)` value-failable shape is the same // one the rest of the stdlib uses (see examples/1011-, 1012-). IoErr :: error { Canceled, Failed } // --- CBlockingIo: stateless Io that runs tasks synchronously --- // // Zero-sized struct (mirror of CAllocator). Used as the default // `context.io` at program start (see `__sx_default_context` in codegen). // The thunks never dereference `self`, so the protocol value's ctx field // is `null` — which is what keeps the static-constant default context an // inline vtable with a null receiver. CBlockingIo :: struct {} impl Io for CBlockingIo { // No fiber bootstrap in the blocking model: the generic `async` // free-fn calls the worker directly and fills the Future. `spawn_raw` // is here for the protocol shape the scheduler [B1.3] will use; the // blocking impl never routes through it, so it is a no-op handle. spawn_raw :: (self: *CBlockingIo, entry: *void, arg: *void, opts: SpawnOpts) -> *void { // The blocking model has no scheduler: run the worker thunk INLINE to // completion right here, so the `async` free-fn's Future is born `.ready`. // (A suspending impl — the fiber scheduler — instead defers `entry` onto a // fiber.) Same `(*void)->void` erased-thunk contract `spawn_raw` mandates. entry_fn : (*void) -> void = xx entry; entry_fn(arg); return null; } // Blocking never suspends — a suspend at the bottom of the M:1 stack // would deadlock. No-op (returns success). The `!` is part of the // protocol contract (a suspending impl raises `.Canceled` out here), // so the conforming blocking impl keeps it even though it never raises. suspend_raw :: (self: *CBlockingIo, park: *ParkToken) -> ! { return; } ready :: (self: *CBlockingIo, park: ParkToken) {} poll :: (self: *CBlockingIo, deadline_ms: i64) -> i64 { return 0; } now_ms :: (self: *CBlockingIo) -> i64 { return time.mono_ms(); } arm_timer :: (self: *CBlockingIo, deadline_ms: i64, park: ParkToken) -> *void { return null; } } // --- Future($R): the handle to an async task's eventual result --- // // Fixed-shape product (NOT the metatype sum machinery). `Value :: $R` // exposes the projection `Future(X) → X`. B1.2 supports NON-void `$R` // only — `Future(void)` (a `void` struct field) SIGTRAPs the compiler // (issue 0150, deferred to B1.4 along with `timeout`). FutureState :: enum { pending; ready; failed; canceled; } Future :: struct ($R: Type) { Value :: R; value: R; state: FutureState = .pending; err: IoErr; park: ParkToken; task: *void = null; // Cancellation flag — atomic so a future scheduler thread can flip it. // In the blocking model there is no concurrency, but the type is the // one the M:N model [later] needs. canceled: Atomic(bool); } // --- The async ergonomic layer (generic free-fns over the protocol) --- // // COLORBLIND over the `Io` impl: `async` always submits the worker through // `io.spawn_raw`, so the SAME code runs the worker inline under `CBlockingIo` // (Future born `.ready`) or as a real fiber under the scheduler (Future born // `.pending`, completed later — `await` suspends until then). The only protocol- // level value `spawn_raw` accepts is a raw `(*void)->void` entry + a `*void` // arg, so the generic worker is bridged via a MONOMORPHIC boxed-closure thunk // (`sx_run_boxed_closure`): all the generic-ness lives in the closure's env, and // the thunk is one fixed `Closure()->void` invoker — no per-instantiation entry. // The one fixed entry `spawn_raw` ever calls: cast the arg back to the heap-boxed // completion closure and run it. Monomorphic (over `Closure()->void`), so a // single top-level symbol serves every `async($R)` instantiation. // The heap box the bridge carries: a struct holding the nullary completion // closure. A struct field is the one position a `Closure() -> void` type parses // in (a bare alias / `size_of(Closure()->void)` trips the parser), and it gives // the bridge a concrete `*ThunkBox` to `size_of`/cast/call through. ThunkBox :: struct { run: Closure() -> void; } sx_run_boxed_closure :: (arg: *void) { b : *ThunkBox = xx arg; b.run(); } // `async(io, worker)` — submit a NULLARY `worker: Closure() -> $R` and get a // `*Future($R)` handle. The worker must be nullary because under the fiber impl // the body crosses a fiber boundary, and a captured variadic pack segfaults there // (issue 0156 Part 2) — so any inputs are captured at the CALL SITE in the lambda // (`context.io.async(() -> i64 => compute(a, b))`), exactly like `sched.go`. // // The Future (and the completion-closure `ThunkBox`) are HEAP-allocated (not // returned by value): under the fiber impl the worker fills the Future AFTER // `async` returns, so the awaiter and the worker must share one stable object. // Like `sched.go`'s Task, they currently leak (bounded by the async count; // invisible under the default GPA). Freeing them needs join-point ownership — // deferred. // // ALLOCATOR-LIFETIME CONTRACT: both are allocated from the `context.allocator` // in force at the `async` CALL, and that allocator MUST outlive the future — // i.e. survive until the worker has run and the result is consumed. This is the // long-lived-container rule (CLAUDE.md): calling `async` inside a transient // `push Context { allocator = arena }` that is torn down before `run()`/`await` // drives the worker frees the Future while it is still live (use-after-free). // The common case (the program-stable default GPA, or a scheduler set up under a // long-lived allocator) is safe. A deeper fix — `async` capturing the scheduler's // own long-lived allocator the way `sched.go` does — needs a protocol affordance // to reach it and is deferred to the convergence phase. async :: ufcs (io: Io, worker: Closure() -> ($R, !)) -> *Future($R) { raw := context.allocator.alloc_bytes(size_of(Future($R))); f : *Future($R) = xx raw; f.state = .pending; f.park = .{ handle = null }; f.canceled = Atomic(bool).init(false); // The completion closure: run the worker, publish the result, wake any parked // awaiter. Heap-boxed so it survives until the worker actually runs (deferred // under the fiber impl). It captures `f` + `worker`; nothing variadic crosses. // // Phase 3 (true cancellation): the worker is FAILABLE (`Closure() -> ($R, !)`). // A suspend that delivers cancellation (`suspend_raw` raising `Canceled` on a // cancelled worker), or any genuine `raise`, unwinds the worker's body right // here — so its post-suspend side effects never run. On success publish the // value and mark `.ready`; on error mark `.canceled` when `cancel` set the // flag, else `.failed`. Either way wake any parked awaiter. Under `CBlockingIo` // `suspend_raw` is a no-op, so the worker never raises Canceled inline — it // runs to completion (a post-hoc `cancel` still makes `await` raise via the // sticky `f.canceled`, the 1806 contract). braw := context.allocator.alloc_bytes(size_of(ThunkBox)); b : *ThunkBox = xx braw; b.run = () => { f.value = worker() catch { if f.canceled.load(.acquire) { f.state = .canceled; } else { f.state = .failed; } context.io.ready(f.park); return; }; f.state = .ready; context.io.ready(f.park); // no-op if no awaiter parked yet }; // Pass the cancel-flag back-ref so the worker fiber's `suspend_raw` can consult // it (Phase 3). `xx @f.canceled` erases the `*Atomic(bool)` to `*void`. f.task = io.spawn_raw(xx sx_run_boxed_closure, xx b, .{ cancel_flag = xx @f.canceled }); return f; } // `await(f)` — value-carrying failable. Suspends the caller until `f` completes // (no-op under the blocking impl, where it is already `.ready`), then `.ready` → // the result; `.failed`/`.canceled` → raise. Under the fiber impl the caller is a // fiber; `suspend_raw` records it into `f.park` so the worker's `ready(f.park)` // resumes it. Re-checks state after the wake (the worker set `.ready` before // waking). A worker that finished BEFORE `await` leaves `.ready`, so no park, no // lost wakeup. await :: ufcs (f: *Future($R)) -> ($R, !IoErr) { if f.canceled.load(.acquire) { raise error.Canceled; } if f.state == .pending { // ONE awaiter per future (M:1): the single `park` slot records one parked // fiber, so a second concurrent `await` on the same pending future would // OVERWRITE the first awaiter's handle and orphan it forever (the worker's // single `ready(f.park)` wakes only the last). Enforce loudly here, exactly // as `sched.Task.wait` does — a non-null handle on a still-pending future // means another fiber is already parked on it. (Fan-in over many futures — // `race` — registers ONE awaiter across SEPARATE futures, so it is fine.) if f.park.handle != null { out("io: await — future already has an awaiter (one awaiter per future in the M:1 model)\n"); io_abort(); } context.io.suspend_raw(@f.park) catch {}; // Phase 3 propagates Canceled } if f.canceled.load(.acquire) { raise error.Canceled; } if f.state == .canceled { raise error.Canceled; } if f.state == .failed { raise error.Failed; } return f.value; } // `cancel(f)` — request cancellation (model (a) — cancel rides the `!` channel). // Sets the sticky per-future cancel flag + marks `.canceled` (so a subsequent // `await` raises `.Canceled`), then WAKES the worker fiber so it delivers the // cancellation at its current/next suspend. // // Phase 3 (TRUE cancellation): `ready(.{ handle = f.task })` re-readies the worker // fiber parked under the fiber impl. On resume its `suspend_raw` sees the flag and // raises `Canceled`, so the worker ABANDONS its body — post-suspend side effects // never run. The sticky `canceled` atomic is the source of truth (`await` keeps // raising regardless of the state field). `wake` is guarded on `.suspended`, so a // `ready` of a not-yet-parked worker is a safe no-op (its first `suspend_raw`'s // pre-park check then delivers the cancel without parking). Under `CBlockingIo` // `f.task` is null and `ready` is a no-op — the worker already ran inline, and the // sticky flag still makes `await` raise (the 1806 contract, unchanged). cancel :: ufcs (f: *Future($R)) { // Wake the worker fiber ONLY while the task is still in flight (`.pending`). // Once it has completed (`.ready`/`.failed`) or was already cancelled, its // fiber may have been REAPED (the run loop `munmap`s + frees a `.done` // fiber), so `f.task` would dangle — `ready` on it is a use-after-free. The // sticky `canceled` flag still makes a subsequent `await` raise in those // cases (the 1806 model-(a) contract), so no wake is needed there. A // not-yet-run worker is `.pending` with a live (queued) fiber; `ready` is a // safe no-op on it (its first `suspend_raw` pre-park check then delivers). was_pending := f.state == .pending; f.canceled.store(true, .release); f.state = .canceled; if was_pending { context.io.ready(.{ handle = f.task }); } } // `sleep(io, ms)` — a FAILABLE suspend for `ms` virtual milliseconds. Arms a // timer at `now_ms() + ms` and parks via `suspend_raw`; the fired timer // re-readies the fiber, and on resume `suspend_raw` raises `Canceled` if the task // was cancelled while sleeping (Phase 3). So `try io.sleep(..)` inside an `async` // worker is a cancellation point: a `cancel` lands the worker's body unwinding // here instead of running past the sleep. No-op under `CBlockingIo` (its // `arm_timer`/`suspend_raw` are stubs — the blocking model has no scheduler to // advance a virtual clock). sleep :: ufcs (io: Io, ms: i64) -> ! { pk : ParkToken = .{ handle = null }; io.arm_timer(io.now_ms() + ms, pk); try io.suspend_raw(@pk); }