From 1784e4a2a938bdef104135bfbf03f7b47ca0b507 Mon Sep 17 00:00:00 2001 From: Christian Krinitsin Date: Thu, 20 Mar 2025 17:34:42 +0100 Subject: server: add common structure for shared memory --- src/common/shared_memory.h | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 src/common/shared_memory.h (limited to 'src') diff --git a/src/common/shared_memory.h b/src/common/shared_memory.h new file mode 100644 index 0000000..53c7fb7 --- /dev/null +++ b/src/common/shared_memory.h @@ -0,0 +1,16 @@ +#pragma once + +#include + +#define QUEUE_SIZE 10 +#define SHM_NAME "/hashtable_queue" + +enum Operations { INSERT, DELETE, GET }; + +struct SharedMemory { + pthread_mutex_t mutex; + pthread_cond_t cond_var; + Operations queue[QUEUE_SIZE]; + int head; + int tail; +}; -- cgit 1.4.1 From 75390f5097cb3116de20663bc5fb7490b6a0c1b3 Mon Sep 17 00:00:00 2001 From: Christian Krinitsin Date: Thu, 20 Mar 2025 17:35:29 +0100 Subject: server: implement a SharedMemoryServer with (de)initilization of the memory buffer --- src/server/main.cpp | 26 +++------------------ src/server/shared_memory_server.h | 49 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 23 deletions(-) create mode 100644 src/server/shared_memory_server.h (limited to 'src') diff --git a/src/server/main.cpp b/src/server/main.cpp index b2f8bed..4dd7761 100644 --- a/src/server/main.cpp +++ b/src/server/main.cpp @@ -1,8 +1,9 @@ #include +#include #include #include -#include "hashtable.h" +#include "shared_memory_server.h" int main(int argc, char* argv[]) { @@ -18,29 +19,8 @@ int main(int argc, char* argv[]) std::cout << "Invalid argument" << '\n'; return 1; } - - HashTable hash_table { size }; - std::cout << "Add various kv-pairs" << '\n'; - hash_table.insert(1, "1"); - hash_table.insert(2, "2"); - hash_table.insert(3, "3"); - hash_table.insert(4, "4"); - hash_table.insert(5, "5"); - hash_table.insert(6, "6"); - hash_table.insert(7, "7"); - - hash_table.print(); - - std::cout << '\n'; - - std::cout << "Value for key 8: " << hash_table.get(8).value_or("Key not found!") << '\n'; - std::cout << "Value for key 4: " << hash_table.get(4).value_or("Key not found!") << '\n'; - - std::cout << '\n'; - std::cout << "Remove pair with key 5" << '\n'; - hash_table.remove(5); - hash_table.print(); + SharedMemoryServer shm(size); return 0; } diff --git a/src/server/shared_memory_server.h b/src/server/shared_memory_server.h new file mode 100644 index 0000000..034466d --- /dev/null +++ b/src/server/shared_memory_server.h @@ -0,0 +1,49 @@ +#pragma once + +#include "hashtable.h" +#include "shared_memory.h" +#include +#include +#include + +template +class SharedMemoryServer { +public: + SharedMemoryServer(size_t size) + : hash_table(size) + { + shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666); + + ftruncate(shm_fd, sizeof(SharedMemory)); + + shm = (SharedMemory*) + mmap(0, sizeof(SharedMemory), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); + + pthread_mutexattr_t mutex_attr; + pthread_condattr_t cond_attr; + pthread_mutexattr_init(&mutex_attr); + pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); + pthread_condattr_init(&cond_attr); + pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); + + pthread_mutex_init(&shm->mutex, &mutex_attr); + pthread_cond_init(&shm->cond_var, &cond_attr); + + shm->head = (shm->tail = 0); + } + + ~SharedMemoryServer() + { + munmap(shm, sizeof(SharedMemory)); + close(shm_fd); + shm_unlink(SHM_NAME); + } + + void process_requests(); + +private: + HashTable hash_table; + + int shm_fd; + SharedMemory* shm; +}; -- cgit 1.4.1 From 44f772a7442e617d4a93c0b3ed318ffb42991d8b Mon Sep 17 00:00:00 2001 From: Christian Krinitsin Date: Fri, 21 Mar 2025 10:42:09 +0100 Subject: implement communication between server and client (without mutexes and with queue_length of 1) --- src/client/main.cpp | 49 +++++++++++++++++++++++++++++++ src/common/shared_memory.h | 37 +++++++++++++++++++----- src/server/main.cpp | 2 ++ src/server/shared_memory_server.h | 61 +++++++++++++++++++++++++++++---------- 4 files changed, 126 insertions(+), 23 deletions(-) create mode 100644 src/client/main.cpp (limited to 'src') diff --git a/src/client/main.cpp b/src/client/main.cpp new file mode 100644 index 0000000..2e4a606 --- /dev/null +++ b/src/client/main.cpp @@ -0,0 +1,49 @@ +#include "shared_memory.h" +#include +#include +#include +#include +#include +#include + +void sendRequest( + SharedMemory* shared_memory, + Operations type, + std::pair arguments) +{ + shared_memory->request.type = type; + strncpy(shared_memory->request.key, serialize(arguments.first).c_str(), MAX_KEY_SIZE); + strncpy(shared_memory->request.value, serialize(arguments.second).c_str(), MAX_VALUE_SIZE); + shared_memory->processed = false; + + std::cout << "Command sent" << '\n'; + + sleep(2); + + if (type == Operations::GET) { + std::string result(shared_memory->response); + std::cout << "Result: " << result << std::endl; + } +} + +int main() +{ + int shm_fd = shm_open(SHM_NAME, O_RDWR, 0666); + if (shm_fd == -1) { + std::cout << "Server not running" << '\n'; + return -1; + } + + SharedMemory* shared_memory = + (SharedMemory*)mmap(0, sizeof(SharedMemory), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); + + std::cout << "Start inserting.." << '\n'; + sendRequest(shared_memory, INSERT, std::pair(serialize(10), serialize(5))); + + std::cout << "Start printing.." << '\n'; + sendRequest(shared_memory, PRINT, std::pair(serialize(10), serialize(5))); + + munmap(shared_memory, sizeof(SharedMemory)); + close(shm_fd); + return 0; +} diff --git a/src/common/shared_memory.h b/src/common/shared_memory.h index 53c7fb7..43fe2f5 100644 --- a/src/common/shared_memory.h +++ b/src/common/shared_memory.h @@ -1,16 +1,39 @@ #pragma once #include +#include -#define QUEUE_SIZE 10 #define SHM_NAME "/hashtable_queue" +#define MAX_KEY_SIZE 64 +#define MAX_VALUE_SIZE 128 -enum Operations { INSERT, DELETE, GET }; +enum Operations { INSERT, DELETE, GET, PRINT }; + +struct Request { + Operations type; + char key[MAX_KEY_SIZE]; + char value[MAX_VALUE_SIZE]; +}; struct SharedMemory { - pthread_mutex_t mutex; - pthread_cond_t cond_var; - Operations queue[QUEUE_SIZE]; - int head; - int tail; + Request request; + bool processed; + char response[MAX_VALUE_SIZE]; }; + +template +std::string serialize(const T& data) +{ + std::ostringstream oss; + oss << data; + return oss.str(); +} + +template +T deserialize(const std::string& str) +{ + std::istringstream iss(str); + T data; + iss >> data; + return data; +} diff --git a/src/server/main.cpp b/src/server/main.cpp index 4dd7761..424326f 100644 --- a/src/server/main.cpp +++ b/src/server/main.cpp @@ -22,5 +22,7 @@ int main(int argc, char* argv[]) SharedMemoryServer shm(size); + shm.process_requests(); + return 0; } diff --git a/src/server/shared_memory_server.h b/src/server/shared_memory_server.h index 034466d..6d4fe19 100644 --- a/src/server/shared_memory_server.h +++ b/src/server/shared_memory_server.h @@ -2,7 +2,9 @@ #include "hashtable.h" #include "shared_memory.h" +#include #include +#include #include #include @@ -16,34 +18,61 @@ public: ftruncate(shm_fd, sizeof(SharedMemory)); - shm = (SharedMemory*) + shared_memory = (SharedMemory*) mmap(0, sizeof(SharedMemory), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); - - pthread_mutexattr_t mutex_attr; - pthread_condattr_t cond_attr; - pthread_mutexattr_init(&mutex_attr); - pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); - pthread_condattr_init(&cond_attr); - pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); - - pthread_mutex_init(&shm->mutex, &mutex_attr); - pthread_cond_init(&shm->cond_var, &cond_attr); - - shm->head = (shm->tail = 0); + shared_memory->processed = true; } ~SharedMemoryServer() { - munmap(shm, sizeof(SharedMemory)); + munmap(shared_memory, sizeof(SharedMemory)); close(shm_fd); shm_unlink(SHM_NAME); + std::cout << "bye" << '\n'; } - void process_requests(); + void process_requests() + { + while (true) { + sleep(1); // TODO: remove sleep, add a mutex for shared memory, should then work with + // queue size 1 + if (!shared_memory->processed) { + std::cout << "Got a request!" << '\n'; + + Request request = shared_memory->request; + shared_memory->processed = true; + + K key = deserialize(request.key); + V value = deserialize(request.value); + + switch (request.type) { + case INSERT: + hash_table.insert(key, value); + break; + case GET: { + std::optional result = hash_table.get(key); + if (result.has_value()) { + std::string response = serialize(result.value()); + strncpy(shared_memory->response, response.c_str(), MAX_VALUE_SIZE); + } + break; + } + case DELETE: + hash_table.remove(key); + break; + case PRINT: + hash_table.print(); + break; + default: + break; + } + } + } + } private: HashTable hash_table; int shm_fd; - SharedMemory* shm; + SharedMemory* shared_memory; }; -- cgit 1.4.1 From fb3fdf2262a7ced18009516f00473aa975aaa08a Mon Sep 17 00:00:00 2001 From: Christian Krinitsin Date: Fri, 21 Mar 2025 15:11:06 +0100 Subject: implement a queue with a mutex to prevent concurrent accesses to the shared memory --- src/client/main.cpp | 88 ++++++++++++++++++++++++++++++++++++++++------ src/common/shared_memory.h | 13 +++++-- 2 files changed, 87 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/src/client/main.cpp b/src/client/main.cpp index 2e4a606..6dfd48f 100644 --- a/src/client/main.cpp +++ b/src/client/main.cpp @@ -2,26 +2,65 @@ #include #include #include +#include #include #include #include -void sendRequest( +bool request_processed(SharedMemory* shared_memory, int index) +{ + if (shared_memory->full) { + return false; + } + if (shared_memory->tail == shared_memory->head && !shared_memory->full) { + return true; + } + + for (int i = shared_memory->head - 1; i != shared_memory->tail; i = (i - 1) % QUEUE_SIZE) { + if (i == index) { + return false; + } + } + + return shared_memory->tail != index; +} + +void send_request( SharedMemory* shared_memory, Operations type, std::pair arguments) { - shared_memory->request.type = type; - strncpy(shared_memory->request.key, serialize(arguments.first).c_str(), MAX_KEY_SIZE); - strncpy(shared_memory->request.value, serialize(arguments.second).c_str(), MAX_VALUE_SIZE); - shared_memory->processed = false; + int index; - std::cout << "Command sent" << '\n'; + pthread_mutex_lock(&shared_memory->mutex); - sleep(2); + if (shared_memory->full) { + std::cout << "Queue is full" << '\n'; + pthread_cond_wait(&shared_memory->cond_var, &shared_memory->mutex); + return; + } + + index = shared_memory->head; + Request* request = &shared_memory->request[index]; + request->type = type; + strncpy(request->key, serialize(arguments.first).c_str(), MAX_KEY_SIZE); + strncpy(request->value, serialize(arguments.second).c_str(), MAX_VALUE_SIZE); + shared_memory->head = (1 + shared_memory->head) % QUEUE_SIZE; + shared_memory->full = shared_memory->head == shared_memory->tail; + pthread_cond_signal(&shared_memory->cond_var); + + pthread_mutex_unlock(&shared_memory->mutex); + + std::cout << "Command sent" << '\n'; if (type == Operations::GET) { - std::string result(shared_memory->response); + pthread_mutex_lock(&shared_memory->mutex); + + while (!request_processed(shared_memory, index)) { + pthread_cond_wait(&shared_memory->cond_var, &shared_memory->mutex); + } + std::string result(shared_memory->request[index].response); + pthread_mutex_unlock(&shared_memory->mutex); std::cout << "Result: " << result << std::endl; } } @@ -38,10 +77,37 @@ int main() (SharedMemory*)mmap(0, sizeof(SharedMemory), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); std::cout << "Start inserting.." << '\n'; - sendRequest(shared_memory, INSERT, std::pair(serialize(10), serialize(5))); + send_request(shared_memory, INSERT, std::pair(serialize(3), serialize(3))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(4), serialize(5))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(6), serialize(6))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(7), serialize(3))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(8), serialize(3))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(9), serialize(3))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(10), serialize(3))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(11), serialize(3))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(12), serialize(3))); + + std::cout << "Start inserting.." << '\n'; + send_request(shared_memory, INSERT, std::pair(serialize(13), serialize(4))); - std::cout << "Start printing.." << '\n'; - sendRequest(shared_memory, PRINT, std::pair(serialize(10), serialize(5))); + std::cout << "Start PRINTING.." << '\n'; + send_request(shared_memory, PRINT, std::pair(serialize(0), serialize(0))); munmap(shared_memory, sizeof(SharedMemory)); close(shm_fd); diff --git a/src/common/shared_memory.h b/src/common/shared_memory.h index 43fe2f5..70554e4 100644 --- a/src/common/shared_memory.h +++ b/src/common/shared_memory.h @@ -4,6 +4,8 @@ #include #define SHM_NAME "/hashtable_queue" +#define QUEUE_SIZE 10 + #define MAX_KEY_SIZE 64 #define MAX_VALUE_SIZE 128 @@ -13,12 +15,17 @@ struct Request { Operations type; char key[MAX_KEY_SIZE]; char value[MAX_VALUE_SIZE]; + char response[MAX_VALUE_SIZE]; }; struct SharedMemory { - Request request; - bool processed; - char response[MAX_VALUE_SIZE]; + Request request[QUEUE_SIZE]; + pthread_mutex_t mutex; + pthread_cond_t cond_var; + + int tail; + int head; + bool full; }; template -- cgit 1.4.1 From 4d0d9f183d7f92a41d66ffdcde15b040ff4032ef Mon Sep 17 00:00:00 2001 From: Christian Krinitsin Date: Fri, 21 Mar 2025 15:12:27 +0100 Subject: implement the server to process the queue --- src/server/shared_memory_server.h | 78 ++++++++++++++++++++++++--------------- 1 file changed, 48 insertions(+), 30 deletions(-) (limited to 'src') diff --git a/src/server/shared_memory_server.h b/src/server/shared_memory_server.h index 6d4fe19..74abedf 100644 --- a/src/server/shared_memory_server.h +++ b/src/server/shared_memory_server.h @@ -20,7 +20,18 @@ public: shared_memory = (SharedMemory*) mmap(0, sizeof(SharedMemory), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); - shared_memory->processed = true; + + shared_memory->tail = shared_memory->head = 0; + + pthread_mutexattr_t mutex_attr; + pthread_condattr_t cond_attr; + pthread_mutexattr_init(&mutex_attr); + pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); + pthread_condattr_init(&cond_attr); + pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); + + pthread_mutex_init(&shared_memory->mutex, &mutex_attr); + pthread_cond_init(&shared_memory->cond_var, &cond_attr); } ~SharedMemoryServer() @@ -28,45 +39,52 @@ public: munmap(shared_memory, sizeof(SharedMemory)); close(shm_fd); shm_unlink(SHM_NAME); - std::cout << "bye" << '\n'; } void process_requests() { while (true) { - sleep(1); // TODO: remove sleep, add a mutex for shared memory, should then work with - // queue size 1 - if (!shared_memory->processed) { - std::cout << "Got a request!" << '\n'; + pthread_mutex_lock(&shared_memory->mutex); - Request request = shared_memory->request; - shared_memory->processed = true; + if (shared_memory->tail == shared_memory->head && !shared_memory->full) { + pthread_cond_wait(&shared_memory->cond_var, &shared_memory->mutex); + } - K key = deserialize(request.key); - V value = deserialize(request.value); + Request* request = &shared_memory->request[shared_memory->tail]; - switch (request.type) { - case INSERT: - hash_table.insert(key, value); - break; - case GET: { - std::optional result = hash_table.get(key); - if (result.has_value()) { - std::string response = serialize(result.value()); - strncpy(shared_memory->response, response.c_str(), MAX_VALUE_SIZE); - } - break; - } - case DELETE: - hash_table.remove(key); - break; - case PRINT: - hash_table.print(); - break; - default: - break; + K key = deserialize(request->key); + V value = deserialize(request->value); + + switch (request->type) { + case INSERT: + std::cout << "Inserting" << '\n'; + hash_table.insert(key, value); + break; + case GET: { + std::cout << "Getting" << '\n'; + hash_table.insert(key, value); + std::optional result = hash_table.get(key); + if (result.has_value()) { + std::string response = serialize(result.value()); + strncpy(request->response, response.c_str(), MAX_VALUE_SIZE); + pthread_cond_signal(&shared_memory->cond_var); } + break; + } + case DELETE: + std::cout << "Deleting" << '\n'; + hash_table.remove(key); + break; + case PRINT: + std::cout << "Printing" << '\n'; + hash_table.print(); + break; + default: + break; } + shared_memory->tail = (1 + shared_memory->tail) % QUEUE_SIZE; + shared_memory->full = false; + pthread_mutex_unlock(&shared_memory->mutex); } } -- cgit 1.4.1