Skip to content
Snippets Groups Projects
Commit a28a6280 authored by Katharina's avatar Katharina
Browse files

NameError: name 'sender_uuid' is not defined

parent 5371c4d7
No related branches found
No related tags found
No related merge requests found
#########problem: server is sticking to a process or something and cant process client messages :(
import socket import socket
import threading import threading
import time import time
broadcast_ip = '255.255.255.255'#change ip??? #hard coded? broadcast_ip = '255.255.255.255'#change ip??? #hard coded?
broadcast_port = 55555 broadcast_port = 55559
#local host information #local host information
MY_HOST = socket.gethostname() MY_HOST = socket.gethostname()
...@@ -12,48 +11,29 @@ MY_HOST = socket.gethostname() ...@@ -12,48 +11,29 @@ MY_HOST = socket.gethostname()
MY_IP = socket.gethostbyname(MY_HOST) MY_IP = socket.gethostbyname(MY_HOST)
#print(f"host:{MY_HOST} and ip: {MY_IP}") #print(f"host:{MY_HOST} and ip: {MY_IP}")
#print(f"Listening for leader response on {broadcast_ip}:{broadcast_port}...") #debug
#print(MY_HOST,MY_IP) #debug
#stop_listening = False # control for listener
# create client-socket for broadcast # create client-socket for broadcast
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
client_socket.bind(('', broadcast_port)) # listen on broadcast #socket is bind to ALL available IP addresses client_socket.bind(('', broadcast_port)) # listen on broadcast #socket is bind to ALL available IP addresses
# global var for leader address
leader_address = None
listener_ready = threading.Event() listener_ready = threading.Event()
#listen for server?
def listen(): def listen():
"""receives messages from server and clients.""" """receives messages from server."""
global leader_address #listener_ready.set() #makes sure that listener is ready
listener_ready.set() #makes sure that listener is ready
while True: while True:
try: try:
data, address = client_socket.recvfrom(4096) data, address = client_socket.recvfrom(4096)
decoded_message = data.decode() decoded_message = data.decode()
print(f"Received {data.decode()} from {address}") #print(f"Received {data.decode()} from {address}") #debug
#ignores broadcast messages with own ip #################please enable it after testing!!!!!!!!!!!!!!!!!!!S #ignores broadcast messages with own ip #################please enable it after testing!!!!!!!!!!!!!!!!!!!S
#does not work
#if address[0]==MY_IP: #if address[0]==MY_IP:
# continue # continue
#####identifyLEADER############ print("this decoded msg", decoded_message)
# Erkennen von LEADER-Nachrichten
if "LEADER:" in decoded_message:
leader_uuid = decoded_message.split(": ")[1]
print(f"Leader discovered: {leader_uuid} at {address[0]}:{address[1]}")
leader_address = (address[0], broadcast_port) # IP-Adresse des Leaders speichern
print(leader_address) #debug
continue
#################################
if "HEARTBEAT:" in decoded_message:
continue #bringt nix...
print(decoded_message)
except socket.error as e: except socket.error as e:
print(f"An error occurred: {e}") print(f"An error occurred: {e}")
...@@ -62,18 +42,10 @@ def listen(): ...@@ -62,18 +42,10 @@ def listen():
def sender(): def sender():
"""Ermöglicht dem Benutzer, Nachrichten zu schreiben und zu senden.""" """Ermöglicht dem Benutzer, Nachrichten zu schreiben und zu senden."""
global leader_address
#print("Requesting leader before starting...")
#request_leader() # Versuche, den Leader zu finden
#########print(leader_address)###########debug: answer: None
if not leader_address:
print("No leader found. Exiting...")
return
nickname = input("Enter your nickname: ") nickname = input("Enter your nickname: ")
just_nickname= f"{nickname} entered the chat".encode() just_nickname= f"{nickname} entered the chat".encode()
#client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port)) #client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port))
client_socket.sendto(just_nickname, (leader_address)) client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port))
#print("is it leader adresse here", leader_address) #print("is it leader adresse here", leader_address)
while True: while True:
#allows the client to send any message #allows the client to send any message
...@@ -82,43 +54,7 @@ def sender(): ...@@ -82,43 +54,7 @@ def sender():
#checks for whitemarks #checks for whitemarks
if message.strip(): if message.strip():
full_message = f"{nickname}: {message}".encode() full_message = f"{nickname}: {message}".encode()
#(broadcast_ip, broadcast_port) #for sendto client_socket.sendto(full_message, (broadcast_ip, broadcast_port))
if leader_address:
client_socket.sendto(full_message, leader_address)
else:
print("No leader available. Unable to send message.")
############requestleader############
def request_leader_tcp():
"""Sendet eine Anfrage, um den aktuellen Leader zu ermitteln, und wartet auf eine TCP-Antwort."""
global leader_address
print("Requesting current leader via broadcast...")
client_socket.sendto("REQUEST_LEAD".encode(), (broadcast_ip, broadcast_port))
# Warte auf eine TCP-Verbindung vom Server
tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_client_socket.settimeout(10) # Timeout für die Verbindung
try:
tcp_client_socket.bind((MY_IP, 55556)) # Lauschen auf eingehende Verbindungen
tcp_client_socket.listen(1)
print("Waiting for leader response via TCP...")
conn, addr = tcp_client_socket.accept() # Verbindung akzeptieren
response = conn.recv(1024).decode()
conn.close()
if "LEADER:" in response:
leader_address = response.split(": ")[1]
print(f"Leader address received via TCP: {leader_address}")
else:
print("No leader information received.")
except socket.timeout:
print("TCP leader response timeout.")
finally:
tcp_client_socket.close()
###############endrequestleader#####################
###############main################################# ###############main#################################
if __name__ == "__main__": if __name__ == "__main__":
...@@ -126,15 +62,8 @@ if __name__ == "__main__": ...@@ -126,15 +62,8 @@ if __name__ == "__main__":
listen_thread = threading.Thread(target=listen) listen_thread = threading.Thread(target=listen)
listen_thread.daemon = True listen_thread.daemon = True
listen_thread.start() listen_thread.start()
listener_ready.wait() #listener_ready.wait()
# Request leader via TCP
request_leader_tcp()
if leader_address:
print(f"Leader identified: {leader_address}")
sender_thread = threading.Thread(target=sender) sender_thread = threading.Thread(target=sender)
sender_thread.start() sender_thread.start()
else:
print("No leader found. Exiting...")
...@@ -13,6 +13,7 @@ from multiprocessing import Manager ...@@ -13,6 +13,7 @@ from multiprocessing import Manager
broadcast_ip = '255.255.255.255' broadcast_ip = '255.255.255.255'
broadcast_port = 55555 broadcast_port = 55555
election_port= 55559
# Socket for broadcast # Socket for broadcast
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
...@@ -20,6 +21,12 @@ server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) ...@@ -20,6 +21,12 @@ server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('', broadcast_port)) server_socket.bind(('', broadcast_port))
# Socket for broadcast
election_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
election_socket.bind(('', election_port))
# Global list for active servers # Global list for active servers
active_servers = {} active_servers = {}
...@@ -30,15 +37,22 @@ def broadcast(message): ...@@ -30,15 +37,22 @@ def broadcast(message):
#full_message = f"[{shared_data['server_id']}] {message}".encode() #sends serverid with every message #full_message = f"[{shared_data['server_id']}] {message}".encode() #sends serverid with every message
server_socket.sendto(full_message, (broadcast_ip, broadcast_port)) server_socket.sendto(full_message, (broadcast_ip, broadcast_port))
# Broadcast function
def broadcast_election(message):
"""Sends messages to all server IDs in the network."""
full_message = f"{message}".encode()
#full_message = f"[{shared_data['server_id']}] {message}".encode() #sends serverid with every message
election_socket.sendto(full_message, (broadcast_ip, election_port))
# Listener function # Listener function
def listen_broadcast(queue, shared_data): def listen_broadcast_election(queue, shared_data):
"""Receives messages and updates shared state.""" """Receives messages and updates shared state."""
print(f"Listener from {shared_data['server_id']} is listening...") print(f"Listener from {shared_data['server_id']} is listening...")
global active_servers global active_servers
global sender_uuid
while True: while True:
try: try:
message, client_address = server_socket.recvfrom(4096) message, client_address = election_socket.recvfrom(4096)
decoded_message = message.decode() decoded_message = message.decode()
print(f"Received from {client_address}: {decoded_message}") print(f"Received from {client_address}: {decoded_message}")
...@@ -46,26 +60,9 @@ def listen_broadcast(queue, shared_data): ...@@ -46,26 +60,9 @@ def listen_broadcast(queue, shared_data):
# Handle heartbeat messages # Handle heartbeat messages
if "HEARTBEAT" in decoded_message: if "HEARTBEAT" in decoded_message:
sender_uuid = decoded_message.split(": ")[1] sender_uuid = decoded_message.split(": ")[1]
active_servers[sender_uuid] = time.time() #print("sender_uuid", sender_uuid) #debug
continue active_servers[sender_uuid] = time.time() #update active server dictionary
#send to neighbour
# Respond to REQUEST_LEAD over TCP
if "REQUEST_LEAD" in decoded_message:
if shared_data['current_leader']:
response = f"LEADER: {shared_data['current_leader']}".encode()
print(f"Preparing to send leader information to {client_address} via TCP")
# Open a TCP connection to the client and send the response
try:
tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_client_socket.connect((client_address[0], 55556))
tcp_client_socket.send(response)
tcp_client_socket.close()
print(f"Leader information sent to {client_address} via TCP")
except socket.error as e:
print(f"Failed to send leader info via TCP to {client_address}: {e}")
else:
print("No leader set, unable to respond to REQUEST_LEAD")
continue continue
# Put message in the queue for election process # Put message in the queue for election process
...@@ -75,14 +72,46 @@ def listen_broadcast(queue, shared_data): ...@@ -75,14 +72,46 @@ def listen_broadcast(queue, shared_data):
print(f"Socket error occurred: {e}") print(f"Socket error occurred: {e}")
break break
#listener function
##################TypeError: listen_client() takes 0 positional arguments but 2 were given############
def listen_client():
"""receives messages from clients and broadcast them."""
while True:
try:
message, client_address = server_socket.recvfrom(4096)
decoded_message = message.decode()
# checks wheter the message comes from the server itself
if decoded_message.startswith(f"{shared_data['server_id']}"):
continue # if...ignores it
#checks if the message containts the string "entered"
if decoded_message.__contains__("entered"):
print(f"{client_address} entered the chat.")
print(f"Received from {client_address}: {decoded_message}")
broadcast(decoded_message)
#exceptions
except socket.error as e:
print(f"An error occurred: {e}")
break
#does not work
except KeyboardInterrupt:
print("\nServer wird beendet...")
break
# Leader election function # Leader election function
def start_election(queue, shared_data): def start_election(queue, shared_data):
"""Starts leader election based on received messages.""" """Starts leader election based on received messages."""
global active_servers global active_servers
global sender_uuid
#current_leader = None #current_leader = None
print("Starting election...") print("Starting election...")
broadcast(f"START_ELECTION: {shared_data['server_id']}") broadcast_election(f"START_ELECTION: {shared_data['server_id']}")
timeout = time.time() + 20 # 20-second timeout, waiting for other servers timeout = time.time() + 20 # 20-second timeout, waiting for other servers
highest_id = shared_data['server_id'] highest_id = shared_data['server_id']
...@@ -91,8 +120,8 @@ def start_election(queue, shared_data): ...@@ -91,8 +120,8 @@ def start_election(queue, shared_data):
while time.time() < timeout: while time.time() < timeout:
try: try:
message = queue.get(timeout=1) #waits 1 sec for a message message = queue.get(timeout=1) #waits 1 sec for a message
#if shared_data['server_id']==sender_uuid: if shared_data['server_id']==sender_uuid:
# continue continue
if "START_ELECTION" in message: if "START_ELECTION" in message:
sender_uuid = message.split(": ")[1] sender_uuid = message.split(": ")[1]
active_servers[sender_uuid] = time.time() active_servers[sender_uuid] = time.time()
...@@ -102,17 +131,17 @@ def start_election(queue, shared_data): ...@@ -102,17 +131,17 @@ def start_election(queue, shared_data):
if sender_uuid > highest_id: if sender_uuid > highest_id:
highest_id = sender_uuid highest_id = sender_uuid
print(f"Received higher ID {sender_uuid}, forwarding...") print(f"Received higher ID {sender_uuid}, forwarding...")
broadcast(f"START_ELECTION: {sender_uuid}") broadcast_election(f"START_ELECTION: {sender_uuid}")
elif sender_uuid == highest_id: elif sender_uuid == highest_id:
shared_data['current_leader'] = sender_uuid shared_data['current_leader'] = sender_uuid
print(f"(sender) Leader elected: {shared_data['current_leader']}") print(f"(sender) Leader elected: {shared_data['current_leader']}")
broadcast(f"LEADER: {shared_data['current_leader']}") broadcast_election(f"LEADER: {shared_data['current_leader']}")
else: else:
shared_data['current_leader'] = shared_data['server_id'] shared_data['current_leader'] = shared_data['server_id']
print(f"(official, its me) Leader elected: {shared_data['current_leader']}") print(f"(official, its me) Leader elected: {shared_data['current_leader']}")
broadcast(f"LEADER: {shared_data['current_leader']}") broadcast_election(f"LEADER: {shared_data['current_leader']}")
elif "LEADER" in message: elif "LEADER" in message:
leader_uuid = message.split(": ")[1] leader_uuid = message.split(": ")[1]
...@@ -131,20 +160,20 @@ def start_election(queue, shared_data): ...@@ -131,20 +160,20 @@ def start_election(queue, shared_data):
# print(f"Leader election finished, leader is {highest_id}") # print(f"Leader election finished, leader is {highest_id}")
#################tcpclient################################# #################tcpclient#################################
def handle_tcp_client(conn, addr): #def handle_tcp_client(conn, addr):
"""Sendet die Leader-Information über eine TCP-Verbindung.""" """Sendet die Leader-Information über eine TCP-Verbindung."""
print(f"TCP connection established with {addr}") # print(f"TCP connection established with {addr}")
if shared_data['current_leader']: # if shared_data['current_leader']:
conn.send(f"LEADER: {shared_data['current_leader']}".encode()) # conn.send(f"LEADER: {shared_data['current_leader']}".encode())
else: # else:
conn.send("NO_LEADER".encode()) # conn.send("NO_LEADER".encode())
conn.close() # conn.close()
def tcp_listener(): #def tcp_listener():
"""Wartet auf TCP-Verbindungen und bearbeitet sie.""" """Wartet auf TCP-Verbindungen und bearbeitet sie."""
while True: # while True:
conn, addr = tcp_server_socket.accept() # conn, addr = tcp_server_socket.accept()
threading.Thread(target=handle_tcp_client, args=(conn, addr)).start() # threading.Thread(target=handle_tcp_client, args=(conn, addr)).start()
######################endtcp################################################# ######################endtcp#################################################
...@@ -153,7 +182,7 @@ def send_heartbeat(shared_data): ...@@ -153,7 +182,7 @@ def send_heartbeat(shared_data):
"""Sends heartbeat messages to keep the server active.""" """Sends heartbeat messages to keep the server active."""
time.sleep(30) #waiting for leader election #set to 20s time.sleep(30) #waiting for leader election #set to 20s
while True: while True:
broadcast(f"HEARTBEAT: {shared_data['server_id']}") #please change that to right neighbour after correct leader election broadcast_election(f"HEARTBEAT: {shared_data['server_id']}") #please change that to right neighbour after correct leader election
time.sleep(5) time.sleep(5)
# Monitor heartbeats # Monitor heartbeats
...@@ -184,7 +213,8 @@ if __name__ == "__main__": ...@@ -184,7 +213,8 @@ if __name__ == "__main__":
message_queue = multiprocessing.Queue() message_queue = multiprocessing.Queue()
# Create processes # Create processes
listener_process = multiprocessing.Process(target=listen_broadcast, args=(message_queue, shared_data)) listener_election_process = multiprocessing.Process(target=listen_broadcast_election, args=(message_queue, shared_data))
listener_client_process = multiprocessing.Process(target=listen_client, )
election_process = multiprocessing.Process(target=start_election, args=(message_queue, shared_data)) election_process = multiprocessing.Process(target=start_election, args=(message_queue, shared_data))
# Heartbeat threads # Heartbeat threads
...@@ -192,15 +222,22 @@ if __name__ == "__main__": ...@@ -192,15 +222,22 @@ if __name__ == "__main__":
heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True) heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True)
# Start processes and threads # Start processes and threads
listener_process.start() listener_election_process.start()
listener_client_process.start()
election_process.start() election_process.start()
heartbeat_thread.start() heartbeat_thread.start()
heartbeat_monitor_thread.start() heartbeat_monitor_thread.start()
#heartbeat_thread.join()
#heartbeat_monitor_thread.join()
try: try:
listener_process.join() listener_election_process.join()
listener_client_process.join()
election_process.join() election_process.join()
except KeyboardInterrupt: except KeyboardInterrupt:
print("\nShutting down server...") print("\nShutting down server...")
listener_process.terminate() listener_election_process.terminate()
listener_client_process.terminate()
election_process.terminate() election_process.terminate()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment