P4.1: distd — read-only HTTP API + downloads over the local store

dist server run binds 0.0.0.0:<port> (default 8787) and serves /healthz,
/api/apps, /api/apps/<slug>, and /download/<sha256> (X-Checksum-SHA256,
bytes verified content-identical). db.json reloads per request so CLI
publishes/promotes are visible immediately. Errors reuse the CLI's JSON
error shape with matching HTTP statuses. HTTP/1.1 is an in-repo shim over
std.socket (src/server/http.sx), liftable to the sx stdlib later.

Response buffers are heap slices: a 64K+ stack array in one frame
segfaults the sx LLVM backend (DAGCombiner); 4-16K stack buffers are
fine. Pinned in tests/server_http.sx including a freshness case.
This commit is contained in:
agra
2026-06-12 01:32:46 +03:00
parent e56a921eec
commit 415ddeaac6
4 changed files with 608 additions and 1 deletions

303
src/server/distd.sx Normal file
View File

@@ -0,0 +1,303 @@
// =====================================================================
// distd.sx — the read-only distribution server over the local store
// (subplan 04, Slices 1 + the read half of 3/4), run as `dist server run`.
//
// Serves the state the CLI publishes — db.json metadata and the
// content-addressed objects — over HTTP (src/server/http.sx):
//
// GET /healthz {"status":"ok"} — no store access
// GET /api/apps {"apps":[...]} every app in the store
// GET /api/apps/<slug> {"app":..,"releases":[..],"channels":[..]}
// GET /download/<sha256> the object's bytes (application/octet-stream,
// X-Checksum-SHA256 header)
//
// Anything else is a JSON error in the CLI's error shape
// (`{"status":"error","error":{code,message}}`) with the matching HTTP
// status — the API and the CLI report failures identically.
//
// FRESHNESS: db.json is RELOADED on every /api request, so a `dist ci
// publish` / `release promote` between requests is visible immediately —
// the store on disk stays the single source of truth (no cache to
// invalidate, LAN-scale traffic).
//
// MUTATION is the CLI's job for now: every route is GET; writes arrive
// with token auth (subplan 04 Slice 2, deferred with the rest of the
// upload/auth surface).
//
// RESPONSE BUFFERS are heap slices from the per-request arena, never big
// stack arrays: a stack array of 64K+ in one frame crashes the sx LLVM
// backend (DAGCombiner segfault). Small fixed buffers (4K) are fine.
// =====================================================================
#import "modules/std.sx";
#import "modules/std/json.sx";
#import "modules/std/fs.sx";
#import "../domain/platform.sx";
#import "../domain/app.sx";
#import "../domain/release.sx";
#import "../domain/artifact.sx";
#import "../domain/channel.sx";
#import "../domain/audit.sx";
#import "../repo/repo.sx";
sock :: #import "modules/std/socket.sx";
http :: #import "http.sx";
db :: #import "../repo/db.sx";
jout :: #import "../json_out.sx";
// Response-body capacity for the /api JSON renders (heap, per-request).
RENDER_CAP :: 262144;
ServeErr :: error {
Bind,
}
// Server-side stderr log line (fd 2 via the socket module's write; stdout
// stays clean for whoever launched the process).
slog :: (s: string) {
if s.len > 0 { sock.write(2, s.ptr, xx s.len); }
}
// True iff `s` is exactly 64 lowercase-hex chars (a storage key).
is_hex64 :: (s: string) -> bool {
if s.len != 64 { return false; }
i := 0;
while i < s.len {
c := s[i];
digit := c >= 48 and c <= 57; // '0'..'9'
lower := c >= 97 and c <= 102; // 'a'..'f'
if !digit and !lower { return false; }
i += 1;
}
return true;
}
starts_with :: (s: string, prefix: string) -> bool {
if prefix.len > s.len { return false; }
i := 0;
while i < prefix.len {
if s[i] != prefix[i] { return false; }
i += 1;
}
return true;
}
// The path remainder after `prefix` (caller has checked starts_with).
tail_after :: (s: string, prefix: string) -> string {
return string.{ ptr = @s[prefix.len], len = s.len - prefix.len };
}
// A `RENDER_CAP` heap slice from the per-request context allocator.
render_buf :: () -> string {
raw : [*]u8 = xx context.allocator.alloc_bytes(RENDER_CAP);
return string.{ ptr = raw, len = RENDER_CAP };
}
// ── responses ─────────────────────────────────────────────────────────
// JSON error body in the CLI's error shape, sent with the HTTP status.
respond_error :: (client: s32, code: s64, fail_code: string, fail_message: string) {
f : jout.CliFailure = .{ code = fail_code, message = fail_message };
raw : [4096]u8 = ---;
werr := false;
n := jout.write_error(f, string.{ ptr = @raw[0], len = 4096 }) catch { werr = true; 0 };
body := "{\"status\":\"error\"}";
if !werr { body = string.{ ptr = @raw[0], len = n }; }
http.respond(client, code, "application/json", "", body);
}
// ── /api renders (builders own the `try`, callers catch) ─────────────
// Reload the persisted model. Null means the store has no readable
// db.json — the 503 error response has already been sent.
load_or_503 :: (client: s32, store_dir: string) -> ?Repo {
if !exists(path_join(store_dir, "db.json")) {
respond_error(client, 503, "store.load",
concat("no db.json under the store (nothing published yet): ", store_dir));
return null;
}
loaded, le := db.load(store_dir);
if le {
respond_error(client, 503, "store.load",
concat("db.json under the store could not be loaded: ", store_dir));
return null;
}
return loaded;
}
// `{"apps":[...]}` into `dst`, returning the bytes written.
render_apps_json :: (repo: *Repo, dst: []u8) -> (s64, !JsonError) {
alloc := context.allocator;
root : Object = .{};
arr : Array = .{};
i := 0;
while i < repo.apps.len {
arr.add(db.app_to_json(repo.apps.items[i], alloc), alloc);
i += 1;
}
root.put("apps", .array(arr), alloc);
rootv : Value = .object(root);
n := try write_to_buffer(rootv, dst);
return n;
}
// `{"app":..,"releases":[..],"channels":[..]}` for `app` into `dst`.
render_app_detail_json :: (repo: *Repo, app: App, dst: []u8) -> (s64, !JsonError) {
alloc := context.allocator;
root : Object = .{};
root.put("app", db.app_to_json(app, alloc), alloc);
rels : Array = .{};
i := 0;
while i < repo.releases.len {
r := repo.releases.items[i];
if r.app_id == app.id { rels.add(db.release_to_json(r, alloc), alloc); }
i += 1;
}
root.put("releases", .array(rels), alloc);
chans : Array = .{};
i = 0;
while i < repo.channels.len {
c := repo.channels.items[i];
if c.app_id == app.id { chans.add(db.channel_to_json(c, alloc), alloc); }
i += 1;
}
root.put("channels", .array(chans), alloc);
rootv : Value = .object(root);
n := try write_to_buffer(rootv, dst);
return n;
}
// Send `n` rendered bytes of `buf` as 200 JSON — or the overflow error
// when the render didn't fit RENDER_CAP.
respond_render :: (client: s32, buf: string, n: s64, overflowed: bool) {
if overflowed {
respond_error(client, 500, "http.response_overflow",
"response exceeded the server's render buffer");
return;
}
http.respond(client, 200, "application/json", "", string.{ ptr = buf.ptr, len = n });
}
// ── routes ────────────────────────────────────────────────────────────
handle_apps_index :: (client: s32, store_dir: string) {
rq := load_or_503(client, store_dir);
if rq == null { return; }
repo := rq!;
buf := render_buf();
werr := false;
n := render_apps_json(@repo, buf) catch { werr = true; 0 };
respond_render(client, buf, n, werr);
}
handle_app_detail :: (client: s32, store_dir: string, slug: string) {
rq := load_or_503(client, store_dir);
if rq == null { return; }
repo := rq!;
aq := repo.find_app_by_slug(slug);
if aq == null {
respond_error(client, 404, "api.unknown_app",
concat("no app with that slug in the store: ", slug));
return;
}
buf := render_buf();
werr := false;
n := render_app_detail_json(@repo, aq!, buf) catch { werr = true; 0 };
respond_render(client, buf, n, werr);
}
handle_download :: (client: s32, store_dir: string, sha: string) {
if !is_hex64(sha) {
respond_error(client, 404, "download.bad_key",
"download key must be a 64-char lowercase-hex sha256");
return;
}
opath := path_join(store_dir, concat("objects/", sha));
bq := read_file(opath);
if bq == null {
respond_error(client, 404, "download.unknown_object",
concat("no object with that digest in the store: ", sha));
return;
}
extra := concat("X-Checksum-SHA256: ", concat(sha, "\r\n"));
http.respond(client, 200, "application/octet-stream", extra, bq!);
}
// Route one parsed request. GET only; the path decides the handler.
route :: (client: s32, store_dir: string, method: string, path: string) -> s64 {
if method != "GET" {
respond_error(client, 405, "http.method_not_allowed",
"every distd route is GET for now (writes go through the dist CLI)");
return 405;
}
if path == "/healthz" {
http.respond(client, 200, "application/json", "", "{\"status\":\"ok\"}");
return 200;
}
if path == "/api/apps" {
handle_apps_index(client, store_dir);
return 200;
}
if starts_with(path, "/api/apps/") {
handle_app_detail(client, store_dir, tail_after(path, "/api/apps/"));
return 200;
}
if starts_with(path, "/download/") {
handle_download(client, store_dir, tail_after(path, "/download/"));
return 200;
}
respond_error(client, 404, "http.not_found",
concat("no route for ", path));
return 404;
}
// ── server loop ───────────────────────────────────────────────────────
// Read one request off `client`, route it, log the result line. All
// allocations land in the pushed per-request context allocator.
serve_one :: (client: s32, store_dir: string) {
buf : [8192]u8 = ---;
n := sock.read(client, @buf[0], 8192);
if n <= 0 { return; }
raw := string.{ ptr = @buf[0], len = xx n };
req : http.Request = .{};
if !http.parse_request(raw, @req) {
respond_error(client, 400, "http.bad_request",
"request line did not parse as HTTP");
return;
}
code := route(client, store_dir, req.method, req.path);
line := concat("distd: ", concat(req.method, concat(" ", concat(req.path, concat(" -> ", concat(int_to_string(code), "\n"))))));
slog(line);
}
// Bind 0.0.0.0:<port> and serve forever (sequential, one connection at a
// time — the deployment story is LAN/NAS-scale). Returns only when the
// socket can't be opened. Per-request allocations live in an arena that
// dies with the request.
run_server :: (store_dir: string, port: s64) -> !ServeErr {
fd, fe := http.listen_on(port);
if fe { raise error.Bind; }
slog(concat("distd: serving store ", concat(store_dir, concat(" on http://0.0.0.0:", concat(int_to_string(port), "\n")))));
while true {
client := sock.accept(fd, null, null);
if client < 0 { continue; }
gpa := GPA.init();
arena := Arena.init(xx gpa, 65536);
push Context.{ allocator = xx arena } {
serve_one(client, store_dir);
}
arena.deinit();
sock.close(client);
}
return;
}