P4.4: bearer auth + write endpoints on distd

distd stops being read-only. http.sx learns the write-side request
surface: header capture with case-insensitive lookup, a Content-Length-
bounded body read loop (8K header cap, 512 MiB body cap -> 413, 411 for
length-less POST/PUT), and the matching status texts.

Auth (server/auth.sx): Authorization: Bearer is re-hashed and resolved
via find_token_by_hash, then gated through check_token — 401 for
missing/malformed/unknown credentials, 403 with a refusal-specific code
(auth.revoked/expired/missing_scope/app_forbidden/channel_forbidden);
successful auth stamps last_used_at. check_token's app gate now treats
an empty REQUEST app like an empty request channel (uploads are
app-agnostic until a release references them).

Write routes (POST, publish scope): /api/upload content-addresses the
body; /api/apps/<slug>/releases publishes over already-uploaded objects
through commit_publish — the back half extracted from run_publish so
CLI and HTTP publishes share one find/create-app -> transaction ->
audit -> persist pipeline; channels/<name>/promote|rollback delegate to
the P3.5 CLI pipelines. Reads stay public.

make test 17/17 (new: server_write.sx pinned acceptance over curl).
This commit is contained in:
agra
2026-06-12 11:18:31 +03:00
parent d8b7a7bfb3
commit e2a5150542
8 changed files with 1135 additions and 126 deletions

View File

