feat: reclaim fiber + async heap (close the closure-env / Future leaks)
Closes the documented per-spawn closure-env leak and most of the async leak,
using only the existing closure.env / closure.fn_ptr field accessors — no compiler
change. Also names the fat-pointer ABI in core.sx (ClosureRaw / SliceRaw) so the
underlying {fn_ptr, env} / {ptr, len} layout is discoverable in one place.
- Fiber body env: Scheduler.reap_fiber frees f.body.env via f.dctx.allocator (the
spawn-time allocator snapshotted in dctx) at all three reap sites (run/poll/
deinit). 1820's 'live after deinit' 3 -> 0.
- Async box + closure envs: sx_run_boxed_closure frees the ThunkBox, the
completion-closure env, and the worker's env (new ThunkBox.worker_env) the
instant the worker completes.
- Async Future: two-flag ownership — Future.worker_done (set at the end of the
completion closure) + consumed (set at the end of await); fut_release frees the
heap Future (via the captured Future.alloc) when BOTH are set, so the LAST of
{worker, await} reclaims it. await now CONSUMES the future (single-use; touching
it afterward is a use-after-free — documented). Residual for an AWAITED future
is 0 (lock: examples/concurrency/1827); a never-awaited future (fire-and-forget /
race loser) keeps only its Future struct — the structured-concurrency remainder.
Self-reviewed across orderings (await-after/before-complete, cancel-then-await,
cancel-while-parked, double-free via await+deinit, race residual, blocking impl,
cross-allocator reap) — all deterministic, no UAF/double-free. Suite 855/0;
byte-identical on aarch64-macOS + aarch64-linux; .ir churn is the core.sx +
Future/ThunkBox field additions.
This commit is contained in:
@@ -160,3 +160,35 @@ Context :: struct {
|
||||
Into :: protocol(Target: Type) {
|
||||
convert :: (self: *Self) -> Target;
|
||||
}
|
||||
|
||||
// --- Raw ABI views of the language's fat-pointer types -----------------------
|
||||
//
|
||||
// sx's closures and slices/strings are two-word "fat" values. These structs name
|
||||
// that underlying layout in ONE place so it is discoverable and documented, and so
|
||||
// owning code can reinterpret a fat value (`raw : ClosureRaw = xx c`) to reach a
|
||||
// field the ergonomic accessors do not expose for a use case — e.g. freeing a
|
||||
// stored closure's heap `env`. Field order/types mirror the compiler ABI
|
||||
// (`types.zig`: closure / slice size = 2 words); if that ABI ever changes these
|
||||
// move with it.
|
||||
//
|
||||
// The ergonomic accessors are the normal way in: a closure value answers
|
||||
// `.fn_ptr` (the code pointer) and `.env` (the captured environment — heap,
|
||||
// allocated at the literal via the then-current `context.allocator`; `null` for a
|
||||
// capture-free closure), and a slice/string answers `.ptr` / `.len`. The `*Raw`
|
||||
// structs are the explicit type-erased layout behind those accessors.
|
||||
|
||||
// A closure value: `{ fn_ptr, env }`. Reinterpret with `xx` to reach `env` for
|
||||
// ownership/lifetime work (the owner of a stored closure frees `env` when the
|
||||
// closure is dead). Equivalent to the `c.fn_ptr` / `c.env` field accessors.
|
||||
ClosureRaw :: struct {
|
||||
fn_ptr: *void;
|
||||
env: *void;
|
||||
}
|
||||
|
||||
// A slice or string value: `{ ptr, len }` (the element type is erased to bytes
|
||||
// here). Equivalent to the `s.ptr` / `s.len` accessors. `len` is the element
|
||||
// count (an `i64`, matching the ABI), not a byte count.
|
||||
SliceRaw :: struct {
|
||||
ptr: [*]u8;
|
||||
len: i64;
|
||||
}
|
||||
|
||||
@@ -101,6 +101,19 @@ Future :: struct ($R: Type) {
|
||||
// In the blocking model there is no concurrency, but the type is the
|
||||
// one the M:N model [later] needs.
|
||||
canceled: Atomic(bool);
|
||||
// --- ownership (heap Future lifetime) ---
|
||||
// The Future is referenced by TWO owners: the worker (writes the result, then
|
||||
// ends) and the awaiter (reads it via `await`). It is freed by whichever
|
||||
// FINISHES LAST — `worker_done` is set at the end of the completion closure,
|
||||
// `consumed` at the end of `await`; `fut_release` frees once BOTH are set,
|
||||
// through `alloc` (the `context.allocator` captured at the `async` call — the
|
||||
// awaiter may run under a different one). A future that is never awaited
|
||||
// (fire-and-forget, or a `race` loser) keeps `consumed == false` and is NOT
|
||||
// freed — that residual needs a structured-concurrency scope and is the
|
||||
// documented remainder.
|
||||
alloc: Allocator;
|
||||
worker_done: bool = false;
|
||||
consumed: bool = false;
|
||||
}
|
||||
|
||||
// --- The async ergonomic layer (generic free-fns over the protocol) ---
|
||||
@@ -121,11 +134,26 @@ Future :: struct ($R: Type) {
|
||||
// closure. A struct field is the one position a `Closure() -> void` type parses
|
||||
// in (a bare alias / `size_of(Closure()->void)` trips the parser), and it gives
|
||||
// the bridge a concrete `*ThunkBox` to `size_of`/cast/call through.
|
||||
ThunkBox :: struct { run: Closure() -> void; }
|
||||
// `run` is the completion closure (captures the Future + the worker); `worker_env`
|
||||
// records the WORKER closure's own heap env (captured by-value into `run`'s env, so
|
||||
// otherwise unreachable to free). Both the box and these two envs are dead the
|
||||
// instant `run()` returns — `sx_run_boxed_closure` reclaims them there.
|
||||
ThunkBox :: struct { run: Closure() -> void; worker_env: *void = null; }
|
||||
|
||||
sx_run_boxed_closure :: (arg: *void) {
|
||||
b : *ThunkBox = xx arg;
|
||||
b.run();
|
||||
// `b.run` has returned, so its env, the worker's env, and the box itself are
|
||||
// all dead — free them (the per-`async` heap, minus the Future). This runs
|
||||
// under the spawn-time context (fib_dispatch re-pushes `dctx`), so
|
||||
// `context.allocator` is the same allocator `async` used. Read every field
|
||||
// BEFORE freeing the box. A capture-free completion/worker has a null env →
|
||||
// the dealloc is a no-op.
|
||||
run_env := b.run.env;
|
||||
worker_env := b.worker_env;
|
||||
if run_env != null { context.allocator.dealloc_bytes(run_env); }
|
||||
if worker_env != null { context.allocator.dealloc_bytes(worker_env); }
|
||||
context.allocator.dealloc_bytes(xx b);
|
||||
}
|
||||
|
||||
// `async(io, worker)` — submit a NULLARY `worker: Closure() -> $R` and get a
|
||||
@@ -137,10 +165,15 @@ sx_run_boxed_closure :: (arg: *void) {
|
||||
// The Future (and the completion-closure `ThunkBox`) are HEAP-allocated (not
|
||||
// returned by value): under the fiber impl the worker fills the Future AFTER
|
||||
// `async` returns, so the awaiter and the worker must share one stable object.
|
||||
// They currently leak (bounded by the async count; invisible under the default
|
||||
// GPA). Freeing them needs join-point ownership — deferred.
|
||||
// OWNERSHIP: the `ThunkBox` + the completion-closure env + the worker's env are
|
||||
// freed by `sx_run_boxed_closure` the instant the worker completes; the `Future`
|
||||
// is freed by the last of {worker completion, `await`} via the two-flag
|
||||
// `fut_release` (see the `Future` fields). The remaining leak is a future that is
|
||||
// never awaited (fire-and-forget, or a `race` loser) — `consumed` stays false so
|
||||
// its `Future` struct is kept; reclaiming that needs a structured-concurrency
|
||||
// scope (deferred).
|
||||
//
|
||||
// ALLOCATOR-LIFETIME CONTRACT: both are allocated from the `context.allocator`
|
||||
// ALLOCATOR-LIFETIME CONTRACT: all are allocated from the `context.allocator`
|
||||
// in force at the `async` CALL, and that allocator MUST outlive the future —
|
||||
// i.e. survive until the worker has run and the result is consumed. This is the
|
||||
// long-lived-container rule (CLAUDE.md): calling `async` inside a transient
|
||||
@@ -149,12 +182,27 @@ sx_run_boxed_closure :: (arg: *void) {
|
||||
// The common case (the program-stable default GPA, or a scheduler set up under a
|
||||
// long-lived allocator) is safe. A deeper fix — `async` capturing the scheduler's
|
||||
// own long-lived allocator — needs a protocol affordance to reach it; deferred.
|
||||
// Release one owner's hold on a Future and free it once BOTH the worker and the
|
||||
// awaiter are done (the two-flag handshake). Idempotent in effect: the caller sets
|
||||
// its own flag first, so only the LAST releaser sees both set and frees — exactly
|
||||
// once. The freed struct must not be touched after.
|
||||
fut_release :: ufcs (f: *Future($R)) {
|
||||
if f.worker_done and f.consumed {
|
||||
f.alloc.dealloc_bytes(xx f);
|
||||
}
|
||||
}
|
||||
|
||||
async :: ufcs (io: Io, worker: Closure() -> ($R, !)) -> *Future($R) {
|
||||
raw := context.allocator.alloc_bytes(size_of(Future($R)));
|
||||
f : *Future($R) = xx raw;
|
||||
f.state = .pending;
|
||||
f.park = .{ handle = null };
|
||||
f.canceled = Atomic(bool).init(false);
|
||||
// Ownership bookkeeping: capture the allocating allocator + clear the two
|
||||
// release flags so `fut_release` can free the Future when both owners finish.
|
||||
f.alloc = context.allocator;
|
||||
f.worker_done = false;
|
||||
f.consumed = false;
|
||||
// The completion closure: run the worker, publish the result, wake any parked
|
||||
// awaiter. Heap-boxed so it survives until the worker actually runs (deferred
|
||||
// under the fiber impl). It captures `f` + `worker`; nothing variadic crosses.
|
||||
@@ -170,15 +218,26 @@ async :: ufcs (io: Io, worker: Closure() -> ($R, !)) -> *Future($R) {
|
||||
// sticky `f.canceled`, the 1806 contract).
|
||||
braw := context.allocator.alloc_bytes(size_of(ThunkBox));
|
||||
b : *ThunkBox = xx braw;
|
||||
// Record the worker's own heap env so `sx_run_boxed_closure` can free it (it is
|
||||
// captured by-value into `run`'s env below, otherwise unreachable). `null` for
|
||||
// a capture-free worker.
|
||||
b.worker_env = worker.env;
|
||||
b.run = () => {
|
||||
f.value = worker() catch {
|
||||
if f.canceled.load(.acquire) { f.state = .canceled; }
|
||||
else { f.state = .failed; }
|
||||
context.io.ready(f.park);
|
||||
// Worker finished (via the error/cancel path); release our owner-ref —
|
||||
// frees the Future iff `await` already consumed it. MUST be the last
|
||||
// touch of `f`.
|
||||
f.worker_done = true;
|
||||
fut_release(f);
|
||||
return;
|
||||
};
|
||||
f.state = .ready;
|
||||
context.io.ready(f.park); // no-op if no awaiter parked yet
|
||||
f.worker_done = true;
|
||||
fut_release(f); // last touch of `f`
|
||||
};
|
||||
// Pass the cancel-flag back-ref so the worker fiber's `suspend_raw` can consult
|
||||
// it (Phase 3). `xx @f.canceled` erases the `*Atomic(bool)` to `*void`.
|
||||
@@ -193,26 +252,38 @@ async :: ufcs (io: Io, worker: Closure() -> ($R, !)) -> *Future($R) {
|
||||
// resumes it. Re-checks state after the wake (the worker set `.ready` before
|
||||
// waking). A worker that finished BEFORE `await` leaves `.ready`, so no park, no
|
||||
// lost wakeup.
|
||||
//
|
||||
// CONSUMES `f`: `await` is the awaiter's owner-handoff — once it (and the worker)
|
||||
// finish, the heap `Future` is freed (`fut_release`). So `await` is SINGLE-USE per
|
||||
// future: do NOT touch `f` afterward (a second `await`, `cancel(f)`, `f.state`, …)
|
||||
// — that is a use-after-free. The one-awaiter guard already rejects a CONCURRENT
|
||||
// second awaiter; this is the SEQUENTIAL-reuse contract.
|
||||
await :: ufcs (f: *Future($R)) -> ($R, !IoErr) {
|
||||
if f.canceled.load(.acquire) { raise error.Canceled; }
|
||||
if f.state == .pending {
|
||||
// ONE awaiter per future (M:1): the single `park` slot records one parked
|
||||
// fiber, so a second concurrent `await` on the same pending future would
|
||||
// OVERWRITE the first awaiter's handle and orphan it forever (the worker's
|
||||
// single `ready(f.park)` wakes only the last). Enforce loudly here — a
|
||||
// non-null handle on a still-pending future means another fiber is already
|
||||
// parked on it. (Fan-in over many futures —
|
||||
// `race` — registers ONE awaiter across SEPARATE futures, so it is fine.)
|
||||
// Park until the worker completes — UNLESS the future is already cancelled
|
||||
// (then deliver immediately without parking, as before). A still-`.pending`
|
||||
// non-cancelled future suspends the caller; the worker's `ready(f.park)` wakes
|
||||
// it. ONE awaiter per future (M:1): a non-null `park.handle` on a pending
|
||||
// future means another fiber is already parked — abort loudly (a fan-in
|
||||
// `race` registers one awaiter across SEPARATE futures, so it is fine).
|
||||
already_canceled := f.canceled.load(.acquire);
|
||||
if f.state == .pending and !already_canceled {
|
||||
if f.park.handle != null {
|
||||
out("io: await — future already has an awaiter (one awaiter per future in the M:1 model)\n");
|
||||
io_abort();
|
||||
}
|
||||
context.io.suspend_raw(@f.park) catch {}; // Phase 3 propagates Canceled
|
||||
}
|
||||
if f.canceled.load(.acquire) { raise error.Canceled; }
|
||||
if f.state == .canceled { raise error.Canceled; }
|
||||
if f.state == .failed { raise error.Failed; }
|
||||
return f.value;
|
||||
// Settle the outcome and COPY the value out BEFORE releasing — `fut_release`
|
||||
// may free `f`, after which only the locals below are safe to touch.
|
||||
canceled := f.canceled.load(.acquire);
|
||||
if f.state == .canceled { canceled = true; }
|
||||
failed := f.state == .failed;
|
||||
v := f.value;
|
||||
f.consumed = true;
|
||||
fut_release(f); // frees the Future iff the worker has also finished
|
||||
if canceled { raise error.Canceled; }
|
||||
if failed { raise error.Failed; }
|
||||
return v;
|
||||
}
|
||||
|
||||
// `cancel(f)` — request cancellation (model (a) — cancel rides the `!` channel).
|
||||
|
||||
@@ -433,10 +433,8 @@ Scheduler :: struct {
|
||||
self.current = null;
|
||||
if f.state == .done {
|
||||
// We've switched OFF f's stack already (the final swap landed
|
||||
// here), so the stack is free to unmap. Free the Fiber struct
|
||||
// AFTER munmap.
|
||||
munmap(f.stack_region, f.stack_len);
|
||||
self.own_allocator.dealloc_bytes(xx f);
|
||||
// here), so the stack is free to unmap and the body is dead.
|
||||
reap_fiber(self, f);
|
||||
} else if f.state == .ready {
|
||||
enqueue(self, f);
|
||||
}
|
||||
@@ -561,12 +559,11 @@ Scheduler :: struct {
|
||||
// `ThunkBox`es likewise leak (they are not scheduler-tracked) — freeing both
|
||||
// needs join-point / closure-env ownership affordances.
|
||||
deinit :: (self: *Scheduler) {
|
||||
// (1) Reap leftover ready fibers: unmap the stack, free the Fiber.
|
||||
// (1) Reap leftover ready fibers: free the body env, unmap, free the Fiber.
|
||||
f := self.ready_head;
|
||||
while f != null {
|
||||
nxt := f.next;
|
||||
munmap(f.stack_region, f.stack_len);
|
||||
self.own_allocator.dealloc_bytes(xx f);
|
||||
reap_fiber(self, f);
|
||||
f = nxt;
|
||||
}
|
||||
self.ready_head = null;
|
||||
@@ -708,8 +705,7 @@ impl Io for Scheduler {
|
||||
swap_context(@self.sched_ctx, @f.ctx);
|
||||
self.current = null;
|
||||
if f.state == .done {
|
||||
munmap(f.stack_region, f.stack_len);
|
||||
self.own_allocator.dealloc_bytes(xx f);
|
||||
reap_fiber(self, f);
|
||||
} else if f.state == .ready {
|
||||
enqueue(self, f);
|
||||
}
|
||||
@@ -885,6 +881,25 @@ boot_stack :: (f: *Fiber, size: i64) -> u64 {
|
||||
return top - (top % 16); // 16-byte aligned stack top (AAPCS)
|
||||
}
|
||||
|
||||
// --- fiber reap -------------------------------------------------------------
|
||||
|
||||
// Reclaim a finished (`.done`) or leftover fiber. Frees, in order: the body
|
||||
// closure's heap ENV (`body.env` — the captured environment, allocated at the
|
||||
// closure literal via the SPAWN-time `context.allocator`, which `dctx` snapshots;
|
||||
// `null` for a capture-free body, so the free is an unconditional no-op then),
|
||||
// then the guarded stack (munmap), then the `Fiber` struct itself. This closes
|
||||
// the per-spawn env leak. MUST be the LAST use of `f` — `f` is dangling after.
|
||||
// (The body's env outlives the body's execution but dies WITH the fiber: the
|
||||
// body has returned by the time a `.done` fiber is reaped, so nothing reads the
|
||||
// captures again.)
|
||||
reap_fiber :: (self: *Scheduler, f: *Fiber) {
|
||||
if f.body.env != null {
|
||||
f.dctx.allocator.dealloc_bytes(f.body.env);
|
||||
}
|
||||
munmap(f.stack_region, f.stack_len);
|
||||
self.own_allocator.dealloc_bytes(xx f);
|
||||
}
|
||||
|
||||
// --- intrusive FIFO ready-queue -------------------------------------------
|
||||
|
||||
enqueue :: (self: *Scheduler, f: *Fiber) {
|
||||
|
||||
Reference in New Issue
Block a user