From d6f49be79f0fddb729043f6af093e7e6901f25eb Mon Sep 17 00:00:00 2001 From: Katharina <katharina.willig@outlook.com> Date: Sun, 24 Nov 2024 17:53:44 +0100 Subject: [PATCH] add request leader --- kclient.py | 64 ++++++++++++++--- kserver.py | 66 +++++++++-------- leader_heartbeat.py | 168 ++++++++++++++++++++++++++++++++++++++++++++ server.py | 167 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 426 insertions(+), 39 deletions(-) create mode 100644 leader_heartbeat.py create mode 100644 server.py diff --git a/kclient.py b/kclient.py index cd9f020..5c8831d 100644 --- a/kclient.py +++ b/kclient.py @@ -1,5 +1,6 @@ import socket import threading +import time broadcast_ip = '255.255.255.255'#change ip??? #hard coded? broadcast_port = 55555 @@ -9,40 +10,80 @@ MY_HOST = socket.gethostname() #socket.gethostbyname(socket.gethostname()) #getip MY_IP = socket.gethostbyname(MY_HOST) #print(f"host:{MY_HOST} and ip: {MY_IP}") - + +print(f"Listening for leader response on {broadcast_ip}:{broadcast_port}...") +print(MY_HOST,MY_IP) + # create client-socket for broadcast client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) client_socket.bind(('', broadcast_port)) # listen on broadcast-port -#nickname = input("Enter your nickname: ") +# global var for leader address +leader_address = None def listen(): """receives messages from server and clients.""" + global leader_address while True: try: data, address = client_socket.recvfrom(4096) decoded_message = data.decode() # Ignoriere Nachrichten mit Server-Kennung - if decoded_message.startswith("["): - continue - #ignores broadcast messages with own ip - if address[0]==MY_IP: - continue + #if decoded_message.startswith("["): + # continue + + #ignores broadcast messages with own ip #################please enable it after testing!!!!!!!!!!!!!!!!!!!S + #if address[0]==MY_IP: + # continue +#####identifyLEADER############ + # Erkennen von LEADER-Nachrichten + if "LEADER:" in decoded_message: + leader_uuid = decoded_message.split(": ")[1] + print(f"Leader discovered: {leader_uuid} at {address[0]}:{address[1]}") + leader_address = (address[0], broadcast_port) # IP-Adresse des Leaders speichern + continue +################################# + print(decoded_message) except socket.error as e: print(f"An error occurred: {e}") break +############requestleader############ +def request_leader(): + """Sendet eine Anfrage, um den aktuellen Leader zu ermitteln.""" + global leader_address + print("Requesting current leader...") + client_socket.sendto("REQUEST_LEAD".encode(), (broadcast_ip, broadcast_port)) + + timeout = time.time() + 5 # 10 Sekunden auf Antwort warten + while time.time() < timeout: + if leader_address: + return + time.sleep(0.5) # Warten, bis eine Antwort vom Leader eintrifft + #print("No leader found. Unable to send messages.") +#################################### + def sender(): """Ermöglicht dem Benutzer, Nachrichten zu schreiben und zu senden.""" + global leader_address + nickname = input("Enter your nickname: ") + print("Requesting leader before starting...") + request_leader() # Versuche, den Leader zu finden + #########print(leader_address)###########debug: answer: None + if not leader_address: + print("No leader found. Exiting...") + return + just_nickname= f"{nickname} entered the chat".encode() - client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port)) + #client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port)) + client_socket.sendto(just_nickname, leader_address) while True: #allows the client to send any message @@ -51,8 +92,11 @@ def sender(): #checks for whitemarks if message.strip(): full_message = f"{nickname}: {message}".encode() - - client_socket.sendto(full_message, (broadcast_ip, broadcast_port)) +#(broadcast_ip, broadcast_port) #for sendto + if leader_address: + client_socket.sendto(full_message, leader_address) + else: + print("No leader available. Unable to send message.") # starts listen-thread listen_thread = threading.Thread(target=listen) diff --git a/kserver.py b/kserver.py index bc0389f..3d9f160 100644 --- a/kserver.py +++ b/kserver.py @@ -3,25 +3,25 @@ import multiprocessing import uuid import time import threading +import os + +#current_method=multiprocessing.set_start_method('spawn', force=True) #result=none (why??) +#print(f"Multiprocessing start method: {current_method}") + +print(f"Script started with PID: {os.getpid()}") #is starting 4 times +server_id=str(uuid.uuid4()) broadcast_ip = '255.255.255.255' broadcast_port = 55555 -# creates unique server-ID -server_id = str(uuid.uuid4()) - # Socket for broadcast server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(('', broadcast_port)) -print(f"Server is running with ID {server_id} and broadcasting on port {broadcast_port}...") - -# global list for known server-IDs (which does not work atm) +##here active_servers = {} -current_leader = None - # broadcast-function def broadcast(message): @@ -33,6 +33,7 @@ def broadcast(message): def listen(queue): """Receives messages from other processes and forwards them/puts them in queue""" global active_servers + #server_id = shared_data["server_id"] while True: try: message, client_address = server_socket.recvfrom(4096) @@ -42,6 +43,11 @@ def listen(queue): if decoded_message.startswith(f"[{server_id}]"): continue + if "REQUEST_LEADER" in decoded_message: + if current_leader: + response = f"LEADER: {current_leader}".encode() + server_socket.sendto(response, client_address) + print(f"Received from {client_address}: {decoded_message}") # handle heartbeat messages @@ -61,6 +67,7 @@ def listen(queue): def start_election(queue): """Starts leader election based on received messages.""" global current_leader + #server_id = shared_data["server_id"] print("Starting election...") broadcast(f"START_ELECTION: {server_id}") #sends broadcast, ignores his own id only in listen function @@ -116,15 +123,8 @@ def start_election(queue): else: print(f"Leader election finished, leader is {highest_id}") -# send message to the current leader -def send_message_to_leader(message): - """Sends a message directly to the current leader.""" - global current_leader - if current_leader is None: - print("No leader available to send the message.") - return - - broadcast(f"TO_LEAD: {message}") +############broadcastLEADER##################### +#fct ################### Heartbeat-function not working########################### def send_heartbeat(): @@ -144,34 +144,42 @@ def monitor_heartbeats(): print(f"Active servers: {list(active_servers.keys())}") ################################################################################ -def test_send_to_leader(): - """Tests sending a message to the current leader.""" - time.sleep(15) # Wait for leader election to complete - send_message_to_leader("This is a message for the leader.") - # Main if __name__ == "__main__": + + # current_process = psutil.Process() + # print(f"DEBUG: Number of child processes: {len(current_process.children())}") + #manager = Manager() + #shared_data = manager.dict() # Gemeinsame Datenstruktur + #shared_data["server_id"] = str(uuid.uuid4()) # Nur im Hauptprozess erzeugen + + multiprocessing.freeze_support() # important for Windows, to not start the main process three times # queue for communication between processes + +#################### + print(f"Server is running with ID {server_id} and broadcasting on port {broadcast_port}...") + + # global list for known server-IDs (which does not work atm) + + current_leader = None +################### + message_queue = multiprocessing.Queue() # create processes - listener_process = multiprocessing.Process(target=listen, args=(message_queue,)) - #time.sleep(10) - election_process = multiprocessing.Process(target=start_election, args=(message_queue,)) + listener_process = multiprocessing.Process(target=listen, args=(message_queue, )) + election_process = multiprocessing.Process(target=start_election, args=(message_queue, )) # heartbeat in seperate thread########### heartbeat_thread = threading.Thread(target=send_heartbeat, daemon=True) heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True) - # test sending to leader in a separate thread - test_thread = threading.Thread(target=test_send_to_leader, daemon=True) - # start processes listener_process.start() election_process.start() heartbeat_thread.start() heartbeat_monitor_thread.start() - test_thread.start() + #test_thread.start() try: # main process is waiting for the sub processes to finish diff --git a/leader_heartbeat.py b/leader_heartbeat.py new file mode 100644 index 0000000..b686bf8 --- /dev/null +++ b/leader_heartbeat.py @@ -0,0 +1,168 @@ +import socket +import multiprocessing +import uuid +import time +import threading + +broadcast_ip = '255.255.255.255' +broadcast_port = 55555 + +# creates unique server-ID +server_id = str(uuid.uuid4()) + +# Socket for broadcast +server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) +server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +server_socket.bind(('', broadcast_port)) + +print(f"Server is running with ID {server_id} and broadcasting on port {broadcast_port}...") + +# global list for known server-IDs (which does not work atm) +active_servers = {} +current_leader = None + + +# broadcast-function +def broadcast(message): + """Sendet Nachrichten an alle Server-IDs im Netzwerk""" + full_message = f"[{server_id}] {message}".encode() + server_socket.sendto(full_message, (broadcast_ip, broadcast_port)) + +# listener-function +def listen(queue): + """Receives messages from other processes and forwards them/puts them in queue""" + global active_servers + while True: + try: + message, client_address = server_socket.recvfrom(4096) + decoded_message = message.decode() + + # ignore messages from own server-ID/ also running for clients from this ip + if decoded_message.startswith(f"[{server_id}]"): + continue + + print(f"Received from {client_address}: {decoded_message}") + + # handle heartbeat messages + if "HEARTBEAT" in decoded_message: + sender_uuid = decoded_message.split(": ")[1] + active_servers[sender_uuid] = time.time() + continue + + # put message in queue, that broadcast process can react + queue.put(decoded_message) + + except socket.error as e: + print(f"Socket error occurred: {e}") + break + +# leader election function +def start_election(queue): + """Starts leader election based on received messages.""" + global current_leader + print("Starting election...") + broadcast(f"START_ELECTION: {server_id}") #sends broadcast, ignores his own id only in listen function + + timeout = time.time() + 20 # wait 20 secs for answers + highest_id = server_id + + while time.time() < timeout: + # wait for messages from queue + try: + message = queue.get(timeout=1) + + # processing messages + if "START_ELECTION" in message: + sender_uuid = message.split(": ")[1] + #print("extracted uuid?:", sender_uuid) + active_servers[sender_uuid] = time.time() + print(f"Received UUID for election: {sender_uuid}") + + # compare UUIDs for leader election + if sender_uuid > highest_id: + highest_id=sender_uuid + print(f"Received higher ID {sender_uuid}, forwarding...") + broadcast(f"START_ELECTION: {sender_uuid}") + + #####does this work????######## + elif sender_uuid < server_id: + highest_id=server_id + print(f"Received lower ID {sender_uuid}, sending own ID...") + broadcast(f"START_ELECTION: {server_id}") + else: + # you are the leader: + current_leader = server_id + broadcast(f"LEADER: {server_id}") + print(f"I am the leader: {server_id}") + + elif "LEADER" in message: + # leader was elected + leader_uuid = message.split(": ")[1] + current_leader = leader_uuid + print(f"Leader elected: {current_leader}") + broadcast(f"current leader is: {current_leader}") + return + + #continue when queue is empty + except multiprocessing.queues.Empty: + continue + + # after timeout: own server becomes leader, if no other has been chosen/higher ID + if highest_id == server_id: + current_leader = server_id + broadcast(f"LEADER {server_id}") + print(f"I am the leader: {server_id}") + else: + print(f"Leader election finished, leader is {highest_id}") + +############broadcastLEADER##################### +#fct + +################### Heartbeat-function not working########################### +def send_heartbeat(): + """Sends heartbeat messages regularly to keep the server ID active""" + while True: + broadcast(f"HEARTBEAT: {server_id}") + time.sleep(5) # sends every 3 seconds + +#monitor heartbeats +def monitor_heartbeats(): + """Checks active servers based on heartbeat messages""" + global active_servers + while True: + time.sleep(6) + now = time.time() + active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6} + print(f"Active servers: {list(active_servers.keys())}") +################################################################################ + +# Main +if __name__ == "__main__": + # queue for communication between processes + message_queue = multiprocessing.Queue() + + # create processes + listener_process = multiprocessing.Process(target=listen, args=(message_queue,)) + #time.sleep(10) + election_process = multiprocessing.Process(target=start_election, args=(message_queue,)) + + # heartbeat in seperate thread########### + heartbeat_thread = threading.Thread(target=send_heartbeat, daemon=True) + heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True) + + # start processes + listener_process.start() + election_process.start() + heartbeat_thread.start() + heartbeat_monitor_thread.start() + #test_thread.start() + + try: + # main process is waiting for the sub processes to finish + listener_process.join() + election_process.join() + except KeyboardInterrupt: + print("\nShutting down server...") + listener_process.terminate() + election_process.terminate() diff --git a/server.py b/server.py new file mode 100644 index 0000000..8e45dd7 --- /dev/null +++ b/server.py @@ -0,0 +1,167 @@ +import socket +import multiprocessing +import uuid +import time +import threading +import os +from multiprocessing import Manager + +broadcast_ip = '255.255.255.255' +broadcast_port = 55555 + +# Socket for broadcast +server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) +server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +server_socket.bind(('', broadcast_port)) + +# Global list for active servers +active_servers = {} + +# Broadcast function +def broadcast(message): + """Sends messages to all server IDs in the network.""" + full_message = f"{message}".encode() + server_socket.sendto(full_message, (broadcast_ip, broadcast_port)) + + +# Listener function +def listen(queue, shared_data): + """Receives messages and updates shared state.""" + global active_servers + while True: + try: + message, client_address = server_socket.recvfrom(4096) + decoded_message = message.decode() + + # Ignore messages from this server + if decoded_message.startswith(f"[{shared_data['server_id']}]"): + continue + + print(f"Received from {client_address}: {decoded_message}") + + # Handle heartbeat messages + if "HEARTBEAT" in decoded_message: + sender_uuid = decoded_message.split(": ")[1] + active_servers[sender_uuid] = time.time() + continue + + if "REQUEST_LEAD" in decoded_message: + if shared_data['current_leader']: + while True: + response = f"LEADER: {shared_data['current_leader']}".encode() + time.sleep(2) #wait before sending response that client can prepare for answer + server_socket.sendto(response, (broadcast_ip, broadcast_port)) + #broadcast(response, client_address) + print(f"Sent leader information to {client_address}: {response.decode()}") #for debug + else: + print("No leader set, unable to respond to REQUEST_LEAD") + #continue + + + # Put message in the queue for election process + queue.put(decoded_message) + + except socket.error as e: + print(f"Socket error occurred: {e}") + break + + +# Leader election function +def start_election(queue, shared_data): + """Starts leader election based on received messages.""" + #global active_servers + #current_leader = None + print("Starting election...") + broadcast(f"START_ELECTION: {shared_data['server_id']}") + + timeout = time.time() + 20 # 20-second timeout + highest_id = shared_data['server_id'] + + while time.time() < timeout: + try: + message = queue.get(timeout=1) + if "START_ELECTION" in message: + sender_uuid = message.split(": ")[1] + active_servers[sender_uuid] = time.time() + print(f"Received UUID for election: {sender_uuid}") + + if sender_uuid > highest_id: + highest_id = sender_uuid + print(f"Received higher ID {sender_uuid}, forwarding...") + broadcast(f"START_ELECTION: {sender_uuid}") + + elif "LEADER" in message: + leader_uuid = message.split(": ")[1] + shared_data['current_leader'] = leader_uuid + print(f"Leader elected: {shared_data['current_leader']}") + return + + except multiprocessing.queues.Empty: + continue + + if highest_id == shared_data['server_id']: + shared_data['current_leader'] = shared_data['server_id'] + broadcast(f"LEADER: {shared_data['current_leader']}") + print(f"I am the leader: {shared_data['current_leader']}") + else: + print(f"Leader election finished, leader is {highest_id}") + + +# Heartbeat function +def send_heartbeat(shared_data): + """Sends heartbeat messages to keep the server active.""" + time.sleep(30) #waiting for leader election #set to 20s + while True: + broadcast(f"HEARTBEAT: {shared_data['server_id']}") + time.sleep(5) + + +# Monitor heartbeats +def monitor_heartbeats(): + """Checks active servers based on heartbeat messages.""" + global active_servers + while True: + time.sleep(6) + now = time.time() + active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6} + print(f"Active servers: {list(active_servers.keys())}") + + +# Main function +if __name__ == "__main__": + multiprocessing.freeze_support() + + print(f"Script started with PID: {os.getpid()}") + + # Manager for shared data + manager = Manager() + shared_data = manager.dict() + shared_data['server_id'] = str(uuid.uuid4()) # Generate server ID once + shared_data['current_leader'] = None + + print(f"Server is running with ID {shared_data['server_id']} and broadcasting on port {broadcast_port}...") + + message_queue = multiprocessing.Queue() + + # Create processes + listener_process = multiprocessing.Process(target=listen, args=(message_queue, shared_data)) + election_process = multiprocessing.Process(target=start_election, args=(message_queue, shared_data)) + + # Heartbeat threads + heartbeat_thread = threading.Thread(target=send_heartbeat, args=(shared_data,), daemon=True) + heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True) + + # Start processes and threads + listener_process.start() + election_process.start() + heartbeat_thread.start() + heartbeat_monitor_thread.start() + + try: + listener_process.join() + election_process.join() + except KeyboardInterrupt: + print("\nShutting down server...") + listener_process.terminate() + election_process.terminate() -- GitLab