Diffstat (limited to 'archive/2025/summer/bsc_karidas/benchmarks')
14 files changed, 2513 insertions, 0 deletions
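Reader's note: every validation benchmark added below follows the same measurement skeleton — pre-generate the batches, start a LoggingManager, fan out producer threads with std::async (each returning a thread-local LatencyCollector from appendLogEntries), merge the collectors, then derive throughput, write amplification, and latency statistics. The condensed sketch below is not part of the patch; it only restates that shared pattern using the BenchmarkUtils.hpp and LoggingManager.hpp interfaces visible in this diff, with placeholder parameter values rather than a recommended configuration.

// Minimal driver sketch of the pattern repeated in the validation benchmarks.
// Helper names and config fields are taken from this diff; the concrete values
// (paths, thread counts, sizes) are illustrative placeholders only.
#include "BenchmarkUtils.hpp"
#include "LoggingManager.hpp"
#include <future>
#include <vector>
#include <chrono>
#include <iostream>

int main()
{
    LoggingConfig config;
    config.basePath = "./logs/example";   // hypothetical output directory
    config.batchSize = 8192;              // writer-side batch size
    config.numWriterThreads = 4;

    const int numProducers = 4;
    const int entriesPerProducer = 100000;

    // Pre-generate the workload so generation cost stays outside the measurement.
    std::vector<BatchWithDestination> batches =
        generateBatches(entriesPerProducer, /*numSpecificFiles=*/16,
                        /*batchSize=*/512, /*payloadSize=*/1024);

    cleanupLogDirectory(config.basePath);
    LoggingManager loggingManager(config);
    loggingManager.start();
    auto start = std::chrono::high_resolution_clock::now();

    // Each producer thread appends the same batch sequence and records per-call latency.
    std::vector<std::future<LatencyCollector>> futures;
    for (int i = 0; i < numProducers; ++i)
        futures.push_back(std::async(std::launch::async, appendLogEntries,
                                     std::ref(loggingManager), std::ref(batches)));

    // Merge the thread-local measurements into a single collector.
    LatencyCollector merged;
    for (auto &f : futures)
        merged.merge(f.get());

    loggingManager.stop();
    std::chrono::duration<double> elapsed =
        std::chrono::high_resolution_clock::now() - start;

    const double throughput =
        static_cast<double>(numProducers) * entriesPerProducer / elapsed.count();
    std::cout << "Throughput: " << throughput << " entries/s\n";
    printLatencyStats(calculateLatencyStats(merged));

    cleanupLogDirectory(config.basePath);
    return 0;
}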
diff --git a/archive/2025/summer/bsc_karidas/benchmarks/BenchmarkUtils.cpp b/archive/2025/summer/bsc_karidas/benchmarks/BenchmarkUtils.cpp new file mode 100644 index 000000000..070d0672c --- /dev/null +++ b/archive/2025/summer/bsc_karidas/benchmarks/BenchmarkUtils.cpp @@ -0,0 +1,263 @@ +#include "BenchmarkUtils.hpp" + +LatencyCollector appendLogEntries(LoggingManager &loggingManager, const std::vector<BatchWithDestination> &batches) +{ + LatencyCollector localCollector; + // Pre-allocate to avoid reallocations during measurement + localCollector.reserve(batches.size()); + + auto token = loggingManager.createProducerToken(); + + for (const auto &batchWithDest : batches) + { + // Measure latency for each appendBatch call + auto startTime = std::chrono::high_resolution_clock::now(); + + bool success = loggingManager.appendBatch(batchWithDest.first, token, batchWithDest.second); + + auto endTime = std::chrono::high_resolution_clock::now(); + auto latency = std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - startTime); + + // Record the latency measurement in thread-local collector + localCollector.addMeasurement(latency); + + if (!success) + { + std::cerr << "Failed to append batch of " << batchWithDest.first.size() << " entries to " + << (batchWithDest.second ? *batchWithDest.second : "default") << std::endl; + } + } + + return localCollector; +} + +void cleanupLogDirectory(const std::string &logDir) +{ + try + { + if (std::filesystem::exists(logDir)) + { + std::filesystem::remove_all(logDir); + } + } + catch (const std::exception &e) + { + std::cerr << "Error cleaning log directory: " << e.what() << std::endl; + } +} + +size_t calculateTotalDataSize(const std::vector<BatchWithDestination> &batches, int numProducers) +{ + size_t totalSize = 0; + + for (const auto &batchWithDest : batches) + { + for (const auto &entry : batchWithDest.first) + { + totalSize += entry.serialize().size(); + } + } + + return totalSize * numProducers; +} + +size_t calculateDirectorySize(const std::string &dirPath) +{ + size_t totalSize = 0; + for (const auto &entry : std::filesystem::recursive_directory_iterator(dirPath)) + { + if (entry.is_regular_file()) + { + totalSize += std::filesystem::file_size(entry.path()); + } + } + return totalSize; +} + +std::vector<BatchWithDestination> generateBatches( + int numEntries, + int numSpecificFiles, + int batchSize, + int payloadSize) +{ + std::vector<BatchWithDestination> batches; + + // Generate specific filenames + std::vector<std::string> specificFilenames; + for (int i = 0; i < numSpecificFiles; i++) + { + specificFilenames.push_back("specific_log_file" + std::to_string(i + 1) + ".log"); + } + + int totalChoices = numSpecificFiles + 1; // +1 for default (std::nullopt) + int generated = 0; + int destinationIndex = 0; + + // Random number generation setup + std::random_device rd; + std::mt19937 rng(rd()); + + // Define pools similar to compressionRatio.cpp + std::vector<std::string> userIds; + for (int i = 1; i <= 1000; ++i) + { + userIds.push_back("user_" + std::to_string(i)); + } + + std::vector<std::string> attributes = { + "profile", "settings", "history", "preferences", "contacts", + "messages", "photos", "documents", "videos", "audio"}; + + std::vector<std::string> controllerIds; + for (int i = 1; i <= 10; ++i) + { + controllerIds.push_back("controller_" + std::to_string(i)); + } + + std::vector<std::string> processorIds; + for (int i = 1; i <= 20; ++i) + { + processorIds.push_back("processor_" + std::to_string(i)); + } + + std::vector<std::string> 
wordList = { + "the", "data", //"to", "and", "user","is", "in", "for", "of", "access", + //"system", "time", "log", "with", "on", "from", "request", "error", "file", "server", + //"update", "status", "by", "at", "process", "information", "new", "this", "connection", "failed", + //"success", "operation", "id", "network", "event", "application", "check", "value", "into", "service", + //"query", "response", "get", "set", "action", "report", "now", "client", "device", "start" + }; + + // Zipfian distribution for payload words + std::vector<double> weights; + for (size_t k = 0; k < wordList.size(); ++k) + { + weights.push_back(1.0 / (k + 1.0)); + } + std::discrete_distribution<size_t> wordDist(weights.begin(), weights.end()); + + // Generate power-of-2 sizes for variable payload + std::vector<size_t> powerOf2Sizes; + int minPowerOf2 = 5; // 2^5 = 32 + int maxPowerOf2 = static_cast<int>(std::log2(payloadSize)); + for (int power = minPowerOf2; power <= maxPowerOf2; power++) + { + powerOf2Sizes.push_back(1 << power); // 2^power + } + + // Distributions for random selections + std::uniform_int_distribution<int> actionDist(0, 3); // CREATE, READ, UPDATE, DELETE + std::uniform_int_distribution<size_t> userDist(0, userIds.size() - 1); + std::uniform_int_distribution<size_t> attrDist(0, attributes.size() - 1); + std::uniform_int_distribution<size_t> controllerDist(0, controllerIds.size() - 1); + std::uniform_int_distribution<size_t> processorDist(0, processorIds.size() - 1); + std::uniform_int_distribution<size_t> powerOf2SizeDist(0, powerOf2Sizes.size() - 1); + + while (generated < numEntries) + { + int currentBatchSize = std::min(batchSize, numEntries - generated); + + // Assign destination in round-robin manner + std::optional<std::string> targetFilename = std::nullopt; + if (destinationIndex % totalChoices > 0) + { + targetFilename = specificFilenames[(destinationIndex % totalChoices) - 1]; + } + + // Generate the batch + std::vector<LogEntry> batch; + batch.reserve(currentBatchSize); + for (int i = 0; i < currentBatchSize; i++) + { + // Generate realistic log entry + auto action = static_cast<LogEntry::ActionType>(actionDist(rng)); + std::string user_id = userIds[userDist(rng)]; + std::string attribute = attributes[attrDist(rng)]; + std::string dataLocation = "user/" + user_id + "/" + attribute; + std::string dataSubjectId = user_id; + std::string dataControllerId = controllerIds[controllerDist(rng)]; + std::string dataProcessorId = processorIds[processorDist(rng)]; + + // Determine targetSize + size_t targetSize = static_cast<size_t>(payloadSize); + + // Build payload + std::string payloadStr; + while (payloadStr.size() < targetSize) + { + if (!payloadStr.empty()) + payloadStr += " "; + size_t wordIndex = wordDist(rng); + payloadStr += wordList[wordIndex]; + } + if (payloadStr.size() > targetSize) + { + payloadStr = payloadStr.substr(0, targetSize); + } + std::vector<uint8_t> payload(payloadStr.begin(), payloadStr.end()); + + LogEntry entry(action, + dataLocation, + dataControllerId, + dataProcessorId, + dataSubjectId, + std::move(payload)); + batch.push_back(std::move(entry)); + } + + batches.push_back({std::move(batch), targetFilename}); + generated += currentBatchSize; + destinationIndex++; // Move to the next destination + } + + return batches; +} + +LatencyStats calculateLatencyStats(const LatencyCollector &collector) +{ + const auto &latencies = collector.getMeasurements(); + + if (latencies.empty()) + { + return {0.0, 0.0, 0.0, 0}; + } + + // Convert to milliseconds for easier reading + 
std::vector<double> latenciesMs; + latenciesMs.reserve(latencies.size()); + for (const auto &lat : latencies) + { + latenciesMs.push_back(static_cast<double>(lat.count()) / 1e6); // ns to ms + } + + // Sort for percentile calculations + std::sort(latenciesMs.begin(), latenciesMs.end()); + + LatencyStats stats; + stats.count = latenciesMs.size(); + stats.maxMs = latenciesMs.back(); + stats.avgMs = std::accumulate(latenciesMs.begin(), latenciesMs.end(), 0.0) / latenciesMs.size(); + + // Median + size_t medianIdx = latenciesMs.size() / 2; + if (latenciesMs.size() % 2 == 0) + { + stats.medianMs = (latenciesMs[medianIdx - 1] + latenciesMs[medianIdx]) / 2.0; + } + else + { + stats.medianMs = latenciesMs[medianIdx]; + } + + return stats; +} + +void printLatencyStats(const LatencyStats &stats) +{ + std::cout << "============== Latency Statistics ==============" << std::endl; + std::cout << "Total append operations: " << stats.count << std::endl; + std::cout << "Max latency: " << stats.maxMs << " ms" << std::endl; + std::cout << "Average latency: " << stats.avgMs << " ms" << std::endl; + std::cout << "Median latency: " << stats.medianMs << " ms" << std::endl; + std::cout << "===============================================" << std::endl; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/benchmarks/BenchmarkUtils.hpp b/archive/2025/summer/bsc_karidas/benchmarks/BenchmarkUtils.hpp new file mode 100644 index 000000000..ec94d89ce --- /dev/null +++ b/archive/2025/summer/bsc_karidas/benchmarks/BenchmarkUtils.hpp @@ -0,0 +1,79 @@ +#ifndef BENCHMARK_UTILS_HPP +#define BENCHMARK_UTILS_HPP + +#include "LoggingManager.hpp" +#include <vector> +#include <string> +#include <optional> +#include <filesystem> +#include <iostream> +#include <chrono> +#include <algorithm> +#include <random> +#include <numeric> + +using BatchWithDestination = std::pair<std::vector<LogEntry>, std::optional<std::string>>; + +class LatencyCollector +{ +private: + std::vector<std::chrono::nanoseconds> latencies; + +public: + void addMeasurement(std::chrono::nanoseconds latency) + { + latencies.push_back(latency); + } + + void reserve(size_t capacity) + { + latencies.reserve(capacity); + } + + const std::vector<std::chrono::nanoseconds> &getMeasurements() const + { + return latencies; + } + + void clear() + { + latencies.clear(); + } + + // Merge another collector's measurements into this one + void merge(const LatencyCollector &other) + { + const auto &otherLatencies = other.getMeasurements(); + latencies.insert(latencies.end(), otherLatencies.begin(), otherLatencies.end()); + } +}; + +struct LatencyStats +{ + double maxMs; + double avgMs; + double medianMs; + size_t count; +}; + +// Function to calculate statistics from a merged collector +LatencyStats calculateLatencyStats(const LatencyCollector &collector); + +// Modified to return latency measurements instead of using global state +LatencyCollector appendLogEntries(LoggingManager &loggingManager, const std::vector<BatchWithDestination> &batches); + +void cleanupLogDirectory(const std::string &logDir); + +size_t calculateTotalDataSize(const std::vector<BatchWithDestination> &batches, int numProducers); + +size_t calculateDirectorySize(const std::string &dirPath); + +std::vector<BatchWithDestination> generateBatches( + int numEntries, + int numSpecificFiles, + int batchSize, + int payloadSize); + +void printLatencyStats(const LatencyStats &stats); + +#endif \ No newline at end of file diff --git 
a/archive/2025/summer/bsc_karidas/benchmarks/validation/batch_size.cpp b/archive/2025/summer/bsc_karidas/benchmarks/validation/batch_size.cpp new file mode 100644 index 000000000..e5f2a44e6 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/benchmarks/validation/batch_size.cpp @@ -0,0 +1,226 @@ +#include "BenchmarkUtils.hpp" +#include "LoggingManager.hpp" +#include <iostream> +#include <fstream> +#include <thread> +#include <chrono> +#include <vector> +#include <future> +#include <optional> +#include <iomanip> +#include <filesystem> + +struct BenchmarkResult +{ + double elapsedSeconds; + double throughputEntries; + double logicalThroughputGiB; + double physicalThroughputGiB; + double writeAmplification; + LatencyStats latencyStats; +}; + +BenchmarkResult runBatchSizeBenchmark(const LoggingConfig &baseConfig, int writerBatchSize, int numProducerThreads, + int entriesPerProducer, int numSpecificFiles, int producerBatchSize, int payloadSize) +{ + LoggingConfig config = baseConfig; + config.basePath = "./logs/batch_" + std::to_string(writerBatchSize); + config.batchSize = writerBatchSize; + + cleanupLogDirectory(config.basePath); + + std::cout << "Generating batches with pre-determined destinations for all threads..."; + std::vector<BatchWithDestination> batches = generateBatches(entriesPerProducer, numSpecificFiles, producerBatchSize, payloadSize); + std::cout << " Done." << std::endl; + + size_t totalDataSizeBytes = calculateTotalDataSize(batches, numProducerThreads); + double totalDataSizeGiB = static_cast<double>(totalDataSizeBytes) / (1024 * 1024 * 1024); + std::cout << "Total data to be written: " << totalDataSizeBytes << " bytes (" + << totalDataSizeGiB << " GiB)" << std::endl; + + LoggingManager loggingManager(config); + loggingManager.start(); + auto startTime = std::chrono::high_resolution_clock::now(); + + // Each future now returns a LatencyCollector with thread-local measurements + std::vector<std::future<LatencyCollector>> futures; + for (int i = 0; i < numProducerThreads; i++) + { + futures.push_back(std::async( + std::launch::async, + appendLogEntries, + std::ref(loggingManager), + std::ref(batches))); + } + + // Collect latency measurements from all threads + LatencyCollector masterCollector; + for (auto &future : futures) + { + LatencyCollector threadCollector = future.get(); + masterCollector.merge(threadCollector); + } + + loggingManager.stop(); + auto endTime = std::chrono::high_resolution_clock::now(); + std::chrono::duration<double> elapsed = endTime - startTime; + + size_t finalStorageSize = calculateDirectorySize(config.basePath); + double writeAmplification = static_cast<double>(finalStorageSize) / totalDataSizeBytes; + + double elapsedSeconds = elapsed.count(); + const size_t totalEntries = numProducerThreads * entriesPerProducer; + double throughputEntries = totalEntries / elapsedSeconds; + double logicalThroughputGiB = totalDataSizeGiB / elapsedSeconds; + double physicalThroughputGiB = static_cast<double>(finalStorageSize) / (1024.0 * 1024.0 * 1024.0 * elapsedSeconds); + + // Calculate latency statistics from merged measurements + LatencyStats latencyStats = calculateLatencyStats(masterCollector); + + cleanupLogDirectory(config.basePath); + + return BenchmarkResult{ + elapsedSeconds, + throughputEntries, + logicalThroughputGiB, + physicalThroughputGiB, + writeAmplification, + latencyStats}; +} + +// Write CSV header +void writeCSVHeader(std::ofstream &csvFile) +{ + csvFile << "batch_size,elapsed_seconds,throughput_entries_per_sec,logical_throughput_gib_per_sec," + 
<< "physical_throughput_gib_per_sec,relative_performance,write_amplification," + << "avg_latency_ms,median_latency_ms,max_latency_ms,latency_count\n"; +} + +// Write a single result row to CSV +void writeCSVRow(std::ofstream &csvFile, int batchSize, const BenchmarkResult &result, double relativePerf) +{ + csvFile << batchSize << "," + << std::fixed << std::setprecision(6) << result.elapsedSeconds << "," + << std::fixed << std::setprecision(2) << result.throughputEntries << "," + << std::fixed << std::setprecision(6) << result.logicalThroughputGiB << "," + << std::fixed << std::setprecision(6) << result.physicalThroughputGiB << "," + << std::fixed << std::setprecision(6) << relativePerf << "," + << std::fixed << std::setprecision(8) << result.writeAmplification << "," + << std::fixed << std::setprecision(6) << result.latencyStats.avgMs << "," + << std::fixed << std::setprecision(6) << result.latencyStats.medianMs << "," + << std::fixed << std::setprecision(6) << result.latencyStats.maxMs << "," + << result.latencyStats.count << "\n"; +} + +void runBatchSizeComparison(const LoggingConfig &baseConfig, const std::vector<int> &batchSizes, + int numProducerThreads, int entriesPerProducer, + int numSpecificFiles, int producerBatchSize, int payloadSize, + const std::string &csvFilename = "batch_size_benchmark.csv") +{ + std::vector<BenchmarkResult> results; + + // Open CSV file for writing + std::ofstream csvFile(csvFilename); + if (!csvFile.is_open()) + { + std::cerr << "Error: Could not open CSV file " << csvFilename << " for writing." << std::endl; + return; + } + + writeCSVHeader(csvFile); + + std::cout << "Running batch size benchmark with " << batchSizes.size() << " data points..." << std::endl; + std::cout << "Results will be saved to: " << csvFilename << std::endl; + + for (size_t i = 0; i < batchSizes.size(); i++) + { + int batchSize = batchSizes[i]; + std::cout << "\nProgress: " << (i + 1) << "/" << batchSizes.size() + << " - Running benchmark with writer batch size: " << batchSize << "..." << std::endl; + + BenchmarkResult result = runBatchSizeBenchmark( + baseConfig, batchSize, numProducerThreads, + entriesPerProducer, numSpecificFiles, producerBatchSize, payloadSize); + + results.push_back(result); + + // Calculate relative performance (using first result as baseline) + double relativePerf = results.size() > 1 ? result.throughputEntries / results[0].throughputEntries : 1.0; + + // Write result to CSV immediately + writeCSVRow(csvFile, batchSize, result, relativePerf); + csvFile.flush(); // Ensure data is written in case of early termination + + // Print progress summary + std::cout << " Completed: " << std::fixed << std::setprecision(2) + << result.throughputEntries << " entries/s, " + << std::fixed << std::setprecision(3) << result.logicalThroughputGiB << " GiB/s" << std::endl; + + // Small delay between runs + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + csvFile.close(); + std::cout << "\nBenchmark completed! Results saved to " << csvFilename << std::endl; + + std::cout << "\n=========== WRITER BATCH SIZE BENCHMARK SUMMARY ===========" << std::endl; + std::cout << std::left << std::setw(12) << "Batch Size" + << std::setw(15) << "Time (sec)" + << std::setw(20) << "Throughput (entries/s)" + << std::setw(15) << "Logical (GiB/s)" + << std::setw(15) << "Physical (GiB/s)" + << std::setw(12) << "Rel. Perf" + << std::setw(15) << "Write Amp." 
+ << std::setw(12) << "Avg Lat(ms)" << std::endl; + std::cout << "--------------------------------------------------------------------------------------------------------------------------------" << std::endl; + + for (size_t i = 0; i < batchSizes.size(); i++) + { + double relativePerf = results[i].throughputEntries / results[0].throughputEntries; + std::cout << std::left << std::setw(12) << batchSizes[i] + << std::setw(15) << std::fixed << std::setprecision(2) << results[i].elapsedSeconds + << std::setw(20) << std::fixed << std::setprecision(2) << results[i].throughputEntries + << std::setw(15) << std::fixed << std::setprecision(3) << results[i].logicalThroughputGiB + << std::setw(15) << std::fixed << std::setprecision(3) << results[i].physicalThroughputGiB + << std::setw(12) << std::fixed << std::setprecision(2) << relativePerf + << std::setw(15) << std::fixed << std::setprecision(4) << results[i].writeAmplification + << std::setw(12) << std::fixed << std::setprecision(3) << results[i].latencyStats.avgMs << std::endl; + } + std::cout << "======================================================================================================================================" << std::endl; +} + +int main() +{ + // System parameters + LoggingConfig baseConfig; + baseConfig.baseFilename = "default"; + baseConfig.maxSegmentSize = 500 * 1024 * 1024; // 100 MB + baseConfig.maxAttempts = 5; + baseConfig.baseRetryDelay = std::chrono::milliseconds(1); + baseConfig.queueCapacity = 2000000; + baseConfig.maxExplicitProducers = 16; + baseConfig.numWriterThreads = 16; + baseConfig.appendTimeout = std::chrono::minutes(2); + baseConfig.useEncryption = true; + baseConfig.compressionLevel = 4; + baseConfig.maxOpenFiles = 512; + // Benchmark parameters + const int numSpecificFiles = 256; + const int producerBatchSize = 4096; + const int numProducers = 16; + const int entriesPerProducer = 2000000; + const int payloadSize = 4096; + + std::vector<int> batchSizes = {1, 4, 8, 16, 32, 64, 96, 128, 256, 512, 768, 1024, 1536, 2048, 4096, 8192, 16384, 32768, 65536, 131072}; + + runBatchSizeComparison(baseConfig, + batchSizes, + numProducers, + entriesPerProducer, + numSpecificFiles, + producerBatchSize, + payloadSize, + "batch_size_benchmark_results.csv"); + + return 0; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/benchmarks/validation/concurrency.cpp b/archive/2025/summer/bsc_karidas/benchmarks/validation/concurrency.cpp new file mode 100644 index 000000000..428fd40e7 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/benchmarks/validation/concurrency.cpp @@ -0,0 +1,235 @@ +#include "BenchmarkUtils.hpp" +#include "LoggingManager.hpp" +#include <iostream> +#include <fstream> +#include <thread> +#include <chrono> +#include <vector> +#include <future> +#include <optional> +#include <iomanip> +#include <filesystem> + +struct BenchmarkResult +{ + double executionTime; + double throughputEntries; + double logicalThroughputGiB; + double physicalThroughputGiB; + size_t inputDataSizeBytes; + size_t outputDataSizeBytes; + double writeAmplification; + LatencyStats latencyStats; +}; + +BenchmarkResult runBenchmark(const LoggingConfig &baseConfig, int numWriterThreads, int numProducerThreads, + int entriesPerProducer, int numSpecificFiles, int producerBatchSize, int payloadSize) +{ + LoggingConfig config = baseConfig; + config.basePath = "./logs_writers"; + config.numWriterThreads = numWriterThreads; + + cleanupLogDirectory(config.basePath); + + std::cout << "Generating batches with pre-determined 
destinations for all threads..."; + std::vector<BatchWithDestination> batches = generateBatches(entriesPerProducer, numSpecificFiles, producerBatchSize, payloadSize); + std::cout << " Done." << std::endl; + + size_t totalDataSizeBytes = calculateTotalDataSize(batches, numProducerThreads); + double totalDataSizeGiB = static_cast<double>(totalDataSizeBytes) / (1024 * 1024 * 1024); + std::cout << "Total data to be written: " << totalDataSizeBytes << " bytes (" + << totalDataSizeGiB << " GiB)" << std::endl; + + LoggingManager loggingManager(config); + loggingManager.start(); + auto startTime = std::chrono::high_resolution_clock::now(); + + // Each future now returns a LatencyCollector with thread-local measurements + std::vector<std::future<LatencyCollector>> futures; + for (int i = 0; i < numProducerThreads; i++) + { + futures.push_back(std::async( + std::launch::async, + appendLogEntries, + std::ref(loggingManager), + std::ref(batches))); + } + + // Collect latency measurements from all threads + LatencyCollector masterCollector; + for (auto &future : futures) + { + LatencyCollector threadCollector = future.get(); + masterCollector.merge(threadCollector); + } + + loggingManager.stop(); + auto endTime = std::chrono::high_resolution_clock::now(); + std::chrono::duration<double> elapsed = endTime - startTime; + + size_t finalStorageSize = calculateDirectorySize(config.basePath); + double writeAmplification = static_cast<double>(finalStorageSize) / totalDataSizeBytes; + + double elapsedSeconds = elapsed.count(); + const size_t totalEntries = numProducerThreads * entriesPerProducer; + double throughputEntries = totalEntries / elapsedSeconds; + double logicalThroughputGiB = totalDataSizeGiB / elapsedSeconds; + double physicalThroughputGiB = static_cast<double>(finalStorageSize) / (1024.0 * 1024.0 * 1024.0 * elapsedSeconds); + + // Calculate latency statistics from merged measurements + LatencyStats latencyStats = calculateLatencyStats(masterCollector); + + cleanupLogDirectory(config.basePath); + + return BenchmarkResult{ + elapsedSeconds, + throughputEntries, + logicalThroughputGiB, + physicalThroughputGiB, + totalDataSizeBytes, + finalStorageSize, + writeAmplification, + latencyStats}; +} + +// Write CSV header +void writeCSVHeader(std::ofstream &csvFile) +{ + csvFile << "writer_threads,execution_time_seconds,throughput_entries_per_sec,logical_throughput_gib_per_sec," + << "physical_throughput_gib_per_sec,input_data_size_bytes,output_data_size_bytes,speedup_vs_baseline," + << "write_amplification,avg_latency_ms,median_latency_ms,max_latency_ms,latency_count\n"; +} + +// Write a single result row to CSV +void writeCSVRow(std::ofstream &csvFile, int writerThreads, const BenchmarkResult &result, double speedup) +{ + csvFile << writerThreads << "," + << std::fixed << std::setprecision(6) << result.executionTime << "," + << std::fixed << std::setprecision(2) << result.throughputEntries << "," + << std::fixed << std::setprecision(6) << result.logicalThroughputGiB << "," + << std::fixed << std::setprecision(6) << result.physicalThroughputGiB << "," + << result.inputDataSizeBytes << "," + << result.outputDataSizeBytes << "," + << std::fixed << std::setprecision(6) << speedup << "," + << std::fixed << std::setprecision(8) << result.writeAmplification << "," + << std::fixed << std::setprecision(6) << result.latencyStats.avgMs << "," + << std::fixed << std::setprecision(6) << result.latencyStats.medianMs << "," + << std::fixed << std::setprecision(6) << result.latencyStats.maxMs << "," + << 
result.latencyStats.count << "\n"; +} + +void runConcurrencyBenchmark(const LoggingConfig &baseConfig, const std::vector<int> &writerThreadCounts, + int numProducerThreads, int entriesPerProducer, + int numSpecificFiles, int producerBatchSize, int payloadSize, + const std::string &csvFilename = "concurrency_benchmark.csv") +{ + std::vector<BenchmarkResult> results; + + // Open CSV file for writing + std::ofstream csvFile(csvFilename); + if (!csvFile.is_open()) + { + std::cerr << "Error: Could not open CSV file " << csvFilename << " for writing." << std::endl; + return; + } + + writeCSVHeader(csvFile); + + std::cout << "Running concurrency benchmark with " << writerThreadCounts.size() << " data points..." << std::endl; + std::cout << "Results will be saved to: " << csvFilename << std::endl; + + for (size_t i = 0; i < writerThreadCounts.size(); i++) + { + int writerCount = writerThreadCounts[i]; + std::cout << "\nProgress: " << (i + 1) << "/" << writerThreadCounts.size() + << " - Running benchmark with " << writerCount << " writer thread(s)..." << std::endl; + + BenchmarkResult result = runBenchmark(baseConfig, writerCount, numProducerThreads, entriesPerProducer, + numSpecificFiles, producerBatchSize, payloadSize); + + results.push_back(result); + + // Calculate speedup (using first result as baseline) + double speedup = results.size() > 1 ? result.throughputEntries / results[0].throughputEntries : 1.0; + + // Write result to CSV immediately + writeCSVRow(csvFile, writerCount, result, speedup); + csvFile.flush(); // Ensure data is written in case of early termination + + // Print progress summary + std::cout << " Completed: " << std::fixed << std::setprecision(2) + << result.throughputEntries << " entries/s, " + << std::fixed << std::setprecision(3) << result.logicalThroughputGiB << " GiB/s, " + << std::fixed << std::setprecision(2) << speedup << "x speedup" << std::endl; + } + + csvFile.close(); + std::cout << "\nBenchmark completed! Results saved to " << csvFilename << std::endl; + + // Still print summary table to console for immediate review + std::cout << "\n=================== CONCURRENCY BENCHMARK SUMMARY ===================" << std::endl; + std::cout << std::left << std::setw(20) << "Writer Threads" + << std::setw(15) << "Time (sec)" + << std::setw(20) << "Throughput (ent/s)" + << std::setw(15) << "Logical (GiB/s)" + << std::setw(15) << "Physical (GiB/s)" + << std::setw(20) << "Input Size (bytes)" + << std::setw(20) << "Storage Size (bytes)" + << std::setw(15) << "Write Amp." + << std::setw(12) << "Speedup vs. 
1" + << std::setw(12) << "Avg Lat(ms)" << std::endl; + std::cout << "--------------------------------------------------------------------------------------------------------------------------------" << std::endl; + + double baselineThroughputEntries = results[0].throughputEntries; + + for (size_t i = 0; i < writerThreadCounts.size(); i++) + { + double speedup = results[i].throughputEntries / baselineThroughputEntries; + std::cout << std::left << std::setw(20) << writerThreadCounts[i] + << std::setw(15) << std::fixed << std::setprecision(2) << results[i].executionTime + << std::setw(20) << std::fixed << std::setprecision(2) << results[i].throughputEntries + << std::setw(15) << std::fixed << std::setprecision(3) << results[i].logicalThroughputGiB + << std::setw(15) << std::fixed << std::setprecision(3) << results[i].physicalThroughputGiB + << std::setw(20) << results[i].inputDataSizeBytes + << std::setw(20) << results[i].outputDataSizeBytes + << std::setw(15) << std::fixed << std::setprecision(4) << results[i].writeAmplification + << std::setw(12) << std::fixed << std::setprecision(2) << speedup + << std::setw(12) << std::fixed << std::setprecision(3) << results[i].latencyStats.avgMs << std::endl; + } + std::cout << "================================================================================================================================" << std::endl; +} + +int main() +{ + // system parameters + LoggingConfig baseConfig; + baseConfig.baseFilename = "default"; + baseConfig.maxSegmentSize = 50 * 1024 * 1024; // 50 MB + baseConfig.maxAttempts = 5; + baseConfig.baseRetryDelay = std::chrono::milliseconds(1); + baseConfig.queueCapacity = 3000000; + baseConfig.maxExplicitProducers = 16; + baseConfig.batchSize = 8192; + baseConfig.appendTimeout = std::chrono::minutes(5); + baseConfig.useEncryption = true; + baseConfig.compressionLevel = 9; + // benchmark parameters + const int numSpecificFiles = 256; + const int producerBatchSize = 512; + const int numProducers = 16; + const int entriesPerProducer = 2000000; + const int payloadSize = 2048; + + std::vector<int> writerThreadCounts = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; + + runConcurrencyBenchmark(baseConfig, + writerThreadCounts, + numProducers, + entriesPerProducer, + numSpecificFiles, + producerBatchSize, + payloadSize, + "concurrency_benchmark_results.csv"); + + return 0; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/benchmarks/validation/encryption_compression_usage.cpp b/archive/2025/summer/bsc_karidas/benchmarks/validation/encryption_compression_usage.cpp new file mode 100644 index 000000000..34e0cb929 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/benchmarks/validation/encryption_compression_usage.cpp @@ -0,0 +1,248 @@ +#include "BenchmarkUtils.hpp" +#include "LoggingManager.hpp" +#include <iostream> +#include <fstream> +#include <thread> +#include <chrono> +#include <vector> +#include <future> +#include <optional> +#include <iomanip> +#include <filesystem> + +struct BenchmarkResult +{ + bool useEncryption; + int compressionLevel; + double executionTime; + size_t totalEntries; + double throughputEntries; + size_t totalDataSizeBytes; + size_t finalStorageSize; + double logicalThroughputGiB; + double physicalThroughputGiB; + double writeAmplification; + LatencyStats latencyStats; +}; + +BenchmarkResult runBenchmark(const LoggingConfig &baseConfig, bool useEncryption, int compressionLevel, + const std::vector<BatchWithDestination> &batches, + int numProducerThreads, int 
entriesPerProducer) +{ + LoggingConfig config = baseConfig; + config.basePath = "./encryption_compression_usage"; + config.useEncryption = useEncryption; + config.compressionLevel = compressionLevel; + + cleanupLogDirectory(config.basePath); + + size_t totalDataSizeBytes = calculateTotalDataSize(batches, numProducerThreads); + double totalDataSizeGiB = static_cast<double>(totalDataSizeBytes) / (1024 * 1024 * 1024); + std::cout << "Benchmark with Encryption: " << (useEncryption ? "Enabled" : "Disabled") + << ", Compression: " << (compressionLevel != 0 ? "Enabled" : "Disabled") + << " - Total data to be written: " << totalDataSizeBytes + << " bytes (" << totalDataSizeGiB << " GiB)" << std::endl; + + LoggingManager loggingManager(config); + loggingManager.start(); + auto startTime = std::chrono::high_resolution_clock::now(); + + // Each future now returns a LatencyCollector with thread-local measurements + std::vector<std::future<LatencyCollector>> futures; + for (int i = 0; i < numProducerThreads; i++) + { + futures.push_back(std::async( + std::launch::async, + appendLogEntries, + std::ref(loggingManager), + std::ref(batches))); + } + + // Collect latency measurements from all threads + LatencyCollector masterCollector; + for (auto &future : futures) + { + LatencyCollector threadCollector = future.get(); + masterCollector.merge(threadCollector); + } + + loggingManager.stop(); + auto endTime = std::chrono::high_resolution_clock::now(); + std::chrono::duration<double> elapsed = endTime - startTime; + + size_t finalStorageSize = calculateDirectorySize(config.basePath); + double writeAmplification = static_cast<double>(finalStorageSize) / totalDataSizeBytes; + + double elapsedSeconds = elapsed.count(); + const size_t totalEntries = numProducerThreads * entriesPerProducer; + double throughputEntries = totalEntries / elapsedSeconds; + double logicalThroughputGiB = totalDataSizeGiB / elapsedSeconds; + double physicalThroughputGiB = static_cast<double>(finalStorageSize) / (1024.0 * 1024.0 * 1024.0 * elapsedSeconds); + + // Calculate latency statistics from merged measurements + LatencyStats latencyStats = calculateLatencyStats(masterCollector); + + cleanupLogDirectory(config.basePath); + + return BenchmarkResult{ + useEncryption, + compressionLevel, + elapsedSeconds, + totalEntries, + throughputEntries, + totalDataSizeBytes, + finalStorageSize, + logicalThroughputGiB, + physicalThroughputGiB, + writeAmplification, + latencyStats}; +} + +// Write CSV header +void writeCSVHeader(std::ofstream &csvFile) +{ + csvFile << "encryption_enabled,compression_level,execution_time_seconds,total_entries," + << "throughput_entries_per_sec,total_data_size_bytes,final_storage_size_bytes,logical_throughput_gib_per_sec," + << "physical_throughput_gib_per_sec,write_amplification,avg_latency_ms,median_latency_ms," + << "max_latency_ms,latency_count\n"; +} + +// Write a single result row to CSV +void writeCSVRow(std::ofstream &csvFile, const BenchmarkResult &result) +{ + csvFile << (result.useEncryption ? 
"true" : "false") << "," + << result.compressionLevel << "," + << std::fixed << std::setprecision(6) << result.executionTime << "," + << result.totalEntries << "," + << std::fixed << std::setprecision(2) << result.throughputEntries << "," + << result.totalDataSizeBytes << "," + << result.finalStorageSize << "," + << std::fixed << std::setprecision(6) << result.logicalThroughputGiB << "," + << std::fixed << std::setprecision(6) << result.physicalThroughputGiB << "," + << std::fixed << std::setprecision(8) << result.writeAmplification << "," + << std::fixed << std::setprecision(6) << result.latencyStats.avgMs << "," + << std::fixed << std::setprecision(6) << result.latencyStats.medianMs << "," + << std::fixed << std::setprecision(6) << result.latencyStats.maxMs << "," + << result.latencyStats.count << "\n"; +} + +void runEncryptionCompressionBenchmark(const LoggingConfig &baseConfig, + const std::vector<bool> &encryptionSettings, + const std::vector<int> &compressionLevels, + const std::vector<BatchWithDestination> &batches, + int numProducers, int entriesPerProducer, + const std::string &csvFilename = "encryption_compression_benchmark.csv") +{ + std::vector<BenchmarkResult> results; + + // Open CSV file for writing + std::ofstream csvFile(csvFilename); + if (!csvFile.is_open()) + { + std::cerr << "Error: Could not open CSV file " << csvFilename << " for writing." << std::endl; + return; + } + + writeCSVHeader(csvFile); + + int totalCombinations = encryptionSettings.size() * compressionLevels.size(); + std::cout << "Running encryption/compression benchmark with " << totalCombinations << " configurations..." << std::endl; + std::cout << "Results will be saved to: " << csvFilename << std::endl; + + int currentTest = 0; + for (bool useEncryption : encryptionSettings) + { + for (int compressionLevel : compressionLevels) + { + currentTest++; + std::cout << "\nProgress: " << currentTest << "/" << totalCombinations + << " - Testing Encryption: " << (useEncryption ? "Enabled" : "Disabled") + << ", Compression: " << compressionLevel << "..." << std::endl; + + BenchmarkResult result = runBenchmark(baseConfig, useEncryption, compressionLevel, batches, numProducers, entriesPerProducer); + results.push_back(result); + + // Write result to CSV immediately + writeCSVRow(csvFile, result); + csvFile.flush(); // Ensure data is written in case of early termination + + // Print progress summary + std::cout << " Completed: " << std::fixed << std::setprecision(2) + << result.throughputEntries << " entries/s, " + << std::fixed << std::setprecision(3) << result.logicalThroughputGiB << " GiB/s, " + << "write amp: " << std::fixed << std::setprecision(3) << result.writeAmplification << std::endl; + } + } + + csvFile.close(); + std::cout << "\nBenchmark completed! Results saved to " << csvFilename << std::endl; + + // Still print summary table to console for immediate review + std::cout << "\n============== ENCRYPTION/COMPRESSION LEVEL BENCHMARK SUMMARY ==============" << std::endl; + std::cout << std::left << std::setw(12) << "Encryption" + << std::setw(15) << "Comp. Level" + << std::setw(15) << "Exec. Time (s)" + << std::setw(20) << "Input Size (bytes)" + << std::setw(20) << "Storage Size (bytes)" + << std::setw(12) << "Write Amp." 
+ << std::setw(20) << "Throughput (ent/s)" + << std::setw(15) << "Logical (GiB/s)" + << std::setw(15) << "Physical (GiB/s)" + << std::setw(12) << "Avg Lat(ms)" << std::endl; + std::cout << "--------------------------------------------------------------------------------------------------------------------------------" << std::endl; + + // Display results for each configuration + for (const auto &result : results) + { + std::cout << std::left << std::setw(12) << (result.useEncryption ? "True" : "False") + << std::setw(15) << result.compressionLevel + << std::fixed << std::setprecision(2) << std::setw(15) << result.executionTime + << std::setw(20) << result.totalDataSizeBytes + << std::setw(20) << result.finalStorageSize + << std::fixed << std::setprecision(3) << std::setw(12) << result.writeAmplification + << std::fixed << std::setprecision(2) << std::setw(20) << result.throughputEntries + << std::fixed << std::setprecision(3) << std::setw(15) << result.logicalThroughputGiB + << std::fixed << std::setprecision(3) << std::setw(15) << result.physicalThroughputGiB + << std::fixed << std::setprecision(3) << std::setw(12) << result.latencyStats.avgMs << std::endl; + } + + std::cout << "================================================================================================================================" << std::endl; +} + +int main() +{ + // system parameters + LoggingConfig baseConfig; + baseConfig.baseFilename = "default"; + baseConfig.maxSegmentSize = 50 * 1024 * 1024; // 50 MB + baseConfig.maxAttempts = 10; + baseConfig.baseRetryDelay = std::chrono::milliseconds(2); + baseConfig.queueCapacity = 3000000; + baseConfig.maxExplicitProducers = 96; + baseConfig.batchSize = 8192; + baseConfig.numWriterThreads = 64; + baseConfig.appendTimeout = std::chrono::minutes(2); + // Benchmark parameters + const int numSpecificFiles = 256; + const int producerBatchSize = 512; + const int numProducers = 96; + const int entriesPerProducer = 260000; + const int payloadSize = 4096; + + const std::vector<int> compressionLevels = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + const std::vector<bool> encryptionSettings = {false, true}; + + std::cout << "Generating batches with pre-determined destinations for all threads..."; + std::vector<BatchWithDestination> batches = generateBatches(entriesPerProducer, numSpecificFiles, producerBatchSize, payloadSize); + std::cout << " Done." 
<< std::endl; + + runEncryptionCompressionBenchmark(baseConfig, + encryptionSettings, + compressionLevels, + batches, + numProducers, + entriesPerProducer, + "encryption_compression_benchmark_results.csv"); + + return 0; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/benchmarks/validation/file_rotation.cpp b/archive/2025/summer/bsc_karidas/benchmarks/validation/file_rotation.cpp new file mode 100644 index 000000000..9132180c6 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/benchmarks/validation/file_rotation.cpp @@ -0,0 +1,270 @@ +#include "BenchmarkUtils.hpp" +#include "LoggingManager.hpp" +#include <iostream> +#include <fstream> +#include <thread> +#include <chrono> +#include <vector> +#include <future> +#include <optional> +#include <iomanip> +#include <filesystem> + +struct BenchmarkResult +{ + double elapsedSeconds; + double throughputEntries; + double logicalThroughputGiB; + double physicalThroughputGiB; + int fileCount; + double writeAmplification; + LatencyStats latencyStats; +}; + +int countLogFiles(const std::string &basePath) +{ + int count = 0; + for (const auto &entry : std::filesystem::directory_iterator(basePath)) + { + if (entry.is_regular_file() && entry.path().extension() == ".log") + { + count++; + } + } + return count; +} + +BenchmarkResult runFileRotationBenchmark( + const LoggingConfig &baseConfig, + int maxSegmentSizeMB, + int numProducerThreads, + int entriesPerProducer, + int numSpecificFiles, + int producerBatchSize, + int payloadSize) +{ + std::string logDir = "./logs/rotation_" + std::to_string(maxSegmentSizeMB) + "mb"; + + cleanupLogDirectory(logDir); + + LoggingConfig config = baseConfig; + config.basePath = logDir; + config.maxSegmentSize = static_cast<size_t>(maxSegmentSizeMB) * 1024 * 1024; + std::cout << "Configured max segment size: " << config.maxSegmentSize << " bytes" << std::endl; + + std::cout << "Generating batches with pre-determined destinations for all threads..."; + std::vector<BatchWithDestination> batches = generateBatches(entriesPerProducer, numSpecificFiles, producerBatchSize, payloadSize); + std::cout << " Done." 
<< std::endl; + + size_t totalDataSizeBytes = calculateTotalDataSize(batches, numProducerThreads); + double totalDataSizeGiB = static_cast<double>(totalDataSizeBytes) / (1024 * 1024 * 1024); + + std::cout << "Total data to be written: " << totalDataSizeBytes << " bytes (" + << totalDataSizeGiB << " GiB)" << std::endl; + + LoggingManager loggingManager(config); + loggingManager.start(); + auto startTime = std::chrono::high_resolution_clock::now(); + + // Each future now returns a LatencyCollector with thread-local measurements + std::vector<std::future<LatencyCollector>> futures; + for (int i = 0; i < numProducerThreads; i++) + { + futures.push_back(std::async( + std::launch::async, + appendLogEntries, + std::ref(loggingManager), + std::ref(batches))); + } + + // Collect latency measurements from all threads + LatencyCollector masterCollector; + for (auto &future : futures) + { + LatencyCollector threadCollector = future.get(); + masterCollector.merge(threadCollector); + } + + loggingManager.stop(); + auto endTime = std::chrono::high_resolution_clock::now(); + std::chrono::duration<double> elapsed = endTime - startTime; + + size_t finalStorageSize = calculateDirectorySize(logDir); + double writeAmplification = static_cast<double>(finalStorageSize) / totalDataSizeBytes; + + double elapsedSeconds = elapsed.count(); + const size_t totalEntries = numProducerThreads * entriesPerProducer; + double throughputEntries = totalEntries / elapsedSeconds; + double logicalThroughputGiB = totalDataSizeGiB / elapsedSeconds; + double physicalThroughputGiB = static_cast<double>(finalStorageSize) / (1024.0 * 1024.0 * 1024.0 * elapsedSeconds); + int fileCount = countLogFiles(logDir); + + // Calculate latency statistics from merged measurements + LatencyStats latencyStats = calculateLatencyStats(masterCollector); + + cleanupLogDirectory(logDir); + + return BenchmarkResult{ + elapsedSeconds, + throughputEntries, + logicalThroughputGiB, + physicalThroughputGiB, + fileCount, + writeAmplification, + latencyStats}; +} + +// Write CSV header +void writeCSVHeader(std::ofstream &csvFile) +{ + csvFile << "segment_size_mb,elapsed_seconds,throughput_entries_per_sec,logical_throughput_gib_per_sec," + << "physical_throughput_gib_per_sec,file_count,relative_performance,write_amplification," + << "avg_latency_ms,median_latency_ms,max_latency_ms,latency_count\n"; +} + +// Write a single result row to CSV +void writeCSVRow(std::ofstream &csvFile, int segmentSizeMB, const BenchmarkResult &result, double relativePerf) +{ + csvFile << segmentSizeMB << "," + << std::fixed << std::setprecision(6) << result.elapsedSeconds << "," + << std::fixed << std::setprecision(2) << result.throughputEntries << "," + << std::fixed << std::setprecision(6) << result.logicalThroughputGiB << "," + << std::fixed << std::setprecision(6) << result.physicalThroughputGiB << "," + << result.fileCount << "," + << std::fixed << std::setprecision(6) << relativePerf << "," + << std::fixed << std::setprecision(8) << result.writeAmplification << "," + << std::fixed << std::setprecision(6) << result.latencyStats.avgMs << "," + << std::fixed << std::setprecision(6) << result.latencyStats.medianMs << "," + << std::fixed << std::setprecision(6) << result.latencyStats.maxMs << "," + << result.latencyStats.count << "\n"; +} + +void runFileRotationComparison( + const LoggingConfig &baseConfig, + const std::vector<int> &segmentSizesMB, + int numProducerThreads, + int entriesPerProducer, + int numSpecificFiles, + int producerBatchSize, + int payloadSize, + const 
std::string &csvFilename = "file_rotation_benchmark.csv") +{ + std::vector<BenchmarkResult> results; + + // Open CSV file for writing + std::ofstream csvFile(csvFilename); + if (!csvFile.is_open()) + { + std::cerr << "Error: Could not open CSV file " << csvFilename << " for writing." << std::endl; + return; + } + + writeCSVHeader(csvFile); + + std::cout << "Running file rotation benchmark with " << segmentSizesMB.size() << " data points..." << std::endl; + std::cout << "Results will be saved to: " << csvFilename << std::endl; + + for (size_t i = 0; i < segmentSizesMB.size(); i++) + { + int segmentSize = segmentSizesMB[i]; + std::cout << "\nProgress: " << (i + 1) << "/" << segmentSizesMB.size() + << " - Running benchmark with segment size: " << segmentSize << " MB..." << std::endl; + + BenchmarkResult result = runFileRotationBenchmark( + baseConfig, + segmentSize, + numProducerThreads, + entriesPerProducer, + numSpecificFiles, + producerBatchSize, + payloadSize); + + results.push_back(result); + + // Calculate relative performance (using first result as baseline) + double relativePerf = results.size() > 1 ? result.throughputEntries / results[0].throughputEntries : 1.0; + + // Write result to CSV immediately + writeCSVRow(csvFile, segmentSize, result, relativePerf); + csvFile.flush(); // Ensure data is written in case of early termination + + // Print progress summary + std::cout << " Completed: " << std::fixed << std::setprecision(2) + << result.throughputEntries << " entries/s, " + << std::fixed << std::setprecision(3) << result.logicalThroughputGiB << " GiB/s, " + << result.fileCount << " files created" << std::endl; + + // Add a small delay between runs + std::this_thread::sleep_for(std::chrono::seconds(5)); + } + + csvFile.close(); + std::cout << "\nBenchmark completed! Results saved to " << csvFilename << std::endl; + + // Still print summary table to console for immediate review + std::cout << "\n========================== FILE ROTATION BENCHMARK SUMMARY ==========================" << std::endl; + std::cout << std::left << std::setw(20) << "Segment Size (MB)" + << std::setw(15) << "Time (sec)" + << std::setw(20) << "Throughput (ent/s)" + << std::setw(15) << "Logical (GiB/s)" + << std::setw(15) << "Physical (GiB/s)" + << std::setw(15) << "Files Created" + << std::setw(15) << "Write Amp." + << std::setw(12) << "Rel. 
Perf" + << std::setw(12) << "Avg Lat(ms)" << std::endl; + std::cout << "--------------------------------------------------------------------------------------------------------------------------------" << std::endl; + + // Use the first segment size as the baseline for relative performance + double baselineThroughput = results[0].throughputEntries; + + for (size_t i = 0; i < segmentSizesMB.size(); i++) + { + double relativePerf = results[i].throughputEntries / baselineThroughput; + std::cout << std::left << std::setw(20) << segmentSizesMB[i] + << std::setw(15) << std::fixed << std::setprecision(2) << results[i].elapsedSeconds + << std::setw(20) << std::fixed << std::setprecision(2) << results[i].throughputEntries + << std::setw(15) << std::fixed << std::setprecision(3) << results[i].logicalThroughputGiB + << std::setw(15) << std::fixed << std::setprecision(3) << results[i].physicalThroughputGiB + << std::setw(15) << results[i].fileCount + << std::setw(15) << std::fixed << std::setprecision(4) << results[i].writeAmplification + << std::setw(12) << std::fixed << std::setprecision(2) << relativePerf + << std::setw(12) << std::fixed << std::setprecision(3) << results[i].latencyStats.avgMs << std::endl; + } + std::cout << "================================================================================================================================" << std::endl; +} + +int main() +{ + // system parameters + LoggingConfig baseConfig; + baseConfig.baseFilename = "default"; + baseConfig.maxAttempts = 5; + baseConfig.baseRetryDelay = std::chrono::milliseconds(1); + baseConfig.queueCapacity = 3000000; + baseConfig.maxExplicitProducers = 32; + baseConfig.batchSize = 8192; + baseConfig.numWriterThreads = 64; + baseConfig.appendTimeout = std::chrono::minutes(2); + baseConfig.useEncryption = false; + baseConfig.compressionLevel = 0; + // benchmark parameters + const int numSpecificFiles = 0; + const int producerBatchSize = 1024; + const int numProducers = 32; + const int entriesPerProducer = 1000000; + const int payloadSize = 256; + + std::vector<int> segmentSizesMB = {8000, 6000, 4000, 3000, 2000, 1500, 1000, 800, 650, 500, 350, 250, 150, 100, 85, 70, 55, 40, 25, 10}; + + runFileRotationComparison( + baseConfig, + segmentSizesMB, + numProducers, + entriesPerProducer, + numSpecificFiles, + producerBatchSize, + payloadSize, + "file_rotation_benchmark_results.csv"); + + return 0; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/benchmarks/validation/queue_capacity.cpp b/archive/2025/summer/bsc_karidas/benchmarks/validation/queue_capacity.cpp new file mode 100644 index 000000000..c3588e3e4 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/benchmarks/validation/queue_capacity.cpp @@ -0,0 +1,225 @@ +#include "BenchmarkUtils.hpp" +#include "LoggingManager.hpp" +#include <iostream> +#include <fstream> +#include <thread> +#include <chrono> +#include <vector> +#include <future> +#include <optional> +#include <iomanip> +#include <filesystem> + +struct BenchmarkResult +{ + double elapsedSeconds; + double throughputEntries; + double logicalThroughputGiB; + double physicalThroughputGiB; + double writeAmplification; + LatencyStats latencyStats; +}; + +BenchmarkResult runQueueCapacityBenchmark(const LoggingConfig &config, int numProducerThreads, + int entriesPerProducer, int numSpecificFiles, int producerBatchSize, int payloadSize) +{ + cleanupLogDirectory(config.basePath); + + std::cout << "Generating batches with pre-determined destinations for all threads..."; + 
std::vector<BatchWithDestination> batches = generateBatches(entriesPerProducer, numSpecificFiles, producerBatchSize, payloadSize); + std::cout << " Done." << std::endl; + + size_t totalDataSizeBytes = calculateTotalDataSize(batches, numProducerThreads); + double totalDataSizeGiB = static_cast<double>(totalDataSizeBytes) / (1024 * 1024 * 1024); + + std::cout << "Total data to be written: " << totalDataSizeBytes << " bytes (" + << totalDataSizeGiB << " GiB)" << std::endl; + + LoggingManager loggingManager(config); + loggingManager.start(); + auto startTime = std::chrono::high_resolution_clock::now(); + + // Each future now returns a LatencyCollector with thread-local measurements + std::vector<std::future<LatencyCollector>> futures; + for (int i = 0; i < numProducerThreads; i++) + { + futures.push_back(std::async( + std::launch::async, + appendLogEntries, + std::ref(loggingManager), + std::ref(batches))); + } + + // Collect latency measurements from all threads + LatencyCollector masterCollector; + for (auto &future : futures) + { + LatencyCollector threadCollector = future.get(); + masterCollector.merge(threadCollector); + } + + loggingManager.stop(); + auto endTime = std::chrono::high_resolution_clock::now(); + std::chrono::duration<double> elapsed = endTime - startTime; + + size_t finalStorageSize = calculateDirectorySize(config.basePath); + double writeAmplification = static_cast<double>(finalStorageSize) / totalDataSizeBytes; + + double elapsedSeconds = elapsed.count(); + const size_t totalEntries = numProducerThreads * entriesPerProducer; + double throughputEntries = totalEntries / elapsedSeconds; + double logicalThroughputGiB = totalDataSizeGiB / elapsedSeconds; + double physicalThroughputGiB = static_cast<double>(finalStorageSize) / (1024.0 * 1024.0 * 1024.0 * elapsedSeconds); + + // Calculate latency statistics from merged measurements + LatencyStats latencyStats = calculateLatencyStats(masterCollector); + + cleanupLogDirectory(config.basePath); + + return BenchmarkResult{ + elapsedSeconds, + throughputEntries, + logicalThroughputGiB, + physicalThroughputGiB, + writeAmplification, + latencyStats}; +} + +// Write CSV header +void writeCSVHeader(std::ofstream &csvFile) +{ + csvFile << "queue_capacity,elapsed_seconds,throughput_entries_per_sec,logical_throughput_gib_per_sec," + << "physical_throughput_gib_per_sec,relative_performance,write_amplification," + << "avg_latency_ms,median_latency_ms,max_latency_ms,latency_count\n"; +} + +// Write a single result row to CSV +void writeCSVRow(std::ofstream &csvFile, int queueCapacity, const BenchmarkResult &result, double relativePerf) +{ + csvFile << queueCapacity << "," + << std::fixed << std::setprecision(6) << result.elapsedSeconds << "," + << std::fixed << std::setprecision(2) << result.throughputEntries << "," + << std::fixed << std::setprecision(6) << result.logicalThroughputGiB << "," + << std::fixed << std::setprecision(6) << result.physicalThroughputGiB << "," + << std::fixed << std::setprecision(6) << relativePerf << "," + << std::fixed << std::setprecision(8) << result.writeAmplification << "," + << std::fixed << std::setprecision(6) << result.latencyStats.avgMs << "," + << std::fixed << std::setprecision(6) << result.latencyStats.medianMs << "," + << std::fixed << std::setprecision(6) << result.latencyStats.maxMs << "," + << result.latencyStats.count << "\n"; +} + +void runQueueCapacityComparison(const LoggingConfig &baseConfig, const std::vector<int> &queueSizes, + int numProducerThreads, + int entriesPerProducer, int 
numSpecificFiles, int producerBatchSize, int payloadSize, + const std::string &csvFilename = "queue_capacity_benchmark.csv") +{ + std::vector<BenchmarkResult> results; + + // Open CSV file for writing + std::ofstream csvFile(csvFilename); + if (!csvFile.is_open()) + { + std::cerr << "Error: Could not open CSV file " << csvFilename << " for writing." << std::endl; + return; + } + + writeCSVHeader(csvFile); + + std::cout << "Running queue capacity benchmark with " << queueSizes.size() << " data points..." << std::endl; + std::cout << "Results will be saved to: " << csvFilename << std::endl; + + for (size_t i = 0; i < queueSizes.size(); i++) + { + int queueSize = queueSizes[i]; + std::cout << "\nProgress: " << (i + 1) << "/" << queueSizes.size() + << " - Running benchmark with queue capacity: " << queueSize << "..." << std::endl; + + LoggingConfig runConfig = baseConfig; + runConfig.queueCapacity = queueSize; + runConfig.basePath = "./logs/queue_" + std::to_string(queueSize); + + BenchmarkResult result = runQueueCapacityBenchmark( + runConfig, numProducerThreads, + entriesPerProducer, numSpecificFiles, producerBatchSize, payloadSize); + + results.push_back(result); + + // Calculate relative performance (using first result as baseline) + double relativePerf = results.size() > 1 ? result.throughputEntries / results[0].throughputEntries : 1.0; + + // Write result to CSV immediately + writeCSVRow(csvFile, queueSize, result, relativePerf); + csvFile.flush(); // Ensure data is written in case of early termination + + // Print progress summary + std::cout << " Completed: " << std::fixed << std::setprecision(2) + << result.throughputEntries << " entries/s, " + << std::fixed << std::setprecision(3) << result.logicalThroughputGiB << " GiB/s" << std::endl; + + // Add a small delay between runs + std::this_thread::sleep_for(std::chrono::seconds(5)); + } + + csvFile.close(); + std::cout << "\nBenchmark completed! Results saved to " << csvFilename << std::endl; + + std::cout << "\n=========== QUEUE CAPACITY BENCHMARK SUMMARY ===========" << std::endl; + std::cout << std::left << std::setw(15) << "Queue Capacity" + << std::setw(15) << "Time (sec)" + << std::setw(20) << "Throughput (ent/s)" + << std::setw(15) << "Logical (GiB/s)" + << std::setw(15) << "Physical (GiB/s)" + << std::setw(15) << "Write Amp." + << std::setw(12) << "Rel. 
Perf" + << std::setw(12) << "Avg Lat(ms)" << std::endl; + std::cout << "--------------------------------------------------------------------------------------------------------------------------------" << std::endl; + + for (size_t i = 0; i < queueSizes.size(); i++) + { + double relativePerf = results[i].throughputEntries / results[0].throughputEntries; // Relative to smallest queue + std::cout << std::left << std::setw(15) << queueSizes[i] + << std::setw(15) << std::fixed << std::setprecision(2) << results[i].elapsedSeconds + << std::setw(20) << std::fixed << std::setprecision(2) << results[i].throughputEntries + << std::setw(15) << std::fixed << std::setprecision(3) << results[i].logicalThroughputGiB + << std::setw(15) << std::fixed << std::setprecision(3) << results[i].physicalThroughputGiB + << std::setw(15) << std::fixed << std::setprecision(4) << results[i].writeAmplification + << std::setw(12) << std::fixed << std::setprecision(2) << relativePerf + << std::setw(12) << std::fixed << std::setprecision(3) << results[i].latencyStats.avgMs << std::endl; + } + std::cout << "================================================================================================================================" << std::endl; +} + +int main() +{ + // system parameters + LoggingConfig baseConfig; + baseConfig.baseFilename = "default"; + baseConfig.maxSegmentSize = 50 * 1024 * 1024; // 50 MB + baseConfig.maxAttempts = 5; + baseConfig.baseRetryDelay = std::chrono::milliseconds(1); + baseConfig.batchSize = 8192; + baseConfig.maxExplicitProducers = 32; + baseConfig.numWriterThreads = 32; + baseConfig.appendTimeout = std::chrono::minutes(2); + baseConfig.useEncryption = true; + baseConfig.compressionLevel = 9; + baseConfig.maxOpenFiles = 512; + // benchmark parameters + const int numSpecificFiles = 256; + const int producerBatchSize = 2048; + const int numProducers = 32; + const int entriesPerProducer = 2000000; + const int payloadSize = 2048; + + std::vector<int> queueSizes = {8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608, 16777216, 33554432}; + runQueueCapacityComparison(baseConfig, queueSizes, + numProducers, + entriesPerProducer, + numSpecificFiles, + producerBatchSize, + payloadSize, + "queue_capacity_benchmark_results.csv"); + + return 0; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/benchmarks/validation/scaling_concurrency.cpp b/archive/2025/summer/bsc_karidas/benchmarks/validation/scaling_concurrency.cpp new file mode 100644 index 000000000..ae14aa969 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/benchmarks/validation/scaling_concurrency.cpp @@ -0,0 +1,248 @@ +#include "BenchmarkUtils.hpp" +#include "LoggingManager.hpp" +#include <iostream> +#include <fstream> +#include <thread> +#include <chrono> +#include <vector> +#include <future> +#include <optional> +#include <iomanip> +#include <filesystem> + +struct BenchmarkResult +{ + double executionTime; + double throughputEntries; + double logicalThroughputGiB; + double physicalThroughputGiB; + size_t inputDataSizeBytes; + size_t outputDataSizeBytes; + double writeAmplification; + LatencyStats latencyStats; +}; + +BenchmarkResult runBenchmark(const LoggingConfig &baseConfig, int numWriterThreads, int numProducerThreads, + int entriesPerProducer, int numSpecificFiles, int producerBatchSize, int payloadSize) +{ + LoggingConfig config = baseConfig; + config.basePath = "./logs_writers"; + config.numWriterThreads = numWriterThreads; + config.maxExplicitProducers = 
numProducerThreads; + + cleanupLogDirectory(config.basePath); + + std::cout << "Generating batches with pre-determined destinations for all threads..."; + std::vector<BatchWithDestination> batches = generateBatches(entriesPerProducer, numSpecificFiles, producerBatchSize, payloadSize); + std::cout << " Done." << std::endl; + + size_t totalDataSizeBytes = calculateTotalDataSize(batches, numProducerThreads); + double totalDataSizeGiB = static_cast<double>(totalDataSizeBytes) / (1024 * 1024 * 1024); + std::cout << "Total data to be written: " << totalDataSizeBytes << " bytes (" + << totalDataSizeGiB << " GiB)" << std::endl; + + LoggingManager loggingManager(config); + loggingManager.start(); + auto startTime = std::chrono::high_resolution_clock::now(); + + // Each future now returns a LatencyCollector with thread-local measurements + std::vector<std::future<LatencyCollector>> futures; + for (int i = 0; i < numProducerThreads; i++) + { + futures.push_back(std::async( + std::launch::async, + appendLogEntries, + std::ref(loggingManager), + std::ref(batches))); + } + + // Collect latency measurements from all threads + LatencyCollector masterCollector; + for (auto &future : futures) + { + LatencyCollector threadCollector = future.get(); + masterCollector.merge(threadCollector); + } + + loggingManager.stop(); + auto endTime = std::chrono::high_resolution_clock::now(); + std::chrono::duration<double> elapsed = endTime - startTime; + + size_t finalStorageSize = calculateDirectorySize(config.basePath); + double writeAmplification = static_cast<double>(finalStorageSize) / totalDataSizeBytes; + + double elapsedSeconds = elapsed.count(); + const size_t totalEntries = numProducerThreads * entriesPerProducer; + double throughputEntries = totalEntries / elapsedSeconds; + double logicalThroughputGiB = totalDataSizeGiB / elapsedSeconds; + double physicalThroughputGiB = static_cast<double>(finalStorageSize) / (1024.0 * 1024.0 * 1024.0 * elapsedSeconds); + + // Calculate latency statistics from merged measurements + LatencyStats latencyStats = calculateLatencyStats(masterCollector); + + cleanupLogDirectory(config.basePath); + + return BenchmarkResult{ + elapsedSeconds, + throughputEntries, + logicalThroughputGiB, + physicalThroughputGiB, + totalDataSizeBytes, + finalStorageSize, + writeAmplification, + latencyStats}; +} + +// Write CSV header +void writeCSVHeader(std::ofstream &csvFile) +{ + csvFile << "writer_threads,producer_threads,execution_time_seconds,throughput_entries_per_sec,logical_throughput_gib_per_sec," + << "physical_throughput_gib_per_sec,input_data_size_bytes,output_data_size_bytes,scaling_efficiency," + << "write_amplification,avg_latency_ms,median_latency_ms,max_latency_ms,latency_count\n"; +} + +// Write a single result row to CSV +void writeCSVRow(std::ofstream &csvFile, int writerThreads, int producerThreads, const BenchmarkResult &result, double scalingEfficiency) +{ + csvFile << writerThreads << "," + << producerThreads << "," + << std::fixed << std::setprecision(6) << result.executionTime << "," + << std::fixed << std::setprecision(2) << result.throughputEntries << "," + << std::fixed << std::setprecision(6) << result.logicalThroughputGiB << "," + << std::fixed << std::setprecision(6) << result.physicalThroughputGiB << "," + << result.inputDataSizeBytes << "," + << result.outputDataSizeBytes << "," + << std::fixed << std::setprecision(6) << scalingEfficiency << "," + << std::fixed << std::setprecision(8) << result.writeAmplification << "," + << std::fixed << std::setprecision(6) << 
result.latencyStats.avgMs << "," + << std::fixed << std::setprecision(6) << result.latencyStats.medianMs << "," + << std::fixed << std::setprecision(6) << result.latencyStats.maxMs << "," + << result.latencyStats.count << "\n"; +} + +void runScalabilityBenchmark(const LoggingConfig &baseConfig, const std::vector<int> &writerThreadCounts, + int baseProducerThreads, int baseEntriesPerProducer, + int numSpecificFiles, int producerBatchSize, int payloadSize, + const std::string &csvFilename = "scaling_concurrency_benchmark.csv") +{ + std::vector<BenchmarkResult> results; + std::vector<int> producerThreadCounts; + + // Open CSV file for writing + std::ofstream csvFile(csvFilename); + if (!csvFile.is_open()) + { + std::cerr << "Error: Could not open CSV file " << csvFilename << " for writing." << std::endl; + return; + } + + writeCSVHeader(csvFile); + + std::cout << "Running scaling concurrency benchmark with " << writerThreadCounts.size() << " data points..." << std::endl; + std::cout << "Results will be saved to: " << csvFilename << std::endl; + + for (size_t i = 0; i < writerThreadCounts.size(); i++) + { + int writerCount = writerThreadCounts[i]; + std::cout << "\nProgress: " << (i + 1) << "/" << writerThreadCounts.size() + << " - Running scalability benchmark with " << writerCount << " writer thread(s)..." << std::endl; + + // Option 1: Scale producer threads, keeping entries per producer constant + int scaledProducers = baseProducerThreads * writerCount; + int entriesPerProducer = baseEntriesPerProducer; + producerThreadCounts.push_back(scaledProducers); + + std::cout << "Scaled workload: " << scaledProducers << " producers, " + << entriesPerProducer << " entries per producer" << std::endl; + + BenchmarkResult result = runBenchmark(baseConfig, writerCount, scaledProducers, entriesPerProducer, + numSpecificFiles, producerBatchSize, payloadSize); + + results.push_back(result); + + // Calculate scaling efficiency (normalized by expected linear scaling) + double scalingEfficiency = results.size() > 1 ? (result.throughputEntries / results[0].throughputEntries) / writerCount : 1.0; + + // Write result to CSV immediately + writeCSVRow(csvFile, writerCount, scaledProducers, result, scalingEfficiency); + csvFile.flush(); // Ensure data is written in case of early termination + + // Print progress summary + std::cout << " Completed: " << std::fixed << std::setprecision(2) + << result.throughputEntries << " entries/s, " + << std::fixed << std::setprecision(3) << result.logicalThroughputGiB << " GiB/s, " + << std::fixed << std::setprecision(2) << scalingEfficiency << " scaling efficiency" << std::endl; + } + + csvFile.close(); + std::cout << "\nBenchmark completed! Results saved to " << csvFilename << std::endl; + + // Still print summary table to console for immediate review + std::cout << "\n=================== SCALABILITY BENCHMARK SUMMARY ===================" << std::endl; + std::cout << std::left << std::setw(20) << "Writer Threads" + << std::setw(20) << "Producer Threads" + << std::setw(15) << "Time (sec)" + << std::setw(20) << "Throughput (ent/s)" + << std::setw(15) << "Logical (GiB/s)" + << std::setw(15) << "Physical (GiB/s)" + << std::setw(20) << "Input Size (bytes)" + << std::setw(20) << "Storage Size (bytes)" + << std::setw(15) << "Write Amp." + << std::setw(12) << "Rel. Perf." 
+ << std::setw(12) << "Avg Lat(ms)" << std::endl; + std::cout << "--------------------------------------------------------------------------------------------------------------------------------" << std::endl; + + double baselineThroughput = results[0].throughputEntries; + + for (size_t i = 0; i < writerThreadCounts.size(); i++) + { + double relativePerformance = results[i].throughputEntries / (baselineThroughput * writerThreadCounts[i]); + + std::cout << std::left << std::setw(20) << writerThreadCounts[i] + << std::setw(20) << producerThreadCounts[i] + << std::setw(15) << std::fixed << std::setprecision(2) << results[i].executionTime + << std::setw(20) << std::fixed << std::setprecision(2) << results[i].throughputEntries + << std::setw(15) << std::fixed << std::setprecision(3) << results[i].logicalThroughputGiB + << std::setw(15) << std::fixed << std::setprecision(3) << results[i].physicalThroughputGiB + << std::setw(20) << results[i].inputDataSizeBytes + << std::setw(20) << results[i].outputDataSizeBytes + << std::setw(15) << std::fixed << std::setprecision(4) << results[i].writeAmplification + << std::setw(12) << std::fixed << std::setprecision(2) << relativePerformance + << std::setw(12) << std::fixed << std::setprecision(3) << results[i].latencyStats.avgMs << std::endl; + } + std::cout << "================================================================================================================================" << std::endl; +} + +int main() +{ + // system parameters + LoggingConfig baseConfig; + baseConfig.baseFilename = "default"; + baseConfig.maxSegmentSize = 250 * 1024 * 1024; // 250 MB + baseConfig.maxAttempts = 5; + baseConfig.baseRetryDelay = std::chrono::milliseconds(1); + baseConfig.queueCapacity = 3000000; + baseConfig.batchSize = 8192; + baseConfig.appendTimeout = std::chrono::minutes(5); + baseConfig.useEncryption = true; + baseConfig.compressionLevel = 9; + // benchmark parameters + const int numSpecificFiles = 256; + const int producerBatchSize = 512; + const int baseProducerThreads = 1; + const int baseEntriesPerProducer = 4000000; + const int payloadSize = 2048; + + std::vector<int> writerThreadCounts = {1, 2, 4, 8, 12, 16, 20, 24, 28, 32, 40, 48, 56, 64}; + + runScalabilityBenchmark(baseConfig, + writerThreadCounts, + baseProducerThreads, + baseEntriesPerProducer, + numSpecificFiles, + producerBatchSize, + payloadSize, + "scaling_concurrency_benchmark_results.csv"); + + return 0; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/benchmarks/workloads/compression_ratio.cpp b/archive/2025/summer/bsc_karidas/benchmarks/workloads/compression_ratio.cpp new file mode 100644 index 000000000..a8ba49433 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/benchmarks/workloads/compression_ratio.cpp @@ -0,0 +1,62 @@ +#include "BenchmarkUtils.hpp" +#include "Compression.hpp" +#include "LogEntry.hpp" +#include <chrono> +#include <cstdint> +#include <iomanip> +#include <iostream> +#include <random> +#include <vector> + +struct Result +{ + int level; + size_t uncompressedSize; + size_t compressedSize; + double compressionRatio; + long long durationMs; +}; + +int main() +{ + constexpr size_t batchSize = 1000; + const std::vector<int> compressionLevels = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + std::vector<Result> results; + + for (int level : compressionLevels) + { + // Generate one batch with batchSize entries, no specific destinations + std::vector<BatchWithDestination> batches = generateBatches(batchSize, 0, batchSize, 4096); + std::vector<LogEntry> entries 
= std::move(batches[0].first); + + // Serialize the entries + std::vector<uint8_t> serializedEntries = LogEntry::serializeBatch(std::move(entries)); + size_t uncompressedSize = serializedEntries.size(); + + // Measure compression time + auto start = std::chrono::high_resolution_clock::now(); + std::vector<uint8_t> compressed = Compression::compress(std::move(serializedEntries), level); + auto end = std::chrono::high_resolution_clock::now(); + + size_t compressedSize = compressed.size(); + double compressionRatio = static_cast<double>(uncompressedSize) / compressedSize; + auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count(); + + results.push_back({level, uncompressedSize, compressedSize, compressionRatio, duration}); + } + + // Print results + std::cout << std::fixed << std::setprecision(2); + std::cout << "Level | Uncompressed (B) | Compressed (B) | Ratio | Time (ms)\n"; + std::cout << "------|------------------|----------------|-------|----------\n"; + for (const auto &r : results) + { + std::cout << std::setw(5) << r.level << " | " + << std::setw(16) << r.uncompressedSize << " | " + << std::setw(14) << r.compressedSize << " | " + << std::setw(5) << r.compressionRatio << " | " + << std::setw(9) << r.durationMs << "\n"; + } + + return 0; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/benchmarks/workloads/diverse_filepaths.cpp b/archive/2025/summer/bsc_karidas/benchmarks/workloads/diverse_filepaths.cpp new file mode 100644 index 000000000..c5fda742d --- /dev/null +++ b/archive/2025/summer/bsc_karidas/benchmarks/workloads/diverse_filepaths.cpp @@ -0,0 +1,248 @@ +#include "BenchmarkUtils.hpp" +#include "LoggingManager.hpp" +#include <iostream> +#include <fstream> +#include <thread> +#include <chrono> +#include <vector> +#include <future> +#include <optional> +#include <iomanip> +#include <filesystem> + +struct BenchmarkResult +{ + double elapsedSeconds; + double throughputEntries; + double logicalThroughputGiB; + double physicalThroughputGiB; + double writeAmplification; + LatencyStats latencyStats; +}; + +BenchmarkResult runFilepathDiversityBenchmark(const LoggingConfig &config, int numSpecificFiles, int numProducerThreads, + int entriesPerProducer, int producerBatchSize, int payloadSize) +{ + LoggingConfig runConfig = config; + runConfig.basePath = "./logs/files_" + std::to_string(numSpecificFiles); + + cleanupLogDirectory(runConfig.basePath); + + std::cout << "Generating batches with " << numSpecificFiles << " specific files for all threads..."; + std::vector<BatchWithDestination> batches = generateBatches(entriesPerProducer, numSpecificFiles, producerBatchSize, payloadSize); + std::cout << " Done." 
<< std::endl; + size_t totalDataSizeBytes = calculateTotalDataSize(batches, numProducerThreads); + double totalDataSizeGiB = static_cast<double>(totalDataSizeBytes) / (1024 * 1024 * 1024); + std::cout << "Total data to be written: " << totalDataSizeBytes << " bytes (" + << totalDataSizeGiB << " GiB)" << std::endl; + + LoggingManager loggingManager(runConfig); + loggingManager.start(); + + auto startTime = std::chrono::high_resolution_clock::now(); + + // Each future now returns a LatencyCollector with thread-local measurements + std::vector<std::future<LatencyCollector>> futures; + for (int i = 0; i < numProducerThreads; i++) + { + futures.push_back(std::async( + std::launch::async, + appendLogEntries, + std::ref(loggingManager), + std::ref(batches))); + } + + // Collect latency measurements from all threads + LatencyCollector masterCollector; + for (auto &future : futures) + { + LatencyCollector threadCollector = future.get(); + masterCollector.merge(threadCollector); + } + + loggingManager.stop(); + auto endTime = std::chrono::high_resolution_clock::now(); + std::chrono::duration<double> elapsed = endTime - startTime; + + size_t finalStorageSize = calculateDirectorySize(runConfig.basePath); + double writeAmplification = static_cast<double>(finalStorageSize) / totalDataSizeBytes; + + double elapsedSeconds = elapsed.count(); + const size_t totalEntries = numProducerThreads * entriesPerProducer; + double throughputEntries = totalEntries / elapsedSeconds; + double logicalThroughputGiB = totalDataSizeGiB / elapsedSeconds; + double physicalThroughputGiB = static_cast<double>(finalStorageSize) / (1024.0 * 1024.0 * 1024.0 * elapsedSeconds); + + // Calculate latency statistics from merged measurements + LatencyStats latencyStats = calculateLatencyStats(masterCollector); + + cleanupLogDirectory(runConfig.basePath); + + return BenchmarkResult{ + elapsedSeconds, + throughputEntries, + logicalThroughputGiB, + physicalThroughputGiB, + writeAmplification, + latencyStats}; +} + +// Write CSV header +void writeCSVHeader(std::ofstream &csvFile) +{ + csvFile << "num_specific_files,configuration_description,elapsed_seconds,throughput_entries_per_sec,logical_throughput_gib_per_sec," + << "physical_throughput_gib_per_sec,relative_performance,write_amplification," + << "avg_latency_ms,median_latency_ms,max_latency_ms,latency_count\n"; +} + +// Write a single result row to CSV +void writeCSVRow(std::ofstream &csvFile, int numSpecificFiles, const std::string &description, const BenchmarkResult &result, double relativePerf) +{ + csvFile << numSpecificFiles << "," + << "\"" << description << "\"," // Quote the description in case it contains commas + << std::fixed << std::setprecision(6) << result.elapsedSeconds << "," + << std::fixed << std::setprecision(2) << result.throughputEntries << "," + << std::fixed << std::setprecision(6) << result.logicalThroughputGiB << "," + << std::fixed << std::setprecision(6) << result.physicalThroughputGiB << "," + << std::fixed << std::setprecision(6) << relativePerf << "," + << std::fixed << std::setprecision(8) << result.writeAmplification << "," + << std::fixed << std::setprecision(6) << result.latencyStats.avgMs << "," + << std::fixed << std::setprecision(6) << result.latencyStats.medianMs << "," + << std::fixed << std::setprecision(6) << result.latencyStats.maxMs << "," + << result.latencyStats.count << "\n"; +} + +void runFilepathDiversityComparison(const LoggingConfig &config, const std::vector<int> &numFilesVariants, + int numProducerThreads, int entriesPerProducer, int 
producerBatchSize, int payloadSize, + const std::string &csvFilename = "diverse_filepaths_benchmark.csv") +{ + std::vector<BenchmarkResult> results; + std::vector<std::string> descriptions; + + // Generate descriptions for each file count variant + for (int fileCount : numFilesVariants) + { + if (fileCount == 0) + { + descriptions.push_back("Default file only"); + } + else if (fileCount == 1) + { + descriptions.push_back("1 specific file"); + } + else + { + descriptions.push_back(std::to_string(fileCount) + " specific files"); + } + } + + // Open CSV file for writing + std::ofstream csvFile(csvFilename); + if (!csvFile.is_open()) + { + std::cerr << "Error: Could not open CSV file " << csvFilename << " for writing." << std::endl; + return; + } + + writeCSVHeader(csvFile); + + std::cout << "Running filepath diversity benchmark with " << numFilesVariants.size() << " data points..." << std::endl; + std::cout << "Results will be saved to: " << csvFilename << std::endl; + + for (size_t i = 0; i < numFilesVariants.size(); i++) + { + int fileCount = numFilesVariants[i]; + std::cout << "\nProgress: " << (i + 1) << "/" << numFilesVariants.size() + << " - Running benchmark with " << descriptions[i] << "..." << std::endl; + + BenchmarkResult result = runFilepathDiversityBenchmark( + config, + fileCount, + numProducerThreads, entriesPerProducer, producerBatchSize, payloadSize); + + results.push_back(result); + + // Calculate relative performance (using first result as baseline) + double relativePerf = results.size() > 1 ? result.throughputEntries / results[0].throughputEntries : 1.0; + + // Write result to CSV immediately + writeCSVRow(csvFile, fileCount, descriptions[i], result, relativePerf); + csvFile.flush(); // Ensure data is written in case of early termination + + // Print progress summary + std::cout << " Completed: " << std::fixed << std::setprecision(2) + << result.throughputEntries << " entries/s, " + << std::fixed << std::setprecision(3) << result.logicalThroughputGiB << " GiB/s, " + << std::fixed << std::setprecision(2) << relativePerf << "x relative performance" << std::endl; + + // Add a small delay between runs + std::this_thread::sleep_for(std::chrono::seconds(5)); + } + + csvFile.close(); + std::cout << "\nBenchmark completed! Results saved to " << csvFilename << std::endl; + + // Still print summary table to console for immediate review + std::cout << "\n=========== FILEPATH DIVERSITY BENCHMARK SUMMARY ===========" << std::endl; + std::cout << std::left << std::setw(25) << "Configuration" + << std::setw(15) << "Time (sec)" + << std::setw(20) << "Throughput (ent/s)" + << std::setw(15) << "Logical (GiB/s)" + << std::setw(15) << "Physical (GiB/s)" + << std::setw(15) << "Write Amp." + << std::setw(12) << "Rel. 
Perf" + << std::setw(12) << "Avg Lat(ms)" << std::endl; + std::cout << "--------------------------------------------------------------------------------------------------------------------------------" << std::endl; + + // Calculate base throughput for relative performance + double baseThroughputEntries = results[0].throughputEntries; + + for (size_t i = 0; i < numFilesVariants.size(); i++) + { + double relativePerf = results[i].throughputEntries / baseThroughputEntries; + std::cout << std::left << std::setw(25) << descriptions[i] + << std::setw(15) << std::fixed << std::setprecision(2) << results[i].elapsedSeconds + << std::setw(20) << std::fixed << std::setprecision(2) << results[i].throughputEntries + << std::setw(15) << std::fixed << std::setprecision(3) << results[i].logicalThroughputGiB + << std::setw(15) << std::fixed << std::setprecision(3) << results[i].physicalThroughputGiB + << std::setw(15) << std::fixed << std::setprecision(4) << results[i].writeAmplification + << std::setw(12) << std::fixed << std::setprecision(2) << relativePerf + << std::setw(12) << std::fixed << std::setprecision(3) << results[i].latencyStats.avgMs << std::endl; + } + std::cout << "======================================================================================================================================" << std::endl; +} + +int main() +{ + // system parameters + LoggingConfig config; + config.baseFilename = "default"; + config.maxSegmentSize = static_cast<size_t>(1000) * 1024 * 1024; // 1 GB + config.maxAttempts = 10; + config.baseRetryDelay = std::chrono::milliseconds(2); + config.queueCapacity = 3000000; + config.maxExplicitProducers = 32; + config.batchSize = 8192; + config.numWriterThreads = 64; + config.appendTimeout = std::chrono::minutes(2); + config.useEncryption = true; + config.compressionLevel = 9; + config.maxOpenFiles = 256; + // benchmark parameters + const int producerBatchSize = 8192; + const int numProducers = 32; + const int entriesPerProducer = 2000000; + const int payloadSize = 2048; + + std::vector<int> numFilesVariants = {0, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192}; + + runFilepathDiversityComparison(config, + numFilesVariants, + numProducers, + entriesPerProducer, + producerBatchSize, + payloadSize, + "diverse_filepaths_benchmark_results.csv"); + + return 0; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/benchmarks/workloads/large_batches.cpp b/archive/2025/summer/bsc_karidas/benchmarks/workloads/large_batches.cpp new file mode 100644 index 000000000..8dd16028c --- /dev/null +++ b/archive/2025/summer/bsc_karidas/benchmarks/workloads/large_batches.cpp @@ -0,0 +1,102 @@ +#include "BenchmarkUtils.hpp" +#include "LoggingManager.hpp" +#include <iostream> +#include <thread> +#include <chrono> +#include <vector> +#include <future> +#include <optional> +#include <filesystem> +#include <numeric> + +int main() +{ + // system parameters + LoggingConfig config; + config.basePath = "./logs"; + config.baseFilename = "default"; + config.maxSegmentSize = 50 * 1024 * 1024; // 50 MB + config.maxAttempts = 5; + config.baseRetryDelay = std::chrono::milliseconds(1); + config.queueCapacity = 3000000; + config.maxExplicitProducers = 4; + config.batchSize = 8400; + config.numWriterThreads = 32; + config.appendTimeout = std::chrono::minutes(2); + config.useEncryption = true; + config.compressionLevel = 9; + // benchmark parameters + const int numProducerThreads = 4; + const int entriesPerProducer = 30000000; + const int numSpecificFiles = 25; + const int 
producerBatchSize = 1000; + const int payloadSize = 2048; + + cleanupLogDirectory(config.basePath); + + std::cout << "Generating batches with pre-determined destinations for all threads..."; + std::vector<BatchWithDestination> batches = generateBatches(entriesPerProducer, numSpecificFiles, producerBatchSize, payloadSize); + std::cout << " Done." << std::endl; + size_t totalDataSizeBytes = calculateTotalDataSize(batches, numProducerThreads); + double totalDataSizeGiB = static_cast<double>(totalDataSizeBytes) / (1024 * 1024 * 1024); + std::cout << "Total data to be written: " << totalDataSizeBytes << " bytes (" << totalDataSizeGiB << " GiB)" << std::endl; + + LoggingManager loggingManager(config); + loggingManager.start(); + auto startTime = std::chrono::high_resolution_clock::now(); + + // Each future now returns a LatencyCollector with thread-local measurements + std::vector<std::future<LatencyCollector>> futures; + for (int i = 0; i < numProducerThreads; i++) + { + futures.push_back(std::async( + std::launch::async, + appendLogEntries, + std::ref(loggingManager), + std::ref(batches))); + } + + // Collect latency measurements from all threads + LatencyCollector masterCollector; + for (auto &future : futures) + { + LatencyCollector threadCollector = future.get(); + masterCollector.merge(threadCollector); + } + + loggingManager.stop(); + auto endTime = std::chrono::high_resolution_clock::now(); + std::chrono::duration<double> elapsed = endTime - startTime; + + size_t finalStorageSize = calculateDirectorySize(config.basePath); + double finalStorageSizeGiB = static_cast<double>(finalStorageSize) / (1024 * 1024 * 1024); + double writeAmplification = static_cast<double>(finalStorageSize) / totalDataSizeBytes; + + double elapsedSeconds = elapsed.count(); + const size_t totalEntries = numProducerThreads * entriesPerProducer; + double entriesThroughput = totalEntries / elapsedSeconds; + double logicalThroughputGiB = totalDataSizeGiB / elapsedSeconds; + double physicalThroughputGiB = finalStorageSizeGiB / elapsedSeconds; + double averageEntrySize = static_cast<double>(totalDataSizeBytes) / totalEntries; + + // Calculate latency statistics from merged measurements + auto latencyStats = calculateLatencyStats(masterCollector); + + cleanupLogDirectory(config.basePath); + + std::cout << "============== Benchmark Results ==============" << std::endl; + std::cout << "Execution time: " << elapsedSeconds << " seconds" << std::endl; + std::cout << "Total entries appended: " << totalEntries << std::endl; + std::cout << "Average entry size: " << averageEntrySize << " bytes" << std::endl; + std::cout << "Total data written: " << totalDataSizeGiB << " GiB" << std::endl; + std::cout << "Final storage size: " << finalStorageSizeGiB << " GiB" << std::endl; + std::cout << "Write amplification: " << writeAmplification << " (ratio)" << std::endl; + std::cout << "Throughput (entries): " << entriesThroughput << " entries/second" << std::endl; + std::cout << "Throughput (logical): " << logicalThroughputGiB << " GiB/second" << std::endl; + std::cout << "Throughput (physical): " << physicalThroughputGiB << " GiB/second" << std::endl; + std::cout << "===============================================" << std::endl; + + printLatencyStats(latencyStats); + + return 0; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/benchmarks/workloads/main.cpp b/archive/2025/summer/bsc_karidas/benchmarks/workloads/main.cpp new file mode 100644 index 000000000..401db22a0 --- /dev/null +++ 
b/archive/2025/summer/bsc_karidas/benchmarks/workloads/main.cpp @@ -0,0 +1,103 @@ +#include "BenchmarkUtils.hpp" +#include "LoggingManager.hpp" +#include <iostream> +#include <thread> +#include <chrono> +#include <vector> +#include <future> +#include <optional> +#include <filesystem> +#include <numeric> + +int main() +{ + // system parameters + LoggingConfig config; + config.basePath = "./logs"; + config.baseFilename = "default"; + config.maxSegmentSize = 50 * 1024 * 1024; // 50 MB + config.maxAttempts = 5; + config.baseRetryDelay = std::chrono::milliseconds(1); + config.queueCapacity = 3000000; + config.maxExplicitProducers = 96; + config.batchSize = 8192; + config.numWriterThreads = 96; + config.appendTimeout = std::chrono::minutes(2); + config.useEncryption = true; + config.compressionLevel = 9; + config.maxOpenFiles = 512; + // benchmark parameters + const int numSpecificFiles = 1024; + const int producerBatchSize = 4096; + const int numProducers = 96; + const int entriesPerProducer = 800000; + const int payloadSize = 4096; + + cleanupLogDirectory(config.basePath); + + std::cout << "Generating batches with pre-determined destinations..."; + std::vector<BatchWithDestination> batches = generateBatches(entriesPerProducer, numSpecificFiles, producerBatchSize, payloadSize); + std::cout << " Done." << std::endl; + size_t totalDataSizeBytes = calculateTotalDataSize(batches, numProducers); + double totalDataSizeGiB = static_cast<double>(totalDataSizeBytes) / (1024 * 1024 * 1024); + std::cout << "Total data to be written: " << totalDataSizeBytes << " bytes (" << totalDataSizeGiB << " GiB)" << std::endl; + + LoggingManager loggingManager(config); + loggingManager.start(); + auto startTime = std::chrono::high_resolution_clock::now(); + + // Each future now returns a LatencyCollector with thread-local measurements + std::vector<std::future<LatencyCollector>> futures; + for (int i = 0; i < numProducers; i++) + { + futures.push_back(std::async( + std::launch::async, + appendLogEntries, + std::ref(loggingManager), + std::ref(batches))); + } + + // Collect latency measurements from all threads + LatencyCollector masterCollector; + for (auto &future : futures) + { + LatencyCollector threadCollector = future.get(); + masterCollector.merge(threadCollector); + } + + loggingManager.stop(); + auto endTime = std::chrono::high_resolution_clock::now(); + std::chrono::duration<double> elapsed = endTime - startTime; + + size_t finalStorageSize = calculateDirectorySize(config.basePath); + double finalStorageSizeGiB = static_cast<double>(finalStorageSize) / (1024 * 1024 * 1024); + double writeAmplification = static_cast<double>(finalStorageSize) / totalDataSizeBytes; + + double elapsedSeconds = elapsed.count(); + const size_t totalEntries = numProducers * entriesPerProducer; + double entriesThroughput = totalEntries / elapsedSeconds; + double logicalThroughputGiB = totalDataSizeGiB / elapsedSeconds; + double physicalThroughputGiB = finalStorageSizeGiB / elapsedSeconds; + double averageEntrySize = static_cast<double>(totalDataSizeBytes) / totalEntries; + + // Calculate latency statistics from merged measurements + auto latencyStats = calculateLatencyStats(masterCollector); + + cleanupLogDirectory(config.basePath); + + std::cout << "============== Benchmark Results ==============" << std::endl; + std::cout << "Execution time: " << elapsedSeconds << " seconds" << std::endl; + std::cout << "Total entries appended: " << totalEntries << std::endl; + std::cout << "Average entry size: " << averageEntrySize << " bytes" << 
std::endl; + std::cout << "Total data written: " << totalDataSizeGiB << " GiB" << std::endl; + std::cout << "Final storage size: " << finalStorageSizeGiB << " GiB" << std::endl; + std::cout << "Write amplification: " << writeAmplification << " (ratio)" << std::endl; + std::cout << "Throughput (entries): " << entriesThroughput << " entries/second" << std::endl; + std::cout << "Throughput (logical): " << logicalThroughputGiB << " GiB/second" << std::endl; + std::cout << "Throughput (physical): " << physicalThroughputGiB << " GiB/second" << std::endl; + std::cout << "===============================================" << std::endl; + + printLatencyStats(latencyStats); + + return 0; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/benchmarks/workloads/multi_producer_small_batches.cpp b/archive/2025/summer/bsc_karidas/benchmarks/workloads/multi_producer_small_batches.cpp new file mode 100644 index 000000000..705ef1523 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/benchmarks/workloads/multi_producer_small_batches.cpp @@ -0,0 +1,102 @@ +#include "BenchmarkUtils.hpp" +#include "LoggingManager.hpp" +#include <iostream> +#include <thread> +#include <chrono> +#include <vector> +#include <future> +#include <optional> +#include <filesystem> +#include <numeric> + +int main() +{ + // system parameters + LoggingConfig config; + config.basePath = "./logs"; + config.baseFilename = "default"; + config.maxSegmentSize = 50 * 1024 * 1024; // 50 MB + config.maxAttempts = 5; + config.baseRetryDelay = std::chrono::milliseconds(1); + config.queueCapacity = 3000000; + config.maxExplicitProducers = 64; + config.batchSize = 8400; + config.numWriterThreads = 32; + config.appendTimeout = std::chrono::minutes(2); + config.useEncryption = true; + config.compressionLevel = 9; + // benchmark parameters + const int numProducerThreads = 64; + const int entriesPerProducer = 100000; + const int numSpecificFiles = 25; + const int producerBatchSize = 50; + const int payloadSize = 2048; + + cleanupLogDirectory(config.basePath); + + std::cout << "Generating batches with pre-determined destinations for all threads..."; + std::vector<BatchWithDestination> batches = generateBatches(entriesPerProducer, numSpecificFiles, producerBatchSize, payloadSize); + std::cout << " Done." 
<< std::endl; + size_t totalDataSizeBytes = calculateTotalDataSize(batches, numProducerThreads); + double totalDataSizeGiB = static_cast<double>(totalDataSizeBytes) / (1024 * 1024 * 1024); + std::cout << "Total data to be written: " << totalDataSizeBytes << " bytes (" << totalDataSizeGiB << " GiB)" << std::endl; + + LoggingManager loggingManager(config); + loggingManager.start(); + auto startTime = std::chrono::high_resolution_clock::now(); + + // Each future now returns a LatencyCollector with thread-local measurements + std::vector<std::future<LatencyCollector>> futures; + for (int i = 0; i < numProducerThreads; i++) + { + futures.push_back(std::async( + std::launch::async, + appendLogEntries, + std::ref(loggingManager), + std::ref(batches))); + } + + // Collect latency measurements from all threads + LatencyCollector masterCollector; + for (auto &future : futures) + { + LatencyCollector threadCollector = future.get(); + masterCollector.merge(threadCollector); + } + + loggingManager.stop(); + auto endTime = std::chrono::high_resolution_clock::now(); + std::chrono::duration<double> elapsed = endTime - startTime; + + size_t finalStorageSize = calculateDirectorySize(config.basePath); + double finalStorageSizeGiB = static_cast<double>(finalStorageSize) / (1024 * 1024 * 1024); + double writeAmplification = static_cast<double>(finalStorageSize) / totalDataSizeBytes; + + double elapsedSeconds = elapsed.count(); + const size_t totalEntries = numProducerThreads * entriesPerProducer; + double entriesThroughput = totalEntries / elapsedSeconds; + double logicalThroughputGiB = totalDataSizeGiB / elapsedSeconds; + double physicalThroughputGiB = finalStorageSizeGiB / elapsedSeconds; + double averageEntrySize = static_cast<double>(totalDataSizeBytes) / totalEntries; + + // Calculate latency statistics from merged measurements + auto latencyStats = calculateLatencyStats(masterCollector); + + cleanupLogDirectory(config.basePath); + + std::cout << "============== Benchmark Results ==============" << std::endl; + std::cout << "Execution time: " << elapsedSeconds << " seconds" << std::endl; + std::cout << "Total entries appended: " << totalEntries << std::endl; + std::cout << "Average entry size: " << averageEntrySize << " bytes" << std::endl; + std::cout << "Total data written: " << totalDataSizeGiB << " GiB" << std::endl; + std::cout << "Final storage size: " << finalStorageSizeGiB << " GiB" << std::endl; + std::cout << "Write amplification: " << writeAmplification << " (ratio)" << std::endl; + std::cout << "Throughput (entries): " << entriesThroughput << " entries/second" << std::endl; + std::cout << "Throughput (logical): " << logicalThroughputGiB << " GiB/second" << std::endl; + std::cout << "Throughput (physical): " << physicalThroughputGiB << " GiB/second" << std::endl; + std::cout << "===============================================" << std::endl; + + printLatencyStats(latencyStats); + + return 0; +} \ No newline at end of file diff --git a/archive/2025/summer/bsc_karidas/benchmarks/workloads/single_entry_appends.cpp b/archive/2025/summer/bsc_karidas/benchmarks/workloads/single_entry_appends.cpp new file mode 100644 index 000000000..0029e1722 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/benchmarks/workloads/single_entry_appends.cpp @@ -0,0 +1,102 @@ +#include "BenchmarkUtils.hpp" +#include "LoggingManager.hpp" +#include <iostream> +#include <thread> +#include <chrono> +#include <vector> +#include <future> +#include <optional> +#include <filesystem> +#include <numeric> + +int main() +{ + // 
system parameters + LoggingConfig config; + config.basePath = "./logs"; + config.baseFilename = "default"; + config.maxSegmentSize = 50 * 1024 * 1024; // 50 MB + config.maxAttempts = 5; + config.baseRetryDelay = std::chrono::milliseconds(1); + config.queueCapacity = 3000000; + config.maxExplicitProducers = 64; + config.batchSize = 8400; + config.numWriterThreads = 32; + config.appendTimeout = std::chrono::minutes(2); + config.useEncryption = true; + config.compressionLevel = 9; + // benchmark parameters + const int numProducerThreads = 64; + const int entriesPerProducer = 25000; + const int numSpecificFiles = 25; + const int producerBatchSize = 1; + const int payloadSize = 2048; + + cleanupLogDirectory(config.basePath); + + std::cout << "Generating batches with pre-determined destinations for all threads..."; + std::vector<BatchWithDestination> batches = generateBatches(entriesPerProducer, numSpecificFiles, producerBatchSize, payloadSize); + std::cout << " Done." << std::endl; + size_t totalDataSizeBytes = calculateTotalDataSize(batches, numProducerThreads); + double totalDataSizeGiB = static_cast<double>(totalDataSizeBytes) / (1024 * 1024 * 1024); + std::cout << "Total data to be written: " << totalDataSizeBytes << " bytes (" << totalDataSizeGiB << " GiB)" << std::endl; + + LoggingManager loggingManager(config); + loggingManager.start(); + auto startTime = std::chrono::high_resolution_clock::now(); + + // Each future now returns a LatencyCollector with thread-local measurements + std::vector<std::future<LatencyCollector>> futures; + for (int i = 0; i < numProducerThreads; i++) + { + futures.push_back(std::async( + std::launch::async, + appendLogEntries, + std::ref(loggingManager), + std::ref(batches))); + } + + // Collect latency measurements from all threads + LatencyCollector masterCollector; + for (auto &future : futures) + { + LatencyCollector threadCollector = future.get(); + masterCollector.merge(threadCollector); + } + + loggingManager.stop(); + auto endTime = std::chrono::high_resolution_clock::now(); + std::chrono::duration<double> elapsed = endTime - startTime; + + size_t finalStorageSize = calculateDirectorySize(config.basePath); + double finalStorageSizeGiB = static_cast<double>(finalStorageSize) / (1024 * 1024 * 1024); + double writeAmplification = static_cast<double>(finalStorageSize) / totalDataSizeBytes; + + double elapsedSeconds = elapsed.count(); + const size_t totalEntries = numProducerThreads * entriesPerProducer; + double entriesThroughput = totalEntries / elapsedSeconds; + double logicalThroughputGiB = totalDataSizeGiB / elapsedSeconds; + double physicalThroughputGiB = finalStorageSizeGiB / elapsedSeconds; + double averageEntrySize = static_cast<double>(totalDataSizeBytes) / totalEntries; + + // Calculate latency statistics from merged measurements + auto latencyStats = calculateLatencyStats(masterCollector); + + cleanupLogDirectory(config.basePath); + + std::cout << "============== Benchmark Results ==============" << std::endl; + std::cout << "Execution time: " << elapsedSeconds << " seconds" << std::endl; + std::cout << "Total entries appended: " << totalEntries << std::endl; + std::cout << "Average entry size: " << averageEntrySize << " bytes" << std::endl; + std::cout << "Total data written: " << totalDataSizeGiB << " GiB" << std::endl; + std::cout << "Final storage size: " << finalStorageSizeGiB << " GiB" << std::endl; + std::cout << "Write amplification: " << writeAmplification << " (ratio)" << std::endl; + std::cout << "Throughput (entries): " << 
entriesThroughput << " entries/second" << std::endl; + std::cout << "Throughput (logical): " << logicalThroughputGiB << " GiB/second" << std::endl; + std::cout << "Throughput (physical): " << physicalThroughputGiB << " GiB/second" << std::endl; + std::cout << "===============================================" << std::endl; + + printLatencyStats(latencyStats); + + return 0; +} \ No newline at end of file
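
Note: every benchmark in this diff relies on the LatencyCollector / LatencyStats helpers declared in BenchmarkUtils.hpp, which is not reproduced in this section. The following is a minimal sketch of what that interface is assumed to look like, inferred only from the call sites above (reserve, addMeasurement, merge, calculateLatencyStats, printLatencyStats, and the avgMs/medianMs/maxMs/count fields); the actual header in the archive may differ.

// Minimal sketch of the latency helpers assumed by the benchmarks above.
// Inferred from call sites in this diff; the real BenchmarkUtils.hpp may differ.
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <vector>

class LatencyCollector
{
public:
    // Pre-allocate storage so measurement does not trigger reallocations.
    void reserve(size_t n) { measurements_.reserve(n); }

    // Record one appendBatch latency sample.
    void addMeasurement(std::chrono::nanoseconds latency) { measurements_.push_back(latency); }

    // Fold another thread's samples into this collector.
    void merge(const LatencyCollector &other)
    {
        measurements_.insert(measurements_.end(), other.measurements_.begin(), other.measurements_.end());
    }

    const std::vector<std::chrono::nanoseconds> &measurements() const { return measurements_; }

private:
    std::vector<std::chrono::nanoseconds> measurements_;
};

struct LatencyStats
{
    double avgMs = 0.0;
    double medianMs = 0.0;
    double maxMs = 0.0;
    size_t count = 0;
};

inline LatencyStats calculateLatencyStats(const LatencyCollector &collector)
{
    LatencyStats stats;
    auto samples = collector.measurements(); // copy, so we can sort freely
    stats.count = samples.size();
    if (samples.empty())
        return stats;

    std::sort(samples.begin(), samples.end());
    double sumMs = 0.0;
    for (auto ns : samples)
        sumMs += std::chrono::duration<double, std::milli>(ns).count();

    stats.avgMs = sumMs / static_cast<double>(samples.size());
    stats.medianMs = std::chrono::duration<double, std::milli>(samples[samples.size() / 2]).count();
    stats.maxMs = std::chrono::duration<double, std::milli>(samples.back()).count();
    return stats;
}

inline void printLatencyStats(const LatencyStats &stats)
{
    std::cout << "Latency (avg/median/max): " << stats.avgMs << " / " << stats.medianMs << " / "
              << stats.maxMs << " ms over " << stats.count << " samples" << std::endl;
}

Under this assumed interface, the pattern used throughout the workloads (one collector per producer thread returned via std::future, then merged into a master collector before calling calculateLatencyStats) avoids any locking on the measurement hot path.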