diff --git a/kserver.py b/kserver.py index b1ccf68dd4d99ce09efb983be4abf257ca30ba3e..bc0389f9b86a78bc4e07b3df3d19506223e726ad 100644 --- a/kserver.py +++ b/kserver.py @@ -19,7 +19,7 @@ server_socket.bind(('', broadcast_port)) print(f"Server is running with ID {server_id} and broadcasting on port {broadcast_port}...") # global list for known server-IDs (which does not work atm) -active_servers = set() +active_servers = {} current_leader = None @@ -31,7 +31,8 @@ def broadcast(message): # listener-function def listen(queue): - """Empfängt Nachrichten von anderen Prozessen und leitet sie weiter.""" + """Receives messages from other processes and forwards them/puts them in queue""" + global active_servers while True: try: message, client_address = server_socket.recvfrom(4096) @@ -43,6 +44,12 @@ def listen(queue): print(f"Received from {client_address}: {decoded_message}") + # handle heartbeat messages + if "HEARTBEAT" in decoded_message: + sender_uuid = decoded_message.split(": ")[1] + active_servers[sender_uuid] = time.time() + continue + # put message in queue, that broadcast process can react queue.put(decoded_message) @@ -52,13 +59,14 @@ def listen(queue): # leader election function def start_election(queue): - """Startet die Leader Election basierend auf empfangenen Nachrichten.""" + """Starts leader election based on received messages.""" global current_leader print("Starting election...") broadcast(f"START_ELECTION: {server_id}") #sends broadcast, ignores his own id only in listen function timeout = time.time() + 20 # wait 20 secs for answers highest_id = server_id + while time.time() < timeout: # wait for messages from queue try: @@ -68,7 +76,7 @@ def start_election(queue): if "START_ELECTION" in message: sender_uuid = message.split(": ")[1] #print("extracted uuid?:", sender_uuid) - active_servers.add(sender_uuid) + active_servers[sender_uuid] = time.time() print(f"Received UUID for election: {sender_uuid}") # compare UUIDs for leader election @@ -77,6 +85,7 @@ def start_election(queue): print(f"Received higher ID {sender_uuid}, forwarding...") broadcast(f"START_ELECTION: {sender_uuid}") + #####does this work????######## elif sender_uuid < server_id: highest_id=server_id print(f"Received lower ID {sender_uuid}, sending own ID...") @@ -95,10 +104,11 @@ def start_election(queue): broadcast(f"current leader is: {current_leader}") return + #continue when queue is empty except multiprocessing.queues.Empty: continue - # after timeout: own server becomes leader, if no other has been chosen + # after timeout: own server becomes leader, if no other has been chosen/higher ID if highest_id == server_id: current_leader = server_id broadcast(f"LEADER {server_id}") @@ -106,23 +116,39 @@ def start_election(queue): else: print(f"Leader election finished, leader is {highest_id}") +# send message to the current leader +def send_message_to_leader(message): + """Sends a message directly to the current leader.""" + global current_leader + if current_leader is None: + print("No leader available to send the message.") + return + + broadcast(f"TO_LEAD: {message}") + ################### Heartbeat-function not working########################### def send_heartbeat(): - """Sendet regelmäßig Heartbeat-Nachrichten, um die Server-ID aktiv zu halten.""" + """Sends heartbeat messages regularly to keep the server ID active""" while True: broadcast(f"HEARTBEAT: {server_id}") - time.sleep(3) # sends every 3 seconds + time.sleep(5) # sends every 3 seconds +#monitor heartbeats def monitor_heartbeats(): - """Überprüft die aktiven Server anhand von Heartbeat-Nachrichten.""" + """Checks active servers based on heartbeat messages""" global active_servers while True: - time.sleep(6) # checks every 6 seconds + time.sleep(6) now = time.time() - active_servers = {server for server, last_seen in active_servers.items() if now - last_seen < 6} - print(f"Active servers: {active_servers}") + active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6} + print(f"Active servers: {list(active_servers.keys())}") ################################################################################ +def test_send_to_leader(): + """Tests sending a message to the current leader.""" + time.sleep(15) # Wait for leader election to complete + send_message_to_leader("This is a message for the leader.") + # Main if __name__ == "__main__": # queue for communication between processes @@ -135,10 +161,17 @@ if __name__ == "__main__": # heartbeat in seperate thread########### heartbeat_thread = threading.Thread(target=send_heartbeat, daemon=True) + heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True) + + # test sending to leader in a separate thread + test_thread = threading.Thread(target=test_send_to_leader, daemon=True) # start processes listener_process.start() election_process.start() + heartbeat_thread.start() + heartbeat_monitor_thread.start() + test_thread.start() try: # main process is waiting for the sub processes to finish