pthread bindings with darwin opaque sizes (mutex 64B, cond 48B; glibc divergence is a C3 per-OS item). Mutex/Cond initialize IN PLACE and Pool lives behind Pool.create's heap pointer — POSIX sync objects are address-sensitive, so nothing here moves after setup. Thread.spawn takes the C2 re-entry contract entry (callconv(.c), fabricates its own Context); Pool workers do exactly that with a per-worker malloc-backed GPA, then run default-conv tasks inside it. submit returns false on a full backlog (httpz thread_pool backpressure); shutdown finishes queued work and joins every worker. examples/1637 pins: 4 raw threads x 1000 locked increments, 100 pool tasks summing exactly once across 4 workers, a held worker + full backlog refusing the next submit, clean shutdown. JIT + AOT (AOT run three times). The std.sx barrel carries thread; .ir snapshot regen is the usual renumbering.
117 lines
3.7 KiB
Plaintext
117 lines
3.7 KiB
Plaintext
// std.thread (PLAN-HTTPZ S6): raw threads contend correctly on a Mutex,
// the Pool runs every submitted task exactly once across its workers,
// a full backlog applies backpressure (submit returns false), and
// shutdown joins cleanly with queued work finished.
|
|
#import "modules/std.sx";
|
|
|
|
// State shared between main and every raw thread / pool task in this example.
Shared :: struct {
    // Guards counter and gate. main calls setup() on it in place before
    // any thread is started.
    mu: thread.Mutex = .{};

    // Accumulator that bump_entry and add_task modify while holding mu.
    counter: i64 = 0;

    // While 0, gate_task keeps polling; main stores 1 to release it.
    gate: i64 = 0;
}
|
|
|
|
// Raw-thread entry handed to thread.Thread.spawn. It sits on the C->sx
// boundary (callconv(.c)), so it pushes a Context of its own before
// running the loop.
//
// arg:     the *Shared that was passed at spawn time.
// returns: always null.
//
// Performs 1000 increments of the shared counter, taking the mutex around
// each individual increment.
bump_entry :: (arg: *void) -> *void callconv(.c) {
    shared : *Shared = xx arg;
    gpa := GPA.init();
    push Context.{ allocator = xx gpa } {
        done := 0;
        while done != 1000 {
            shared.mu.lock();
            shared.counter += 1;
            shared.mu.unlock();
            done += 1;
        }
    }
    return null;
}
|
|
|
|
// Pool tasks are default-conv: they run inside a worker's context.
//
// AddArg is the payload for one add_task submission. main passes the
// address of an AddArg through Pool.submit as the task argument.
AddArg :: struct {
    // Shared state whose counter receives the addend.
    sh: *Shared;

    // Amount this task adds to sh.counter.
    v: i64;
}
|
|
|
|
// Pool task: adds the payload's v to the shared counter under the mutex.
//
// arg: address of an AddArg, carried as a usize.
add_task :: (arg: usize) {
    job : *AddArg = xx arg;
    shared := job.sh;

    shared.mu.lock();
    shared.counter += job.v;
    shared.mu.unlock();
}
|
|
|
|
// Pool task for the backpressure case: occupies its worker until the gate
// opens. Each pass reads gate under the mutex and the task returns once it
// sees a non-zero value.
//
// arg: address of the Shared instance, carried as a usize.
gate_task :: (arg: usize) {
    shared : *Shared = xx arg;

    opened := false;
    while !opened {
        shared.mu.lock();
        opened = shared.gate != 0;
        shared.mu.unlock();
    }
}
|
|
|
|
// Runs the three std.thread checks in order:
//   1. four raw threads contending on one Mutex,
//   2. a Pool summing 100 tasks,
//   3. a full backlog refusing one more submit.
// Returns 0 when every check passes; on the first failure prints the reason
// and returns 1.
main :: () -> i32 {
    sh : Shared = .{};
    // The mutex is set up in place on this instance; every thread and task
    // below receives the address of this same sh.
    if !sh.mu.setup() { print("mutex setup failed\n"); return 1; }

    // ── check 1: 4 raw threads, each doing 1000 locked increments ─────
    handles : [4]thread.Thread = ---;
    started := 0;
    while started < 4 {
        handle, spawn_err := thread.Thread.spawn(bump_entry, xx @sh);
        if spawn_err { print("spawn failed\n"); return 1; }
        handles[started] = handle;
        started += 1;
    }
    joined := 0;
    while joined < 4 {
        handles[joined].join();
        joined += 1;
    }
    // Every thread has been joined, so counter is read without the lock.
    if sh.counter != 4000 { print("raw threads: expected 4000, got {}\n", sh.counter); return 1; }
    print("raw threads: 4 x 1000 locked increments = {}\n", sh.counter);

    // ── check 2: pool of 4 workers, 100 tasks adding 1..100 (= 5050) ──
    sh.counter = 0;
    pool, pool_err := thread.Pool.create(4, 128);
    if pool_err { print("pool create failed\n"); return 1; }
    // One AddArg per task; each task is handed the address of its own slot.
    payloads : [*]AddArg = xx context.allocator.alloc_bytes(100 * size_of(AddArg));
    submitted := 0;
    while submitted < 100 {
        payloads[submitted] = AddArg.{ sh = @sh, v = xx (submitted + 1) };
        if !pool.submit(add_task, xx @payloads[submitted]) { print("submit unexpectedly refused\n"); return 1; }
        submitted += 1;
    }
    // shutdown lets the queued tasks finish before the workers exit, so the
    // sum is complete once it returns.
    pool.shutdown();
    if sh.counter != 5050 { print("pool: expected 5050, got {}\n", sh.counter); return 1; }
    print("pool: 100 tasks across 4 workers summed to {}\n", sh.counter);

    // ── check 3: 1 worker parked on the gate, backlog of 2 fills up ───
    sh.gate = 0;
    held, held_err := thread.Pool.create(1, 2);
    if held_err { print("pool2 create failed\n"); return 1; }
    if !held.submit(gate_task, xx @sh) { print("gate submit refused\n"); return 1; }

    // Poll (bounded) until the worker has pulled the gate task off the
    // queue, so the two backlog slots are known to be free.
    polls := 0;
    picked_up := false;
    while polls < 100000000 and !picked_up {
        held.mu.lock();
        picked_up = held.len == 0;
        held.mu.unlock();
        polls += 1;
    }
    if !picked_up { print("worker never took the gate task\n"); return 1; }

    // Two no-op tasks fill the backlog; the third submit must be refused.
    if !held.submit(gate_task_noop, 0) { print("backlog slot 1 refused\n"); return 1; }
    if !held.submit(gate_task_noop, 0) { print("backlog slot 2 refused\n"); return 1; }
    if held.submit(gate_task_noop, 0) { print("full backlog must refuse\n"); return 1; }
    print("backpressure: full backlog refuses\n");

    // Open the gate under the mutex so the parked worker can return, then
    // shut the pool down.
    sh.mu.lock();
    sh.gate = 1;
    sh.mu.unlock();
    held.shutdown();

    print("std.thread ok\n");
    return 0;
}
|
|
|
|
// Pool task that does nothing and ignores its argument. main submits it to
// occupy backlog slots in the backpressure check.
gate_task_noop :: (arg: usize) {}
|