Merge branch 'flow/distribution/P2.2' into distribution-plan

This commit is contained in:
agra
2026-06-06 00:51:04 +03:00
3 changed files with 299 additions and 0 deletions

3
.gitignore vendored
View File

@@ -2,3 +2,6 @@
# build artifacts from `make build`
build/
# scratch store roots / fixtures created by tests (never /tmp)
.sx-tmp/

138
src/store/store.sx Normal file
View File

@@ -0,0 +1,138 @@
// =====================================================================
// store.sx — content-addressed blob store (subplan 02, Slice 3).
//
// Objects are addressed by the lowercase-hex SHA-256 of their bytes:
// the digest IS the storage key, and the bytes live at
// `<root>/objects/<digest>`. This key is what populates an
// Artifact.sha256 / Artifact.storage_key at the domain boundary.
//
// Publish is a two-phase write: bytes are first written under
// `<root>/staging/`, then atomically renamed into `<root>/objects/<key>`.
// The rename is the only operation that makes an object visible at its
// final path, so an interrupted or failed write never leaves a torn
// object — a half-written staging file is not reachable as
// `objects/<key>`. Staging and objects share `<root>` (one filesystem),
// so the rename is atomic.
//
// `put_bytes` stages the in-memory bytes at `staging/<key>` (the key is
// known up front). `put_file` reads its source exactly once: it copies
// the source into a provisional `staging/incoming-<n>`, then derives the
// key from the SHA-256 of THAT staged file — the exact bytes that get
// published. So `key == digest(published object)` holds even if the
// source is mutated after the copy; the source is never read twice.
//
// Dedup: identical bytes hash to the same key, so a put whose object
// already exists returns immediately without re-staging or rewriting.
// =====================================================================
#import "modules/std.sx";
fs :: #import "modules/fs.sx";
hash :: #import "modules/std/hash.sx";
// Failure classes for a put. `Stage` covers a failed staging write,
// `Publish` a failed atomic rename, `Source` an unreadable input file.
StoreErr :: error {
Stage,
Publish,
Source,
}
// Copy a by-value `[64]u8` digest into a heap `string` key. The hash
// modules return the digest on the stack, so the view over it is only
// valid until the array dies; this materialises an owned, null-terminated
// copy safe to store and use as a path component.
digest_to_key :: (d: [64]u8) -> string {
view := string.{ ptr = @d[0], len = 64 };
return substr(view, 0, 64);
}
// SHA-256 of an in-memory buffer, as the lowercase-hex storage key.
digest_of_bytes :: (bytes: string) -> string {
d := hash.sha256_hex(bytes);
return digest_to_key(d);
}
// SHA-256 of a file's contents (streamed in fixed chunks), as the
// storage key. Raises `Source` if the file can't be opened/read.
digest_of_file :: (path: string) -> (string, !StoreErr) {
maybe := hash.sha256_file(path);
if maybe == null { raise error.Source; }
d := maybe!;
return digest_to_key(d);
}
Store :: struct {
root: string;
// Monotonic per-store counter naming `put_file`'s provisional staging
// files, so concurrent file puts don't clobber each other's temp copy.
seq: s64;
init :: (root: string) -> Store {
return Store.{ root = root, seq = 0 };
}
objects_dir :: (self: *Store) -> string { return path_join(self.root, "objects"); }
staging_dir :: (self: *Store) -> string { return path_join(self.root, "staging"); }
object_path :: (self: *Store, key: string) -> string { return path_join(self.root, "objects", key); }
staging_path :: (self: *Store, key: string) -> string { return path_join(self.root, "staging", key); }
// True once `key`'s bytes are published at their final path.
has :: (self: *Store, key: string) -> bool {
return fs.exists(self.object_path(key));
}
// Phase 1: write `bytes` to `staging/<key>`, returning the staging
// path. The bytes are not yet visible at `objects/<key>`.
stage_write :: (self: *Store, key: string, bytes: string) -> (string, !StoreErr) {
if !fs.create_dir_all(self.staging_dir()) { raise error.Stage; }
sp := self.staging_path(key);
if !fs.write_file(sp, bytes) { raise error.Stage; }
return sp;
}
// Phase 1 (file source): copy `src` once into a provisional staging
// file `staging/incoming-<n>`. The key isn't known until these staged
// bytes are hashed, so the name is a per-put sequence — never
// `objects/<key>`, so an interrupted copy is never a published object.
stage_temp_copy :: (self: *Store, src: string) -> (string, !StoreErr) {
if !fs.create_dir_all(self.staging_dir()) { raise error.Stage; }
self.seq += 1;
sp := self.staging_path(concat("incoming-", int_to_string(self.seq)));
if !fs.copy_file(src, sp) { raise error.Stage; }
return sp;
}
// Phase 2: atomically move a staged file into `objects/<key>`. After
// this returns the object is published; before it, it never is.
publish :: (self: *Store, staged: string, key: string) -> !StoreErr {
if !fs.create_dir_all(self.objects_dir()) { raise error.Publish; }
if !fs.move(staged, self.object_path(key)) { raise error.Publish; }
return;
}
// Store in-memory bytes and return their storage key. Dedup: an
// already-published object is returned without re-staging.
put_bytes :: (self: *Store, bytes: string) -> (string, !StoreErr) {
key := digest_of_bytes(bytes);
if self.has(key) { return key; }
sp := try self.stage_write(key, bytes);
try self.publish(sp, key);
return key;
}
// Store a file's bytes and return their storage key. The source is
// read exactly once — copied into staging, then hashed there — so the
// returned key is the SHA-256 of the bytes actually published, not of a
// separate read that could disagree. Dedup: if the object already
// exists, the staged copy is dropped and the existing key returned.
put_file :: (self: *Store, path: string) -> (string, !StoreErr) {
sp := try self.stage_temp_copy(path);
key := try digest_of_file(sp);
if self.has(key) {
fs.delete_file(sp);
return key;
}
try self.publish(sp, key);
return key;
}
}

