// std.thread (PLAN-HTTPZ S6): raw threads contend correctly on a Mutex, // the Pool runs every submitted task exactly once across its workers, // a full backlog applies backpressure (submit returns false), and // shutdown joins cleanly with queued work finished. #import "modules/std.sx"; Shared :: struct { mu: thread.Mutex = .{}; counter: i64 = 0; gate: i64 = 0; } // Raw-thread entry: C->sx boundary, own fabricated context. bump_entry :: (arg: *void) -> *void callconv(.c) { sh : *Shared = xx arg; gpa := GPA.init(); push Context.{ allocator = xx gpa } { i := 0; while i < 1000 { sh.mu.lock(); sh.counter += 1; sh.mu.unlock(); i += 1; } } return null; } // Pool tasks are default-conv: they run inside a worker's context. AddArg :: struct { sh: *Shared; v: i64; } add_task :: (arg: usize) { a : *AddArg = xx arg; a.sh.mu.lock(); a.sh.counter += a.v; a.sh.mu.unlock(); } // Holds its worker hostage until the gate opens (backpressure case). gate_task :: (arg: usize) { sh : *Shared = xx arg; while true { sh.mu.lock(); g := sh.gate; sh.mu.unlock(); if g != 0 { return; } } } main :: () -> i32 { sh : Shared = .{}; if !sh.mu.setup() { print("mutex setup failed\n"); return 1; } // ── 4 raw threads, 1000 locked increments each ──────────────────── ths : [4]thread.Thread = ---; i := 0; while i < 4 { t, te := thread.Thread.spawn(bump_entry, xx @sh); if te { print("spawn failed\n"); return 1; } ths[i] = t; i += 1; } i = 0; while i < 4 { ths[i].join(); i += 1; } if sh.counter != 4000 { print("raw threads: expected 4000, got {}\n", sh.counter); return 1; } print("raw threads: 4 x 1000 locked increments = {}\n", sh.counter); // ── pool: 100 tasks, each adds its index+1; sum = 5050 ─────────── sh.counter = 0; pool, pe := thread.Pool.create(4, 128); if pe { print("pool create failed\n"); return 1; } args : [*]AddArg = xx context.allocator.alloc_bytes(100 * size_of(AddArg)); n := 0; while n < 100 { args[n] = AddArg.{ sh = @sh, v = xx (n + 1) }; if !pool.submit(add_task, xx @args[n]) { print("submit unexpectedly refused\n"); return 1; } n += 1; } pool.shutdown(); // queued tasks finish before workers exit if sh.counter != 5050 { print("pool: expected 5050, got {}\n", sh.counter); return 1; } print("pool: 100 tasks across 4 workers summed to {}\n", sh.counter); // ── backpressure: 1 worker held at the gate, backlog 2 fills ───── sh.gate = 0; p2, pe2 := thread.Pool.create(1, 2); if pe2 { print("pool2 create failed\n"); return 1; } if !p2.submit(gate_task, xx @sh) { print("gate submit refused\n"); return 1; } // give the worker a beat to take the gate task off the queue spins := 0; taken := false; while !taken and spins < 100000000 { p2.mu.lock(); taken = p2.len == 0; p2.mu.unlock(); spins += 1; } if !taken { print("worker never took the gate task\n"); return 1; } if !p2.submit(gate_task_noop, 0) { print("backlog slot 1 refused\n"); return 1; } if !p2.submit(gate_task_noop, 0) { print("backlog slot 2 refused\n"); return 1; } if p2.submit(gate_task_noop, 0) { print("full backlog must refuse\n"); return 1; } print("backpressure: full backlog refuses\n"); sh.mu.lock(); sh.gate = 1; sh.mu.unlock(); p2.shutdown(); print("std.thread ok\n"); return 0; } gate_task_noop :: (arg: usize) {}