distd: handlers run on a 4-worker pool; writes serialize (PLAN-HTTPZ A2)
thread_pool_count = 4 turns on std.http's pooled dispatch: reads (store load + render per request, fully independent) now run in parallel across workers. POST/PUT stay under one mutex — every write is a whole-model load-modify-save on dist.db, so concurrent writers would lose updates; the old sequential loop serialized them by accident, this serializes them on purpose.
This commit is contained in:
@@ -1015,6 +1015,11 @@ route :: (store_dir: string, req: *http.Request, resp: *http.Response) {
|
|||||||
// Per-request allocations land in std.http's per-dispatch arena.
|
// Per-request allocations land in std.http's per-dispatch arena.
|
||||||
DistdCtx :: struct {
|
DistdCtx :: struct {
|
||||||
store_dir: string;
|
store_dir: string;
|
||||||
|
// Writes serialize: every POST/PUT is a whole-model load-modify-save
|
||||||
|
// on dist.db, so two concurrent writers would lose updates. Reads
|
||||||
|
// run concurrently across the pool (per-request loads are
|
||||||
|
// independent; SQLite's busy_timeout covers reader/writer overlap).
|
||||||
|
write_mu: thread.Mutex = .{};
|
||||||
}
|
}
|
||||||
|
|
||||||
distd_handle :: (req: *http.Request, resp: *http.Response, ctx: usize) {
|
distd_handle :: (req: *http.Request, resp: *http.Response, ctx: usize) {
|
||||||
@@ -1030,7 +1035,10 @@ distd_handle :: (req: *http.Request, resp: *http.Response, ctx: usize) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !refused {
|
if !refused {
|
||||||
|
is_write := req.method == "POST" or req.method == "PUT";
|
||||||
|
if is_write { dctx.write_mu.lock(); }
|
||||||
route(store_dir, req, resp);
|
route(store_dir, req, resp);
|
||||||
|
if is_write { dctx.write_mu.unlock(); }
|
||||||
}
|
}
|
||||||
line := concat("distd: ", concat(req.method, concat(" ", concat(req.path, concat(" -> ", concat(int_to_string(resp.status), "\n"))))));
|
line := concat("distd: ", concat(req.method, concat(" ", concat(req.path, concat(" -> ", concat(int_to_string(resp.status), "\n"))))));
|
||||||
slog(line);
|
slog(line);
|
||||||
@@ -1042,6 +1050,7 @@ distd_handle :: (req: *http.Request, resp: *http.Response, ctx: usize) {
|
|||||||
// can't be opened.
|
// can't be opened.
|
||||||
run_server :: (store_dir: string, port: i64) -> !ServeErr {
|
run_server :: (store_dir: string, port: i64) -> !ServeErr {
|
||||||
dctx : DistdCtx = .{ store_dir = store_dir };
|
dctx : DistdCtx = .{ store_dir = store_dir };
|
||||||
|
if !dctx.write_mu.setup() { raise error.Bind; }
|
||||||
cfg : http.Config = .{
|
cfg : http.Config = .{
|
||||||
port = port,
|
port = port,
|
||||||
max_conn = 256,
|
max_conn = 256,
|
||||||
@@ -1049,6 +1058,7 @@ run_server :: (store_dir: string, port: i64) -> !ServeErr {
|
|||||||
timeout_request_ms = 120000, // a large upload must complete within this
|
timeout_request_ms = 120000, // a large upload must complete within this
|
||||||
timeout_keepalive_ms = 5000,
|
timeout_keepalive_ms = 5000,
|
||||||
request_count = 200,
|
request_count = 200,
|
||||||
|
thread_pool_count = 4, // reads in parallel; writes serialize above
|
||||||
};
|
};
|
||||||
srv, se := http.Server.init(cfg, distd_handle, xx @dctx);
|
srv, se := http.Server.init(cfg, distd_handle, xx @dctx);
|
||||||
if se { raise error.Bind; }
|
if se { raise error.Bind; }
|
||||||
|
|||||||
Reference in New Issue
Block a user