Files
sx/library/modules/std/thread.sx
agra cd5b958d19 comptime compiler-API: Phase 1 foundation + Phase 2.1 weld plan
Introduce the welded comptime `compiler` library (`#library "compiler"` +
`abi(.zig) extern compiler`), per design/comptime-compiler-api.md, and unify
`callconv(...)` into the new `abi(...)` annotation.

abi(...) replaces callconv(...):
- New ABI enum { default, c, zig, pure }; `abi(.c|.zig|.pure)` parses in the
  postfix slot before extern/export (and standalone). `kw_callconv` -> `kw_abi`.
- Migrated 52 sx files, the call-convention-mismatch diagnostic, and docs
  (readme/specs) from `callconv(.c)` to `abi(.c)`.

Phase 1 — welded compiler library (parse -> registry -> validation -> bridge):
- `abi(.zig) extern compiler` parses on fn decls (carries abi/extern_lib) and
  struct decls (StructDecl.abi/extern_lib).
- `#library "compiler"` is the comptime-only internal surface — never dlopen'd.
- src/ir/compiler_lib.zig: the binding registry (the safety boundary). `Field`
  welded to StructInfo.Field with layout baked from the real Zig type
  (@offsetOf/@sizeOf); `findType`/`findFn`. Welded structs are layout-validated
  at registration (field set + total size) as a header checked against the impl.
- Host-call bridge: a `fn abi(.zig) extern compiler` dispatches under the
  comptime interp to its registered Zig handler (intern/text_of round-trip),
  never dlsym. IR Function.compiler_welded; validated in declareFunction.
- Comptime-only enforcement: a runtime call to a welded fn is a clean
  build-gating error (emitCall), not an undefined-symbol link failure.

Phase 2.1 — byte-layout weld foundation:
- Decision: full byte-layout weld (sx struct laid out byte-identically to the
  bound Zig type). Registered StructInfo (first non-natural / Zig-reordered
  layout). `computeWeldPlan` — pure offset-ordered element plan + padding +
  sx-field->LLVM-element remap; unit-tested. Emit/interp wiring is the next
  sub-step (2.2+, see current/CHECKPOINT-COMPILER-API.md).

Examples: 0625/0626 (welded struct + fn round-trip), 1183/1184/1185
(layout-mismatch, unexported-fn, runtime-call diagnostics).
2026-06-17 13:31:11 +03:00

226 lines
7.5 KiB
Plaintext

