// Unit coverage for P4.3's token domain + persistence:
//
//   * check_token — the auth gate the P4.4 server will sit behind: ok path,
//     refusal order (revoked outranks expired), expiry boundary (now >=
//     expires_at), scope membership (whole words, multi-scope), app/channel
//     scope match, and the unscoped-matches-anything rules.
//   * validate_token — each invalid form rejected with its SPECIFIC tag.
//   * mark_token_used — last-used stamping through the repo.
//   * store round trip — tokens persist field-for-field, and an absent
//     `tokens` member (an import-path db.json from before tokens existed) loads as zero
//     tokens instead of BadShape.

#import "modules/std.sx";
#import "../src/domain/platform.sx";
#import "../src/domain/app.sx";
#import "../src/domain/release.sx";
#import "../src/domain/artifact.sx";
#import "../src/domain/channel.sx";
#import "../src/domain/token.sx";
#import "../src/domain/audit.sx";
#import "../src/domain/validate.sx";
#import "../src/repo/repo.sx";
db :: #import "../src/repo/db.sx";

// Baseline fixture shared by the checks below: a token holding only the
// "publish" scope, bound to app "acme-app" and channel "beta", created at
// t=100 and expiring at t=1000. last_used_at and revoked_at start at 0.
// Each check copies this value and mutates the one field it cares about.
scoped_token :: () -> Token {
    fixture := Token.{
        id           = "tok-aaaaaaaaaaaa",
        name         = "ci-main",
        token_hash   = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
        scopes       = "publish",
        app_slug     = "acme-app",
        channel      = "beta",
        created_at   = 100,
        expires_at   = 1000,
        last_used_at = 0,
        revoked_at   = 0,
    };
    return fixture;
}

// The error tag check_token raises for these arguments, as a label ("" =
// accepted).
// An error outside the five tags listed below is labelled "other", so an
// unexpected raise can never be mistaken for acceptance.
refusal :: (t: Token, scope: string, app: string, chan: string, now: i64) -> string {
    tag := "";
    check_token(t, scope, app, chan, now) catch (e) {
        // Reaching the catch body means check_token refused. Start from a
        // non-empty fallback; the known tags below overwrite it.
        tag = "other";
        if e == error.Revoked         { tag = "revoked"; }
        if e == error.Expired         { tag = "expired"; }
        if e == error.ScopeMissing    { tag = "scope"; }
        if e == error.AppMismatch     { tag = "app"; }
        if e == error.ChannelMismatch { tag = "channel"; }
    };
    return tag;
}

// The untouched fixture, asked for its own scope/app/channel before expiry,
// is accepted.
check_ok_path :: () -> bool {
    return refusal(scoped_token(), "publish", "acme-app", "beta", 500) == "";
}

// A non-zero revoked_at refuses as Revoked even though the token has not
// yet expired (now = 500 < expires_at = 1000).
check_revoked :: () -> bool {
    t := scoped_token();
    t.revoked_at = 200;
    return refusal(t, "publish", "acme-app", "beta", 500) == "revoked";
}

// A token both revoked and expired refuses as Revoked — refusal-severity
// order.
check_revoked_outranks_expired :: () -> bool {
    t := scoped_token();
    t.revoked_at = 200;
    // now = 5000 is past expires_at = 1000, so both conditions hold.
    return refusal(t, "publish", "acme-app", "beta", 5000) == "revoked";
}

// Expiry is inclusive: the token is live at expires_at - 1 and refused at
// exactly expires_at.
check_expired_boundary :: () -> bool {
    t := scoped_token();
    if refusal(t, "publish", "acme-app", "beta", 999) != "" { return false; }          // still live
    if refusal(t, "publish", "acme-app", "beta", 1000) != "expired" { return false; }  // now == expires_at
    return true;
}

// expires_at = 0 means "no expiry": the token is accepted at a very large now.
check_never_expires :: () -> bool {
    t := scoped_token();
    t.expires_at = 0;
    return refusal(t, "publish", "acme-app", "beta", 9999999999) == "";
}

// Requesting a scope the token does not list refuses as ScopeMissing.
check_scope_missing :: () -> bool {
    return refusal(scoped_token(), "read", "acme-app", "beta", 500) == "scope";
}

// A token listing several space-separated scopes grants each of them.
check_multi_scope :: () -> bool {
    t := scoped_token();
    t.scopes = "publish read";
    if refusal(t, "read", "acme-app", "beta", 500) != "" { return false; }
    if refusal(t, "publish", "acme-app", "beta", 500) != "" { return false; }
    return true;
}

// Scope words match whole words only: a "publish" scope does not grant
// "pub", and the split words "pub lish" do not grant "publish".
check_scope_whole_words :: () -> bool {
    // Neither a prefix of a granted word nor a word split in two may match.
    prefix_hit := has_scope("publish", "pub");
    split_hit  := has_scope("pub lish", "publish");
    if prefix_hit { return false; }
    if split_hit  { return false; }
    // A real member of a multi-word scope list must match.
    return has_scope("publish read", "read");
}

