about summary refs log tree commit diff stats
path: root/archive/2025/summer/bsc_karidas/src/Writer.cpp
blob: bc3fe9acd2488a91a53a141735951c849bef85b8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#include "Writer.hpp"
#include "Crypto.hpp"
#include "Compression.hpp"
#include <chrono>
#include <exception>
#include <iostream>
#include <map>
#include <memory>
#include <thread>

/**
 * @brief Constructs a writer bound to a queue and a storage backend.
 *
 * The constructor only stores its arguments and obtains a consumer token
 * from the queue; no thread is started here (see start()).
 *
 * @param queue            Queue the writer dequeues from; held by reference.
 * @param storage          Storage backend; ownership of the shared_ptr is moved
 *                         into the writer.
 * @param batchSize        Value later passed to tryDequeueBatch() as the batch size.
 * @param useEncryption    When true, each serialized batch is encrypted before
 *                         being written.
 * @param compressionLevel Compression is applied only when this is > 0; the
 *                         value is forwarded to Compression::compress().
 */
Writer::Writer(BufferQueue &queue,
               std::shared_ptr<SegmentedStorage> storage,
               size_t batchSize,
               bool useEncryption,
               int compressionLevel)
    : m_queue(queue),
      m_storage(std::move(storage)),
      m_batchSize(batchSize),
      m_useEncryption(useEncryption),
      m_compressionLevel(compressionLevel),
      // Token is requested from the constructor argument, not from m_queue.
      m_consumerToken(queue.createConsumerToken())
{
}

/**
 * @brief Destroys the writer, stopping it first.
 *
 * Delegates to stop(), which joins the worker thread if the writer was
 * still marked as running.
 */
Writer::~Writer()
{
    this->stop();
}

/**
 * @brief Launches the background thread that runs processLogEntries().
 *
 * The call is a no-op when the writer is already marked as running.
 *
 * @throws Whatever thread creation or allocation throws (e.g.
 *         std::system_error, std::bad_alloc). In that case the running flag
 *         is rolled back, so isRunning() reports false and start() may be
 *         retried.
 */
void Writer::start()
{
    // exchange() both tests and sets the flag, so only one caller proceeds.
    if (m_running.exchange(true))
    {
        return;
    }

    try
    {
        m_writerThread = std::make_unique<std::thread>(&Writer::processLogEntries, this);
    }
    catch (...)
    {
        // No thread was created: undo the flag so the writer is not stuck
        // in a "running" state with nothing behind it.
        m_running.store(false);
        throw;
    }
}

/**
 * @brief Requests the worker loop to end and waits for the thread to finish.
 *
 * Clears the running flag; if the writer was not running beforehand the
 * call returns immediately. Otherwise the worker thread is joined when it
 * exists and is joinable.
 */
void Writer::stop()
{
    const bool wasRunning = m_running.exchange(false);
    if (!wasRunning)
    {
        return;
    }

    if (!m_writerThread || !m_writerThread->joinable())
    {
        return;
    }

    m_writerThread->join();
}

bool Writer::isRunning() const
{
    return m_running.load();
}

/**
 * @brief Worker loop: dequeues batches, transforms them and hands them to storage.
 *
 * Runs until the running flag is cleared. Each iteration:
 *  1. tries to dequeue up to m_batchSize items; sleeps 5 ms when none arrive;
 *  2. groups the dequeued entries by their optional target filename;
 *  3. per group: serializes, optionally compresses (m_compressionLevel > 0),
 *     optionally encrypts (m_useEncryption), then writes either to the named
 *     file or to the storage's default destination.
 *
 * This function is the thread entry point, so an exception leaving it would
 * terminate the whole process. Failures while processing a group are
 * therefore caught, reported on std::cerr, and the loop continues with the
 * next group; the entries of the failed group are not retried.
 */
void Writer::processLogEntries()
{
    std::vector<QueueItem> batch;

    Crypto crypto;
    // NOTE(review): key and IV are constant placeholders. The constant name
    // GCM_IV_SIZE suggests a GCM mode, where reusing one IV with the same key
    // is unsafe -- replace with real key management and per-message IVs
    // before relying on this for confidentiality.
    std::vector<uint8_t> encryptionKey(crypto.KEY_SIZE, 0x42); // dummy key
    std::vector<uint8_t> dummyIV(crypto.GCM_IV_SIZE, 0x24);    // dummy IV

    while (m_running)
    {
        size_t entriesDequeued = m_queue.tryDequeueBatch(batch, m_batchSize, m_consumerToken);
        if (entriesDequeued == 0)
        {
            // Nothing to do: back off briefly instead of spinning.
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            continue;
        }

        // Key is the optional destination; std::nullopt collects entries
        // that go to the default destination.
        std::map<std::optional<std::string>, std::vector<LogEntry>> groupedEntries;
        for (auto &item : batch)
        {
            groupedEntries[item.targetFilename].emplace_back(std::move(item.entry));
        }

        for (auto &[targetFilename, entries] : groupedEntries)
        {
            try
            {
                std::vector<uint8_t> processedData = LogEntry::serializeBatch(std::move(entries));

                // Apply compression if enabled
                if (m_compressionLevel > 0)
                {
                    processedData = Compression::compress(std::move(processedData), m_compressionLevel);
                }
                // Apply encryption if enabled (after compression)
                if (m_useEncryption)
                {
                    processedData = crypto.encrypt(std::move(processedData), encryptionKey, dummyIV);
                }

                if (targetFilename)
                {
                    m_storage->writeToFile(*targetFilename, std::move(processedData));
                }
                else
                {
                    m_storage->write(std::move(processedData));
                }
            }
            catch (const std::exception &e)
            {
                // Keep the worker alive; one failing group must not take
                // down the process or block the remaining groups.
                std::cerr << "Writer: failed to process log batch for "
                          << (targetFilename ? *targetFilename : std::string("<default>"))
                          << ": " << e.what() << '\n';
            }
            catch (...)
            {
                std::cerr << "Writer: failed to process log batch for "
                          << (targetFilename ? *targetFilename : std::string("<default>"))
                          << ": unknown error" << '\n';
            }
        }

        // Entries were moved out above; drop the leftover shells before the
        // next dequeue.
        batch.clear();
    }
}