diff --git a/server.py b/server.py index 613bfbec4e9125b670b6c7525169937a7ee2002e..8e187fa0ec9790060097e32c217d340edf64ece6 100644 --- a/server.py +++ b/server.py @@ -1,6 +1,7 @@ ##########unsicher, ob heartbeat funktioniert################ ###########deutsch englisch korrigieren################### ################liste mit message ids auf 3-5 setzen######## +############active servers dic shared data################ import socket import multiprocessing @@ -10,29 +11,26 @@ import threading import os from multiprocessing import Manager -# TCP Socket erstellen -#tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -#tcp_server_socket.bind(('', 55556)) # Listen auf einem separaten Port -#tcp_server_socket.listen(5) - broadcast_ip = '255.255.255.255' broadcast_port = 55555 election_port= 55559 -# Socket for broadcast +# Socket for client broadcast server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(('', broadcast_port)) -# Socket for broadcast +# Socket for election/server broadcast election_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) election_socket.bind(('', election_port)) -# Global list for active servers -active_servers = {} + #does not work, use shared data?? +#global leader_election_done +leader_election_done = threading.Event() +active_servers={} # Broadcast function def broadcast(message): @@ -48,28 +46,90 @@ def broadcast_election(message): #full_message = f"[{shared_data['server_id']}] {message}".encode() #sends serverid with every message election_socket.sendto(full_message, (broadcast_ip, election_port)) + +# Leader election function +def start_election(queue, shared_data): + """Starts leader election based on received messages.""" + global active_servers + #global leader_election_done + + print("Starting election...") + broadcast_election(f"START_ELECTION: {shared_data['server_id']}") #every server which broadcasts his id is going to be part of the election + + timeout = time.time() + 20 # 20-second timeout, waiting for other servers + highest_id = shared_data['server_id'] #set highest id to own id + leader= False + + #while current time is smaller than current time plus 20s: + while time.time() < timeout: + try: + message = queue.get(timeout=1) #waits 1 sec for a message + if shared_data['server_id']==shared_data['sender_uuid']: + continue + if "START_ELECTION" in message: + shared_data['sender_uuid'] = message.split(": ")[1] + active_servers[shared_data['sender_uuid']] = time.time() + print(f"Received UUID for election: {shared_data['sender_uuid']}") + +################dumblogic###onlyworkswith21erver################################ + if leader == False: + if highest_id>shared_data['sender_uuid']: + leader = True + elif highest_id<shared_data['sender_uuid']: + highest_id=shared_data['sender_uuid'] + leader = True + if leader: + shared_data['current_leader'] = highest_id + print(f"(sender) Leader elected: {shared_data['current_leader']}") + broadcast_election(f"LEADER: {shared_data['current_leader']}")###do not broadcast leader, send it to the neighbour + leader_election_done.set() # Signal, that the leader election is done + + else: + shared_data['current_leader'] = shared_data['server_id'] + print(f"(official, its me) Leader elected: {shared_data['current_leader']}") + broadcast_election(f"LEADER: {shared_data['current_leader']}") + leader_election_done.set() # Signal, that the leader election is done + + elif "LEADER" in message: + leader_uuid = message.split(": ")[1] + shared_data['current_leader'] = leader_uuid + #time.sleep(1) # Delay to ensure all processes are aware of the leader + print(f"(leader) Leader elected: {shared_data['current_leader']}") + #leader_election_done.set() # Signal, that the leader election is done + return + + except multiprocessing.queues.Empty: + continue + # Listener function def listen_broadcast_election(queue, shared_data): """Receives messages and updates shared state.""" print(f"Listener from {shared_data['server_id']} is listening...") global active_servers server_socket.setblocking(False) # Set socket to non-blocking mode - #global sender_uuid + while True: try: message, sender_address = election_socket.recvfrom(4096) decoded_sender_message = message.decode() print(f"Received from {sender_address}: {decoded_sender_message}") + leader_election_done.set() # Handle heartbeat messages - if "HEARTBEAT" in decoded_sender_message: - shared_data['sender_uuid'] = decoded_sender_message.split(": ")[1] + #if "HEARTBEAT" in decoded_sender_message: + # shared_data['sender_uuid'] = decoded_sender_message.split(": ")[1] #print("sender_uuid", sender_uuid) #debug - active_servers[shared_data['sender_uuid']] = time.time() #update active server dictionary - #send to neighbour + # active_servers[shared_data['sender_uuid']] = time.time() #update active server dictionary + #send to neighbour###################### + # continue + + if "HEARTBEAT" in decoded_sender_message: + sender_uuid = decoded_sender_message.split(": ")[1] + active_servers[sender_uuid] = time.time() continue + # Put message in the queue for election process queue.put(decoded_sender_message) @@ -79,10 +139,8 @@ def listen_broadcast_election(queue, shared_data): time.sleep(0.5) # Small delay to prevent busy waiting #listener function -##################TypeError: listen_client() takes 0 positional arguments but 2 were given############ def listen_client(shared_data): """receives messages from clients and broadcast them.""" - #manager=Manager() #global received_messages received_messages = set() @@ -92,155 +150,59 @@ def listen_client(shared_data): #global save_last_messages #only listen if current leader - #if shared_data['current_leader'] == shared_data['server_id']: - while True: - try: - message, client_address = server_socket.recvfrom(4096) - decoded_message = message.decode() + + ###################NameError: name 'leader_election_done' is not defined ############ + #global leader_election_done - #create a dictionary for message uuid - # message_uuid = decoded_message.split(": ")[0] - #print("message id:", message_uuid) #debug + #leader_election_done.wait() # blocks until the leader election is done - # Extract message UUID and check if already processed - message_id = decoded_message.split(":")[0] - if message_id in received_messages: - continue - received_messages.add(message_id) + print("current leader for listening is:", shared_data['current_leader']) #debug - # Enqueue messages and maintain a fixed size - """if not any(decoded_message.startswith(msg) for msg in list(message_queue.queue)): - if message_queue.full(): - message_queue.get() # Remove oldest message - message_queue.put(decoded_message) + if shared_data['current_leader'] == shared_data['server_id']: + while True: + try: + message, client_address = server_socket.recvfrom(4096) + decoded_message = message.decode() - # Print all messages - print("here are your last messages:", list(message_queue.queue)) # Debug - """ - - """ - if message_uuid not in save_last_messages: - if len(save_last_messages) <= 3: - oldest_key = next(iter(save_last_messages)) # get oldest key - del save_last_messages[oldest_key] # delete it - #save_last_messages["last message"] = message_uuid - save_last_messages[message_uuid] = decoded_message - #save_last_messages.update(f"last message: {message}: {message_uuid}") - print("here are your last messages:", dict(save_last_messages)) #debug - """ - - #broadcast_election(save_last_messages) #send to neighbour - - # checks wheter the message comes from the server itself - if decoded_message.startswith(f"{shared_data['server_id']}"): - continue # if...ignores it - - #checks if the message containts the string "entered" - if decoded_message.__contains__("entered"): - print(f"{client_address} entered the chat.") #client_address is the nickname + # Extract message UUID and check if already processed + message_id = decoded_message.split(":")[0] + if message_id in received_messages: + continue + received_messages.add(message_id) - print(f"Received from {client_address}: {decoded_message}") + #broadcast_election(save_last_messages) #send to neighbour - broadcast(decoded_message) + # checks wheter the message comes from the server itself + if decoded_message.startswith(f"{shared_data['server_id']}"): + continue # if...ignores it - #exceptions - except socket.error as e: - print(f"An error occurred: {e}") - break - - #does not work - except KeyboardInterrupt: - print("\nServer wird beendet...") - break - - -# Leader election function -def start_election(queue, shared_data): - """Starts leader election based on received messages.""" - global active_servers - #sender_uuid = None - #global sender_uuid - #current_leader = None - print("Starting election...") - broadcast_election(f"START_ELECTION: {shared_data['server_id']}") #every server which broadcasts his id is going to be part of the election - - timeout = time.time() + 20 # 20-second timeout, waiting for other servers - highest_id = shared_data['server_id'] #highest id is own id - leader= False - - #while current time is smaller than current time plus 20s: - while time.time() < timeout: - try: - message = queue.get(timeout=1) #waits 1 sec for a message - if shared_data['server_id']==shared_data['sender_uuid']: #wtf?????????????? - continue - if "START_ELECTION" in message: - shared_data['sender_uuid'] = message.split(": ")[1] - active_servers[shared_data['sender_uuid']] = time.time() - print(f"Received UUID for election: {shared_data['sender_uuid']}") - -################dumblogic###onlyworkswith21erver################################ - if leader == False: - if highest_id>shared_data['sender_uuid']: - leader == True - elif highest_id<shared_data['sender_uuid']: - highest_id=shared_data['sender_uuid'] - leader == True - - if leader == True: - highest_id=shared_data['current_leader'] - print(f"(sender) Leader elected: {shared_data['current_leader']}") - broadcast_election(f"LEADER: {shared_data['current_leader']}") - - else: - shared_data['current_leader'] = shared_data['server_id'] - print(f"(official, its me) Leader elected: {shared_data['current_leader']}") - broadcast_election(f"LEADER: {shared_data['current_leader']}") + #checks if the message containts the string "entered" + if decoded_message.__contains__("entered"): + print(f"{client_address} entered the chat.") #client_address is the nickname - elif "LEADER" in message: - leader_uuid = message.split(": ")[1] - shared_data['current_leader'] = leader_uuid - #time.sleep(1) # Delay to ensure all processes are aware of the leader - print(f"(leader) Leader elected: {shared_data['current_leader']}") - leader_election_done.set() # Signal, dass die Leaderwahl abgeschlossen ist - return + print(f"Received from {client_address}: {decoded_message}") - except multiprocessing.queues.Empty: - continue + broadcast(decoded_message) + + #exceptions + except socket.error as e: + print(f"An error occurred: {e}") + break - #if highest_id == shared_data['server_id']: - # shared_data['current_leader'] = shared_data['server_id'] - # broadcast(f"LEADER: {shared_data['current_leader']}") - # print(f"I am the leader: {shared_data['current_leader']}") - #else: - # print(f"Leader election finished, leader is {highest_id}") - -#################tcpclient################################# -#def handle_tcp_client(conn, addr): - """Sendet die Leader-Information über eine TCP-Verbindung.""" -# print(f"TCP connection established with {addr}") -# if shared_data['current_leader']: -# conn.send(f"LEADER: {shared_data['current_leader']}".encode()) -# else: -# conn.send("NO_LEADER".encode()) -# conn.close() - -#def tcp_listener(): - """Wartet auf TCP-Verbindungen und bearbeitet sie.""" -# while True: -# conn, addr = tcp_server_socket.accept() -# threading.Thread(target=handle_tcp_client, args=(conn, addr)).start() - -######################endtcp################################################# + #does not work atm... + except KeyboardInterrupt: + print("\nShutting down server...") + break # Heartbeat function def send_heartbeat(shared_data): + #global leader_election_done """Sends heartbeat messages to keep the server active.""" - leader_election_done.wait() # Blockiert, bis die Leaderwahl abgeschlossen ist + #leader_election_done.wait() print(f"Heartbeat function started. Leader: {shared_data['current_leader']}, Server ID: {shared_data['server_id']}") - #only if you are the current leader + #only sends heartbeat if you are the current leader if shared_data['current_leader'] == shared_data['server_id']: - #time.sleep(10) #waiting for leader election #set to 20s + #time.sleep(10) #waiting for leader election while True: try: broadcast_election(f"HEARTBEAT: {shared_data['server_id']}") #please change that to right neighbour after correct leader election @@ -266,7 +228,6 @@ if __name__ == "__main__": multiprocessing.freeze_support() print(f"Script started with PID: {os.getpid()}") - leader_election_done = threading.Event() # Manager for shared data manager = Manager() @@ -274,7 +235,9 @@ if __name__ == "__main__": shared_data['server_id'] = str(uuid.uuid4()) # Generate server ID once shared_data['current_leader'] = None shared_data['sender_uuid'] = None - #sender_uuid = None + + # Global list for active servers + active_servers = active_servers = manager.dict() print(f"Server is running with ID {shared_data['server_id']} and broadcasting on port {broadcast_port}...") @@ -282,28 +245,28 @@ if __name__ == "__main__": save_last_messages={} + # Start the listener for servers in a thread + listener_election_thread = threading.Thread(target=listen_broadcast_election, args=(message_queue, shared_data), daemon=True) # Create processes #listener_election_process = multiprocessing.Process(target=listen_broadcast_election, args=(message_queue, shared_data)) listener_client_process = multiprocessing.Process(target=listen_client, args=(shared_data,)) election_process = multiprocessing.Process(target=start_election, args=(message_queue, shared_data)) - # Start the listener for clients in a thread - listener_election_thread = threading.Thread(target=listen_broadcast_election, args=(message_queue, shared_data), daemon=True) # Heartbeat threads heartbeat_thread = threading.Thread(target=send_heartbeat, args=(shared_data,), daemon=True) heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True) # Start processes and threads - #listener_election_process.start() - listener_client_process.start() - listener_election_thread.start() + election_process.start() + # Warte auf Leaderwahl + leader_election_done.wait() + + #listener_election_thread.start() + listener_client_process.start() heartbeat_thread.start() heartbeat_monitor_thread.start() - - #heartbeat_thread.join() - #heartbeat_monitor_thread.join() try: #listener_election_process.join() @@ -314,4 +277,4 @@ if __name__ == "__main__": #listener_election_process.terminate() listener_client_process.terminate() election_process.terminate() - + \ No newline at end of file diff --git a/things.py b/things.py new file mode 100644 index 0000000000000000000000000000000000000000..4c81a6edab2828a5f458b170dcb908fedb33200b --- /dev/null +++ b/things.py @@ -0,0 +1,59 @@ +# create TCP Socket +#tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +#tcp_server_socket.bind(('', 55556)) # Listen auf einem separaten Port +#tcp_server_socket.listen(5) + + + #if highest_id == shared_data['server_id']: + # shared_data['current_leader'] = shared_data['server_id'] + # broadcast(f"LEADER: {shared_data['current_leader']}") + # print(f"I am the leader: {shared_data['current_leader']}") + #else: + # print(f"Leader election finished, leader is {highest_id}") + + + +#################tcpclient################################# +#def handle_tcp_client(conn, addr): + """Sendet die Leader-Information über eine TCP-Verbindung.""" +# print(f"TCP connection established with {addr}") +# if shared_data['current_leader']: +# conn.send(f"LEADER: {shared_data['current_leader']}".encode()) +# else: +# conn.send("NO_LEADER".encode()) +# conn.close() + +#def tcp_listener(): + """Wartet auf TCP-Verbindungen und bearbeitet sie.""" +# while True: +# conn, addr = tcp_server_socket.accept() +# threading.Thread(target=handle_tcp_client, args=(conn, addr)).start() + +######################endtcp################################################# + + + #create a dictionary for message uuid + # message_uuid = decoded_message.split(": ")[0] + #print("message id:", message_uuid) #debug + + + # Enqueue messages and maintain a fixed size + """if not any(decoded_message.startswith(msg) for msg in list(message_queue.queue)): + if message_queue.full(): + message_queue.get() # Remove oldest message + message_queue.put(decoded_message) + + # Print all messages + print("here are your last messages:", list(message_queue.queue)) # Debug + """ + + """ + if message_uuid not in save_last_messages: + if len(save_last_messages) <= 3: + oldest_key = next(iter(save_last_messages)) # get oldest key + del save_last_messages[oldest_key] # delete it + #save_last_messages["last message"] = message_uuid + save_last_messages[message_uuid] = decoded_message + #save_last_messages.update(f"last message: {message}: {message_uuid}") + print("here are your last messages:", dict(save_last_messages)) #debug + """