diff --git a/kclient.py b/kclient.py index 9bfaf0c3b796cae57df99479023f14c1e9137094..b906d6c0ed55f42070e149794ebc5cc0c8d4433f 100644 --- a/kclient.py +++ b/kclient.py @@ -1,10 +1,9 @@ -#########problem: server is sticking to a process or something and cant process client messages :( import socket import threading import time broadcast_ip = '255.255.255.255'#change ip??? #hard coded? -broadcast_port = 55555 +broadcast_port = 55559 #local host information MY_HOST = socket.gethostname() @@ -12,48 +11,29 @@ MY_HOST = socket.gethostname() MY_IP = socket.gethostbyname(MY_HOST) #print(f"host:{MY_HOST} and ip: {MY_IP}") -#print(f"Listening for leader response on {broadcast_ip}:{broadcast_port}...") #debug -#print(MY_HOST,MY_IP) #debug - -#stop_listening = False # control for listener - # create client-socket for broadcast client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) client_socket.bind(('', broadcast_port)) # listen on broadcast #socket is bind to ALL available IP addresses -# global var for leader address -leader_address = None listener_ready = threading.Event() +#listen for server? def listen(): - """receives messages from server and clients.""" - global leader_address - listener_ready.set() #makes sure that listener is ready - + """receives messages from server.""" + #listener_ready.set() #makes sure that listener is ready while True: try: data, address = client_socket.recvfrom(4096) decoded_message = data.decode() - print(f"Received {data.decode()} from {address}") + #print(f"Received {data.decode()} from {address}") #debug #ignores broadcast messages with own ip #################please enable it after testing!!!!!!!!!!!!!!!!!!!S + #does not work #if address[0]==MY_IP: - # continue - -#####identifyLEADER############ - # Erkennen von LEADER-Nachrichten - if "LEADER:" in decoded_message: - leader_uuid = decoded_message.split(": ")[1] - print(f"Leader discovered: {leader_uuid} at {address[0]}:{address[1]}") - leader_address = (address[0], broadcast_port) # IP-Adresse des Leaders speichern - print(leader_address) #debug - continue -################################# - if "HEARTBEAT:" in decoded_message: - continue #bringt nix... + # continue - print(decoded_message) + print("this decoded msg", decoded_message) except socket.error as e: print(f"An error occurred: {e}") @@ -62,18 +42,10 @@ def listen(): def sender(): """Ermöglicht dem Benutzer, Nachrichten zu schreiben und zu senden.""" - global leader_address - - #print("Requesting leader before starting...") - #request_leader() # Versuche, den Leader zu finden - #########print(leader_address)###########debug: answer: None - if not leader_address: - print("No leader found. Exiting...") - return nickname = input("Enter your nickname: ") just_nickname= f"{nickname} entered the chat".encode() #client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port)) - client_socket.sendto(just_nickname, (leader_address)) + client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port)) #print("is it leader adresse here", leader_address) while True: #allows the client to send any message @@ -82,43 +54,7 @@ def sender(): #checks for whitemarks if message.strip(): full_message = f"{nickname}: {message}".encode() -#(broadcast_ip, broadcast_port) #for sendto - if leader_address: - client_socket.sendto(full_message, leader_address) - else: - print("No leader available. Unable to send message.") - - -############requestleader############ -def request_leader_tcp(): - """Sendet eine Anfrage, um den aktuellen Leader zu ermitteln, und wartet auf eine TCP-Antwort.""" - global leader_address - - print("Requesting current leader via broadcast...") - client_socket.sendto("REQUEST_LEAD".encode(), (broadcast_ip, broadcast_port)) - - # Warte auf eine TCP-Verbindung vom Server - tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - tcp_client_socket.settimeout(10) # Timeout für die Verbindung - try: - tcp_client_socket.bind((MY_IP, 55556)) # Lauschen auf eingehende Verbindungen - tcp_client_socket.listen(1) - - print("Waiting for leader response via TCP...") - conn, addr = tcp_client_socket.accept() # Verbindung akzeptieren - response = conn.recv(1024).decode() - conn.close() - - if "LEADER:" in response: - leader_address = response.split(": ")[1] - print(f"Leader address received via TCP: {leader_address}") - else: - print("No leader information received.") - except socket.timeout: - print("TCP leader response timeout.") - finally: - tcp_client_socket.close() -###############endrequestleader##################### + client_socket.sendto(full_message, (broadcast_ip, broadcast_port)) ###############main################################# if __name__ == "__main__": @@ -126,15 +62,8 @@ if __name__ == "__main__": listen_thread = threading.Thread(target=listen) listen_thread.daemon = True listen_thread.start() - listener_ready.wait() - - # Request leader via TCP - request_leader_tcp() + #listener_ready.wait() - if leader_address: - print(f"Leader identified: {leader_address}") - sender_thread = threading.Thread(target=sender) - sender_thread.start() - else: - print("No leader found. Exiting...") + sender_thread = threading.Thread(target=sender) + sender_thread.start() diff --git a/server.py b/server.py index 60365434a9c7d4311a32d627e33a7afc71203a89..eaee4a4b84201699664e46416286f2f8e41e0a8d 100644 --- a/server.py +++ b/server.py @@ -13,6 +13,7 @@ from multiprocessing import Manager broadcast_ip = '255.255.255.255' broadcast_port = 55555 +election_port= 55559 # Socket for broadcast server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -20,6 +21,12 @@ server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(('', broadcast_port)) +# Socket for broadcast +election_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) +election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +election_socket.bind(('', election_port)) + # Global list for active servers active_servers = {} @@ -30,15 +37,22 @@ def broadcast(message): #full_message = f"[{shared_data['server_id']}] {message}".encode() #sends serverid with every message server_socket.sendto(full_message, (broadcast_ip, broadcast_port)) +# Broadcast function +def broadcast_election(message): + """Sends messages to all server IDs in the network.""" + full_message = f"{message}".encode() + #full_message = f"[{shared_data['server_id']}] {message}".encode() #sends serverid with every message + election_socket.sendto(full_message, (broadcast_ip, election_port)) # Listener function -def listen_broadcast(queue, shared_data): +def listen_broadcast_election(queue, shared_data): """Receives messages and updates shared state.""" print(f"Listener from {shared_data['server_id']} is listening...") global active_servers + global sender_uuid while True: try: - message, client_address = server_socket.recvfrom(4096) + message, client_address = election_socket.recvfrom(4096) decoded_message = message.decode() print(f"Received from {client_address}: {decoded_message}") @@ -46,26 +60,9 @@ def listen_broadcast(queue, shared_data): # Handle heartbeat messages if "HEARTBEAT" in decoded_message: sender_uuid = decoded_message.split(": ")[1] - active_servers[sender_uuid] = time.time() - continue - - # Respond to REQUEST_LEAD over TCP - if "REQUEST_LEAD" in decoded_message: - if shared_data['current_leader']: - response = f"LEADER: {shared_data['current_leader']}".encode() - print(f"Preparing to send leader information to {client_address} via TCP") - - # Open a TCP connection to the client and send the response - try: - tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - tcp_client_socket.connect((client_address[0], 55556)) - tcp_client_socket.send(response) - tcp_client_socket.close() - print(f"Leader information sent to {client_address} via TCP") - except socket.error as e: - print(f"Failed to send leader info via TCP to {client_address}: {e}") - else: - print("No leader set, unable to respond to REQUEST_LEAD") + #print("sender_uuid", sender_uuid) #debug + active_servers[sender_uuid] = time.time() #update active server dictionary + #send to neighbour continue # Put message in the queue for election process @@ -75,14 +72,46 @@ def listen_broadcast(queue, shared_data): print(f"Socket error occurred: {e}") break +#listener function +##################TypeError: listen_client() takes 0 positional arguments but 2 were given############ +def listen_client(): + """receives messages from clients and broadcast them.""" + while True: + try: + message, client_address = server_socket.recvfrom(4096) + decoded_message = message.decode() + + # checks wheter the message comes from the server itself + if decoded_message.startswith(f"{shared_data['server_id']}"): + continue # if...ignores it + + #checks if the message containts the string "entered" + if decoded_message.__contains__("entered"): + print(f"{client_address} entered the chat.") + + print(f"Received from {client_address}: {decoded_message}") + + broadcast(decoded_message) + + #exceptions + except socket.error as e: + print(f"An error occurred: {e}") + break + + #does not work + except KeyboardInterrupt: + print("\nServer wird beendet...") + break + # Leader election function def start_election(queue, shared_data): """Starts leader election based on received messages.""" global active_servers + global sender_uuid #current_leader = None print("Starting election...") - broadcast(f"START_ELECTION: {shared_data['server_id']}") + broadcast_election(f"START_ELECTION: {shared_data['server_id']}") timeout = time.time() + 20 # 20-second timeout, waiting for other servers highest_id = shared_data['server_id'] @@ -91,8 +120,8 @@ def start_election(queue, shared_data): while time.time() < timeout: try: message = queue.get(timeout=1) #waits 1 sec for a message - #if shared_data['server_id']==sender_uuid: - # continue + if shared_data['server_id']==sender_uuid: + continue if "START_ELECTION" in message: sender_uuid = message.split(": ")[1] active_servers[sender_uuid] = time.time() @@ -102,17 +131,17 @@ def start_election(queue, shared_data): if sender_uuid > highest_id: highest_id = sender_uuid print(f"Received higher ID {sender_uuid}, forwarding...") - broadcast(f"START_ELECTION: {sender_uuid}") + broadcast_election(f"START_ELECTION: {sender_uuid}") elif sender_uuid == highest_id: shared_data['current_leader'] = sender_uuid print(f"(sender) Leader elected: {shared_data['current_leader']}") - broadcast(f"LEADER: {shared_data['current_leader']}") + broadcast_election(f"LEADER: {shared_data['current_leader']}") else: shared_data['current_leader'] = shared_data['server_id'] print(f"(official, its me) Leader elected: {shared_data['current_leader']}") - broadcast(f"LEADER: {shared_data['current_leader']}") + broadcast_election(f"LEADER: {shared_data['current_leader']}") elif "LEADER" in message: leader_uuid = message.split(": ")[1] @@ -131,20 +160,20 @@ def start_election(queue, shared_data): # print(f"Leader election finished, leader is {highest_id}") #################tcpclient################################# -def handle_tcp_client(conn, addr): +#def handle_tcp_client(conn, addr): """Sendet die Leader-Information über eine TCP-Verbindung.""" - print(f"TCP connection established with {addr}") - if shared_data['current_leader']: - conn.send(f"LEADER: {shared_data['current_leader']}".encode()) - else: - conn.send("NO_LEADER".encode()) - conn.close() - -def tcp_listener(): +# print(f"TCP connection established with {addr}") +# if shared_data['current_leader']: +# conn.send(f"LEADER: {shared_data['current_leader']}".encode()) +# else: +# conn.send("NO_LEADER".encode()) +# conn.close() + +#def tcp_listener(): """Wartet auf TCP-Verbindungen und bearbeitet sie.""" - while True: - conn, addr = tcp_server_socket.accept() - threading.Thread(target=handle_tcp_client, args=(conn, addr)).start() +# while True: +# conn, addr = tcp_server_socket.accept() +# threading.Thread(target=handle_tcp_client, args=(conn, addr)).start() ######################endtcp################################################# @@ -153,7 +182,7 @@ def send_heartbeat(shared_data): """Sends heartbeat messages to keep the server active.""" time.sleep(30) #waiting for leader election #set to 20s while True: - broadcast(f"HEARTBEAT: {shared_data['server_id']}") #please change that to right neighbour after correct leader election + broadcast_election(f"HEARTBEAT: {shared_data['server_id']}") #please change that to right neighbour after correct leader election time.sleep(5) # Monitor heartbeats @@ -184,7 +213,8 @@ if __name__ == "__main__": message_queue = multiprocessing.Queue() # Create processes - listener_process = multiprocessing.Process(target=listen_broadcast, args=(message_queue, shared_data)) + listener_election_process = multiprocessing.Process(target=listen_broadcast_election, args=(message_queue, shared_data)) + listener_client_process = multiprocessing.Process(target=listen_client, ) election_process = multiprocessing.Process(target=start_election, args=(message_queue, shared_data)) # Heartbeat threads @@ -192,15 +222,22 @@ if __name__ == "__main__": heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True) # Start processes and threads - listener_process.start() + listener_election_process.start() + listener_client_process.start() election_process.start() heartbeat_thread.start() heartbeat_monitor_thread.start() + #heartbeat_thread.join() + #heartbeat_monitor_thread.join() + try: - listener_process.join() + listener_election_process.join() + listener_client_process.join() election_process.join() except KeyboardInterrupt: print("\nShutting down server...") - listener_process.terminate() + listener_election_process.terminate() + listener_client_process.terminate() election_process.terminate() +