// Stream B1 (fibers) B1.4c — REAL fd-readiness blocking via kqueue. A fiber can // `block_on_fd(read_fd, true)`; the scheduler's run loop blocks on `kevent` when // nothing else is runnable and wakes that fiber when the kernel reports the fd // readable. // // Scenario: a unix `pipe` (read_fd, write_fd). A READER fiber is spawned FIRST, // so it runs while the pipe is EMPTY — it calls `block_on_fd(read_fd)` and parks // (genuinely blocked: there is no data yet, the writer has not run). A WRITER // fiber, spawned second, then writes 3 bytes to write_fd. Now the ready queue is // drained and the only parked fiber is the reader's io-waiter, so the run loop // BLOCKS on `kevent`, which reports read_fd ready; the reader wakes and reads the // bytes. The ordering ("wrote" recorded before "read") proves the reader blocked // on the empty pipe and was woken by kqueue readiness, not by data already // present. // // Contract: // log: wrote read 3 [97 98 99] // n_suspended: 0 (the reader's park was balanced by the kqueue wake) // // aarch64-macOS-pinned: kqueue/kevent is Apple/BSD, and the scheduler's // per-arch asm + Apple mmap constants. Runs end-to-end on a matching host, // ir-only on a mismatch. Like 1809 (mmap), JIT `sx run` resolves the libc // extern calls fine — no AOT build needed. #import "modules/std.sx"; sched :: #import "modules/std/sched.sx"; // Raw libc fd primitives. read/write/close MUST match the canonical signatures // already bound by std (socket.sx / core.sx), or the extern dedupe rejects a // divergent re-binding of the same C symbol. `pipe` is ours alone. pipe :: (fds: *i32) -> i32 extern libc "pipe"; read :: (fd: i32, buf: [*]u8, count: usize) -> isize extern libc "read"; write :: (fd: i32, buf: [*]u8, count: usize) -> isize extern libc "write"; close :: (fd: i32) -> i32 extern libc "close"; // Shared log: a tiny ledger of what happened, in order. S :: struct { wrote: bool; read_n: i64; bytes: [8]u8; read_done: bool; } main :: () -> i64 { st : S = .{ wrote = false, read_n = 0, read_done = false }; // bytes[] zero-filled fds : [2]i32 = ---; if pipe(@fds[0]) != 0 { print("1816: pipe() failed\n"); return 1; } read_fd := fds[0]; write_fd := fds[1]; s := sched.Scheduler.init(); ps := @s; pst := @st; // Reader: block on the (empty) pipe until it is readable, then read 3 bytes. mk_reader :: (ps: *sched.Scheduler, pst: *S, rfd: i32) { ps.spawn(() => { ps.block_on_fd(rfd, true); // parks until read_fd is readable n := read(rfd, xx @pst.bytes[0], xx 3); pst.read_n = xx n; pst.read_done = true; }); } // Writer: write 3 bytes ('a','b','c') to the write end. mk_writer :: (ps: *sched.Scheduler, pst: *S, wfd: i32) { ps.spawn(() => { buf : [3]u8 = ---; buf[0] = xx 97; buf[1] = xx 98; buf[2] = xx 99; // 'a' 'b' 'c' write(wfd, xx @buf[0], xx 3); pst.wrote = true; }); } mk_reader(ps, pst, read_fd); // spawned first → runs + parks on empty pipe mk_writer(ps, pst, write_fd); // spawned second → writes, then kqueue wakes reader s.run(); print("log: "); if st.wrote { print("wrote "); } if st.read_done { print("read {} [", st.read_n); i := 0; while i < st.read_n { if i > 0 { print(" "); } print("{}", st.bytes[i]); i = i + 1; } print("]"); } print("\n"); print("n_suspended: {}\n", s.n_suspended); close(read_fd); close(write_fd); return 0; }