From d72facda9a1680a1ba5df6b3e9998e8ecad51b8d Mon Sep 17 00:00:00 2001 From: Stephen Gutekanst Date: Sat, 30 Nov 2024 12:18:21 -0700 Subject: [PATCH] object: remove MPSC queue takeAll() in favor of just pop() Signed-off-by: Stephen Gutekanst --- src/mpsc.zig | 173 +-------------------------------------------------- 1 file changed, 2 insertions(+), 171 deletions(-) diff --git a/src/mpsc.zig b/src/mpsc.zig index c6eb7c57..1c0b4567 100644 --- a/src/mpsc.zig +++ b/src/mpsc.zig @@ -1,8 +1,4 @@ -//! MPSC (Multi Producer, Single Consumer) lock-free FIFO queue with atomic batch-take support -//! -//! The queue offers two ways to consume items: -//! 1. As a traditional MPSC queue using pop() - only one consumer thread may call pop() -//! 2. As a multi-consumer queue using takeAll() - multiple threads may compete to take all items +//! MPSC (Multi Producer, Single Consumer) lock-free FIFO queue //! //! Internally, the queue maintains a lock-free atomic pool of batch-allocated nodes for reuse. //! Nodes are acquired and owned exclusively by individual threads, then given to the queue and @@ -15,10 +11,8 @@ //! 1. FIFO ordering is maintained //! 2. Multiple threads can always push() items in parallel //! 3. No locks/mutexes are needed -//! 4. Multiple consumer threads can safely compete to takeAll() (only one succeeds), or a single -//! consumer thread may pop(). +//! 4. A single consumer thread may pop(). //! -//! Note: takeAll() is parallel-safe, but pop() is never parallel-safe with itself or takeAll(). const std = @import("std"); /// Lock-free atomic pool of nodes for memory allocation @@ -293,92 +287,6 @@ pub fn Queue(comptime Value: type) type { } } - /// Attempts to atomically take all nodes from the queue, returning the chain of all nodes - /// currently in the queue, or null if (a) the queue is empty or (b) another takeAll() - /// consumer beat us to taking it all. - /// - /// This operation is safe to call from multiple threads in parallel - only one will succeed - /// in taking the nodes. Although takeAll() is safe to call in parallel, pop() may not be - /// called in parallel with itself or takeAll(). - /// - /// Caller takes ownership of all nodes up to the head at time of operation, and must free - /// the entire chain by calling releaseAll(returned_node.?) later. - /// - /// The returned node is the first node in FIFO order, i.e. node is 1st, node.next is 2nd, - /// node.next.next is 3rd, and so on in FIFO order. - pub fn takeAll(q: *@This()) ?*Node { - outer: while (true) { - var tail = q.tail; - const next = @atomicLoad(?*Node, &tail.next, .acquire); - - // First reset head to empty node atomically to ensure new pushes will link to [empty] - // rather than our taken chain. This also acts as our point of taking ownership and - // competing against other parallel takeAll() invocations. - // - // Before: head -> [A] - // After: head -> [empty] - while (true) { - const current_head = @atomicLoad(*Node, &q.head, .acquire); - if (current_head == &q.empty) { - // Another takeAll won - return null; - } - if (@cmpxchgStrong(*Node, &q.head, current_head, &q.empty, .acq_rel, .acquire)) |_| { - continue; - } - break; - } - - // Handle empty node advancement if needed - if (tail == &q.empty) { - if (next) |tail_next| { - // Before: tail -> [empty] -> [A] <- head - // After: tail -> [A] <- head - if (@cmpxchgStrong(*Node, &q.tail, tail, tail_next, .acq_rel, .acquire)) |_| { - continue :outer; - } - tail = tail_next; - } else return null; // State: tail -> [empty] <- head - } - - // Try to take ownership of the chain - // - // Before: tail -> [B] -> [A] <- head=[empty] - // After: tail=[empty] - // Return: [B] -> [A] - if (@cmpxchgStrong(*Node, &q.tail, tail, &q.empty, .acq_rel, .acquire)) |_| { - // Lost race (with another takeAll() or pop()), retry from start - continue :outer; - } - - // Ensure all previous atomic operations (including linking) are complete - // Specifically this part of pushRaw(): - // - // // Link previous node to new node - // @atomicStore(?*Node, &prev.next, node, .release); - // - _ = @atomicLoad(*Node, &q.head, .acquire); - - return tail; - } - } - - /// Release a chain of nodes back to the pool starting from the given node. - /// Used to return nodes acquired via takeAll() back to the pool. - /// - /// State: start -> [B] -> [A] -> null - /// After: (all nodes returned to pool) - pub fn releaseAll(q: *@This(), start: *Node) void { - var current = start; - while (true) { - const next = current.next; - current.next = null; - q.pool.release(current); - if (next == null) break; - current = next.?; - } - } - pub fn deinit(q: *@This(), allocator: std.mem.Allocator) void { q.pool.deinit(allocator); } @@ -403,80 +311,3 @@ test "basic" { try std.testing.expectEqual(queue.pop(), 3); try std.testing.expectEqual(queue.pop(), null); } - -test "takeAll" { - const allocator = std.testing.allocator; - - var queue: Queue(u32) = undefined; - try queue.init(allocator, 32); - defer queue.deinit(allocator); - - // Take empty queue - try std.testing.expectEqual(queue.takeAll(), null); - try std.testing.expect(queue.head == &queue.empty); - try std.testing.expect(queue.tail == &queue.empty); - - // Take single-element queue - try queue.push(allocator, 1); - if (queue.takeAll()) |nodes| { - defer queue.releaseAll(nodes); - try std.testing.expectEqual(nodes.value, 1); - try std.testing.expectEqual(nodes.next, null); - try std.testing.expect(queue.head == &queue.empty); - try std.testing.expect(queue.tail == &queue.empty); - } else { - return error.TestUnexpectedNull; - } - - // Take empty queue again - try std.testing.expectEqual(queue.takeAll(), null); - try std.testing.expect(queue.head == &queue.empty); - try std.testing.expect(queue.tail == &queue.empty); - - // Multiple elements with push after takeAll - try queue.push(allocator, 1); - try queue.push(allocator, 2); - try queue.push(allocator, 3); - if (queue.takeAll()) |nodes| { - try std.testing.expectEqual(nodes.value, 1); - try std.testing.expectEqual(nodes.next.?.value, 2); - try std.testing.expectEqual(nodes.next.?.next.?.value, 3); - try std.testing.expectEqual(nodes.next.?.next.?.next, null); - try std.testing.expect(queue.head == &queue.empty); - try std.testing.expect(queue.tail == &queue.empty); - - // Push while holding taken nodes - try queue.push(allocator, 42); - try std.testing.expect(queue.head != &queue.empty); - try std.testing.expect(queue.tail == &queue.empty); - - // Then release held nodes - queue.releaseAll(nodes); - } else return error.TestUnexpectedNull; - - // Verify queue state after all operations - try std.testing.expectEqual(queue.pop(), 42); - try std.testing.expectEqual(queue.pop(), null); - try std.testing.expect(queue.head == &queue.empty); - try std.testing.expect(queue.tail == &queue.empty); -} - -test "single takeAll" { - const allocator = std.testing.allocator; - - var queue: Queue(u32) = undefined; - try queue.init(allocator, 32); - defer queue.deinit(allocator); - - try queue.push(allocator, 1); - - if (queue.takeAll()) |nodes| { - try std.testing.expectEqual(nodes.value, 1); - try std.testing.expectEqual(nodes.next, null); - try std.testing.expect(queue.head == &queue.empty); - try std.testing.expect(queue.tail == &queue.empty); - - // Then release held nodes - queue.releaseAll(nodes); - } else return error.TestUnexpectedNull; -}