diff --git a/client.py b/client.py index b80b968fd2dd9e8f5946ad2189578cf74cb2cabf..0ffa0a5e0e33cbb71defaa7107d17f45bc72b04d 100644 --- a/client.py +++ b/client.py @@ -21,9 +21,11 @@ class Client(multiprocessing.Process): send_thread = threading.Thread(target=self.send_message) receive_thread = threading.Thread(target=self.receive_messages) + receive_new_server_thread = threading.Thread(target=self.receive_new_server) send_thread.start() receive_thread.start() + receive_new_server_thread.start() # waiting for thread to stop = prevent the programm from shutdown before thread is stopped send_thread.join() @@ -84,4 +86,19 @@ class Client(multiprocessing.Process): while True: connection, addr = client_receive_message_socket.accept() message = connection.recv(1024) - print(f"GC message: {message.decode('utf-8')}") \ No newline at end of file + print(f"GC message: {message.decode('utf-8')}") + + def receive_new_server(self): + PORT = 52000 + + client_receive_message_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client_receive_message_socket.bind((self.client_address, PORT)) + client_receive_message_socket.listen() + + print("Listening for server address update messages") + + while True: + connection, addr = client_receive_message_socket.accept() + message = connection.recv(1024) + print(f"New server: {message.decode('utf-8')}") + self.registered_server = message.decode('utf-8') \ No newline at end of file diff --git a/client_register.py b/client_register.py index bce0baf5e10adf196943e7f6b8815205e42a3fbb..f664d7be2b0fea150d43cd20417ddd64aba52310 100644 --- a/client_register.py +++ b/client_register.py @@ -4,4 +4,4 @@ import time if __name__ == '__main__': client = Client() - time.sleep(20) + time.sleep(100) diff --git a/main_server.py b/main_server.py index 634ab6c4940cb461cc1ee0e4650040028937b8f4..74803e0ac33d61436372ba435856f1ebaf3b6e53 100644 --- a/main_server.py +++ b/main_server.py @@ -14,11 +14,5 @@ if __name__ == '__main__': # Start the Server process server.start() - # Optionally, if you want the main program to wait for a certain duration - # and then terminate the server, you can use: - # time.sleep(20) - # server.terminate() # This will terminate the server process after 20 seconds - - # If you want the main program to wait indefinitely until the server process - # completes its execution, use: + # Wait indefinitely until the server process completes its execution server.join() diff --git a/server.py b/server.py index 2cf7d40d8c1dea0d3c99e8f89294b40d4ffb9214..f610113068de975a3b4ba9d3845d858826cde111 100644 --- a/server.py +++ b/server.py @@ -30,6 +30,7 @@ class Server(multiprocessing.Process): client_cache_key_offset = 0 local_servers_cache = dict() local_clients_cache = dict() + local_group_cache = dict() def __init__(self, client_address, server_id, server_port, server_cache, clients_cache, leader): super(Server, self).__init__() @@ -55,6 +56,7 @@ class Server(multiprocessing.Process): print("My UUID: ", self.server_uuid) self.participant = False self.keep_running_nonLeader = True + self.is_admin_of_groupchat = False @staticmethod def get_local_ip_address(): @@ -128,7 +130,7 @@ class Server(multiprocessing.Process): def run(self): print(self.server_id+": "+"Up and running") - if self.server_id == "MAIN" or self.leader == True: + if self.server_id == "MAIN": client_listener_thread = threading.Thread(target=self.listen_for_clients) client_listener_thread.start() @@ -150,6 +152,8 @@ class Server(multiprocessing.Process): self.heartbeat_receive_thread.start() self.heartbeat_timeout_thread.start() self.leader_election_thread.start() + + self.is_admin_of_groupchat = True def get_broadcast_address(self): IP = self.server_address @@ -167,14 +171,26 @@ class Server(multiprocessing.Process): print("Local Server Cache:", self.local_servers_cache) while True: time.sleep(10) + failed_group_server = [] for server_id, server_address in self.local_servers_cache.items(): if server_address[0] != self.server_address: - acknowledgment_received = self.send_heartbeat_to_server(server_address[0], server_heartbeat_tcp_listener_port) - #acknowledgment_received = "YES" - if acknowledgment_received: - print(f"Heartbeat acknowledgment received from {server_id}") - else: - print(f"No acknowledgment received from {server_id}. Server may be down.") + count = 0 + for i in range(0,3): + acknowledgment_received = self.send_heartbeat_to_server(server_address[0], server_heartbeat_tcp_listener_port) + #acknowledgment_received = "YES" + if acknowledgment_received: + print(f"Heartbeat acknowledgment received from {server_id}") + break + else: + count = count + 1 + print(f"No acknowledgment received from {server_id}. Server may be down. Error Count : {count}") + if count == 3: + failed_group_server.append(server_id) + + for server_id in failed_group_server: + del self.local_servers_cache[server_id] + self.reassign_chat_groups(server_id) + def send_heartbeat_to_server(self, server_address, server_port): acknowledgment_received = False @@ -221,6 +237,49 @@ class Server(multiprocessing.Process): except socket.error as e: print(f"Error: {e}") + # find every group where the dead server was admin and reassign group to (new) MAIN server + def reassign_chat_groups(self, dead_server_id): + + reassigned_groups = [] + + for group in self.local_group_cache: + if self.local_group_cache[group] == dead_server_id: + self.local_group_cache[group] = self.server_id + reassigned_groups.append(group) + + update_cache_thread2 = threading.Thread(target=self.updateCacheList) + update_group_server_of_client_thread = threading.Thread(target=self.update_group_server_of_client(reassigned_groups)) + update_cache_thread2.start() + update_group_server_of_client_thread.start() + + # find out which clients need to be informed about their groupchat server change + def update_group_server_of_client(self, reassigned_groups): + + for group in reassigned_groups: + clients_to_inform = [] + for client in self.local_clients_cache: + if client[0] == group: + clients_to_inform.append(client) + + new_group_server_addr = self.server_address + self.send_client_new_group_server_address(new_group_server_addr, clients_to_inform) + + # inform clients about the address of their new groupchat server + def send_client_new_group_server_address(self, addr, clients_to_inform): + PORT = 52000 + + for client in clients_to_inform: + client_addr = self.local_clients_cache[client] + + try: + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.connect((client_addr[0], PORT)) + server_socket.sendall(addr.encode('utf-8')) + server_socket.close() + except (ConnectionRefusedError, TimeoutError): + print(f'Unable to send to {client_addr}') + + def check_heartbeat_timeout(self): while self.keep_running_nonLeader == True: time.sleep(5) # Adjust the interval as needed @@ -237,9 +296,8 @@ class Server(multiprocessing.Process): # find highest server ID in cache def get_last_server_id(self): - if self.local_servers_cache: - return ord(max(self.local_servers_cache, key=lambda k: ord(k))) - #return max(self.local_servers_cache) + if self.local_group_cache: + return ord(max(self.local_group_cache, key=lambda k: ord(k))) else: # ascii value before A return 64 @@ -270,6 +328,8 @@ class Server(multiprocessing.Process): new_server_id = chr(last_server_id + 1) #new_server_id = last_server_id + 1 self.local_servers_cache[new_server_id] = addr + self.local_group_cache[new_server_id] = new_server_id + print("GroupCache: ", self.local_group_cache) print(self.server_id+": "+"Received server register broadcast message:", message) @@ -420,9 +480,10 @@ class Server(multiprocessing.Process): BROADCAST_ADDRESS = self.get_broadcast_address() servers_cache_as_string = json.dumps(self.local_servers_cache, indent=2).encode('utf-8') clients_cache_as_string = json.dumps(self.local_clients_cache, indent=2).encode('utf-8') + group_cache_as_string = json.dumps(self.local_group_cache, indent=2).encode('utf-8') separator = "_" - MSG = servers_cache_as_string + separator.encode('utf-8') + clients_cache_as_string + MSG = servers_cache_as_string + separator.encode('utf-8') + clients_cache_as_string + separator.encode('utf-8') + group_cache_as_string broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -468,10 +529,13 @@ class Server(multiprocessing.Process): splitted = message.split("_") server_cache_json = json.loads(splitted[0]) client_cache_json = json.loads(splitted[1]) + group_cache_json = json.loads(splitted[2]) self.local_servers_cache = server_cache_json self.local_clients_cache = client_cache_json - print("Server Cache:", self.local_servers_cache) - print("Client Cache:", self.local_clients_cache) + self.local_group_cache = group_cache_json + print("Group Cache: ", self.local_group_cache) + print("Server Cache: ", self.local_servers_cache) + print("Client Cache: ", self.local_clients_cache) except socket.timeout: pass #Timeout reached @@ -659,7 +723,7 @@ class Server(multiprocessing.Process): def handle_leader_tasks(self): # Perform leader-specific tasks here print(self.server_address, " is now the leader.") - self.leader = True + self.server_id = "MAIN" self.stop_threads() self.run()