diff options
| author | Dimitris <dimstav23@gmail.com> | 2025-07-15 15:11:36 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-15 15:11:36 +0200 |
| commit | 73e505f04d17eba36c41fce7b48bc4d6884b8fd0 (patch) | |
| tree | 44b5f4627309a48d6f22b54bb2ad9a2976e8601b /archive/2025/summer/bsc_karidas/src | |
| parent | ca92e7ad181a02890496872012ecc6c1d08b1658 (diff) | |
| parent | d8c365681a41961ebe2daea5701a4d56f5400d1d (diff) | |
| download | research-work-archive-artifacts-73e505f04d17eba36c41fce7b48bc4d6884b8fd0.tar.gz research-work-archive-artifacts-73e505f04d17eba36c41fce7b48bc4d6884b8fd0.zip | |
Merge pull request #6 from chriskari/upload-artifacts
Add bsc_karidas
Diffstat (limited to 'archive/2025/summer/bsc_karidas/src')
| -rw-r--r-- | archive/2025/summer/bsc_karidas/src/BufferQueue.cpp | 131 | ||||
| -rw-r--r-- | archive/2025/summer/bsc_karidas/src/Compression.cpp | 109 | ||||
| -rw-r--r-- | archive/2025/summer/bsc_karidas/src/Crypto.cpp | 211 | ||||
| -rw-r--r-- | archive/2025/summer/bsc_karidas/src/LogEntry.cpp | 375 | ||||
| -rw-r--r-- | archive/2025/summer/bsc_karidas/src/Logger.cpp | 139 | ||||
| -rw-r--r-- | archive/2025/summer/bsc_karidas/src/LoggingManager.cpp | 145 | ||||
| -rw-r--r-- | archive/2025/summer/bsc_karidas/src/SegmentedStorage.cpp | 303 | ||||
| -rw-r--r-- | archive/2025/summer/bsc_karidas/src/Writer.cpp | 103 |
8 files changed, 1516 insertions, 0 deletions
diff --git a/archive/2025/summer/bsc_karidas/src/BufferQueue.cpp b/archive/2025/summer/bsc_karidas/src/BufferQueue.cpp new file mode 100644 index 000000000..3d9af1c0c --- /dev/null +++ b/archive/2025/summer/bsc_karidas/src/BufferQueue.cpp @@ -0,0 +1,131 @@ +#include "BufferQueue.hpp" +#include <algorithm> +#include <thread> +#include <iostream> +#include <chrono> +#include <cmath> + +BufferQueue::BufferQueue(size_t capacity, size_t maxExplicitProducers) +{ + m_queue = moodycamel::ConcurrentQueue<QueueItem>(capacity, maxExplicitProducers, 0); +} + +bool BufferQueue::enqueue(QueueItem item, ProducerToken &token) +{ + return m_queue.try_enqueue(token, std::move(item)); +} + +bool BufferQueue::enqueueBlocking(QueueItem item, ProducerToken &token, std::chrono::milliseconds timeout) +{ + auto start = std::chrono::steady_clock::now(); + int backoffMs = 1; + const int maxBackoffMs = 100; + + while (true) + { + QueueItem itemCopy = item; + if (enqueue(std::move(itemCopy), token)) + { + return true; + } + + auto elapsed = std::chrono::steady_clock::now() - start; + if (elapsed >= timeout) + { + return false; + } + + int sleepTime = backoffMs; + + // Make sure we don't sleep longer than our remaining timeout + if (timeout != std::chrono::milliseconds::max()) + { + auto remainingTime = timeout - elapsed; + if (remainingTime <= std::chrono::milliseconds(sleepTime)) + { + sleepTime = std::max(1, static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(remainingTime).count())); + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + backoffMs = std::min(backoffMs * 2, maxBackoffMs); + } +} + +bool BufferQueue::enqueueBatch(std::vector<QueueItem> items, ProducerToken &token) +{ + return m_queue.try_enqueue_bulk(token, std::make_move_iterator(items.begin()), items.size()); +} + +bool BufferQueue::enqueueBatchBlocking(std::vector<QueueItem> items, ProducerToken &token, + std::chrono::milliseconds timeout) +{ + auto start = 
std::chrono::steady_clock::now(); + int backoffMs = 1; + const int maxBackoffMs = 100; + + while (true) + { + std::vector<QueueItem> itemsCopy = items; + if (enqueueBatch(std::move(itemsCopy), token)) + { + return true; + } + + auto elapsed = std::chrono::steady_clock::now() - start; + if (elapsed >= timeout) + { + return false; + } + + int sleepTime = backoffMs; + + // Make sure we don't sleep longer than our remaining timeout + if (timeout != std::chrono::milliseconds::max()) + { + auto remainingTime = timeout - elapsed; + if (remainingTime <= std::chrono::milliseconds(sleepTime)) + { + sleepTime = std::max(1, static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(remainingTime).count())); + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + backoffMs = std::min(backoffMs * 2, maxBackoffMs); + } +} + +bool BufferQueue::tryDequeue(QueueItem &item, ConsumerToken &token) +{ + if (m_queue.try_dequeue(token, item)) + { + return true; + } + return false; +} + +size_t BufferQueue::tryDequeueBatch(std::vector<QueueItem> &items, size_t maxItems, ConsumerToken &token) +{ + items.clear(); + items.resize(maxItems); + + size_t dequeued = m_queue.try_dequeue_bulk(token, items.begin(), maxItems); + items.resize(dequeued); + + return dequeued; +} + +bool BufferQueue::flush() +{ + do + { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } while (m_queue.size_approx() != 0); + + return true; +} + +size_t BufferQueue::size() const +{ + return m_queue.size_approx(); +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/src/Compression.cpp b/archive/2025/summer/bsc_karidas/src/Compression.cpp new file mode 100644 index 000000000..cd25c9310 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/src/Compression.cpp @@ -0,0 +1,109 @@ +#include "Compression.hpp" +#include <stdexcept> +#include <cstring> +#include <iostream> + +// Helper function to compress raw data using zlib +std::vector<uint8_t> 
Compression::compress(std::vector<uint8_t> &&data, int level) +{ + if (data.empty()) + { + return std::vector<uint8_t>(); + } + + z_stream zs; + std::memset(&zs, 0, sizeof(zs)); + + // Use the provided compression level instead of hardcoded Z_BEST_COMPRESSION + if (deflateInit(&zs, level) != Z_OK) + { + throw std::runtime_error("Failed to initialize zlib deflate"); + } + + zs.next_in = const_cast<Bytef *>(data.data()); + zs.avail_in = data.size(); + + int ret; + char outbuffer[32768]; + std::vector<uint8_t> compressedData; + + // Compress data in chunks + do + { + zs.next_out = reinterpret_cast<Bytef *>(outbuffer); + zs.avail_out = sizeof(outbuffer); + + ret = deflate(&zs, Z_FINISH); + + if (compressedData.size() < zs.total_out) + { + compressedData.insert(compressedData.end(), + outbuffer, + outbuffer + (zs.total_out - compressedData.size())); + } + } while (ret == Z_OK); + + deflateEnd(&zs); + + if (ret != Z_STREAM_END) + { + throw std::runtime_error("Exception during zlib compression"); + } + + return compressedData; +} + +// Helper function to decompress raw data using zlib +std::vector<uint8_t> Compression::decompress(std::vector<uint8_t> &&compressedData) +{ + if (compressedData.empty()) + { + return std::vector<uint8_t>(); + } + + z_stream zs; + std::memset(&zs, 0, sizeof(zs)); + + if (inflateInit(&zs) != Z_OK) + { + throw std::runtime_error("Failed to initialize zlib inflate"); + } + + zs.next_in = const_cast<Bytef *>(compressedData.data()); + zs.avail_in = compressedData.size(); + + int ret; + char outbuffer[32768]; + std::vector<uint8_t> decompressedData; + + // Decompress data in chunks + do + { + zs.next_out = reinterpret_cast<Bytef *>(outbuffer); + zs.avail_out = sizeof(outbuffer); + + ret = inflate(&zs, Z_NO_FLUSH); + + if (ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR) + { + inflateEnd(&zs); + throw std::runtime_error("Exception during zlib decompression"); + } + + if (decompressedData.size() < zs.total_out) + { + 
decompressedData.insert(decompressedData.end(), + outbuffer, + outbuffer + (zs.total_out - decompressedData.size())); + } + } while (ret == Z_OK); + + inflateEnd(&zs); + + if (ret != Z_STREAM_END) + { + throw std::runtime_error("Exception during zlib decompression"); + } + + return decompressedData; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/src/Crypto.cpp b/archive/2025/summer/bsc_karidas/src/Crypto.cpp new file mode 100644 index 000000000..0e7912721 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/src/Crypto.cpp @@ -0,0 +1,211 @@ +#include "Crypto.hpp" +#include <openssl/evp.h> +#include <openssl/rand.h> +#include <openssl/err.h> +#include <stdexcept> +#include <cstring> +#include <iostream> + +Crypto::Crypto() +{ + // Initialize OpenSSL + OpenSSL_add_all_algorithms(); + m_encryptCtx = EVP_CIPHER_CTX_new(); + if (!m_encryptCtx) + { + throw std::runtime_error("Failed to create encryption context"); + } + + m_decryptCtx = EVP_CIPHER_CTX_new(); + if (!m_decryptCtx) + { + EVP_CIPHER_CTX_free(m_encryptCtx); + throw std::runtime_error("Failed to create decryption context"); + } +} + +Crypto::~Crypto() +{ + // Free contexts + if (m_encryptCtx) + { + EVP_CIPHER_CTX_free(m_encryptCtx); + } + + if (m_decryptCtx) + { + EVP_CIPHER_CTX_free(m_decryptCtx); + } + + // Clean up OpenSSL + EVP_cleanup(); +} + +// Encrypt data using AES-256-GCM with provided IV +std::vector<uint8_t> Crypto::encrypt(std::vector<uint8_t> &&plaintext, + const std::vector<uint8_t> &key, + const std::vector<uint8_t> &iv) +{ + if (plaintext.empty()) + return {}; + if (key.size() != KEY_SIZE) + throw std::runtime_error("Invalid key size"); + if (iv.size() != GCM_IV_SIZE) + throw std::runtime_error("Invalid IV size"); + + // Reset the existing context instead of creating a new one + EVP_CIPHER_CTX_reset(m_encryptCtx); + + // Initialize encryption operation + if (EVP_EncryptInit_ex(m_encryptCtx, EVP_aes_256_gcm(), nullptr, key.data(), iv.data()) != 1) + { + throw 
std::runtime_error("Failed to initialize encryption"); + } + + // Calculate the exact output size: size_field + ciphertext + tag + // For GCM mode, ciphertext size equals plaintext size (no padding) + const size_t sizeFieldSize = sizeof(uint32_t); + const size_t ciphertextSize = plaintext.size(); + const size_t totalSize = sizeFieldSize + ciphertextSize + GCM_TAG_SIZE; + + // Pre-allocate result buffer with exact final size + std::vector<uint8_t> result(totalSize); + + // Reserve space for data size field + uint32_t dataSize = ciphertextSize; + std::memcpy(result.data(), &dataSize, sizeFieldSize); + + // Perform encryption directly into the result buffer (after the size field) + int encryptedLen = 0; + if (EVP_EncryptUpdate(m_encryptCtx, result.data() + sizeFieldSize, &encryptedLen, + plaintext.data(), plaintext.size()) != 1) + { + throw std::runtime_error("Failed during encryption update"); + } + + // Finalize encryption (writing to the buffer right after the existing encrypted data) + int finalLen = 0; + if (EVP_EncryptFinal_ex(m_encryptCtx, result.data() + sizeFieldSize + encryptedLen, &finalLen) != 1) + { + throw std::runtime_error("Failed to finalize encryption"); + } + + // Sanity check: for GCM, encryptedLen + finalLen should equal plaintext.size() + if (encryptedLen + finalLen != static_cast<int>(plaintext.size())) + { + throw std::runtime_error("Unexpected encryption output size"); + } + + // Get the authentication tag and write it directly to the result buffer + if (EVP_CIPHER_CTX_ctrl(m_encryptCtx, EVP_CTRL_GCM_GET_TAG, GCM_TAG_SIZE, + result.data() + sizeFieldSize + ciphertextSize) != 1) + { + throw std::runtime_error("Failed to get authentication tag"); + } + + return result; +} + +// Decrypt data using AES-256-GCM with provided IV +std::vector<uint8_t> Crypto::decrypt(const std::vector<uint8_t> &encryptedData, + const std::vector<uint8_t> &key, + const std::vector<uint8_t> &iv) +{ + try + { + if (encryptedData.empty()) + { + return 
std::vector<uint8_t>(); + } + + // Validate key size + if (key.size() != KEY_SIZE) + { + throw std::runtime_error("Invalid key size. Expected 32 bytes for AES-256"); + } + + // Validate IV size + if (iv.size() != GCM_IV_SIZE) + { + throw std::runtime_error("Invalid IV size. Expected 12 bytes for GCM"); + } + + // Ensure we have at least enough data for the data size field + if (encryptedData.size() < sizeof(uint32_t)) + { + throw std::runtime_error("Encrypted data too small - missing data size"); + } + + // Extract the encrypted data size + uint32_t dataSize; + std::memcpy(&dataSize, encryptedData.data(), sizeof(dataSize)); + size_t position = sizeof(dataSize); + + // Validate data size + if (position + dataSize > encryptedData.size()) + { + throw std::runtime_error("Encrypted data too small - missing complete data"); + } + + // Extract the encrypted data + std::vector<uint8_t> ciphertext(dataSize); + std::memcpy(ciphertext.data(), encryptedData.data() + position, dataSize); + position += dataSize; + + // Extract the authentication tag + if (position + GCM_TAG_SIZE > encryptedData.size()) + { + throw std::runtime_error("Encrypted data too small - missing authentication tag"); + } + + std::vector<uint8_t> tag(GCM_TAG_SIZE); + std::memcpy(tag.data(), encryptedData.data() + position, GCM_TAG_SIZE); + + // Reset the existing context instead of creating a new one + EVP_CIPHER_CTX_reset(m_decryptCtx); + + // Initialize decryption operation + if (EVP_DecryptInit_ex(m_decryptCtx, EVP_aes_256_gcm(), nullptr, key.data(), iv.data()) != 1) + { + throw std::runtime_error("Failed to initialize decryption"); + } + + // Set expected tag value + if (EVP_CIPHER_CTX_ctrl(m_decryptCtx, EVP_CTRL_GCM_SET_TAG, GCM_TAG_SIZE, tag.data()) != 1) + { + throw std::runtime_error("Failed to set authentication tag"); + } + + // Prepare output buffer for plaintext + std::vector<uint8_t> decryptedData(ciphertext.size()); + int decryptedLen = 0; + + // Perform decryption + if 
(EVP_DecryptUpdate(m_decryptCtx, decryptedData.data(), &decryptedLen, + ciphertext.data(), ciphertext.size()) != 1) + { + throw std::runtime_error("Failed during decryption update"); + } + + // Finalize decryption and verify tag + int finalLen = 0; + int ret = EVP_DecryptFinal_ex(m_decryptCtx, decryptedData.data() + decryptedLen, &finalLen); + + if (ret != 1) + { + throw std::runtime_error("Authentication failed: data may have been tampered with"); + } + + // Resize the decrypted data to the actual length + decryptedData.resize(decryptedLen + finalLen); + + return decryptedData; + } + catch (const std::exception &e) + { + std::cerr << "Error decrypting data: " << e.what() << std::endl; + // Print OpenSSL error queue + ERR_print_errors_fp(stderr); + return std::vector<uint8_t>(); + } +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/src/LogEntry.cpp b/archive/2025/summer/bsc_karidas/src/LogEntry.cpp new file mode 100644 index 000000000..af487a29d --- /dev/null +++ b/archive/2025/summer/bsc_karidas/src/LogEntry.cpp @@ -0,0 +1,375 @@ +#include "LogEntry.hpp" +#include <cstring> +#include <stdexcept> +#include <iostream> + +LogEntry::LogEntry() + : m_actionType(ActionType::CREATE), + m_dataLocation(""), + m_dataControllerId(""), + m_dataProcessorId(""), + m_dataSubjectId(""), + m_timestamp(std::chrono::system_clock::now()), + m_payload() {} + +LogEntry::LogEntry(ActionType actionType, + std::string dataLocation, + std::string dataControllerId, + std::string dataProcessorId, + std::string dataSubjectId, + std::vector<uint8_t> payload) + : m_actionType(actionType), + m_dataLocation(std::move(dataLocation)), + m_dataControllerId(std::move(dataControllerId)), + m_dataProcessorId(std::move(dataProcessorId)), + m_dataSubjectId(std::move(dataSubjectId)), + m_timestamp(std::chrono::system_clock::now()), + m_payload(std::move(payload)) +{ +} + +// Move version that consumes the LogEntry +std::vector<uint8_t> LogEntry::serialize() && +{ + // Calculate 
required size upfront + size_t totalSize = + sizeof(int) + // ActionType + sizeof(uint32_t) + m_dataLocation.size() + // Size + data location + sizeof(uint32_t) + m_dataControllerId.size() + // Size + data controller ID + sizeof(uint32_t) + m_dataProcessorId.size() + // Size + data processor ID + sizeof(uint32_t) + m_dataSubjectId.size() + // Size + data subject ID + sizeof(int64_t) + // Timestamp + sizeof(uint32_t) + m_payload.size(); // Size + payload data + + // Pre-allocate the vector + std::vector<uint8_t> result; + result.reserve(totalSize); + + // Push ActionType + int actionType = static_cast<int>(m_actionType); + appendToVector(result, &actionType, sizeof(actionType)); + + // Move strings + appendStringToVector(result, std::move(m_dataLocation)); + appendStringToVector(result, std::move(m_dataControllerId)); + appendStringToVector(result, std::move(m_dataProcessorId)); + appendStringToVector(result, std::move(m_dataSubjectId)); + + // Push timestamp + int64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>( + m_timestamp.time_since_epoch()) + .count(); + appendToVector(result, ×tamp, sizeof(timestamp)); + + // Move payload + uint32_t payloadSize = static_cast<uint32_t>(m_payload.size()); + appendToVector(result, &payloadSize, sizeof(payloadSize)); + if (!m_payload.empty()) + { + result.insert(result.end(), + std::make_move_iterator(m_payload.begin()), + std::make_move_iterator(m_payload.end())); + } + + return result; +} + +// Const version for when you need to keep the LogEntry +std::vector<uint8_t> LogEntry::serialize() const & +{ + // Calculate required size upfront + size_t totalSize = + sizeof(int) + // ActionType + sizeof(uint32_t) + m_dataLocation.size() + // Size + data location + sizeof(uint32_t) + m_dataControllerId.size() + // Size + data controller ID + sizeof(uint32_t) + m_dataProcessorId.size() + // Size + data processor ID + sizeof(uint32_t) + m_dataSubjectId.size() + // Size + data subject ID + sizeof(int64_t) + // 
Timestamp + sizeof(uint32_t) + m_payload.size(); // Size + payload data + + // Pre-allocate the vector + std::vector<uint8_t> result; + result.reserve(totalSize); + + // Push ActionType + int actionType = static_cast<int>(m_actionType); + appendToVector(result, &actionType, sizeof(actionType)); + + // Copy strings + appendStringToVector(result, m_dataLocation); + appendStringToVector(result, m_dataControllerId); + appendStringToVector(result, m_dataProcessorId); + appendStringToVector(result, m_dataSubjectId); + + // Push timestamp + int64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>( + m_timestamp.time_since_epoch()) + .count(); + appendToVector(result, ×tamp, sizeof(timestamp)); + + // Copy payload + uint32_t payloadSize = static_cast<uint32_t>(m_payload.size()); + appendToVector(result, &payloadSize, sizeof(payloadSize)); + if (!m_payload.empty()) + { + appendToVector(result, m_payload.data(), m_payload.size()); + } + + return result; +} + +bool LogEntry::deserialize(std::vector<uint8_t> &&data) +{ + try + { + size_t offset = 0; + + // Check if we have enough data for the basic structure + if (data.size() < sizeof(int)) + return false; + + // Extract action type + int actionType; + std::memcpy(&actionType, data.data() + offset, sizeof(actionType)); + offset += sizeof(actionType); + m_actionType = static_cast<ActionType>(actionType); + + // Extract data location + if (!extractStringFromVector(data, offset, m_dataLocation)) + return false; + + // Extract data controller ID + if (!extractStringFromVector(data, offset, m_dataControllerId)) + return false; + + // Extract data processor ID + if (!extractStringFromVector(data, offset, m_dataProcessorId)) + return false; + + // Extract data subject ID + if (!extractStringFromVector(data, offset, m_dataSubjectId)) + return false; + + // Extract timestamp + if (offset + sizeof(int64_t) > data.size()) + return false; + + int64_t timestamp; + std::memcpy(×tamp, data.data() + offset, 
sizeof(timestamp)); + offset += sizeof(timestamp); + m_timestamp = std::chrono::system_clock::time_point(std::chrono::milliseconds(timestamp)); + + // Extract payload + if (offset + sizeof(uint32_t) > data.size()) + return false; + + uint32_t payloadSize; + std::memcpy(&payloadSize, data.data() + offset, sizeof(payloadSize)); + offset += sizeof(payloadSize); + + if (offset + payloadSize > data.size()) + return false; + + if (payloadSize > 0) + { + m_payload.clear(); + m_payload.reserve(payloadSize); + + auto start_it = data.begin() + offset; + auto end_it = start_it + payloadSize; + m_payload.assign(std::make_move_iterator(start_it), + std::make_move_iterator(end_it)); + offset += payloadSize; + } + else + { + m_payload.clear(); + } + + return true; + } + catch (const std::exception &) + { + return false; + } +} + +std::vector<uint8_t> LogEntry::serializeBatch(std::vector<LogEntry> &&entries) +{ + if (entries.empty()) + { + // Just return a vector with count = 0 + std::vector<uint8_t> batchData(sizeof(uint32_t)); + uint32_t numEntries = 0; + std::memcpy(batchData.data(), &numEntries, sizeof(numEntries)); + return batchData; + } + + // Pre-calculate approximate total size to minimize reallocations + size_t estimatedSize = sizeof(uint32_t); // Number of entries + for (const auto &entry : entries) + { + // Rough estimate: header size + string sizes + payload size + estimatedSize += sizeof(uint32_t) + // Entry size field + sizeof(int) + // ActionType + 3 * sizeof(uint32_t) + // 3 string length fields + entry.getDataLocation().size() + + entry.getDataControllerId().size() + + entry.getDataProcessorId().size() + + entry.getDataSubjectId().size() + + sizeof(int64_t) + // Timestamp + sizeof(uint32_t) + // Payload size + entry.getPayload().size(); + } + + std::vector<uint8_t> batchData; + batchData.reserve(estimatedSize); + + // Store the number of entries + uint32_t numEntries = static_cast<uint32_t>(entries.size()); + batchData.resize(sizeof(numEntries)); + 
std::memcpy(batchData.data(), &numEntries, sizeof(numEntries)); + + // Serialize and append each entry using move semantics + for (auto &entry : entries) + { + // Move-serialize the entry + std::vector<uint8_t> entryData = std::move(entry).serialize(); + + // Store the size of the serialized entry + uint32_t entrySize = static_cast<uint32_t>(entryData.size()); + size_t currentSize = batchData.size(); + batchData.resize(currentSize + sizeof(entrySize)); + std::memcpy(batchData.data() + currentSize, &entrySize, sizeof(entrySize)); + + // Move the serialized entry data + batchData.insert(batchData.end(), + std::make_move_iterator(entryData.begin()), + std::make_move_iterator(entryData.end())); + } + + return batchData; +} + +std::vector<LogEntry> LogEntry::deserializeBatch(std::vector<uint8_t> &&batchData) +{ + std::vector<LogEntry> entries; + + try + { + // Read the number of entries + if (batchData.size() < sizeof(uint32_t)) + { + throw std::runtime_error("Batch data too small to contain entry count"); + } + + uint32_t numEntries; + std::memcpy(&numEntries, batchData.data(), sizeof(numEntries)); + + // Reserve space for entries to avoid reallocations + entries.reserve(numEntries); + + // Position in the batch data + size_t position = sizeof(numEntries); + + // Extract each entry + for (uint32_t i = 0; i < numEntries; ++i) + { + // Check if we have enough data left to read the entry size + if (position + sizeof(uint32_t) > batchData.size()) + { + throw std::runtime_error("Unexpected end of batch data"); + } + + // Read the size of the entry + uint32_t entrySize; + std::memcpy(&entrySize, batchData.data() + position, sizeof(entrySize)); + position += sizeof(entrySize); + + // Check if we have enough data left to read the entry + if (position + entrySize > batchData.size()) + { + throw std::runtime_error("Unexpected end of batch data"); + } + + // Create entry data by moving a slice from the batch data + std::vector<uint8_t> entryData; + entryData.reserve(entrySize); + 
+ auto start_it = batchData.begin() + position; + auto end_it = start_it + entrySize; + entryData.assign(std::make_move_iterator(start_it), + std::make_move_iterator(end_it)); + position += entrySize; + + // Deserialize the entry using move semantics + LogEntry entry; + if (entry.deserialize(std::move(entryData))) + { + entries.emplace_back(std::move(entry)); + } + else + { + throw std::runtime_error("Failed to deserialize log entry"); + } + } + } + catch (const std::exception &e) + { + std::cerr << "Error deserializing log batch: " << e.what() << std::endl; + } + + return entries; +} + +// Helper method to append data to a vector +void LogEntry::appendToVector(std::vector<uint8_t> &vec, const void *data, size_t size) const +{ + const uint8_t *bytes = static_cast<const uint8_t *>(data); + vec.insert(vec.end(), bytes, bytes + size); +} + +// Helper method to append a string with its length (const version) +void LogEntry::appendStringToVector(std::vector<uint8_t> &vec, const std::string &str) const +{ + uint32_t length = static_cast<uint32_t>(str.size()); + appendToVector(vec, &length, sizeof(length)); + + if (length > 0) + { + appendToVector(vec, str.data(), str.size()); + } +} + +// Helper method to append a string with its length (move version) +void LogEntry::appendStringToVector(std::vector<uint8_t> &vec, std::string &&str) +{ + uint32_t length = static_cast<uint32_t>(str.size()); + appendToVector(vec, &length, sizeof(length)); + + if (length > 0) + { + vec.insert(vec.end(), str.begin(), str.end()); + } +} + +// Helper method to extract a string from a vector +bool LogEntry::extractStringFromVector(std::vector<uint8_t> &vec, size_t &offset, std::string &str) +{ + // Check if we have enough data for the string length + if (offset + sizeof(uint32_t) > vec.size()) + return false; + + uint32_t length; + std::memcpy(&length, vec.data() + offset, sizeof(length)); + offset += sizeof(length); + + // Check if we have enough data for the string content + if (offset + 
length > vec.size()) + return false; + + str.assign(reinterpret_cast<const char *>(vec.data() + offset), length); + offset += length; + + return true; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/src/Logger.cpp b/archive/2025/summer/bsc_karidas/src/Logger.cpp new file mode 100644 index 000000000..5db75f303 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/src/Logger.cpp @@ -0,0 +1,139 @@ +#include "Logger.hpp" +#include "QueueItem.hpp" +#include <iostream> + +// Initialize static members +std::unique_ptr<Logger> Logger::s_instance = nullptr; +std::mutex Logger::s_instanceMutex; + +Logger &Logger::getInstance() +{ + std::lock_guard<std::mutex> lock(s_instanceMutex); + if (s_instance == nullptr) + { + s_instance.reset(new Logger()); + } + return *s_instance; +} + +Logger::Logger() + : m_logQueue(nullptr), + m_appendTimeout(std::chrono::milliseconds::max()), + m_initialized(false) +{ +} + +Logger::~Logger() +{ + if (m_initialized) + { + reset(); + } +} + +bool Logger::initialize(std::shared_ptr<BufferQueue> queue, + std::chrono::milliseconds appendTimeout) +{ + if (m_initialized) + { + reportError("Logger already initialized"); + return false; + } + + if (!queue) + { + reportError("Cannot initialize with a null queue"); + return false; + } + + m_logQueue = std::move(queue); + m_appendTimeout = appendTimeout; + m_initialized = true; + + return true; +} + +BufferQueue::ProducerToken Logger::createProducerToken() +{ + if (!m_initialized) + { + reportError("Logger not initialized"); + throw std::runtime_error("Logger not initialized"); + } + + return m_logQueue->createProducerToken(); +} + +bool Logger::append(LogEntry entry, + BufferQueue::ProducerToken &token, + const std::optional<std::string> &filename) +{ + if (!m_initialized) + { + reportError("Logger not initialized"); + return false; + } + + QueueItem item{std::move(entry), filename}; + return m_logQueue->enqueueBlocking(std::move(item), token, m_appendTimeout); +} + +bool 
Logger::appendBatch(std::vector<LogEntry> entries, + BufferQueue::ProducerToken &token, + const std::optional<std::string> &filename) +{ + if (!m_initialized) + { + reportError("Logger not initialized"); + return false; + } + + if (entries.empty()) + { + return true; + } + + std::vector<QueueItem> batch; + batch.reserve(entries.size()); + for (auto &entry : entries) + { + batch.emplace_back(std::move(entry), filename); + } + return m_logQueue->enqueueBatchBlocking(std::move(batch), token, m_appendTimeout); +} + +bool Logger::reset() +{ + if (!m_initialized) + { + return false; + } + + // Reset state + m_initialized = false; + m_logQueue.reset(); + + return true; +} + +bool Logger::exportLogs( + const std::string &outputPath, + std::chrono::system_clock::time_point fromTimestamp, + std::chrono::system_clock::time_point toTimestamp) +{ + if (!m_initialized) + { + reportError("Logger not initialized"); + return false; + } + + // This functionality would typically be handled by a separate component, + // such as a log storage or retrieval system + reportError("Export logs functionality not implemented in Logger"); + return false; +} + +void Logger::reportError(const std::string &message) +{ + std::cerr << "Logger Error: " << message << std::endl; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/src/LoggingManager.cpp b/archive/2025/summer/bsc_karidas/src/LoggingManager.cpp new file mode 100644 index 000000000..a1d69bd5c --- /dev/null +++ b/archive/2025/summer/bsc_karidas/src/LoggingManager.cpp @@ -0,0 +1,145 @@ +#include "LoggingManager.hpp" +#include "Crypto.hpp" +#include "Compression.hpp" +#include <iostream> +#include <filesystem> + +LoggingManager::LoggingManager(const LoggingConfig &config) + : m_numWriterThreads(config.numWriterThreads), + m_batchSize(config.batchSize), + m_useEncryption(config.useEncryption), + m_compressionLevel(config.compressionLevel) +{ + if (!std::filesystem::create_directories(config.basePath) && + 
!std::filesystem::exists(config.basePath)) + { + throw std::runtime_error("Failed to create log directory: " + config.basePath); + } + + m_queue = std::make_shared<BufferQueue>(config.queueCapacity, config.maxExplicitProducers); + m_storage = std::make_shared<SegmentedStorage>( + config.basePath, config.baseFilename, + config.maxSegmentSize, + config.maxAttempts, + config.baseRetryDelay, + config.maxOpenFiles); + + Logger::getInstance().initialize(m_queue, config.appendTimeout); + + m_writers.reserve(m_numWriterThreads); +} + +LoggingManager::~LoggingManager() +{ + stop(); +} + +bool LoggingManager::start() +{ + std::lock_guard<std::mutex> lock(m_systemMutex); + + if (m_running.load(std::memory_order_acquire)) + { + std::cerr << "LoggingSystem: Already running" << std::endl; + return false; + } + + m_running.store(true, std::memory_order_release); + m_acceptingEntries.store(true, std::memory_order_release); + + for (size_t i = 0; i < m_numWriterThreads; ++i) + { + auto writer = std::make_unique<Writer>(*m_queue, m_storage, m_batchSize, m_useEncryption, m_compressionLevel); + writer->start(); + m_writers.push_back(std::move(writer)); + } + + std::cout << "LoggingSystem: Started " << m_numWriterThreads << " writer threads"; + std::cout << " (Encryption: " << (m_useEncryption ? "Enabled" : "Disabled"); + std::cout << ", Compression: " << (m_compressionLevel != 0 ? "Enabled" : "Disabled") << ")" << std::endl; + return true; +} + +bool LoggingManager::stop() +{ + std::lock_guard<std::mutex> lock(m_systemMutex); + + if (!m_running.load(std::memory_order_acquire)) + { + return false; + } + + m_acceptingEntries.store(false, std::memory_order_release); + + if (m_queue) + { + std::cout << "LoggingSystem: Waiting for queue to empty..." 
<< std::endl; + m_queue->flush(); + } + + for (auto &writer : m_writers) + { + writer->stop(); + } + m_writers.clear(); + + // Flush storage to ensure all data is written + if (m_storage) + { + m_storage->flush(); + } + + m_running.store(false, std::memory_order_release); + + Logger::getInstance().reset(); + + std::cout << "LoggingSystem: Stopped" << std::endl; + return true; +} + +BufferQueue::ProducerToken LoggingManager::createProducerToken() +{ + return Logger::getInstance().createProducerToken(); +} + +bool LoggingManager::append(LogEntry entry, + BufferQueue::ProducerToken &token, + const std::optional<std::string> &filename) +{ + if (!m_acceptingEntries.load(std::memory_order_acquire)) + { + std::cerr << "LoggingSystem: Not accepting entries" << std::endl; + return false; + } + + return Logger::getInstance().append(std::move(entry), token, filename); +} + +bool LoggingManager::appendBatch(std::vector<LogEntry> entries, + BufferQueue::ProducerToken &token, + const std::optional<std::string> &filename) +{ + if (!m_acceptingEntries.load(std::memory_order_acquire)) + { + std::cerr << "LoggingSystem: Not accepting entries" << std::endl; + return false; + } + + return Logger::getInstance().appendBatch(std::move(entries), token, filename); +} + +bool LoggingManager::exportLogs( + const std::string &outputPath, + std::chrono::system_clock::time_point fromTimestamp, + std::chrono::system_clock::time_point toTimestamp) +{ + // This is a placeholder implementation for log export + // A complete solution would: + // 1. Read the encrypted segments from storage + // 2. Decrypt and decompress them + // 3. Filter by timestamp if requested + // 4. 
// Write to the output path

    std::cerr << "LoggingSystem: Export logs not fully implemented" << std::endl;
    return false;
}
\ No newline at end of file
diff --git a/archive/2025/summer/bsc_karidas/src/SegmentedStorage.cpp b/archive/2025/summer/bsc_karidas/src/SegmentedStorage.cpp
new file mode 100644
index 000000000..6b2a853e9
--- /dev/null
+++ b/archive/2025/summer/bsc_karidas/src/SegmentedStorage.cpp
@@ -0,0 +1,303 @@
#include "SegmentedStorage.hpp"
#include <iomanip>
#include <sstream>
#include <algorithm>
#include <sys/stat.h>

