Files
sx/examples/1637-std-thread.sx
agra 7f23bb7530 feat: std.thread — Thread, Mutex/Cond, bounded worker Pool (PLAN-HTTPZ S6)
pthread bindings with darwin opaque sizes (mutex 64B, cond 48B; glibc
divergence is a C3 per-OS item). Mutex/Cond initialize IN PLACE and
Pool lives behind Pool.create's heap pointer — POSIX sync objects are
address-sensitive, so nothing here moves after setup. Thread.spawn
takes an entry function that follows the C2 re-entry contract
(callconv(.c), fabricates its own Context); Pool workers do exactly
that with a per-worker malloc-backed
GPA, then run default-conv tasks inside it. submit returns false on a
full backlog (httpz thread_pool backpressure); shutdown finishes
queued work and joins every worker.

examples/1637 pins: 4 raw threads x 1000 locked increments, 100 pool
tasks summing exactly once across 4 workers, a held worker + full
backlog refusing the next submit, clean shutdown. JIT + AOT (AOT run
three times). The std.sx barrel carries thread; .ir snapshot regen is
the usual renumbering.
2026-06-12 22:21:40 +03:00

117 lines
3.7 KiB
Plaintext

// std.thread (PLAN-HTTPZ S6): raw threads contend correctly on a Mutex,
// the Pool runs every submitted task exactly once across its workers,
// a full backlog applies backpressure (submit returns false), and
// shutdown joins cleanly with queued work finished.
#import "modules/std.sx";
// State shared between main and every worker in this example.
Shared :: struct {
// Lock held around each worker-side access to counter and gate.
// main calls setup() on it before spawning anything.
mu: thread.Mutex = .{};
// Running total: bump_entry increments it, add_task adds to it.
counter: i64 = 0;
// Release flag for gate_task, which keeps polling while this is 0.
gate: i64 = 0;
}
// Entry point for a raw thread, declared with the C calling convention.
// `arg` carries the address of the Shared value. The thread sets up its own
// GPA-backed Context, performs 1000 increments of the counter (taking and
// releasing the mutex around each one), and returns null.
bump_entry :: (arg: *void) -> *void callconv(.c) {
    local_gpa := GPA.init();
    sh_ptr : *Shared = xx arg;
    push Context.{ allocator = xx local_gpa } {
        done_count := 0;
        while done_count < 1000 {
            // One increment per lock/unlock pair.
            sh_ptr.mu.lock();
            sh_ptr.counter += 1;
            sh_ptr.mu.unlock();
            done_count += 1;
        }
    }
    return null;
}
// Pool tasks are default-conv: they run inside a worker's context.
// Argument record for add_task. main fills one per submitted task and passes
// its address as the task's usize argument.
AddArg :: struct {
// Shared state whose counter receives the addition.
sh: *Shared;
// Amount the task adds to sh.counter.
v: i64;
}
// Pool task: `arg` is the address of an AddArg. Adds that record's `v` to the
// counter of the Shared it points at, holding the Shared's mutex while doing so.
add_task :: (arg: usize) {
    job : *AddArg = xx arg;
    dest : *Shared = job.sh;
    delta := job.v;
    dest.mu.lock();
    dest.counter += delta;
    dest.mu.unlock();
}
// Pool task for the backpressure case: `arg` is the address of a Shared.
// Keeps its worker busy by polling `gate` (read with the mutex held) and
// finishes only once the value read is non-zero.
gate_task :: (arg: usize) {
    held : *Shared = xx arg;
    opened := false;
    while !opened {
        held.mu.lock();
        opened = held.gate != 0;
        held.mu.unlock();
    }
}
// Test driver for std.thread. Runs three stages against one Shared value:
//   1. four raw threads contending on the mutex,
//   2. a 4-worker pool running 100 add tasks,
//   3. a 1-worker pool whose backlog of 2 must refuse a further submit.
// Returns 0 when every check passes; on the first failed check it prints a
// message and returns 1.
main :: () -> i32 {
// sh is a local of main; every worker below is handed its address (@sh).
sh : Shared = .{};
// The mutex is set up explicitly, in place, before any thread is started.
if !sh.mu.setup() { print("mutex setup failed\n"); return 1; }
// ── 4 raw threads, 1000 locked increments each ────────────────────
// NOTE(review): `---` presumably leaves the array uninitialized — confirm.
// Every slot is assigned in the spawn loop before it is joined.
ths : [4]thread.Thread = ---;
i := 0;
while i < 4 {
// spawn yields a handle plus an error value; a truthy te means failure.
t, te := thread.Thread.spawn(bump_entry, xx @sh);
if te { print("spawn failed\n"); return 1; }
ths[i] = t;
i += 1;
}
// Join all four before checking the counter, which is read without the lock.
i = 0;
while i < 4 {
ths[i].join();
i += 1;
}
// 4 threads x 1000 increments each.
if sh.counter != 4000 { print("raw threads: expected 4000, got {}\n", sh.counter); return 1; }
print("raw threads: 4 x 1000 locked increments = {}\n", sh.counter);
// ── pool: 100 tasks, each adds its index+1; sum = 5050 ───────────
sh.counter = 0;
// 4 workers; the second argument (128) is the backlog size, large enough
// that none of the 100 submits below should be refused.
pool, pe := thread.Pool.create(4, 128);
if pe { print("pool create failed\n"); return 1; }
// One AddArg slot per task, taken from the current context allocator.
// Each task receives the address of its own slot.
// NOTE(review): this block is not released anywhere in main.
args : [*]AddArg = xx context.allocator.alloc_bytes(100 * size_of(AddArg));
n := 0;
while n < 100 {
// Fill the slot before submitting, so the task sees a complete record.
args[n] = AddArg.{ sh = @sh, v = xx (n + 1) };
if !pool.submit(add_task, xx @args[n]) { print("submit unexpectedly refused\n"); return 1; }
n += 1;
}
pool.shutdown(); // queued tasks finish before workers exit
// 1 + 2 + ... + 100 = 5050; read without the lock, after shutdown.
if sh.counter != 5050 { print("pool: expected 5050, got {}\n", sh.counter); return 1; }
print("pool: 100 tasks across 4 workers summed to {}\n", sh.counter);
// ── backpressure: 1 worker held at the gate, backlog 2 fills ─────
sh.gate = 0;
p2, pe2 := thread.Pool.create(1, 2);
if pe2 { print("pool2 create failed\n"); return 1; }
// The gate task occupies the only worker until sh.gate becomes non-zero.
if !p2.submit(gate_task, xx @sh) { print("gate submit refused\n"); return 1; }
// give the worker a beat to take the gate task off the queue
// Bounded spin: polls the pool's own len field under the pool's mutex until
// the queue is empty, giving up after 100000000 iterations.
spins := 0;
taken := false;
while !taken and spins < 100000000 {
p2.mu.lock();
taken = p2.len == 0;
p2.mu.unlock();
spins += 1;
}
if !taken { print("worker never took the gate task\n"); return 1; }
// With the worker held, two no-op tasks fill the backlog of 2 ...
if !p2.submit(gate_task_noop, 0) { print("backlog slot 1 refused\n"); return 1; }
if !p2.submit(gate_task_noop, 0) { print("backlog slot 2 refused\n"); return 1; }
// ... and a third submit must be refused.
if p2.submit(gate_task_noop, 0) { print("full backlog must refuse\n"); return 1; }
print("backpressure: full backlog refuses\n");
// Open the gate under the same mutex gate_task holds when it reads the flag.
sh.mu.lock();
sh.gate = 1;
sh.mu.unlock();
// Shut down once the gate is open so the held worker can finish.
p2.shutdown();
print("std.thread ok\n");
return 0;
}
// Filler pool task: ignores its argument and does nothing. main submits it
// to occupy backlog slots in the backpressure stage.
gate_task_noop :: (arg: usize) {
    return;
}