// std.thread — OS threads, Mutex/Cond, and a bounded worker Pool over
// pthreads (PLAN-HTTPZ S6).
//
// THE RE-ENTRY CONTRACT (pinned by examples/1636): a thread entry is an
// `abi(.c)` function — it has NO implicit context — and enters
// the sx world by fabricating one: `push Context.{ allocator = xx gpa }`
// around the default-conv code it runs. Pool workers do exactly that,
// each with its own malloc-backed GPA, so tasks allocate freely and
// never share allocator state across threads.
//
// ALLOCATOR DISCIPLINE: GPA is malloc/free — thread-safe. Arena and
// every other context-flowing allocator is NOT; never share one across
// threads. Pool buffers come from the creating context's allocator and
// are touched only under the pool mutex.
//
// MOVE SEMANTICS: a pthread mutex/cond is address-sensitive once
// initialized — POSIX leaves moving one undefined. Mutex/Cond therefore
// initialize IN PLACE (`m.setup()`) and Pool lives behind a pointer
// (`Pool.create`), never by-value copy.
//
// PER-OS: sizes below are darwin's (pthread_mutex_t 64 bytes,
// pthread_cond_t 48, pthread_t a pointer). glibc differs (40/48);
// PLAN-HTTPZ C3 selects per-OS when the linux target activates.
#import "modules/std.sx";
tlib :: #library "c";
// libc pthread bindings (resolved through `tlib`). Every call returns an
// i32 status; the callers in this file treat 0 as success.
//
// Threads: the handle is carried as a `usize`. `pthread_create` writes it
// through `thread` and starts `start(arg)` on the new thread; `start` must
// be an abi(.c) function.
pthread_create :: (thread: *usize, attr: *void, start: (*void) -> *void abi(.c), arg: *void) -> i32 extern tlib;
pthread_join :: (thread: usize, retval: **void) -> i32 extern tlib;
pthread_detach :: (thread: usize) -> i32 extern tlib;
// Mutexes: operate on caller-owned MutexBuf storage, passed by pointer.
pthread_mutex_init :: (m: *MutexBuf, attr: *void) -> i32 extern tlib;
pthread_mutex_lock :: (m: *MutexBuf) -> i32 extern tlib;
pthread_mutex_unlock :: (m: *MutexBuf) -> i32 extern tlib;
pthread_mutex_destroy :: (m: *MutexBuf) -> i32 extern tlib;
// Condition variables: operate on caller-owned CondBuf storage;
// `pthread_cond_wait` additionally takes the mutex paired with the wait.
pthread_cond_init :: (c: *CondBuf, attr: *void) -> i32 extern tlib;
pthread_cond_wait :: (c: *CondBuf, m: *MutexBuf) -> i32 extern tlib;
pthread_cond_signal :: (c: *CondBuf) -> i32 extern tlib;
pthread_cond_broadcast :: (c: *CondBuf) -> i32 extern tlib;
pthread_cond_destroy :: (c: *CondBuf) -> i32 extern tlib;
// Raw storage for a darwin pthread_mutex_t, which is declared in C as
// { long __sig; char __opaque[56]; }. Eight i64 words = 64 bytes, all
// zero until pthread_mutex_init fills them in. Handed to the
// pthread_mutex_* bindings by pointer only.
MutexBuf :: struct {
    sig: i64 = 0;   // __sig
    // __opaque[56], as seven 8-byte words
    o0: i64 = 0;
    o1: i64 = 0;
    o2: i64 = 0;
    o3: i64 = 0;
    o4: i64 = 0;
    o5: i64 = 0;
    o6: i64 = 0;
}
// Raw storage for a darwin pthread_cond_t, which is declared in C as
// { long __sig; char __opaque[40]; }. Six i64 words = 48 bytes, all
// zero until pthread_cond_init fills them in. Handed to the
// pthread_cond_* bindings by pointer only.
CondBuf :: struct {
    sig: i64 = 0;   // __sig
    // __opaque[40], as five 8-byte words
    o0: i64 = 0;
    o1: i64 = 0;
    o2: i64 = 0;
    o3: i64 = 0;
    o4: i64 = 0;
}
// Errors raised by the fallible constructors in this module
// (Thread.spawn raises Spawn; Pool.create raises Init or Spawn).
ThreadErr :: error {
Spawn, // pthread_create refused
Init, // mutex/cond/pool initialization failed
}
// ── Mutex / Cond (in-place; see MOVE SEMANTICS above) ────────────────
// A pthread mutex held inline in `buf`. Call `setup` on the value where it
// will live before any other method: every method passes the address of
// `buf` to pthreads, so the Mutex must not be copied or moved afterwards.
Mutex :: struct {
    buf: MutexBuf = .{};

    // Initializes the mutex with default attributes (null attr).
    // Returns true when pthread_mutex_init reports success (0).
    setup :: (self: *Mutex) -> bool {
        rc := pthread_mutex_init(@self.buf, null);
        return rc == 0;
    }

    // Acquires the mutex. The pthread status code is not inspected.
    lock :: (self: *Mutex) {
        pthread_mutex_lock(@self.buf);
    }

    // Releases the mutex. The pthread status code is not inspected.
    unlock :: (self: *Mutex) {
        pthread_mutex_unlock(@self.buf);
    }

    // Tears the mutex down via pthread_mutex_destroy.
    destroy :: (self: *Mutex) {
        pthread_mutex_destroy(@self.buf);
    }
}
// A pthread condition variable held inline in `buf`. Like Mutex, it is
// initialized in place with `setup` and is used by address from then on,
// so it must not be copied or moved after initialization.
Cond :: struct {
    buf: CondBuf = .{};

    // Initializes the condition variable with default attributes (null attr).
    // Returns true when pthread_cond_init reports success (0).
    setup :: (self: *Cond) -> bool {
        rc := pthread_cond_init(@self.buf, null);
        return rc == 0;
    }

    // Blocks on the condition. `m` is released atomically while sleeping
    // and is held again by the time this returns.
    wait :: (self: *Cond, m: *Mutex) {
        pthread_cond_wait(@self.buf, @m.buf);
    }

    // Forwards to pthread_cond_signal (wake a waiter).
    signal :: (self: *Cond) {
        pthread_cond_signal(@self.buf);
    }

    // Forwards to pthread_cond_broadcast (wake all waiters).
    broadcast :: (self: *Cond) {
        pthread_cond_broadcast(@self.buf);
    }

    // Tears the condition variable down via pthread_cond_destroy.
    destroy :: (self: *Cond) {
        pthread_cond_destroy(@self.buf);
    }
}
// ── Thread ───────────────────────────────────────────────────────────
// Handle to one OS thread started through pthread_create.
Thread :: struct {
    handle: usize = 0;   // pthread handle; 0 until spawn fills it in

    // Starts a new thread running `entry(arg)`.
    // `entry` sits on the C->sx boundary: it is abi(.c) and has to build
    // its own Context before calling default-conv sx code (examples/1636).
    // Raises ThreadErr.Spawn when pthread_create returns non-zero.
    spawn :: (entry: (*void) -> *void abi(.c), arg: *void) -> (Thread, !ThreadErr) {
        t : Thread = .{};
        rc := pthread_create(@t.handle, null, entry, arg);
        if rc != 0 { raise error.Spawn; }
        return t;
    }

    // Waits for the thread to finish; its return value is discarded
    // (null retval) and the pthread status code is not inspected.
    join :: (self: *Thread) {
        pthread_join(self.handle, null);
    }

    // Detaches the thread via pthread_detach; the status code is not inspected.
    detach :: (self: *Thread) {
        pthread_detach(self.handle);
    }
}
// ── Pool: fixed workers, bounded queue, backpressure ────────────────
//
// httpz's thread_pool shape: `submit` enqueues a task for the next free
// worker and returns false when the backlog is full (the caller sheds —
// reject, retry, or run inline). `shutdown` discards nothing: queued
// tasks still run, then the workers exit and are joined.
// One queued unit of work for a Pool: a worker copies the task out of the
// queue and invokes it as `f(arg)`.
PoolTask :: struct {
f: (usize) -> void; // default-conv: runs inside the worker's context
arg: usize = 0; // opaque word passed to `f` unchanged
}
// Fixed set of worker threads fed from a bounded ring buffer of tasks.
// Built with `Pool.create` and used through the returned pointer only:
// the workers hold the pool's address and it embeds a live mutex/cond.
Pool :: struct {
    mu: Mutex = .{};             // guards the queue state: tasks, head, len, stop
    nonempty: Cond = .{};        // signalled by submit, broadcast by shutdown
    tasks: [*]PoolTask = null;   // ring storage with `cap` slots
    cap: i64 = 0;                // queue capacity (the `backlog` given to create)
    head: i64 = 0;               // slot of the oldest queued task
    len: i64 = 0;                // number of queued tasks
    stop: bool = false;          // set by shutdown; submit refuses once set
    threads: [*]usize = null;    // pthread handles of the workers
    count: i64 = 0;              // number of handles in `threads` to join

    // Heap-allocate (the pool must never move: workers hold its address,
    // and it embeds a live mutex), init in place, spawn the workers.
    //
    // workers: number of worker threads to start.
    // backlog: capacity of the task queue.
    // Raises ThreadErr.Init when a count is negative or the mutex/cond
    // cannot be initialized, and ThreadErr.Spawn when a worker cannot be
    // started (workers already running are stopped and joined first).
    //
    // NOTE(review): the pool and its two buffers are not released on the
    // failure paths (nor by shutdown); no release call is visible from
    // here — confirm against the allocator API. Allocation results are
    // likewise used unchecked.
    create :: (workers: i64, backlog: i64) -> (*Pool, !ThreadErr) {
        // Negative counts would become negative allocation sizes, and a
        // negative `cap` defeats submit's `len == cap` full check.
        if workers < 0 or backlog < 0 { raise error.Init; }
        alloc := context.allocator;
        p : *Pool = xx alloc.alloc_bytes(size_of(Pool));
        p.* = Pool.{};
        if !p.mu.setup() { raise error.Init; }
        if !p.nonempty.setup() {
            // Do not leave an initialized mutex behind on the failure path.
            p.mu.destroy();
            raise error.Init;
        }
        p.tasks = xx alloc.alloc_bytes(backlog * size_of(PoolTask));
        p.cap = backlog;
        p.threads = xx alloc.alloc_bytes(workers * size_of(usize));
        p.count = workers;
        i : i64 = 0;
        while i < workers {
            if pthread_create(@p.threads[i], null, pool_worker, xx p) != 0 {
                // join what started, then fail loudly
                p.count = i;
                p.shutdown();
                raise error.Spawn;
            }
            i += 1;
        }
        return p;
    }

    // Enqueues `f(arg)` for a worker and wakes one.
    // False = backlog full (backpressure) or the pool is stopping; the
    // task did not enqueue.
    submit :: (self: *Pool, f: (usize) -> void, arg: usize) -> bool {
        self.mu.lock();
        if self.len == self.cap or self.stop {
            self.mu.unlock();
            return false;
        }
        // First free slot sits `len` places after `head`, wrapping at `cap`.
        // cap > 0 here: a zero-capacity pool always takes the branch above.
        slot := (self.head + self.len) % self.cap;
        self.tasks[slot] = PoolTask.{ f = f, arg = arg };
        self.len += 1;
        self.nonempty.signal();
        self.mu.unlock();
        return true;
    }

    // Stop accepting work, let queued tasks finish, join every worker,
    // then destroy the mutex and condition variable. The pool must not be
    // used afterwards.
    shutdown :: (self: *Pool) {
        self.mu.lock();
        self.stop = true;
        // Wake every sleeping worker so each one can observe `stop`.
        self.nonempty.broadcast();
        self.mu.unlock();
        i : i64 = 0;
        while i < self.count {
            pthread_join(self.threads[i], null);
            i += 1;
        }
        self.mu.destroy();
        self.nonempty.destroy();
    }
}
// Body of every Pool worker thread. It is entered through the C ABI with
// the owning *Pool as `arg`, so it first fabricates its own Context (with
// a GPA of its own) and then loops: take the oldest task, run it, repeat.
// It returns null once stop is set and the queue is empty.
pool_worker :: (arg: *void) -> *void abi(.c) {
    pool : *Pool = xx arg;
    gpa := GPA.init();
    push Context.{ allocator = xx gpa } {
        while true {
            pool.mu.lock();
            // Sleep until there is work to take or shutdown was requested.
            while !pool.stop and pool.len == 0 {
                pool.nonempty.wait(@pool.mu);
            }
            if pool.len == 0 {
                // Only reachable with stop set: nothing left to run, exit.
                pool.mu.unlock();
                break;
            }
            // Copy the oldest task out, then advance the ring past it.
            job := pool.tasks[pool.head];
            pool.len -= 1;
            pool.head = (pool.head + 1) % pool.cap;
            pool.mu.unlock();
            // Run with the lock released so other workers and submitters
            // are not blocked for the duration of the task.
            run := job.f;
            run(job.arg);
        }
    }
    return null;
}