// Segmented append-only storage: each logical file is split into size-bounded
// segments on disk; open file descriptors are managed by an internal LRU cache
// capped at maxOpenFiles.
SegmentedStorage::SegmentedStorage(const std::string &basePath,
                                   const std::string &baseFilename,
                                   size_t maxSegmentSize,
                                   size_t maxAttempts,
                                   std::chrono::milliseconds baseRetryDelay,
                                   size_t maxOpenFiles)
    : m_basePath(basePath),
      m_baseFilename(baseFilename),
      m_maxSegmentSize(maxSegmentSize),
      m_maxAttempts(maxAttempts),
      m_baseRetryDelay(baseRetryDelay),
      m_maxOpenFiles(maxOpenFiles),
      m_cache(maxOpenFiles, this)
{
    std::filesystem::create_directories(m_basePath);
    // Pre-warm the cache with the base filename
    m_cache.get(m_baseFilename);
}

// Syncs and closes every cached file descriptor on destruction.
SegmentedStorage::~SegmentedStorage()
{
    m_cache.closeAll();
}

// LRUCache methods

/// Returns the cache entry for `filename`, reconstructing it from disk on a
/// miss and evicting the least-recently-used entry when at capacity.
/// Thread-safe: all bookkeeping happens under m_mutex.
std::shared_ptr<SegmentedStorage::CacheEntry> SegmentedStorage::LRUCache::get(const std::string &filename)
{
    std::lock_guard<std::mutex> lock(m_mutex);

    auto it = m_cache.find(filename);
    if (it != m_cache.end())
    {
        // Found in cache, move to front (most recently used)
        m_lruList.erase(it->second.lruIt);
        m_lruList.push_front(filename);
        it->second.lruIt = m_lruList.begin();
        return it->second.entry;
    }

    // Not in cache, need to reconstruct state
    auto entry = reconstructState(filename);

    // Check if we need to evict
    if (m_cache.size() >= m_capacity)
    {
        evictLRU();
    }

    // Add to cache
    m_lruList.push_front(filename);
    m_cache[filename] = {entry, m_lruList.begin()};

    return entry;
}

