blob: 401db22a07a5d9989aa1e92a68ace338524e7f91 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
#include "BenchmarkUtils.hpp"
#include "LoggingManager.hpp"
#include <iostream>
#include <thread>
#include <chrono>
#include <vector>
#include <future>
#include <optional>
#include <filesystem>
#include <numeric>
/**
 * Benchmark driver: configures a LoggingManager, launches `numProducers`
 * asynchronous producer tasks that each run `appendLogEntries` over a shared,
 * pre-generated set of batches, then reports throughput, storage size,
 * write amplification and latency statistics on stdout.
 *
 * The log directory is wiped before the run and again after the on-disk size
 * has been measured.
 *
 * @return 0 on completion.
 */
int main()
{
    // system parameters
    LoggingConfig config;
    config.basePath = "./logs";
    config.baseFilename = "default";
    config.maxSegmentSize = 50 * 1024 * 1024; // 50 MiB
    config.maxAttempts = 5;
    config.baseRetryDelay = std::chrono::milliseconds(1);
    config.queueCapacity = 3000000;
    config.maxExplicitProducers = 96;
    config.batchSize = 8192;
    config.numWriterThreads = 96;
    config.appendTimeout = std::chrono::minutes(2);
    config.useEncryption = true;
    config.compressionLevel = 9;
    config.maxOpenFiles = 512;

    // benchmark parameters
    const int numSpecificFiles = 1024;
    const int producerBatchSize = 4096;
    const int numProducers = 96;
    const int entriesPerProducer = 800000;
    const int payloadSize = 4096;

    // Start from an empty log directory so the final size reflects this run only.
    cleanupLogDirectory(config.basePath);

    // Flush explicitly: batch generation can take a while and the progress
    // message would otherwise stay in the stream buffer until it is done.
    std::cout << "Generating batches with pre-determined destinations..." << std::flush;
    std::vector<BatchWithDestination> batches = generateBatches(entriesPerProducer, numSpecificFiles, producerBatchSize, payloadSize);
    std::cout << " Done." << std::endl;

    // NOTE(review): presumably the payload size of `batches` scaled by the
    // number of producers, since every producer replays the same batches — confirm.
    size_t totalDataSizeBytes = calculateTotalDataSize(batches, numProducers);
    double totalDataSizeGiB = static_cast<double>(totalDataSizeBytes) / (1024 * 1024 * 1024);
    std::cout << "Total data to be written: " << totalDataSizeBytes << " bytes (" << totalDataSizeGiB << " GiB)" << std::endl;

    LoggingManager loggingManager(config);
    loggingManager.start();

    // The timed region starts after start() and ends after stop() returns.
    auto startTime = std::chrono::high_resolution_clock::now();

    // Each future now returns a LatencyCollector with thread-local measurements
    std::vector<std::future<LatencyCollector>> futures;
    for (int i = 0; i < numProducers; i++)
    {
        // std::ref: all producers share the same manager and the same batches
        // instead of receiving copies.
        futures.push_back(std::async(
            std::launch::async,
            appendLogEntries,
            std::ref(loggingManager),
            std::ref(batches)));
    }

    // Collect latency measurements from all threads
    LatencyCollector masterCollector;
    for (auto &future : futures)
    {
        // get() blocks until the corresponding producer has finished.
        LatencyCollector threadCollector = future.get();
        masterCollector.merge(threadCollector);
    }

    loggingManager.stop();
    auto endTime = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = endTime - startTime;

    size_t finalStorageSize = calculateDirectorySize(config.basePath);
    double finalStorageSizeGiB = static_cast<double>(finalStorageSize) / (1024 * 1024 * 1024);
    double writeAmplification = static_cast<double>(finalStorageSize) / totalDataSizeBytes;

    double elapsedSeconds = elapsed.count();
    // Widen before multiplying so larger benchmark parameters cannot overflow int.
    const size_t totalEntries = static_cast<size_t>(numProducers) * static_cast<size_t>(entriesPerProducer);
    double entriesThroughput = totalEntries / elapsedSeconds;
    double logicalThroughputGiB = totalDataSizeGiB / elapsedSeconds;
    double physicalThroughputGiB = finalStorageSizeGiB / elapsedSeconds;
    double averageEntrySize = static_cast<double>(totalDataSizeBytes) / totalEntries;

    // Calculate latency statistics from merged measurements
    auto latencyStats = calculateLatencyStats(masterCollector);

    // The on-disk size has been captured above, so the logs can be removed now.
    cleanupLogDirectory(config.basePath);

    std::cout << "============== Benchmark Results ==============" << std::endl;
    std::cout << "Execution time: " << elapsedSeconds << " seconds" << std::endl;
    std::cout << "Total entries appended: " << totalEntries << std::endl;
    std::cout << "Average entry size: " << averageEntrySize << " bytes" << std::endl;
    std::cout << "Total data written: " << totalDataSizeGiB << " GiB" << std::endl;
    std::cout << "Final storage size: " << finalStorageSizeGiB << " GiB" << std::endl;
    std::cout << "Write amplification: " << writeAmplification << " (ratio)" << std::endl;
    std::cout << "Throughput (entries): " << entriesThroughput << " entries/second" << std::endl;
    std::cout << "Throughput (logical): " << logicalThroughputGiB << " GiB/second" << std::endl;
    std::cout << "Throughput (physical): " << physicalThroughputGiB << " GiB/second" << std::endl;
    std::cout << "===============================================" << std::endl;

    printLatencyStats(latencyStats);
    return 0;
}
|