@@ -1,9 +1,10 @@
// =====================================================================
// distd.sx — the read-only distribution server over the local store
// (subplan 04, Slices 1 + the read half of 3/4), run as `dist server run`.
// distd.sx — the distribution server over the local store (subplan 04,
// Slices 1-4), run as `dist server run`.
//
// Serves the state the CLI publishes — db.json metadata and the
// content-addressed objects — over HTTP (src/server/http.sx):
// content-addressed objects — over HTTP (src/server/http.sx). Reads are
// public:
//
// GET / HTML index: apps, channels, releases, links
// GET /healthz {"status":"ok"} — no store access
@@ -12,6 +13,21 @@
// GET /download/<sha256> the object's bytes (application/octet-stream,
// X-Checksum-SHA256 header)
//
// Writes require `Authorization: Bearer <token>` with the `publish` scope
// (src/server/auth.sx; tokens are minted with `dist token create`):
//
// POST /api/upload raw bytes -> content-
// addressed object; responds {"status":"stored","sha256":..}
// POST /api/apps/<slug>/releases JSON body {version,
// channel, artifacts:[{platform, sha256, ...}]} over ALREADY-
// uploaded objects -> the same commit pipeline as `dist ci publish`
// POST /api/apps/<slug>/channels/<name>/promote {"release_id":..}
// POST /api/apps/<slug>/channels/<name>/rollback (empty body)
//
// The channel operations delegate to the P3.5 CLI pipelines and the
// release POST commits through publish.sx's `commit_publish`, so HTTP and
// CLI semantics cannot drift.
//
// Anything else is a JSON error in the CLI's error shape
// (`{"status":"error","error":{code,message}}`) with the matching HTTP
// status — the API and the CLI report failures identically.
@@ -21,10 +37,6 @@
// the store on disk stays the single source of truth (no cache to
// invalidate, LAN-scale traffic).
//
// MUTATION is the CLI's job for now: every route is GET; writes arrive
// with token auth (subplan 04 Slice 2, deferred with the rest of the
// upload/auth surface).
//
// RESPONSE BUFFERS are heap slices from the per-request arena, never big
// stack arrays: a stack array of 64K+ in one frame crashes the sx LLVM
// backend (DAGCombiner segfault). Small fixed buffers (4K) are fine.
@@ -38,12 +50,25 @@
#import "../domain/release.sx";
#import "../domain/artifact.sx";
#import "../domain/channel.sx";
#import "../domain/token.sx";
#import "../domain/audit.sx";
#import "../domain/validate.sx";
#import "../repo/repo.sx";
#import "../store/store.sx";
#import "../validation/artifact_file.sx";
#import "../publish/publish.sx";
sock :: #import "modules/std/socket.sx";
http :: #import "http.sx";
au :: #import "auth.sx";
db :: #import "../repo/db.sx";
jout :: #import "../json_out.sx";
// Also reached through an alias so publish helpers read as `pl.…` at the
// call sites that mirror dist.sx's.
pl :: #import "../publish/publish.sx";
ops :: #import "../release/ops.sx";
// Aliased so the json reader is called as `jsrv.parse` — a bare `parse`
// would bind to `std.cli`'s once both modules share the `dist` program.
jsrv :: #import "modules/std/json.sx";
// Response-body capacity for the /api JSON renders (heap, per-request).
RENDER_CAP :: 262144;
@@ -326,36 +351,371 @@ handle_download :: (client: i32, store_dir: string, sha: string) {
http.respond(client, 200, "application/octet-stream", extra, bq!);
}
// Route one parsed request. GET only; the path decides the handler.
route :: (client: i32, store_dir: string, method: string, path: string) -> i64 {
if method != "GET" {
respond_error(client, 405, "http.method_not_allowed",
"every distd route is GET for now (writes go through the dist CLI)");
return 405;
// ── write surface (POST, token-gated) ─────────────────────────────────
// `s` split at its first '/': head before it, rest after it ("" when no
// slash exists).
Seg :: struct {
head: string;
rest: string;
}
seg_split :: (s: string) -> Seg {
i := 0;
while i < s.len {
if s[i] == 47 { // '/'
return Seg.{
head = string.{ ptr = s.ptr, len = i },
rest = string.{ ptr = @s[i + 1], len = s.len - i - 1 },
};
}
i += 1;
}
if path == "/" {
handle_index(client, store_dir);
return 200;
return Seg.{ head = s, rest = "" };
}
// Authenticate a write request or answer it. Null means the refusal
// response has already been sent.
auth_or_respond :: (client: i32, store_dir: string, req: *http.Request, app_slug: string, channel: string) -> ?Token {
fail : jout.CliFailure = .{};
t, ae := au.authenticate(store_dir, req.headers, "publish", app_slug, channel, @fail);
if ae {
status : i64 = 401;
if ae == error.Forbidden { status = 403; }
if ae == error.Unavailable { status = 503; }
respond_error(client, status, fail.code, fail.message);
return null;
}
if path == "/healthz" {
http.respond(client, 200, "application/json", "", "{\"status\":\"ok\"}");
return 200;
return t;
}
// JSON-object body, or null after answering 400.
body_object :: (client: i32, body: string) -> ?Object {
v, pe := jsrv.parse(body, context.allocator);
if pe {
respond_error(client, 400, "api.bad_json", "request body is not valid JSON");
return null;
}
if path == "/api/apps" {
handle_apps_index(client, store_dir);
return 200;
if v != .object {
respond_error(client, 400, "api.bad_json", "request body must be a JSON object");
return null;
}
return v.object;
}
wb_find :: (o: Object, key: string) -> ?Value {
i := 0;
while i < o.len {
if o.items[i].key == key { return o.items[i].val; }
i += 1;
}
return null;
}
// Required string member, or null after answering 400.
wb_req_str :: (client: i32, o: Object, key: string) -> ?string {
vq := wb_find(o, key);
if vq != null {
v := vq!;
if v == .str {
if v.str.len > 0 { return v.str; }
}
}
respond_error(client, 400, "api.missing_field",
concat("body requires a non-empty string member: ", key));
return null;
}
wb_opt_str :: (o: Object, key: string) -> string {
vq := wb_find(o, key);
if vq == null { return ""; }
v := vq!;
if v != .str { return ""; }
return v.str;
}
wb_opt_int :: (o: Object, key: string, default_: i64) -> i64 {
vq := wb_find(o, key);
if vq == null { return default_; }
v := vq!;
if v != .int_ { return default_; }
return v.int_;
}
// POST /api/upload — content-address the raw body into the store. The
// upload is app-agnostic (the bytes are inert until a release references
// them), so auth demands only the publish scope.
handle_upload :: (client: i32, store_dir: string, req: *http.Request) {
if auth_or_respond(client, store_dir, req, "", "") == null { return; }
if req.body.len == 0 {
respond_error(client, 400, "upload.empty", "upload body is empty");
return;
}
st := Store.init(store_dir);
werr := false;
key := "";
k, se := st.put_bytes(req.body);
if se { werr = true; }
if !se { key = k; }
if werr {
respond_error(client, 500, "store.write",
"upload bytes could not be content-addressed into the store");
return;
}
body := concat("{\"status\":\"stored\",\"sha256\":\"", concat(key, concat("\",\"size_bytes\":", concat(int_to_string(req.body.len), "}"))));
http.respond(client, 200, "application/json", "", body);
}
// POST /api/apps/<slug>/releases — publish a release whose artifacts name
// already-uploaded objects by digest. Mirrors the CLI manifest checks
// (platform, digest, size, content type, filename extension) against the
// STORE rather than local files, then commits through the shared pipeline.
handle_release_create :: (client: i32, store_dir: string, req: *http.Request, slug: string) {
oq := body_object(client, req.body);
if oq == null { return; }
o := oq!;
versionq := wb_req_str(client, o, "version");
if versionq == null { return; }
version := versionq!;
channelq := wb_req_str(client, o, "channel");
if channelq == null { return; }
channel := channelq!;
tokq := auth_or_respond(client, store_dir, req, slug, channel);
if tokq == null { return; }
tok := tokq!;
artsq := wb_find(o, "artifacts");
arts_ok := false;
arts_arr : Array = .{};
if artsq != null {
av := artsq!;
if av == .array {
if av.array.len > 0 { arts_ok = true; arts_arr = av.array; }
}
}
if !arts_ok {
respond_error(client, 400, "api.missing_field",
"body requires a non-empty artifacts array");
return;
}
alloc := context.allocator;
resolved : List(ResolvedArtifact) = .{};
i := 0;
while i < arts_arr.len {
if arts_arr.items[i] != .object {
respond_error(client, 400, "api.bad_json", "each artifact must be a JSON object");
return;
}
ao := arts_arr.items[i].object;
pidq := wb_req_str(client, ao, "platform");
if pidq == null { return; }
platform, perr := parse_platform(pidq!);
if perr {
respond_error(client, 400, "api.unknown_platform",
concat("unknown platform id: ", pidq!));
return;
}
shaq := wb_req_str(client, ao, "sha256");
if shaq == null { return; }
sha := shaq!;
if !is_hex64(sha) {
respond_error(client, 400, "api.bad_digest",
"artifact sha256 must be a 64-char lowercase-hex digest");
return;
}
// The named object must already live in the store (uploaded via
// /api/upload or an earlier publish).
opath := path_join(store_dir, concat("objects/", sha));
ob := read_file(opath);
if ob == null {
respond_error(client, 404, "api.unknown_object",
concat("no object with that digest in the store (upload it first): ", sha));
return;
}
actual_size := ob!.len;
declared := wb_opt_int(ao, "size_bytes", -1);
if declared >= 0 and declared != actual_size {
respond_error(client, 400, "api.size_mismatch",
concat("declared size_bytes does not match the stored object: ", sha));
return;
}
ct := wb_opt_str(ao, "content_type");
if ct.len == 0 { ct = pl.default_content_type(platform); }
if !is_allowed_content_type(ct) {
respond_error(client, 400, "api.content_type_denied",
concat("artifact content type is not on the allow-list: ", ct));
return;
}
fname := wb_opt_str(ao, "filename");
if fname.len == 0 {
fname = concat(slug, concat("-", concat(version, concat(".", expected_ext(platform)))));
}
if !ext_matches_platform(fname, platform) {
respond_error(client, 400, "api.extension_mismatch",
concat("artifact filename extension does not match its platform: ", fname));
return;
}
resolved.append(ResolvedArtifact.{
platform = platform, key = sha, size_bytes = actual_size,
filename = fname, content_type = ct,
metadata = wb_opt_str(ao, "metadata"),
}, alloc);
i += 1;
}
fail : jout.CliFailure = .{};
actor := concat("token:", tok.name);
arts_view : []ResolvedArtifact = .{ ptr = resolved.items, len = resolved.len };
outcome, ce := pl.commit_publish(store_dir, slug, version, channel, actor, "/download/", arts_view, @fail);
if ce {
status : i64 = 500;
if ce == error.Transaction {
status = 400;
if fail.code == "transaction.integrity" { status = 409; }
}
if ce == error.Persist {
if fail.code == "persist.load" { status = 503; }
}
respond_error(client, status, fail.code, fail.message);
return;
}
if !ce {
buf := render_buf();
werr := false;
n := pl.write_json(@outcome, buf) catch { werr = true; 0 };
respond_render(client, buf, n, werr);
}
}
// HTTP status for a failed P3.5 channel operation.
op_http_status :: (e: ops.OpError) -> i64 {
if e == error.Load { return 503; }
if e == error.NotFound { return 404; }
if e == error.Invalid { return 409; }
return 500;
}
// POST /api/apps/<slug>/channels/<name>/promote — body {"release_id":..};
// delegates to the CLI's promote pipeline.
handle_promote :: (client: i32, store_dir: string, req: *http.Request, slug: string, chan_name: string) {
oq := body_object(client, req.body);
if oq == null { return; }
relq := wb_req_str(client, oq!, "release_id");
if relq == null { return; }
if auth_or_respond(client, store_dir, req, slug, chan_name) == null { return; }
fail : jout.CliFailure = .{};
o, e := ops.run_promote(store_dir, slug, chan_name, relq!, @fail);
if e {
respond_error(client, op_http_status(e), fail.code, fail.message);
return;
}
if !e {
buf := render_buf();
werr := false;
n := ops.write_promote_json(@o, buf) catch { werr = true; 0 };
respond_render(client, buf, n, werr);
}
}
// POST /api/apps/<slug>/channels/<name>/rollback — empty body; delegates
// to the CLI's rollback pipeline.
handle_rollback :: (client: i32, store_dir: string, req: *http.Request, slug: string, chan_name: string) {
if auth_or_respond(client, store_dir, req, slug, chan_name) == null { return; }
fail : jout.CliFailure = .{};
o, e := ops.run_rollback(store_dir, slug, chan_name, @fail);
if e {
respond_error(client, op_http_status(e), fail.code, fail.message);
return;
}
if !e {
buf := render_buf();
werr := false;
n := ops.write_rollback_json(@o, buf) catch { werr = true; 0 };
respond_render(client, buf, n, werr);
}
}
// Route a write request under POST /api/. Returns false when no write
// route matches (the caller 404s).
route_post :: (client: i32, store_dir: string, req: *http.Request) -> bool {
path := req.path;
if path == "/api/upload" {
handle_upload(client, store_dir, req);
return true;
}
if starts_with(path, "/api/apps/") {
handle_app_detail(client, store_dir, tail_after(path, "/api/apps/"));
return 200;
s1 := seg_split(tail_after(path, "/api/apps/"));
if s1.head.len == 0 { return false; }
if s1.rest == "releases" {
handle_release_create(client, store_dir, req, s1.head);
return true;
}
if starts_with(s1.rest, "channels/") {
s2 := seg_split(tail_after(s1.rest, "channels/"));
if s2.head.len == 0 { return false; }
if s2.rest == "promote" {
handle_promote(client, store_dir, req, s1.head, s2.head);
return true;
}
if s2.rest == "rollback" {
handle_rollback(client, store_dir, req, s1.head, s2.head);
return true;
}
}
}
if starts_with(path, "/download/") {
handle_download(client, store_dir, tail_after(path, "/download/"));
return 200;
return false;
}
// Route one parsed request: GET reads, POST writes.
route :: (client: i32, store_dir: string, req: *http.Request) -> i64 {
method := req.method;
path := req.path;
if method == "GET" {
if path == "/" {
handle_index(client, store_dir);
return 200;
}
if path == "/healthz" {
http.respond(client, 200, "application/json", "", "{\"status\":\"ok\"}");
return 200;
}
if path == "/api/apps" {
handle_apps_index(client, store_dir);
return 200;
}
if starts_with(path, "/api/apps/") {
handle_app_detail(client, store_dir, tail_after(path, "/api/apps/"));
return 200;
}
if starts_with(path, "/download/") {
handle_download(client, store_dir, tail_after(path, "/download/"));
return 200;
}
respond_error(client, 404, "http.not_found",
concat("no route for ", path));
return 404;
}
respond_error(client, 404, "http.not_found",
concat("no route for ", path));
return 404;
if method == "POST" {
if route_post(client, store_dir, req) { return 200; }
respond_error(client, 404, "http.not_found",
concat("no write route for ", path));
return 404;
}
respond_error(client, 405, "http.method_not_allowed",
"distd routes are GET (reads) or POST (token-gated writes)");
return 405;
}
// ── server loop ───────────────────────────────────────────────────────
@@ -363,18 +723,38 @@ route :: (client: i32, store_dir: string, method: string, path: string) -> i64 {
// Read one request off `client`, route it, log the result line. All
// allocations land in the pushed per-request context allocator.
serve_one :: (client: i32, store_dir: string) {
buf : [8192]u8 = ---;
n := sock.read(client, @buf[0], 8192);
if n <= 0 { return; }
raw := string.{ ptr = @buf[0], len = xx n };
req : http.Request = .{};
if !http.parse_request(raw, @req) {
respond_error(client, 400, "http.bad_request",
"request line did not parse as HTTP");
closed := false;
rstatus : i64 = 0;
rcode := "";
rmsg := "";
http.read_request(client, @req) catch (e) {
if e == error.Closed { closed = true; }
if !closed {
rstatus = 400; rcode = "http.bad_request";
rmsg = "request could not be read as HTTP (bad request line or truncated body)";
if e == error.LengthRequired {
rstatus = 411; rcode = "http.length_required";
rmsg = "POST/PUT requires a Content-Length header";
}
if e == error.BodyTooLarge {
rstatus = 413; rcode = "http.body_too_large";
rmsg = "request body exceeds the server's size cap";
}
if e == error.HeadersTooLarge {
rstatus = 400; rcode = "http.headers_too_large";
rmsg = "request header block exceeds 8K";
}
}
};
if closed { return; }
if rstatus > 0 {
respond_error(client, rstatus, rcode, rmsg);
slog(concat("distd: unreadable request -> ", concat(int_to_string(rstatus), "\n")));
return;
}
code := route(client, store_dir, req.method, req.path);
code := route(client, store_dir, @req);
line := concat("distd: ", concat(req.method, concat(" ", concat(req.path, concat(" -> ", concat(int_to_string(code), "\n"))))));
slog(line);
}