Files
distribution/tests/retention_cleanup.sx
agra dc6908dee7 retention + cleanup: channel retention policy, store GC, deletion audit (P5.3)
Subplan 02 Slice 4. Channel gains retention_keep (0 = keep everything;
N = keep the newest N published releases of the channel's lineage), set
via the new `dist channel set --retention-keep`. The new `dist store
cleanup` prunes lineage-expired releases — never one any channel points
at, so cross-promoted releases survive — drops their artifact rows,
GCs objects/ files no surviving artifact references, and sweeps stale
staging/ leftovers; every deletion writes an audit event. The pruned
model is saved before any unlink, so a crash leaves orphan blobs (next
run catches them), never dangling references.

repo.publish no longer replaces an existing channel row wholesale: only
the pointer moves, so policy/rollout/retention survive every publish
(previously each publish silently reset them to defaults).

std.fs has no directory listing, so cleanup.sx carries a local
opendir/readdir/closedir shim, like publish.sx's time(2) shim.

dist.db channels gains the retention_keep column (idempotent ALTER for
pre-retention stores); db.json import treats it as optional.

tests/retention_cleanup.sx pins the whole scenario; the repo.publish
assertion fails on the pre-fix code. make test 23/23 green.
2026-06-12 19:35:52 +03:00

268 lines
15 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Pinned acceptance for P5.3 (subplan 02 Slice 4) — retention policy +
// `dist channel set` + `dist store cleanup`.
//
// Drives the BUILT `build/dist` binary (via `process.run`, like
// release_ops.sx) through the slice's acceptance scenario:
//
// 1. Publish A(1.0.0), B(1.1.0), B2(1.1.1), C(1.2.0) into beta — A and
// B carry unique payload bytes; B2 and C share one payload (one
// content-addressed object between them) → beta points at C, the
// store holds 3 objects.
// 2. Cross-promote A onto stable → stable points at A (a release deep
// in beta's lineage).
// 3. `channel set --retention-keep 1` on beta → exit 0; dist.db row
// records it; a `channel.update` audit event is written. Stable gets
// `--retention-keep 5` — a longer retention than beta, per the
// slice's "stable retained longer than beta" acceptance. A bad value
// exits 64; an unknown channel exits 1 with `channel.unknown`.
// 4. Seed two stale staging files, then `store cleanup`: beta's lineage
// [A B B2 C] keeps only C (newest 1); A is beyond retention but
// POINTED AT by stable, so it survives (`kept_pointed_releases`).
// B and B2 are deleted with their artifact rows; B's unique object
// is GC'd while B2's object SURVIVES (C still references those
// bytes); staging is swept. One audit event per deletion
// (release.delete ×2, object.delete ×1, staging.delete ×2).
// 5. Publish D(1.3.0) into beta → beta's retention_keep is STILL 1
// (regression: repo.publish must not reset channel fields when
// moving the pointer). A second cleanup then prunes C (no longer
// pointed, beyond retention) but keeps the shared object (D
// references it). A third cleanup deletes nothing (idempotent).
// 6. Cleanup on a store that was never published → exit 1 +
// `store.load` JSON error.
//
// Store-side state is asserted by QUERYING `<store>/dist.db` through the
// SQLite bindings and checking `<store>/objects` / `<store>/staging` on
// the filesystem.
#import "modules/std.sx";
#import "modules/std/json.sx";
process :: #import "modules/std/process.sx";
fs :: #import "modules/std/fs.sx";
sq :: #import "vendors/sqlite/sqlite.sx";
// Scratch locations. STORE is the store directory handed to every
// `--local-store` flag and the place `dist.db`, `objects/` and `staging/`
// are inspected; MDIR receives the manifests and payload files that
// `publish` writes. `main` removes both before the scenario starts and
// again after it passes.
STORE :: ".sx-tmp/retention_cleanup";
MDIR :: ".sx-tmp/retention_cleanup_m";
// Release ids the assertions compare against for the five acme-app
// versions published by `main` (A = 1.0.0, B = 1.1.0, B2 = 1.1.1,
// C = 1.2.0, D = 1.3.0).
REL_A :: "rel-acme-app-1.0.0";
REL_B :: "rel-acme-app-1.1.0";
REL_B2 :: "rel-acme-app-1.1.1";
REL_C :: "rel-acme-app-1.2.0";
REL_D :: "rel-acme-app-1.3.0";
// Return the value stored under `key` in the parsed JSON object `o`,
// scanning its items front to back and stopping at the first match.
// When no item carries `key`, the test is failed through
// `process.assert` with a message naming the key; the null Value after
// it is presumably only there so the function still returns something.
get :: (o: Object, key: string) -> Value {
    idx := 0;
    while idx < o.len {
        entry := o.items[idx];
        if entry.key == key {
            return entry.val;
        }
        idx += 1;
    }
    process.assert(false, concat("missing json key: ", key));
    missing : Value = .null_;
    return missing;
}
// Look `key` up with `get` and read the found value's `.str` field.
get_str :: (o: Object, key: string) -> string {
    found := get(o, key);
    return found.str;
}
// Look `key` up with `get` and read the found value's `.object` field.
get_obj :: (o: Object, key: string) -> Object {
    found := get(o, key);
    return found.object;
}
// Look `key` up with `get` and read the found value's `.array` field.
get_arr :: (o: Object, key: string) -> Array {
    found := get(o, key);
    return found.array;
}
// Report whether any element of the JSON array `a` has a `.str` equal
// to `s`. Elements are compared in order and the scan stops at the
// first match; an empty array yields false.
arr_contains :: (a: Array, s: string) -> bool {
    pos := 0;
    while pos < a.len {
        element := a.items[pos];
        if element.str == s {
            return true;
        }
        pos += 1;
    }
    return false;
}
// Write `body` to `path` by running `printf '%s' '<body>' > <path>`
// through `process.run`.
//
// `body` is spliced between single quotes and `path` is spliced in
// unquoted, so `body` must not contain a single quote and `path` must
// not contain characters the shell would interpret. Every caller in this
// file passes fixed, shell-safe text.
//
// The command's outcome is asserted: a fixture that could not be written
// fails the test here, naming the path, instead of surfacing later as an
// unrelated assertion failure.
write_file :: (path: string, body: string) {
    cmd := concat(concat(concat("printf '%s' '", body), "' > "), path);
    r := process.run(cmd);
    process.assert(r != null and r!.exit_code == 0, concat("write_file must succeed: ", path));
}
// Open `<STORE>/dist.db` with the SQLITE_OPEN_READONLY flag and return
// the connection. A failed open fails the test. A busy timeout of 2000
// is set on the connection before it is handed back; closing it is the
// caller's job.
db_open_ro :: () -> sq.Sqlite {
    db_path := path_join(STORE, "dist.db");
    conn, open_err := sq.Sqlite.open_v2(db_path, sq.SQLITE_OPEN_READONLY);
    process.assert(!open_err, "dist.db must open as a SQLite database");
    conn.busy_timeout(2000);
    return conn;
}
// Run `sql` against a fresh read-only connection to dist.db and return
// column 0 of its first row as text.
//
// `p1` / `p2` are bound as text to parameters 1 and 2; an empty string
// means "leave that parameter unbound". Prepare, bind and step failures,
// and a query that yields no row, all fail the test. The statement is
// finalized and the connection closed before returning.
//
// NOTE(review): the column text is read before `finalize` and returned
// after it — confirm the binding's `column_text` hands back a copy
// rather than a view into statement-owned memory.
q_text :: (sql: string, p1: string, p2: string) -> string {
    conn := db_open_ro();
    stmt, prep_err := conn.prepare(sql);
    process.assert(!prep_err, concat("prepare must succeed: ", sql));
    if p1.len > 0 {
        stmt.bind_text(1, p1) catch { process.assert(false, "bind 1 failed"); };
    }
    if p2.len > 0 {
        stmt.bind_text(2, p2) catch { process.assert(false, "bind 2 failed"); };
    }
    step_rc, step_err := stmt.step();
    process.assert(!step_err, concat("step must succeed: ", sql));
    process.assert(step_rc == sq.SQLITE_ROW, concat("query must return a row: ", sql));
    result := stmt.column_text(0);
    stmt.finalize();
    conn.close();
    return result;
}
// Run `sql` against a fresh read-only connection to dist.db and return
// column 0 of its first row as a 64-bit integer.
//
// `p1` / `p2` are bound as text to parameters 1 and 2; an empty string
// means "leave that parameter unbound". Prepare, bind and step failures,
// and a query that yields no row, all fail the test. The statement is
// finalized and the connection closed before returning.
q_int :: (sql: string, p1: string, p2: string) -> i64 {
    conn := db_open_ro();
    stmt, prep_err := conn.prepare(sql);
    process.assert(!prep_err, concat("prepare must succeed: ", sql));
    if p1.len > 0 {
        stmt.bind_text(1, p1) catch { process.assert(false, "bind 1 failed"); };
    }
    if p2.len > 0 {
        stmt.bind_text(2, p2) catch { process.assert(false, "bind 2 failed"); };
    }
    step_rc, step_err := stmt.step();
    process.assert(!step_err, concat("step must succeed: ", sql));
    process.assert(step_rc == sq.SQLITE_ROW, concat("query must return a row: ", sql));
    result := stmt.column_int64(0);
    stmt.finalize();
    conn.close();
    return result;
}
// Publish acme-app `version` into the beta channel of STORE.
//
// The payload file `<MDIR>/<payload_name>` is written with
// `payload_bytes` only when it does not exist yet, so several versions
// can name the same payload file. A manifest `<MDIR>/m-<version>.json`
// is then written that lists that payload as the single `android_apk`
// artifact (the path is relative to the manifest file), and
// `build/dist ci publish` is run on it. The test fails unless the
// command exits 0.
publish :: (version: string, payload_name: string, payload_bytes: string) {
    payload_path := path_join(MDIR, payload_name);
    if !fs.exists(payload_path) {
        write_file(payload_path, payload_bytes);
    }

    manifest := concat("{\"app\":\"acme-app\",\"version\":\"", version);
    manifest = concat(manifest, "\",\"channel\":\"beta\",\"artifacts\":[{\"platform\":\"android_apk\",\"path\":\"");
    manifest = concat(manifest, payload_name);
    manifest = concat(manifest, "\"}]}");

    manifest_name := concat(concat("m-", version), ".json");
    manifest_path := path_join(MDIR, manifest_name);
    write_file(manifest_path, manifest);

    cmd := concat("build/dist ci publish --manifest ", manifest_path);
    cmd = concat(cmd, " --local-store ");
    cmd = concat(cmd, STORE);
    cmd = concat(cmd, " --json 2>/dev/null");
    result := process.run(cmd);
    process.assert(result != null and result!.exit_code == 0, concat("publish must exit 0: ", version));
}
// Build the shell command that runs `build/dist store cleanup` against
// STORE with `--json`, discarding stderr.
cleanup_cmd :: () -> string {
    with_store := concat("build/dist store cleanup --local-store ", STORE);
    return concat(with_store, " --json 2>/dev/null");
}
// Read the `retention_keep` column of the channel row named "beta"
// straight from dist.db.
beta_retention :: () -> i64 {
    keep := q_int("SELECT retention_keep FROM channels WHERE name = ?1", "beta", "");
    return keep;
}
// Entry point: drives `build/dist` through the six-step retention /
// cleanup scenario described at the top of the file. Each step runs the
// binary via `process.run`, then checks its exit code, its JSON stdout
// (parsed into `arena`) and the resulting state of `<STORE>/dist.db`,
// `<STORE>/objects` and `<STORE>/staging`.
//
// Returns 0 after every case passed, or 1 when a command's stdout could
// not be parsed as JSON. All other checks go through `process.assert`.
main :: () -> i32 {
gpa := GPA.init();
arena := Arena.init(xx gpa, 1 << 20);
defer arena.deinit();
// Start from a clean slate: drop any store / manifest directories left
// behind by an earlier run, then recreate the manifest directory.
process.run(concat("rm -rf ", STORE));
process.run(concat("rm -rf ", MDIR));
process.run(concat("mkdir -p ", MDIR));
// ── 1+2. Lineage [A B B2 C] on beta (B2 and C share bytes), A
// cross-promoted onto stable ────────────────────────────────
publish("1.0.0", "payload-a.apk", "unique-bytes-of-A");
publish("1.1.0", "payload-b.apk", "unique-bytes-of-B");
publish("1.1.1", "payload-shared.apk", "shared-bytes-B2-C-D");
publish("1.2.0", "payload-shared.apk", "shared-bytes-B2-C-D");
process.assert(q_int("SELECT COUNT(*) FROM releases", "", "") == 4, "four releases published");
rp := process.run(concat(concat("build/dist release promote --app acme-app --channel stable --release ", REL_A),
concat(concat(" --local-store ", STORE), " --json 2>/dev/null")));
process.assert(rp != null and rp!.exit_code == 0, "promote A onto stable must exit 0");
// Capture the object keys now: B's artifact row is gone after the first
// cleanup, but its key is still needed to check the objects/ file and
// the object.delete audit event.
key_b := q_text("SELECT sha256 FROM artifacts WHERE release_id = ?1", REL_B, "");
key_shared := q_text("SELECT sha256 FROM artifacts WHERE release_id = ?1", REL_C, "");
// NOTE(review): the message below reports "3 objects stored", but no
// assertion in this step counts the stored objects.
print(" lineage [A B B2 C] on beta; stable -> A; 3 objects stored\n");
// ── 3. channel set: beta keeps 1, stable keeps 5; failure paths ──
cs := process.run(concat(concat("build/dist channel set --app acme-app --channel beta --retention-keep 1 --local-store ", STORE), " --json 2>/dev/null"));
process.assert(cs != null and cs!.exit_code == 0, "channel set beta must exit 0");
cv, ce := parse(cs!.stdout, xx arena);
if ce { process.assert(false, "channel set stdout must be one JSON object"); return 1; }
co := cv.object;
process.assert(get_str(co, "status") == "updated", "channel set json status");
process.assert(get(get_obj(co, "channel"), "retention_keep").int_ == 1, "channel set json retention_keep");
process.assert(beta_retention() == 1, "dist.db: beta retention_keep = 1");
cs5 := process.run(concat(concat("build/dist channel set --app acme-app --channel stable --retention-keep 5 --local-store ", STORE), " --json 2>/dev/null"));
process.assert(cs5 != null and cs5!.exit_code == 0, "channel set stable must exit 0");
process.assert(q_int("SELECT retention_keep FROM channels WHERE name = ?1", "stable", "") == 5,
"dist.db: stable retains longer than beta (5 > 1)");
// Two successful `channel set` calls so far (beta, stable), hence two
// channel.update rows; this count is taken before the failing calls run.
process.assert(q_int("SELECT COUNT(*) FROM audit_events WHERE actor = ?1 AND action = ?2", "cli", "channel.update") == 2,
"each channel set recorded a cli channel.update audit event");
bad := process.run(concat(concat("build/dist channel set --app acme-app --channel beta --retention-keep abc --local-store ", STORE), " --json 2>/dev/null"));
process.assert(bad != null and bad!.exit_code == 64, "non-numeric --retention-keep is a usage error (64)");
unk := process.run(concat(concat("build/dist channel set --app acme-app --channel nope --retention-keep 1 --local-store ", STORE), " --json 2>/dev/null"));
process.assert(unk != null and unk!.exit_code == 1, "unknown channel must exit 1");
uv, ue := parse(unk!.stdout, xx arena);
if ue { process.assert(false, "unknown-channel stdout must be one JSON object"); return 1; }
process.assert(get_str(get_obj(uv.object, "error"), "code") == "channel.unknown", "unknown-channel json names the code");
print(" channel set: beta keep 1, stable keep 5, audited; bad value 64, unknown channel 1\n");
// ── 4. cleanup: B+B2 pruned, A spared (pointed), B's object GC'd,
// shared object survives, staging swept, all audited ─────────
// Seed two leftover files under staging/ for the sweep to find.
process.run(concat("mkdir -p ", path_join(STORE, "staging")));
write_file(path_join(STORE, "staging/incoming-7"), "half-written upload");
write_file(path_join(STORE, "staging/deadbeef"), "stale staged bytes");
r1 := process.run(cleanup_cmd());
process.assert(r1 != null and r1!.exit_code == 0, "cleanup must exit 0");
v1, e1 := parse(r1!.stdout, xx arena);
if e1 { process.assert(false, "cleanup stdout must be one JSON object"); return 1; }
o1 := v1.object;
// First the JSON report of the run …
process.assert(get_str(o1, "status") == "cleaned", "cleanup json status");
rd := get_arr(o1, "releases_deleted");
process.assert(rd.len == 2 and arr_contains(rd, REL_B) and arr_contains(rd, REL_B2),
"cleanup deleted exactly B and B2");
kp := get_arr(o1, "kept_pointed_releases");
process.assert(kp.len == 1 and arr_contains(kp, REL_A),
"A is beyond retention but spared: stable points at it");
od := get_arr(o1, "objects_deleted");
process.assert(od.len == 1 and arr_contains(od, key_b), "only B's unreferenced object was GC'd");
sd := get_arr(o1, "staging_deleted");
process.assert(sd.len == 2 and arr_contains(sd, "incoming-7") and arr_contains(sd, "deadbeef"),
"both stale staging files swept");
process.assert(get_arr(o1, "unlink_failed").len == 0, "no unlink failures");
// … then the database rows …
process.assert(q_int("SELECT COUNT(*) FROM releases", "", "") == 2, "releases left: A and C");
process.assert(q_int("SELECT COUNT(*) FROM releases WHERE id = ?1", REL_A, "") == 1, "A survived");
process.assert(q_int("SELECT COUNT(*) FROM releases WHERE id = ?1", REL_C, "") == 1, "C survived");
process.assert(q_int("SELECT COUNT(*) FROM artifacts", "", "") == 2, "pruned releases lost their artifact rows");
process.assert(q_text("SELECT current_release_id FROM channels WHERE name = ?1", "stable", "") == REL_A, "stable still -> A");
process.assert(q_text("SELECT current_release_id FROM channels WHERE name = ?1", "beta", "") == REL_C, "beta still -> C");
// … then the files on disk …
process.assert(!fs.exists(path_join(STORE, concat("objects/", key_b))), "B's object bytes are gone");
process.assert(fs.exists(path_join(STORE, concat("objects/", key_shared))), "the shared object survived (C references it)");
process.assert(!fs.exists(path_join(STORE, "staging/incoming-7")), "staging file 1 gone");
process.assert(!fs.exists(path_join(STORE, "staging/deadbeef")), "staging file 2 gone");
// … and finally one audit event per deletion.
process.assert(q_int("SELECT COUNT(*) FROM audit_events WHERE action = ?1", "release.delete", "") == 2, "2 release.delete audit events");
process.assert(q_int("SELECT COUNT(*) FROM audit_events WHERE action = ?1 AND target_id = ?2", "object.delete", key_b) == 1, "object.delete audit names B's key");
process.assert(q_int("SELECT COUNT(*) FROM audit_events WHERE action = ?1", "staging.delete", "") == 2, "2 staging.delete audit events");
print(" cleanup: B+B2 pruned, A spared via stable, shared object kept, staging swept, audited\n");
// ── 5. retention survives publish (repo.publish regression), then
// a second cleanup prunes C and an idle third does nothing ───
// D reuses the shared payload file, so it carries the same bytes as C.
publish("1.3.0", "payload-shared.apk", "shared-bytes-B2-C-D");
process.assert(beta_retention() == 1, "publishing into beta must NOT reset retention_keep");
r2 := process.run(cleanup_cmd());
process.assert(r2 != null and r2!.exit_code == 0, "second cleanup must exit 0");
v2, e2 := parse(r2!.stdout, xx arena);
if e2 { process.assert(false, "second cleanup stdout must be one JSON object"); return 1; }
rd2 := get_arr(v2.object, "releases_deleted");
process.assert(rd2.len == 1 and arr_contains(rd2, REL_C), "second cleanup prunes C (beta -> D now)");
process.assert(get_arr(v2.object, "objects_deleted").len == 0, "shared object survives: D references it");
process.assert(fs.exists(path_join(STORE, concat("objects/", key_shared))), "shared object bytes still present");
process.assert(q_text("SELECT current_release_id FROM channels WHERE name = ?1", "beta", "") == REL_D, "beta -> D");
r3 := process.run(cleanup_cmd());
process.assert(r3 != null and r3!.exit_code == 0, "third cleanup must exit 0");
v3, e3 := parse(r3!.stdout, xx arena);
if e3 { process.assert(false, "third cleanup stdout must be one JSON object"); return 1; }
process.assert(get_arr(v3.object, "releases_deleted").len == 0
and get_arr(v3.object, "objects_deleted").len == 0
and get_arr(v3.object, "staging_deleted").len == 0,
"an idle cleanup deletes nothing (idempotent)");
print(" retention survives publish; second cleanup prunes C, third is a no-op\n");
// ── 6. cleanup on a never-published store fails loudly ───────────
// NOTE(review): `.sx-tmp/retention_cleanup_none` is never removed by
// this test, neither before this step nor after — confirm the command
// cannot leave that directory behind.
rn := process.run("build/dist store cleanup --local-store .sx-tmp/retention_cleanup_none --json 2>/dev/null");
process.assert(rn != null and rn!.exit_code == 1, "cleanup on a missing store must exit 1");
nv, ne := parse(rn!.stdout, xx arena);
if ne { process.assert(false, "missing-store stdout must be one JSON object"); return 1; }
process.assert(get_str(get_obj(nv.object, "error"), "code") == "store.load", "missing-store json names the code");
print(" cleanup on a missing store: exit 1 + store.load\n");
// Scratch directories are removed only here, on the success path; the
// early `return 1` exits above leave them in place.
process.run(concat("rm -rf ", STORE));
process.run(concat("rm -rf ", MDIR));
print("retention_cleanup: ALL CASES PASS\n");
return 0;
}