// Asking for an app other than the one the token is bound to refuses with
// the app tag.
check_app_mismatch :: () -> bool {
    tag := refusal(scoped_token(), "publish", "other-app", "beta", 500);
    return tag == "app";
}

// Asking for a channel other than the one the token is bound to refuses
// with the channel tag.
check_channel_mismatch :: () -> bool {
    tag := refusal(scoped_token(), "publish", "acme-app", "stable", 500);
    return tag == "channel";
}

// An empty REQUEST channel passes the channel gate (the operation has no
// channel to constrain).
check_empty_request_channel :: () -> bool {
    tag := refusal(scoped_token(), "publish", "acme-app", "", 500);
    return tag == "";
}

// An empty REQUEST app passes the app gate likewise (an upload is
// app-agnostic until a release references it).
check_empty_request_app :: () -> bool {
    tag := refusal(scoped_token(), "publish", "", "", 500);
    return tag == "";
}

// An unscoped token (empty app_slug/channel) authorizes any app/channel.
check_unscoped_matches_all :: () -> bool {
    unscoped := scoped_token();
    unscoped.app_slug = "";
    unscoped.channel  = "";
    tag := refusal(unscoped, "publish", "any-app", "any-channel", 500);
    return tag == "";
}

// ── validate_token ───────────────────────────────────────────────────

// The tag validate_token raises for `t`, as a label ("" = accepted).
// An error outside the six tags listed below is labelled "other", so an
// unexpected raise can never be mistaken for acceptance.
vrefusal :: (t: Token) -> string {
    tag := "";
    validate_token(t) catch (e) {
        // Reaching the catch body means validation failed. Start from a
        // non-empty fallback; the known tags below overwrite it.
        tag = "other";
        if e == error.MissingField   { tag = "missing"; }
        if e == error.BadTokenName   { tag = "name"; }
        if e == error.BadScope       { tag = "scope"; }
        if e == error.BadSlug        { tag = "slug"; }
        if e == error.BadChannelName { tag = "channel"; }
        if e == error.BadDigest      { tag = "digest"; }
    };
    return tag;
}

// Both the fully-scoped fixture and an unscoped, multi-scope variant of it
// validate cleanly.
check_validate_accepts :: () -> bool {
    if vrefusal(scoped_token()) != "" { return false; }
    u := scoped_token();   // unscoped + multi-scope is also valid
    u.app_slug = "";
    u.channel = "";
    u.scopes = "publish read";
    return vrefusal(u) == "";
}

// Each case starts from a fresh fixture, breaks exactly one field, and
// expects the tag specific to that field.
check_validate_rejects :: () -> bool {
    t := scoped_token();
    t.name = "Bad Name";
    if vrefusal(t) != "name" { return false; }

    t = scoped_token();
    t.scopes = "publish admin";
    if vrefusal(t) != "scope" { return false; }

    t = scoped_token();
    t.scopes = " ";
    if vrefusal(t) != "scope" { return false; }

    t = scoped_token();
    t.app_slug = "-bad-";
    if vrefusal(t) != "slug" { return false; }

    t = scoped_token();
    t.channel = "Beta";
    if vrefusal(t) != "channel" { return false; }

    t = scoped_token();
    t.token_hash = "deadbeef";
    if vrefusal(t) != "digest" { return false; }

    t = scoped_token();
    t.id = "";
    if vrefusal(t) != "missing" { return false; }

    return true;
}

// ── repo last-used stamping ──────────────────────────────────────────

// Stamping a stored token updates its last_used_at in the repo; stamping
// an id the repo does not hold reports false.
check_mark_used :: () -> bool {
    repo := Repo.init();
    repo.create_token(scoped_token());
    if !tok_mark_used_shim(@repo, "tok-aaaaaaaaaaaa", 777) { return false; }
    tq := repo.get_token("tok-aaaaaaaaaaaa");
    if tq == null { return false; }
    if tq!.last_used_at != 777 { return false; }
    return !tok_mark_used_shim(@repo, "tok-nope", 778);
}

// Inline twin of token/ops.sx's mark_token_used: importing the ops module
// here would drag the publish pipeline (and its libc deps) into a unit
// test, so the three-line repo interaction is restated instead.
tok_mark_used_shim :: (repo: *Repo, id: string, now: i64) -> bool {
    found := repo.get_token(id);
    if found == null { return false; }   // unknown id: nothing to stamp

    stamped := found!;
    stamped.last_used_at = now;
    // Hand the modified copy back to the repo and report its verdict.
    return repo.update_token(stamped);
}

// ── persistence ──────────────────────────────────────────────────────

