Files
sx/examples/1633-http-server.sx
agra e57a27205e feat: std.http pooled handler dispatch (PLAN-HTTPZ S7b)
thread_pool_count = 0 (default) keeps handlers inline on the loop
thread — the measured fast path (BENCH-HTTPZ.md). N > 0 dispatches
each parsed request to a std.thread Pool of N workers, completing the
httpz two-pool shape: the connection freezes as CONN_HANDLING (no
reads, growth, eviction, or recycling — the worker borrows views into
its read buffer), the worker runs the handler under a per-job arena
and serializes into job-owned bytes, the completion queues under the
PoolState mutex, and the loop wakes through the new std.event wake
channel (kqueue EVFILT_USER + EV_CLEAR; the epoll twin maps to
eventfd), attaches the response, compacts the buffer, and resumes
keep-alive/pipeline handling. A full backlog sheds with 503. Stale
completions (generation mismatch after close) are dropped. Pool mode
requires the server's constructing allocator to be thread-safe
(GPA/malloc), documented on the knob.

PoolState lives behind a heap pointer (it embeds a Mutex and is shared
with workers; the Server struct itself is returned by value).
serialize_response/run_handler_job share one serialize_bytes.

examples/1633 gains the pooled section (GET, body echo, 404 across
worker threads) plus the loop-wake path exercised end to end; AOT run
five times. examples/1632 unchanged but the Event struct gains `user`.
2026-06-12 22:31:27 +03:00

250 lines
10 KiB
Plaintext

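// std.http S7a/S7b (PLAN-HTTPZ): a live single-worker server and its
// clients driven in ONE thread via Server.tick — keep-alive reuse,
// POST body echo, the per-connection request cap closing politely,
// 404 routing, a large body echoed intact, and half-a-header eviction
// at the request deadline while the server keeps serving others.
// A second server with thread_pool_count = 2 then repeats the GET /
// echo / 404 checks with handlers dispatched to worker threads.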
#import "modules/std.sx";
PORT :: 18933;
// Request handler shared by both servers in this example.
//   /hello  -> body "hello <method>"; answers 500 "ctx lost" instead when
//              the ctx word is not the 77 that main hands to Server.init
//   /echo   -> body is the request body, unchanged
//   other   -> 404 "nope"
// resp.status is only written on the 500 and 404 paths.
handler :: (req: *http.Request, resp: *http.Response, ctx: usize) {
    if req.path == "/hello" {
        greeting := concat("hello ", req.method);
        if ctx == 77 {
            resp.body = greeting;
        }
        else {
            // the ctx word should arrive verbatim (init passed 77)
            resp.status = 500;
            resp.body = "ctx lost";
        }
    }
    else if req.path == "/echo" {
        resp.body = req.body;
    }
    else {
        resp.status = 404;
        resp.body = "nope";
    }
}
// Naive substring search: true when `needle` occurs anywhere in `hay`.
// An empty needle matches any haystack (including an empty one).
contains :: (hay: string, needle: string) -> bool {
    if hay.len < needle.len { return false; }
    start := 0;
    while start + needle.len <= hay.len {
        // Count how many leading bytes of needle agree with hay at `start`.
        same := 0;
        while same < needle.len and hay[start + same] == needle[same] {
            same += 1;
        }
        if same == needle.len { return true; }
        start += 1;
    }
    return false;
}
// Open a TCP client to 127.0.0.1:`port` and switch it to nonblocking
// mode once connected. Returns the socket fd, or -1 when any step
// fails (the fd is closed before returning -1).
dial_port :: (port: i64) -> i32 {
    fd := socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
    if fd < 0 { return -1; }
    // 0x0100007F is 127.0.0.1 as stored in sin_addr.
    addr : socket.SockAddr = .{
        sin_len = 16, sin_family = xx socket.AF_INET,
        sin_port = socket.htons(port), sin_addr = 0x0100007F,
    };
    // set_nonblocking is only attempted after a successful connect.
    connected := socket.connect(fd, @addr, 16) == 0;
    if connected and socket.set_nonblocking(fd) { return fd; }
    socket.close(fd);
    return -1;
}
// Shorthand for dial_port on the first server's port (PORT).
dial :: () -> i32 { return dial_port(PORT); }
// True when `buf[0..len]` holds a complete response (headers + body).
//
// "Complete" means: the CRLF CRLF header terminator has been received,
// the header block carries a `Content-Length: <digits>` field, and at
// least that many body bytes follow the terminator. A response without
// Content-Length in its headers is never reported complete.
//
// The Content-Length search is confined to the header block, so the
// same text appearing inside a body (e.g. an echoed payload) can never
// be mistaken for the framing header. The match is case-sensitive and
// expects exactly one space after the colon.
resp_complete :: (buf: [*]u8, len: i64) -> bool {
    s := string.{ ptr = buf, len = xx len };
    // Offset of the CRLF CRLF (13 10 13 10) that ends the header block.
    he := -1;
    i := 0;
    while i + 3 < s.len {
        if s[i] == 13 and s[i+1] == 10 and s[i+2] == 13 and s[i+3] == 10 { he = i; break; }
        i += 1;
    }
    if he < 0 { return false; }
    // Header-only view: everything before the terminator.
    hdr := string.{ ptr = buf, len = xx he };
    // Content-Length digits
    cl : i64 = 0;
    seen := false;
    j := 0;
    needle := "Content-Length: ";
    // Strict `<`: at least one byte must follow the field name inside the headers.
    while j + needle.len < hdr.len {
        k := 0;
        ok := true;
        while k < needle.len { if hdr[j + k] != needle[k] { ok = false; break; } k += 1; }
        if ok {
            d := j + needle.len;
            // Accumulate ASCII digits ('0' = 48 .. '9' = 57) up to the first non-digit.
            while d < hdr.len and hdr[d] >= 48 and hdr[d] <= 57 { cl = cl * 10 + (hdr[d] - 48); d += 1; }
            seen = true;
            break;
        }
        j += 1;
    }
    if !seen { return false; }
    // Headers + 4-byte terminator + declared body length must all be present.
    return len >= he + 4 + cl;
}
// Write `reqtext` to `fd`, then alternate srv.tick(5) with nonblocking
// reads into `scratch` until resp_complete accepts the accumulated bytes.
// Returns the bytes received so far when the response is complete or
// when the read reports error.Closed; returns "" after 400 attempts
// without a complete response. Reads are sized against a 4096-byte
// scratch buffer.
roundtrip :: (srv: *http.Server, fd: i32, reqtext: string, scratch: [*]u8) -> string {
    socket.write(fd, reqtext.ptr, xx reqtext.len);
    got : i64 = 0;
    attempt := 0;
    while attempt < 400 {
        // Tick errors are deliberately ignored; the loop just tries again.
        srv.tick(5) catch {};
        count, rerr := socket.read_nb(fd, @scratch[got], xx (4096 - got));
        // Peer closed: hand back whatever arrived before the close.
        if rerr == error.Closed { return string.{ ptr = scratch, len = xx got }; }
        // Only an error-free read contributes bytes.
        if !rerr { got += count; }
        if resp_complete(scratch, got) { return string.{ ptr = scratch, len = xx got }; }
        attempt += 1;
    }
    return "";
}
// Runs the whole example from one thread.
//   Part 1: a server on PORT (cfg does not set thread_pool_count) is put
//           through cases 1-6: keep-alive, reuse, request cap, POST echo,
//           404, a 50000-byte echo, and eviction of a half-sent request.
//   Part 2: a second server on PORT + 1 with thread_pool_count = 2 repeats
//           the GET / echo / 404 checks.
// Every check that fails prints its own message and returns 1; 0 is
// returned only after all checks pass.
// NOTE(review): the failure returns leave the server and any open client
// sockets unclosed, and payload / bigbuf are never freed in this function.
main :: () -> i32 {
// request_count = 3 is what case 3 depends on; timeout_request_ms = 150
// is what case 6 depends on.
cfg : http.Config = .{
port = PORT,
timeout_request_ms = 150,
timeout_keepalive_ms = 400,
request_count = 3,
max_conn = 8,
};
// 77 is the ctx word that handler verifies on /hello.
srv, se := http.Server.init(cfg, handler, 77);
if se { print("server init failed\n"); return 1; }
// Shared response scratch; roundtrip sizes its reads against 4096 bytes.
// Each roundtrip overwrites it, so r1..r9 are only valid until the next call.
buf : [4096]u8 = ---;
// ── 1. GET, keep-alive default ────────────────────────────────────
c1 := dial();
if c1 < 0 { print("dial failed\n"); return 1; }
r1 := roundtrip(@srv, c1, "GET /hello HTTP/1.1\r\nHost: t\r\n\r\n", @buf[0]);
if !contains(r1, "HTTP/1.1 200 OK") { print("case1: bad status\n"); return 1; }
if !contains(r1, "hello GET") { print("case1: bad body\n"); return 1; }
if !contains(r1, "Connection: keep-alive") { print("case1: expected keep-alive\n"); return 1; }
print("GET 200, keep-alive\n");
// ── 2. same socket again: the connection was actually reused ─────
r2 := roundtrip(@srv, c1, "GET /hello HTTP/1.1\r\nHost: t\r\n\r\n", @buf[0]);
if !contains(r2, "hello GET") { print("case2: keep-alive reuse failed\n"); return 1; }
print("keep-alive reuse ok\n");
// ── 3. third request hits request_count: Connection: close + EOF ─
r3 := roundtrip(@srv, c1, "GET /hello HTTP/1.1\r\nHost: t\r\n\r\n", @buf[0]);
if !contains(r3, "Connection: close") { print("case3: expected close at cap\n"); return 1; }
// Keep ticking until the client side observes the close: either
// error.Closed, or a read that reports no error and zero bytes.
// Bounded at 200 attempts.
drained := false;
tries := 0;
while !drained and tries < 200 {
srv.tick(5) catch {};
zq, ze := socket.read_nb(c1, @buf[0], 64);
if ze == error.Closed { drained = true; }
if !ze and zq == 0 { drained = true; }
tries += 1;
}
if !drained { print("case3: server did not close at the cap\n"); return 1; }
socket.close(c1);
print("request cap: close + EOF\n");
// ── 4. POST body echo ─────────────────────────────────────────────
c2 := dial();
if c2 < 0 { print("dial2 failed\n"); return 1; }
r4 := roundtrip(@srv, c2, "POST /echo HTTP/1.1\r\nHost: t\r\nContent-Length: 9\r\n\r\nping-pong", @buf[0]);
if !contains(r4, "ping-pong") { print("case4: body not echoed\n"); return 1; }
print("POST echo ok\n");
// ── 5. unknown path routes 404 ────────────────────────────────────
r5 := roundtrip(@srv, c2, "GET /missing HTTP/1.1\r\nHost: t\r\n\r\n", @buf[0]);
if !contains(r5, "HTTP/1.1 404 Not Found") { print("case5: expected 404\n"); return 1; }
socket.close(c2);
print("404 routing ok\n");
// ── 5b. a body past READ_BUF_INITIAL forces the buffer to grow ────
big_n := 50000;
payload : [*]u8 = xx context.allocator.alloc_bytes(xx big_n);
// Fill with bytes 97..122 ('a'..'z') cycling, so the byte at any offset
// is predictable from the offset alone.
f := 0;
while f < big_n { payload[f] = 97 + cast(u8)(f % 26); f += 1; }
breq := concat("POST /echo HTTP/1.1\r\nHost: t\r\nContent-Length: ", concat(int_to_string(big_n), "\r\n\r\n"));
breq = concat(breq, string.{ ptr = payload, len = xx big_n });
c2b := dial();
if c2b < 0 { print("dial2b failed\n"); return 1; }
// Receive buffer for the echo: body size plus 4096 extra bytes
// (presumably headroom for the status line and headers).
bigbuf : [*]u8 = xx context.allocator.alloc_bytes(xx (big_n + 4096));
// NOTE(review): the result of this write is not checked.
socket.write(c2b, breq.ptr, xx breq.len);
// Same tick/read loop as roundtrip, but against bigbuf and with a
// 2000-attempt bound instead of 400.
btotal : i64 = 0;
btries := 0;
while btries < 2000 {
srv.tick(5) catch {};
bn, bre := socket.read_nb(c2b, @bigbuf[btotal], xx (big_n + 4096 - btotal));
if !bre { btotal += bn; }
else if bre == error.Closed { break; }
if resp_complete(bigbuf, btotal) { break; }
btries += 1;
}
bresp := string.{ ptr = bigbuf, len = xx btotal };
if !contains(bresp, "HTTP/1.1 200 OK") { print("case5b: big echo not 200\n"); return 1; }
if !contains(bresp, concat("Content-Length: ", int_to_string(big_n))) { print("case5b: wrong echo length\n"); return 1; }
// The last received byte must equal the last payload byte. btotal > 0
// is guaranteed here because the status-line check above already passed.
if bigbuf[btotal - 1] != 97 + cast(u8)((big_n - 1) % 26) { print("case5b: tail byte wrong\n"); return 1; }
socket.close(c2b);
print("big body grows the buffer and echoes intact\n");
// ── 6. half a header is evicted at the request deadline, while a
// healthy client keeps being served ──────────────────────────
c3 := dial();
if c3 < 0 { print("dial3 failed\n"); return 1; }
// Send an incomplete request line and then go silent.
half := "GET /hel";
socket.write(c3, half.ptr, xx half.len);
// Tick for 300 ms, i.e. past cfg.timeout_request_ms (150).
gone := event.deadline_in(300); // > timeout_request_ms
while !event.expired(gone) { srv.tick(5) catch {}; }
// A fresh client must still get served while c3 is stalled.
c4 := dial();
if c4 < 0 { print("dial4 failed\n"); return 1; }
r6 := roundtrip(@srv, c4, "GET /hello HTTP/1.1\r\nHost: t\r\n\r\n", @buf[0]);
if !contains(r6, "hello GET") { print("case6: healthy client starved\n"); return 1; }
// Same close detection as case 3, now on the stalled socket c3.
evicted := false;
tries = 0;
while !evicted and tries < 200 {
srv.tick(5) catch {};
zq2, ze2 := socket.read_nb(c3, @buf[0], 64);
if ze2 == error.Closed { evicted = true; }
if !ze2 and zq2 == 0 { evicted = true; }
tries += 1;
}
if !evicted { print("case6: half-header connection never evicted\n"); return 1; }
socket.close(c3);
socket.close(c4);
print("slow client evicted, healthy client served\n");
srv.close();
// ── pooled dispatch (S7b): same contract through worker threads ──
// thread_pool_count > 0 runs handlers on a pool; completions come
// back through the loop's wake channel. Same assertions: routing,
// body echo, keep-alive reuse — now crossing threads per request.
// The pooled server listens on PORT + 1. Its 1000 ms timeouts and
// request_count = 50 are well clear of the three requests sent below.
pcfg : http.Config = .{
port = PORT + 1,
timeout_request_ms = 1000,
timeout_keepalive_ms = 1000,
request_count = 50,
max_conn = 8,
thread_pool_count = 2,
thread_pool_backlog = 16,
};
psrv, pse := http.Server.init(pcfg, handler, 77);
if pse { print("pooled server init failed\n"); return 1; }
c5 := dial_port(PORT + 1);
if c5 < 0 { print("dial5 failed\n"); return 1; }
// All three requests travel over the single connection c5.
r7 := roundtrip(@psrv, c5, "GET /hello HTTP/1.1\r\nHost: t\r\n\r\n", @buf[0]);
if !contains(r7, "HTTP/1.1 200 OK") { print("pooled: bad status\n"); return 1; }
if !contains(r7, "hello GET") { print("pooled: bad body\n"); return 1; }
r8 := roundtrip(@psrv, c5, "POST /echo HTTP/1.1\r\nHost: t\r\nContent-Length: 9\r\n\r\nping-pong", @buf[0]);
if !contains(r8, "ping-pong") { print("pooled: echo failed\n"); return 1; }
r9 := roundtrip(@psrv, c5, "GET /missing HTTP/1.1\r\nHost: t\r\n\r\n", @buf[0]);
if !contains(r9, "HTTP/1.1 404 Not Found") { print("pooled: expected 404\n"); return 1; }
socket.close(c5);
psrv.close();
print("pooled dispatch: GET, echo, 404 across worker threads ok\n");
print("http server ok\n");
return 0;
}