Files
sx/library/modules/std/thread.sx
agra 59f90d2939 refactor(ffi-linkage): Phase 6.3 — migrate std/ #foreign→extern
Pure source rename across 11 std modules (~60 sites): cli/core/fmt/fs/log/
net/kqueue/process/socket/thread/time/trace. All fn-decl markers — bare
'#foreign;', '#foreign libc;'/'#foreign tlib;' (LIB ref), and
'#foreign libc "csym";' (LIB+rename) → the same 'extern …' tail (extern carries
the identical [LIB] ["csym"] axis). Plus 2 stale comment mentions (fmt/fs).
No class forms in std. These modules ARE host-corpus-exercised, so the empty
snapshot diff is direct validation. Suite green (647 corpus / 444 unit, 0
failed).
2026-06-15 04:35:52 +03:00

226 lines
7.6 KiB
Plaintext

// std.thread — OS threads, Mutex/Cond, and a bounded worker Pool over
// pthreads (PLAN-HTTPZ S6).
//
// THE RE-ENTRY CONTRACT (pinned by examples/1636): a thread entry is a
// `callconv(.c)` function — it has NO implicit context — and enters
// the sx world by fabricating one: `push Context.{ allocator = xx gpa }`
// around the default-conv code it runs. Pool workers do exactly that,
// each with its own malloc-backed GPA, so tasks allocate freely and
// never share allocator state across threads.
//
// ALLOCATOR DISCIPLINE: GPA is malloc/free — thread-safe. Arena and
// every other context-flowing allocator is NOT; never share one across
// threads. Pool buffers come from the creating context's allocator and
// are touched only under the pool mutex.
//
// MOVE SEMANTICS: a pthread mutex/cond is address-sensitive once
// initialized — POSIX leaves moving one undefined. Mutex/Cond therefore
// initialize IN PLACE (`m.setup()`) and Pool lives behind a pointer
// (`Pool.create`), never by-value copy.
//
// PER-OS: sizes below are darwin's (pthread_mutex_t 64 bytes,
// pthread_cond_t 48, pthread_t a pointer). glibc differs (40/48);
// PLAN-HTTPZ C3 selects per-OS when the linux target activates.
#import "modules/std.sx";
// Library handle for "c"; every extern declaration below names it as the
// library its symbol is resolved from.
tlib :: #library "c";
// Raw pthread bindings. Each returns an i32 status; the wrappers in this
// file treat 0 as success and anything else as failure.
//
// Thread handles travel as `usize` (see the PER-OS note at the top of the
// file). `attr` is always passed as null by the callers in this file.
pthread_create :: (thread: *usize, attr: *void, start: (*void) -> *void callconv(.c), arg: *void) -> i32 extern tlib;
pthread_join :: (thread: usize, retval: **void) -> i32 extern tlib;
pthread_detach :: (thread: usize) -> i32 extern tlib;
// Mutex operations work on the opaque MutexBuf storage in place.
pthread_mutex_init :: (m: *MutexBuf, attr: *void) -> i32 extern tlib;
pthread_mutex_lock :: (m: *MutexBuf) -> i32 extern tlib;
pthread_mutex_unlock :: (m: *MutexBuf) -> i32 extern tlib;
pthread_mutex_destroy :: (m: *MutexBuf) -> i32 extern tlib;
// Condition-variable operations work on the opaque CondBuf storage in place.
pthread_cond_init :: (c: *CondBuf, attr: *void) -> i32 extern tlib;
pthread_cond_wait :: (c: *CondBuf, m: *MutexBuf) -> i32 extern tlib;
pthread_cond_signal :: (c: *CondBuf) -> i32 extern tlib;
pthread_cond_broadcast :: (c: *CondBuf) -> i32 extern tlib;
pthread_cond_destroy :: (c: *CondBuf) -> i32 extern tlib;
// Opaque storage for a darwin pthread_mutex_t
// ({ long __sig; char __opaque[56]; }): eight i64 words = 64 bytes.
// The fields are never read by sx code; they only reserve space that the
// pthread_mutex_* calls fill in.
MutexBuf :: struct {
    sig: i64 = 0;
    o0: i64 = 0;
    o1: i64 = 0;
    o2: i64 = 0;
    o3: i64 = 0;
    o4: i64 = 0;
    o5: i64 = 0;
    o6: i64 = 0;
}
// Opaque storage for a darwin pthread_cond_t
// ({ long __sig; char __opaque[40]; }): six i64 words = 48 bytes.
// The fields are never read by sx code; they only reserve space that the
// pthread_cond_* calls fill in.
CondBuf :: struct {
    sig: i64 = 0;
    o0: i64 = 0;
    o1: i64 = 0;
    o2: i64 = 0;
    o3: i64 = 0;
    o4: i64 = 0;
}
// Errors raised by this module's fallible constructors
// (Thread.spawn and Pool.create).
ThreadErr :: error {
Spawn, // pthread_create refused
Init, // mutex/cond/pool initialization failed
}
// ── Mutex / Cond (in-place; see MOVE SEMANTICS above) ────────────────
// A pthread mutex held inline. Call `setup` on the value where it will
// live and do not copy or move it afterwards.
Mutex :: struct {
    buf: MutexBuf = .{};

    // Initializes the mutex in place with default attributes.
    // Returns true when pthread_mutex_init reports success (status 0).
    setup :: (self: *Mutex) -> bool {
        status := pthread_mutex_init(@self.buf, null);
        return status == 0;
    }

    // Acquires the mutex. The pthread status code is not inspected.
    lock :: (self: *Mutex) {
        pthread_mutex_lock(@self.buf);
    }

    // Releases the mutex. The pthread status code is not inspected.
    unlock :: (self: *Mutex) {
        pthread_mutex_unlock(@self.buf);
    }

    // Tears the mutex down. The pthread status code is not inspected.
    destroy :: (self: *Mutex) {
        pthread_mutex_destroy(@self.buf);
    }
}
// A pthread condition variable held inline. Call `setup` on the value
// where it will live and do not copy or move it afterwards.
Cond :: struct {
    buf: CondBuf = .{};

    // Initializes the condition variable in place with default attributes.
    // Returns true when pthread_cond_init reports success (status 0).
    setup :: (self: *Cond) -> bool {
        status := pthread_cond_init(@self.buf, null);
        return status == 0;
    }

    // Sleeps on the condition: `m` is released atomically while waiting
    // and is held again by the time this returns.
    wait :: (self: *Cond, m: *Mutex) {
        pthread_cond_wait(@self.buf, @m.buf);
    }

    // Forwards to pthread_cond_signal. The status code is not inspected.
    signal :: (self: *Cond) {
        pthread_cond_signal(@self.buf);
    }

    // Forwards to pthread_cond_broadcast. The status code is not inspected.
    broadcast :: (self: *Cond) {
        pthread_cond_broadcast(@self.buf);
    }

    // Tears the condition variable down. The status code is not inspected.
    destroy :: (self: *Cond) {
        pthread_cond_destroy(@self.buf);
    }
}
// ── Thread ───────────────────────────────────────────────────────────
// Handle to one OS thread created through pthread_create.
Thread :: struct {
    handle: usize = 0;

    // Starts a thread running `entry(arg)`.
    //
    // `entry` sits on the C->sx boundary: it is callconv(.c) and must
    // fabricate its own Context before it touches default-conv sx code
    // (examples/1636).
    //
    // Raises error.Spawn when pthread_create returns a nonzero status.
    spawn :: (entry: (*void) -> *void callconv(.c), arg: *void) -> (Thread, !ThreadErr) {
        created : Thread = .{};
        status := pthread_create(@created.handle, null, entry, arg);
        if status != 0 {
            raise error.Spawn;
        }
        return created;
    }

    // Joins the thread via pthread_join; its return value is discarded
    // and the pthread status code is not inspected.
    join :: (self: *Thread) {
        pthread_join(self.handle, null);
    }

    // Detaches the thread via pthread_detach; the pthread status code is
    // not inspected.
    detach :: (self: *Thread) {
        pthread_detach(self.handle);
    }
}
// ── Pool: fixed workers, bounded queue, backpressure ────────────────
//
// httpz's thread_pool shape: `submit` enqueues a task for the next free
// worker and returns false when the backlog is full (the caller sheds —
// reject, retry, or run inline). `shutdown` discards nothing: tasks
// already queued still run, then the workers exit and are joined.
// One unit of queued work: a function plus the single word-sized argument
// it is called with. Pool.submit stores these in the pool's ring buffer and
// pool_worker copies one out and invokes `f(arg)`.
PoolTask :: struct {
f: (usize) -> void; // default-conv: runs inside the worker's context
arg: usize = 0;
}
// Fixed set of worker threads fed from a bounded ring buffer of tasks.
// Every field after `mu` is shared with the workers and is only touched
// while `mu` is held (create runs before any worker exists).
Pool :: struct {
    mu: Mutex = .{};            // guards the queue state below
    nonempty: Cond = .{};       // signalled by submit, broadcast by shutdown
    tasks: [*]PoolTask = null;  // ring buffer with `cap` slots
    cap: i64 = 0;               // ring capacity (the `backlog` given to create)
    head: i64 = 0;              // slot of the oldest queued task
    len: i64 = 0;               // number of tasks currently queued
    stop: bool = false;         // set by shutdown; submit refuses afterwards
    threads: [*]usize = null;   // pthread handles of the workers
    count: i64 = 0;             // how many entries of `threads` shutdown joins

    // Heap-allocate (the pool must never move: workers hold its address,
    // and it embeds a live mutex), init in place, spawn the workers.
    //
    // workers: number of worker threads to start.
    // backlog: capacity of the task queue.
    //
    // Raises error.Init when `workers` or `backlog` is negative, or when
    // the mutex or condition variable cannot be initialized.
    // Raises error.Spawn when a pthread_create call fails; workers that
    // had already started are stopped and joined before the error surfaces.
    //
    // NOTE(review): none of the failure paths hand the pool, `tasks` or
    // `threads` back to the allocator; no release call is visible in this
    // file. Confirm the allocator API and free them there.
    create :: (workers: i64, backlog: i64) -> (*Pool, !ThreadErr) {
        // A negative backlog would become a negative `cap`: submit's
        // `len == cap` check would then pass and it would write through
        // `tasks`, which was sized from that same negative number. A
        // negative worker count would likewise request a negative
        // allocation. Refuse both before touching the allocator.
        if workers < 0 or backlog < 0 { raise error.Init; }

        alloc := context.allocator;
        p : *Pool = xx alloc.alloc_bytes(size_of(Pool));
        p.* = Pool.{};

        if !p.mu.setup() { raise error.Init; }
        if !p.nonempty.setup() {
            // The mutex is already live at this point; tear it down
            // instead of abandoning an initialized pthread object.
            p.mu.destroy();
            raise error.Init;
        }

        p.tasks = xx alloc.alloc_bytes(backlog * size_of(PoolTask));
        p.cap = backlog;
        p.threads = xx alloc.alloc_bytes(workers * size_of(usize));
        p.count = workers;

        i : i64 = 0;
        while i < workers {
            if pthread_create(@p.threads[i], null, pool_worker, xx p) != 0 {
                // join what started, then fail loudly
                p.count = i;
                p.shutdown();
                raise error.Spawn;
            }
            i += 1;
        }
        return p;
    }

    // Enqueues `f(arg)` for the next free worker.
    // False = backlog full (backpressure) or the pool is stopping; the
    // task did not enqueue.
    submit :: (self: *Pool, f: (usize) -> void, arg: usize) -> bool {
        self.mu.lock();
        if self.len == self.cap or self.stop {
            self.mu.unlock();
            return false;
        }
        // First free slot sits `len` positions past `head`, wrapping.
        slot := (self.head + self.len) % self.cap;
        self.tasks[slot] = PoolTask.{ f = f, arg = arg };
        self.len += 1;
        self.nonempty.signal();
        self.mu.unlock();
        return true;
    }

    // Stop accepting work, let queued tasks finish, join every worker,
    // then destroy the mutex and condition variable.
    //
    // Call at most once, and do not call submit afterwards: both would
    // operate on a mutex that has already been destroyed.
    //
    // NOTE(review): the pool, `tasks` and `threads` allocations are not
    // released here; confirm who owns freeing them.
    shutdown :: (self: *Pool) {
        self.mu.lock();
        self.stop = true;
        // Wake every sleeping worker so each can observe `stop`.
        self.nonempty.broadcast();
        self.mu.unlock();

        i : i64 = 0;
        while i < self.count {
            pthread_join(self.threads[i], null);
            i += 1;
        }

        self.mu.destroy();
        self.nonempty.destroy();
    }
}
// Thread entry for every Pool worker. `arg` is the owning *Pool.
//
// Being a callconv(.c) entry it starts with no Context, so it builds one
// around its own GPA and runs the whole loop inside it. The loop pops one
// task at a time under the pool mutex and runs it with the mutex released;
// it ends once the pool is stopping and the queue is empty. Always
// returns null.
pool_worker :: (arg: *void) -> *void callconv(.c) {
    pool : *Pool = xx arg;
    gpa := GPA.init();
    push Context.{ allocator = xx gpa } {
        while true {
            pool.mu.lock();

            // Sleep while there is nothing to run and no stop request.
            while pool.len == 0 and !pool.stop {
                pool.nonempty.wait(@pool.mu);
            }

            // Still empty after waking means stop was requested: leave.
            if pool.len == 0 {
                pool.mu.unlock();
                break;
            }

            // Copy the oldest task out and advance the ring before
            // releasing the lock, so the slot can be reused right away.
            task := pool.tasks[pool.head];
            next := pool.head + 1;
            pool.head = next % pool.cap;
            pool.len -= 1;
            pool.mu.unlock();

            // Run the task outside the lock.
            run := task.f;
            run(task.arg);
        }
    }
    return null;
}