// std.thread — OS threads, Mutex/Cond, and a bounded worker Pool over // pthreads (PLAN-HTTPZ S6). // // THE RE-ENTRY CONTRACT (pinned by examples/1636): a thread entry is a // `callconv(.c)` function — it has NO implicit context — and enters // the sx world by fabricating one: `push Context.{ allocator = xx gpa }` // around the default-conv code it runs. Pool workers do exactly that, // each with its own malloc-backed GPA, so tasks allocate freely and // never share allocator state across threads. // // ALLOCATOR DISCIPLINE: GPA is malloc/free — thread-safe. Arena and // every other context-flowing allocator is NOT; never share one across // threads. Pool buffers come from the creating context's allocator and // are touched only under the pool mutex. // // MOVE SEMANTICS: a pthread mutex/cond is address-sensitive once // initialized — POSIX leaves moving one undefined. Mutex/Cond therefore // initialize IN PLACE (`m.setup()`) and Pool lives behind a pointer // (`Pool.create`), never by-value copy. // // PER-OS: sizes below are darwin's (pthread_mutex_t 64 bytes, // pthread_cond_t 48, pthread_t a pointer). glibc differs (40/48); // PLAN-HTTPZ C3 selects per-OS when the linux target activates. #import "modules/std.sx"; tlib :: #library "c"; pthread_create :: (thread: *usize, attr: *void, start: (*void) -> *void callconv(.c), arg: *void) -> i32 extern tlib; pthread_join :: (thread: usize, retval: **void) -> i32 extern tlib; pthread_detach :: (thread: usize) -> i32 extern tlib; pthread_mutex_init :: (m: *MutexBuf, attr: *void) -> i32 extern tlib; pthread_mutex_lock :: (m: *MutexBuf) -> i32 extern tlib; pthread_mutex_unlock :: (m: *MutexBuf) -> i32 extern tlib; pthread_mutex_destroy :: (m: *MutexBuf) -> i32 extern tlib; pthread_cond_init :: (c: *CondBuf, attr: *void) -> i32 extern tlib; pthread_cond_wait :: (c: *CondBuf, m: *MutexBuf) -> i32 extern tlib; pthread_cond_signal :: (c: *CondBuf) -> i32 extern tlib; pthread_cond_broadcast :: (c: *CondBuf) -> i32 extern tlib; pthread_cond_destroy :: (c: *CondBuf) -> i32 extern tlib; // darwin pthread_mutex_t: { long __sig; char __opaque[56]; } — 64 bytes. MutexBuf :: struct { sig: i64 = 0; o0: i64 = 0; o1: i64 = 0; o2: i64 = 0; o3: i64 = 0; o4: i64 = 0; o5: i64 = 0; o6: i64 = 0; } // darwin pthread_cond_t: { long __sig; char __opaque[40]; } — 48 bytes. CondBuf :: struct { sig: i64 = 0; o0: i64 = 0; o1: i64 = 0; o2: i64 = 0; o3: i64 = 0; o4: i64 = 0; } ThreadErr :: error { Spawn, // pthread_create refused Init, // mutex/cond/pool initialization failed } // ── Mutex / Cond (in-place; see MOVE SEMANTICS above) ──────────────── Mutex :: struct { buf: MutexBuf = .{}; setup :: (self: *Mutex) -> bool { return pthread_mutex_init(@self.buf, null) == 0; } lock :: (self: *Mutex) { pthread_mutex_lock(@self.buf); } unlock :: (self: *Mutex) { pthread_mutex_unlock(@self.buf); } destroy :: (self: *Mutex) { pthread_mutex_destroy(@self.buf); } } Cond :: struct { buf: CondBuf = .{}; setup :: (self: *Cond) -> bool { return pthread_cond_init(@self.buf, null) == 0; } // Atomically releases `m` and sleeps; reacquires `m` before returning. wait :: (self: *Cond, m: *Mutex) { pthread_cond_wait(@self.buf, @m.buf); } signal :: (self: *Cond) { pthread_cond_signal(@self.buf); } broadcast :: (self: *Cond) { pthread_cond_broadcast(@self.buf); } destroy :: (self: *Cond) { pthread_cond_destroy(@self.buf); } } // ── Thread ─────────────────────────────────────────────────────────── Thread :: struct { handle: usize = 0; // `entry` is the C->sx boundary: callconv(.c), fabricates its own // Context before touching default-conv sx code (examples/1636). spawn :: (entry: (*void) -> *void callconv(.c), arg: *void) -> (Thread, !ThreadErr) { t : Thread = .{}; if pthread_create(@t.handle, null, entry, arg) != 0 { raise error.Spawn; } return t; } join :: (self: *Thread) { pthread_join(self.handle, null); } detach :: (self: *Thread) { pthread_detach(self.handle); } } // ── Pool: fixed workers, bounded queue, backpressure ──────────────── // // httpz's thread_pool shape: `submit` enqueues a task for the next free // worker and returns false when the backlog is full (the caller sheds — // reject, retry, or run inline). `shutdown` drains nothing: queued // tasks still run, then workers exit and join. PoolTask :: struct { f: (usize) -> void; // default-conv: runs inside the worker's context arg: usize = 0; } Pool :: struct { mu: Mutex = .{}; nonempty: Cond = .{}; tasks: [*]PoolTask = null; cap: i64 = 0; head: i64 = 0; len: i64 = 0; stop: bool = false; threads: [*]usize = null; count: i64 = 0; // Heap-allocate (the pool must never move: workers hold its address, // and it embeds a live mutex), init in place, spawn the workers. create :: (workers: i64, backlog: i64) -> (*Pool, !ThreadErr) { alloc := context.allocator; p : *Pool = xx alloc.alloc_bytes(size_of(Pool)); p.* = Pool.{}; if !p.mu.setup() { raise error.Init; } if !p.nonempty.setup() { raise error.Init; } p.tasks = xx alloc.alloc_bytes(backlog * size_of(PoolTask)); p.cap = backlog; p.threads = xx alloc.alloc_bytes(workers * size_of(usize)); p.count = workers; i : i64 = 0; while i < workers { if pthread_create(@p.threads[i], null, pool_worker, xx p) != 0 { // join what started, then fail loudly p.count = i; p.shutdown(); raise error.Spawn; } i += 1; } return p; } // False = backlog full (backpressure); the task did not enqueue. submit :: (self: *Pool, f: (usize) -> void, arg: usize) -> bool { self.mu.lock(); if self.len == self.cap or self.stop { self.mu.unlock(); return false; } slot := (self.head + self.len) % self.cap; self.tasks[slot] = PoolTask.{ f = f, arg = arg }; self.len += 1; self.nonempty.signal(); self.mu.unlock(); return true; } // Stop accepting work, let queued tasks finish, join every worker. shutdown :: (self: *Pool) { self.mu.lock(); self.stop = true; self.nonempty.broadcast(); self.mu.unlock(); i : i64 = 0; while i < self.count { pthread_join(self.threads[i], null); i += 1; } self.mu.destroy(); self.nonempty.destroy(); } } // The worker loop: C entry, own fabricated Context, then // pop-task/run-task until stop with an empty queue. pool_worker :: (arg: *void) -> *void callconv(.c) { p : *Pool = xx arg; gpa := GPA.init(); push Context.{ allocator = xx gpa } { while true { p.mu.lock(); while !p.stop and p.len == 0 { p.nonempty.wait(@p.mu); } if p.len == 0 { // stop, and nothing queued p.mu.unlock(); break; } t := p.tasks[p.head]; p.head = (p.head + 1) % p.cap; p.len -= 1; p.mu.unlock(); f := t.f; f(t.arg); } } return null; }