Files
sx/examples/concurrency/1816-concurrency-fiber-io-pipe.sx
agra 55ed9a248e fibers: Scheduler.deinit + struct-literal init cleanup
Scheduler.deinit closes the bounded leaks B1 documented: it reaps any leftover
ready fibers, frees every heap Task from go (now tracked via a task_allocs
field), frees the timers/io_waiters/task_allocs List backings, and closes the
lazily-opened kqueue fd. Terminal + idempotent; the per-spawn/go closure env
remains unfreeable (language limitation). Locked by
examples/concurrency/1820-concurrency-fiber-scheduler-deinit.sx, which exercises
every freed resource under a tracking GPA (freed by deinit: 5, kq reset to -1).

Also converts plain-struct '= ---'+field-assign init to '.{ ... }' literal init
where '---' carries no meaning: Scheduler.init, Dock.make, and the fiber
examples 1811/1813/1814/1816 (partial literals zero-fill the index-filled array
fields). Unions, '---'-feature tests, the 0154 regression, documented
generic-pack gaps, and loop/conditional inits are intentionally left on '---'.
2026-06-22 09:45:33 +03:00

99 lines
3.6 KiB
Plaintext

// Stream B1 (fibers) B1.4c — REAL fd-readiness blocking via kqueue. A fiber can
// `block_on_fd(read_fd, true)`; the scheduler's run loop blocks on `kevent` when
// nothing else is runnable and wakes that fiber when the kernel reports the fd
// readable.
//
// Scenario: a unix `pipe` (read_fd, write_fd). A READER fiber is spawned FIRST,
// so it runs while the pipe is EMPTY — it calls `block_on_fd(read_fd)` and parks
// (genuinely blocked: there is no data yet, the writer has not run). A WRITER
// fiber, spawned second, then writes 3 bytes to write_fd. Now the ready queue is
// drained and the only parked fiber is the reader's io-waiter, so the run loop
// BLOCKS on `kevent`, which reports read_fd ready; the reader wakes and reads the
// bytes. The ordering ("wrote" recorded before "read") proves the reader blocked
// on the empty pipe and was woken by kqueue readiness, not by data already
// present.
//
// Contract:
// log: wrote read 3 [97 98 99]
// n_suspended: 0 (the reader's park was balanced by the kqueue wake)
//
// aarch64-macOS-pinned: kqueue/kevent is Apple/BSD, and the scheduler's
// per-arch asm + Apple mmap constants. Runs end-to-end on a matching host,
// ir-only on a mismatch. Like 1809 (mmap), JIT `sx run` resolves the libc
// extern calls fine — no AOT build needed.
#import "modules/std.sx";
sched :: #import "modules/std/sched.sx";
// Raw libc fd primitives. read/write/close MUST match the canonical signatures
// already bound by std (socket.sx / core.sx), or the extern dedupe rejects a
// divergent re-binding of the same C symbol. `pipe` is ours alone.
pipe :: (fds: *i32) -> i32 extern libc "pipe";
read :: (fd: i32, buf: [*]u8, count: usize) -> isize extern libc "read";
write :: (fd: i32, buf: [*]u8, count: usize) -> isize extern libc "write";
close :: (fd: i32) -> i32 extern libc "close";
// Shared log: a tiny ledger of what happened, in order.
S :: struct {
wrote: bool;
read_n: i64;
bytes: [8]u8;
read_done: bool;
}
main :: () -> i64 {
st : S = .{ wrote = false, read_n = 0, read_done = false }; // bytes[] zero-filled
fds : [2]i32 = ---;
if pipe(@fds[0]) != 0 {
print("1816: pipe() failed\n");
return 1;
}
read_fd := fds[0];
write_fd := fds[1];
s := sched.Scheduler.init();
ps := @s; pst := @st;
// Reader: block on the (empty) pipe until it is readable, then read 3 bytes.
mk_reader :: (ps: *sched.Scheduler, pst: *S, rfd: i32) {
ps.spawn(() => {
ps.block_on_fd(rfd, true); // parks until read_fd is readable
n := read(rfd, xx @pst.bytes[0], xx 3);
pst.read_n = xx n;
pst.read_done = true;
});
}
// Writer: write 3 bytes ('a','b','c') to the write end.
mk_writer :: (ps: *sched.Scheduler, pst: *S, wfd: i32) {
ps.spawn(() => {
buf : [3]u8 = ---;
buf[0] = xx 97; buf[1] = xx 98; buf[2] = xx 99; // 'a' 'b' 'c'
write(wfd, xx @buf[0], xx 3);
pst.wrote = true;
});
}
mk_reader(ps, pst, read_fd); // spawned first → runs + parks on empty pipe
mk_writer(ps, pst, write_fd); // spawned second → writes, then kqueue wakes reader
s.run();
print("log: ");
if st.wrote { print("wrote "); }
if st.read_done {
print("read {} [", st.read_n);
i := 0;
while i < st.read_n {
if i > 0 { print(" "); }
print("{}", st.bytes[i]);
i = i + 1;
}
print("]");
}
print("\n");
print("n_suspended: {}\n", s.n_suspended);
close(read_fd);
close(write_fd);
return 0;
}