// std.http — single-worker HTTP/1.1 server core (PLAN-HTTPZ S7a).
//
// The httpz shape, one worker, handlers inline: a readiness Loop
// (std.event) multiplexes the listener and every connection, so an
// idle socket costs nothing and nothing ever blocks the loop. Timeouts
// are EVICTION, not blocking — each connection carries a monotonic
// deadline (request delivery / keepalive idle), checked between waits.
// Keep-alive is the default for HTTP/1.1; a Connection header, the
// per-connection request cap, or HTTP/1.0 turns it off.
//
// API: `Server.init(cfg, handler)` then either `run()` (the forever
// loop) or `tick(max_wait_ms)` — one bounded loop iteration, which is
// also how tests drive a live server and its client sockets in ONE
// thread. S7b adds worker counts + a handler thread pool behind this
// same surface; the epoll backend arrives with the linux target (S4).
//
// MEMORY: init captures the constructing allocator (the Repo pattern):
// connection slots and their read buffers live across ticks and are
// reused connection-to-connection (httpz's buffer-pool spirit);
// response bytes are allocated per response and freed when fully sent.
// Request views handed to the handler point into the connection's read
// buffer and are valid only during the handler call.

#import "modules/std.sx";

// Errors raised by the server. `Server.init` raises Bind when creating,
// binding, listening on, or un-blocking the listener socket fails, and
// Loop when the readiness loop cannot be created or the listener cannot
// be registered with it; `Server.tick` raises Loop when waiting fails.
HttpErr :: error {
    Bind,   // socket/bind/listen failed for the configured port
    Loop,   // the readiness loop could not be created or waited on
}

// httpz-mirroring knobs (single-worker subset).
Config :: struct {
    port: i64 = 8080;                  // listener port (bound on INADDR_ANY by Server.init)
    backlog: i32 = 128;                // passed to listen()
    max_conn: i64 = 256;               // workers.max_conn — number of connection slots
    read_buf_cap: i64 = 65536;         // workers.large_buffer_size — per-connection read buffer;
                                       // a request that cannot fit is answered 413/431
    timeout_request_ms: i64 = 5000;    // deliver a full request, or evicted
    timeout_keepalive_ms: i64 = 5000;  // idle between requests, or evicted
    request_count: i64 = 100;          // requests per connection, then close
}

// One parsed request, viewed in place over the connection's read
// buffer — valid for the duration of the handler call only.
Request :: struct {
    method: string = "";        // first token of the request line
    path: string = "";          // second token of the request line
    version: string = "";       // remainder of the request line, e.g. "HTTP/1.1"
    headers_raw: string = "";   // the raw header block (no request line)
    body: string = "";          // Content-Length bytes following the blank line
    keep_alive: bool = true;    // set by the server before dispatch: on for 1.1,
                                // off for 1.0, overridden by a Connection header
}

// What the handler fills in; the server serializes it.
Response :: struct {
    status: i64 = 200;
    content_type: string = "text/plain; charset=utf-8";
    extra_headers: string = "";   // preformatted "Name: value\r\n" lines
    body: string = "";            // copied into the outgoing buffer at serialization
}

// Case-insensitive header lookup in `headers_raw`; "" when absent.
// `name` must be lowercase.
// Returns the value of the first header line whose name equals `name`
// (compared ASCII-case-insensitively; `name` itself must already be
// lowercase). The value is a view into `req.headers_raw` with optional
// whitespace (SP / HTAB) stripped from BOTH ends, so "Connection: close "
// yields "close". Returns "" when the header is absent or its value is
// empty. A line ends at CR or at a bare LF, so a value never runs on
// into the following header line.
find_header :: (req: *Request, name: string) -> string {
    h := req.headers_raw;
    i := 0;
    while i < h.len {
        // line start: try to match `name` case-insensitively, then ':'
        j := 0;
        while j < name.len and i + j < h.len {
            c := h[i + j];
            if c >= 65 and c <= 90 { c = c + 32; }   // ASCII lower
            if c != name[j] { break; }
            j += 1;
        }
        if j == name.len and i + j < h.len and h[i + j] == 58 {   // ':'
            // leading optional whitespace: SP (32) or HTAB (9)
            v := i + j + 1;
            while v < h.len and (h[v] == 32 or h[v] == 9) { v += 1; }
            // the value runs to the end of the line (CR, or a bare LF)
            e := v;
            while e < h.len and h[e] != 13 and h[e] != 10 { e += 1; }
            // trailing optional whitespace is not part of the value
            while e > v and (h[e - 1] == 32 or h[e - 1] == 9) { e = e - 1; }
            // empty value: return "" rather than taking the address of a
            // byte that may sit one past the end of the header block
            if e == v { return ""; }
            return string.{ ptr = @h[v], len = e - v };
        }
        // skip to the next line
        while i < h.len and h[i] != 10 { i += 1; }   // '\n'
        i += 1;
    }
    return "";
}