// An import-path db.json with no `tokens` member loads as zero tokens (compat with
// pre-token layouts), not BadShape.
NO_TOKENS_DB :: "{\"apps\":[],\"releases\":[],\"artifacts\":[],\"channels\":[],\"audit_events\":[]}";

// Loads NO_TOKENS_DB into a fresh repo: the load must not raise and the
// repo must end up with no tokens.
check_load_without_tokens :: () -> bool {
    gpa   := GPA.init();
    arena := Arena.init(xx gpa, 65536);
    defer arena.deinit();

    target := Repo.init();
    loaded_cleanly := true;
    db.load_into(@target, NO_TOKENS_DB, xx arena) catch { loaded_cleanly = false; };
    return loaded_cleanly and target.tokens.len == 0;
}

// A present `tokens` member with the wrong JSON type is still BadShape.
BAD_TOKENS_DB :: "{\"apps\":[],\"releases\":[],\"artifacts\":[],\"channels\":[],\"tokens\":7,\"audit_events\":[]}";

// Loads BAD_TOKENS_DB into a fresh repo: passes only when the load raises
// and the error is exactly BadShape.
check_load_bad_tokens_shape :: () -> bool {
    gpa   := GPA.init();
    arena := Arena.init(xx gpa, 65536);
    defer arena.deinit();

    target := Repo.init();
    saw_bad_shape := false;
    db.load_into(@target, BAD_TOKENS_DB, xx arena) catch (e) {
        saw_bad_shape = (e == error.BadShape);
    };
    return saw_bad_shape;
}

// Token fields survive a save -> load round trip byte-for-byte.
check_token_roundtrip :: () -> bool {
    dir := ".sx-tmp/token_check";

    // Give the usually-zero stamps distinct values so a dropped field
    // cannot go unnoticed.
    src := scoped_token();
    src.last_used_at = 555;
    src.revoked_at   = 666;

    repo := Repo.init();
    repo.create_token(src);

    save_failed := false;
    db.save(@repo, dir) catch { save_failed = true; };
    if save_failed { return false; }

    loaded, load_err := db.load(dir);
    if load_err { return false; }
    if loaded.tokens.len != 1 { return false; }

    // Compare field by field, bailing out on the first difference.
    got := loaded.tokens.items[0];
    if got.id != src.id                 { return false; }
    if got.name != src.name             { return false; }
    if got.token_hash != src.token_hash { return false; }
    if got.scopes != src.scopes         { return false; }
    if got.app_slug != src.app_slug     { return false; }
    if got.channel != src.channel       { return false; }
    if got.created_at != src.created_at { return false; }
    if got.expires_at != src.expires_at { return false; }
    if got.last_used_at != 555          { return false; }
    if got.revoked_at != 666            { return false; }
    return true;
}

// Prints a PASS/FAIL line for one case and returns how much it adds to the
// failure count: 0 when `ok`, 1 otherwise.
run_case :: (label: string, ok: bool) -> i32 {
    if !ok {
        print(" FAIL {}\n", label);
        return 1;
    }
    print(" PASS {}\n", label);
    return 0;
}

// Runs every case in order, prints a summary, and returns 0 when all cases
// passed or 1 when at least one failed.
main :: () -> i32 {
    failed : i32 = 0;

    // check_token
    failed += run_case("check: live scoped token accepted", check_ok_path());
    failed += run_case("check: revoked refused", check_revoked());
    failed += run_case("check: revoked outranks expired", check_revoked_outranks_expired());
    failed += run_case("check: expiry boundary now >= expires_at", check_expired_boundary());
    failed += run_case("check: expires_at 0 never expires", check_never_expires());
    failed += run_case("check: missing scope refused", check_scope_missing());
    failed += run_case("check: multi-scope grants each word", check_multi_scope());
    failed += run_case("check: scope words match whole words", check_scope_whole_words());
    failed += run_case("check: app scope mismatch refused", check_app_mismatch());
    failed += run_case("check: channel scope mismatch refused", check_channel_mismatch());
    failed += run_case("check: empty request channel passes", check_empty_request_channel());
    failed += run_case("check: empty request app passes", check_empty_request_app());
    failed += run_case("check: unscoped token matches all", check_unscoped_matches_all());

    // validate_token
    failed += run_case("validate: accepts good tokens", check_validate_accepts());
    failed += run_case("validate: rejects each bad form", check_validate_rejects());

    // repo + persistence
    failed += run_case("repo: mark-used stamps last_used_at", check_mark_used());
    failed += run_case("db: absent tokens member loads as empty", check_load_without_tokens());
    failed += run_case("db: wrong-typed tokens member is BadShape", check_load_bad_tokens_shape());
    failed += run_case("db: token fields survive a round trip", check_token_roundtrip());

    print("------------------------------------------------\n");
    if failed != 0 {
        print("token_check: {} CASE(S) FAILED\n", failed);
        return 1;
    }
    print("token_check: ALL CASES PASS\n");
    return 0;
}