about summary refs log tree commit diff stats
path: root/archive/2025/summer/bsc_karidas/include/SegmentedStorage.hpp
blob: 3984aea79c29dbd60f76cdab2009ca6a30c660eb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#ifndef SEGMENTED_STORAGE_HPP
#define SEGMENTED_STORAGE_HPP

#include <string>
#include <vector>
#include <atomic>
#include <mutex>
#include <shared_mutex>
#include <filesystem>
#include <cstdint>
#include <unordered_map>
#include <fcntl.h>  // for open flags
#include <unistd.h> // for close, pwrite, fsync
#include <chrono>
#include <thread>
#include <stdexcept>
#include <list> // For LRU cache

class SegmentedStorage
{
public:
    SegmentedStorage(const std::string &basePath,
                     const std::string &baseFilename,
                     size_t maxSegmentSize = 100 * 1024 * 1024, // 100 MB default
                     size_t maxAttempts = 5,
                     std::chrono::milliseconds baseRetryDelay = std::chrono::milliseconds(1),
                     size_t maxOpenFiles = 512);

    ~SegmentedStorage();

    size_t write(std::vector<uint8_t> &&data);
    size_t writeToFile(const std::string &filename, std::vector<uint8_t> &&data);
    void flush();

private:
    std::string m_basePath;
    std::string m_baseFilename;
    size_t m_maxSegmentSize;
    size_t m_maxAttempts;
    std::chrono::milliseconds m_baseRetryDelay;
    size_t m_maxOpenFiles; // Max number of cache entries

    struct CacheEntry
    {
        int fd{-1};
        std::atomic<size_t> segmentIndex{0};
        std::atomic<size_t> currentOffset{0};
        std::string currentSegmentPath;
        mutable std::shared_mutex fileMutex; // shared for writes, exclusive for rotate/flush
    };

    // Unified LRU Cache for both file descriptors and segment information
    class LRUCache
    {
    public:
        LRUCache(size_t capacity, SegmentedStorage *parent) : m_capacity(capacity), m_parent(parent) {}

        std::shared_ptr<CacheEntry> get(const std::string &filename);
        void flush(const std::string &filename);
        void flushAll();
        void closeAll();

    private:
        size_t m_capacity;
        SegmentedStorage *m_parent;

        // LRU list of filenames
        std::list<std::string> m_lruList;
        // Map from filename to cache entry and iterator in LRU list
        struct CacheData
        {
            std::shared_ptr<CacheEntry> entry;
            std::list<std::string>::iterator lruIt;
        };
        std::unordered_map<std::string, CacheData> m_cache;
        mutable std::mutex m_mutex; // Protects m_lruList and m_cache

        void evictLRU();
        std::shared_ptr<CacheEntry> reconstructState(const std::string &filename);
    };

    LRUCache m_cache;

    std::string rotateSegment(const std::string &filename, std::shared_ptr<CacheEntry> entry);
    std::string generateSegmentPath(const std::string &filename, size_t segmentIndex) const;
    size_t getFileSize(const std::string &path) const;
    size_t findLatestSegmentIndex(const std::string &filename) const;

    // Retry helpers use member-configured parameters
    template <typename Func>
    auto retryWithBackoff(Func &&f)
    {
        for (size_t attempt = 1;; ++attempt)
        {
            try
            {
                return f();
            }
            catch (const std::runtime_error &)
            {
                if (attempt >= m_maxAttempts)
                    throw;
                auto delay = m_baseRetryDelay * (1 << (attempt - 1));
                std::this_thread::sleep_for(delay);
            }
        }
    }

    int openWithRetry(const char *path, int flags, mode_t mode)
    {
        return retryWithBackoff([&]()
                                {
            int fd = ::open(path, flags, mode);
            if (fd < 0) throw std::runtime_error("open failed");
            return fd; });
    }

    size_t pwriteFull(int fd, const uint8_t *buf, size_t count, off_t offset)
    {
        size_t total = 0;
        while (total < count)
        {
            ssize_t written = ::pwrite(fd, buf + total, count - total, offset + total);
            if (written < 0)
            {
                if (errno == EINTR)
                    continue;
                throw std::runtime_error("pwrite failed");
            }
            total += written;
        }
        return total;
    }

    void fsyncRetry(int fd)
    {
        retryWithBackoff([&]()
                         {
            if (::fsync(fd) < 0) throw std::runtime_error("fsync failed");
            return 0; });
    }
};

#endif