pthread bindings with darwin opaque sizes (mutex 64B, cond 48B; glibc divergence is a C3 per-OS item). Mutex/Cond initialize IN PLACE and Pool lives behind Pool.create's heap pointer — POSIX sync objects are address-sensitive, so nothing here moves after setup. Thread.spawn takes the C2 re-entry contract entry (callconv(.c), fabricates its own Context); Pool workers do exactly that with a per-worker malloc-backed GPA, then run default-conv tasks inside it. submit returns false on a full backlog (httpz thread_pool backpressure); shutdown finishes queued work and joins every worker. examples/1637 pins: 4 raw threads x 1000 locked increments, 100 pool tasks summing exactly once across 4 workers, a held worker + full backlog refusing the next submit, clean shutdown. JIT + AOT (AOT run three times). The std.sx barrel carries thread; .ir snapshot regen is the usual renumbering.
226 lines
7.6 KiB
Plaintext
226 lines
7.6 KiB
Plaintext
// std.thread — OS threads, Mutex/Cond, and a bounded worker Pool over
|
|
// pthreads (PLAN-HTTPZ S6).
|
|
//
|
|
// THE RE-ENTRY CONTRACT (pinned by examples/1636): a thread entry is a
|
|
// `callconv(.c)` function — it has NO implicit context — and enters
|
|
// the sx world by fabricating one: `push Context.{ allocator = xx gpa }`
|
|
// around the default-conv code it runs. Pool workers do exactly that,
|
|
// each with its own malloc-backed GPA, so tasks allocate freely and
|
|
// never share allocator state across threads.
|
|
//
|
|
// ALLOCATOR DISCIPLINE: GPA is malloc/free — thread-safe. Arena and
|
|
// every other context-flowing allocator is NOT; never share one across
|
|
// threads. Pool buffers come from the creating context's allocator and
|
|
// are touched only under the pool mutex.
|
|
//
|
|
// MOVE SEMANTICS: a pthread mutex/cond is address-sensitive once
|
|
// initialized — POSIX leaves moving one undefined. Mutex/Cond therefore
|
|
// initialize IN PLACE (`m.setup()`) and Pool lives behind a pointer
|
|
// (`Pool.create`), never by-value copy.
|
|
//
|
|
// PER-OS: sizes below are darwin's (pthread_mutex_t 64 bytes,
|
|
// pthread_cond_t 48, pthread_t a pointer). glibc differs (40/48);
|
|
// PLAN-HTTPZ C3 selects per-OS when the linux target activates.
|
|
|
|
#import "modules/std.sx";
|
|
|
|
tlib :: #library "c";
|
|
|
|
// Thread lifecycle (libc). Each call returns 0 on success and a non-zero
// errno-style code otherwise. The thread handle is carried as `usize`.
//   pthread_create — starts `start(arg)` on a new thread; the handle is
//                    written through `thread`. `attr` may be null.
//   pthread_join   — waits for the thread; `retval` may be null.
//   pthread_detach — gives up the right to join the thread.
pthread_create :: (thread: *usize, attr: *void, start: (*void) -> *void callconv(.c), arg: *void) -> i32 #foreign tlib;
pthread_join   :: (thread: usize, retval: **void) -> i32 #foreign tlib;
pthread_detach :: (thread: usize) -> i32 #foreign tlib;
|
|
|
|
// Mutex primitives (libc). Every call takes the address of the opaque
// MutexBuf storage and returns 0 on success. `attr` may be null to ask
// for the default attributes.
pthread_mutex_init    :: (m: *MutexBuf, attr: *void) -> i32 #foreign tlib;
pthread_mutex_lock    :: (m: *MutexBuf) -> i32 #foreign tlib;
pthread_mutex_unlock  :: (m: *MutexBuf) -> i32 #foreign tlib;
pthread_mutex_destroy :: (m: *MutexBuf) -> i32 #foreign tlib;
|
|
|
|
// Condition-variable primitives (libc). Every call takes the address of
// the opaque CondBuf storage and returns 0 on success. `attr` may be
// null to ask for the default attributes. pthread_cond_wait must be
// called with `m` locked by the calling thread.
pthread_cond_init      :: (c: *CondBuf, attr: *void) -> i32 #foreign tlib;
pthread_cond_wait      :: (c: *CondBuf, m: *MutexBuf) -> i32 #foreign tlib;
pthread_cond_signal    :: (c: *CondBuf) -> i32 #foreign tlib;
pthread_cond_broadcast :: (c: *CondBuf) -> i32 #foreign tlib;
pthread_cond_destroy   :: (c: *CondBuf) -> i32 #foreign tlib;
|
|
|
|
// Opaque storage for a darwin pthread_mutex_t, which is laid out as
// { long __sig; char __opaque[56]; }. Eight i64 slots give the same
// 64 bytes. The fields exist only to reserve space; nothing in this
// module reads or writes them individually.
MutexBuf :: struct {
    sig: i64 = 0;   // stands in for __sig
    // the seven slots below stand in for __opaque[56]
    o0: i64 = 0;
    o1: i64 = 0;
    o2: i64 = 0;
    o3: i64 = 0;
    o4: i64 = 0;
    o5: i64 = 0;
    o6: i64 = 0;
}
|
|
|
|
// Opaque storage for a darwin pthread_cond_t, which is laid out as
// { long __sig; char __opaque[40]; }. Six i64 slots give the same
// 48 bytes. The fields exist only to reserve space; nothing in this
// module reads or writes them individually.
CondBuf :: struct {
    sig: i64 = 0;   // stands in for __sig
    // the five slots below stand in for __opaque[40]
    o0: i64 = 0;
    o1: i64 = 0;
    o2: i64 = 0;
    o3: i64 = 0;
    o4: i64 = 0;
}
|
|
|
|
// Failures this module can raise.
ThreadErr :: error {
    // pthread_create returned a non-zero status.
    Spawn,
    // A mutex, a condition variable, or a pool could not be initialized.
    Init,
}
|
|
|
|
// ── Mutex / Cond (in-place; see MOVE SEMANTICS above) ────────────────
|
|
|
|
// Mutex — a pthread mutex wrapped around its own opaque storage.
//
// The value must be initialized where it will live (`setup` takes the
// address of `buf`) and must not be copied or moved afterwards.
Mutex :: struct {
    buf: MutexBuf = .{};

    // Initializes the mutex in place with default attributes.
    // Returns true when pthread_mutex_init reports success (0).
    setup :: (self: *Mutex) -> bool {
        rc := pthread_mutex_init(@self.buf, null);
        return rc == 0;
    }

    // Acquires the mutex. The pthread status code is not inspected.
    lock :: (self: *Mutex) {
        raw := @self.buf;
        pthread_mutex_lock(raw);
    }

    // Releases the mutex. The pthread status code is not inspected.
    unlock :: (self: *Mutex) {
        raw := @self.buf;
        pthread_mutex_unlock(raw);
    }

    // Tears the mutex down. The pthread status code is not inspected.
    destroy :: (self: *Mutex) {
        raw := @self.buf;
        pthread_mutex_destroy(raw);
    }
}
|
|
|
|
// Cond — a pthread condition variable wrapped around its own opaque
// storage.
//
// The value must be initialized where it will live (`setup` takes the
// address of `buf`) and must not be copied or moved afterwards.
Cond :: struct {
    buf: CondBuf = .{};

    // Initializes the condition variable in place with default
    // attributes. Returns true when pthread_cond_init reports success (0).
    setup :: (self: *Cond) -> bool {
        rc := pthread_cond_init(@self.buf, null);
        return rc == 0;
    }

    // Sleeps on the condition. `m` is released atomically while waiting
    // and is held again when this returns. The pthread status code is
    // not inspected.
    wait :: (self: *Cond, m: *Mutex) {
        cond_raw := @self.buf;
        mutex_raw := @m.buf;
        pthread_cond_wait(cond_raw, mutex_raw);
    }

    // Wakes at least one waiter, if any is waiting.
    signal :: (self: *Cond) {
        raw := @self.buf;
        pthread_cond_signal(raw);
    }

    // Wakes every current waiter.
    broadcast :: (self: *Cond) {
        raw := @self.buf;
        pthread_cond_broadcast(raw);
    }

    // Tears the condition variable down. The pthread status code is not
    // inspected.
    destroy :: (self: *Cond) {
        raw := @self.buf;
        pthread_cond_destroy(raw);
    }
}
|
|
|
|
// ── Thread ───────────────────────────────────────────────────────────
|
|
|
|
// Thread — a thin handle around one pthread.
Thread :: struct {
    handle: usize = 0;   // value written by pthread_create; 0 until spawned

    // Starts `entry(arg)` on a new thread with default attributes.
    //
    // `entry` is a callconv(.c) function: it is the C->sx boundary and
    // has to fabricate its own Context before running default-conv sx
    // code (examples/1636).
    //
    // Returns the populated Thread; raises error.Spawn when
    // pthread_create returns non-zero.
    spawn :: (entry: (*void) -> *void callconv(.c), arg: *void) -> (Thread, !ThreadErr) {
        started : Thread = .{};
        rc := pthread_create(@started.handle, null, entry, arg);
        if rc != 0 { raise error.Spawn; }
        return started;
    }

    // Waits for the thread to finish. The entry's return value is not
    // collected and the pthread status code is not inspected.
    join :: (self: *Thread) {
        id := self.handle;
        pthread_join(id, null);
    }

    // Detaches the thread. The pthread status code is not inspected.
    detach :: (self: *Thread) {
        id := self.handle;
        pthread_detach(id);
    }
}
|
|
|
|
// ── Pool: fixed workers, bounded queue, backpressure ────────────────
|
|
//
|
|
// httpz's thread_pool shape: `submit` enqueues a task for the next free
|
|
// worker and returns false when the backlog is full (the caller sheds —
|
|
// reject, retry, or run inline). `shutdown` discards nothing: tasks
// already queued still run, then the workers exit and are joined.
|
|
|
|
// One unit of queued work: a function plus the single word it receives.
PoolTask :: struct {
    // Default-conv function; a worker calls it from inside the Context
    // that the worker pushed for itself.
    f: (usize) -> void;
    // Passed to `f` unchanged.
    arg: usize = 0;
}
|
|
|
|
// Pool — a fixed set of worker threads fed from a bounded ring buffer.
//
// `tasks`, `head`, `len` and `stop` are read and written only while
// `mu` is held. A Pool is reached through the pointer returned by
// `create` and must never be copied: the workers hold its address and
// it embeds a live mutex and condition variable.
Pool :: struct {
    mu: Mutex = .{};
    nonempty: Cond = .{};        // workers sleep here while the queue is empty
    tasks: [*]PoolTask = null;   // ring buffer with `cap` slots
    cap: i64 = 0;
    head: i64 = 0;               // slot of the oldest queued task
    len: i64 = 0;                // number of tasks currently queued
    stop: bool = false;          // set by shutdown; submit refuses once true
    threads: [*]usize = null;    // pthread handles of the spawned workers
    count: i64 = 0;              // how many handles in `threads` are joinable

    // Allocates a pool from the current context's allocator, initializes
    // it in place and spawns `workers` threads running `pool_worker`.
    //
    // workers — number of worker threads; must be at least 1.
    // backlog — capacity of the task queue; must be at least 1.
    //
    // Raises error.Init when either size is below 1 or when the mutex or
    // condition variable cannot be initialized. Raises error.Spawn when
    // pthread_create fails; workers that had already started are stopped
    // and joined before the error is raised.
    create :: (workers: i64, backlog: i64) -> (*Pool, !ThreadErr) {
        // A pool without workers would accept tasks that never run, a
        // zero backlog would refuse every submit, and a negative size
        // must not reach the allocator. Reject them before allocating.
        if workers < 1 or backlog < 1 { raise error.Init; }

        alloc := context.allocator;
        // NOTE(review): the results of alloc_bytes are used unchecked and
        // none of the storage is released on the failure paths below; the
        // allocator's failure and release conventions are not visible
        // here — confirm and extend.
        p : *Pool = xx alloc.alloc_bytes(size_of(Pool));
        p.* = Pool.{};
        if !p.mu.setup() { raise error.Init; }
        if !p.nonempty.setup() {
            // Do not leave an initialized mutex behind.
            p.mu.destroy();
            raise error.Init;
        }
        p.tasks = xx alloc.alloc_bytes(backlog * size_of(PoolTask));
        p.cap = backlog;
        p.threads = xx alloc.alloc_bytes(workers * size_of(usize));

        // `count` only ever covers handles that pthread_create filled in,
        // so shutdown joins exactly the workers that exist.
        while p.count < workers {
            if pthread_create(@p.threads[p.count], null, pool_worker, xx p) != 0 {
                // join what started, then fail loudly
                p.shutdown();
                raise error.Spawn;
            }
            p.count += 1;
        }
        return p;
    }

    // Enqueues `f(arg)` for the next free worker.
    // Returns false, without enqueuing, when the backlog is full or the
    // pool is stopping (backpressure); returns true otherwise.
    submit :: (self: *Pool, f: (usize) -> void, arg: usize) -> bool {
        self.mu.lock();
        if self.len == self.cap or self.stop {
            self.mu.unlock();
            return false;
        }
        // First free slot after the queued tasks, wrapping around the ring.
        slot := (self.head + self.len) % self.cap;
        self.tasks[slot] = PoolTask.{ f = f, arg = arg };
        self.len += 1;
        self.nonempty.signal();
        self.mu.unlock();
        return true;
    }

    // Stops accepting work, lets queued tasks finish and joins every
    // worker, then destroys the mutex and the condition variable.
    //
    // Call it once: `submit` and `shutdown` both lock `mu`, which no
    // longer exists after this returns.
    // NOTE(review): the `tasks` and `threads` buffers and the Pool itself
    // are not released here — no release call is visible in this file.
    shutdown :: (self: *Pool) {
        self.mu.lock();
        self.stop = true;
        // Wake every sleeping worker so each one can observe `stop`.
        self.nonempty.broadcast();
        self.mu.unlock();

        i : i64 = 0;
        while i < self.count {
            pthread_join(self.threads[i], null);
            i += 1;
        }
        // No worker is left to touch these.
        self.mu.destroy();
        self.nonempty.destroy();
    }
}
|
|
|
|
// Body of every Pool worker thread.
//
// This is a callconv(.c) entry, so it arrives with no implicit context:
// it builds its own GPA, pushes a Context around the loop, and only then
// runs default-conv tasks. Each pass takes the oldest queued task under
// the pool mutex and runs it with the mutex released. The loop ends once
// `stop` is set and the queue is empty.
//
// arg — the *Pool this worker serves. Always returns null.
pool_worker :: (arg: *void) -> *void callconv(.c) {
    pool : *Pool = xx arg;
    gpa := GPA.init();
    push Context.{ allocator = xx gpa } {
        while true {
            pool.mu.lock();

            // Sleep until there is work or the pool is stopping.
            while pool.len == 0 and !pool.stop {
                pool.nonempty.wait(@pool.mu);
            }

            // Still empty after waking means `stop` was set: leave.
            if pool.len == 0 {
                pool.mu.unlock();
                break;
            }

            // Copy the task out and advance the ring while still locked.
            job := pool.tasks[pool.head];
            pool.head = (pool.head + 1) % pool.cap;
            pool.len -= 1;
            pool.mu.unlock();

            // Run the task outside the lock.
            run := job.f;
            run(job.arg);
        }
    }
    return null;
}
|