thread_pool_count = 0 (default) keeps handlers inline on the loop thread — the measured fast path (BENCH-HTTPZ.md). N > 0 dispatches each parsed request to a std.thread Pool of N workers, completing the httpz two-pool shape: the connection freezes as CONN_HANDLING (no reads, growth, eviction, or recycling — the worker borrows views into its read buffer), the worker runs the handler under a per-job arena and serializes into job-owned bytes, the completion queues under the PoolState mutex, and the loop wakes through the new std.event wake channel (kqueue EVFILT_USER + EV_CLEAR; the epoll twin maps to eventfd), attaches the response, compacts the buffer, and resumes keep-alive/pipeline handling. A full backlog sheds with 503. Stale completions (generation mismatch after close) are dropped. Pool mode requires the server's constructing allocator to be thread-safe (GPA/malloc), documented on the knob. PoolState lives behind a heap pointer (it embeds a Mutex and is shared with workers; the Server struct itself is returned by value). serialize_response/run_handler_job share one serialize_bytes. examples/1633 gains the pooled section (GET, body echo, 404 across worker threads) plus the loop-wake path exercised end to end; AOT run five times. examples/1632 unchanged but the Event struct gains `user`.
746 lines
28 KiB
Plaintext
746 lines
28 KiB
Plaintext
// std.http — single-worker HTTP/1.1 server core (PLAN-HTTPZ S7a).
|
|
//
|
|
// The httpz shape, one worker, handlers inline: a readiness Loop
|
|
// (std.event) multiplexes the listener and every connection, so an
|
|
// idle socket costs nothing and nothing ever blocks the loop. Timeouts
|
|
// are EVICTION, not blocking — each connection carries a monotonic
|
|
// deadline (request delivery / keepalive idle), checked between waits.
|
|
// Keep-alive is the default for HTTP/1.1; a Connection header, the
|
|
// per-connection request cap, or HTTP/1.0 turns it off.
|
|
//
|
|
// API: `Server.init(cfg, handler)` then either `run()` (the forever
|
|
// loop) or `tick(max_wait_ms)` — one bounded loop iteration, which is
|
|
// also how tests drive a live server and its client sockets in ONE
|
|
// thread. S7b adds worker counts; its handler thread pool is already
// available behind this same surface (Config.thread_pool_count > 0,
// handlers stay inline at 0). The epoll backend arrives with the linux
// target (S4).
|
|
//
|
|
// MEMORY: init captures the constructing allocator (the Repo pattern):
|
|
// connection slots and their read buffers live across ticks and are
|
|
// reused connection-to-connection (httpz's buffer-pool spirit);
|
|
// response bytes are allocated per response and freed when fully sent.
|
|
// Request views handed to the handler point into the connection's read
|
|
// buffer and are valid only during the handler call.
|
|
|
|
#import "modules/std.sx";
|
|
|
|
// Errors surfaced by Server.init and Server.tick.
HttpErr :: error {
    Bind,   // init: the listening socket could not be created, bound, listened on, or made non-blocking
    Loop,   // init/tick: the readiness loop (or, in pool mode, the pool machinery) could not be set up or waited on
}
|
|
|
|
// Server configuration; the knobs mirror httpz (single-worker subset).
Config :: struct {
    port: i64 = 8080;                   // TCP port the listener binds (on INADDR_ANY)
    backlog: i32 = 128;                 // listen(2) backlog
    max_conn: i64 = 256;                // connection slots (httpz: workers.max_conn)

    // Upper bound, in bytes, on one request (headers + body). Each
    // connection's buffer starts at READ_BUF_INITIAL and grows on demand
    // toward this limit; a declared Content-Length sizes it in a single
    // step. A grown buffer stays with its slot for reuse.
    read_buf_cap: i64 = 1048576;

    timeout_request_ms: i64 = 5000;     // time allowed to deliver a full request before eviction
    timeout_keepalive_ms: i64 = 5000;   // idle time allowed between requests before eviction
    request_count: i64 = 100;           // requests served per connection before it is closed

    // Handler dispatch.
    //   0 (default): handlers run INLINE on the loop thread — measured
    //                ~6x faster for fast handlers (BENCH-HTTPZ.md).
    //   N > 0:       each parsed request goes to a thread pool of N
    //                workers, so slow handlers stop stalling the loop
    //                (httpz's shape). This mode REQUIRES the Server's
    //                constructing allocator to be THREAD-SAFE
    //                (GPA/malloc — never an Arena).
    thread_pool_count: i64 = 0;
    thread_pool_backlog: i64 = 500;     // jobs queued beyond the running ones; a full queue sheds with 503
}
|
|
|
|
// Starting size, in bytes, of a connection's read buffer (clamped to
// Config.read_buf_cap when that limit is smaller).
READ_BUF_INITIAL :: 16384;
|
|
|
|
// A parsed request. Every string is a view into the connection's read
// buffer, not a copy, so the fields are only valid while the handler
// call is in progress.
Request :: struct {
    method: string = "";        // request-line method token
    path: string = "";          // request-line target
    version: string = "";       // request-line version token
    headers_raw: string = "";   // the raw header block, without the request line
    body: string = "";          // Content-Length bytes following the header block
    keep_alive: bool = true;    // what the client asked for (version default, Connection header overrides)
}
|
|
|
|
// Filled in by the handler; the server turns it into wire bytes.
Response :: struct {
    status: i64 = 200;                                      // numeric status code
    content_type: string = "text/plain; charset=utf-8";     // Content-Type header value
    extra_headers: string = "";                             // preformatted "Name: value\r\n" lines, appended verbatim
    body: string = "";                                      // payload; its length becomes Content-Length
}
|
|
|
|
// Case-insensitive header lookup in `req.headers_raw`.
//
//   req   the request whose raw header block is searched
//   name  the header name to find; MUST already be lowercase
//
// Returns a view of the first matching header's value, or "" when the
// header is absent or its value is empty. Optional whitespace (SP / HTAB)
// on either side of the value is not part of the result, so callers can
// compare or parse the value directly ("Content-Length: 5 " yields "5").
find_header :: (req: *Request, name: string) -> string {
    h := req.headers_raw;
    i := 0;
    while i < h.len {
        // line start: try to match `name` case-insensitively, then ':'
        j := 0;
        while j < name.len and i + j < h.len {
            c := h[i + j];
            if c >= 65 and c <= 90 { c = c + 32; }   // ASCII upper -> lower
            if c != name[j] { break; }
            j += 1;
        }
        if j == name.len and i + j < h.len and h[i + j] == 58 {   // ':'
            // skip leading optional whitespace (SP or HTAB)
            v := i + j + 1;
            while v < h.len and (h[v] == 32 or h[v] == 9) { v += 1; }
            // the value runs to the end of the line (CR, or a bare LF)
            e := v;
            while e < h.len and h[e] != 13 and h[e] != 10 { e += 1; }
            // drop trailing optional whitespace
            while e > v and (h[e - 1] == 32 or h[e - 1] == 9) { e = e - 1; }
            // an empty value is reported as ""; this also avoids taking
            // the address of h[v] when v sits at the end of the block
            if e == v { return ""; }
            return string.{ ptr = @h[v], len = e - v };
        }
        // no match on this line: skip past the next '\n'
        while i < h.len and h[i] != 10 { i += 1; }
        i += 1;
    }
    return "";
}
|
|
|
|
// ASCII case-insensitive equality. `b_lower` must already be lowercase;
// only `a` is folded (bytes 'A'..'Z' map to 'a'..'z') before comparing.
ascii_ieq :: (a: string, b_lower: string) -> bool {
    // different lengths can never match
    if a.len != b_lower.len { return false; }
    k := 0;
    while k < a.len {
        ch := a[k];
        is_upper := ch >= 65 and ch <= 90;
        if is_upper { ch = ch + 32; }
        if ch != b_lower[k] { return false; }
        k += 1;
    }
    return true;
}
|
|
|
|
// Reason phrase for the status line. Codes this table does not know
// get the placeholder "Status".
reason_for :: (status: i64) -> string {
    if status < 300 {
        // success
        if status == 200 { return "OK"; }
        if status == 201 { return "Created"; }
        if status == 204 { return "No Content"; }
        return "Status";
    }
    if status < 400 {
        // redirection
        if status == 301 { return "Moved Permanently"; }
        if status == 302 { return "Found"; }
        return "Status";
    }
    if status < 500 {
        // client error
        if status == 400 { return "Bad Request"; }
        if status == 401 { return "Unauthorized"; }
        if status == 403 { return "Forbidden"; }
        if status == 404 { return "Not Found"; }
        if status == 405 { return "Method Not Allowed"; }
        if status == 413 { return "Content Too Large"; }
        if status == 431 { return "Request Header Fields Too Large"; }
        return "Status";
    }
    // server error
    if status == 500 { return "Internal Server Error"; }
    if status == 503 { return "Service Unavailable"; }
    return "Status";
}
|
|
|
|
// Connection slot states (stored in Conn.state).
CONN_FREE      :u8: 0;  // slot unused
CONN_READING   :u8: 1;  // awaiting a complete request (deadline: request)
CONN_WRITING   :u8: 2;  // response partially sent (deadline: request)
CONN_KEEPALIVE :u8: 3;  // between requests (deadline: keepalive)
// Request dispatched to the pool. While a slot is in this state the loop
// must not read into it, evict it, or recycle it: the worker holds views
// into its read buffer.
CONN_HANDLING  :u8: 4;
|
|
|
|
// One connection slot. Slots are allocated once by Server.init and
// reused from connection to connection.
Conn :: struct {
    fd: i32 = -1;               // client socket, -1 when the slot holds none
    state: u8 = 0;              // one of the CONN_* states (0 = CONN_FREE)

    // Inbound bytes.
    read_buf: [*]u8 = null;     // grows toward config.read_buf_cap; kept across connections
    read_cap: i64 = 0;          // allocated size of read_buf
    read_len: i64 = 0;          // bytes currently buffered

    // Outbound bytes of the response being sent.
    out_buf: [*]u8 = null;      // allocated per response, freed once fully sent
    out_len: i64 = 0;           // total bytes to send
    out_sent: i64 = 0;          // bytes sent so far

    deadline: i64 = 0;          // eviction deadline (see event.deadline_in / event.expired)
    served: i64 = 0;            // requests served on this connection
    close_after: bool = false;  // close once the current response is sent
    write_armed: bool = false;  // a write-readiness registration is active for fd
    gen: i64 = 0;               // bumped on close; a stale pool completion is dropped
}
|
|
|
|
// Event udata tags. A connection's udata is its slot index; these two
// values mark the events that do not belong to a connection.
LISTENER_UDATA :usize: 0xFFFFFFFF;  // the listening socket
WAKE_UDATA     :usize: 0xFFFFFFFE;  // the pool wake channel
|
|
|
|
// A serialized response: one allocation plus its length. Produced by
// serialize_bytes for both the inline and the pooled path.
OutBytes :: struct {
    buf: [*]u8 = null;  // the wire bytes
    len: i64 = 0;       // number of bytes in buf
}
|
|
|
|
// The result of one finished pool job, queued by the worker for the
// loop thread to attach to its connection.
Completion :: struct {
    slot: i64 = 0;              // connection slot the job was dispatched for
    gen: i64 = 0;               // that slot's generation at dispatch time
    out: OutBytes = .{};        // serialized response bytes
    close_after: bool = false;  // close the connection after sending
    consumed: i64 = 0;          // request bytes to drop from the read buffer
}
|
|
|
|
// Pool machinery, kept on the heap. It embeds a Mutex and is shared with
// the worker threads, so it must never move — unlike the Server struct,
// which init returns by value and which may therefore be copied.
PoolState :: struct {
    pool: *thread.Pool = null;      // the worker pool
    mu: thread.Mutex = .{};         // guards done / done_len
    done: [*]Completion = null;     // completions waiting for the loop thread
    done_cap: i64 = 0;              // capacity of done
    done_len: i64 = 0;              // entries currently queued in done
    loop_copy: event.Loop = .{};    // workers wake the loop through this; Loop is an fd wrapper, copy-safe
}
|
|
|
|
// Everything a worker needs to run one handler call. A job is allocated
// per dispatch from the server's thread-safe allocator and freed by the
// worker (or by the dispatcher when submission fails). The views inside
// `req` point into the connection's read buffer, which stays untouched
// while the connection is CONN_HANDLING.
HandlerJob :: struct {
    handler: (*Request, *Response, usize) -> void;  // the application handler
    app_ctx: usize = 0;         // opaque word passed as the handler's third argument
    ps: *PoolState = null;      // where the completion is queued
    alloc: Allocator;           // the server's allocator (response bytes, the job itself)
    req: Request = .{};         // the parsed request
    slot: i64 = 0;              // connection slot being served
    gen: i64 = 0;               // the slot's generation at dispatch
    keep: bool = false;         // keep the connection alive after this response
    consumed: i64 = 0;          // request bytes to drop from the read buffer afterwards
}
|
|
|
|
// The pool task. `arg` is a *HandlerJob passed as an opaque word.
//
// Runs the handler under a per-job arena built over the (thread-safe)
// server allocator, serializes the response into job-owned bytes, queues
// the completion under the PoolState mutex, wakes the loop, and frees
// the job.
//
// If the completion queue is full the completion cannot be delivered;
// the response bytes are then released here so they do not leak.
// NOTE(review): in that case the connection stays CONN_HANDLING, since
// nothing visible here reports the drop to the loop thread — the queue
// must be sized so this branch is unreachable.
run_handler_job :: (arg: usize) {
    job : *HandlerJob = xx arg;
    resp : Response = .{};
    arena := Arena.init(job.alloc, 65536);
    ob : OutBytes = .{};
    push Context.{ allocator = xx arena } {
        // the handler field must be loaded into a local before the call
        h := job.handler;
        h(@job.req, @resp, job.app_ctx);
        // copies the response out of the arena into job.alloc-owned bytes
        ob = serialize_bytes(@resp, job.keep, job.alloc);
    }
    arena.deinit();

    ps := job.ps;
    done : Completion = .{
        slot = job.slot, gen = job.gen, out = ob,
        close_after = !job.keep, consumed = job.consumed,
    };
    queued := false;
    ps.mu.lock();
    if ps.done_len < ps.done_cap {
        ps.done[ps.done_len] = done;
        ps.done_len += 1;
        queued = true;
    }
    ps.mu.unlock();

    a := job.alloc;
    // nobody else references the bytes of an undelivered completion
    if !queued { a.dealloc_bytes(xx ob.buf); }
    ps.loop_copy.wake();
    a.dealloc_bytes(xx job);
}
|
|
|
|
// Serialize `resp` into wire bytes: status line, Content-Length,
// Content-Type, Connection mode (`keep`), the handler's extra headers,
// a blank line, then the body. The head is assembled with concat /
// int_to_string; the final bytes are ONE allocation from `alloc`,
// returned as an OutBytes.
serialize_bytes :: (resp: *Response, keep: bool, alloc: Allocator) -> OutBytes {
    // status line
    status_line := concat(int_to_string(resp.status), concat(" ", reason_for(resp.status)));
    head := concat("HTTP/1.1 ", status_line);

    // fixed headers
    length_hdr := concat("\r\nContent-Length: ", int_to_string(resp.body.len));
    head = concat(head, length_hdr);
    type_hdr := concat("\r\nContent-Type: ", resp.content_type);
    head = concat(head, type_hdr);
    conn_hdr := if keep then "\r\nConnection: keep-alive\r\n" else "\r\nConnection: close\r\n";
    head = concat(head, conn_hdr);

    // handler-supplied header lines, then the blank line ending the head
    if resp.extra_headers.len > 0 { head = concat(head, resp.extra_headers); }
    head = concat(head, "\r\n");

    // head + body copied into a single buffer owned by `alloc`
    total : i64 = xx (head.len + resp.body.len);
    out : [*]u8 = xx alloc.alloc_bytes(total);
    memcpy(out, head.ptr, head.len);
    if resp.body.len > 0 { memcpy(@out[head.len], resp.body.ptr, resp.body.len); }
    return OutBytes.{ buf = out, len = total };
}
|
|
|
|
Server :: struct {
|
|
cfg: Config;                // configuration captured at init
loop: event.Loop;           // readiness loop multiplexing the listener and every connection
lfd: i32 = -1;              // listening socket, -1 once closed
conns: [*]Conn = null;      // cfg.max_conn connection slots
own_alloc: Allocator;       // the allocator that was current when init ran
// The application handler. Its third argument is `ctx`, an opaque word
// the app handed to init — typically a pointer to its own state (store
// handle, config), since the server owns the call site.
handler: (*Request, *Response, usize) -> void;
ctx: usize = 0;
ps: *PoolState = null;      // non-null iff cfg.thread_pool_count > 0
|
|
|
|
// Create a server: open and bind the listener, create the readiness
// loop, allocate the connection slots and — when
// cfg.thread_pool_count > 0 — the handler pool machinery.
//
//   cfg      configuration (copied into the Server)
//   handler  called once per parsed request
//   ctx      opaque word passed to the handler as its third argument
//
// Raises HttpErr.Bind when the listening socket cannot be created,
// bound, listened on, or made non-blocking; HttpErr.Loop when the loop
// or the pool machinery cannot be set up. Every failure path releases
// what was acquired before it (socket, loop, slot array, pool state).
//
// The allocator current at the call is captured as own_alloc; in pool
// mode it must be thread-safe.
init :: (cfg: Config, handler: (*Request, *Response, usize) -> void, ctx: usize) -> (Server, !HttpErr) {
    lfd := socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
    if lfd < 0 { raise error.Bind; }
    one : i32 = 1;
    socket.setsockopt(lfd, socket.SOL_SOCKET, socket.SO_REUSEADDR, @one, 4);
    addr : socket.SockAddr = .{
        sin_len = 16, sin_family = xx socket.AF_INET,
        sin_port = socket.htons(cfg.port), sin_addr = 0, // INADDR_ANY
    };
    if socket.bind(lfd, @addr, 16) != 0 { socket.close(lfd); raise error.Bind; }
    if socket.listen(lfd, cfg.backlog) != 0 { socket.close(lfd); raise error.Bind; }
    if !socket.set_nonblocking(lfd) { socket.close(lfd); raise error.Bind; }

    lp, le := event.Loop.init();
    if le { socket.close(lfd); raise error.Loop; }
    are := false;
    lp.add_read(lfd, LISTENER_UDATA) catch { are = true; };
    if are { lp.close(); socket.close(lfd); raise error.Loop; }

    oa := context.allocator;
    slots : [*]Conn = xx oa.alloc_bytes(cfg.max_conn * size_of(Conn));
    i : i64 = 0;
    while i < cfg.max_conn {
        slots[i] = Conn.{};
        i += 1;
    }

    ps : *PoolState = null;
    if cfg.thread_pool_count > 0 {
        ps = xx oa.alloc_bytes(size_of(PoolState));
        ps.* = PoolState.{};
        if !ps.mu.setup() {
            oa.dealloc_bytes(xx ps);
            oa.dealloc_bytes(xx slots);
            lp.close();
            socket.close(lfd);
            raise error.Loop;
        }
        // Completions in flight: queued + running jobs. A finished job
        // keeps its queue entry until the loop drains it, and every
        // connection can hold one such entry, so the queue is also never
        // smaller than the number of connection slots.
        ps.done_cap = cfg.thread_pool_backlog + cfg.thread_pool_count;
        if ps.done_cap < cfg.max_conn { ps.done_cap = cfg.max_conn; }
        ps.done = xx oa.alloc_bytes(ps.done_cap * size_of(Completion));
        ps.done_len = 0;
        wre := false;
        lp.add_wake(WAKE_UDATA) catch { wre = true; };
        if wre {
            oa.dealloc_bytes(xx ps.done);
            oa.dealloc_bytes(xx ps);
            oa.dealloc_bytes(xx slots);
            lp.close();
            socket.close(lfd);
            raise error.Loop;
        }
        // Copy the loop for the workers only after the wake channel is
        // registered, so the copy reflects anything add_wake recorded in
        // the Loop value.
        ps.loop_copy = lp;
        pool, pce := thread.Pool.create(cfg.thread_pool_count, cfg.thread_pool_backlog);
        if pce {
            // NOTE(review): the mutex set up above is not torn down here;
            // no teardown call for thread.Mutex is visible in this file.
            oa.dealloc_bytes(xx ps.done);
            oa.dealloc_bytes(xx ps);
            oa.dealloc_bytes(xx slots);
            lp.close();
            socket.close(lfd);
            raise error.Loop;
        }
        ps.pool = pool;
    }

    return Server.{
        cfg = cfg, loop = lp, lfd = lfd, conns = slots,
        own_alloc = oa, handler = handler, ctx = ctx, ps = ps,
    };
}
|
|
|
|
// Shut the server down: stop the pool (when there is one), close every
// connection that is not free, close the listener, then close the loop.
//
// NOTE(review): the slot array, the slots' read buffers and the
// PoolState (with its completion array) are not released here — confirm
// whether that is intended before relying on close for teardown.
close :: (self: *Server) {
    pool_state := self.ps;
    if pool_state != null {
        pool_state.pool.shutdown();
    }

    slot : i64 = 0;
    while slot < self.cfg.max_conn {
        in_use := self.conns[slot].state != CONN_FREE;
        if in_use { self.conn_close(slot); }
        slot += 1;
    }

    if self.lfd >= 0 { socket.close(self.lfd); }
    self.lfd = -1;
    self.loop.close();
}
|
|
|
|
// ── slot management ──────────────────────────────────────────────
|
|
|
|
// Index of the lowest-numbered free connection slot, or -1 when all
// cfg.max_conn slots are in use.
free_slot :: (self: *Server) -> i64 {
    limit := self.cfg.max_conn;
    idx : i64 = 0;
    while idx < limit {
        if self.conns[idx].state == CONN_FREE { return idx; }
        idx += 1;
    }
    // every slot is taken
    return -1;
}
|
|
|
|
// Tear down the connection in `slot` and return the slot to the free
// list: deregister and close the socket, free any pending response
// bytes, reset the per-connection counters and bump the generation so
// that a pool completion still in flight for the old connection is
// recognised as stale.
conn_close :: (self: *Server, slot: i64) {
    conn := @self.conns[slot];

    // socket: drop the loop registrations first, then close
    if conn.fd >= 0 {
        self.loop.del_read(conn.fd);
        if conn.write_armed { self.loop.del_write(conn.fd); }
        socket.close(conn.fd);
    }

    // unsent response bytes die with the connection
    if conn.out_buf != null {
        self.own_alloc.dealloc_bytes(xx conn.out_buf);
        conn.out_buf = null;
    }

    // read_buf is deliberately kept: the next connection to land in this
    // slot reuses it.
    conn.fd = -1;
    conn.state = CONN_FREE;
    conn.gen += 1;
    conn.read_len = 0;
    conn.out_len = 0;
    conn.out_sent = 0;
    conn.served = 0;
    conn.close_after = false;
    conn.write_armed = false;
}
|
|
|
|
// ── the tick: one bounded loop iteration ─────────────────────────
//
// Waits at most `max_wait_ms` (sooner when a connection deadline is
// nearer), services every ready fd, then evicts expired connections.
// Raises HttpErr.Loop when the readiness wait fails.
//
// Connections in CONN_HANDLING belong to a pool worker: they take no
// part in the wait computation, their events are ignored, and they are
// never evicted.
tick :: (self: *Server, max_wait_ms: i64) -> !HttpErr {
    // Shorten the wait to the nearest connection deadline.
    // NOTE(review): assumes event.remaining_ms does not go negative for a
    // deadline that has already passed — confirm against std.event.
    wait_ms := max_wait_ms;
    i : i64 = 0;
    while i < self.cfg.max_conn {
        c := self.conns[i];
        if c.state != CONN_FREE and c.state != CONN_HANDLING {
            left := event.remaining_ms(c.deadline);
            if left < wait_ms { wait_ms = left; }
        }
        i += 1;
    }

    evs : [64]event.Event = ---;
    n, werr := self.loop.wait(.{ ptr = @evs[0], len = 64 }, wait_ms);
    if werr { raise error.Loop; }

    k : i64 = 0;
    while k < n {
        ev := evs[k];
        k += 1;
        if ev.udata == LISTENER_UDATA {
            self.accept_ready();
            continue;
        }
        if ev.user or ev.udata == WAKE_UDATA {
            self.drain_completions();
            continue;
        }
        // Anything else carries a slot index. Ignore a value outside the
        // slot array instead of indexing past it.
        slot : i64 = xx ev.udata;
        if slot < 0 or slot >= self.cfg.max_conn { continue; }
        c := @self.conns[slot];
        if c.state == CONN_FREE or c.fd != ev.fd { continue; }   // stale event for a recycled slot
        if c.state == CONN_HANDLING { continue; }                // buffer frozen until the worker finishes
        if ev.writable and c.state == CONN_WRITING {
            self.write_more(slot);
            continue;
        }
        if ev.readable or ev.eof {
            self.read_more(slot);
        }
    }

    // Deadline eviction — after I/O, so a request that just arrived
    // under the wire is served, not evicted. A HANDLING conn is the
    // worker's: never evicted (its read buffer is borrowed).
    i = 0;
    while i < self.cfg.max_conn {
        st := self.conns[i].state;
        if st != CONN_FREE and st != CONN_HANDLING and event.expired(self.conns[i].deadline) {
            self.conn_close(i);
        }
        i += 1;
    }
    return;
}
|
|
|
|
// The forever loop: tick with a one-second ceiling on each wait, and
// never return. An error raised by tick is discarded and the loop
// simply ticks again.
run :: (self: *Server) {
    tick_ceiling_ms : i64 = 1000;
    while true {
        self.tick(tick_ceiling_ms) catch {};
    }
}
|
|
|
|
// ── accept ───────────────────────────────────────────────────────
|
|
|
|
// Accept pending connections until the listener has none left, placing
// each one in a free slot in CONN_READING with a request deadline.
// Stops early — closing the just-accepted socket — when no slot is
// free (at max_conn: shed) or the socket cannot be made non-blocking.
accept_ready :: (self: *Server) {
    while true {
        client, accept_err := socket.accept_nb(self.lfd);
        if accept_err { return; }   // WouldBlock = drained; Fault = nothing to do here

        slot := self.free_slot();
        if slot < 0 {
            // at max_conn: shed
            socket.close(client);
            return;
        }
        if !socket.set_nonblocking(client) {
            socket.close(client);
            return;
        }

        conn := @self.conns[slot];
        // first use of this slot: give it a read buffer (later
        // connections reuse whatever the slot already owns)
        if conn.read_buf == null {
            first_cap : i64 = READ_BUF_INITIAL;
            if first_cap > self.cfg.read_buf_cap { first_cap = self.cfg.read_buf_cap; }
            conn.read_buf = xx self.own_alloc.alloc_bytes(first_cap);
            conn.read_cap = first_cap;
        }

        conn.fd = client;
        conn.state = CONN_READING;
        conn.read_len = 0;
        conn.served = 0;
        conn.close_after = false;
        conn.deadline = event.deadline_in(self.cfg.timeout_request_ms);

        // register for readability; on failure the slot is released again
        reg_failed := false;
        self.loop.add_read(client, xx slot) catch { reg_failed = true; };
        if reg_failed { self.conn_close(slot); }
    }
}
|
|
|
|
// ── read → parse → dispatch ──────────────────────────────────────
|
|
|
|
// Make the slot's read buffer at least `target` bytes large; a `target`
// of 0 (or less) asks for double the current capacity. The buffer never
// grows past cfg.read_buf_cap. Buffered bytes are carried over.
//
// Returns true when the buffer already was, or now is, large enough to
// have grown; false when it is already at the limit and cannot grow.
grow_read_buf :: (self: *Server, slot: i64, target: i64) -> bool {
    conn := @self.conns[slot];

    new_cap := conn.read_cap * 2;
    if target > 0 { new_cap = target; }

    // already big enough: nothing to do
    if new_cap <= conn.read_cap { return true; }
    // clamp to the configured limit
    if new_cap > self.cfg.read_buf_cap { new_cap = self.cfg.read_buf_cap; }
    // clamping left no room to grow: already at the limit
    if new_cap <= conn.read_cap { return false; }

    // move to a fresh, larger allocation
    bigger : [*]u8 = xx self.own_alloc.alloc_bytes(new_cap);
    if conn.read_len > 0 { memcpy(bigger, conn.read_buf, xx conn.read_len); }
    self.own_alloc.dealloc_bytes(xx conn.read_buf);
    conn.read_buf = bigger;
    conn.read_cap = new_cap;
    return true;
}
|
|
|
|
// Pull everything currently readable from the slot's socket into its
// read buffer, then serve whatever complete requests are buffered.
// A keep-alive connection becomes READING again (with a fresh request
// deadline) on its first byte. When the buffer is full and may not grow
// any further, the request is refused — 413 if the header block was
// complete, 431 otherwise — and the connection closes after the reply.
// A read failure other than WouldBlock closes the connection.
read_more :: (self: *Server, slot: i64) {
    conn := @self.conns[slot];
    if conn.state == CONN_KEEPALIVE {
        conn.state = CONN_READING;
        conn.deadline = event.deadline_in(self.cfg.timeout_request_ms);
    }

    while true {
        // buffer full and at the size limit?
        if conn.read_len == conn.read_cap and !self.grow_read_buf(slot, 0) {
            // over the limit: oversized headers (431) or body (413),
            // told apart by whether the CRLFCRLF terminator was seen
            headers_complete := false;
            k : i64 = 0;
            while k + 3 < conn.read_len {
                if conn.read_buf[k] == 13 and conn.read_buf[k+1] == 10 and conn.read_buf[k+2] == 13 and conn.read_buf[k+3] == 10 {
                    headers_complete = true;
                    break;
                }
                k += 1;
            }
            status : i64 = 431;
            if headers_complete { status = 413; }
            self.respond_error_close(slot, status);
            return;
        }

        room := conn.read_cap - conn.read_len;
        got, read_err := socket.read_nb(conn.fd, @conn.read_buf[conn.read_len], xx room);
        if read_err == error.WouldBlock { break; }   // socket drained for now
        if read_err {
            // Closed or Fault
            self.conn_close(slot);
            return;
        }
        conn.read_len += got;
    }

    self.serve_buffered(slot);
}
|
|
|
|
// Serve complete requests from the slot's buffer for as long as the
// connection stays in CONN_READING and try_serve_one reports progress.
// A keep-alive client may pipeline; each response has to finish sending
// before the next parse, so a pending partial write pauses this drain
// and write_more resumes it.
serve_buffered :: (self: *Server, slot: i64) {
    while true {
        if self.conns[slot].state != CONN_READING { return; }
        if !self.try_serve_one(slot) { return; }
    }
}
|
|
|
|
// Parse one request off the front of the slot's read buffer and
// dispatch it to the handler — inline, or through the pool when one is
// configured.
//
// Returns true when a request was served inline (the caller may try the
// next one); false when the request is incomplete (more bytes needed),
// was answered with an error, or was handed to the pool — i.e. whenever
// the caller must stop draining.
//
// Error replies (each closes the connection afterwards):
//   400  malformed request line, or a non-digit in Content-Length
//   413  the request would exceed cfg.read_buf_cap
//   503  the pool backlog is full
//
// A request with no header lines at all (the request line is followed
// directly by the blank line) is accepted with an empty header block.
try_serve_one :: (self: *Server, slot: i64) -> bool {
    c := @self.conns[slot];
    buf := string.{ ptr = c.read_buf, len = xx c.read_len };

    // headers complete? `he` is the index of the CRLFCRLF terminator
    he := -1;
    i := 0;
    while i + 3 < buf.len {
        if buf[i] == 13 and buf[i+1] == 10 and buf[i+2] == 13 and buf[i+3] == 10 { he = i; break; }
        i += 1;
    }
    if he < 0 { return false; }

    // request line: METHOD SP PATH SP VERSION CRLF
    req : Request = .{};
    p := 0;
    while p < he and buf[p] != 32 { p += 1; }
    req.method = string.{ ptr = c.read_buf, len = p };
    p += 1;
    path_start := p;
    while p < he and buf[p] != 32 { p += 1; }
    req.path = string.{ ptr = @c.read_buf[path_start], len = p - path_start };
    p += 1;
    vs := p;
    while p < he and buf[p] != 13 { p += 1; }
    req.version = string.{ ptr = @c.read_buf[vs], len = p - vs };

    // Header block: starts after the request line's CRLF and keeps the
    // last header's CRLF. When the request line runs right up to the
    // terminator (p == he) there are no header lines: the block is empty.
    hdr_start := p + 2;
    hdr_len := he - hdr_start + 2;
    if p == he {
        hdr_start = he;
        hdr_len = 0;
    }
    if req.method.len == 0 or req.path.len == 0 or hdr_start > he {
        self.respond_error_close(slot, 400);
        return false;
    }
    req.headers_raw = string.{ ptr = @c.read_buf[hdr_start], len = hdr_len };

    // body per Content-Length
    clen : i64 = 0;
    clv := find_header(@req, "content-length");
    j := 0;
    while j < clv.len {
        d := clv[j];
        if d < 48 or d > 57 { self.respond_error_close(slot, 400); return false; }
        clen = clen * 10 + (d - 48);
        // Refuse as soon as the declared length passes the limit: a long
        // digit string must not be allowed to overflow the accumulator.
        if clen > self.cfg.read_buf_cap { self.respond_error_close(slot, 413); return false; }
        j += 1;
    }
    total := xx he + 4 + clen;
    if total > self.cfg.read_buf_cap { self.respond_error_close(slot, 413); return false; }
    if c.read_len < total {
        // size the buffer for the declared body in one step
        self.grow_read_buf(slot, total);
        return false;
    }
    req.body = string.{ ptr = @c.read_buf[he + 4], len = xx clen };

    // keep-alive: 1.1 default on, 1.0 default off, header overrides
    req.keep_alive = !ascii_ieq(req.version, "http/1.0");
    cnv := find_header(@req, "connection");
    if ascii_ieq(cnv, "close") { req.keep_alive = false; }
    if ascii_ieq(cnv, "keep-alive") { req.keep_alive = true; }

    c.served += 1;
    keep := req.keep_alive and c.served < self.cfg.request_count;

    // Pooled dispatch: freeze this connection (CONN_HANDLING — no
    // reads, no eviction, no recycling, so the job's request views
    // into the read buffer stay valid), hand the job to a worker,
    // and return; the completion re-enters via drain_completions.
    // A full backlog sheds with 503 (httpz backpressure).
    if self.ps != null {
        job : *HandlerJob = xx self.own_alloc.alloc_bytes(size_of(HandlerJob));
        job.* = HandlerJob.{
            handler = self.handler, app_ctx = self.ctx, ps = self.ps,
            alloc = self.own_alloc, req = req,
            slot = slot, gen = c.gen, keep = keep, consumed = total,
        };
        c.state = CONN_HANDLING;
        if !self.ps.pool.submit(run_handler_job, xx job) {
            self.own_alloc.dealloc_bytes(xx job);
            c.state = CONN_READING;
            self.respond_error_close(slot, 503);
        }
        return false;
    }

    // Dispatch under a per-request arena: everything the handler
    // (and serialization) allocates through the implicit context
    // dies with the request — response bytes survive because
    // serialize copies them into own_alloc inside the push scope.
    // Serialize while the request views are still valid (the body
    // may reference the read buffer), THEN drop the served bytes —
    // write_more's pipelining check must see only the remainder —
    // and only then start sending. Overlapping copy: dst < src, so
    // forward byte-wise is safe.
    // (The handler field must be loaded — `self.handler(...)` would
    // be parsed as a dot-call on a function named `handler`.)
    h := self.handler;
    resp : Response = .{};
    req_arena := Arena.init(self.own_alloc, 65536);
    push Context.{ allocator = xx req_arena } {
        h(@req, @resp, self.ctx);
        self.serialize_response(slot, @resp, keep);
    }
    req_arena.deinit();
    rest := c.read_len - total;
    m : i64 = 0;
    while m < rest {
        c.read_buf[m] = c.read_buf[total + m];
        m += 1;
    }
    c.read_len = rest;
    self.write_more(slot);
    return true;
}
|
|
|
|
// ── response serialization + write continuation ──────────────────
|
|
|
|
// Serialize `resp` (via serialize_bytes, allocating from own_alloc) and
// install the bytes as the slot's outgoing response. Sending is NOT
// started here: try_serve_one compacts the read buffer between
// serialization and the first write.
serialize_response :: (self: *Server, slot: i64, resp: *Response, keep: bool) {
    bytes := serialize_bytes(resp, keep, self.own_alloc);
    conn := @self.conns[slot];
    conn.out_buf = bytes.buf;
    conn.out_len = bytes.len;
    conn.out_sent = 0;
    // a response that is not keep-alive ends the connection once sent
    conn.close_after = !keep;
}
|
|
|
|
// Attach every queued pool completion to its connection: drop the
// served request bytes, take the response, start writing. A completion
// whose connection is no longer CONN_HANDLING, or whose generation no
// longer matches (the conn was closed by Server.close), just frees its
// bytes.
//
// The queue is copied out under the PoolState mutex into a scratch
// array taken from own_alloc, which is released again before returning;
// an empty queue returns without allocating.
drain_completions :: (self: *Server) {
    if self.ps == null { return; }
    ps := self.ps;

    ps.mu.lock();
    n := ps.done_len;
    if n == 0 {
        ps.mu.unlock();
        return;
    }
    ps.done_len = 0;
    // copy out under the lock; the list is small
    batch : [*]Completion = xx self.own_alloc.alloc_bytes(n * size_of(Completion));
    i : i64 = 0;
    while i < n {
        batch[i] = ps.done[i];
        i += 1;
    }
    ps.mu.unlock();

    i = 0;
    while i < n {
        done := batch[i];
        i += 1;
        c := @self.conns[done.slot];
        if c.state != CONN_HANDLING or c.gen != done.gen {
            // stale: the connection this response was for is gone
            self.own_alloc.dealloc_bytes(xx done.out.buf);
            continue;
        }
        c.out_buf = done.out.buf;
        c.out_len = done.out.len;
        c.out_sent = 0;
        c.close_after = done.close_after;
        // drop the served bytes (dst < src: forward copy is safe)
        rest := c.read_len - done.consumed;
        m : i64 = 0;
        while m < rest {
            c.read_buf[m] = c.read_buf[done.consumed + m];
            m += 1;
        }
        c.read_len = rest;
        // unfreeze the connection, then start sending
        c.state = CONN_READING;
        c.deadline = event.deadline_in(self.cfg.timeout_request_ms);
        self.write_more(done.slot);
    }

    self.own_alloc.dealloc_bytes(xx batch);
}
|
|
|
|
// Send as much of the slot's pending response as the socket accepts.
//
// If the socket would block, write-readiness is registered (once), the
// connection becomes CONN_WRITING, and tick calls back here later.
// A registration failure or any other write error closes the connection.
//
// Once the response is fully sent its bytes are freed and the
// connection either closes (close_after), goes straight on to serve
// pipelined bytes already buffered, or settles into CONN_KEEPALIVE with
// a keepalive deadline.
write_more :: (self: *Server, slot: i64) {
    conn := @self.conns[slot];

    while conn.out_sent < conn.out_len {
        pending := conn.out_len - conn.out_sent;
        wrote, write_err := socket.write_nb(conn.fd, @conn.out_buf[conn.out_sent], xx pending);
        if write_err == error.WouldBlock {
            if !conn.write_armed {
                arm_failed := false;
                self.loop.add_write(conn.fd, xx slot) catch { arm_failed = true; };
                if arm_failed {
                    self.conn_close(slot);
                    return;
                }
                conn.write_armed = true;
            }
            conn.state = CONN_WRITING;
            return;
        }
        if write_err {
            self.conn_close(slot);
            return;
        }
        conn.out_sent += wrote;
    }

    // fully sent: stop watching for writability, release the bytes
    if conn.write_armed {
        self.loop.del_write(conn.fd);
        conn.write_armed = false;
    }
    self.own_alloc.dealloc_bytes(xx conn.out_buf);
    conn.out_buf = null;
    conn.out_len = 0;
    conn.out_sent = 0;

    if conn.close_after {
        self.conn_close(slot);
        return;
    }

    if conn.read_len > 0 {
        // pipelined bytes already buffered: keep serving
        conn.state = CONN_READING;
        conn.deadline = event.deadline_in(self.cfg.timeout_request_ms);
        self.serve_buffered(slot);
        return;
    }

    // nothing buffered: idle until the next request or the keepalive deadline
    conn.state = CONN_KEEPALIVE;
    conn.deadline = event.deadline_in(self.cfg.timeout_keepalive_ms);
}
|
|
|
|
// Answer with a terminal error: the body is the status' reason phrase,
// whatever was buffered for reading is discarded, and the connection
// closes once the reply has been sent. Serialization runs under its own
// short-lived arena (same discipline as dispatch) so its concats do not
// accumulate in the long-lived loop context.
respond_error_close :: (self: *Server, slot: i64, status: i64) {
    reply : Response = .{ status = status, body = reason_for(status) };

    // nothing further is parsed from this connection
    conn := @self.conns[slot];
    conn.read_len = 0;

    scratch := Arena.init(self.own_alloc, 4096);
    push Context.{ allocator = xx scratch } {
        // keep = false: marks the connection close_after
        self.serialize_response(slot, @reply, false);
    }
    scratch.deinit();

    self.write_more(slot);
}
|
|
}
|