From b374a351fe47f15c81a7f6949e508571ae9d5d57 Mon Sep 17 00:00:00 2001 From: Katharina <katharina.willig@outlook.com> Date: Mon, 9 Dec 2024 23:07:36 +0100 Subject: [PATCH] try dictionary or queue --- kclient.py | 6 +++-- server.py | 66 +++++++++++++++++++++++++++++++++++++++++------------- 2 files changed, 55 insertions(+), 17 deletions(-) diff --git a/kclient.py b/kclient.py index b906d6c..90f2428 100644 --- a/kclient.py +++ b/kclient.py @@ -1,6 +1,7 @@ import socket import threading import time +import uuid broadcast_ip = '255.255.255.255'#change ip??? #hard coded? broadcast_port = 55559 @@ -33,7 +34,7 @@ def listen(): #if address[0]==MY_IP: # continue - print("this decoded msg", decoded_message) + # print("this decoded msg", decoded_message) #debug except socket.error as e: print(f"An error occurred: {e}") @@ -48,12 +49,13 @@ def sender(): client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port)) #print("is it leader adresse here", leader_address) while True: + message_id = str(uuid.uuid4()) #allows the client to send any message message = input("") #checks for whitemarks if message.strip(): - full_message = f"{nickname}: {message}".encode() + full_message = f"{message_id}: {nickname}: {message}".encode() client_socket.sendto(full_message, (broadcast_ip, broadcast_port)) ###############main################################# diff --git a/server.py b/server.py index eaee4a4..c3e3253 100644 --- a/server.py +++ b/server.py @@ -49,7 +49,7 @@ def listen_broadcast_election(queue, shared_data): """Receives messages and updates shared state.""" print(f"Listener from {shared_data['server_id']} is listening...") global active_servers - global sender_uuid + #global sender_uuid while True: try: message, client_address = election_socket.recvfrom(4096) @@ -59,9 +59,9 @@ def listen_broadcast_election(queue, shared_data): # Handle heartbeat messages if "HEARTBEAT" in decoded_message: - sender_uuid = decoded_message.split(": ")[1] + shared_data['server_id'] = decoded_message.split(": ")[1] #print("sender_uuid", sender_uuid) #debug - active_servers[sender_uuid] = time.time() #update active server dictionary + active_servers[shared_data['server_id']] = time.time() #update active server dictionary #send to neighbour continue @@ -76,18 +76,48 @@ def listen_broadcast_election(queue, shared_data): ##################TypeError: listen_client() takes 0 positional arguments but 2 were given############ def listen_client(): """receives messages from clients and broadcast them.""" + manager=Manager() + save_last_messages = manager.dict() + message_queue = multiprocessing.Queue(maxsize=3) + #global save_last_messages while True: try: message, client_address = server_socket.recvfrom(4096) decoded_message = message.decode() + #create a dictionary for message uuid + message_uuid = decoded_message.split(": ")[0] + print("message id:", message_uuid) #debug + + # Enqueue messages and maintain a fixed size + if not any(decoded_message.startswith(msg) for msg in list(message_queue.queue)): + if message_queue.full(): + message_queue.get() # Remove oldest message + message_queue.put(decoded_message) + + # Print all messages + print("here are your last messages:", list(message_queue.queue)) # Debug + + """ + if message_uuid not in save_last_messages: + if len(save_last_messages) <= 3: + oldest_key = next(iter(save_last_messages)) # get oldest key + del save_last_messages[oldest_key] # delete it + #save_last_messages["last message"] = message_uuid + save_last_messages[message_uuid] = decoded_message + #save_last_messages.update(f"last message: {message}: {message_uuid}") + print("here are your last messages:", dict(save_last_messages)) #debug + """ + + #broadcast_election(save_last_messages) #send to neighbour + # checks wheter the message comes from the server itself if decoded_message.startswith(f"{shared_data['server_id']}"): continue # if...ignores it #checks if the message containts the string "entered" if decoded_message.__contains__("entered"): - print(f"{client_address} entered the chat.") + print(f"{client_address} entered the chat.") #client_address is the nickname print(f"Received from {client_address}: {decoded_message}") @@ -108,7 +138,8 @@ def listen_client(): def start_election(queue, shared_data): """Starts leader election based on received messages.""" global active_servers - global sender_uuid + #sender_uuid = None + #global sender_uuid #current_leader = None print("Starting election...") broadcast_election(f"START_ELECTION: {shared_data['server_id']}") @@ -120,21 +151,21 @@ def start_election(queue, shared_data): while time.time() < timeout: try: message = queue.get(timeout=1) #waits 1 sec for a message - if shared_data['server_id']==sender_uuid: + if shared_data['server_id']==shared_data['server_id']: continue if "START_ELECTION" in message: - sender_uuid = message.split(": ")[1] - active_servers[sender_uuid] = time.time() - print(f"Received UUID for election: {sender_uuid}") + shared_data['server_id'] = message.split(": ")[1] + active_servers[shared_data['server_id']] = time.time() + print(f"Received UUID for election: {shared_data['server_id']}") ################dumblogic###onlyworkswith21erver################################ - if sender_uuid > highest_id: - highest_id = sender_uuid - print(f"Received higher ID {sender_uuid}, forwarding...") - broadcast_election(f"START_ELECTION: {sender_uuid}") + if shared_data['server_id'] > highest_id: + highest_id = shared_data['server_id'] + print(f"Received higher ID {shared_data['server_id']}, forwarding...") + broadcast_election(f"START_ELECTION: {shared_data['server_id']}") - elif sender_uuid == highest_id: - shared_data['current_leader'] = sender_uuid + elif shared_data['server_id'] == highest_id: + shared_data['current_leader'] = shared_data['server_id'] print(f"(sender) Leader elected: {shared_data['current_leader']}") broadcast_election(f"LEADER: {shared_data['current_leader']}") @@ -207,11 +238,15 @@ if __name__ == "__main__": shared_data = manager.dict() shared_data['server_id'] = str(uuid.uuid4()) # Generate server ID once shared_data['current_leader'] = None + shared_data['sender_uuid'] = None + #sender_uuid = None print(f"Server is running with ID {shared_data['server_id']} and broadcasting on port {broadcast_port}...") message_queue = multiprocessing.Queue() + save_last_messages={} + # Create processes listener_election_process = multiprocessing.Process(target=listen_broadcast_election, args=(message_queue, shared_data)) listener_client_process = multiprocessing.Process(target=listen_client, ) @@ -224,6 +259,7 @@ if __name__ == "__main__": # Start processes and threads listener_election_process.start() listener_client_process.start() + election_process.start() heartbeat_thread.start() heartbeat_monitor_thread.start() -- GitLab