Files
distribution/src/repo/repo.sx
agra a1f13c4356 sqlite persistence: the store moves from db.json to dist.db (P5.2)
src/repo/db.sx persists the whole Repo to <store>/dist.db through the
vendored SQLite bindings, keeping the load-whole/save-whole call shape.
One table per entity; enums as lowercase variant names; list order
round-trips via rowid. Enforced uniqueness: apps.slug,
channels(app_id, name), tokens.token_hash; lookup indexes on
releases(app_id) and artifacts(sha256) (non-unique - identical bytes
may ship in several releases). save is DELETE-all + INSERT-all inside
BEGIN IMMEDIATE...COMMIT with rollback on failure; every connection
sets busy_timeout so the CLI and a running distd interleave safely.

A store holding only a pre-SQLite db.json imports once on first load,
then the file is renamed db.json.imported; a store with neither starts
empty. Consumers gate on db.store_exists instead of probing db.json.
The JSON read-back stays for the import path; the entity->json writers
stay for distd's /api responses.

Tests that parsed db.json directly now assert by querying dist.db
through the SQLite bindings; tests/db_import.sx pins the import path;
tests/repo_roundtrip.sx pins the SQLite round-trip. make test 22/22.
2026-06-12 16:16:13 +03:00

316 lines
13 KiB
Plaintext

