diff options
Diffstat (limited to 'archive/2025/summer/bsc_karidas/src/SegmentedStorage.cpp')
| -rw-r--r-- | archive/2025/summer/bsc_karidas/src/SegmentedStorage.cpp | 303 |
1 files changed, 303 insertions, 0 deletions
diff --git a/archive/2025/summer/bsc_karidas/src/SegmentedStorage.cpp b/archive/2025/summer/bsc_karidas/src/SegmentedStorage.cpp new file mode 100644 index 000000000..6b2a853e9 --- /dev/null +++ b/archive/2025/summer/bsc_karidas/src/SegmentedStorage.cpp @@ -0,0 +1,303 @@ +#include "SegmentedStorage.hpp" +#include <iomanip> +#include <sstream> +#include <algorithm> +#include <sys/stat.h> + +SegmentedStorage::SegmentedStorage(const std::string &basePath, + const std::string &baseFilename, + size_t maxSegmentSize, + size_t maxAttempts, + std::chrono::milliseconds baseRetryDelay, + size_t maxOpenFiles) + : m_basePath(basePath), + m_baseFilename(baseFilename), + m_maxSegmentSize(maxSegmentSize), + m_maxAttempts(maxAttempts), + m_baseRetryDelay(baseRetryDelay), + m_maxOpenFiles(maxOpenFiles), + m_cache(maxOpenFiles, this) +{ + std::filesystem::create_directories(m_basePath); + // Pre-warm the cache with the base filename + m_cache.get(m_baseFilename); +} + +SegmentedStorage::~SegmentedStorage() +{ + m_cache.closeAll(); +} + +// LRUCache methods +std::shared_ptr<SegmentedStorage::CacheEntry> SegmentedStorage::LRUCache::get(const std::string &filename) +{ + std::lock_guard<std::mutex> lock(m_mutex); + + auto it = m_cache.find(filename); + if (it != m_cache.end()) + { + // Found in cache, move to front (most recently used) + m_lruList.erase(it->second.lruIt); + m_lruList.push_front(filename); + it->second.lruIt = m_lruList.begin(); + return it->second.entry; + } + + // Not in cache, need to reconstruct state + auto entry = reconstructState(filename); + + // Check if we need to evict + if (m_cache.size() >= m_capacity) + { + evictLRU(); + } + + // Add to cache + m_lruList.push_front(filename); + m_cache[filename] = {entry, m_lruList.begin()}; + + return entry; +} + +void SegmentedStorage::LRUCache::evictLRU() +{ + // Called with m_mutex already locked + if (m_lruList.empty()) + return; + + const std::string &lru_filename = m_lruList.back(); + auto it = m_cache.find(lru_filename); + if (it != m_cache.end()) + { + // Close the file descriptor if it's open + if (it->second.entry->fd >= 0) + { + m_parent->fsyncRetry(it->second.entry->fd); + ::close(it->second.entry->fd); + } + m_cache.erase(it); + } + m_lruList.pop_back(); +} + +std::shared_ptr<SegmentedStorage::CacheEntry> SegmentedStorage::LRUCache::reconstructState(const std::string &filename) +{ + // Called with m_mutex already locked + auto entry = std::make_shared<CacheEntry>(); + + // Find the latest segment index for this filename + size_t latestIndex = m_parent->findLatestSegmentIndex(filename); + entry->segmentIndex.store(latestIndex, std::memory_order_release); + + // Generate the path for the current segment + std::string segmentPath = m_parent->generateSegmentPath(filename, latestIndex); + entry->currentSegmentPath = segmentPath; + + // Open the file and get its current size + entry->fd = m_parent->openWithRetry(segmentPath.c_str(), O_CREAT | O_RDWR | O_APPEND, 0644); + + // Get the current file size to set as the offset + size_t fileSize = m_parent->getFileSize(segmentPath); + entry->currentOffset.store(fileSize, std::memory_order_release); + + return entry; +} + +void SegmentedStorage::LRUCache::flush(const std::string &filename) +{ + std::lock_guard<std::mutex> lock(m_mutex); + auto it = m_cache.find(filename); + if (it != m_cache.end() && it->second.entry->fd >= 0) + { + m_parent->fsyncRetry(it->second.entry->fd); + } +} + +void SegmentedStorage::LRUCache::flushAll() +{ + std::lock_guard<std::mutex> lock(m_mutex); + for (const auto &pair : m_cache) + { + if (pair.second.entry->fd >= 0) + { + m_parent->fsyncRetry(pair.second.entry->fd); + } + } +} + +void SegmentedStorage::LRUCache::closeAll() +{ + std::lock_guard<std::mutex> lock(m_mutex); + for (const auto &pair : m_cache) + { + if (pair.second.entry->fd >= 0) + { + m_parent->fsyncRetry(pair.second.entry->fd); + ::close(pair.second.entry->fd); + } + } + m_cache.clear(); + m_lruList.clear(); +} + +size_t SegmentedStorage::findLatestSegmentIndex(const std::string &filename) const +{ + size_t maxIndex = 0; + std::string pattern = filename + "_"; + + try + { + for (const auto &entry : std::filesystem::directory_iterator(m_basePath)) + { + if (entry.is_regular_file()) + { + std::string name = entry.path().filename().string(); + if (name.find(pattern) == 0) + { + // Extract the index from filename format: filename_YYYYMMDD_HHMMSS_NNNNNN.log + size_t lastUnderscore = name.find_last_of('_'); + if (lastUnderscore != std::string::npos) + { + size_t dotPos = name.find('.', lastUnderscore); + if (dotPos != std::string::npos) + { + std::string indexStr = name.substr(lastUnderscore + 1, dotPos - lastUnderscore - 1); + try + { + size_t index = std::stoull(indexStr); + maxIndex = std::max(maxIndex, index); + } + catch (...) + { + // Ignore files that don't match the expected format + } + } + } + } + } + } + } + catch (const std::filesystem::filesystem_error &) + { + // If directory doesn't exist or other filesystem error, return 0 + } + + return maxIndex; +} + +size_t SegmentedStorage::getFileSize(const std::string &path) const +{ + struct stat st; + if (::stat(path.c_str(), &st) == 0) + { + return static_cast<size_t>(st.st_size); + } + return 0; +} + +size_t SegmentedStorage::write(std::vector<uint8_t> &&data) +{ + return writeToFile(m_baseFilename, std::move(data)); +} + +size_t SegmentedStorage::writeToFile(const std::string &filename, std::vector<uint8_t> &&data) +{ + size_t size = data.size(); + if (size == 0) + return 0; + + std::shared_ptr<CacheEntry> entry = m_cache.get(filename); + size_t writeOffset; + + // This loop handles race conditions around rotation + while (true) + { + // First check if we need to rotate WITHOUT reserving space + size_t currentOffset = entry->currentOffset.load(std::memory_order_acquire); + if (currentOffset + size > m_maxSegmentSize) + { + std::unique_lock<std::shared_mutex> rotLock(entry->fileMutex); + // Double-check if rotation is still needed + if (entry->currentOffset.load(std::memory_order_acquire) + size > m_maxSegmentSize) + { + rotateSegment(filename, entry); + // After rotation, entry has been updated with new fd and path + continue; + } + } + + // Now safely reserve space + writeOffset = entry->currentOffset.fetch_add(size, std::memory_order_acq_rel); + + // Double-check we didn't cross the boundary after reservation + if (writeOffset + size > m_maxSegmentSize) + { + // Another thread increased the offset past our threshold, try again + continue; + } + + // We have a valid offset and can proceed with the write + break; + } + + // Write under shared lock to prevent racing with rotate/close + { + std::shared_lock<std::shared_mutex> writeLock(entry->fileMutex); + + // Verify the fd is still valid + if (entry->fd < 0) + { + // This shouldn't happen, but if it does, retry + return writeToFile(filename, std::move(data)); + } + + pwriteFull(entry->fd, data.data(), size, static_cast<off_t>(writeOffset)); + } + + return size; +} + +void SegmentedStorage::flush() +{ + m_cache.flushAll(); +} + +std::string SegmentedStorage::rotateSegment(const std::string &filename, std::shared_ptr<CacheEntry> entry) +{ + // exclusive lock assumed by the caller (writeToFile) + + // Close the old file descriptor + if (entry->fd >= 0) + { + fsyncRetry(entry->fd); + ::close(entry->fd); + entry->fd = -1; + } + + size_t newIndex = entry->segmentIndex.fetch_add(1, std::memory_order_acq_rel) + 1; + entry->currentOffset.store(0, std::memory_order_release); + std::string newPath = generateSegmentPath(filename, newIndex); + + // Update the entry's path and open the new file + entry->currentSegmentPath = newPath; + entry->fd = openWithRetry(newPath.c_str(), O_CREAT | O_RDWR | O_APPEND, 0644); + + return newPath; +} + +std::string SegmentedStorage::generateSegmentPath(const std::string &filename, size_t segmentIndex) const +{ + auto now = std::chrono::system_clock::now(); + auto now_time_t = std::chrono::system_clock::to_time_t(now); + std::tm time_info; + + // Linux-specific thread-safe version of localtime + localtime_r(&now_time_t, &time_info); + + std::stringstream ss; + ss << m_basePath << "/"; + ss << filename << "_"; + ss << std::put_time(&time_info, "%Y%m%d_%H%M%S") << "_"; + ss << std::setw(6) << std::setfill('0') << segmentIndex << ".log"; + return ss.str(); +} \ No newline at end of file |