View File

@@ -0,0 +1,158 @@
// Acceptance for P2.2 — the content-addressed artifact store.
//
// Drives a fresh store rooted under `.sx-tmp/` (never /tmp) and asserts
// the four Slice-3 invariants:
// 1. put → object lands at `objects/<sha256>` and its bytes round-trip;
// the storage key equals std.hash, an independent `shasum -a 256`,
// and the pinned SHA-256("abc") vector.
// 2. dedup — identical bytes are not stored twice and an existing
// object is never rewritten.
// 3. atomicity — a staged-but-unpublished write is invisible at the
// final path, and a publish that fails before/at the rename leaves
// no object.
// 4. put_file — a file source produces the same key and bytes.
// Exits 0 only if every assertion holds (process.assert aborts otherwise).
#import "modules/std.sx";
fs :: #import "modules/fs.sx";
hash :: #import "modules/std/hash.sx";
process :: #import "modules/process.sx";
#import "../src/store/store.sx";
// SHA-256("abc"), the FIPS 180-4 one-block known-answer vector.
ABC_SHA256 :: "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad";
// std.hash digest of `s` as a heap string key (independent of the store).
stdhash_key :: (s: string) -> string {
d := hash.sha256_hex(s);
view := string.{ ptr = @d[0], len = 64 };
return substr(view, 0, 64);
}
// First 64 hex chars of `shasum -a 256` over `bytes`, via the shell.
// `bytes` must be shell-safe (the fixtures here are plain ASCII).
shasum_key :: (bytes: string) -> string {
cmd := concat("printf '%s' ", concat(bytes, " | shasum -a 256"));
r := process.run(cmd);
process.assert(r != null, "shasum -a 256 must run");
res := r!;
process.assert(res.exit_code == 0, "shasum -a 256 must exit 0");
return substr(res.stdout, 0, 64);
}
// Number of directory entries under `dir`, parsed from `ls -1 | wc -l`.
entry_count :: (dir: string) -> string {
cmd := concat("ls -1 ", concat(dir, " | wc -l | tr -dc '0-9'"));
r := process.run(cmd);
process.assert(r != null, "ls/wc must run");
res := r!;
return res.stdout;
}
// Number of `put_file` staging temps (`incoming-*`) left under `dir`.
// 0 means every file-source put cleaned up its staging copy.
incoming_count :: (dir: string) -> string {
cmd := concat("ls -1 ", concat(dir, " 2>/dev/null | grep -c '^incoming-' | tr -dc '0-9'"));
r := process.run(cmd);
process.assert(r != null, "ls/grep must run");
res := r!;
if res.stdout.len == 0 { return "0"; }
return res.stdout;
}
main :: () -> s32 {
root := ".sx-tmp/store-cas";
process.run(concat("rm -rf ", root)); // fresh root, even after a crashed prior run
st := Store.init(root);
// ── 1. put + content addressing ─────────────────────────────────────
fixture := "abc";
key, e := st.put_bytes(fixture);
process.assert(!e, "put_bytes(abc) must succeed");
process.assert(key == ABC_SHA256, "key must equal pinned SHA-256(abc) vector");
process.assert(key == stdhash_key(fixture), "store key must equal std.hash digest");
process.assert(key == shasum_key(fixture), "store key must equal shasum -a 256");
print(" store == std.hash == shasum == vector: {}\n", key);
process.assert(st.has(key), "object must exist at objects/<sha256>");
stored := fs.read_file(st.object_path(key));
process.assert(stored != null, "stored object must be readable");
process.assert(stored! == fixture, "stored bytes must equal the input");
// ── 2. dedup: same bytes, one object, never rewritten ───────────────
// Overwrite the object on disk; a deduped re-put must NOT touch it.
process.assert(fs.write_file(st.object_path(key), "TAMPERED"), "tamper write must succeed");
key2, e2 := st.put_bytes(fixture);
process.assert(!e2, "second put_bytes must succeed");
process.assert(key2 == key, "dedup: identical bytes yield the same key");
after := fs.read_file(st.object_path(key));
process.assert(after! == "TAMPERED", "dedup: existing object must not be rewritten");
process.assert(entry_count(st.objects_dir()) == "1", "dedup: exactly one object stored");
// Restore the real bytes so the store is left consistent.
process.assert(fs.write_file(st.object_path(key), fixture), "restore write must succeed");
print(" dedup: one object, copy skipped on re-put\n");
// ── 3. atomicity: staged write is invisible until publish ───────────
pending := "interrupted-upload-bytes";
pkey := stdhash_key(pending);
process.assert(!st.has(pkey), "fresh store: pending object must be absent");
sp, se := st.stage_write(pkey, pending);
process.assert(!se, "stage_write must succeed");
process.assert(fs.exists(sp), "staged file must exist after stage_write");
process.assert(!st.has(pkey), "atomicity: object must NOT exist before the rename");
// A publish whose staging source is missing fails and creates nothing.
missing := "1111111111111111111111111111111111111111111111111111111111111111";
process.assert(!st.has(missing), "precondition: no object for the missing key");
failed := false;
st.publish(st.staging_path(missing), missing) catch { failed = true; };
process.assert(failed, "publish of a missing staging file must fail");
process.assert(!st.has(missing), "failed publish must leave no object");
print(" atomicity: staged write invisible; failed publish leaves no object\n");
// ── 4. put_file: single source read, key == digest of published object
src := ".sx-tmp/store-cas-src.bin";
file_bytes := "file-source-bytes-123"; // shell-safe: no spaces/newlines
process.assert(fs.write_file(src, file_bytes), "fixture source file must be written");
fkey, fe := st.put_file(src);
process.assert(!fe, "put_file must succeed");
process.assert(st.has(fkey), "put_file object must be published");
// The returned key must be the SHA-256 of the bytes ACTUALLY published —
// re-hash the stored object and confirm it equals the key (and equals
// std.hash + shasum -a 256 of the original fixture).
fstored := fs.read_file(st.object_path(fkey));
process.assert(fstored != null, "published object must be readable");
process.assert(fstored! == file_bytes, "put_file stored bytes must equal the file");
process.assert(stdhash_key(fstored!) == fkey, "key must equal SHA-256 of the published object");
process.assert(fkey == stdhash_key(file_bytes), "put_file key must equal std.hash of the file bytes");
process.assert(fkey == shasum_key(file_bytes), "put_file key must equal shasum -a 256");
process.assert(incoming_count(st.staging_dir()) == "0", "put_file must clean up its staging temp");
objs_after_file := entry_count(st.objects_dir());
print(" put_file: key {} == digest(published object)\n", fkey);
// Cross-path dedup: put_bytes of identical content yields the SAME key
// and adds no second object; the stored bytes are not rewritten.
bkey, be := st.put_bytes(file_bytes);
process.assert(!be, "cross-path put_bytes must succeed");
process.assert(bkey == fkey, "put_file and put_bytes of identical content share a key");
process.assert(entry_count(st.objects_dir()) == objs_after_file, "cross-path dedup adds no object");
afterb := fs.read_file(st.object_path(fkey));
process.assert(afterb! == file_bytes, "cross-path dedup must not rewrite the object");
// A repeat put_file hits dedup and also drops its staging temp.
fkey2, fe2 := st.put_file(src);
process.assert(!fe2, "repeat put_file must succeed");
process.assert(fkey2 == fkey, "repeat put_file dedup yields the same key");
process.assert(entry_count(st.objects_dir()) == objs_after_file, "repeat put_file adds no object");
process.assert(incoming_count(st.staging_dir()) == "0", "dedup put_file must clean up its staging temp");
print(" put_file: cross-path dedup, one object, staging cleaned\n");
// ── cleanup ─────────────────────────────────────────────────────────
process.run(concat("rm -rf ", root));
fs.delete_file(src);
print("store_content_addressed: ALL CASES PASS\n");
return 0;
}