diff --git a/kclient.py b/kclient.py index 28dff6fde3d7d60e62106122180002375c698d03..1ee660eb8c4867e1115062f3cc86e4119d29742f 100644 --- a/kclient.py +++ b/kclient.py @@ -1,3 +1,4 @@ +#########problem: server is sticking to a process or something and cant process client messages :( import socket import threading import time @@ -11,8 +12,8 @@ MY_HOST = socket.gethostname() MY_IP = socket.gethostbyname(MY_HOST) #print(f"host:{MY_HOST} and ip: {MY_IP}") -print(f"Listening for leader response on {broadcast_ip}:{broadcast_port}...") -print(MY_HOST,MY_IP) +#print(f"Listening for leader response on {broadcast_ip}:{broadcast_port}...") #debug +#print(MY_HOST,MY_IP) #debug #stop_listening = False # control for listener @@ -20,26 +21,25 @@ print(MY_HOST,MY_IP) client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -client_socket.bind(('', broadcast_port)) # listen on broadcast-port +client_socket.bind(('', broadcast_port)) # listen on broadcast #socket is bind to ALL available IP addresses # global var for leader address leader_address = None +listener_ready = threading.Event() def listen(): """receives messages from server and clients.""" global leader_address + listener_ready.set() #makes sure that listener is ready + while True: try: data, address = client_socket.recvfrom(4096) decoded_message = data.decode() - - # Ignoriere Nachrichten mit Server-Kennung - #if decoded_message.startswith("["): - # continue - + print(f"Received {data.decode()} from {address}") #ignores broadcast messages with own ip #################please enable it after testing!!!!!!!!!!!!!!!!!!!S #if address[0]==MY_IP: - # continue + # continue #####identifyLEADER############ # Erkennen von LEADER-Nachrichten @@ -47,11 +47,11 @@ def listen(): leader_uuid = decoded_message.split(": ")[1] print(f"Leader discovered: {leader_uuid} at {address[0]}:{address[1]}") leader_address = (address[0], broadcast_port) # IP-Adresse des Leaders speichern - #print(leader_address) + print(leader_address) #debug continue ################################# if "HEARTBEAT:" in decoded_message: - continue + continue #bringt nix... print(decoded_message) @@ -59,35 +59,18 @@ def listen(): print(f"An error occurred: {e}") break -############requestleader############ -def request_leader(): - """Sendet eine Anfrage, um den aktuellen Leader zu ermitteln.""" - global leader_address - print("Requesting current leader...") - client_socket.sendto("REQUEST_LEAD".encode(), (broadcast_ip, broadcast_port)) - - timeout = time.time() + 5 # 10 Sekunden auf Antwort warten - while time.time() < timeout: - if leader_address: - #print("leader address:", leader_address) - #client_socket.sendto("REQUEST_LEAD finish".encode(), leader_address) - return - time.sleep(0.5) # Warten, bis eine Antwort vom Leader eintrifft - #print("No leader found. Unable to send messages.") -#################################### def sender(): """Ermöglicht dem Benutzer, Nachrichten zu schreiben und zu senden.""" global leader_address - nickname = input("Enter your nickname: ") print("Requesting leader before starting...") - request_leader() # Versuche, den Leader zu finden + #request_leader() # Versuche, den Leader zu finden #########print(leader_address)###########debug: answer: None if not leader_address: print("No leader found. Exiting...") return - + nickname = input("Enter your nickname: ") just_nickname= f"{nickname} entered the chat".encode() #client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port)) client_socket.sendto(just_nickname, leader_address) @@ -105,16 +88,39 @@ def sender(): else: print("No leader available. Unable to send message.") +############requestleader############ +def request_leader(): + """Sendet eine Anfrage, um den aktuellen Leader zu ermitteln.""" + global leader_address + print("Requesting current leader...") + client_socket.sendto("REQUEST_LEAD".encode(), (broadcast_ip, broadcast_port)) + + timeout = time.time() + 10 # 5 Sekunden auf Antwort warten + while time.time() < timeout: + if leader_address: + print("leader address:", leader_address) + #client_socket.sendto("REQUEST_LEAD finish".encode(), leader_address) + return + time.sleep(0.5) # Warten, bis eine Antwort vom Leader eintrifft + #print("No leader found. Unable to send messages.") +#################################### + +if __name__ == "__main__": # starts listen-thread -listen_thread = threading.Thread(target=listen) + listen_thread = threading.Thread(target=listen) #Daemon Thread does not block the main thread from exiting and continues to run in the background -listen_thread.daemon = True -listen_thread.start() + listen_thread.daemon = True + listen_thread.start() #print(listen_thread.is_alive()) + listener_ready.wait() + + request_leader() # starts sender-function in main thread #sender() -sender_thread= threading.Thread(target=sender) + sender_thread= threading.Thread(target=sender) #sender_thread.daemon = True -sender_thread.start() + sender_thread.start() #print(sender_thread.is_alive()) + #request_thread = threading.Thread(target=request_leader_thread) + #request_thread.start() diff --git a/server.py b/server.py index 3559eaedcdcc29bbd33b488e7b60b42eb17d5dbc..523595eb164c271a7fa6bac38f1e49fcfa37405e 100644 --- a/server.py +++ b/server.py @@ -22,6 +22,7 @@ active_servers = {} def broadcast(message): """Sends messages to all server IDs in the network.""" full_message = f"{message}".encode() + #full_message = f"[{shared_data['server_id']}] {message}".encode() #sends serverid with every message server_socket.sendto(full_message, (broadcast_ip, broadcast_port)) @@ -36,8 +37,13 @@ def listen(queue, shared_data): decoded_message = message.decode() # Ignore messages from this server - if decoded_message.startswith(f"[{shared_data['server_id']}]"): - continue + #v1 + #if decoded_message.startswith(f"[{shared_data['server_id']}]"): + # continue + #v2 + #if decoded_message.startswith(f"[{shared_data['server_id']}]") or client_address[0] == socket.gethostbyname(socket.gethostname()): + # continue + print(f"Received from {client_address}: {decoded_message}") @@ -51,13 +57,16 @@ def listen(queue, shared_data): if shared_data['current_leader']: #while "REQUEST_LEAD finish" not in decoded_message: response = f"LEADER: {shared_data['current_leader']}".encode() - time.sleep(2) #wait before sending response that client can prepare for answer - server_socket.sendto(response, (broadcast_ip, broadcast_port)) - #broadcast(response, client_address) + time.sleep(4) #wait before sending response that client can prepare for answer + + server_socket.sendto(response, client_address) #does not work atm bc same socket for client...? + #broadcast(response) #bad solution + #print("response:", response) #debug print(f"Sent leader information to {client_address}: {response.decode()}") #for debug + #time.sleep(2) else: print("No leader set, unable to respond to REQUEST_LEAD") - #continue + continue # Put message in the queue for election process @@ -71,7 +80,7 @@ def listen(queue, shared_data): # Leader election function def start_election(queue, shared_data): """Starts leader election based on received messages.""" - #global active_servers + global active_servers #current_leader = None print("Starting election...") broadcast(f"START_ELECTION: {shared_data['server_id']}") @@ -79,6 +88,7 @@ def start_election(queue, shared_data): timeout = time.time() + 20 # 20-second timeout, waiting for other servers highest_id = shared_data['server_id'] + #while current time is smaller than current time plus 20s: while time.time() < timeout: try: message = queue.get(timeout=1) #waits 1 sec for a message @@ -89,7 +99,7 @@ def start_election(queue, shared_data): active_servers[sender_uuid] = time.time() print(f"Received UUID for election: {sender_uuid}") -################dumblogic###onlyworkswith2servers################################ +################dumblogic###onlyworkswith21erver################################ if sender_uuid > highest_id: highest_id = sender_uuid print(f"Received higher ID {sender_uuid}, forwarding...") @@ -127,10 +137,9 @@ def send_heartbeat(shared_data): """Sends heartbeat messages to keep the server active.""" time.sleep(30) #waiting for leader election #set to 20s while True: - broadcast(f"HEARTBEAT: {shared_data['server_id']}") + broadcast(f"HEARTBEAT: {shared_data['server_id']}") #please change that to right neighbour after correct leader election time.sleep(5) - # Monitor heartbeats def monitor_heartbeats(): """Checks active servers based on heartbeat messages."""