/// Evicts the least-recently-used entry, fsync'ing and closing its fd first.
void SegmentedStorage::LRUCache::evictLRU()
{
    // Called with m_mutex already locked
    if (m_lruList.empty())
        return;

    const std::string &lru_filename = m_lruList.back();
    auto it = m_cache.find(lru_filename);
    if (it != m_cache.end())
    {
        // Close the file descriptor if it's open
        if (it->second.entry->fd >= 0)
        {
            m_parent->fsyncRetry(it->second.entry->fd);
            ::close(it->second.entry->fd);
        }
        m_cache.erase(it);
    }
    m_lruList.pop_back();
}

/// Rebuilds the in-memory state for `filename` after a cache miss: finds the
/// latest segment on disk, opens it in append mode, and seeds the write
/// offset with the file's current size.
std::shared_ptr<SegmentedStorage::CacheEntry> SegmentedStorage::LRUCache::reconstructState(const std::string &filename)
{
    // Called with m_mutex already locked
    auto entry = std::make_shared<CacheEntry>();

    // Find the latest segment index for this filename
    size_t latestIndex = m_parent->findLatestSegmentIndex(filename);
    entry->segmentIndex.store(latestIndex, std::memory_order_release);

    // Generate the path for the current segment
    std::string segmentPath = m_parent->generateSegmentPath(filename, latestIndex);
    entry->currentSegmentPath = segmentPath;

    // Open the file and get its current size
    entry->fd = m_parent->openWithRetry(segmentPath.c_str(), O_CREAT | O_RDWR | O_APPEND, 0644);

    // Get the current file size to set as the offset
    size_t fileSize = m_parent->getFileSize(segmentPath);
    entry->currentOffset.store(fileSize, std::memory_order_release);

    return entry;
}

