thread_pool_count = 0 (default) keeps handlers inline on the loop thread — the measured fast path (BENCH-HTTPZ.md). N > 0 dispatches each parsed request to a std.thread Pool of N workers, completing the httpz two-pool shape: the connection freezes as CONN_HANDLING (no reads, growth, eviction, or recycling — the worker borrows views into its read buffer), the worker runs the handler under a per-job arena and serializes into job-owned bytes, the completion queues under the PoolState mutex, and the loop wakes through the new std.event wake channel (kqueue EVFILT_USER + EV_CLEAR; the epoll twin maps to eventfd), attaches the response, compacts the buffer, and resumes keep-alive/pipeline handling. A full backlog sheds with 503. Stale completions (generation mismatch after close) are dropped. Pool mode requires the server's constructing allocator to be thread-safe (GPA/malloc), documented on the knob. PoolState lives behind a heap pointer (it embeds a Mutex and is shared with workers; the Server struct itself is returned by value). serialize_response/run_handler_job share one serialize_bytes. examples/1633 gains the pooled section (GET, body echo, 404 across worker threads) plus the loop-wake path exercised end to end; AOT run five times. examples/1632 unchanged but the Event struct gains `user`.
250 lines
10 KiB
Plaintext
250 lines
10 KiB
Plaintext
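// std.http S7a/S7b (PLAN-HTTPZ): a live server and its clients driven
// from ONE thread via Server.tick — keep-alive reuse, POST body echo,
// the per-connection request cap closing politely, 404 routing, a body
// large enough to grow the read buffer, and half-a-header eviction at
// the request deadline while the server keeps serving others. A second
// server then repeats GET / echo / 404 with thread_pool_count = 2, so
// handlers run on pool workers while this thread still drives tick().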
|
|
#import "modules/std.sx";
|
|
|
|
PORT :: 18933;
|
|
|
|
// Request handler installed on both servers in this example.
//   /echo   -> the request body, unchanged
//   /hello  -> "hello " followed by the request method; answers
//              500 "ctx lost" when the ctx word is not 77
//   other   -> 404 "nope"
handler :: (req: *http.Request, resp: *http.Response, ctx: usize) {
    if req.path == "/echo" {
        resp.body = req.body;
        return;
    }

    if req.path == "/hello" {
        resp.body = concat("hello ", req.method);
        // Server.init was handed 77; it must reach the handler verbatim.
        if ctx != 77 {
            resp.status = 500;
            resp.body = "ctx lost";
        }
        return;
    }

    // No route matched.
    resp.body = "nope";
    resp.status = 404;
}
|
|
|
|
// Naive substring search: true when `needle` occurs anywhere in `hay`.
// An empty needle is found in any haystack.
contains :: (hay: string, needle: string) -> bool {
    // A needle longer than the haystack can never fit.
    if needle.len > hay.len { return false; }

    start := 0;
    while start + needle.len <= hay.len {
        // Count how many leading needle bytes agree at this offset.
        matched := 0;
        while matched < needle.len and hay[start + matched] == needle[matched] {
            matched += 1;
        }
        if matched == needle.len { return true; }
        start += 1;
    }
    return false;
}
|
|
|
|
// Open a TCP client socket to the loopback address on `port` and
// switch it to nonblocking mode. Returns the socket, or -1 when
// creating, connecting, or reconfiguring it fails; a socket that was
// already created is closed again before -1 is returned.
dial_port :: (port: i64) -> i32 {
    sock := socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
    if sock < 0 { return -1; }

    // The same 16 is used for sin_len and for the length given to connect.
    peer : socket.SockAddr = .{
        sin_len = 16,
        sin_family = xx socket.AF_INET,
        sin_port = socket.htons(port),
        sin_addr = 0x0100007F,
    };

    // Connect before the socket is made nonblocking.
    if socket.connect(sock, @peer, 16) != 0 {
        socket.close(sock);
        return -1;
    }
    if !socket.set_nonblocking(sock) {
        socket.close(sock);
        return -1;
    }
    return sock;
}
|
|
|
|
// Connect a nonblocking loopback client to PORT.
// Returns the socket, or -1 on failure (see dial_port).
dial :: () -> i32 {
    fd := dial_port(PORT);
    return fd;
}
|
|
|
|
// True when `buf[0..len]` holds a complete response: the blank line
// that ends the headers has arrived, the header section carries a
// "Content-Length: " value, and at least that many body bytes follow.
// A response without that header is never reported complete. The
// header name is matched byte-for-byte (case-sensitive).
resp_complete :: (buf: [*]u8, len: i64) -> bool {
    s := string.{ ptr = buf, len = xx len };

    // Locate the CR LF CR LF that terminates the header section.
    he := -1;
    i := 0;
    while i + 3 < s.len {
        if s[i] == 13 and s[i+1] == 10 and s[i+2] == 13 and s[i+3] == 10 { he = i; break; }
        i += 1;
    }
    if he < 0 { return false; }

    // Only the header section may supply Content-Length. Scanning the
    // whole buffer would let body text that happens to contain the
    // needle pass for the header.
    head := string.{ ptr = buf, len = xx he };
    needle := "Content-Length: ";
    cl : i64 = 0;
    seen := false;
    j := 0;
    while j + needle.len <= head.len {
        k := 0;
        ok := true;
        while k < needle.len { if head[j + k] != needle[k] { ok = false; break; } k += 1; }
        if ok {
            // Accumulate the decimal digits (ASCII 48..57) after the needle.
            d := j + needle.len;
            while d < head.len and head[d] >= 48 and head[d] <= 57 { cl = cl * 10 + (head[d] - 48); d += 1; }
            seen = true;
            break;
        }
        j += 1;
    }
    if !seen { return false; }

    // Header bytes + the 4-byte terminator + the declared body length.
    return len >= he + 4 + cl;
}
|
|
|
|
// Write `reqtext` to `fd`, then alternate server ticks with nonblocking
// reads into `scratch` (treated as a 4096-byte buffer) until a whole
// response is buffered. Returns the bytes received so far once the
// response is complete or the read reports error.Closed; returns ""
// when 400 rounds pass without either happening.
roundtrip :: (srv: *http.Server, fd: i32, reqtext: string, scratch: [*]u8) -> string {
    socket.write(fd, reqtext.ptr, xx reqtext.len);

    got : i64 = 0;
    round := 0;
    while round < 400 {
        // Errors from tick are discarded by the empty catch.
        srv.tick(5) catch {};

        n, re := socket.read_nb(fd, @scratch[got], xx (4096 - got));
        if re == error.Closed { return string.{ ptr = scratch, len = xx got }; }
        // Any other error leaves the byte count untouched for this round.
        if !re { got += n; }

        if resp_complete(scratch, got) { return string.{ ptr = scratch, len = xx got }; }
        round += 1;
    }
    return "";
}
|
|
|
|
// Runs every scenario against a live server, driving the server and
// its clients from this one thread through Server.tick. Each check
// prints a diagnostic and returns 1 on the first failure; 0 is returned
// after "http server ok". The first server uses the default inline
// dispatch, the second one sets thread_pool_count = 2.
main :: () -> i32 {
    cfg : http.Config = .{
        port = PORT,
        timeout_request_ms = 150,
        timeout_keepalive_ms = 400,
        request_count = 3,
        max_conn = 8,
    };
    srv, se := http.Server.init(cfg, handler, 77);
    if se { print("server init failed\n"); return 1; }

    // Scratch space for the small responses; roundtrip assumes 4096 bytes.
    buf : [4096]u8 = ---;

    // ── 1. GET, keep-alive default ────────────────────────────────────
    c1 := dial();
    if c1 < 0 { print("dial failed\n"); return 1; }
    r1 := roundtrip(@srv, c1, "GET /hello HTTP/1.1\r\nHost: t\r\n\r\n", @buf[0]);
    if !contains(r1, "HTTP/1.1 200 OK") { print("case1: bad status\n"); return 1; }
    if !contains(r1, "hello GET") { print("case1: bad body\n"); return 1; }
    if !contains(r1, "Connection: keep-alive") { print("case1: expected keep-alive\n"); return 1; }
    print("GET 200, keep-alive\n");

    // ── 2. same socket again: the connection was actually reused ─────
    r2 := roundtrip(@srv, c1, "GET /hello HTTP/1.1\r\nHost: t\r\n\r\n", @buf[0]);
    if !contains(r2, "hello GET") { print("case2: keep-alive reuse failed\n"); return 1; }
    print("keep-alive reuse ok\n");

    // ── 3. third request hits request_count: Connection: close + EOF ─
    r3 := roundtrip(@srv, c1, "GET /hello HTTP/1.1\r\nHost: t\r\n\r\n", @buf[0]);
    if !contains(r3, "Connection: close") { print("case3: expected close at cap\n"); return 1; }
    // Keep ticking until the client side observes the close: either
    // error.Closed or an error-free zero-byte read counts.
    drained := false;
    tries := 0;
    while !drained and tries < 200 {
        srv.tick(5) catch {};
        zq, ze := socket.read_nb(c1, @buf[0], 64);
        if ze == error.Closed { drained = true; }
        if !ze and zq == 0 { drained = true; }
        tries += 1;
    }
    if !drained { print("case3: server did not close at the cap\n"); return 1; }
    socket.close(c1);
    print("request cap: close + EOF\n");

    // ── 4. POST body echo ─────────────────────────────────────────────
    c2 := dial();
    if c2 < 0 { print("dial2 failed\n"); return 1; }
    r4 := roundtrip(@srv, c2, "POST /echo HTTP/1.1\r\nHost: t\r\nContent-Length: 9\r\n\r\nping-pong", @buf[0]);
    if !contains(r4, "ping-pong") { print("case4: body not echoed\n"); return 1; }
    print("POST echo ok\n");

    // ── 5. unknown path routes 404 ────────────────────────────────────
    r5 := roundtrip(@srv, c2, "GET /missing HTTP/1.1\r\nHost: t\r\n\r\n", @buf[0]);
    if !contains(r5, "HTTP/1.1 404 Not Found") { print("case5: expected 404\n"); return 1; }
    socket.close(c2);
    print("404 routing ok\n");

    // ── 5b. a body past READ_BUF_INITIAL forces the buffer to grow ────
    big_n := 50000;
    payload : [*]u8 = xx context.allocator.alloc_bytes(xx big_n);
    // Fill the payload with a repeating a..z pattern so the tail byte is predictable.
    f := 0;
    while f < big_n { payload[f] = 97 + cast(u8)(f % 26); f += 1; }
    breq := concat("POST /echo HTTP/1.1\r\nHost: t\r\nContent-Length: ", concat(int_to_string(big_n), "\r\n\r\n"));
    breq = concat(breq, string.{ ptr = payload, len = xx big_n });
    c2b := dial();
    if c2b < 0 { print("dial2b failed\n"); return 1; }
    // Room for the echoed body plus 4096 bytes for the response head.
    bigbuf : [*]u8 = xx context.allocator.alloc_bytes(xx (big_n + 4096));
    socket.write(c2b, breq.ptr, xx breq.len);
    btotal : i64 = 0;
    btries := 0;
    while btries < 2000 {
        srv.tick(5) catch {};
        bn, bre := socket.read_nb(c2b, @bigbuf[btotal], xx (big_n + 4096 - btotal));
        if !bre { btotal += bn; }
        else if bre == error.Closed { break; }
        if resp_complete(bigbuf, btotal) { break; }
        btries += 1;
    }
    bresp := string.{ ptr = bigbuf, len = xx btotal };
    if !contains(bresp, "HTTP/1.1 200 OK") { print("case5b: big echo not 200\n"); return 1; }
    if !contains(bresp, concat("Content-Length: ", int_to_string(big_n))) { print("case5b: wrong echo length\n"); return 1; }
    // The loop can also end on a close or on the try budget, leaving
    // only part of the echo in bigbuf. The tail-byte check alone would
    // accept such a truncated response whenever its last byte happened
    // to match, so require the full declared body first.
    if !resp_complete(bigbuf, btotal) { print("case5b: big echo incomplete\n"); return 1; }
    if bigbuf[btotal - 1] != 97 + cast(u8)((big_n - 1) % 26) { print("case5b: tail byte wrong\n"); return 1; }
    socket.close(c2b);
    print("big body grows the buffer and echoes intact\n");

    // ── 6. half a header is evicted at the request deadline, while a
    //       healthy client keeps being served ──────────────────────────
    c3 := dial();
    if c3 < 0 { print("dial3 failed\n"); return 1; }
    half := "GET /hel";
    socket.write(c3, half.ptr, xx half.len);
    gone := event.deadline_in(300);   // > timeout_request_ms
    while !event.expired(gone) { srv.tick(5) catch {}; }
    c4 := dial();
    if c4 < 0 { print("dial4 failed\n"); return 1; }
    r6 := roundtrip(@srv, c4, "GET /hello HTTP/1.1\r\nHost: t\r\n\r\n", @buf[0]);
    if !contains(r6, "hello GET") { print("case6: healthy client starved\n"); return 1; }
    // Same close detection as in case 3, this time on the stalled client.
    evicted := false;
    tries = 0;
    while !evicted and tries < 200 {
        srv.tick(5) catch {};
        zq2, ze2 := socket.read_nb(c3, @buf[0], 64);
        if ze2 == error.Closed { evicted = true; }
        if !ze2 and zq2 == 0 { evicted = true; }
        tries += 1;
    }
    if !evicted { print("case6: half-header connection never evicted\n"); return 1; }
    socket.close(c3);
    socket.close(c4);
    print("slow client evicted, healthy client served\n");

    srv.close();

    // ── pooled dispatch (S7b): same contract through worker threads ──
    // thread_pool_count > 0 runs handlers on a pool; completions come
    // back through the loop's wake channel. Same assertions: routing,
    // body echo, keep-alive reuse — now crossing threads per request.
    pcfg : http.Config = .{
        port = PORT + 1,
        timeout_request_ms = 1000,
        timeout_keepalive_ms = 1000,
        request_count = 50,
        max_conn = 8,
        thread_pool_count = 2,
        thread_pool_backlog = 16,
    };
    psrv, pse := http.Server.init(pcfg, handler, 77);
    if pse { print("pooled server init failed\n"); return 1; }

    // All three pooled requests travel over the same connection.
    c5 := dial_port(PORT + 1);
    if c5 < 0 { print("dial5 failed\n"); return 1; }
    r7 := roundtrip(@psrv, c5, "GET /hello HTTP/1.1\r\nHost: t\r\n\r\n", @buf[0]);
    if !contains(r7, "HTTP/1.1 200 OK") { print("pooled: bad status\n"); return 1; }
    if !contains(r7, "hello GET") { print("pooled: bad body\n"); return 1; }
    r8 := roundtrip(@psrv, c5, "POST /echo HTTP/1.1\r\nHost: t\r\nContent-Length: 9\r\n\r\nping-pong", @buf[0]);
    if !contains(r8, "ping-pong") { print("pooled: echo failed\n"); return 1; }
    r9 := roundtrip(@psrv, c5, "GET /missing HTTP/1.1\r\nHost: t\r\n\r\n", @buf[0]);
    if !contains(r9, "HTTP/1.1 404 Not Found") { print("pooled: expected 404\n"); return 1; }
    socket.close(c5);
    psrv.close();
    print("pooled dispatch: GET, echo, 404 across worker threads ok\n");

    print("http server ok\n");
    return 0;
}
|