// =====================================================================
// repo.sx — in-memory repository over the P2.1 domain (subplan 02,
// Slice 1). Entities live in growable `List`s scanned LINEARLY — no
// HashMap; the persistence layer's SQLite indexes (db.sx) don't change
// that, since a Repo is always loaded whole.
//
// LONG-LIVED ALLOCATOR (binding, project CLAUDE.md): a Repo OUTLIVES the
// transient `context.allocator` of any single CLI call. Its `List`
// backing stores must NOT grow through the implicit context allocator —
// that memory would die when a caller's `push Context { allocator = .. }`
// scope ends, even though the Repo is still alive. So `init` CAPTURES the
// owning allocator (`own_allocator := context.allocator`) and every
// growth point forwards it explicitly: `self.<list>.append(x,
// self.own_allocator)`. There is no implicit-allocator growth anywhere in
// this file.
// =====================================================================
#import "modules/std.sx";
#import "../domain/platform.sx";
#import "../domain/app.sx";
#import "../domain/release.sx";
#import "../domain/artifact.sx";
#import "../domain/channel.sx";
#import "../domain/token.sx";
#import "../domain/audit.sx";
#import "../domain/validate.sx";
// Failure classes raised by `Repo.publish`. Either one means the publish was
// rolled back and the repo is in its pre-call state.
//
// `Validation` = a release / artifact / channel failed domain validation.
// `Integrity`  = the published aggregate is cross-identity inconsistent — the
// release names an app that doesn't exist, a release id that already exists,
// an artifact whose app_id / release_id doesn't match the release, or a
// promoted channel that belongs to a different app or is not the channel the
// release targets (name mismatch).
PublishErr :: error {
// validate_release, validate_artifact or validate_channel rejected its input.
Validation,
// One of the identity preconditions checked by `publish` did not hold.
Integrity,
}
Repo :: struct {
    // Allocator remembered at construction. Every List append below passes it
    // explicitly, so the backing stores are never grown through whatever
    // `context.allocator` happens to be active in the caller (see the file
    // header for the lifetime rationale).
    own_allocator: Allocator;

    // One growable list per entity kind; all lookups are linear scans.
    apps: List(App);
    releases: List(Release);
    artifacts: List(Artifact);
    channels: List(Channel);
    tokens: List(Token);
    audit_events: List(AuditEvent);

    // Build an empty repo bound to the allocator that is current right now.
    // The lists keep their defaults (items=null, len=0, cap=0) and only
    // allocate on their first append.
    init :: () -> Repo {
        fresh := Repo.{ own_allocator = context.allocator };
        return fresh;
    }

    // ---------------------------------------------------------------------
    // Apps
    // ---------------------------------------------------------------------

    // Append an app. No uniqueness check is performed here.
    create_app :: (self: *Repo, a: App) {
        self.apps.append(a, self.own_allocator);
    }

    // First app whose id equals `id`, or null when none matches.
    get_app :: (self: *Repo, id: string) -> ?App {
        pos := 0;
        while pos < self.apps.len {
            hit := self.apps.items[pos];
            if hit.id == id { return hit; }
            pos += 1;
        }
        return null;
    }

    // First app whose slug equals `slug`, or null when none matches.
    find_app_by_slug :: (self: *Repo, slug: string) -> ?App {
        pos := 0;
        while pos < self.apps.len {
            hit := self.apps.items[pos];
            if hit.slug == slug { return hit; }
            pos += 1;
        }
        return null;
    }

    // Slice over the list's current backing store (not a copy).
    list_apps :: (self: *Repo) -> []App {
        return .{
            ptr = self.apps.items,
            len = self.apps.len
        };
    }

    // Overwrite the first app sharing `a.id`. Returns false when no such app
    // is stored.
    update_app :: (self: *Repo, a: App) -> bool {
        pos := 0;
        while pos < self.apps.len {
            if self.apps.items[pos].id != a.id { pos += 1; }
            else {
                self.apps.items[pos] = a;
                return true;
            }
        }
        return false;
    }

    // ---------------------------------------------------------------------
    // Releases
    // ---------------------------------------------------------------------

    // Append a release. No uniqueness check is performed here.
    create_release :: (self: *Repo, r: Release) {
        self.releases.append(r, self.own_allocator);
    }

    // First release whose id equals `id`, or null when none matches.
    get_release :: (self: *Repo, id: string) -> ?Release {
        pos := 0;
        while pos < self.releases.len {
            hit := self.releases.items[pos];
            if hit.id == id { return hit; }
            pos += 1;
        }
        return null;
    }

    // Slice over the list's current backing store (not a copy).
    list_releases :: (self: *Repo) -> []Release {
        return .{
            ptr = self.releases.items,
            len = self.releases.len
        };
    }

    // Overwrite the first release sharing `r.id`. Returns false when no such
    // release is stored.
    update_release :: (self: *Repo, r: Release) -> bool {
        pos := 0;
        while pos < self.releases.len {
            if self.releases.items[pos].id != r.id { pos += 1; }
            else {
                self.releases.items[pos] = r;
                return true;
            }
        }
        return false;
    }

    // ---------------------------------------------------------------------
    // Artifacts
    // ---------------------------------------------------------------------

    // Append an artifact. No uniqueness check is performed here.
    create_artifact :: (self: *Repo, a: Artifact) {
        self.artifacts.append(a, self.own_allocator);
    }

    // First artifact whose id equals `id`, or null when none matches.
    get_artifact :: (self: *Repo, id: string) -> ?Artifact {
        pos := 0;
        while pos < self.artifacts.len {
            hit := self.artifacts.items[pos];
            if hit.id == id { return hit; }
            pos += 1;
        }
        return null;
    }

    // Content-address lookup (P2.2): first artifact whose sha256 equals
    // `digest`, or null when none matches.
    find_artifact_by_digest :: (self: *Repo, digest: string) -> ?Artifact {
        pos := 0;
        while pos < self.artifacts.len {
            hit := self.artifacts.items[pos];
            if hit.sha256 == digest { return hit; }
            pos += 1;
        }
        return null;
    }

    // Slice over the list's current backing store (not a copy).
    list_artifacts :: (self: *Repo) -> []Artifact {
        return .{
            ptr = self.artifacts.items,
            len = self.artifacts.len
        };
    }

    // Overwrite the first artifact sharing `a.id`. Returns false when no such
    // artifact is stored.
    update_artifact :: (self: *Repo, a: Artifact) -> bool {
        pos := 0;
        while pos < self.artifacts.len {
            if self.artifacts.items[pos].id != a.id { pos += 1; }
            else {
                self.artifacts.items[pos] = a;
                return true;
            }
        }
        return false;
    }

    // ---------------------------------------------------------------------
    // Channels — identified by the (app_id, name) pair, not by an id field.
    // ---------------------------------------------------------------------

    // Append a channel. No uniqueness check is performed here.
    create_channel :: (self: *Repo, c: Channel) {
        self.channels.append(c, self.own_allocator);
    }

    // Position of the first channel matching both `app_id` and `name`,
    // or -1 when there is none.
    channel_index :: (self: *Repo, app_id: string, name: string) -> i64 {
        pos := 0;
        while pos < self.channels.len {
            if self.channels.items[pos].app_id == app_id and self.channels.items[pos].name == name {
                return pos;
            }
            pos += 1;
        }
        return -1;
    }

    // Channel stored under (app_id, name), or null when there is none.
    get_channel :: (self: *Repo, app_id: string, name: string) -> ?Channel {
        slot := self.channel_index(app_id, name);
        if slot >= 0 { return self.channels.items[slot]; }
        return null;
    }

    // Slice over the list's current backing store (not a copy).
    list_channels :: (self: *Repo) -> []Channel {
        return .{
            ptr = self.channels.items,
            len = self.channels.len
        };
    }

    // Overwrite the channel stored under (c.app_id, c.name). Returns false
    // when no such channel is stored.
    update_channel :: (self: *Repo, c: Channel) -> bool {
        slot := self.channel_index(c.app_id, c.name);
        if slot >= 0 {
            self.channels.items[slot] = c;
            return true;
        }
        return false;
    }

    // ---------------------------------------------------------------------
    // Tokens
    // ---------------------------------------------------------------------

    // Append a token. No uniqueness check is performed here.
    create_token :: (self: *Repo, t: Token) {
        self.tokens.append(t, self.own_allocator);
    }

    // First token whose id equals `id`, or null when none matches.
    get_token :: (self: *Repo, id: string) -> ?Token {
        pos := 0;
        while pos < self.tokens.len {
            hit := self.tokens.items[pos];
            if hit.id == id { return hit; }
            pos += 1;
        }
        return null;
    }

    // Auth lookup: the caller hashes the presented secret and passes the
    // hash; the first token with an equal token_hash is returned, else null.
    find_token_by_hash :: (self: *Repo, token_hash: string) -> ?Token {
        pos := 0;
        while pos < self.tokens.len {
            hit := self.tokens.items[pos];
            if hit.token_hash == token_hash { return hit; }
            pos += 1;
        }
        return null;
    }

    // Slice over the list's current backing store (not a copy).
    list_tokens :: (self: *Repo) -> []Token {
        return .{
            ptr = self.tokens.items,
            len = self.tokens.len
        };
    }

    // Overwrite the first token sharing `t.id`. Returns false when no such
    // token is stored.
    update_token :: (self: *Repo, t: Token) -> bool {
        pos := 0;
        while pos < self.tokens.len {
            if self.tokens.items[pos].id != t.id { pos += 1; }
            else {
                self.tokens.items[pos] = t;
                return true;
            }
        }
        return false;
    }

    // ---------------------------------------------------------------------
    // Audit events (create / read only — there is no update)
    // ---------------------------------------------------------------------

    // Append an audit event.
    create_audit_event :: (self: *Repo, e: AuditEvent) {
        self.audit_events.append(e, self.own_allocator);
    }

    // First audit event whose id equals `id`, or null when none matches.
    get_audit_event :: (self: *Repo, id: string) -> ?AuditEvent {
        pos := 0;
        while pos < self.audit_events.len {
            hit := self.audit_events.items[pos];
            if hit.id == id { return hit; }
            pos += 1;
        }
        return null;
    }

    // Slice over the list's current backing store (not a copy).
    list_audit_events :: (self: *Repo) -> []AuditEvent {
        return .{
            ptr = self.audit_events.items,
            len = self.audit_events.len
        };
    }

    // ---------------------------------------------------------------------
    // Publish transaction
    // ---------------------------------------------------------------------
    //
    // Stores `release` and every artifact in `arts`, then makes the
    // (chan.app_id, chan.name) channel point at the new release. On any
    // failure the in-memory model is put back exactly as it was, so no
    // partially inserted release/artifacts remain and no channel refers to a
    // release that is absent from the repo.
    //
    // Raises error.Validation when validate_release, validate_artifact or
    // validate_channel rejects its input.
    //
    // Raises error.Integrity when the aggregate is not one consistent
    // identity graph:
    //   - release.app_id names no stored app;
    //   - release.id is already taken (the channel edge would then resolve
    //     to the older release rather than the one being published);
    //   - chan.app_id != release.app_id;
    //   - chan.name != release.channel;
    //   - an artifact's app_id / release_id differs from the release's.
    //
    // The stored channel always gets current_release_id = release.id,
    // whatever value `chan` carried in.
    publish :: (self: *Repo, release: Release, arts: *List(Artifact), chan: Channel) -> !PublishErr {
        // Phase 1 — preconditions. Nothing in the repo has been touched yet,
        // so a failure here needs no rollback and can raise directly.
        release_rejected := false;
        validate_release(release) catch { release_rejected = true; };
        if release_rejected { raise error.Validation; }

        if self.get_app(release.app_id) == null { raise error.Integrity; }
        if self.get_release(release.id) != null { raise error.Integrity; }
        if chan.app_id != release.app_id { raise error.Integrity; }
        if chan.name != release.channel { raise error.Integrity; }

        // Phase 2 — snapshot. Appends only ever bump `len`, so remembering
        // the three lengths is enough to undo them; an overwritten channel
        // is undone from a saved copy.
        releases_before := self.releases.len;
        artifacts_before := self.artifacts.len;
        channels_before := self.channels.len;
        slot := self.channel_index(chan.app_id, chan.name);
        channel_before : Channel = .{};
        if slot >= 0 { channel_before = self.channels.items[slot]; }

        // Phase 3 — mutate, remembering whether (and why) we had to stop.
        aborted := false;
        identity_mismatch := false;

        self.releases.append(release, self.own_allocator);

        n := 0;
        while n < arts.len {
            art := arts.items[n];
            // Identity is checked before domain validation, so a mismatched
            // artifact is reported as Integrity even if it is also invalid.
            if art.app_id != release.app_id or art.release_id != release.id {
                identity_mismatch = true;
                aborted = true;
                break;
            }
            validate_artifact(art) catch { aborted = true; };
            if aborted { break; }
            self.artifacts.append(art, self.own_allocator);
            n += 1;
        }

        if !aborted {
            promoted := chan;
            promoted.current_release_id = release.id;
            validate_channel(promoted) catch { aborted = true; };
            if !aborted {
                // Existing channel: overwrite in place. New channel: append.
                if slot >= 0 { self.channels.items[slot] = promoted; }
                else { self.channels.append(promoted, self.own_allocator); }
            }
        }

        // Phase 4 — rollback on failure.
        if aborted {
            self.releases.len = releases_before;
            self.artifacts.len = artifacts_before;
            self.channels.len = channels_before;
            if slot >= 0 { self.channels.items[slot] = channel_before; }
            if identity_mismatch { raise error.Integrity; }
            raise error.Validation;
        }
        return;
    }
}