P2.2: fix put_file content-addressing — hash the published bytes (single source read)

put_file hashed the source file, then copied the source again — two reads.
A source mutated in between would publish bytes whose digest != returned key,
breaking the content-addressed invariant. Now copy the source once into a
provisional staging file, derive the key from the SHA-256 of that staged file
(the exact bytes published), then dedup/atomic-rename. Guarantees
key == digest(published object) with a single source read.

Extends the acceptance test: re-hashes the stored object and asserts it equals
the returned key (and std.hash / shasum of the fixture), asserts cross-path
dedup (put_file and put_bytes of identical content share one object), and
asserts the staging temp is cleaned up on both the success and dedup paths.
This commit is contained in:
agra
2026-06-06 00:47:45 +03:00
parent 68c002ab06
commit 3bc019c736
2 changed files with 78 additions and 19 deletions

View File

@@ -6,13 +6,20 @@
// `<root>/objects/<digest>`. This key is what populates an // `<root>/objects/<digest>`. This key is what populates an
// Artifact.sha256 / Artifact.storage_key at the domain boundary. // Artifact.sha256 / Artifact.storage_key at the domain boundary.
// //
// Publish is a two-phase write: bytes are first written to // Publish is a two-phase write: bytes are first written under
// `<root>/staging/<key>`, then atomically renamed into // `<root>/staging/`, then atomically renamed into `<root>/objects/<key>`.
// `<root>/objects/<key>`. The rename is the only operation that makes an // The rename is the only operation that makes an object visible at its
// object visible at its final path, so an interrupted or failed write // final path, so an interrupted or failed write never leaves a torn
// never leaves a torn object — a half-written staging file is not // object — a half-written staging file is not reachable as
// reachable as `objects/<key>`. Staging and objects share `<root>` (one // `objects/<key>`. Staging and objects share `<root>` (one filesystem),
// filesystem), so the rename is atomic. // so the rename is atomic.
//
// `put_bytes` stages the in-memory bytes at `staging/<key>` (the key is
// known up front). `put_file` reads its source exactly once: it copies
// the source into a provisional `staging/incoming-<n>`, then derives the
// key from the SHA-256 of THAT staged file — the exact bytes that get
// published. So `key == digest(published object)` holds even if the
// source is mutated after the copy; the source is never read twice.
// //
// Dedup: identical bytes hash to the same key, so a put whose object // Dedup: identical bytes hash to the same key, so a put whose object
// already exists returns immediately without re-staging or rewriting. // already exists returns immediately without re-staging or rewriting.
@@ -56,9 +63,12 @@ digest_of_file :: (path: string) -> (string, !StoreErr) {
Store :: struct { Store :: struct {
root: string; root: string;
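// Monotonic per-store counter naming `put_file`'s provisional staging
// files, so successive file puts through this Store never reuse a temp
// name. NOTE(review): the increment is a plain `+= 1` and the counter
// restarts at 0 in `init`, so two Stores (or processes) sharing one root
// could still pick the same `incoming-<n>` — confirm single-writer use.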
seq: s64;
init :: (root: string) -> Store { init :: (root: string) -> Store {
return Store.{ root = root }; return Store.{ root = root, seq = 0 };
} }
objects_dir :: (self: *Store) -> string { return path_join(self.root, "objects"); } objects_dir :: (self: *Store) -> string { return path_join(self.root, "objects"); }
@@ -80,10 +90,14 @@ Store :: struct {
return sp; return sp;
} }
// Phase 1 (file source): copy `src`'s bytes into `staging/<key>`. // Phase 1 (file source): copy `src` once into a provisional staging
stage_copy :: (self: *Store, key: string, src: string) -> (string, !StoreErr) { // file `staging/incoming-<n>`. The key isn't known until these staged
// bytes are hashed, so the name is a per-put sequence — never
// `objects/<key>`, so an interrupted copy is never a published object.
stage_temp_copy :: (self: *Store, src: string) -> (string, !StoreErr) {
if !fs.create_dir_all(self.staging_dir()) { raise error.Stage; } if !fs.create_dir_all(self.staging_dir()) { raise error.Stage; }
sp := self.staging_path(key); self.seq += 1;
sp := self.staging_path(concat("incoming-", int_to_string(self.seq)));
if !fs.copy_file(src, sp) { raise error.Stage; } if !fs.copy_file(src, sp) { raise error.Stage; }
return sp; return sp;
} }
@@ -106,11 +120,18 @@ Store :: struct {
return key; return key;
} }
// Store a file's bytes and return their storage key. Dedup as above. // Store a file's bytes and return their storage key. The source is
// read exactly once — copied into staging, then hashed there — so the
// returned key is the SHA-256 of the bytes actually published, not of a
// separate read that could disagree. Dedup: if the object already
// exists, the staged copy is dropped and the existing key returned.
put_file :: (self: *Store, path: string) -> (string, !StoreErr) { put_file :: (self: *Store, path: string) -> (string, !StoreErr) {
key := try digest_of_file(path); sp := try self.stage_temp_copy(path);
if self.has(key) { return key; } key := try digest_of_file(sp);
sp := try self.stage_copy(key, path); if self.has(key) {
fs.delete_file(sp);
return key;
}
try self.publish(sp, key); try self.publish(sp, key);
return key; return key;
} }

View File

@@ -48,6 +48,17 @@ entry_count :: (dir: string) -> string {
return res.stdout; return res.stdout;
} }
// Number of `put_file` staging temps (`incoming-*`) left under `dir`.
// 0 means every file-source put cleaned up its staging copy.
// The count is returned as a decimal string, not an integer, so callers
// compare it against "0".
incoming_count :: (dir: string) -> string {
// Pipeline: `ls -1` lists one entry per line (its errors, e.g. a missing
// directory, are discarded); `grep -c` counts names that start with
// `incoming-`; `tr -dc '0-9'` drops everything except digits, including
// the trailing newline.
// NOTE(review): `dir` is spliced into the shell command unquoted — fine
// for the fixed test root, but it would break on paths containing spaces
// or shell metacharacters.
cmd := concat("ls -1 ", concat(dir, " 2>/dev/null | grep -c '^incoming-' | tr -dc '0-9'"));
r := process.run(cmd);
process.assert(r != null, "ls/grep must run");
res := r!;
// Empty output is treated as zero leftovers.
if res.stdout.len == 0 { return "0"; }
return res.stdout;
}
main :: () -> s32 { main :: () -> s32 {
root := ".sx-tmp/store-cas"; root := ".sx-tmp/store-cas";
process.run(concat("rm -rf ", root)); // fresh root, even after a crashed prior run process.run(concat("rm -rf ", root)); // fresh root, even after a crashed prior run
@@ -99,17 +110,44 @@ main :: () -> s32 {
process.assert(!st.has(missing), "failed publish must leave no object"); process.assert(!st.has(missing), "failed publish must leave no object");
print(" atomicity: staged write invisible; failed publish leaves no object\n"); print(" atomicity: staged write invisible; failed publish leaves no object\n");
// ── 4. put_file: file source, same key + bytes ────────────────────── // ── 4. put_file: single source read, key == digest of published object
src := ".sx-tmp/store-cas-src.bin"; src := ".sx-tmp/store-cas-src.bin";
file_bytes := "the quick brown fox\n"; file_bytes := "file-source-bytes-123"; // shell-safe: no spaces/newlines
process.assert(fs.write_file(src, file_bytes), "fixture source file must be written"); process.assert(fs.write_file(src, file_bytes), "fixture source file must be written");
fkey, fe := st.put_file(src); fkey, fe := st.put_file(src);
process.assert(!fe, "put_file must succeed"); process.assert(!fe, "put_file must succeed");
process.assert(fkey == stdhash_key(file_bytes), "put_file key must equal std.hash of the file bytes");
process.assert(st.has(fkey), "put_file object must be published"); process.assert(st.has(fkey), "put_file object must be published");
// The returned key must be the SHA-256 of the bytes ACTUALLY published —
// re-hash the stored object and confirm it equals the key (and equals
// std.hash + shasum -a 256 of the original fixture).
fstored := fs.read_file(st.object_path(fkey)); fstored := fs.read_file(st.object_path(fkey));
process.assert(fstored != null, "published object must be readable");
process.assert(fstored! == file_bytes, "put_file stored bytes must equal the file"); process.assert(fstored! == file_bytes, "put_file stored bytes must equal the file");
print(" put_file: key {} published\n", fkey); process.assert(stdhash_key(fstored!) == fkey, "key must equal SHA-256 of the published object");
process.assert(fkey == stdhash_key(file_bytes), "put_file key must equal std.hash of the file bytes");
process.assert(fkey == shasum_key(file_bytes), "put_file key must equal shasum -a 256");
process.assert(incoming_count(st.staging_dir()) == "0", "put_file must clean up its staging temp");
objs_after_file := entry_count(st.objects_dir());
print(" put_file: key {} == digest(published object)\n", fkey);
// Cross-path dedup: put_bytes of identical content yields the SAME key
// and adds no second object; the stored bytes are not rewritten.
bkey, be := st.put_bytes(file_bytes);
process.assert(!be, "cross-path put_bytes must succeed");
process.assert(bkey == fkey, "put_file and put_bytes of identical content share a key");
process.assert(entry_count(st.objects_dir()) == objs_after_file, "cross-path dedup adds no object");
afterb := fs.read_file(st.object_path(fkey));
process.assert(afterb! == file_bytes, "cross-path dedup must not rewrite the object");
// A repeat put_file hits dedup and also drops its staging temp.
fkey2, fe2 := st.put_file(src);
process.assert(!fe2, "repeat put_file must succeed");
process.assert(fkey2 == fkey, "repeat put_file dedup yields the same key");
process.assert(entry_count(st.objects_dir()) == objs_after_file, "repeat put_file adds no object");
process.assert(incoming_count(st.staging_dir()) == "0", "dedup put_file must clean up its staging temp");
print(" put_file: cross-path dedup, one object, staging cleaned\n");
// ── cleanup ───────────────────────────────────────────────────────── // ── cleanup ─────────────────────────────────────────────────────────
process.run(concat("rm -rf ", root)); process.run(concat("rm -rf ", root));