From d001505986b15ef93c6c9994b0753ca524e3fbf9 Mon Sep 17 00:00:00 2001 From: Katharina <katharina.willig@outlook.com> Date: Sat, 23 Nov 2024 17:14:38 +0100 Subject: [PATCH] multiprocessing --- kserver.py | 128 +++++++++++++++++++++++++++++------------------------ 1 file changed, 71 insertions(+), 57 deletions(-) diff --git a/kserver.py b/kserver.py index eaecd0a..6c7df80 100644 --- a/kserver.py +++ b/kserver.py @@ -1,14 +1,14 @@ import socket -import threading +import multiprocessing import uuid broadcast_ip = '255.255.255.255' broadcast_port = 55555 -# Create a unique server ID +# Erstelle eine eindeutige Server-ID server_id = str(uuid.uuid4()) -# Create server socket for broadcasting +# Socket für Broadcast erstellen server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -16,74 +16,88 @@ server_socket.bind(('', broadcast_port)) print(f"Server is running with ID {server_id} and broadcasting on port {broadcast_port}...") -# Store the current leader's ID -current_leader = None - -# Broadcast function +# Broadcast-Funktion def broadcast(message): - """Send messages to all other nodes in the network.""" + """Sendet Nachrichten an alle Server-IDs im Netzwerk""" full_message = f"[{server_id}] {message}".encode() server_socket.sendto(full_message, (broadcast_ip, broadcast_port)) -# Listener function -def listen(): - """Receive messages and implement LCR logic.""" - global current_leader - +# Listener-Funktion +def listen(queue): + """Empfängt Nachrichten von anderen Prozessen und leitet sie weiter.""" while True: try: message, client_address = server_socket.recvfrom(4096) decoded_message = message.decode() - # Extract sender UUID and payload - if decoded_message.startswith("[") and "]" in decoded_message: - sender_uuid = decoded_message.split("]")[0][1:] - payload = decoded_message.split("]")[1].strip() - - # Ignore messages from self - if sender_uuid == server_id: - continue - - # LCR logic - if payload == "START_ELECTION": - print(f"Received election start from {sender_uuid}") - if sender_uuid > server_id: - # Forward the sender's ID - broadcast(f"START_ELECTION {sender_uuid}") - elif sender_uuid < server_id: - # Send own ID into the ring - broadcast(f"START_ELECTION {server_id}") - else: - # If sender_uuid == server_id, we are the leader - current_leader = server_id - broadcast(f"LEADER {server_id}") - print(f"I am the leader: {server_id}") - - elif payload.startswith("LEADER"): - leader_id = payload.split(" ")[1] - current_leader = leader_id - print(f"Leader has been elected: {leader_id}") + # Ignoriere Nachrichten vom eigenen Server + if decoded_message.startswith(f"[{server_id}]"): + continue - except socket.error as e: - print(f"An error occurred: {e}") - break + print(f"Received from {client_address}: {decoded_message}") + + # Nachricht in die Queue legen, damit der Broadcast-Prozess darauf reagieren kann + queue.put(decoded_message) - except KeyboardInterrupt: - print("\nShutting down server...") + except socket.error as e: + print(f"Socket error occurred: {e}") break -# Start the election -def start_election(): - """Initiate the LCR election process.""" +# Leader Election Funktion +def start_election(queue): + """Startet die Leader Election basierend auf empfangenen Nachrichten.""" print("Starting election...") broadcast(f"START_ELECTION {server_id}") + current_leader = None + while True: + # Auf Nachrichten aus der Queue warten + try: + message = queue.get() + + # Nachricht verarbeiten + if "START_ELECTION" in message: + sender_uuid = message.split(" ")[1] + + # Vergleich der UUIDs für Leader Election + if sender_uuid > server_id: + print(f"Received higher ID {sender_uuid}, forwarding...") + broadcast(f"START_ELECTION {sender_uuid}") + elif sender_uuid < server_id: + print(f"Received lower ID {sender_uuid}, sending own ID...") + broadcast(f"START_ELECTION {server_id}") + else: + # Wir sind der Leader + current_leader = server_id + broadcast(f"LEADER {server_id}") + print(f"I am the leader: {server_id}") + elif "LEADER" in message: + # Leader wurde gewählt + current_leader = message.split(" ")[1] + print(f"Leader elected: {current_leader}") + + except Exception as e: + print(f"Error during election: {e}") + break + +# Main if __name__ == "__main__": - # Start the listener thread - listen_thread = threading.Thread(target=listen) - listen_thread.start() - - # Start election after a short delay - import time - time.sleep(2) - start_election() + # Queue für die Kommunikation zwischen Prozessen + message_queue = multiprocessing.Queue() + + # Prozesse erstellen + listener_process = multiprocessing.Process(target=listen, args=(message_queue,)) + election_process = multiprocessing.Process(target=start_election, args=(message_queue,)) + + # Prozesse starten + listener_process.start() + election_process.start() + + try: + # Hauptprozess wartet auf Beendigung der Subprozesse + listener_process.join() + election_process.join() + except KeyboardInterrupt: + print("\nShutting down server...") + listener_process.terminate() + election_process.terminate() -- GitLab