diff --git a/kclient.py b/kclient.py index 1ee660eb8c4867e1115062f3cc86e4119d29742f..9bfaf0c3b796cae57df99479023f14c1e9137094 100644 --- a/kclient.py +++ b/kclient.py @@ -64,7 +64,7 @@ def sender(): """Ermöglicht dem Benutzer, Nachrichten zu schreiben und zu senden.""" global leader_address - print("Requesting leader before starting...") + #print("Requesting leader before starting...") #request_leader() # Versuche, den Leader zu finden #########print(leader_address)###########debug: answer: None if not leader_address: @@ -73,7 +73,7 @@ def sender(): nickname = input("Enter your nickname: ") just_nickname= f"{nickname} entered the chat".encode() #client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port)) - client_socket.sendto(just_nickname, leader_address) + client_socket.sendto(just_nickname, (leader_address)) #print("is it leader adresse here", leader_address) while True: #allows the client to send any message @@ -88,39 +88,53 @@ def sender(): else: print("No leader available. Unable to send message.") + ############requestleader############ -def request_leader(): - """Sendet eine Anfrage, um den aktuellen Leader zu ermitteln.""" +def request_leader_tcp(): + """Sendet eine Anfrage, um den aktuellen Leader zu ermitteln, und wartet auf eine TCP-Antwort.""" global leader_address - print("Requesting current leader...") - client_socket.sendto("REQUEST_LEAD".encode(), (broadcast_ip, broadcast_port)) - timeout = time.time() + 10 # 5 Sekunden auf Antwort warten - while time.time() < timeout: - if leader_address: - print("leader address:", leader_address) - #client_socket.sendto("REQUEST_LEAD finish".encode(), leader_address) - return - time.sleep(0.5) # Warten, bis eine Antwort vom Leader eintrifft - #print("No leader found. Unable to send messages.") -#################################### + print("Requesting current leader via broadcast...") + client_socket.sendto("REQUEST_LEAD".encode(), (broadcast_ip, broadcast_port)) + # Warte auf eine TCP-Verbindung vom Server + tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + tcp_client_socket.settimeout(10) # Timeout für die Verbindung + try: + tcp_client_socket.bind((MY_IP, 55556)) # Lauschen auf eingehende Verbindungen + tcp_client_socket.listen(1) + + print("Waiting for leader response via TCP...") + conn, addr = tcp_client_socket.accept() # Verbindung akzeptieren + response = conn.recv(1024).decode() + conn.close() + + if "LEADER:" in response: + leader_address = response.split(": ")[1] + print(f"Leader address received via TCP: {leader_address}") + else: + print("No leader information received.") + except socket.timeout: + print("TCP leader response timeout.") + finally: + tcp_client_socket.close() +###############endrequestleader##################### + +###############main################################# if __name__ == "__main__": -# starts listen-thread + # Start listener thread listen_thread = threading.Thread(target=listen) -#Daemon Thread does not block the main thread from exiting and continues to run in the background listen_thread.daemon = True listen_thread.start() -#print(listen_thread.is_alive()) listener_ready.wait() - request_leader() + # Request leader via TCP + request_leader_tcp() + + if leader_address: + print(f"Leader identified: {leader_address}") + sender_thread = threading.Thread(target=sender) + sender_thread.start() + else: + print("No leader found. Exiting...") -# starts sender-function in main thread -#sender() - sender_thread= threading.Thread(target=sender) -#sender_thread.daemon = True - sender_thread.start() -#print(sender_thread.is_alive()) - #request_thread = threading.Thread(target=request_leader_thread) - #request_thread.start() diff --git a/server.py b/server.py index 615a4e9f4e1ae196cad637c1d57458b851bfa1bf..60365434a9c7d4311a32d627e33a7afc71203a89 100644 --- a/server.py +++ b/server.py @@ -6,6 +6,11 @@ import threading import os from multiprocessing import Manager +# TCP Socket erstellen +#tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +#tcp_server_socket.bind(('', 55556)) # Listen auf einem separaten Port +#tcp_server_socket.listen(5) + broadcast_ip = '255.255.255.255' broadcast_port = 55555 @@ -27,7 +32,7 @@ def broadcast(message): # Listener function -def listen(queue, shared_data): +def listen_broadcast(queue, shared_data): """Receives messages and updates shared state.""" print(f"Listener from {shared_data['server_id']} is listening...") global active_servers @@ -36,15 +41,6 @@ def listen(queue, shared_data): message, client_address = server_socket.recvfrom(4096) decoded_message = message.decode() - # Ignore messages from this server - #v1 - #if decoded_message.startswith(f"[{shared_data['server_id']}]"): - # continue - #v2 - #if decoded_message.startswith(f"[{shared_data['server_id']}]") or client_address[0] == socket.gethostbyname(socket.gethostname()): - # continue - - print(f"Received from {client_address}: {decoded_message}") # Handle heartbeat messages @@ -53,22 +49,25 @@ def listen(queue, shared_data): active_servers[sender_uuid] = time.time() continue + # Respond to REQUEST_LEAD over TCP if "REQUEST_LEAD" in decoded_message: if shared_data['current_leader']: - #while "REQUEST_LEAD finish" not in decoded_message: - response = f"LEADER: {shared_data['current_leader']}".encode() - time.sleep(4) #wait before sending response that client can prepare for answer - - #server_socket.sendto(response, client_address) #does not work atm bc same socket for client...? - broadcast(response) #bad solution - #print("response:", response) #debug - print(f"Sent leader information to {client_address}: {response.decode()}") #for debug - #time.sleep(2) + response = f"LEADER: {shared_data['current_leader']}".encode() + print(f"Preparing to send leader information to {client_address} via TCP") + + # Open a TCP connection to the client and send the response + try: + tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + tcp_client_socket.connect((client_address[0], 55556)) + tcp_client_socket.send(response) + tcp_client_socket.close() + print(f"Leader information sent to {client_address} via TCP") + except socket.error as e: + print(f"Failed to send leader info via TCP to {client_address}: {e}") else: print("No leader set, unable to respond to REQUEST_LEAD") continue - # Put message in the queue for election process queue.put(decoded_message) @@ -131,6 +130,23 @@ def start_election(queue, shared_data): #else: # print(f"Leader election finished, leader is {highest_id}") +#################tcpclient################################# +def handle_tcp_client(conn, addr): + """Sendet die Leader-Information über eine TCP-Verbindung.""" + print(f"TCP connection established with {addr}") + if shared_data['current_leader']: + conn.send(f"LEADER: {shared_data['current_leader']}".encode()) + else: + conn.send("NO_LEADER".encode()) + conn.close() + +def tcp_listener(): + """Wartet auf TCP-Verbindungen und bearbeitet sie.""" + while True: + conn, addr = tcp_server_socket.accept() + threading.Thread(target=handle_tcp_client, args=(conn, addr)).start() + +######################endtcp################################################# # Heartbeat function def send_heartbeat(shared_data): @@ -168,7 +184,7 @@ if __name__ == "__main__": message_queue = multiprocessing.Queue() # Create processes - listener_process = multiprocessing.Process(target=listen, args=(message_queue, shared_data)) + listener_process = multiprocessing.Process(target=listen_broadcast, args=(message_queue, shared_data)) election_process = multiprocessing.Process(target=start_election, args=(message_queue, shared_data)) # Heartbeat threads