// LogEntry implementation: construction plus binary (de)serialization of single entries and batches.
#include "LogEntry.hpp"
#include <cstring>
#include <stdexcept>
#include <iostream>
LogEntry::LogEntry()
: m_actionType(ActionType::CREATE),
m_dataLocation(""),
m_dataControllerId(""),
m_dataProcessorId(""),
m_dataSubjectId(""),
m_timestamp(std::chrono::system_clock::now()),
m_payload() {}
LogEntry::LogEntry(ActionType actionType,
std::string dataLocation,
std::string dataControllerId,
std::string dataProcessorId,
std::string dataSubjectId,
std::vector<uint8_t> payload)
: m_actionType(actionType),
m_dataLocation(std::move(dataLocation)),
m_dataControllerId(std::move(dataControllerId)),
m_dataProcessorId(std::move(dataProcessorId)),
m_dataSubjectId(std::move(dataSubjectId)),
m_timestamp(std::chrono::system_clock::now()),
m_payload(std::move(payload))
{
}
// Rvalue overload of serialize(): used when the LogEntry is a temporary or has
// been std::move'd; callers should treat the entry as consumed afterwards.
//
// Produced layout (raw object bytes, i.e. native `int` width and host byte order):
//   int       action type
//   4 x       { uint32 length, bytes }  location, controller id, processor id, subject id
//   int64     timestamp, milliseconds since the clock's epoch
//   uint32    payload length, followed by the payload bytes
//
// Returns the serialized bytes.
std::vector<uint8_t> LogEntry::serialize() &&
{
    // Exact output size, so the buffer is allocated once.
    const size_t totalSize =
        sizeof(int) +                                   // ActionType
        sizeof(uint32_t) + m_dataLocation.size() +      // Size + data location
        sizeof(uint32_t) + m_dataControllerId.size() +  // Size + data controller ID
        sizeof(uint32_t) + m_dataProcessorId.size() +   // Size + data processor ID
        sizeof(uint32_t) + m_dataSubjectId.size() +     // Size + data subject ID
        sizeof(int64_t) +                               // Timestamp
        sizeof(uint32_t) + m_payload.size();            // Size + payload data

    std::vector<uint8_t> result;
    result.reserve(totalSize);

    // Action type, widened to int for the wire.
    const int actionType = static_cast<int>(m_actionType);
    appendToVector(result, &actionType, sizeof(actionType));

    // Length-prefixed strings, handed to the rvalue helper overload.
    appendStringToVector(result, std::move(m_dataLocation));
    appendStringToVector(result, std::move(m_dataControllerId));
    appendStringToVector(result, std::move(m_dataProcessorId));
    appendStringToVector(result, std::move(m_dataSubjectId));

    // Timestamp as a millisecond count.
    const int64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
                                  m_timestamp.time_since_epoch())
                                  .count();
    appendToVector(result, &timestamp, sizeof(timestamp));

    // Length-prefixed payload. Bytes are trivially copyable, so a plain range
    // insert is all a "move" could ever do here; inserting an empty range is a no-op.
    const uint32_t payloadSize = static_cast<uint32_t>(m_payload.size());
    appendToVector(result, &payloadSize, sizeof(payloadSize));
    result.insert(result.end(), m_payload.begin(), m_payload.end());

    return result;
}
// Const lvalue overload of serialize(): for when you need to keep the LogEntry.
// The entry is only read.
//
// Produced layout (raw object bytes, i.e. native `int` width and host byte order):
//   int       action type
//   4 x       { uint32 length, bytes }  location, controller id, processor id, subject id
//   int64     timestamp, milliseconds since the clock's epoch
//   uint32    payload length, followed by the payload bytes
//
// Returns the serialized bytes.
std::vector<uint8_t> LogEntry::serialize() const &
{
    // Exact output size, so the buffer is allocated once.
    const size_t totalSize =
        sizeof(int) +                                   // ActionType
        sizeof(uint32_t) + m_dataLocation.size() +      // Size + data location
        sizeof(uint32_t) + m_dataControllerId.size() +  // Size + data controller ID
        sizeof(uint32_t) + m_dataProcessorId.size() +   // Size + data processor ID
        sizeof(uint32_t) + m_dataSubjectId.size() +     // Size + data subject ID
        sizeof(int64_t) +                               // Timestamp
        sizeof(uint32_t) + m_payload.size();            // Size + payload data

    std::vector<uint8_t> result;
    result.reserve(totalSize);

    // Action type, widened to int for the wire.
    const int actionType = static_cast<int>(m_actionType);
    appendToVector(result, &actionType, sizeof(actionType));

    // Length-prefixed strings (copied).
    appendStringToVector(result, m_dataLocation);
    appendStringToVector(result, m_dataControllerId);
    appendStringToVector(result, m_dataProcessorId);
    appendStringToVector(result, m_dataSubjectId);

    // Timestamp as a millisecond count.
    const int64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
                                  m_timestamp.time_since_epoch())
                                  .count();
    appendToVector(result, &timestamp, sizeof(timestamp));

    // Length-prefixed payload; the content is skipped entirely when empty.
    const uint32_t payloadSize = static_cast<uint32_t>(m_payload.size());
    appendToVector(result, &payloadSize, sizeof(payloadSize));
    if (!m_payload.empty())
    {
        appendToVector(result, m_payload.data(), m_payload.size());
    }

    return result;
}
// Populates this entry from a buffer produced by serialize().
//
// Expected layout: int action type, four { uint32 length, bytes } strings
// (location, controller id, processor id, subject id), int64 millisecond
// timestamp, then { uint32 length, bytes } payload. Bytes after the payload
// are ignored.
//
// `data` is only read; its contents are copied into the members.
//
// Returns true on success. Returns false if the buffer is too short for any
// field, or if a standard exception (e.g. allocation failure) is thrown while
// copying. Fields are assigned as they are parsed, so after a false return the
// members that preceded the failing field already hold the new values.
bool LogEntry::deserialize(std::vector<uint8_t> &&data)
{
    try
    {
        size_t offset = 0;

        // Bytes still unread. Written as a subtraction so that a large length
        // field can never wrap an `offset + n` sum and slip past a bounds check.
        const auto bytesLeft = [&data, &offset]() -> size_t
        {
            return offset <= data.size() ? data.size() - offset : 0;
        };

        // Action type
        if (bytesLeft() < sizeof(int))
            return false;
        int actionType;
        std::memcpy(&actionType, data.data() + offset, sizeof(actionType));
        offset += sizeof(actionType);
        m_actionType = static_cast<ActionType>(actionType);

        // The four length-prefixed strings, in wire order.
        if (!extractStringFromVector(data, offset, m_dataLocation))
            return false;
        if (!extractStringFromVector(data, offset, m_dataControllerId))
            return false;
        if (!extractStringFromVector(data, offset, m_dataProcessorId))
            return false;
        if (!extractStringFromVector(data, offset, m_dataSubjectId))
            return false;

        // Timestamp (milliseconds since the clock's epoch)
        if (bytesLeft() < sizeof(int64_t))
            return false;
        int64_t timestamp;
        std::memcpy(&timestamp, data.data() + offset, sizeof(timestamp));
        offset += sizeof(timestamp);
        m_timestamp = std::chrono::system_clock::time_point(std::chrono::milliseconds(timestamp));

        // Payload length
        if (bytesLeft() < sizeof(uint32_t))
            return false;
        uint32_t payloadSize;
        std::memcpy(&payloadSize, data.data() + offset, sizeof(payloadSize));
        offset += sizeof(payloadSize);

        // Payload bytes. assign() replaces any previous content, and an empty
        // range simply leaves the payload empty.
        if (payloadSize > bytesLeft())
            return false;
        const uint8_t *first = data.data() + offset;
        m_payload.assign(first, first + payloadSize);

        return true;
    }
    catch (const std::exception &)
    {
        return false;
    }
}
// Serializes a sequence of entries into one buffer.
//
// Layout: uint32 entry count, then for each entry a uint32 byte size followed
// by that entry's serialize() output. An empty input yields just the count (0).
//
// Each entry is serialized through the rvalue overload of serialize(), so the
// caller should treat `entries` as consumed afterwards.
//
// Returns the batch bytes.
std::vector<uint8_t> LogEntry::serializeBatch(std::vector<LogEntry> &&entries)
{
    // Size the buffer up front so appending never reallocates.
    size_t estimatedSize = sizeof(uint32_t); // Number of entries
    for (const auto &entry : entries)
    {
        estimatedSize += sizeof(uint32_t) +     // Entry size field
                         sizeof(int) +          // ActionType
                         4 * sizeof(uint32_t) + // 4 string length fields
                         entry.getDataLocation().size() +
                         entry.getDataControllerId().size() +
                         entry.getDataProcessorId().size() +
                         entry.getDataSubjectId().size() +
                         sizeof(int64_t) +      // Timestamp
                         sizeof(uint32_t) +     // Payload size
                         entry.getPayload().size();
    }

    std::vector<uint8_t> batchData;
    batchData.reserve(estimatedSize);

    // Appends the raw bytes of a uint32 (host byte order) to the batch.
    const auto appendU32 = [&batchData](uint32_t value)
    {
        const uint8_t *bytes = reinterpret_cast<const uint8_t *>(&value);
        batchData.insert(batchData.end(), bytes, bytes + sizeof(value));
    };

    // Store the number of entries
    appendU32(static_cast<uint32_t>(entries.size()));

    // Serialize and append each entry, prefixed with its size.
    for (auto &entry : entries)
    {
        const std::vector<uint8_t> entryData = std::move(entry).serialize();
        appendU32(static_cast<uint32_t>(entryData.size()));
        batchData.insert(batchData.end(), entryData.begin(), entryData.end());
    }

    return batchData;
}
// Parses a buffer produced by serializeBatch() back into entries.
//
// Expected layout: uint32 entry count, then for each entry a uint32 byte size
// followed by that many bytes, which are handed to deserialize().
//
// `batchData` is only read; each entry's bytes are copied out of it.
//
// Errors (truncated buffer, an entry that fails to deserialize, or any other
// standard exception) are not propagated: the message is written to std::cerr
// and the entries parsed successfully before the error are returned.
std::vector<LogEntry> LogEntry::deserializeBatch(std::vector<uint8_t> &&batchData)
{
    std::vector<LogEntry> entries;
    try
    {
        // Read the number of entries
        if (batchData.size() < sizeof(uint32_t))
        {
            throw std::runtime_error("Batch data too small to contain entry count");
        }
        uint32_t numEntries;
        std::memcpy(&numEntries, batchData.data(), sizeof(numEntries));

        // Read cursor. Invariant: position <= batchData.size(), so the
        // subtractions below cannot underflow.
        size_t position = sizeof(numEntries);

        // The count comes from the input and may be corrupt. Every entry needs
        // at least its uint32 size prefix, so never reserve for more entries
        // than the remaining bytes could possibly hold.
        const size_t maxPossibleEntries = (batchData.size() - position) / sizeof(uint32_t);
        entries.reserve(numEntries < maxPossibleEntries ? numEntries : maxPossibleEntries);

        // Extract each entry
        for (uint32_t i = 0; i < numEntries; ++i)
        {
            // Size prefix
            if (batchData.size() - position < sizeof(uint32_t))
            {
                throw std::runtime_error("Unexpected end of batch data");
            }
            uint32_t entrySize;
            std::memcpy(&entrySize, batchData.data() + position, sizeof(entrySize));
            position += sizeof(entrySize);

            // Entry body (comparison by subtraction: cannot wrap)
            if (entrySize > batchData.size() - position)
            {
                throw std::runtime_error("Unexpected end of batch data");
            }
            const uint8_t *first = batchData.data() + position;
            std::vector<uint8_t> entryData(first, first + entrySize);
            position += entrySize;

            // Deserialize the slice into a fresh entry
            LogEntry entry;
            if (!entry.deserialize(std::move(entryData)))
            {
                throw std::runtime_error("Failed to deserialize log entry");
            }
            entries.emplace_back(std::move(entry));
        }
    }
    catch (const std::exception &e)
    {
        std::cerr << "Error deserializing log batch: " << e.what() << std::endl;
    }
    return entries;
}
// Appends `size` raw bytes starting at `data` to the end of `vec`.
void LogEntry::appendToVector(std::vector<uint8_t> &vec, const void *data, size_t size) const
{
    const auto *first = static_cast<const uint8_t *>(data);
    const uint8_t *last = first + size;
    vec.insert(vec.end(), first, last);
}
// Appends `str` to `vec` as a uint32 length prefix followed by the string's
// bytes (const-reference overload). Only the prefix is written when it is zero.
void LogEntry::appendStringToVector(std::vector<uint8_t> &vec, const std::string &str) const
{
    const auto prefix = static_cast<uint32_t>(str.size());
    appendToVector(vec, &prefix, sizeof(prefix));

    if (prefix == 0)
        return;

    appendToVector(vec, str.data(), str.size());
}
// Appends `str` to `vec` as a uint32 length prefix followed by the string's
// characters (rvalue overload). Only the prefix is written when it is zero.
void LogEntry::appendStringToVector(std::vector<uint8_t> &vec, std::string &&str)
{
    const auto byteCount = static_cast<uint32_t>(str.size());
    appendToVector(vec, &byteCount, sizeof(byteCount));

    if (byteCount == 0)
        return;

    vec.insert(vec.end(), str.cbegin(), str.cend());
}
// Reads one length-prefixed string (uint32 length, then that many bytes) from
// `vec` starting at `offset`.
//
// On success stores the bytes in `str`, advances `offset` past the string and
// returns true. Returns false when the buffer is too short; `str` is then left
// untouched, and `offset` has been advanced past the length prefix only if the
// prefix itself could be read.
bool LogEntry::extractStringFromVector(std::vector<uint8_t> &vec, size_t &offset, std::string &str)
{
    // Bounds checks compare against the bytes remaining rather than computing
    // `offset + n`, which could wrap around for large values.
    if (offset > vec.size() || vec.size() - offset < sizeof(uint32_t))
        return false;

    uint32_t length;
    std::memcpy(&length, vec.data() + offset, sizeof(length));
    offset += sizeof(length);

    // Check if we have enough data for the string content
    if (length > vec.size() - offset)
        return false;

    str.assign(reinterpret_cast<const char *>(vec.data() + offset), length);
    offset += length;
    return true;
}