/// Fsyncs the cached fd for `filename` if it is present and open.
void SegmentedStorage::LRUCache::flush(const std::string &filename)
{
    std::lock_guard<std::mutex> lock(m_mutex);
    auto it = m_cache.find(filename);
    if (it != m_cache.end() && it->second.entry->fd >= 0)
    {
        m_parent->fsyncRetry(it->second.entry->fd);
    }
}

/// Fsyncs every open fd in the cache.
void SegmentedStorage::LRUCache::flushAll()
{
    std::lock_guard<std::mutex> lock(m_mutex);
    for (const auto &pair : m_cache)
    {
        if (pair.second.entry->fd >= 0)
        {
            m_parent->fsyncRetry(pair.second.entry->fd);
        }
    }
}

/// Fsyncs and closes every cached fd, then empties the cache.
void SegmentedStorage::LRUCache::closeAll()
{
    std::lock_guard<std::mutex> lock(m_mutex);
    for (const auto &pair : m_cache)
    {
        if (pair.second.entry->fd >= 0)
        {
m_parent->fsyncRetry(pair.second.entry->fd); + ::close(pair.second.entry->fd); + } + } + m_cache.clear(); + m_lruList.clear(); +} + +size_t SegmentedStorage::findLatestSegmentIndex(const std::string &filename) const +{ + size_t maxIndex = 0; + std::string pattern = filename + "_"; + + try + { + for (const auto &entry : std::filesystem::directory_iterator(m_basePath)) + { + if (entry.is_regular_file()) + { + std::string name = entry.path().filename().string(); + if (name.find(pattern) == 0) + { + // Extract the index from filename format: filename_YYYYMMDD_HHMMSS_NNNNNN.log + size_t lastUnderscore = name.find_last_of('_'); + if (lastUnderscore != std::string::npos) + { + size_t dotPos = name.find('.', lastUnderscore); + if (dotPos != std::string::npos) + { + std::string indexStr = name.substr(lastUnderscore + 1, dotPos - lastUnderscore - 1); + try + { + size_t index = std::stoull(indexStr); + maxIndex = std::max(maxIndex, index); + } + catch (...) + { + // Ignore files that don't match the expected format + } + } + } + } + } + } + } + catch (const std::filesystem::filesystem_error &) + { + // If directory doesn't exist or other filesystem error, return 0 + } + + return maxIndex; +} + +size_t SegmentedStorage::getFileSize(const std::string &path) const +{ + struct stat st; + if (::stat(path.c_str(), &st) == 0) + { + return static_cast<size_t>(st.st_size); + } + return 0; +} + +size_t SegmentedStorage::write(std::vector<uint8_t> &&data) +{ + return writeToFile(m_baseFilename, std::move(data)); +} + +size_t SegmentedStorage::writeToFile(const std::string &filename, std::vector<uint8_t> &&data) +{ + size_t size = data.size(); + if (size == 0) + return 0; + + std::shared_ptr<CacheEntry> entry = m_cache.get(filename); + size_t writeOffset; + + // This loop handles race conditions around rotation + while (true) + { + // First check if we need to rotate WITHOUT reserving space + size_t currentOffset = entry->currentOffset.load(std::memory_order_acquire); + if (currentOffset 
+ size > m_maxSegmentSize) + { + std::unique_lock<std::shared_mutex> rotLock(entry->fileMutex); + // Double-check if rotation is still needed + if (entry->currentOffset.load(std::memory_order_acquire) + size > m_maxSegmentSize) + { + rotateSegment(filename, entry); + // After rotation, entry has been updated with new fd and path + continue; + } + } + + // Now safely reserve space + writeOffset = entry->currentOffset.fetch_add(size, std::memory_order_acq_rel); + + // Double-check we didn't cross the boundary after reservation + if (writeOffset + size > m_maxSegmentSize) + { + // Another thread increased the offset past our threshold, try again + continue; + } + + // We have a valid offset and can proceed with the write + break; + } + + // Write under shared lock to prevent racing with rotate/close + { + std::shared_lock<std::shared_mutex> writeLock(entry->fileMutex); + + // Verify the fd is still valid + if (entry->fd < 0) + { + // This shouldn't happen, but if it does, retry + return writeToFile(filename, std::move(data)); + } + + pwriteFull(entry->fd, data.data(), size, static_cast<off_t>(writeOffset)); + } + + return size; +} + +void SegmentedStorage::flush() +{ + m_cache.flushAll(); +} + +std::string SegmentedStorage::rotateSegment(const std::string &filename, std::shared_ptr<CacheEntry> entry) +{ + // exclusive lock assumed by the caller (writeToFile) + + // Close the old file descriptor + if (entry->fd >= 0) + { + fsyncRetry(entry->fd); + ::close(entry->fd); + entry->fd = -1; + } + + size_t newIndex = entry->segmentIndex.fetch_add(1, std::memory_order_acq_rel) + 1; + entry->currentOffset.store(0, std::memory_order_release); + std::string newPath = generateSegmentPath(filename, newIndex); + + // Update the entry's path and open the new file + entry->currentSegmentPath = newPath; + entry->fd = openWithRetry(newPath.c_str(), O_CREAT | O_RDWR | O_APPEND, 0644); + + return newPath; +} + +std::string SegmentedStorage::generateSegmentPath(const std::string &filename, 
size_t segmentIndex) const
{
    auto now = std::chrono::system_clock::now();
    auto now_time_t = std::chrono::system_clock::to_time_t(now);
    std::tm time_info;

    // Linux-specific thread-safe version of localtime
    localtime_r(&now_time_t, &time_info);

    // Path format: <basePath>/<filename>_YYYYMMDD_HHMMSS_NNNNNN.log
    std::stringstream ss;
    ss << m_basePath << "/";
    ss << filename << "_";
    ss << std::put_time(&time_info, "%Y%m%d_%H%M%S") << "_";
    ss << std::setw(6) << std::setfill('0') << segmentIndex << ".log";
    return ss.str();
}
\ No newline at end of file
diff --git a/archive/2025/summer/bsc_karidas/src/Writer.cpp b/archive/2025/summer/bsc_karidas/src/Writer.cpp
new file mode 100644
index 000000000..bc3fe9acd
--- /dev/null
+++ b/archive/2025/summer/bsc_karidas/src/Writer.cpp
@@ -0,0 +1,103 @@
#include "Writer.hpp"
#include "Crypto.hpp"
#include "Compression.hpp"
#include <iostream>
#include <chrono>
#include <map>

// Consumer-side worker: drains the shared queue on its own thread and
// persists serialized (optionally compressed/encrypted) batches to storage.
Writer::Writer(BufferQueue &queue,
               std::shared_ptr<SegmentedStorage> storage,
               size_t batchSize,
               bool useEncryption,
               int compressionLevel)
    : m_queue(queue),
      m_storage(std::move(storage)),
      m_batchSize(batchSize),
      m_useEncryption(useEncryption),
      m_compressionLevel(compressionLevel),
      m_consumerToken(queue.createConsumerToken())
{
}

// stop() is a no-op when not running, so destroying a stopped writer is safe.
Writer::~Writer()
{
    stop();
}

/// Starts the worker thread; idempotent (exchange() guards double-start).
void Writer::start()
{
    if (m_running.exchange(true))
    {
        return;
    }

    m_writerThread.reset(new std::thread(&Writer::processLogEntries, this));
}

/// Signals the worker loop to exit and joins the thread; idempotent.
void Writer::stop()
{
    if (m_running.exchange(false))
    {
        if (m_writerThread && m_writerThread->joinable())
        {
            m_writerThread->join();
        }
    }
}

/// @return true while the worker thread is (or should be) running.
bool Writer::isRunning() const
{
    return m_running.load();
}

/// Worker loop: dequeue up to m_batchSize items, group them by target file,
/// serialize each group, optionally compress then encrypt, and write to
/// storage. Exits when m_running is cleared; LoggingManager::stop() drains
/// the queue before stopping writers, so no entries are left behind.
void Writer::processLogEntries()
{
    std::vector<QueueItem> batch;

    // NOTE(review): hard-coded dummy key/IV — placeholder only, not secure.
    // Real key management must replace this before production use.
    Crypto crypto;
    std::vector<uint8_t> encryptionKey(crypto.KEY_SIZE, 0x42); // dummy key
    std::vector<uint8_t> dummyIV(crypto.GCM_IV_SIZE, 0x24);    // dummy IV

    while (m_running)
    {
        size_t entriesDequeued = m_queue.tryDequeueBatch(batch, m_batchSize, m_consumerToken);
        if (entriesDequeued == 0)
        {
            // Queue empty: back off briefly instead of spinning.
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            continue;
        }

        // Group entries by their (optional) target filename so each file
        // receives one contiguous serialized batch.
        std::map<std::optional<std::string>, std::vector<LogEntry>> groupedEntries;
        for (auto &item : batch)
        {
            groupedEntries[item.targetFilename].emplace_back(std::move(item.entry));
        }

        for (auto &[targetFilename, entries] : groupedEntries)
        {
            std::vector<uint8_t> processedData = LogEntry::serializeBatch(std::move(entries));

            // Apply compression if enabled
            if (m_compressionLevel > 0)
            {
                processedData = Compression::compress(std::move(processedData), m_compressionLevel);
            }
            // Apply encryption if enabled (after compression: ciphertext
            // would not compress)
            if (m_useEncryption)
            {
                processedData = crypto.encrypt(std::move(processedData), encryptionKey, dummyIV);
            }

            // nullopt routes to the storage's default base file.
            if (targetFilename)
            {
                m_storage->writeToFile(*targetFilename, std::move(processedData));
            }
            else
            {
                m_storage->write(std::move(processedData));
            }
        }

        batch.clear();
    }
}
\ No newline at end of file