distd serves through std.http — the readiness loop lands (PLAN-HTTPZ A1)

The hand-rolled sequential accept loop, its SO_RCVTIMEO band-aid, and
the whole src/server/http.sx module are retired: distd is now a
std.http handler. Server.init gets the store directory through the ctx
word; route() fills a Response instead of writing to a socket; every
handler ports mechanically (respond_error/load_or_503/respond_render
take *Response; bodies allocate from the per-request arena, never the
stack, since serialization happens after the handler returns).
Downloads keep X-Checksum-SHA256 via extra_headers; auth takes the
extracted Authorization value; the 411 contract (POST/PUT must declare
Content-Length) moves into the handler, pinned as before.

Config: 512 MiB read cap (whole-body artifact uploads), 120s request
deadline, 5s keepalive, 200 requests per connection. Idle connections
now cost nothing — timeouts evict, never block.

http_client gains its own 10s read timeout (the old shared helper's
secs->ms change had silently shrunk it to 10ms).

tests/server_http.sx pins the architecture: a request answers within
1s while SIX idle preconnects are held open (the retired loop paid
250ms-2s per idle socket serially), and two requests ride one
keep-alive connection. make test 24/24 green.
This commit is contained in:
agra
2026-06-12 22:00:31 +03:00
parent a6052bbb4c
commit 48a13c43ee
6 changed files with 243 additions and 496 deletions

View File

