about summary refs log tree commit diff stats
path: root/archive/2025/summer/bsc_karidas/src/BufferQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'archive/2025/summer/bsc_karidas/src/BufferQueue.cpp')
-rw-r--r--archive/2025/summer/bsc_karidas/src/BufferQueue.cpp131
1 files changed, 131 insertions, 0 deletions
diff --git a/archive/2025/summer/bsc_karidas/src/BufferQueue.cpp b/archive/2025/summer/bsc_karidas/src/BufferQueue.cpp
new file mode 100644
index 000000000..3d9af1c0c
--- /dev/null
+++ b/archive/2025/summer/bsc_karidas/src/BufferQueue.cpp
@@ -0,0 +1,131 @@
+#include "BufferQueue.hpp"
+#include <algorithm>
+#include <thread>
+#include <iostream>
+#include <chrono>
+#include <cmath>
+
+BufferQueue::BufferQueue(size_t capacity, size_t maxExplicitProducers)
+{
+    m_queue = moodycamel::ConcurrentQueue<QueueItem>(capacity, maxExplicitProducers, 0);
+}
+
+bool BufferQueue::enqueue(QueueItem item, ProducerToken &token)
+{
+    return m_queue.try_enqueue(token, std::move(item));
+}
+
+bool BufferQueue::enqueueBlocking(QueueItem item, ProducerToken &token, std::chrono::milliseconds timeout)
+{
+    auto start = std::chrono::steady_clock::now();
+    int backoffMs = 1;
+    const int maxBackoffMs = 100;
+
+    while (true)
+    {
+        QueueItem itemCopy = item;
+        if (enqueue(std::move(itemCopy), token))
+        {
+            return true;
+        }
+
+        auto elapsed = std::chrono::steady_clock::now() - start;
+        if (elapsed >= timeout)
+        {
+            return false;
+        }
+
+        int sleepTime = backoffMs;
+
+        // Make sure we don't sleep longer than our remaining timeout
+        if (timeout != std::chrono::milliseconds::max())
+        {
+            auto remainingTime = timeout - elapsed;
+            if (remainingTime <= std::chrono::milliseconds(sleepTime))
+            {
+                sleepTime = std::max(1, static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(remainingTime).count()));
+            }
+        }
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime));
+        backoffMs = std::min(backoffMs * 2, maxBackoffMs);
+    }
+}
+
+bool BufferQueue::enqueueBatch(std::vector<QueueItem> items, ProducerToken &token)
+{
+    return m_queue.try_enqueue_bulk(token, std::make_move_iterator(items.begin()), items.size());
+}
+
+bool BufferQueue::enqueueBatchBlocking(std::vector<QueueItem> items, ProducerToken &token,
+                                       std::chrono::milliseconds timeout)
+{
+    auto start = std::chrono::steady_clock::now();
+    int backoffMs = 1;
+    const int maxBackoffMs = 100;
+
+    while (true)
+    {
+        std::vector<QueueItem> itemsCopy = items;
+        if (enqueueBatch(std::move(itemsCopy), token))
+        {
+            return true;
+        }
+
+        auto elapsed = std::chrono::steady_clock::now() - start;
+        if (elapsed >= timeout)
+        {
+            return false;
+        }
+
+        int sleepTime = backoffMs;
+
+        // Make sure we don't sleep longer than our remaining timeout
+        if (timeout != std::chrono::milliseconds::max())
+        {
+            auto remainingTime = timeout - elapsed;
+            if (remainingTime <= std::chrono::milliseconds(sleepTime))
+            {
+                sleepTime = std::max(1, static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(remainingTime).count()));
+            }
+        }
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime));
+        backoffMs = std::min(backoffMs * 2, maxBackoffMs);
+    }
+}
+
+bool BufferQueue::tryDequeue(QueueItem &item, ConsumerToken &token)
+{
+    if (m_queue.try_dequeue(token, item))
+    {
+        return true;
+    }
+    return false;
+}
+
+size_t BufferQueue::tryDequeueBatch(std::vector<QueueItem> &items, size_t maxItems, ConsumerToken &token)
+{
+    items.clear();
+    items.resize(maxItems);
+
+    size_t dequeued = m_queue.try_dequeue_bulk(token, items.begin(), maxItems);
+    items.resize(dequeued);
+
+    return dequeued;
+}
+
+bool BufferQueue::flush()
+{
+    do
+    {
+        std::this_thread::sleep_for(std::chrono::milliseconds(500));
+    } while (m_queue.size_approx() != 0);
+
+    return true;
+}
+
+size_t BufferQueue::size() const
+{
+    return m_queue.size_approx();
+}
\ No newline at end of file