Files
distribution/src/store/cleanup.sx
agra dc6908dee7 retention + cleanup: channel retention policy, store GC, deletion audit (P5.3)
Subplan 02 Slice 4. Channel gains retention_keep (0 = keep everything;
N = keep the newest N published releases of the channel's lineage), set
via the new `dist channel set --retention-keep`. The new `dist store
cleanup` prunes lineage-expired releases — never one any channel points
at, so cross-promoted releases survive — drops their artifact rows,
GCs objects/ files no surviving artifact references, and sweeps stale
staging/ leftovers; every deletion writes an audit event. The pruned
model is saved before any unlink, so a crash leaves orphan blobs (next
run catches them), never dangling references.

repo.publish no longer replaces an existing channel row wholesale: only
the pointer moves, so policy/rollout/retention survive every publish
(previously each publish silently reset them to defaults).

std.fs has no directory listing, so cleanup.sx carries a local
opendir/readdir/closedir shim, like publish.sx's time(2) shim.

dist.db channels gains the retention_keep column (idempotent ALTER for
pre-retention stores); db.json import treats it as optional.

tests/retention_cleanup.sx pins the whole scenario; the repo.publish
assertion fails on the pre-fix code. make test 23/23 green.
2026-06-12 19:35:52 +03:00

315 lines
12 KiB
Plaintext

// =====================================================================
// cleanup.sx — `dist store cleanup`: retention pruning + store garbage
// collection over the persisted store (subplan 02, Slice 4).
//
// One pass, four deletions, all audited:
//
// 1. RELEASES beyond retention. For every channel with
// `retention_keep` = N > 0, the channel's lineage (the app's
// PUBLISHED releases targeting that channel, in publish order)
// keeps its newest N entries; older ones are deleted — UNLESS a
// release is currently pointed at by ANY channel (cross-promotion
// means e.g. stable may point into beta's lineage), in which case
// it survives and is reported in `kept_pointed_releases`.
// 2. ARTIFACT rows of the deleted releases.
// 3. OBJECT files in `<store>/objects/` that no surviving artifact
// references (covers both blobs freed by 1/2 and blobs orphaned by
// an aborted upload that never reached a release).
// 4. EVERYTHING in `<store>/staging/`. A completed put always renames
// or deletes its staging file, so anything still there is a
// leftover from a crashed/aborted publish.
//
// ORDERING: the pruned model (with one audit event per deletion) is
// saved to dist.db BEFORE any file is unlinked, so a crash mid-cleanup
// can leave an unreferenced blob (the next run catches it) but never a
// model row pointing at deleted bytes. An unlink that fails after the
// save is reported in `unlink_failed` (exit stays 0 — the model is
// consistent; the orphan is re-detected next run).
//
// OFFLINE OP: like every load-modify-save CLI command, cleanup must not
// run concurrently with a writing distd — the whole-model save would
// drop a publish that landed in between, and the staging sweep would
// eat an in-flight upload's temp file.
// =====================================================================
#import "modules/std.sx";
#import "modules/std/json.sx";
fs :: #import "modules/std/fs.sx";
#import "../domain/platform.sx";
#import "../domain/app.sx";
#import "../domain/release.sx";
#import "../domain/artifact.sx";
#import "../domain/channel.sx";
#import "../domain/token.sx";
#import "../domain/audit.sx";
#import "../domain/validate.sx";
#import "../repo/repo.sx";
db :: #import "../repo/db.sx";
jout :: #import "../json_out.sx";
pl :: #import "../publish/publish.sx";
// Error set raised by `run_cleanup`. Each raise is accompanied by a
// `code` + `message` written into the caller's `jout.CliFailure`.
CleanupErr :: error {
Load, // store database absent or unreadable (reported as code "store.load")
Persist, // the pruned model could not be re-written (reported as code "persist.save")
}
// Report of one `run_cleanup` pass. The three `*_deleted` lists are filled
// while planning, before the model is saved and before any file is
// unlinked; an entry whose unlink later fails stays in its `*_deleted`
// list and its full path is additionally recorded in `unlink_failed`.
CleanupOutcome :: struct {
releases_deleted: List(string); // release ids pruned by retention
objects_deleted: List(string); // file names under objects/ that no surviving artifact's storage_key matches
staging_deleted: List(string); // staging file names swept (every regular file found in staging/)
kept_pointed_releases: List(string); // beyond retention but channel-pointed, so spared
unlink_failed: List(string); // full paths whose unlink failed post-save
}
// ── directory listing (no std.fs equivalent yet) ─────────────────────
// opendir/readdir/closedir via libc, like publish.sx's time(2) shim.
// dirent layout is darwin-arm64: d_type at byte 20, d_name
// (NUL-terminated, max 1024) at byte 21.
// libc handle the three directory-stream bindings below resolve against.
cl_libc :: #library "c";
// Opens a directory stream. The stream pointer is carried as a plain
// usize; cl_list_files treats 0 as "could not open".
cl_opendir :: (path: [:0]u8) -> usize #foreign cl_libc "opendir";
// Returns the address of the next dirent record as a usize;
// cl_list_files stops iterating when it gets 0.
cl_readdir :: (dirp: usize) -> usize #foreign cl_libc "readdir";
// Closes a stream obtained from cl_opendir.
cl_closedir :: (dirp: usize) -> i32 #foreign cl_libc "closedir";
// d_type value that cl_list_files accepts as a regular file; entries with
// any other d_type are skipped.
// NOTE(review): some filesystems may report an "unknown" d_type instead of
// a concrete one — such entries would be skipped here; confirm that is
// acceptable for the store's filesystem.
CL_DT_REG :: 8;
// Lists the names of the regular files directly inside `dir` (no
// recursion into subdirectories).
//
// Every returned name is a private copy made with the context allocator:
// the dirent record itself is libc-owned and is only valid until the next
// read, so the borrowed view must not escape the loop.
//
// A directory that cannot be opened yields an empty list — a store that
// has no objects/ or staging/ directory simply has nothing to delete.
cl_list_files :: (dir: string) -> List(string) {
    collected : List(string) = .{};

    dirp := cl_opendir(dir);
    if dirp == 0 { return collected; }

    while true {
        de := cl_readdir(dirp);
        if de == 0 { break; }

        bytes : [*]u8 = xx de;

        // Byte 20 is d_type in the darwin-arm64 dirent layout; anything
        // that is not a regular file is ignored.
        if bytes[20] != CL_DT_REG { continue; }

        // d_name starts at byte 21 and is NUL-terminated; measure it,
        // capped at 1023 characters.
        name_len := 0;
        while bytes[21 + name_len] != 0 and name_len < 1023 { name_len += 1; }

        borrowed := string.{ ptr = @bytes[21], len = name_len };
        owned := db.db_dup_str(borrowed, context.allocator);
        collected.append(owned, context.allocator);
    }

    cl_closedir(dirp);
    return collected;
}
// Linear membership test: true when some element of `l` equals `s`.
// The scan stops at the first match; an empty list yields false.
cl_contains :: (l: *List(string), s: string) -> bool {
    hit := false;
    pos := 0;
    while pos < l.len and !hit {
        hit = (l.items[pos] == s);
        pos += 1;
    }
    return hit;
}
// ── the cleanup transaction ──────────────────────────────────────────
// Runs one cleanup pass over the store rooted at `store_dir`.
//
// Phases, in order:
//   1. load the model (fails with error.Load if the store is absent or
//      db.load reports an error);
//   2. plan: pick retention-expired releases, unreferenced objects/ files
//      and every regular file in staging/;
//   3. compact the in-memory releases and artifacts, append one audit
//      event per planned deletion;
//   4. save the model (fails with error.Persist);
//   5. only then unlink the planned files, collecting failures.
//
// Parameters:
//   store_dir — root directory of the store; objects/ and staging/ are
//               resolved beneath it with path_join.
//   fail_out  — on a raised error, receives `code` and `message`; it is
//               not written on success.
//
// Returns the CleanupOutcome describing what was planned/deleted. Unlink
// failures do not raise: they are listed in `unlink_failed`.
run_cleanup :: (store_dir: string, fail_out: *jout.CliFailure) -> (CleanupOutcome, !CleanupErr) {
// ---- phase 1: load ----
if !db.store_exists(store_dir) {
fail_out.code = "store.load";
fail_out.message = concat("no store database (nothing published yet): ", store_dir);
raise error.Load;
}
repo, le := db.load(store_dir);
if le {
fail_out.code = "store.load";
fail_out.message = concat("the store database could not be loaded: ", store_dir);
raise error.Load;
}
alloc := context.allocator;
o : CleanupOutcome = .{};
// ---- phase 2: plan ----
// Every channel's current pointer is sacrosanct, whichever channel's
// lineage the release came from. Channels with an empty pointer
// contribute nothing to the guard.
guard : List(string) = .{};
i := 0;
while i < repo.channels.len {
cur := repo.channels.items[i].current_release_id;
if cur.len > 0 { guard.append(cur, alloc); }
i += 1;
}
// Per retention-bearing channel: the lineage's oldest entries beyond
// `retention_keep` are pruned (or spared by the pointer guard). A
// release targets exactly one channel, so lineages never overlap.
i = 0;
while i < repo.channels.len {
chan := repo.channels.items[i];
// Advance up front so the `continue`s below cannot skip the increment.
i += 1;
// retention_keep <= 0 means "no retention limit" for this channel.
if chan.retention_keep <= 0 { continue; }
// First sweep: size of the lineage (same app, same channel name,
// published_at > 0).
total := 0;
j := 0;
while j < repo.releases.len {
r := repo.releases.items[j];
if r.app_id == chan.app_id and r.channel == chan.name and r.published_at > 0 { total += 1; }
j += 1;
}
if total <= chan.retention_keep { continue; }
drop := total - chan.retention_keep; // oldest `drop` lineage entries
// Second sweep: the first `drop` lineage members in stored order are the
// candidates. NOTE(review): this treats the order of repo.releases as
// oldest-first (publish order); published_at is only tested for > 0,
// never compared — confirm the repo guarantees that ordering.
// A guarded candidate still counts toward `seen`, so a spared release
// does not cause a newer one to be pruned in its place.
seen := 0;
j = 0;
while j < repo.releases.len and seen < drop {
r := repo.releases.items[j];
j += 1;
if !(r.app_id == chan.app_id and r.channel == chan.name and r.published_at > 0) { continue; }
seen += 1;
if cl_contains(@guard, r.id) {
o.kept_pointed_releases.append(r.id, alloc);
} else {
o.releases_deleted.append(r.id, alloc);
}
}
}
// Storage keys the surviving artifacts still reference; any objects/
// file outside this set is garbage. Artifacts are matched to pruned
// releases by release_id.
referenced : List(string) = .{};
i = 0;
while i < repo.artifacts.len {
a := repo.artifacts.items[i];
if !cl_contains(@o.releases_deleted, a.release_id) {
referenced.append(a.storage_key, alloc);
}
i += 1;
}
// File names in objects/ are compared directly against storage keys.
// NOTE(review): this assumes a storage_key equals the bare file name
// inside objects/ (no subdirectory component) — confirm against the
// publish path, since a mismatch would mark live blobs as garbage.
objects := cl_list_files(path_join(store_dir, "objects"));
i = 0;
while i < objects.len {
if !cl_contains(@referenced, objects.items[i]) {
o.objects_deleted.append(objects.items[i], alloc);
}
i += 1;
}
// Every regular file found in staging/ is scheduled for deletion.
o.staging_deleted = cl_list_files(path_join(store_dir, "staging"));
// ---- phase 3: compact + audit ----
// Compact the model in place (order preserved): pruned releases go,
// and their artifact rows with them. `w` is the write cursor, `i` the
// read cursor; the list length is truncated to `w` afterwards.
w := 0;
i = 0;
while i < repo.releases.len {
if !cl_contains(@o.releases_deleted, repo.releases.items[i].id) {
repo.releases.items[w] = repo.releases.items[i];
w += 1;
}
i += 1;
}
repo.releases.len = w;
w = 0;
i = 0;
while i < repo.artifacts.len {
if !cl_contains(@o.releases_deleted, repo.artifacts.items[i].release_id) {
repo.artifacts.items[w] = repo.artifacts.items[i];
w += 1;
}
i += 1;
}
repo.artifacts.len = w;
// One audit event per deletion, written into the same save. All events
// of one pass share the same `created_at`.
// NOTE(review): event ids are built from the target id alone, so a later
// pass that meets the same object key or staging name again (e.g. after
// a failed unlink) would produce an identical event id; the result of
// create_audit_event is not inspected here — confirm duplicates are
// tolerated by the repo and by db.save.
now := pl.now_secs();
i = 0;
while i < o.releases_deleted.len {
id := o.releases_deleted.items[i];
repo.create_audit_event(AuditEvent.{
id = concat("evt-cleanup-release-", id),
actor = "cli", action = "release.delete", target_type = "release",
target_id = id, metadata = "retention", created_at = now,
});
i += 1;
}
i = 0;
while i < o.objects_deleted.len {
key := o.objects_deleted.items[i];
repo.create_audit_event(AuditEvent.{
id = concat("evt-cleanup-object-", key),
actor = "cli", action = "object.delete", target_type = "object",
target_id = key, metadata = "unreferenced", created_at = now,
});
i += 1;
}
i = 0;
while i < o.staging_deleted.len {
name := o.staging_deleted.items[i];
repo.create_audit_event(AuditEvent.{
id = concat("evt-cleanup-staging-", name),
actor = "cli", action = "staging.delete", target_type = "staging",
target_id = name, metadata = "stale", created_at = now,
});
i += 1;
}
// ---- phase 4: save ----
// The save runs unconditionally, even when nothing was planned. A save
// failure aborts before any file has been touched.
werr := false;
db.save(@repo, store_dir) catch { werr = true; };
if werr {
fail_out.code = "persist.save";
fail_out.message = concat("the store database could not be written: ", store_dir);
raise error.Persist;
}
// ---- phase 5: unlink ----
// Files go only after the model committed. A failed delete is recorded
// by full path and does not stop the remaining deletions.
i = 0;
while i < o.objects_deleted.len {
p := path_join(path_join(store_dir, "objects"), o.objects_deleted.items[i]);
if !fs.delete_file(p) { o.unlink_failed.append(p, alloc); }
i += 1;
}
i = 0;
while i < o.staging_deleted.len {
p := path_join(path_join(store_dir, "staging"), o.staging_deleted.items[i]);
if !fs.delete_file(p) { o.unlink_failed.append(p, alloc); }
i += 1;
}
return o;
}
// ── rendering ─────────────────────────────────────────────────────────
// Converts a list of strings into a JSON array Value whose elements are
// string values in the same order. Array storage is taken from `alloc`.
cl_str_array :: (l: *List(string), alloc: Allocator) -> Value {
    json_items : Array = .{};
    pos := 0;
    while pos < l.len {
        elem : Value = .str(l.items[pos]);
        json_items.add(elem, alloc);
        pos += 1;
    }
    return .array(json_items);
}
// `{"status":"cleaned","releases_deleted":[...],"objects_deleted":[...],
// "staging_deleted":[...],"kept_pointed_releases":[...],
// "unlink_failed":[...]}`.
// Serialises a CleanupOutcome as a single JSON object into `dst` and
// returns what write_to_buffer returns for it; a JsonError from
// write_to_buffer is propagated to the caller.
//
// Key order is fixed: status, releases_deleted, objects_deleted,
// staging_deleted, kept_pointed_releases, unlink_failed.
write_cleanup_json :: (o: *CleanupOutcome, dst: []u8) -> (i64, !JsonError) {
    gpa := GPA.init();
    obj : Object = .{};

    obj.put("status", .str("cleaned"), xx gpa);

    releases_v : Value = cl_str_array(@o.releases_deleted, xx gpa);
    obj.put("releases_deleted", releases_v, xx gpa);

    objects_v : Value = cl_str_array(@o.objects_deleted, xx gpa);
    obj.put("objects_deleted", objects_v, xx gpa);

    staging_v : Value = cl_str_array(@o.staging_deleted, xx gpa);
    obj.put("staging_deleted", staging_v, xx gpa);

    kept_v : Value = cl_str_array(@o.kept_pointed_releases, xx gpa);
    obj.put("kept_pointed_releases", kept_v, xx gpa);

    failed_v : Value = cl_str_array(@o.unlink_failed, xx gpa);
    obj.put("unlink_failed", failed_v, xx gpa);

    payload : Value = .object(obj);
    written := try write_to_buffer(payload, dst);
    return written;
}
// One-line, newline-terminated summary of a cleanup pass for terminal
// output: the three deletion counts always appear; the channel-pointed
// and failed-unlink counts are appended only when non-zero.
cleanup_human :: (o: *CleanupOutcome) -> string {
    n_releases := o.releases_deleted.len;
    n_objects := o.objects_deleted.len;
    n_staging := o.staging_deleted.len;
    n_kept := o.kept_pointed_releases.len;
    n_failed := o.unlink_failed.len;

    line := concat("cleaned: ", int_to_string(n_releases));
    line = concat(line, " releases, ");
    line = concat(line, int_to_string(n_objects));
    line = concat(line, " objects, ");
    line = concat(line, int_to_string(n_staging));
    line = concat(line, " staged files deleted");

    if n_kept > 0 {
        line = concat(line, "; kept (channel-pointed): ");
        line = concat(line, int_to_string(n_kept));
    }
    if n_failed > 0 {
        line = concat(line, "; UNLINK FAILED: ");
        line = concat(line, int_to_string(n_failed));
    }

    return concat(line, "\n");
}