// ASCII-case-insensitive equality: true when `a`, with A–Z folded to
// a–z, equals `b_lower` byte for byte. `b_lower` is compared as given,
// so it must already be lowercase.
ascii_ieq :: (a: string, b_lower: string) -> bool {
    if a.len != b_lower.len { return false; }
    i := 0;
    while i < a.len {
        c := a[i];
        if c >= 65 and c <= 90 { c = c + 32; }
        if c != b_lower[i] { return false; }
        i += 1;
    }
    return true;
}

// Reason phrase for the status codes this module knows about; any
// other code gets the generic phrase "Status".
reason_for :: (status: i64) -> string {
    if status == 200 { return "OK"; }
    if status == 201 { return "Created"; }
    if status == 204 { return "No Content"; }
    if status == 301 { return "Moved Permanently"; }
    if status == 302 { return "Found"; }
    if status == 400 { return "Bad Request"; }
    if status == 401 { return "Unauthorized"; }
    if status == 403 { return "Forbidden"; }
    if status == 404 { return "Not Found"; }
    if status == 405 { return "Method Not Allowed"; }
    if status == 413 { return "Content Too Large"; }
    if status == 431 { return "Request Header Fields Too Large"; }
    if status == 500 { return "Internal Server Error"; }
    if status == 503 { return "Service Unavailable"; }
    return "Status";
}

// Connection slot states.
CONN_FREE      :u8: 0;
CONN_READING   :u8: 1;   // awaiting a complete request (deadline: request)
CONN_WRITING   :u8: 2;   // response partially sent (deadline: request)
CONN_KEEPALIVE :u8: 3;   // between requests (deadline: keepalive)

// One connection slot. Slots are created once by Server.init and
// recycled: conn_close returns a slot to CONN_FREE but keeps its
// read_buf for the next connection accepted into it.
Conn :: struct {
    fd: i32 = -1;
    state: u8 = 0;
    read_buf: [*]u8 = null;   // cap = config.read_buf_cap, reused across connections
    read_len: i64 = 0;        // bytes currently buffered in read_buf
    out_buf: [*]u8 = null;    // per-response allocation, freed when sent
    out_len: i64 = 0;
    out_sent: i64 = 0;        // bytes of out_buf already written to the socket
    deadline: i64 = 0;        // eviction deadline (event.deadline_in / event.expired)
    served: i64 = 0;          // requests answered on this connection
    close_after: bool = false;   // close once the current response is fully sent
    write_armed: bool = false;   // fd is currently registered for writability
}

// The listener's udata; connection udata is the slot index.
LISTENER_UDATA :usize: 0xFFFFFFFF;

