fibers B1.2: Io capability + context.io + blocking impl + Future/async/await/cancel
Threads an `Io` capability onto `Context` exactly like `Allocator`: a
`protocol #inline` whose process-wide default is a stateless `CBlockingIo`
(the mirror of `CAllocator`), installed in `__sx_default_context`.
Library (library/modules/std):
- core.sx: `Io` protocol (spawn_raw / suspend_raw / ready / poll / now_ms /
arm_timer) + `SpawnOpts` / `PinTarget` / `ParkToken`; `Context` gains an
`io: Io` field LAST (allocator stays index 0, data stays index 1).
- io.sx (new): `CBlockingIo` + `impl Io` (blocking M:1 semantics — now_ms is
a real monotonic clock, the rest are no-ops/0; suspend never called);
`Future($R)` { value; state: FutureState; err: IoErr; park; task; canceled:
Atomic(bool) } with `Value :: R`; the async ergonomic layer
`async` / `async_void` / `await` (value-carrying `(R, !IoErr)`) / `cancel`.
Built with the verified `= ---` + field-assign + `Closure(..$args) -> $R` +
`..$args` idiom (NON-void $R only — Future(void) is deferred per issue 0150).
- std.sx: re-export the Io surface + the io.sx tail.
Compiler (src/ir):
- protocol.zig `emitDefaultContextGlobal` + comptime_vm.zig
`materializeDefaultContext`: both materializers of `__sx_default_context`
now build the inline CBlockingIo->Io vtable (7 words) at the new field.
- stmt.zig `lowerPush`: `push Context.{...}` now INHERITS omitted fields from
the ambient context (seed the slot from current_ctx_ref, overwrite only the
literal's named fields) — correct capability-bag semantics, so the partial
`push Context.{ allocator = X }` sites don't zero a null `io` vtable.
- protocols.zig + lower.zig + error_analysis.zig: record protocol-impl method
names so the "declared `!` but never errors" lint skips a conforming impl
whose `!` is dictated by the protocol contract (e.g. Io.suspend_raw).
37 `.ir` snapshots regenerated: layout-only (the Context type now carries the
Io field, shifting type-table numbering); no stdout/stderr/exit changes.
The blocking Io + now_ms + Future/async work when `async` is called with the
receiver passed explicitly; the user-facing UFCS form `context.io.async(...)`
is blocked on a separate UFCS generic-inference bug (filed next).
Suite: 726 ran, 0 failed.
This commit is contained in:
@@ -17,6 +17,10 @@ list :: #import "modules/std/list.sx";
|
||||
|
||||
Context :: core.Context;
|
||||
Allocator :: core.Allocator;
|
||||
Io :: core.Io;
|
||||
SpawnOpts :: core.SpawnOpts;
|
||||
PinTarget :: core.PinTarget;
|
||||
ParkToken :: core.ParkToken;
|
||||
Into :: core.Into;
|
||||
Source_Location :: core.Source_Location;
|
||||
|
||||
@@ -85,6 +89,20 @@ decompose_u16x4 :: fmt.decompose_u16x4;
|
||||
|
||||
List :: list.List;
|
||||
|
||||
// --- Async / Io capability (impls in std/io.sx) ---
|
||||
|
||||
io_mod :: #import "modules/std/io.sx";
|
||||
CBlockingIo :: io_mod.CBlockingIo;
|
||||
Future :: io_mod.Future;
|
||||
FutureState :: io_mod.FutureState;
|
||||
IoErr :: io_mod.IoErr;
|
||||
async :: io_mod.async;
|
||||
async_void :: io_mod.async_void;
|
||||
await :: io_mod.await;
|
||||
cancel :: io_mod.cancel;
|
||||
// `timeout` / `Future(void)` are DEFERRED (B1.4) pending issue 0150
|
||||
// (a `void` struct field SIGTRAPs the compiler). Re-export once it lands.
|
||||
|
||||
// --- The stdlib namespace tail: flat-importing std.sx carries these ---
|
||||
|
||||
mem :: #import "modules/std/mem.sx";
|
||||
|
||||
@@ -64,11 +64,61 @@ Allocator :: protocol #inline {
|
||||
dealloc_bytes :: (ptr: *void);
|
||||
}
|
||||
|
||||
// --- Io capability protocol (impls live in std/io.sx) ---
|
||||
//
|
||||
// `Io` is threaded on `Context` exactly like `Allocator`: a `#inline`
|
||||
// protocol whose default impl (the stateless `CBlockingIo` in std/io.sx)
|
||||
// is installed in the process-wide `__sx_default_context`. Async runtime
|
||||
// is sx LIBRARY code — the compiler provides only the primitives (inline
|
||||
// asm, `abi(.naked)`, atomics) + fiber-safe codegen. The protocol is the
|
||||
// minimum the fiber scheduler [B1.3+] needs; everything ergonomic
|
||||
// (`async` / `await` / `cancel` / `timeout`) is a generic free-fn on top
|
||||
// (std/io.sx), the same way `alloc(T,n)` sits over `alloc_bytes`.
|
||||
//
|
||||
// spawn_raw — submit a task; opaque handle (B1.3 fiber bootstrap).
|
||||
// suspend_raw— suspend current fiber; `!` so cancel can raise out.
|
||||
// ready — wake a parked fiber (B1.4/B1.5).
|
||||
// poll — drive one step; blocking impl returns 0.
|
||||
// now_ms — clock hook (a PROTOCOL method so the deterministic-sim
|
||||
// Io [B1.4] can return a fake clock — the B1.4 keystone).
|
||||
// arm_timer — register a timer; backs `timeout` (B1.4).
|
||||
//
|
||||
// `ParkToken` is an opaque per-suspension token (unused by the blocking
|
||||
// impl). `SpawnOpts.pin` is inert in the M:1 model. (`PinTarget.on` —
|
||||
// pin to a specific `Thread` — is deferred with the M:N model; the
|
||||
// `on_thread` variant is a placeholder until a `Thread` type exists.)
|
||||
PinTarget :: enum { any; main; on_thread; }
|
||||
|
||||
SpawnOpts :: struct {
|
||||
pin: PinTarget = .any;
|
||||
}
|
||||
|
||||
ParkToken :: struct {
|
||||
handle: *void = null;
|
||||
}
|
||||
|
||||
Io :: protocol #inline {
|
||||
spawn_raw :: (entry: *void, arg: *void, opts: SpawnOpts) -> *void;
|
||||
suspend_raw :: (park: ParkToken) -> !;
|
||||
ready :: (park: ParkToken);
|
||||
poll :: (deadline_ms: i64) -> i64;
|
||||
now_ms :: () -> i64;
|
||||
arm_timer :: (deadline_ms: i64, park: ParkToken) -> *void;
|
||||
}
|
||||
|
||||
// --- Context ---
|
||||
//
|
||||
// `allocator` MUST stay at field index 0 (the heap-alloc lowering path
|
||||
// hardcodes it — src/ir/lower/call.zig). `io` is appended LAST so `data`
|
||||
// keeps its existing index 1 (minimizes the comptime-VM fallback churn).
|
||||
// Both Zig materializers of `__sx_default_context` (protocol.zig
|
||||
// `emitDefaultContextGlobal` + comptime_vm.zig `materializeDefaultContext`)
|
||||
// install the inline `CBlockingIo → Io` vtable at the new field.
|
||||
|
||||
Context :: struct {
|
||||
allocator: Allocator;
|
||||
data: *void;
|
||||
io: Io;
|
||||
}
|
||||
|
||||
// User-space `xx` extension. `xx val : T` where the built-in conversion
|
||||
|
||||
135
library/modules/std/io.sx
Normal file
135
library/modules/std/io.sx
Normal file
@@ -0,0 +1,135 @@
|
||||
// std.io — the `Io` capability's default impl + the async ergonomic layer.
|
||||
//
|
||||
// `Io` itself (the protocol) lives in std/core.sx next to `Allocator`, so
|
||||
// the compiler-coupled `Context` field + the `__sx_default_context`
|
||||
// materializers can reference it. This file carries the parts that are
|
||||
// pure library sx: the stateless blocking impl (`CBlockingIo`, the mirror
|
||||
// of `CAllocator`) + the generic free-fns layered over the protocol
|
||||
// (`async` / `await` / `cancel` + the `Future($R)` type).
|
||||
//
|
||||
// Consumers reach these through std.sx (`Future` / `async` / `await` /
|
||||
// `cancel` / `CBlockingIo` re-exports), never by importing this file
|
||||
// directly.
|
||||
//
|
||||
// BLOCKING SEMANTICS (B1.2): the M:1 default has no scheduler and no
|
||||
// suspension. `async(worker, ..args)` runs the worker to COMPLETION
|
||||
// inline, so the returned `Future` is born `.ready` and `await` yields
|
||||
// immediately. `spawn_raw`/`suspend_raw`/`ready`/`poll`/`arm_timer` are
|
||||
// trivial no-ops/0 — they exist for the fiber scheduler [B1.3+].
|
||||
// `now_ms` returns a real monotonic clock. Fully deterministic/testable.
|
||||
//
|
||||
// Worker form (B1.2): a `Closure(..$args) -> $R` whose params are
|
||||
// annotated at the call site (a lambda `(a: i64) -> i64 => ...`).
|
||||
// Named-fn workers need a `::` callable-parameter language feature that
|
||||
// does not exist yet and are DEFERRED.
|
||||
#import "modules/std/core.sx";
|
||||
#import "modules/std/atomic.sx";
|
||||
time :: #import "modules/std/time.sx";
|
||||
|
||||
// --- IoErr: the error channel async rides (cancellation = model (a)) ---
|
||||
//
|
||||
// A canceled future raises `.Canceled` out of `await`; a failed task
|
||||
// raises `.Failed`. The `(T, !IoErr)` value-failable shape is the same
|
||||
// one the rest of the stdlib uses (see examples/1011-, 1012-).
|
||||
IoErr :: error { Canceled, Failed }
|
||||
|
||||
// --- CBlockingIo: stateless Io that runs tasks synchronously ---
|
||||
//
|
||||
// Zero-sized struct (mirror of CAllocator). Used as the default
|
||||
// `context.io` at program start (see `__sx_default_context` in codegen).
|
||||
// The thunks never dereference `self`, so the protocol value's ctx field
|
||||
// is `null` — which is what keeps the static-constant default context an
|
||||
// inline vtable with a null receiver.
|
||||
|
||||
CBlockingIo :: struct {}
|
||||
|
||||
impl Io for CBlockingIo {
|
||||
// No fiber bootstrap in the blocking model: the generic `async`
|
||||
// free-fn calls the worker directly and fills the Future. `spawn_raw`
|
||||
// is here for the protocol shape the scheduler [B1.3] will use; the
|
||||
// blocking impl never routes through it, so it is a no-op handle.
|
||||
spawn_raw :: (self: *CBlockingIo, entry: *void, arg: *void, opts: SpawnOpts) -> *void {
|
||||
return null;
|
||||
}
|
||||
// Blocking never suspends — a suspend at the bottom of the M:1 stack
|
||||
// would deadlock. No-op (returns success). The `!` is part of the
|
||||
// protocol contract (a suspending impl raises `.Canceled` out here),
|
||||
// so the conforming blocking impl keeps it even though it never raises.
|
||||
suspend_raw :: (self: *CBlockingIo, park: ParkToken) -> ! {
|
||||
return;
|
||||
}
|
||||
ready :: (self: *CBlockingIo, park: ParkToken) {}
|
||||
poll :: (self: *CBlockingIo, deadline_ms: i64) -> i64 { return 0; }
|
||||
now_ms :: (self: *CBlockingIo) -> i64 { return time.mono_ms(); }
|
||||
arm_timer :: (self: *CBlockingIo, deadline_ms: i64, park: ParkToken) -> *void {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// --- Future($R): the handle to an async task's eventual result ---
|
||||
//
|
||||
// Fixed-shape product (NOT the metatype sum machinery). `Value :: $R`
|
||||
// exposes the projection `Future(X) → X`. B1.2 supports NON-void `$R`
|
||||
// only — `Future(void)` (a `void` struct field) SIGTRAPs the compiler
|
||||
// (issue 0150, deferred to B1.4 along with `timeout`).
|
||||
FutureState :: enum { pending; ready; failed; canceled; }
|
||||
|
||||
Future :: struct ($R: Type) {
|
||||
Value :: R;
|
||||
|
||||
value: R;
|
||||
state: FutureState = .pending;
|
||||
err: IoErr;
|
||||
park: ParkToken;
|
||||
task: *void = null;
|
||||
// Cancellation flag — atomic so a future scheduler thread can flip it.
|
||||
// In the blocking model there is no concurrency, but the type is the
|
||||
// one the M:N model [later] needs.
|
||||
canceled: Atomic(bool);
|
||||
}
|
||||
|
||||
// --- The async ergonomic layer (generic free-fns over the protocol) ---
|
||||
|
||||
// `async(io, worker, ..args)` — submit `worker(..args)`. Blocking: runs
|
||||
// the worker to completion inline, Future born `.ready`. The worker is a
|
||||
// `Closure(..$args) -> $R` (a lambda whose params are annotated at the
|
||||
// call site); `..$args` forwards the call-site arguments to it.
|
||||
//
|
||||
// NOTE on construction shape: the Future is built with `= ---` + per-field
|
||||
// assignment, NOT a `return Future.{...}` struct-literal. A struct-literal
|
||||
// in `return` position trips a generic-instantiation gap for the `Atomic`
|
||||
// field; the `= ---` (uninit) + field-assign form is the verified idiom.
|
||||
async :: ufcs (io: Io, worker: Closure(..$args) -> $R, ..$args) -> Future($R) {
|
||||
f : Future($R) = ---;
|
||||
f.value = worker(..args);
|
||||
f.state = .ready;
|
||||
f.canceled = Atomic(bool).init(false);
|
||||
return f;
|
||||
}
|
||||
|
||||
// Nullary form — no args. A worker that takes no arguments.
|
||||
async_void :: ufcs (io: Io, worker: Closure() -> $R) -> Future($R) {
|
||||
f : Future($R) = ---;
|
||||
f.value = worker();
|
||||
f.state = .ready;
|
||||
f.canceled = Atomic(bool).init(false);
|
||||
return f;
|
||||
}
|
||||
|
||||
// `await(f)` — value-carrying failable. `.ready` → the result; `.failed`
|
||||
// / `.canceled` → raise the stored / cancellation error.
|
||||
await :: ufcs (f: *Future($R)) -> ($R, !IoErr) {
|
||||
if f.canceled.load(.acquire) { raise error.Canceled; }
|
||||
if f.state == .canceled { raise error.Canceled; }
|
||||
if f.state == .failed { raise error.Failed; }
|
||||
return f.value;
|
||||
}
|
||||
|
||||
// `cancel(f)` — request cancellation. Sets the per-future cancel flag +
|
||||
// marks the state so a subsequent `await` raises `.Canceled`. (In the
|
||||
// blocking model the task already ran; cancel still rides the `!`
|
||||
// channel — model (a).)
|
||||
cancel :: ufcs (f: *Future($R)) {
|
||||
f.canceled.store(true, .release);
|
||||
f.state = .canceled;
|
||||
}
|
||||
Reference in New Issue
Block a user