@@ -62,7 +62,6 @@
#import "../validation/artifact_file.sx";
#import "../publish/publish.sx";
sock :: #import "modules/std/socket.sx";
http :: #import "http.sx";
au :: #import "auth.sx";
db :: #import "../repo/db.sx";
jout :: #import "../json_out.sx";
@@ -126,29 +125,34 @@ render_buf :: () -> string {
// ── responses ─────────────────────────────────────────────────────────
// JSON error body in the CLI's error shape, sent with the HTTP status.
respond_error :: (client: i32, code: i64, fail_code: string, fail_message: string) {
respond_error :: (resp: *http.Response, code: i64, fail_code: string, fail_message: string) {
f : jout.CliFailure = .{ code = fail_code, message = fail_message };
raw : [4096]u8 = ---;
// The body must outlive this frame (the server serializes after the
// handler returns), so the render buffer comes from the per-request
// arena, never the stack.
raw : [*]u8 = xx context.allocator.alloc_bytes(4096);
werr := false;
n := jout.write_error(f, string.{ ptr = @raw[0], len = 4096 }) catch { werr = true; 0 };
n := jout.write_error(f, string.{ ptr = raw, len = 4096 }) catch { werr = true; 0 };
body := "{\"status\":\"error\"}";
if !werr { body = string.{ ptr = @raw[0], len = n }; }
http.respond(client, code, "application/json", "", body);
if !werr { body = string.{ ptr = raw, len = n }; }
resp.status = code;
resp.content_type = "application/json";
resp.body = body;
}
// ── /api renders (builders own the `try`, callers catch) ─────────────
// Reload the persisted model. Null means the store has no readable
// database — the 503 error response has already been sent.
load_or_503 :: (client: i32, store_dir: string) -> ?Repo {
load_or_503 :: (resp: *http.Response, store_dir: string) -> ?Repo {
if !db.store_exists(store_dir) {
respond_error(client, 503, "store.load",
respond_error(resp, 503, "store.load",
concat("no store database (nothing published yet): ", store_dir));
return null;
}
loaded, le := db.load(store_dir);
if le {
respond_error(client, 503, "store.load",
respond_error(resp, 503, "store.load",
concat("the store database could not be loaded: ", store_dir));
return null;
}
@@ -202,13 +206,15 @@ render_app_detail_json :: (repo: *Repo, app: App, dst: []u8) -> (i64, !JsonError
// Send `n` rendered bytes of `buf` as 200 JSON — or the overflow error
// when the render didn't fit RENDER_CAP.
respond_render :: (client: i32, buf: string, n: i64, overflowed: bool) {
respond_render :: (resp: *http.Response, buf: string, n: i64, overflowed: bool) {
if overflowed {
respond_error(client, 500, "http.response_overflow",
respond_error(resp, 500, "http.response_overflow",
"response exceeded the server's render buffer");
return;
}
http.respond(client, 200, "application/json", "", string.{ ptr = buf.ptr, len = n });
resp.status = 200;
resp.content_type = "application/json";
resp.body = string.{ ptr = buf.ptr, len = n };
}
// ── HTML index (the install-page seed, subplan 04 Slice 5) ───────────
@@ -302,34 +308,35 @@ render_index :: (repo: *Repo) -> string {
return concat(page, INDEX_FOOT);
}
handle_index :: (client: i32, store_dir: string) {
rq := load_or_503(client, store_dir);
handle_index :: (resp: *http.Response, store_dir: string) {
rq := load_or_503(resp, store_dir);
if rq == null { return; }
repo := rq!;
http.respond(client, 200, "text/html; charset=utf-8", "", render_index(@repo));
resp.content_type = "text/html; charset=utf-8";
resp.body = render_index(@repo);
}
// ── routes ────────────────────────────────────────────────────────────
handle_apps_index :: (client: i32, store_dir: string) {
rq := load_or_503(client, store_dir);
handle_apps_index :: (resp: *http.Response, store_dir: string) {
rq := load_or_503(resp, store_dir);
if rq == null { return; }
repo := rq!;
buf := render_buf();
werr := false;
n := render_apps_json(@repo, buf) catch { werr = true; 0 };
respond_render(client, buf, n, werr);
respond_render(resp, buf, n, werr);
}
handle_app_detail :: (client: i32, store_dir: string, slug: string) {
rq := load_or_503(client, store_dir);
handle_app_detail :: (resp: *http.Response, store_dir: string, slug: string) {
rq := load_or_503(resp, store_dir);
if rq == null { return; }
repo := rq!;
aq := repo.find_app_by_slug(slug);
if aq == null {
respond_error(client, 404, "api.unknown_app",
respond_error(resp, 404, "api.unknown_app",
concat("no app with that slug in the store: ", slug));
return;
}
@@ -337,24 +344,25 @@ handle_app_detail :: (client: i32, store_dir: string, slug: string) {
buf := render_buf();
werr := false;
n := render_app_detail_json(@repo, aq!, buf) catch { werr = true; 0 };
respond_render(client, buf, n, werr);
respond_render(resp, buf, n, werr);
}
handle_download :: (client: i32, store_dir: string, sha: string) {
handle_download :: (resp: *http.Response, store_dir: string, sha: string) {
if !is_hex64(sha) {
respond_error(client, 404, "download.bad_key",
respond_error(resp, 404, "download.bad_key",
"download key must be a 64-char lowercase-hex sha256");
return;
}
opath := path_join(store_dir, concat("objects/", sha));
bq := read_file(opath);
if bq == null {
respond_error(client, 404, "download.unknown_object",
respond_error(resp, 404, "download.unknown_object",
concat("no object with that digest in the store: ", sha));
return;
}
extra := concat("X-Checksum-SHA256: ", concat(sha, "\r\n"));
http.respond(client, 200, "application/octet-stream", extra, bq!);
resp.content_type = "application/octet-stream";
resp.extra_headers = concat("X-Checksum-SHA256: ", concat(sha, "\r\n"));
resp.body = bq!;
}
// ── install pages (subplan 04 Slice 5) ────────────────────────────────
@@ -408,23 +416,23 @@ InstallCtx :: struct {
host: string = "localhost";
}
resolve_install :: (client: i32, repo: *Repo, slug: string, chan_name: string) -> ?InstallCtx {
resolve_install :: (resp: *http.Response, repo: *Repo, slug: string, chan_name: string) -> ?InstallCtx {
aq := repo.find_app_by_slug(slug);
if aq == null {
respond_error(client, 404, "install.unknown_app",
respond_error(resp, 404, "install.unknown_app",
concat("no app with that slug in the store: ", slug));
return null;
}
app := aq!;
cq := repo.get_channel(app.id, chan_name);
if cq == null {
respond_error(client, 404, "install.unknown_channel",
respond_error(resp, 404, "install.unknown_channel",
concat("the app has no channel with that name: ", chan_name));
return null;
}
rq := repo.get_release(cq!.current_release_id);
if rq == null {
respond_error(client, 404, "install.no_release",
respond_error(resp, 404, "install.no_release",
concat("the channel does not point at a published release: ", chan_name));
return null;
}
@@ -528,43 +536,41 @@ render_install :: (ctx: *InstallCtx, repo: *Repo, detected: ?Platform) -> string
return concat(page, "</body></html>");
}
handle_install_page :: (client: i32, store_dir: string, req: *http.Request, slug: string, chan_name: string) {
rq := load_or_503(client, store_dir);
handle_install_page :: (resp: *http.Response, store_dir: string, req: *http.Request, slug: string, chan_name: string) {
rq := load_or_503(resp, store_dir);
if rq == null { return; }
repo := rq!;
ctxq := resolve_install(client, @repo, slug, chan_name);
ctxq := resolve_install(resp, @repo, slug, chan_name);
if ctxq == null { return; }
ctx := ctxq!;
ua := "";
uq := http.header_value(req.headers, "user-agent");
if uq != null { ua = uq!; }
hostq := http.header_value(req.headers, "host");
if hostq != null { ctx.host = hostq!; }
ua := http.find_header(req, "user-agent");
hostv := http.find_header(req, "host");
if hostv.len > 0 { ctx.host = hostv; }
http.respond(client, 200, "text/html; charset=utf-8", "",
render_install(@ctx, @repo, ua_platform(ua)));
resp.content_type = "text/html; charset=utf-8";
resp.body = render_install(@ctx, @repo, ua_platform(ua));
}
// The enterprise OTA manifest: Apple's plist shape, with the software
// package URL pointing back at this server over HTTPS (itms-services
// refuses plain http; TLS terminates at the reverse proxy).
handle_manifest_plist :: (client: i32, store_dir: string, req: *http.Request, slug: string, chan_name: string) {
rq := load_or_503(client, store_dir);
handle_manifest_plist :: (resp: *http.Response, store_dir: string, req: *http.Request, slug: string, chan_name: string) {
rq := load_or_503(resp, store_dir);
if rq == null { return; }
repo := rq!;
ctxq := resolve_install(client, @repo, slug, chan_name);
ctxq := resolve_install(resp, @repo, slug, chan_name);
if ctxq == null { return; }
ctx := ctxq!;
if ctx.app.ios_mode != .enterprise {
respond_error(client, 404, "install.not_enterprise",
respond_error(resp, 404, "install.not_enterprise",
"the app's iOS install mode is not enterprise; no OTA manifest exists");
return;
}
bid := bundle_id_for(ctx.app, .ios);
if bid.len == 0 {
respond_error(client, 404, "install.no_bundle_id",
respond_error(resp, 404, "install.no_bundle_id",
"the app has no iOS bundle id; set one with: dist app set --ios-bundle-id");
return;
}
@@ -578,14 +584,14 @@ handle_manifest_plist :: (client: i32, store_dir: string, req: *http.Request, sl
ai += 1;
}
if !found {
respond_error(client, 404, "install.no_ios_artifact",
respond_error(resp, 404, "install.no_ios_artifact",
"the channel's current release carries no iOS artifact");
return;
}
host := "localhost";
hostq := http.header_value(req.headers, "host");
if hostq != null { host = hostq!; }
hostv2 := http.find_header(req, "host");
if hostv2.len > 0 { host = hostv2; }
url := concat("https://", concat(host, concat("/download/", art.sha256)));
x := "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<!DOCTYPE plist PUBLIC \"-//Apple//DTD PLIST 1.0//EN\" \"http://www.apple.com/DTDs/PropertyList-1.0.dtd\">\n<plist version=\"1.0\"><dict><key>items</key><array><dict>";
@@ -599,27 +605,28 @@ handle_manifest_plist :: (client: i32, store_dir: string, req: *http.Request, sl
x = concat(x, html_escape(ctx.app.display_name));
x = concat(x, "</string></dict></dict></array></dict></plist>\n");
http.respond(client, 200, "application/xml", "", x);
resp.content_type = "application/xml";
resp.body = x;
}
// GET /install/<slug>/<channel>[/manifest.plist]
handle_install_route :: (client: i32, store_dir: string, req: *http.Request, tail: string) {
handle_install_route :: (resp: *http.Response, store_dir: string, req: *http.Request, tail: string) {
s1 := seg_split(tail);
if s1.head.len == 0 or s1.rest.len == 0 {
respond_error(client, 404, "http.not_found",
respond_error(resp, 404, "http.not_found",
"install pages live at /install/<slug>/<channel>");
return;
}
s2 := seg_split(s1.rest);
if s2.rest.len == 0 {
handle_install_page(client, store_dir, req, s1.head, s2.head);
handle_install_page(resp, store_dir, req, s1.head, s2.head);
return;
}
if s2.rest == "manifest.plist" {
handle_manifest_plist(client, store_dir, req, s1.head, s2.head);
handle_manifest_plist(resp, store_dir, req, s1.head, s2.head);
return;
}
respond_error(client, 404, "http.not_found",
respond_error(resp, 404, "http.not_found",
concat("no install route for ", tail));
}
@@ -648,28 +655,28 @@ seg_split :: (s: string) -> Seg {
// Authenticate a write request or answer it. Null means the refusal
// response has already been sent.
auth_or_respond :: (client: i32, store_dir: string, req: *http.Request, app_slug: string, channel: string) -> ?Token {
auth_or_respond :: (resp: *http.Response, store_dir: string, req: *http.Request, app_slug: string, channel: string) -> ?Token {
fail : jout.CliFailure = .{};
t, ae := au.authenticate(store_dir, req.headers, "publish", app_slug, channel, @fail);
t, ae := au.authenticate(store_dir, http.find_header(req, "authorization"), "publish", app_slug, channel, @fail);
if ae {
status : i64 = 401;
if ae == error.Forbidden { status = 403; }
if ae == error.Unavailable { status = 503; }
respond_error(client, status, fail.code, fail.message);
respond_error(resp, status, fail.code, fail.message);
return null;
}
return t;
}
// JSON-object body, or null after answering 400.
body_object :: (client: i32, body: string) -> ?Object {
body_object :: (resp: *http.Response, body: string) -> ?Object {
v, pe := jsrv.parse(body, context.allocator);
if pe {
respond_error(client, 400, "api.bad_json", "request body is not valid JSON");
respond_error(resp, 400, "api.bad_json", "request body is not valid JSON");
return null;
}
if v != .object {
respond_error(client, 400, "api.bad_json", "request body must be a JSON object");
respond_error(resp, 400, "api.bad_json", "request body must be a JSON object");
return null;
}
return v.object;
@@ -685,7 +692,7 @@ wb_find :: (o: Object, key: string) -> ?Value {
}
// Required string member, or null after answering 400.
wb_req_str :: (client: i32, o: Object, key: string) -> ?string {
wb_req_str :: (resp: *http.Response, o: Object, key: string) -> ?string {
vq := wb_find(o, key);
if vq != null {
v := vq!;
@@ -693,7 +700,7 @@ wb_req_str :: (client: i32, o: Object, key: string) -> ?string {
if v.str.len > 0 { return v.str; }
}
}
respond_error(client, 400, "api.missing_field",
respond_error(resp, 400, "api.missing_field",
concat("body requires a non-empty string member: ", key));
return null;
}
@@ -717,10 +724,10 @@ wb_opt_int :: (o: Object, key: string, default_: i64) -> i64 {
// POST /api/upload — content-address the raw body into the store. The
// upload is app-agnostic (the bytes are inert until a release references
// them), so auth demands only the publish scope.
handle_upload :: (client: i32, store_dir: string, req: *http.Request) {
if auth_or_respond(client, store_dir, req, "", "") == null { return; }
handle_upload :: (resp: *http.Response, store_dir: string, req: *http.Request) {
if auth_or_respond(resp, store_dir, req, "", "") == null { return; }
if req.body.len == 0 {
respond_error(client, 400, "upload.empty", "upload body is empty");
respond_error(resp, 400, "upload.empty", "upload body is empty");
return;
}
st := Store.init(store_dir);
@@ -730,31 +737,31 @@ handle_upload :: (client: i32, store_dir: string, req: *http.Request) {
if se { werr = true; }
if !se { key = k; }
if werr {
respond_error(client, 500, "store.write",
respond_error(resp, 500, "store.write",
"upload bytes could not be content-addressed into the store");
return;
}
body := concat("{\"status\":\"stored\",\"sha256\":\"", concat(key, concat("\",\"size_bytes\":", concat(int_to_string(req.body.len), "}"))));
http.respond(client, 200, "application/json", "", body);
resp.content_type = "application/json";
resp.body = concat("{\"status\":\"stored\",\"sha256\":\"", concat(key, concat("\",\"size_bytes\":", concat(int_to_string(req.body.len), "}"))));
}
// POST /api/apps/<slug>/releases — publish a release whose artifacts name
// already-uploaded objects by digest. Mirrors the CLI manifest checks
// (platform, digest, size, content type, filename extension) against the
// STORE rather than local files, then commits through the shared pipeline.
handle_release_create :: (client: i32, store_dir: string, req: *http.Request, slug: string) {
oq := body_object(client, req.body);
handle_release_create :: (resp: *http.Response, store_dir: string, req: *http.Request, slug: string) {
oq := body_object(resp, req.body);
if oq == null { return; }
o := oq!;
versionq := wb_req_str(client, o, "version");
versionq := wb_req_str(resp, o, "version");
if versionq == null { return; }
version := versionq!;
channelq := wb_req_str(client, o, "channel");
channelq := wb_req_str(resp, o, "channel");
if channelq == null { return; }
channel := channelq!;
tokq := auth_or_respond(client, store_dir, req, slug, channel);
tokq := auth_or_respond(resp, store_dir, req, slug, channel);
if tokq == null { return; }
tok := tokq!;
@@ -768,7 +775,7 @@ handle_release_create :: (client: i32, store_dir: string, req: *http.Request, sl
}
}
if !arts_ok {
respond_error(client, 400, "api.missing_field",
respond_error(resp, 400, "api.missing_field",
"body requires a non-empty artifacts array");
return;
}
@@ -778,25 +785,25 @@ handle_release_create :: (client: i32, store_dir: string, req: *http.Request, sl
i := 0;
while i < arts_arr.len {
if arts_arr.items[i] != .object {
respond_error(client, 400, "api.bad_json", "each artifact must be a JSON object");
respond_error(resp, 400, "api.bad_json", "each artifact must be a JSON object");
return;
}
ao := arts_arr.items[i].object;
pidq := wb_req_str(client, ao, "platform");
pidq := wb_req_str(resp, ao, "platform");
if pidq == null { return; }
platform, perr := parse_platform(pidq!);
if perr {
respond_error(client, 400, "api.unknown_platform",
respond_error(resp, 400, "api.unknown_platform",
concat("unknown platform id: ", pidq!));
return;
}
shaq := wb_req_str(client, ao, "sha256");
shaq := wb_req_str(resp, ao, "sha256");
if shaq == null { return; }
sha := shaq!;
if !is_hex64(sha) {
respond_error(client, 400, "api.bad_digest",
respond_error(resp, 400, "api.bad_digest",
"artifact sha256 must be a 64-char lowercase-hex digest");
return;
}
@@ -806,14 +813,14 @@ handle_release_create :: (client: i32, store_dir: string, req: *http.Request, sl
opath := path_join(store_dir, concat("objects/", sha));
ob := read_file(opath);
if ob == null {
respond_error(client, 404, "api.unknown_object",
respond_error(resp, 404, "api.unknown_object",
concat("no object with that digest in the store (upload it first): ", sha));
return;
}
actual_size := ob!.len;
declared := wb_opt_int(ao, "size_bytes", -1);
if declared >= 0 and declared != actual_size {
respond_error(client, 400, "api.size_mismatch",
respond_error(resp, 400, "api.size_mismatch",
concat("declared size_bytes does not match the stored object: ", sha));
return;
}
@@ -821,7 +828,7 @@ handle_release_create :: (client: i32, store_dir: string, req: *http.Request, sl
ct := wb_opt_str(ao, "content_type");
if ct.len == 0 { ct = pl.default_content_type(platform); }
if !is_allowed_content_type(ct) {
respond_error(client, 400, "api.content_type_denied",
respond_error(resp, 400, "api.content_type_denied",
concat("artifact content type is not on the allow-list: ", ct));
return;
}
@@ -831,7 +838,7 @@ handle_release_create :: (client: i32, store_dir: string, req: *http.Request, sl
fname = concat(slug, concat("-", concat(version, concat(".", expected_ext(platform)))));
}
if !ext_matches_platform(fname, platform) {
respond_error(client, 400, "api.extension_mismatch",
respond_error(resp, 400, "api.extension_mismatch",
concat("artifact filename extension does not match its platform: ", fname));
return;
}
@@ -857,14 +864,14 @@ handle_release_create :: (client: i32, store_dir: string, req: *http.Request, sl
if ce == error.Persist {
if fail.code == "persist.load" { status = 503; }
}
respond_error(client, status, fail.code, fail.message);
respond_error(resp, status, fail.code, fail.message);
return;
}
if !ce {
buf := render_buf();
werr := false;
n := pl.write_json(@outcome, buf) catch { werr = true; 0 };
respond_render(client, buf, n, werr);
respond_render(resp, buf, n, werr);
}
}
@@ -878,71 +885,71 @@ op_http_status :: (e: ops.OpError) -> i64 {
// POST /api/apps/<slug>/channels/<name>/promote — body {"release_id":..};
// delegates to the CLI's promote pipeline.
handle_promote :: (client: i32, store_dir: string, req: *http.Request, slug: string, chan_name: string) {
oq := body_object(client, req.body);
handle_promote :: (resp: *http.Response, store_dir: string, req: *http.Request, slug: string, chan_name: string) {
oq := body_object(resp, req.body);
if oq == null { return; }
relq := wb_req_str(client, oq!, "release_id");
relq := wb_req_str(resp, oq!, "release_id");
if relq == null { return; }
if auth_or_respond(client, store_dir, req, slug, chan_name) == null { return; }
if auth_or_respond(resp, store_dir, req, slug, chan_name) == null { return; }
fail : jout.CliFailure = .{};
o, e := ops.run_promote(store_dir, slug, chan_name, relq!, @fail);
if e {
respond_error(client, op_http_status(e), fail.code, fail.message);
respond_error(resp, op_http_status(e), fail.code, fail.message);
return;
}
if !e {
buf := render_buf();
werr := false;
n := ops.write_promote_json(@o, buf) catch { werr = true; 0 };
respond_render(client, buf, n, werr);
respond_render(resp, buf, n, werr);
}
}
// POST /api/apps/<slug>/channels/<name>/rollback — empty body; delegates
// to the CLI's rollback pipeline.
handle_rollback :: (client: i32, store_dir: string, req: *http.Request, slug: string, chan_name: string) {
if auth_or_respond(client, store_dir, req, slug, chan_name) == null { return; }
handle_rollback :: (resp: *http.Response, store_dir: string, req: *http.Request, slug: string, chan_name: string) {
if auth_or_respond(resp, store_dir, req, slug, chan_name) == null { return; }
fail : jout.CliFailure = .{};
o, e := ops.run_rollback(store_dir, slug, chan_name, @fail);
if e {
respond_error(client, op_http_status(e), fail.code, fail.message);
respond_error(resp, op_http_status(e), fail.code, fail.message);
return;
}
if !e {
buf := render_buf();
werr := false;
n := ops.write_rollback_json(@o, buf) catch { werr = true; 0 };
respond_render(client, buf, n, werr);
respond_render(resp, buf, n, werr);
}
}
// Route a write request under POST /api/. Returns false when no write
// route matches (the caller 404s).
route_post :: (client: i32, store_dir: string, req: *http.Request) -> bool {
route_post :: (resp: *http.Response, store_dir: string, req: *http.Request) -> bool {
path := req.path;
if path == "/api/upload" {
handle_upload(client, store_dir, req);
handle_upload(resp, store_dir, req);
return true;
}
if starts_with(path, "/api/apps/") {
s1 := seg_split(tail_after(path, "/api/apps/"));
if s1.head.len == 0 { return false; }
if s1.rest == "releases" {
handle_release_create(client, store_dir, req, s1.head);
handle_release_create(resp, store_dir, req, s1.head);
return true;
}
if starts_with(s1.rest, "channels/") {
s2 := seg_split(tail_after(s1.rest, "channels/"));
if s2.head.len == 0 { return false; }
if s2.rest == "promote" {
handle_promote(client, store_dir, req, s1.head, s2.head);
handle_promote(resp, store_dir, req, s1.head, s2.head);
return true;
}
if s2.rest == "rollback" {
handle_rollback(client, store_dir, req, s1.head, s2.head);
handle_rollback(resp, store_dir, req, s1.head, s2.head);
return true;
}
}
@@ -950,117 +957,103 @@ route_post :: (client: i32, store_dir: string, req: *http.Request) -> bool {
return false;
}
// Route one parsed request: GET reads, POST writes.
route :: (client: i32, store_dir: string, req: *http.Request) -> i64 {
// Route one parsed request into `resp`: GET reads, POST writes. The
// outcome status lives in resp.status (respond_error sets it on every
// refusal path).
route :: (store_dir: string, req: *http.Request, resp: *http.Response) {
method := req.method;
path := req.path;
if method == "GET" {
if path == "/" {
handle_index(client, store_dir);
return 200;
handle_index(resp, store_dir);
return;
}
if path == "/healthz" {
http.respond(client, 200, "application/json", "", "{\"status\":\"ok\"}");
return 200;
resp.content_type = "application/json";
resp.body = "{\"status\":\"ok\"}";
return;
}
if path == "/api/apps" {
handle_apps_index(client, store_dir);
return 200;
handle_apps_index(resp, store_dir);
return;
}
if starts_with(path, "/api/apps/") {
handle_app_detail(client, store_dir, tail_after(path, "/api/apps/"));
return 200;
handle_app_detail(resp, store_dir, tail_after(path, "/api/apps/"));
return;
}
if starts_with(path, "/download/") {
handle_download(client, store_dir, tail_after(path, "/download/"));
return 200;
handle_download(resp, store_dir, tail_after(path, "/download/"));
return;
}
if starts_with(path, "/install/") {
handle_install_route(client, store_dir, req, tail_after(path, "/install/"));
return 200;
handle_install_route(resp, store_dir, req, tail_after(path, "/install/"));
return;
}
if path == "/admin" or starts_with(path, "/admin/") {
adm.handle_admin(client, store_dir, path);
return 200;
adm.handle_admin(store_dir, path, resp);
return;
}
respond_error(client, 404, "http.not_found",
respond_error(resp, 404, "http.not_found",
concat("no route for ", path));
return 404;
}
if method == "POST" {
if route_post(client, store_dir, req) { return 200; }
respond_error(client, 404, "http.not_found",
concat("no write route for ", path));
return 404;
}
respond_error(client, 405, "http.method_not_allowed",
"distd routes are GET (reads) or POST (token-gated writes)");
return 405;
}
// ── server loop ───────────────────────────────────────────────────────
// Read one request off `client`, route it, log the result line. All
// allocations land in the pushed per-request context allocator.
serve_one :: (client: i32, store_dir: string) {
req : http.Request = .{};
closed := false;
rstatus : i64 = 0;
rcode := "";
rmsg := "";
http.read_request(client, @req) catch (e) {
if e == error.Closed { closed = true; }
if !closed {
rstatus = 400; rcode = "http.bad_request";
rmsg = "request could not be read as HTTP (bad request line or truncated body)";
if e == error.LengthRequired {
rstatus = 411; rcode = "http.length_required";
rmsg = "POST/PUT requires a Content-Length header";
}
if e == error.BodyTooLarge {
rstatus = 413; rcode = "http.body_too_large";
rmsg = "request body exceeds the server's size cap";
}
if e == error.HeadersTooLarge {
rstatus = 400; rcode = "http.headers_too_large";
rmsg = "request header block exceeds 8K";
}
}
};
if closed { return; }
if rstatus > 0 {
respond_error(client, rstatus, rcode, rmsg);
slog(concat("distd: unreadable request -> ", concat(int_to_string(rstatus), "\n")));
return;
}
if method == "POST" {
if route_post(resp, store_dir, req) { return; }
respond_error(resp, 404, "http.not_found",
concat("no write route for ", path));
return;
}
respond_error(resp, 405, "http.method_not_allowed",
"distd routes are GET (reads) or POST (token-gated writes)");
}
code := route(client, store_dir, @req);
line := concat("distd: ", concat(req.method, concat(" ", concat(req.path, concat(" -> ", concat(int_to_string(code), "\n"))))));
// ── server loop (std.http, PLAN-HTTPZ A1) ─────────────────────────────
// The std.http handler: thread the store directory through the ctx
// word, keep the 411 contract (POST/PUT must declare Content-Length —
// std.http treats an absent header as a zero-length body), route, log.
// Per-request allocations land in std.http's per-dispatch arena.
DistdCtx :: struct {
store_dir: string;
}
distd_handle :: (req: *http.Request, resp: *http.Response, ctx: usize) {
dctx : *DistdCtx = xx ctx;
store_dir := dctx.store_dir;
refused := false;
if req.method == "POST" or req.method == "PUT" {
if http.find_header(req, "content-length").len == 0 {
respond_error(resp, 411, "http.length_required",
"POST/PUT requires a Content-Length header");
refused = true;
}
}
if !refused {
route(store_dir, req, resp);
}
line := concat("distd: ", concat(req.method, concat(" ", concat(req.path, concat(" -> ", concat(int_to_string(resp.status), "\n"))))));
slog(line);
}
// Bind 0.0.0.0:<port> and serve forever (sequential, one connection at a
// time — the deployment story is LAN/NAS-scale). Returns only when the
// socket can't be opened. Per-request allocations live in an arena that
// dies with the request.
// Serve the store over std.http's readiness loop (PLAN-HTTPZ A1): idle
// connections cost nothing, timeouts evict instead of blocking,
// keep-alive holds between requests. Returns only when the socket
// can't be opened.
run_server :: (store_dir: string, port: i64) -> !ServeErr {
fd, fe := http.listen_on(port);
if fe { raise error.Bind; }
dctx : DistdCtx = .{ store_dir = store_dir };
cfg : http.Config = .{
port = port,
max_conn = 256,
read_buf_cap = 536870912, // 512 MiB: artifact uploads arrive whole-body
timeout_request_ms = 120000, // a large upload must complete within this
timeout_keepalive_ms = 5000,
request_count = 200,
};
srv, se := http.Server.init(cfg, distd_handle, xx @dctx);
if se { raise error.Bind; }
slog(concat("distd: serving store ", concat(store_dir, concat(" on http://0.0.0.0:", concat(int_to_string(port), "\n")))));
while true {
client := sock.accept(fd, null, null);
if client < 0 { continue; }
http.set_read_timeout(client, 250);
gpa := GPA.init();
arena := Arena.init(xx gpa, 65536);
push Context.{ allocator = xx arena } {
serve_one(client, store_dir);
}
arena.deinit();
sock.close(client);
}
srv.run();
return;
}