// The server: one listener, a fixed array of connection slots, one
// readiness loop, one handler called inline. All memory (slot array,
// read buffers, response buffers) comes from `own_alloc`, the context
// allocator captured by init, and is released by close().
Server :: struct {
    cfg: Config;
    loop: event.Loop;
    lfd: i32 = -1;
    conns: [*]Conn = null;
    own_alloc: Allocator;
    handler: (*Request, *Response) -> void;

    // Create the listener (non-blocking, SO_REUSEADDR, INADDR_ANY on
    // cfg.port), the readiness loop, and cfg.max_conn empty slots.
    // Raises Bind when any socket step fails and Loop when the loop
    // cannot be created or the listener cannot be registered. Every
    // failure path releases what was acquired before it.
    init :: (cfg: Config, handler: (*Request, *Response) -> void) -> (Server, !HttpErr) {
        lfd := socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
        if lfd < 0 { raise error.Bind; }
        one : i32 = 1;
        socket.setsockopt(lfd, socket.SOL_SOCKET, socket.SO_REUSEADDR, @one, 4);
        addr : socket.SockAddr = .{
            sin_len = 16,
            sin_family = xx socket.AF_INET,
            sin_port = socket.htons(cfg.port),
            sin_addr = 0,   // INADDR_ANY
        };
        if socket.bind(lfd, @addr, 16) != 0 { socket.close(lfd); raise error.Bind; }
        if socket.listen(lfd, cfg.backlog) != 0 { socket.close(lfd); raise error.Bind; }
        if !socket.set_nonblocking(lfd) { socket.close(lfd); raise error.Bind; }

        lp, le := event.Loop.init();
        if le { socket.close(lfd); raise error.Loop; }
        are := false;
        lp.add_read(lfd, LISTENER_UDATA) catch { are = true; };
        // the loop was created successfully above, so it must be closed
        // here too — not only the listener
        if are { lp.close(); socket.close(lfd); raise error.Loop; }

        oa := context.allocator;
        slots : [*]Conn = xx oa.alloc_bytes(cfg.max_conn * size_of(Conn));
        i : i64 = 0;
        while i < cfg.max_conn { slots[i] = Conn.{}; i += 1; }

        return Server.{
            cfg = cfg,
            loop = lp,
            lfd = lfd,
            conns = slots,
            own_alloc = oa,
            handler = handler,
        };
    }

    // Shut down: close every live connection, release each slot's read
    // buffer and the slot array itself, then close the listener and the
    // loop. The slot memory is released only once (conns is nulled), so
    // a second call does not free it again.
    close :: (self: *Server) {
        if self.conns != null {
            i : i64 = 0;
            while i < self.cfg.max_conn {
                if self.conns[i].state != CONN_FREE { self.conn_close(i); }
                // conn_close deliberately keeps read_buf for reuse; at
                // shutdown there is no next connection, so free it.
                if self.conns[i].read_buf != null {
                    self.own_alloc.dealloc_bytes(xx self.conns[i].read_buf, self.cfg.read_buf_cap);
                    self.conns[i].read_buf = null;
                }
                i += 1;
            }
            self.own_alloc.dealloc_bytes(xx self.conns, self.cfg.max_conn * size_of(Conn));
            self.conns = null;
        }
        if self.lfd >= 0 { socket.close(self.lfd); }
        self.lfd = -1;
        self.loop.close();
    }

    // ── slot management ──────────────────────────────────────────────

    // Index of the lowest free slot, or -1 when all max_conn are in use.
    free_slot :: (self: *Server) -> i64 {
        i : i64 = 0;
        while i < self.cfg.max_conn {
            if self.conns[i].state == CONN_FREE { return i; }
            i += 1;
        }
        return -1;
    }

    // Tear down one connection: unregister and close its fd, free any
    // pending response bytes, and reset the slot to CONN_FREE.
    conn_close :: (self: *Server, slot: i64) {
        c := @self.conns[slot];
        if c.fd >= 0 {
            self.loop.del_read(c.fd);
            if c.write_armed { self.loop.del_write(c.fd); }
            socket.close(c.fd);
        }
        if c.out_buf != null {
            self.own_alloc.dealloc_bytes(xx c.out_buf, c.out_len);
            c.out_buf = null;
        }
        // read_buf stays allocated — reused by the next connection here.
        c.fd = -1;
        c.state = CONN_FREE;
        c.read_len = 0;
        c.out_len = 0;
        c.out_sent = 0;
        c.served = 0;
        c.close_after = false;
        c.write_armed = false;
    }

    // ── the tick: one bounded loop iteration ─────────────────────────
    //
    // Waits at most `max_wait_ms` (sooner when a connection deadline is
    // nearer), services every ready fd, then evicts expired connections.
    // Raises Loop when the wait fails or the server has been closed.
    tick :: (self: *Server, max_wait_ms: i64) -> !HttpErr {
        // after close() there are no slots to walk
        if self.conns == null { raise error.Loop; }

        wait_ms := max_wait_ms;
        i : i64 = 0;
        while i < self.cfg.max_conn {
            cur := self.conns[i];
            if cur.state != CONN_FREE {
                left := event.remaining_ms(cur.deadline);
                // Defensive: whether remaining_ms can report a negative
                // value for an already-expired deadline is not visible
                // here; never let one turn into a negative wait.
                if left < 0 { left = 0; }
                if left < wait_ms { wait_ms = left; }
            }
            i += 1;
        }

        evs : [64]event.Event = ---;
        n, werr := self.loop.wait(.{ ptr = @evs[0], len = 64 }, wait_ms);
        if werr { raise error.Loop; }

        k : i64 = 0;
        while k < n {
            ev := evs[k];
            k += 1;
            if ev.udata == LISTENER_UDATA {
                self.accept_ready();
                continue;
            }
            slot : i64 = xx ev.udata;
            c := @self.conns[slot];
            if c.state == CONN_FREE or c.fd != ev.fd { continue; }   // stale event for a recycled slot
            if ev.writable and c.state == CONN_WRITING {
                self.write_more(slot);
                continue;
            }
            if ev.readable or ev.eof {
                self.read_more(slot);
            }
        }

        // Deadline eviction — after I/O, so a request that just arrived
        // under the wire is served, not evicted.
        i = 0;
        while i < self.cfg.max_conn {
            if self.conns[i].state != CONN_FREE and event.expired(self.conns[i].deadline) {
                self.conn_close(i);
            }
            i += 1;
        }
        return;
    }

    // The forever loop: tick with a 1000 ms ceiling, ignoring tick errors.
    run :: (self: *Server) {
        while true {
            self.tick(1000) catch {};
        }
    }

    // ── accept ───────────────────────────────────────────────────────

    // Accept pending connections until the listener would block. Each
    // one takes the lowest free slot (allocating the slot's read buffer
    // on first use) and starts in CONN_READING with a request deadline.
    accept_ready :: (self: *Server) {
        while true {
            fd, ae := socket.accept_nb(self.lfd);
            if ae { return; }   // WouldBlock = drained; Fault = nothing to do here
            slot := self.free_slot();
            if slot < 0 { socket.close(fd); return; }   // at max_conn: shed
            if !socket.set_nonblocking(fd) { socket.close(fd); return; }
            c := @self.conns[slot];
            if c.read_buf == null {
                c.read_buf = xx self.own_alloc.alloc_bytes(self.cfg.read_buf_cap);
            }
            c.fd = fd;
            c.state = CONN_READING;
            c.read_len = 0;
            c.served = 0;
            c.close_after = false;
            c.deadline = event.deadline_in(self.cfg.timeout_request_ms);
            are := false;
            self.loop.add_read(fd, xx slot) catch { are = true; };
            if are { self.conn_close(slot); }
        }
    }

    // ── read → parse → dispatch ──────────────────────────────────────

    // Drain the socket into the slot's read buffer, then serve whatever
    // complete requests are buffered. A keep-alive connection that
    // starts receiving moves to CONN_READING with a fresh request
    // deadline. A full buffer is answered 431; a read error or peer
    // close tears the connection down.
    read_more :: (self: *Server, slot: i64) {
        c := @self.conns[slot];
        if c.state == CONN_KEEPALIVE {
            c.state = CONN_READING;
            c.deadline = event.deadline_in(self.cfg.timeout_request_ms);
        }
        while true {
            cap := self.cfg.read_buf_cap - c.read_len;
            if cap <= 0 { self.respond_error_close(slot, 431); return; }
            nq, re := socket.read_nb(c.fd, @c.read_buf[c.read_len], xx cap);
            if re == error.WouldBlock { break; }
            if re {   // Closed or Fault
                self.conn_close(slot);
                return;
            }
            c.read_len += nq;
        }
        self.serve_buffered(slot);
    }

    // Serve every complete request sitting in the buffer (a keep-alive
    // client may pipeline; each response must finish sending before the
    // next parse — a pending partial write pauses the drain and
    // write_more resumes it).
    serve_buffered :: (self: *Server, slot: i64) {
        while self.conns[slot].state == CONN_READING {
            if !self.try_serve_one(slot) { return; }
        }
    }

    // Parse one request off the front of the buffer; false = incomplete
    // (need more bytes) or the connection left the READING state.
    //
    // Malformed input is answered and the connection closed: 400 for a
    // bad request line or a non-numeric Content-Length, 413 when the
    // request (headers + declared body) cannot fit in the read buffer.
    try_serve_one :: (self: *Server, slot: i64) -> bool {
        c := @self.conns[slot];
        buf := string.{ ptr = c.read_buf, len = xx c.read_len };

        // headers complete?
        he := -1;
        i := 0;
        while i + 3 < buf.len {
            if buf[i] == 13 and buf[i+1] == 10 and buf[i+2] == 13 and buf[i+3] == 10 { he = i; break; }
            i += 1;
        }
        if he < 0 { return false; }

        // The request line ends at the FIRST CR. Every scan below is
        // bounded by it, so a short request line can never swallow the
        // header lines that follow. buf[he] is a CR, so `eol` always
        // lands on one, and eol + 1 is inside the buffered terminator.
        eol := 0;
        while eol < he and buf[eol] != 13 { eol += 1; }
        if buf[eol + 1] != 10 {   // bare CR inside the request line
            self.respond_error_close(slot, 400);
            return false;
        }

        // request line: METHOD SP PATH SP VERSION CRLF
        p := 0;
        while p < eol and buf[p] != 32 { p += 1; }
        method_len := p;
        ps := p + 1;
        p = ps;
        while p < eol and buf[p] != 32 { p += 1; }
        pe := p;
        vs := p + 1;
        // all three tokens must be present and non-empty
        if method_len == 0 or pe <= ps or vs >= eol {
            self.respond_error_close(slot, 400);
            return false;
        }

        req : Request = .{};
        req.method = string.{ ptr = c.read_buf, len = method_len };
        req.path = string.{ ptr = @c.read_buf[ps], len = pe - ps };
        req.version = string.{ ptr = @c.read_buf[vs], len = eol - vs };

        // Header block: everything after the request line's CRLF, up to
        // and including the CRLF that ends the last header line. A
        // request with no header fields (eol == he) is valid and gets
        // an empty block.
        hdr_start := eol + 2;
        req.headers_raw = string.{ ptr = @c.read_buf[hdr_start], len = he + 2 - hdr_start };

        // body per Content-Length
        clen : i64 = 0;
        clv := find_header(@req, "content-length");
        j := 0;
        while j < clv.len {
            d := clv[j];
            if d < 48 or d > 57 {
                self.respond_error_close(slot, 400);
                return false;
            }
            clen = clen * 10 + (d - 48);
            // Bound the value while accumulating: a long digit string
            // must not wrap the integer and slip past the size check
            // below (a wrapped value would also corrupt the compaction
            // copy after dispatch).
            if clen > self.cfg.read_buf_cap {
                self.respond_error_close(slot, 413);
                return false;
            }
            j += 1;
        }
        total := xx he + 4 + clen;
        if total > self.cfg.read_buf_cap {
            self.respond_error_close(slot, 413);
            return false;
        }
        if c.read_len < total { return false; }
        req.body = string.{ ptr = @c.read_buf[he + 4], len = xx clen };

        // keep-alive: 1.1 default on, 1.0 default off, header overrides
        req.keep_alive = !ascii_ieq(req.version, "http/1.0");
        cnv := find_header(@req, "connection");
        if ascii_ieq(cnv, "close") { req.keep_alive = false; }
        if ascii_ieq(cnv, "keep-alive") { req.keep_alive = true; }

        // dispatch (the field must be loaded — `self.handler(...)` would
        // be parsed as a dot-call on a function named `handler`)
        h := self.handler;
        resp : Response = .{};
        h(@req, @resp);

        c.served += 1;
        keep := req.keep_alive and c.served < self.cfg.request_count;
        // Serialize while the request views are still valid (the body
        // may reference the read buffer), THEN drop the served bytes —
        // write_more's pipelining check must see only the remainder —
        // and only then start sending. Overlapping copy: dst < src, so
        // forward byte-wise is safe.
        self.serialize_response(slot, @resp, keep);
        rest := c.read_len - total;
        m : i64 = 0;
        while m < rest { c.read_buf[m] = c.read_buf[total + m]; m += 1; }
        c.read_len = rest;
        self.write_more(slot);
        return true;
    }

    // ── response serialization + write continuation ──────────────────

    // Build the response bytes into the slot's out buffer. Does NOT
    // start sending — try_serve_one compacts the read buffer between
    // serialization and the first write (see the ordering note there).
    // `keep` selects the Connection header and, inverted, close_after.
    serialize_response :: (self: *Server, slot: i64, resp: *Response, keep: bool) {
        head := concat("HTTP/1.1 ", concat(int_to_string(resp.status), concat(" ", reason_for(resp.status))));
        head = concat(head, concat("\r\nContent-Length: ", int_to_string(resp.body.len)));
        head = concat(head, concat("\r\nContent-Type: ", resp.content_type));
        head = concat(head, if keep then "\r\nConnection: keep-alive\r\n" else "\r\nConnection: close\r\n");
        if resp.extra_headers.len > 0 { head = concat(head, resp.extra_headers); }
        head = concat(head, "\r\n");

        c := @self.conns[slot];
        c.out_len = xx (head.len + resp.body.len);
        c.out_buf = xx self.own_alloc.alloc_bytes(xx c.out_len);
        memcpy(c.out_buf, head.ptr, head.len);
        if resp.body.len > 0 {
            memcpy(@c.out_buf[head.len], resp.body.ptr, resp.body.len);
        }
        c.out_sent = 0;
        c.close_after = !keep;
    }

    // Push pending response bytes to the socket. On WouldBlock the fd
    // is armed for writability and the slot parks in CONN_WRITING; tick
    // calls back here when the socket drains. Once everything is sent:
    // free the buffer, then close (close_after), resume serving
    // pipelined input, or go idle in CONN_KEEPALIVE.
    write_more :: (self: *Server, slot: i64) {
        c := @self.conns[slot];
        while c.out_sent < c.out_len {
            nq, we := socket.write_nb(c.fd, @c.out_buf[c.out_sent], xx (c.out_len - c.out_sent));
            if we == error.WouldBlock {
                if !c.write_armed {
                    awe := false;
                    self.loop.add_write(c.fd, xx slot) catch { awe = true; };
                    if awe { self.conn_close(slot); return; }
                    c.write_armed = true;
                }
                c.state = CONN_WRITING;
                return;
            }
            if we { self.conn_close(slot); return; }
            c.out_sent += nq;
        }
        // fully sent
        if c.write_armed {
            self.loop.del_write(c.fd);
            c.write_armed = false;
        }
        self.own_alloc.dealloc_bytes(xx c.out_buf, c.out_len);
        c.out_buf = null;
        c.out_len = 0;
        c.out_sent = 0;
        if c.close_after { self.conn_close(slot); return; }
        if c.read_len > 0 {
            // pipelined bytes already buffered: keep serving
            c.state = CONN_READING;
            c.deadline = event.deadline_in(self.cfg.timeout_request_ms);
            self.serve_buffered(slot);
            return;
        }
        c.state = CONN_KEEPALIVE;
        c.deadline = event.deadline_in(self.cfg.timeout_keepalive_ms);
    }

    // A terminal error response: serialize, send, close when done.
    // Buffered input is dropped either way.
    //
    // If an earlier response is still being sent (out_buf is set — the
    // read buffer filled up while the slot was in CONN_WRITING), a
    // second response cannot be spliced into the stream and must not
    // overwrite the pending buffer. In that case the in-flight response
    // is left to finish and the connection closes after it; the error
    // status itself is not sent.
    respond_error_close :: (self: *Server, slot: i64, status: i64) {
        c := @self.conns[slot];
        c.read_len = 0;
        if c.out_buf != null {
            c.close_after = true;
            return;
        }
        resp : Response = .{ status = status, body = reason_for(status) };
        self.serialize_response(slot, @resp, false);
        self.write_more(slot);
    }
}