diff --git "a/2025-01-10_Server_V8_ohne_weiteren_\303\204nderungen.py" "b/2025-01-10_Server_V8_ohne_weiteren_\303\204nderungen.py" new file mode 100644 index 0000000000000000000000000000000000000000..22ef8c0565fe24272e2c4f428a9b1c8e1323f0ab --- /dev/null +++ "b/2025-01-10_Server_V8_ohne_weiteren_\303\204nderungen.py" @@ -0,0 +1,547 @@ + +from inspect import _empty +import time +import threading +from uuid import uuid4 +import socket +import uuid +import pickle +import json +import multiprocessing +import os +from multiprocessing import Manager +from collections import deque + +# Global variables to manage ring members and their information +global members_UUID +global members_IP +# Initialize lists to keep track of members +members_UUID = [] # List for UUIDs of members in the ring +members_IP = [] # List for IP addresses of members in the ring +# Network and port configurations +broadcast_ip = "255.255.255.255" #Broadcast-adress in the Network +enter_port = 12348 #Port that is used for the discovery of new server participants +ringport = 12343 +election_port = 12345 +client_broadcast_port = 55555 #client sends, server listens +client_broadcast_port2 = 33333 #server sends, client listens +acknowledgement_port = 22222 +heartbeat_port = 44444 +heartbeat_client_broadcast_port = 11111 +# Unique identification for this server +myuuid = uuid.uuid4() #Creating a unique ip Adress using uuid4 +my_ID = str(myuuid) #Creating a unique ip Adress using uuid4 +hostname = socket.gethostname() +ip_address = socket.gethostbyname(hostname) # Retrieves the hostname of the current machine +#ip_address = "127.0.0.1" ########verwenden zum Testen auf einem Gerät +# Leader election-related variables +participating = False # Indicates whether the server is currently participating in an election +is_leader = False # Boolean flag to indicate if the server is the leader +Leader = False # Alternate flag to indicate leader status (can be consolidated with is_leader) +ELECTION = 0 # Message type for initiating an election +NEW_LEAD = 1 # Message type for announcing a new leader +leader_ip = 'unknown' # Stores the IP address of the current leader; default is 'unknown' +# Heartbeat related variables +last_five_messages = deque(maxlen=5) # Initialization of message storage (last 5 messages) +send_heartbeat = deque() +received_heartbeat = deque() +last_heartbeat_time = time.time() + +# variables for Listen to Client +hold_back_queue = deque() # A double-ended queue (deque) to store messages that need to be temporarily held back before they are processed. +processed_message_ids = set() # A set to track the IDs of messages that have already been processed. This helps avoid duplicate processing. + +########################### Start - Neighbour ########################### +def get_neighbour(members_IP, current_member_ip, direction='left'): + """ + Determines the neighbor of a server in a circular ring topology based on the direction. + """ + current_member_index = members_IP.index(current_member_ip) if current_member_ip in members_IP else -1 # Find the index of the current member in the list. If not found, set to -1. + if current_member_index != -1: # Determine the neighbor to the 'left' + if direction == 'left': # Determine the neighbor to the 'left' (next in the ring) + if current_member_index + 1 == len(members_IP): # If the current member is the last in the list, wrap around to the first + return members_IP[0] # Return the first member in the list + else: + return members_IP[current_member_index + 1] # Return the next member in the list + else: # Determine the neighbor to the 'right' + if current_member_index - 1 < 0: # If the current member is the first in the list, wrap around to the last + return members_IP[len(members_IP) - 1] # Return the last member in the list + else: + return members_IP[current_member_index - 1] # Return the previous member in the list + else: + return None # If the current member IP is not found in the list, return None +########################### End - Neighbour ########################### + +########################### Start - Acknowledgement ############################ +def send_acknowledgement(): + """ + Function for sending an acknowledgment for a received message. + """ + if len(members_IP) > 1: + right_neighbour = get_neighbour(members_IP, ip_address, 'left') # Determine the right neighbor based on the current ring structure + msg=("Your Message was received.") # Message sent as a receipt confirmation + send_ack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a UDP socket to send the acknowledgment + send_ack_socket.sendto(msg.encode('utf-8'),(right_neighbour,acknowledgement_port)) # Send the acknowledgment message to the right neighbor using the acknowledgement_port + send_ack_socket.close() # Close the socket after sending the message + +def receive_acknowledgement(msg, ip, so): + """ + Function for receiving acknowledgments. This section includes the system's response to messages not received between servers. + """ + global members_IP + global is_leader + global leader_ip + + if len(members_IP) > 1: + right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + recack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to receive acknowledgment messages + recack_socket.bind((ip_address,acknowledgement_port)) + recack_socket.settimeout(5) # Set a timeout for receiving acknowledgment --> Notwendig in welcher Höhe???? Testen?? + try: + data, addr = recack_socket.recvfrom(1024) # Attempt to receive a response + print(f"{addr} has received the message.") + recack_socket.close() + except socket.timeout: + print(f"No response received. Retrying message delivery.") # If no response is received, resend the message + temp_send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Open a temporary socket for resending the original message + temp_send_socket.sendto(msg,(ip,so)) + temp_send_socket.close() + recack_socket.settimeout(5) # Set a new timeout for acknowledgment + try: + data, addr = recack_socket.recvfrom(1024) # Second attempt to receive a response + print(f"{addr} has received the message.") + recack_socket.close() + except socket.timeout: # If the second attempt fails, handle server failure + print(f"No response again. Server is unreachable. Triggering ring update.") + recack_socket.close() + members_IP.remove(right_neighbour) # Remove the failed server from the member list + # Check if the failed server was the leader + if len(members_IP) == 1: + # If only one server remains, it becomes the leader + if leader_ip != ip_address: # Check if the leader IP is its own; only change is_leader to True if the server was not the leader before + leader_ip = ip_address # Set leder_ip to own IP + is_leader.set_value(True) + print("I am now the last server in this ring and therefore the leader.") + else: + # Update the ring and forward necessary messages + send_update_to_ring() + if right_neighbour == leader_ip: #check if failed server was the leader + start_election() + new_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the new right neighbor with updated ring + temp_send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) + temp_send_socket.sendto(msg,(new_neighbour,so)) # Forward the original message to the new neighbor + temp_send_socket.close() + receive_acknowledgement(msg, new_neighbour, so) # Wait for acknowledgment from the new neighbor +########################### End - Acknowledgement ########################### + +########################### Start - Update ring ########################### +def lausche_update_Ring(): + """ + Listens for ring updates via UDP broadcasts and handles updates to the members_IP list. + """ + global members_IP + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for communication + sock.bind((ip_address, ringport)) # Bind the socket to the servers IP address and ring port for listening + while True: + try: + data, addr = sock.recvfrom(1024) # Receive data from the socket + members_IP2 = json.loads(data.decode()) # Decode the received JSON data to update the members list + if members_IP2 == members_IP: # Check if the received members list matches the current members list + print(f"Ring update has traveled through the ring.") + send_acknowledgement() # Send an acknowledgment for the received update + else: + members_IP = members_IP2 # Update the local members list + print(f"Ring update received: {members_IP}") + send_acknowledgement() # Send an acknowledgment for the received update + send_update_to_ring() # Forward the updated member list to the next neighbor + except json.JSONDecodeError: + print("Error decoding the JSON data.") # Handle errors in decoding the JSON data + +def send_update_to_ring(): + """ + Sends the updated members list to the next server in the ring. + """ + global members_IP + right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + if len(members_IP) > 1:# If no right neighbor exists, there is no one to send the update to + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for sending updates + data = json.dumps(members_IP).encode() # Serialize the members list into JSON format + sock.sendto(data, (right_neighbour , ringport)) # Send the members list to the right neighbor + sock.close()# Close the socket to free up resources + receive_acknowledgement(data, right_neighbour , ringport) # Wait for acknowledgment from the right neighbor + except Exception as e: # Handle errors during data transmission + print(f"Error sending data to Ring: {e}") +########################### End - Update ring ########################### + +########################### Start - Server Enters ########################### +def new_server_in_ring(): + """ + This function is executed by the Leader. It listens for incoming messages from servers attempting to join the network. + The Leader maintains the list of all IP addresses in the ring and updates the topology whenever a new server joins. + Topology Updates: The Leader ensures all servers in the network are aware of the latest ring structure. + """ + global members_UUID + global members_IP + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Set the socket to broadcast mode + sock.bind(("0.0.0.0", enter_port)) # Bind the socket to the address "0.0.0.0" and the enter_port + print("Server is running and waiting for broadcast messages from new servers.") + while True: + data, addr = sock.recvfrom(1024) # Listen for messages from other servers + print(f"Message received from {addr}: {data.decode()}") + new_server_ip, new_server_port = addr # Extract the IP and port of the new server from the received address + new_IP = data.decode() + new_IP = new_IP.split(": ")[1] # Extract the IP address from the message + if new_IP in members_IP: # Check if the IP already exists. This might happen if a server temporarily lost connection. #######Tritt das überhaupt ein? In Gruppe diskutieren + msg = json.dumps(members_IP).encode() # If the server already exists, send the updated member list + else: + members_IP.append(new_IP) # If the server is new, add its IP to the list + msg = f"There are already servers. I am your leader: {ip_address}".encode() # Create a message for the new server + #AttributeError: 'bytes' object has no attribute 'encode'. Did you mean: 'decode'? + sock.sendto(msg, (new_IP, new_server_port)) # Send the greeting message back to the new server + print(f"The updated IP_Ring is: {members_IP}") + send_update_to_ring() # Update the ring topology + +def server_enters(): + """ + This function is used when a server wants to join the network. It sends a greeting message to the broadcast address and waits for a response from the Leader. + If no response is received, the server assumes the Leader role and starts managing the ring itself. + Broadcast Communication: This allows new servers to discover the Leader without knowing its specific IP address. + """ + global members_UUID + global leader_ip + + msg = f"I am new: {ip_address}".encode() # Greeting message from the new server + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Set the socket to broadcast mode + sock.sendto(msg, (broadcast_ip, enter_port)) # Send the greeting message to the broadcast address using the enter_port + sock.settimeout(5) # Set a timeout to wait for a response --> ggf. anpassen, je nach geschwindigkeit mit Handy Internet + try: + data, addr = sock.recvfrom(1024) # receiving response + print(f"Antwort von {addr}: {data.decode()}") + sock.close() + my_leader = data.decode().split(": ")[1] # Extract the Leader's IP address from the response + leader_ip = my_leader # Set leder_ip to the received IP + except socket.timeout: + print(f"Keine Antwort erhalten. Ich bin jetzt der Leader. Meine IP: {ip_address}") # If no answer is received the server sets itself as leader + members_UUID.append(my_ID) # Add itself as a participant in the ring + members_IP.append(ip_address) # Add itself as a participant in the ring + sock.close() + leader_ip = ip_address # Set leder_ip to own IP + print(leader_ip) + is_leader.set_value(True) # Mark itself as the Leader +########################### End - Server Enters ########################### + +########################### Start - Leader Election ########################### +def start_election(): + """ + Initiates an election by sending the server's UUID and IP address to its right neighbor in the ring. + Marks the server as participating in the election process and waits for acknowledgment. + """ + global participating + right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + print("{} is starting an election.".format(myuuid)) + participating = True # Server marks itself as participating in the election + msg = f"{ELECTION}: {myuuid}: {ip_address}".encode('utf-8') # Create the election message with the server's UUID and IP address + send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to send the election message + send_socket.sendto(msg,(right_neighbour,election_port)) # Send msg to the right neighbor usinf the election_port + send_socket.close() + receive_acknowledgement(msg, right_neighbour, election_port) # Wait for acknowledgment + +def accept(group,erhaltene_uuid,erhaltene_ip): + """ + Function to handle election messages and determine the next steps. + If the message is part of an election (ELECTION), the server compares UUIDs to either forward, update, or declare itself as the leader. + If the message is a new leader announcement (NEW_LEAD), it updates the local leader information and forwards the message. + Leader Election: Based on comparing UUIDs, the server with the highest UUID becomes the leader. + Acknowledgment Mechanism: Ensures that messages are received and processed reliably. + """ + global leader_ip + global is_leader + global last_heartbeat_time + last_heartbeat_time = time.time() + right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + if group == ELECTION: # If the received message is part of an election + # Compare the received UUID with the server's own UUID + if erhaltene_uuid > myuuid: # Received UUID is greater, so forward the message without changes + print("{} is forwarding without updates.".format(myuuid)) + participating = True + msg = f"{ELECTION}: {erhaltene_uuid}: {erhaltene_ip}".encode() + if erhaltene_uuid < myuuid: # Received UUID is smaller, update with the server's own UUID and forward + print("{} is updating and forwarding.".format(myuuid)) + participating = True + msg = f"{ELECTION}: {myuuid}: {ip_address}".encode() + if erhaltene_uuid == myuuid: # If the server receives its own UUID, it becomes the leader + print("{} starts acting as a leader!".format(myuuid)) + participating = False + leader_ip = ip_address # Set leader_ip to own IP + leader = myuuid #Set leader to own uuid + msg = f"{NEW_LEAD}: {myuuid}: {ip_address}".encode() + if leader_ip != ip_address: # Update leadership status if server was not already the leader bevor the election + is_leader.set_value(True) + send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) + send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour + send_socket.close() + receive_acknowledgement(msg, right_neighbour , election_port) + if group == NEW_LEAD: # If the received message announces a new leader + if erhaltene_uuid == myuuid: # If the UUID matches, the server has already acknowledged + return + if erhaltene_uuid != myuuid: # Update the leader information and forward the new leader announcement + print("{} acknowledged new leader.".format(myuuid)) + if leader_ip == ip_address: # Check if this server was the Leader bevor the election and set is_leader to False + is_leader.set_value(False) + leader_ip = erhaltene_ip # Update leader_ip + leader = erhaltene_uuid # Update leader + msg = f"{NEW_LEAD}: {erhaltene_uuid}: {erhaltene_ip}".encode() + send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) + send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour + send_socket.close() + receive_acknowledgement(msg, right_neighbour , election_port) + +def zuhören_election(): + """ + Listens for incoming election or leader messages on the configured election socket. + Decodes the message, sends an acknowledgment to the sender, and processes the message via the accept() function. + """ + sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to listen for election messages + sock.bind((ip_address,election_port)) # Bind to the election socket + while True: + data,addr=sock.recvfrom(4096) # Receive data from other servers + übernahme = data.decode('utf-8') # Decode the received message + grouprec = int(übernahme.split(": ")[0]) # Extract group ID, UUID, and IP address from the message + erhaltene_ip = (übernahme.split(": ")[2]) + erhaltene_uuid2 = uuid.UUID((übernahme.split(": ")[1])) + send_acknowledgement() # Send acknowledgment back to the sender + accept(grouprec,erhaltene_uuid2,erhaltene_ip) # Process the election or new leader message +########################### End - Leader Election ########################### + +########################### Start - Process client messages ########################### +def process_hold_back_queue(): + """ + Processes messages in the hold-back queue in the correct order. + """ + while hold_back_queue: + message = hold_back_queue.popleft() # Remove the oldest message from the queue + message_id, decoded_message = message + if message_id not in processed_message_ids: + # Process or forward the message + print(f"Processing message: {decoded_message}") + broadcast(decoded_message) # Forward the message to all participants + processed_message_ids.add(message_id) # Mark the message as processed + +def broadcast(message): + """ + Sends a message to all participants in the network via broadcast. + """ + server_socket2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket + server_socket2.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode + full_message = f"{message}".encode() # Encode the message + server_socket2.sendto(full_message, (broadcast_ip, client_broadcast_port2)) # Send the broadcast message + server_socket2.close() + +def listen_client(): + """ + Listens for messages from clients, processes them, and broadcasts them to other participants. + """ + global last_five_messages + server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode + server_socket.bind(('', client_broadcast_port)) # Bind to the broadcast port + while True: # Wait to receive a message from a client + try: + message, client_address = server_socket.recvfrom(4096) # Receive a message from a client + decoded_message = message.decode() # Decode the message + last_five_messages.append(decoded_message) # Store the message for Leader heartbeat + #print(f"Message stored: {last_three_messages}") # Only for debugging + message_id = decoded_message.split(":")[0] # Extract the unique message ID (UUID) from the decoded message + if message_id in processed_message_ids: # Check if the message has already been processed + continue # Skip if the message was already processed + hold_back_queue.append((message_id, decoded_message)) # Add the message to the hold-back queue + process_hold_back_queue() # Process messages in the hold-back queue + except socket.error as e: # Handle socket errors + print(f"An error occurred while listening: {e}") + break + except KeyboardInterrupt: # Handle server shutdown via keyboard interrupt ????? Funktioniert das?? + print("\nShutting down server...") + break +########################### End - Process client messages ########################### + +########################### Start - Leader Heartbeat ################################# +def forward_received_heartbeat_to_clients(): + """ + Forwards the last 5 received heartbeat messages separately to clients via UDP broadcast. + """ + server_socket3 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket + server_socket3.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode + print("Starting to send the messages from the heartbeat.") + for message in received_heartbeat: + try: + print(f"Forwarding message: {message}") # Print the message being forwarded for debugging + full_message = f"{message}".encode() # Encode the message + server_socket3.sendto(full_message, (broadcast_ip, client_broadcast_port2)) # Send the broadcast message + except Exception as e: + print(f"Error forwarding message: {e}") + server_socket3.close() # Close the socket to free up resources + +def listen_heartbeat(): + """ + Listens for the leader's heartbeat and forwards it to the right neighbor in the ring. + """ + global received_heartbeat + global last_heartbeat_time + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for for receiving heartbeats + sock.bind((ip_address, heartbeat_port)) # Bind the socket to the servers IP address and heartbeat port for listening + while True: # Receive data from the socket + data, addr = sock.recvfrom(1024) # Receive data from the socket + print("Leader heartbeat received.") + send_acknowledgement() # Send acknowledgment for the received message + received_heartbeat = pickle.loads(data) # Deserialize the received data + print(received_heartbeat) # Debugging. Print the received messages + last_heartbeat_time = time.time() # Update the last heartbeat timestamp + if leader_ip == ip_address: # If the server is the leader + if send_heartbeat==received_heartbeat: + print(f"My heartbeat has traveled through the ring.") + else: + print(f"Received a foreign heartbeat. I am forwarding the messages.") + forward_received_heartbeat_to_clients() + else: # Forward the heartbeat to the right neighbor + print("Heartbeat wird an den nachbarn gesendet") + right_neighbour = get_neighbour(members_IP, ip_address, 'right') + msg = pickle.dumps(received_heartbeat) + try: + sock_heart = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for sending Hearbeat + sock_heart.sendto(msg, (right_neighbour , heartbeat_port)) # Send heartbeat to the right neighbor + receive_acknowledgement(msg, right_neighbour , heartbeat_port) # Wait for acknowledgment from the right neighbor + sock_heart.close() # Close the socket to free up resources + except Exception as e: # Handle errors during data transmission + print(f"Error sending data lsiten heartbeat: {e}") + + +def leader_send_heartbeat(): + """ + Sends the Heartbeat to the next server in the ring. + """ + global send_heartbeat + global last_heartbeat_time + while True: + send_heartbeat = last_five_messages # Set the send_heartbeat as the last five messages + msg = pickle.dumps(send_heartbeat) # Serialize the messages + right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + if len(members_IP) == 1: # If only this server is part of the Ring it does not have to send an Heartbeat + print("I am the only server. There is no other Server to send a heartbeat to.") + last_heartbeat_time = time.time() # Update the last heartbeat timestamp + else: # Send the heartbeat to the right neighbor + try: + sock_heart_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for sending updates + sock_heart_send.sendto(msg, (right_neighbour , heartbeat_port)) # Send the members list to the right neighbor + sock_heart_send.close() + receive_acknowledgement(msg, right_neighbour , heartbeat_port) # Wait for acknowledgment from the right neighbor + except Exception as e: # Handle errors during data transmission + print(f"Error sending data Leader_send_heartbeat: {e}") + #sock_heart_send.close() # Close the socket to free up resources + time.sleep(5) # Wait before sending the next heartbeat + +def monitor_heartbeat(): ################ Achtung Testen, ob auch der Leader in ein timeout kommen kann... ggf. vor removal f leader IP einfügen####### + """ + Monitors whether the last heartbeat was received within the timeout period. + """ + global members_IP + global leader_ip + global last_heartbeat_time + while True: + time_since_last_heartbeat = time.time() - last_heartbeat_time # Calculate the time since the last heartbeat + if time_since_last_heartbeat > 18: + print(f"ERROR: No heartbeat received for 18 seconds! Leader is down.") + members_IP.remove(leader_ip) # Remove the Leader server from the member list + if len(members_IP) == 1: # If only one server remains, it becomes the leader + print("I am now the last server in this ring and therefore the leader.") + leader_ip = ip_address # Update the leader IP to the current server + is_leader.set_value(True) + else: + send_update_to_ring() # Update the ring and forward necessary messages + start_election() # Start leader election to determine new Leader + time.sleep(1) # Check for heartbeat status every second +########################### End - Leader Heartbeat ############################ + +########################### Start - Client Heartbeat ########################### +def leader_send_Client_heartbeat(): + """ + Broadcasts a heartbeat message to the clients, indicating the server is available. + """ + while True: + msg = ("Server is available.") # Define the heartbeat message + print("I am sending a heartbeat to the client.") # Debugging message indicating that the server is sending a heartbeat + server_client_heartbeat = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket + server_client_heartbeat.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode + server_client_heartbeat.sendto(msg.encode('utf-8'), (broadcast_ip, heartbeat_client_broadcast_port)) # # Send the broadcast message to the specified broadcast IP and port + server_client_heartbeat.close()# Close the socket to free up resources + time.sleep(5) # Wait for 15 seconds before sending the next heartbeat +########################### End - Client Heartbeat ########################### + +########################### Start - observation value changes leader ########################### +class VariableWatcher: # A utility class designed to monitor a variable's value and notify registered observers whenever the value changes. + def __init__(self): + self._value = None # Initializes the variable's value as None + self.observers = [] # Creates a list to store observers + def set_value(self, new_value): # Updates the variable and notifies observers. + self._value = new_value # Updates the variable's value + self.notify(new_value) # Notifies all observers about the new value + def add_observer(self, observer): + self.observers.append(observer) # Adds an observer to the list + def notify(self, new_value): + for observer in self.observers: # Iterates through all registered observers + observer(new_value) # Calls each observer with the new value + +def callback(new_value): + """ + This function is triggered when the `is_leader` value changes. + """ + if new_value: + print("I am now taking over leader responsibilities.") + thread7 = threading.Thread(target=listen_client) # Listens to client messages + thread8 = threading.Thread(target=new_server_in_ring) # Listens for election messages + thread9 = threading.Thread(target= leader_send_Client_heartbeat) # Start sending heartbeats to the client + thread10 = threading.Thread(target= leader_send_heartbeat) + thread7.start() + thread8.start() + thread9.start() + if len(received_heartbeat) > 0: # Check if there are values in received_heartbeat + print("I am sending the last 5 messages of the leader heartbeat to the clients.") + forward_received_heartbeat_to_clients() # Forward the heartbeat to the clients. + thread10.start() # Leader starts sending its heartbeat to its right neighbour. + else: + print("I am no longer the leader.") +########################### End - observation value changes leader ########################### + +########################### Only for Testing ########################## +####################################################################### +####################################################################### +def frage_benutzer(): # A test function that prompts the user to decide whether to execute the election process + antwort = input("Möchten Sie die Funktion ausführen? (Ja/Nein): ").strip().lower() + if antwort == 'ja': + start_election() + else: + print("Die Funktion wurde nicht ausgeführt.") +####################################################################### +####################################################################### +####################################################################### + +if __name__ == "__main__": + # Create threads for different server operations + thread1 = threading.Thread(target=server_enters) # Handles server entry to the ring + thread2 = threading.Thread(target=lausche_update_Ring) # Listens for ring updates + thread3 = threading.Thread(target=frage_benutzer) # Prompts the user for action --> Only for testing! + thread4 = threading.Thread(target=zuhören_election) # Listens for election messages + thread5 = threading.Thread(target=listen_heartbeat) # Listens for leader heartbeat + thread6 = threading.Thread(target=monitor_heartbeat) # Monitord the leader heartbeat + # Start all threads + thread1.start() + thread2.start() + thread3.start() + thread4.start() + thread5.start() + thread6.start() + + is_leader = VariableWatcher() # Create an instance of VariableWatcher to observe changes in leader status + is_leader.add_observer(callback) # Add the callback function as an observer for changes in `is_leader` \ No newline at end of file