diff options
Diffstat (limited to 'archive/2025/summer/bsc_karidas/src/Writer.cpp')
| -rw-r--r-- | archive/2025/summer/bsc_karidas/src/Writer.cpp | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/archive/2025/summer/bsc_karidas/src/Writer.cpp b/archive/2025/summer/bsc_karidas/src/Writer.cpp new file mode 100644 index 000000000..bc3fe9acd --- /dev/null +++ b/archive/2025/summer/bsc_karidas/src/Writer.cpp @@ -0,0 +1,103 @@ +#include "Writer.hpp" +#include "Crypto.hpp" +#include "Compression.hpp" +#include <iostream> +#include <chrono> +#include <map> + +Writer::Writer(BufferQueue &queue, + std::shared_ptr<SegmentedStorage> storage, + size_t batchSize, + bool useEncryption, + int compressionLevel) + : m_queue(queue), + m_storage(std::move(storage)), + m_batchSize(batchSize), + m_useEncryption(useEncryption), + m_compressionLevel(compressionLevel), + m_consumerToken(queue.createConsumerToken()) +{ +} + +Writer::~Writer() +{ + stop(); +} + +void Writer::start() +{ + if (m_running.exchange(true)) + { + return; + } + + m_writerThread.reset(new std::thread(&Writer::processLogEntries, this)); +} + +void Writer::stop() +{ + if (m_running.exchange(false)) + { + if (m_writerThread && m_writerThread->joinable()) + { + m_writerThread->join(); + } + } +} + +bool Writer::isRunning() const +{ + return m_running.load(); +} + +void Writer::processLogEntries() +{ + std::vector<QueueItem> batch; + + Crypto crypto; + std::vector<uint8_t> encryptionKey(crypto.KEY_SIZE, 0x42); // dummy key + std::vector<uint8_t> dummyIV(crypto.GCM_IV_SIZE, 0x24); // dummy IV + + while (m_running) + { + size_t entriesDequeued = m_queue.tryDequeueBatch(batch, m_batchSize, m_consumerToken); + if (entriesDequeued == 0) + { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + continue; + } + + std::map<std::optional<std::string>, std::vector<LogEntry>> groupedEntries; + for (auto &item : batch) + { + groupedEntries[item.targetFilename].emplace_back(std::move(item.entry)); + } + + for (auto &[targetFilename, entries] : groupedEntries) + { + std::vector<uint8_t> processedData = LogEntry::serializeBatch(std::move(entries)); + + // Apply compression if enabled + if (m_compressionLevel > 0) + { + processedData = Compression::compress(std::move(processedData), m_compressionLevel); + } + // Apply encryption if enabled + if (m_useEncryption) + { + processedData = crypto.encrypt(std::move(processedData), encryptionKey, dummyIV); + } + + if (targetFilename) + { + m_storage->writeToFile(*targetFilename, std::move(processedData)); + } + else + { + m_storage->write(std::move(processedData)); + } + } + + batch.clear(); + } +} \ No newline at end of file |