Skip to content
Snippets Groups Projects
Commit 5371c4d7 authored by Katharina's avatar Katharina
Browse files

try tcp connection for request leader

parent 983a1cf2
No related merge requests found
......@@ -64,7 +64,7 @@ def sender():
"""Ermöglicht dem Benutzer, Nachrichten zu schreiben und zu senden."""
global leader_address
print("Requesting leader before starting...")
#print("Requesting leader before starting...")
#request_leader() # Versuche, den Leader zu finden
#########print(leader_address)###########debug: answer: None
if not leader_address:
......@@ -73,7 +73,7 @@ def sender():
nickname = input("Enter your nickname: ")
just_nickname= f"{nickname} entered the chat".encode()
#client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port))
client_socket.sendto(just_nickname, leader_address)
client_socket.sendto(just_nickname, (leader_address))
#print("is it leader adresse here", leader_address)
while True:
#allows the client to send any message
......@@ -88,39 +88,53 @@ def sender():
else:
print("No leader available. Unable to send message.")
############requestleader############
def request_leader():
"""Sendet eine Anfrage, um den aktuellen Leader zu ermitteln."""
def request_leader_tcp():
"""Sendet eine Anfrage, um den aktuellen Leader zu ermitteln, und wartet auf eine TCP-Antwort."""
global leader_address
print("Requesting current leader...")
client_socket.sendto("REQUEST_LEAD".encode(), (broadcast_ip, broadcast_port))
timeout = time.time() + 10 # 5 Sekunden auf Antwort warten
while time.time() < timeout:
if leader_address:
print("leader address:", leader_address)
#client_socket.sendto("REQUEST_LEAD finish".encode(), leader_address)
return
time.sleep(0.5) # Warten, bis eine Antwort vom Leader eintrifft
#print("No leader found. Unable to send messages.")
####################################
print("Requesting current leader via broadcast...")
client_socket.sendto("REQUEST_LEAD".encode(), (broadcast_ip, broadcast_port))
# Warte auf eine TCP-Verbindung vom Server
tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_client_socket.settimeout(10) # Timeout für die Verbindung
try:
tcp_client_socket.bind((MY_IP, 55556)) # Lauschen auf eingehende Verbindungen
tcp_client_socket.listen(1)
print("Waiting for leader response via TCP...")
conn, addr = tcp_client_socket.accept() # Verbindung akzeptieren
response = conn.recv(1024).decode()
conn.close()
if "LEADER:" in response:
leader_address = response.split(": ")[1]
print(f"Leader address received via TCP: {leader_address}")
else:
print("No leader information received.")
except socket.timeout:
print("TCP leader response timeout.")
finally:
tcp_client_socket.close()
###############endrequestleader#####################
###############main#################################
if __name__ == "__main__":
# starts listen-thread
# Start listener thread
listen_thread = threading.Thread(target=listen)
#Daemon Thread does not block the main thread from exiting and continues to run in the background
listen_thread.daemon = True
listen_thread.start()
#print(listen_thread.is_alive())
listener_ready.wait()
request_leader()
# Request leader via TCP
request_leader_tcp()
if leader_address:
print(f"Leader identified: {leader_address}")
sender_thread = threading.Thread(target=sender)
sender_thread.start()
else:
print("No leader found. Exiting...")
# starts sender-function in main thread
#sender()
sender_thread= threading.Thread(target=sender)
#sender_thread.daemon = True
sender_thread.start()
#print(sender_thread.is_alive())
#request_thread = threading.Thread(target=request_leader_thread)
#request_thread.start()
......@@ -6,6 +6,11 @@ import threading
import os
from multiprocessing import Manager
# TCP Socket erstellen
#tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#tcp_server_socket.bind(('', 55556)) # Listen auf einem separaten Port
#tcp_server_socket.listen(5)
broadcast_ip = '255.255.255.255'
broadcast_port = 55555
......@@ -27,7 +32,7 @@ def broadcast(message):
# Listener function
def listen(queue, shared_data):
def listen_broadcast(queue, shared_data):
"""Receives messages and updates shared state."""
print(f"Listener from {shared_data['server_id']} is listening...")
global active_servers
......@@ -36,15 +41,6 @@ def listen(queue, shared_data):
message, client_address = server_socket.recvfrom(4096)
decoded_message = message.decode()
# Ignore messages from this server
#v1
#if decoded_message.startswith(f"[{shared_data['server_id']}]"):
# continue
#v2
#if decoded_message.startswith(f"[{shared_data['server_id']}]") or client_address[0] == socket.gethostbyname(socket.gethostname()):
# continue
print(f"Received from {client_address}: {decoded_message}")
# Handle heartbeat messages
......@@ -53,22 +49,25 @@ def listen(queue, shared_data):
active_servers[sender_uuid] = time.time()
continue
# Respond to REQUEST_LEAD over TCP
if "REQUEST_LEAD" in decoded_message:
if shared_data['current_leader']:
#while "REQUEST_LEAD finish" not in decoded_message:
response = f"LEADER: {shared_data['current_leader']}".encode()
time.sleep(4) #wait before sending response that client can prepare for answer
#server_socket.sendto(response, client_address) #does not work atm bc same socket for client...?
broadcast(response) #bad solution
#print("response:", response) #debug
print(f"Sent leader information to {client_address}: {response.decode()}") #for debug
#time.sleep(2)
response = f"LEADER: {shared_data['current_leader']}".encode()
print(f"Preparing to send leader information to {client_address} via TCP")
# Open a TCP connection to the client and send the response
try:
tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_client_socket.connect((client_address[0], 55556))
tcp_client_socket.send(response)
tcp_client_socket.close()
print(f"Leader information sent to {client_address} via TCP")
except socket.error as e:
print(f"Failed to send leader info via TCP to {client_address}: {e}")
else:
print("No leader set, unable to respond to REQUEST_LEAD")
continue
# Put message in the queue for election process
queue.put(decoded_message)
......@@ -131,6 +130,23 @@ def start_election(queue, shared_data):
#else:
# print(f"Leader election finished, leader is {highest_id}")
#################tcpclient#################################
def handle_tcp_client(conn, addr):
"""Sendet die Leader-Information über eine TCP-Verbindung."""
print(f"TCP connection established with {addr}")
if shared_data['current_leader']:
conn.send(f"LEADER: {shared_data['current_leader']}".encode())
else:
conn.send("NO_LEADER".encode())
conn.close()
def tcp_listener():
"""Wartet auf TCP-Verbindungen und bearbeitet sie."""
while True:
conn, addr = tcp_server_socket.accept()
threading.Thread(target=handle_tcp_client, args=(conn, addr)).start()
######################endtcp#################################################
# Heartbeat function
def send_heartbeat(shared_data):
......@@ -168,7 +184,7 @@ if __name__ == "__main__":
message_queue = multiprocessing.Queue()
# Create processes
listener_process = multiprocessing.Process(target=listen, args=(message_queue, shared_data))
listener_process = multiprocessing.Process(target=listen_broadcast, args=(message_queue, shared_data))
election_process = multiprocessing.Process(target=start_election, args=(message_queue, shared_data))
# Heartbeat threads
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment