diff --git a/server.py b/server.py index 8e187fa0ec9790060097e32c217d340edf64ece6..44f937b2217bdfa9ed8b2bc04cc065f278f36032 100644 --- a/server.py +++ b/server.py @@ -1,7 +1,4 @@ -##########unsicher, ob heartbeat funktioniert################ -###########deutsch englisch korrigieren################### -################liste mit message ids auf 3-5 setzen######## -############active servers dic shared data################ +##########save content of the received message############does only saves the message id atm######### import socket import multiprocessing @@ -10,6 +7,7 @@ import time import threading import os from multiprocessing import Manager +from collections import deque broadcast_ip = '255.255.255.255' broadcast_port = 55555 @@ -27,7 +25,6 @@ election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) election_socket.bind(('', election_port)) - #does not work, use shared data?? #global leader_election_done leader_election_done = threading.Event() active_servers={} @@ -51,7 +48,6 @@ def broadcast_election(message): def start_election(queue, shared_data): """Starts leader election based on received messages.""" global active_servers - #global leader_election_done print("Starting election...") broadcast_election(f"START_ELECTION: {shared_data['server_id']}") #every server which broadcasts his id is going to be part of the election @@ -116,20 +112,11 @@ def listen_broadcast_election(queue, shared_data): print(f"Received from {sender_address}: {decoded_sender_message}") leader_election_done.set() - # Handle heartbeat messages - #if "HEARTBEAT" in decoded_sender_message: - # shared_data['sender_uuid'] = decoded_sender_message.split(": ")[1] - #print("sender_uuid", sender_uuid) #debug - # active_servers[shared_data['sender_uuid']] = time.time() #update active server dictionary - #send to neighbour###################### - # continue - if "HEARTBEAT" in decoded_sender_message: sender_uuid = decoded_sender_message.split(": ")[1] active_servers[sender_uuid] = time.time() continue - # Put message in the queue for election process queue.put(decoded_sender_message) @@ -143,18 +130,9 @@ def listen_client(shared_data): """receives messages from clients and broadcast them.""" #global received_messages - received_messages = set() - - #save_last_messages = manager.dict() - #message_queue = multiprocessing.Queue(maxsize=3) - - #global save_last_messages - #only listen if current leader - - ###################NameError: name 'leader_election_done' is not defined ############ - #global leader_election_done - - #leader_election_done.wait() # blocks until the leader election is done + received_message_uuid = set() + MAX_MESSAGES = 3 + message_queue = deque(maxlen=MAX_MESSAGES) print("current leader for listening is:", shared_data['current_leader']) #debug @@ -166,11 +144,21 @@ def listen_client(shared_data): # Extract message UUID and check if already processed message_id = decoded_message.split(":")[0] - if message_id in received_messages: + if message_id in received_message_uuid: continue - received_messages.add(message_id) - #broadcast_election(save_last_messages) #send to neighbour + # mark message as proccessed + message_queue.append(message_id) + received_message_uuid.add(message_id) + + # Entferne die älteste Nachricht aus dem Set, wenn die Queue sie entfernt + if len(message_queue) == MAX_MESSAGES: + removed_id = message_queue.popleft() + received_message_uuid.remove(removed_id) + + print("erhaltene msg uuid:", received_message_uuid) + + #broadcast_election(sreceived_message_uuid) #send to neighbour # checks wheter the message comes from the server itself if decoded_message.startswith(f"{shared_data['server_id']}"): @@ -223,7 +211,7 @@ def monitor_heartbeats(): print(f"Active servers: {list(active_servers.keys())}") -# Main function +#**************************************Main function************************************************* if __name__ == "__main__": multiprocessing.freeze_support() diff --git a/things.py b/things.py index 4c81a6edab2828a5f458b170dcb908fedb33200b..05bea2aad9cdcfcbb01cd9bfd6ea508035d04e32 100644 --- a/things.py +++ b/things.py @@ -36,7 +36,7 @@ # message_uuid = decoded_message.split(": ")[0] #print("message id:", message_uuid) #debug - + # Enqueue messages and maintain a fixed size """if not any(decoded_message.startswith(msg) for msg in list(message_queue.queue)): if message_queue.full(): @@ -57,3 +57,12 @@ #save_last_messages.update(f"last message: {message}: {message_uuid}") print("here are your last messages:", dict(save_last_messages)) #debug """ + + + # Handle heartbeat messages + #if "HEARTBEAT" in decoded_sender_message: + # shared_data['sender_uuid'] = decoded_sender_message.split(": ")[1] + #print("sender_uuid", sender_uuid) #debug + # active_servers[shared_data['sender_uuid']] = time.time() #update active server dictionary + #send to neighbour###################### + # continue \ No newline at end of file