From fb3fdf2262a7ced18009516f00473aa975aaa08a Mon Sep 17 00:00:00 2001 From: Christian Krinitsin Date: Fri, 21 Mar 2025 15:11:06 +0100 Subject: implement a queue with a mutex to prevent concurrent accesses to the shared memory --- src/client/main.cpp | 88 ++++++++++++++++++++++++++++++++++++++++------ src/common/shared_memory.h | 13 +++++-- 2 files changed, 87 insertions(+), 14 deletions(-) diff --git a/src/client/main.cpp b/src/client/main.cpp index 2e4a606..6dfd48f 100644 --- a/src/client/main.cpp +++ b/src/client/main.cpp @@ -2,26 +2,65 @@ #include #include #include +#include #include #include #include -void sendRequest( +bool request_processed(SharedMemory* shared_memory, int index) +{ + if (shared_memory->full) { + return false; + } + if (shared_memory->tail == shared_memory->head && !shared_memory->full) { + return true; + } + + for (int i = shared_memory->head - 1; i != shared_memory->tail; i = (i - 1) % QUEUE_SIZE) { + if (i == index) { + return false; + } + } + + return shared_memory->tail != index; +} + +void send_request( SharedMemory* shared_memory, Operations type, std::pair arguments) { - shared_memory->request.type = type; - strncpy(shared_memory->request.key, serialize(arguments.first).c_str(), MAX_KEY_SIZE); - strncpy(shared_memory->request.value, serialize(arguments.second).c_str(), MAX_VALUE_SIZE); - shared_memory->processed = false; + int index; - std::cout << "Command sent" << '\n'; + pthread_mutex_lock(&shared_memory->mutex); - sleep(2); + if (shared_memory->full) { + std::cout << "Queue is full" << '\n'; + pthread_cond_wait(&shared_memory->cond_var, &shared_memory->mutex); + return; + } + + index = shared_memory->head; + Request* request = &shared_memory->request[index]; + request->type = type; + strncpy(request->key, serialize(arguments.first).c_str(), MAX_KEY_SIZE); + strncpy(request->value, serialize(arguments.second).c_str(), MAX_VALUE_SIZE); + shared_memory->head = (1 + shared_memory->head) % QUEUE_SIZE; + shared_memory->full = shared_memory->head == shared_memory->tail; + pthread_cond_signal(&shared_memory->cond_var); + + pthread_mutex_unlock(&shared_memory->mutex); + + std::cout << "Command sent" << '\n'; if (type == Operations::GET) { - std::string result(shared_memory->response); + pthread_mutex_lock(&shared_memory->mutex); + + while (!request_processed(shared_memory, index)) { + pthread_cond_wait(&shared_memory->cond_var, &shared_memory->mutex); + } + std::string result(shared_memory->request[index].response); + pthread_mutex_unlock(&shared_memory->mutex); std::cout << "Result: " << result << std::endl; } } @@ -38,10 +77,37 @@ int main() (SharedMemory*)mmap(0, sizeof(SharedMemory), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); std::cout << "Start inserting.." << '\n'; - sendRequest(shared_memory, INSERT, std::pair(serialize(10), serialize(5))); + send_request(shared_memory, INSERT, std::pair(serialize(3), serialize(3))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(4), serialize(5))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(6), serialize(6))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(7), serialize(3))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(8), serialize(3))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(9), serialize(3))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(10), serialize(3))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(11), serialize(3))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(12), serialize(3))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(13), serialize(4))); - std::cout << "Start printing.." << '\n'; - sendRequest(shared_memory, PRINT, std::pair(serialize(10), serialize(5))); + std::cout << "Start PRINTING.." << '\n'; + send_request(shared_memory, PRINT, std::pair(serialize(0), serialize(0))); munmap(shared_memory, sizeof(SharedMemory)); close(shm_fd); diff --git a/src/common/shared_memory.h b/src/common/shared_memory.h index 43fe2f5..70554e4 100644 --- a/src/common/shared_memory.h +++ b/src/common/shared_memory.h @@ -4,6 +4,8 @@ #include #define SHM_NAME "/hashtable_queue" +#define QUEUE_SIZE 10 + #define MAX_KEY_SIZE 64 #define MAX_VALUE_SIZE 128 @@ -13,12 +15,17 @@ struct Request { Operations type; char key[MAX_KEY_SIZE]; char value[MAX_VALUE_SIZE]; + char response[MAX_VALUE_SIZE]; }; struct SharedMemory { - Request request; - bool processed; - char response[MAX_VALUE_SIZE]; + Request request[QUEUE_SIZE]; + pthread_mutex_t mutex; + pthread_cond_t cond_var; + + int tail; + int head; + bool full; }; template -- cgit 1.4.1