diff options
Diffstat (limited to 'archive/2025/summer/bsc_karidas/include')
10 files changed, 509 insertions, 0 deletions
diff --git a/archive/2025/summer/bsc_karidas/include/BufferQueue.hpp b/archive/2025/summer/bsc_karidas/include/BufferQueue.hpp new file mode 100644 index 000000000..3a5ab3c08 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/include/BufferQueue.hpp @@ -0,0 +1,49 @@ +#ifndef BUFFER_QUEUE_HPP +#define BUFFER_QUEUE_HPP + +#include "QueueItem.hpp" +#include "concurrentqueue.h" +#include <atomic> +#include <vector> +#include <memory> +#include <condition_variable> +#include <chrono> + +class BufferQueue +{ +public: + using ProducerToken = moodycamel::ProducerToken; + using ConsumerToken = moodycamel::ConsumerToken; + +private: + moodycamel::ConcurrentQueue<QueueItem> m_queue; + +public: + explicit BufferQueue(size_t capacity, size_t maxExplicitProducers); + + ProducerToken createProducerToken() { return ProducerToken(m_queue); } + ConsumerToken createConsumerToken() { return ConsumerToken(m_queue); } + + bool enqueueBlocking(QueueItem item, + ProducerToken &token, + std::chrono::milliseconds timeout = std::chrono::milliseconds::max()); + bool enqueueBatchBlocking(std::vector<QueueItem> items, + ProducerToken &token, + std::chrono::milliseconds timeout = std::chrono::milliseconds::max()); + bool tryDequeue(QueueItem &item, ConsumerToken &token); + size_t tryDequeueBatch(std::vector<QueueItem> &items, size_t maxItems, ConsumerToken &token); + bool flush(); + size_t size() const; + + // delete copy/move + BufferQueue(const BufferQueue &) = delete; + BufferQueue &operator=(const BufferQueue &) = delete; + BufferQueue(BufferQueue &&) = delete; + BufferQueue &operator=(BufferQueue &&) = delete; + +private: + bool enqueue(QueueItem item, ProducerToken &token); + bool enqueueBatch(std::vector<QueueItem> items, ProducerToken &token); +}; + +#endif \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/include/Compression.hpp b/archive/2025/summer/bsc_karidas/include/Compression.hpp new file mode 100644 index 000000000..43bc34da9 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/include/Compression.hpp @@ -0,0 +1,17 @@ +#ifndef COMPRESSION_HPP +#define COMPRESSION_HPP + +#include "LogEntry.hpp" +#include <vector> +#include <cstdint> +#include <zlib.h> + +class Compression +{ +public: + static std::vector<uint8_t> compress(std::vector<uint8_t> &&data, int level = Z_DEFAULT_COMPRESSION); + + static std::vector<uint8_t> decompress(std::vector<uint8_t> &&compressedData); +}; + +#endif \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/include/Config.hpp b/archive/2025/summer/bsc_karidas/include/Config.hpp new file mode 100644 index 000000000..265f66a78 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/include/Config.hpp @@ -0,0 +1,28 @@ +#ifndef CONFIG_HPP +#define CONFIG_HPP + +#include <string> +#include <chrono> + +struct LoggingConfig +{ + // api + std::chrono::milliseconds appendTimeout = std::chrono::milliseconds(30000); + // queue + size_t queueCapacity = 8192; + size_t maxExplicitProducers = 16; // maximum number of producers creating a producer token + // writers + size_t batchSize = 100; + size_t numWriterThreads = 2; + bool useEncryption = true; + int compressionLevel = 9; // 0 = no compression, 1-9 = compression levels + // segmented storage + std::string basePath = "./logs"; + std::string baseFilename = "default"; + size_t maxSegmentSize = 100 * 1024 * 1024; // 100 MB + size_t maxAttempts = 10; + std::chrono::milliseconds baseRetryDelay = std::chrono::milliseconds(1); + size_t maxOpenFiles = 512; +}; + +#endif \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/include/Crypto.hpp b/archive/2025/summer/bsc_karidas/include/Crypto.hpp new file mode 100644 index 000000000..53e5fa12e --- /dev/null +++ b/archive/2025/summer/bsc_karidas/include/Crypto.hpp @@ -0,0 +1,33 @@ +#ifndef CRYPTO_HPP +#define CRYPTO_HPP + +#include <vector> +#include <string> +#include <cstdint> +#include <memory> +#include <openssl/evp.h> + +class Crypto +{ +private: + EVP_CIPHER_CTX *m_encryptCtx; + EVP_CIPHER_CTX *m_decryptCtx; + +public: + Crypto(); + ~Crypto(); + + static constexpr size_t KEY_SIZE = 32; // 256 bits + static constexpr size_t GCM_IV_SIZE = 12; // 96 bits (recommended for GCM) + static constexpr size_t GCM_TAG_SIZE = 16; // 128 bits + + std::vector<uint8_t> encrypt(std::vector<uint8_t> &&plaintext, + const std::vector<uint8_t> &key, + const std::vector<uint8_t> &iv); + + std::vector<uint8_t> decrypt(const std::vector<uint8_t> &encryptedData, + const std::vector<uint8_t> &key, + const std::vector<uint8_t> &iv); +}; + +#endif \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/include/LogEntry.hpp b/archive/2025/summer/bsc_karidas/include/LogEntry.hpp new file mode 100644 index 000000000..4af355741 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/include/LogEntry.hpp @@ -0,0 +1,61 @@ +#ifndef LOG_ENTRY_HPP +#define LOG_ENTRY_HPP + +#include <string> +#include <chrono> +#include <vector> +#include <memory> +#include <cstdint> + +class LogEntry +{ +public: + enum class ActionType + { + CREATE, + READ, + UPDATE, + DELETE, + }; + + LogEntry(); + + LogEntry(ActionType actionType, + std::string dataLocation, + std::string dataControllerId, + std::string dataProcessorId, + std::string dataSubjectId, + std::vector<uint8_t> payload = std::vector<uint8_t>()); + + std::vector<uint8_t> serialize() &&; + std::vector<uint8_t> serialize() const &; + bool deserialize(std::vector<uint8_t> &&data); + + static std::vector<uint8_t> serializeBatch(std::vector<LogEntry> &&entries); + static std::vector<LogEntry> deserializeBatch(std::vector<uint8_t> &&batchData); + + ActionType getActionType() const { return m_actionType; } + std::string getDataLocation() const { return m_dataLocation; } + std::string getDataControllerId() const { return m_dataControllerId; } + std::string getDataProcessorId() const { return m_dataProcessorId; } + std::string getDataSubjectId() const { return m_dataSubjectId; } + std::chrono::system_clock::time_point getTimestamp() const { return m_timestamp; } + const std::vector<uint8_t> &getPayload() const { return m_payload; } + +private: + // Helper methods for binary serialization + void appendToVector(std::vector<uint8_t> &vec, const void *data, size_t size) const; + void appendStringToVector(std::vector<uint8_t> &vec, const std::string &str) const; + void appendStringToVector(std::vector<uint8_t> &vec, std::string &&str); + bool extractStringFromVector(std::vector<uint8_t> &vec, size_t &offset, std::string &str); + + ActionType m_actionType; // Type of GDPR operation + std::string m_dataLocation; // Location of the data being operated on + std::string m_dataControllerId; // ID of the entity controlling the data + std::string m_dataProcessorId; // ID of the entity performing the operation + std::string m_dataSubjectId; // ID of the data subject + std::chrono::system_clock::time_point m_timestamp; // When the operation occurred + std::vector<uint8_t> m_payload; // optional extra bytes +}; + +#endif \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/include/Logger.hpp b/archive/2025/summer/bsc_karidas/include/Logger.hpp new file mode 100644 index 000000000..d4119d364 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/include/Logger.hpp @@ -0,0 +1,59 @@ +#ifndef LOGGER_HPP +#define LOGGER_HPP + +#include "LogEntry.hpp" +#include "BufferQueue.hpp" +#include "QueueItem.hpp" +#include <string> +#include <chrono> +#include <memory> +#include <vector> +#include <shared_mutex> +#include <functional> +#include <optional> + +class Logger +{ + friend class LoggerTest; + +public: + static Logger &getInstance(); + + bool initialize(std::shared_ptr<BufferQueue> queue, + std::chrono::milliseconds appendTimeout = std::chrono::milliseconds::max()); + + BufferQueue::ProducerToken createProducerToken(); + bool append(LogEntry entry, + BufferQueue::ProducerToken &token, + const std::optional<std::string> &filename = std::nullopt); + bool appendBatch(std::vector<LogEntry> entries, + BufferQueue::ProducerToken &token, + const std::optional<std::string> &filename = std::nullopt); + + bool exportLogs(const std::string &outputPath, + std::chrono::system_clock::time_point fromTimestamp = std::chrono::system_clock::time_point(), + std::chrono::system_clock::time_point toTimestamp = std::chrono::system_clock::time_point()); + + bool reset(); + + ~Logger(); + +private: + Logger(); + Logger(const Logger &) = delete; + Logger &operator=(const Logger &) = delete; + // Singleton instance + static std::unique_ptr<Logger> s_instance; + static std::mutex s_instanceMutex; + + std::shared_ptr<BufferQueue> m_logQueue; + std::chrono::milliseconds m_appendTimeout; + + // State tracking + bool m_initialized; + + // Helper to report errors + void reportError(const std::string &message); +}; + +#endif \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/include/LoggingManager.hpp b/archive/2025/summer/bsc_karidas/include/LoggingManager.hpp new file mode 100644 index 000000000..782c04d23 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/include/LoggingManager.hpp @@ -0,0 +1,53 @@ +#ifndef LOGGING_SYSTEM_HPP +#define LOGGING_SYSTEM_HPP + +#include "Config.hpp" +#include "Logger.hpp" +#include "BufferQueue.hpp" +#include "SegmentedStorage.hpp" +#include "Writer.hpp" +#include "LogEntry.hpp" +#include <memory> +#include <vector> +#include <atomic> +#include <mutex> +#include <chrono> +#include <string> +#include <optional> + +class LoggingManager +{ +public: + explicit LoggingManager(const LoggingConfig &config); + ~LoggingManager(); + + bool start(); + bool stop(); + + BufferQueue::ProducerToken createProducerToken(); + bool append(LogEntry entry, + BufferQueue::ProducerToken &token, + const std::optional<std::string> &filename = std::nullopt); + bool appendBatch(std::vector<LogEntry> entries, + BufferQueue::ProducerToken &token, + const std::optional<std::string> &filename = std::nullopt); + + bool exportLogs(const std::string &outputPath, + std::chrono::system_clock::time_point fromTimestamp = std::chrono::system_clock::time_point(), + std::chrono::system_clock::time_point toTimestamp = std::chrono::system_clock::time_point()); + +private: + std::shared_ptr<BufferQueue> m_queue; // Thread-safe queue for queue items + std::shared_ptr<SegmentedStorage> m_storage; // Manages append-only log segments + std::vector<std::unique_ptr<Writer>> m_writers; // Multiple writer threads + std::atomic<bool> m_running{false}; // System running state + std::atomic<bool> m_acceptingEntries{false}; // Controls whether new entries are accepted + std::mutex m_systemMutex; // For system-wide operations + + size_t m_numWriterThreads; // Number of writer threads + size_t m_batchSize; // Batch size for writers + bool m_useEncryption; + int m_compressionLevel; +}; + +#endif \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/include/QueueItem.hpp b/archive/2025/summer/bsc_karidas/include/QueueItem.hpp new file mode 100644 index 000000000..18c19de00 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/include/QueueItem.hpp @@ -0,0 +1,25 @@ +#ifndef QUEUE_ITEM_HPP +#define QUEUE_ITEM_HPP + +#include "LogEntry.hpp" +#include <optional> +#include <string> + +struct QueueItem +{ + LogEntry entry; + std::optional<std::string> targetFilename = std::nullopt; + + QueueItem() = default; + QueueItem(LogEntry &&logEntry) + : entry(std::move(logEntry)), targetFilename(std::nullopt) {} + QueueItem(LogEntry &&logEntry, const std::optional<std::string> &filename) + : entry(std::move(logEntry)), targetFilename(filename) {} + + QueueItem(const QueueItem &) = default; + QueueItem(QueueItem &&) = default; + QueueItem &operator=(const QueueItem &) = default; + QueueItem &operator=(QueueItem &&) = default; +}; + +#endif \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/include/SegmentedStorage.hpp b/archive/2025/summer/bsc_karidas/include/SegmentedStorage.hpp new file mode 100644 index 000000000..3984aea79 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/include/SegmentedStorage.hpp @@ -0,0 +1,144 @@ +#ifndef SEGMENTED_STORAGE_HPP +#define SEGMENTED_STORAGE_HPP + +#include <string> +#include <vector> +#include <atomic> +#include <mutex> +#include <shared_mutex> +#include <filesystem> +#include <cstdint> +#include <unordered_map> +#include <fcntl.h> // for open flags +#include <unistd.h> // for close, pwrite, fsync +#include <chrono> +#include <thread> +#include <stdexcept> +#include <list> // For LRU cache + +class SegmentedStorage +{ +public: + SegmentedStorage(const std::string &basePath, + const std::string &baseFilename, + size_t maxSegmentSize = 100 * 1024 * 1024, // 100 MB default + size_t maxAttempts = 5, + std::chrono::milliseconds baseRetryDelay = std::chrono::milliseconds(1), + size_t maxOpenFiles = 512); + + ~SegmentedStorage(); + + size_t write(std::vector<uint8_t> &&data); + size_t writeToFile(const std::string &filename, std::vector<uint8_t> &&data); + void flush(); + +private: + std::string m_basePath; + std::string m_baseFilename; + size_t m_maxSegmentSize; + size_t m_maxAttempts; + std::chrono::milliseconds m_baseRetryDelay; + size_t m_maxOpenFiles; // Max number of cache entries + + struct CacheEntry + { + int fd{-1}; + std::atomic<size_t> segmentIndex{0}; + std::atomic<size_t> currentOffset{0}; + std::string currentSegmentPath; + mutable std::shared_mutex fileMutex; // shared for writes, exclusive for rotate/flush + }; + + // Unified LRU Cache for both file descriptors and segment information + class LRUCache + { + public: + LRUCache(size_t capacity, SegmentedStorage *parent) : m_capacity(capacity), m_parent(parent) {} + + std::shared_ptr<CacheEntry> get(const std::string &filename); + void flush(const std::string &filename); + void flushAll(); + void closeAll(); + + private: + size_t m_capacity; + SegmentedStorage *m_parent; + + // LRU list of filenames + std::list<std::string> m_lruList; + // Map from filename to cache entry and iterator in LRU list + struct CacheData + { + std::shared_ptr<CacheEntry> entry; + std::list<std::string>::iterator lruIt; + }; + std::unordered_map<std::string, CacheData> m_cache; + mutable std::mutex m_mutex; // Protects m_lruList and m_cache + + void evictLRU(); + std::shared_ptr<CacheEntry> reconstructState(const std::string &filename); + }; + + LRUCache m_cache; + + std::string rotateSegment(const std::string &filename, std::shared_ptr<CacheEntry> entry); + std::string generateSegmentPath(const std::string &filename, size_t segmentIndex) const; + size_t getFileSize(const std::string &path) const; + size_t findLatestSegmentIndex(const std::string &filename) const; + + // Retry helpers use member-configured parameters + template <typename Func> + auto retryWithBackoff(Func &&f) + { + for (size_t attempt = 1;; ++attempt) + { + try + { + return f(); + } + catch (const std::runtime_error &) + { + if (attempt >= m_maxAttempts) + throw; + auto delay = m_baseRetryDelay * (1 << (attempt - 1)); + std::this_thread::sleep_for(delay); + } + } + } + + int openWithRetry(const char *path, int flags, mode_t mode) + { + return retryWithBackoff([&]() + { + int fd = ::open(path, flags, mode); + if (fd < 0) throw std::runtime_error("open failed"); + return fd; }); + } + + size_t pwriteFull(int fd, const uint8_t *buf, size_t count, off_t offset) + { + size_t total = 0; + while (total < count) + { + ssize_t written = ::pwrite(fd, buf + total, count - total, offset + total); + if (written < 0) + { + if (errno == EINTR) + continue; + throw std::runtime_error("pwrite failed"); + } + total += written; + } + return total; + } + + void fsyncRetry(int fd) + { + retryWithBackoff([&]() + { + if (::fsync(fd) < 0) throw std::runtime_error("fsync failed"); + return 0; }); + } +}; + +#endif \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/include/Writer.hpp b/archive/2025/summer/bsc_karidas/include/Writer.hpp new file mode 100644 index 000000000..3bc9e1672 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/include/Writer.hpp @@ -0,0 +1,40 @@ +#ifndef WRITER_HPP +#define WRITER_HPP + +#include <thread> +#include <atomic> +#include <memory> +#include <vector> +#include "QueueItem.hpp" +#include "BufferQueue.hpp" +#include "SegmentedStorage.hpp" + +class Writer +{ +public: + explicit Writer(BufferQueue &queue, + std::shared_ptr<SegmentedStorage> storage, + size_t batchSize = 100, + bool useEncryption = true, + int m_compressionLevel = 9); + + ~Writer(); + + void start(); + void stop(); + bool isRunning() const; + +private: + void processLogEntries(); + + BufferQueue &m_queue; + std::shared_ptr<SegmentedStorage> m_storage; + std::unique_ptr<std::thread> m_writerThread; + std::atomic<bool> m_running{false}; + const size_t m_batchSize; + const bool m_useEncryption; + const int m_compressionLevel; + + BufferQueue::ConsumerToken m_consumerToken; +}; +#endif \ No newline at end of file |