diff --git a/kserver.py b/kserver.py index f8633cb22b3dd83af9646280fdab64db7666351a..0c24716c45c09c0bb726ec5badede8d640e4e17b 100644 --- a/kserver.py +++ b/kserver.py @@ -1,6 +1,8 @@ import socket import multiprocessing import uuid +import time +import threading broadcast_ip = '255.255.255.255' broadcast_port = 55555 @@ -16,6 +18,11 @@ server_socket.bind(('', broadcast_port)) print(f"Server is running with ID {server_id} and broadcasting on port {broadcast_port}...") +# Globale Liste für bekannte Server-IDs +active_servers = set() +current_leader = None + + # Broadcast-Funktion def broadcast(message): """Sendet Nachrichten an alle Server-IDs im Netzwerk""" @@ -30,7 +37,7 @@ def listen(queue): message, client_address = server_socket.recvfrom(4096) decoded_message = message.decode() - # Ignoriere Nachrichten vom eigenen Server + # Ignoriere Nachrichten vom eigenen Server/ also running for clients from this ip if decoded_message.startswith(f"[{server_id}]"): continue @@ -46,40 +53,74 @@ def listen(queue): # Leader Election Funktion def start_election(queue): """Startet die Leader Election basierend auf empfangenen Nachrichten.""" + global current_leader print("Starting election...") - broadcast(f"START_ELECTION: {server_id}") + broadcast(f"START_ELECTION: {server_id}") #sends broadcast, ignores his own id only in listen function - current_leader = None - while True: + #current_leader = None + timeout = time.time() + 15 # 5 Sekunden auf Antworten warten + highest_id = server_id + while time.time() < timeout: # Auf Nachrichten aus der Queue warten try: - message = queue.get() + message = queue.get(timeout=1) # Nachricht verarbeiten if "START_ELECTION" in message: sender_uuid = message.split(": ")[1] print("extracted uuid?:", sender_uuid) + active_servers.add(sender_uuid) + print(f"Received UUID for election: {sender_uuid}") # Vergleich der UUIDs für Leader Election - if sender_uuid > server_id: + if sender_uuid > highest_id: + highest_id=sender_uuid print(f"Received higher ID {sender_uuid}, forwarding...") - broadcast(f"START_ELECTION {sender_uuid}") + broadcast(f"START_ELECTION: {sender_uuid}") + elif sender_uuid < server_id: + highest_id=server_id print(f"Received lower ID {sender_uuid}, sending own ID...") - broadcast(f"START_ELECTION {server_id}") + broadcast(f"START_ELECTION: {server_id}") else: # Wir sind der Leader current_leader = server_id - broadcast(f"LEADER {server_id}") + broadcast(f"LEADER: {server_id}") print(f"I am the leader: {server_id}") + elif "LEADER" in message: # Leader wurde gewählt - current_leader = message.split(" ")[1] + leader_uuid = message.split(": ")[1] + current_leader = leader_uuid print(f"Leader elected: {current_leader}") + return + + except multiprocessing.queues.Empty: + continue + + # Nach Timeout: Eigener Server wird Leader, falls niemand anderes gewählt wurde + if highest_id == server_id: + current_leader = server_id + broadcast(f"LEADER {server_id}") + print(f"I am the leader: {server_id}") + else: + print(f"Leader election finished, leader is {highest_id}") + +################### Heartbeat-Funktion +def send_heartbeat(): + """Sendet regelmäßig Heartbeat-Nachrichten, um die Server-ID aktiv zu halten.""" + while True: + broadcast(f"HEARTBEAT: {server_id}") + time.sleep(3) # Alle 3 Sekunden - except Exception as e: - print(f"Error during election: {e}") - break +def monitor_heartbeats(): + """Überprüft die aktiven Server anhand von Heartbeat-Nachrichten.""" + global active_servers + while True: + time.sleep(6) # Alle 6 Sekunden überprüfen + now = time.time() + active_servers = {server for server, last_seen in active_servers.items() if now - last_seen < 6} + print(f"Active servers: {active_servers}") # Main if __name__ == "__main__": @@ -88,8 +129,12 @@ if __name__ == "__main__": # Prozesse erstellen listener_process = multiprocessing.Process(target=listen, args=(message_queue,)) + #time.sleep(10) election_process = multiprocessing.Process(target=start_election, args=(message_queue,)) + # Heartbeat in einem separaten Thread + heartbeat_thread = threading.Thread(target=send_heartbeat, daemon=True) + # Prozesse starten listener_process.start() election_process.start()