From 60df3e7cdcec73d72fc721244b279e1ac341bbf3 Mon Sep 17 00:00:00 2001 From: Stephen Gutekanst Date: Sat, 30 Nov 2024 12:22:10 -0700 Subject: [PATCH] object: graph: replace switching-consumer pattern with dedicated graph processing thread Signed-off-by: Stephen Gutekanst --- src/graph.zig | 42 ++++++++++++++++++++---------------------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/src/graph.zig b/src/graph.zig index 204941b2..28ae35a2 100644 --- a/src/graph.zig +++ b/src/graph.zig @@ -104,8 +104,8 @@ const Op = union(enum) { /// graph as /operations/ enqueued to a lock-free Multi Producer, Single Consumer (MPSC) FIFO queue. /// /// When an operation is desired (adding a parent to a child, querying the children or parent of a -/// node, etc.) it is enqueued. Then, if the queue contains entries, that thread becomes the -/// consumer of the MPSC queue temporarily and processes all pending operations in the queue. +/// node, etc.) it is enqueued. Then, a background thread processes all pending operations. Atomics +/// are used to wait for reads to complete, and parallel writes are lock-free. /// /// The graph uses lock-free pools to manage all nodes internally, eliminating runtime allocations /// during operation processing. @@ -130,7 +130,15 @@ pub const Graph = struct { preallocate_result_list_size: u32, + /// Thread that processes operations from the queue + thread: ?std.Thread = null, + + /// Flag to signal the processing thread to stop + should_stop: std.atomic.Value(bool) = .init(false), + /// Initialize the graph with the given pre-allocated space for nodes and operations. + /// + /// Spawns a backgroound thread for processing operations to the graph. pub fn init( graph: *Graph, allocator: std.mem.Allocator, @@ -173,9 +181,13 @@ pub const Graph = struct { try list.ensureTotalCapacity(allocator, preallocate.result_list_size); try graph.result_lists.available.append(allocator, list); } + + graph.thread = try std.Thread.spawn(.{ .allocator = allocator }, processThread, .{ graph, allocator }); } pub fn deinit(graph: *Graph, allocator: std.mem.Allocator) void { + graph.should_stop.store(true, .release); + graph.thread.?.join(); for (graph.result_lists.available.items) |list| { list.deinit(allocator); allocator.destroy(list); @@ -193,20 +205,12 @@ pub const Graph = struct { return graph.id_to_node.map.get(id); } - /// Tries to take all queued operations to the graph and, if successful, processes them. - /// - /// A different thread which calls processQueue() may beat us to acquiring all of the queued - /// operations, in which case this function may return before they are processed. - fn processQueue(graph: *Graph, allocator: std.mem.Allocator) void { - if (graph.queue.takeAll()) |nodes| { - defer graph.queue.releaseAll(nodes); - - // Process the entire chain of nodes - var current: ?*Queue(Op).Node = nodes; - while (current) |node| { - graph.processOp(allocator, node.value); - current = node.next; - } + /// The thread that runs continuously in the background to process queue submissions. + fn processThread(graph: *Graph, allocator: std.mem.Allocator) void { + while (!graph.should_stop.load(.acquire)) { + // Process the entire queue + while (graph.queue.pop()) |op| graph.processOp(allocator, op); + std.Thread.yield() catch {}; } } @@ -316,7 +320,6 @@ pub const Graph = struct { .parent_id = parent_id, .child_id = child_id, } }); - graph.processQueue(allocator); } pub fn removeChild(graph: *Graph, allocator: std.mem.Allocator, parent_id: u64, child_id: u64) Error!void { @@ -325,7 +328,6 @@ pub const Graph = struct { .parent_id = parent_id, .child_id = child_id, } }); - graph.processQueue(allocator); } pub fn setParent(graph: *Graph, allocator: std.mem.Allocator, child_id: u64, parent_id: u64) Error!void { @@ -335,14 +337,12 @@ pub const Graph = struct { .child_id = child_id, .parent_id = parent_id, } }); - graph.processQueue(allocator); } pub fn removeParent(graph: *Graph, allocator: std.mem.Allocator, child_id: u64) Error!void { try graph.queue.push(allocator, .{ .remove_parent = .{ .child_id = child_id, } }); - graph.processQueue(allocator); } const Results = struct { @@ -374,7 +374,6 @@ pub const Graph = struct { } }); while (!done.load(.acquire)) { - graph.processQueue(allocator); std.Thread.yield() catch {}; } @@ -429,7 +428,6 @@ pub const Graph = struct { } }); while (!done.load(.acquire)) { - graph.processQueue(allocator); std.Thread.yield() catch {}; }