diff --git a/2024-12-22_Server_V2.py b/2024-12-22_Server_V2.py new file mode 100644 index 0000000000000000000000000000000000000000..9e29aa8e37f467da62061a890d9ca6147d7c225f --- /dev/null +++ b/2024-12-22_Server_V2.py @@ -0,0 +1,327 @@ + +import time +import threading +from uuid import uuid4 +import socket +import uuid +import json +import neighbour +import multiprocessing +import os +from multiprocessing import Manager +from collections import deque + +# Global variables to manage ring members and their information +global members_UUID +global members_IP +# Initialize lists to keep track of members +members_UUID = [] # List for UUIDs of members in the ring +members_IP = [] # List for IP addresses of members in the ring +# Network and port configurations +broadcast_ip = "255.255.255.255" #Broadcast-adress in the Network +enter_port = 12348 #Port that is used for the discovery of new server participants +ringport = 12343 +election_port = 12345 +acknowledgement_port = 22222 +# Unique identification for this server +myuuid = uuid.uuid4() #Creating a unique ip Adress using uuid4 +my_ID = str(myuuid) #Creating a unique ip Adress using uuid4 +hostname = socket.gethostname() +ip_address = socket.gethostbyname(hostname) # Retrieves the hostname of the current machine +#ip_address = "127.0.0.2" ########verwenden zum Testen auf einem Gerät +# Leader election-related variables +participating = False # Indicates whether the server is currently participating in an election +is_leader = False # Boolean flag to indicate if the server is the leader +Leader = False # Alternate flag to indicate leader status (can be consolidated with is_leader) +ELECTION = 0 # Message type for initiating an election +NEW_LEAD = 1 # Message type for announcing a new leader +leader_ip = 'unknown' # Stores the IP address of the current leader; default is 'unknown' + +########################### Start - Acknowledgement ############################ +def send_acknowledgement(): + """ + Function for sending an acknowledgment for a received message. + """ + right_neighbour = neighbour.get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + msg=("Your Message was received.") # Message sent as a receipt confirmation + send_ack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a UDP socket to send the acknowledgment + send_ack_socket.sendto(msg.encode('utf-8'),(right_neighbour,acknowledgement_port)) # Send the acknowledgment message to the right neighbor using the acknowledgement_port + send_ack_socket.close() # Close the socket after sending the message + +def receive_acknowledgement(msg, ip, so): + """ + Function for receiving acknowledgments. This section includes the system's response to messages not received between servers. + """ + global members_IP + global is_leader + right_neighbour = neighbour.get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + recack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to receive acknowledgment messages + recack_socket.bind((ip_address,acknowledgement_port)) + recack_socket.settimeout(1) # Set a timeout for receiving acknowledgment --> Notwendig in welcher Höhe???? Testen?? + try: + data, addr = recack_socket.recvfrom(1024) # Attempt to receive a response + print(f"{addr} has received the message.") + recack_socket.close() + except socket.timeout: + print(f"No response received. Retrying message delivery.") # If no response is received, resend the message + temp_send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Open a temporary socket for resending the original message + temp_send_socket.sendto(msg,(ip,so)) + temp_send_socket.close() + recack_socket.settimeout(1) # Set a new timeout for acknowledgment + try: + data, addr = recack_socket.recvfrom(1024) # Second attempt to receive a response + print(f"{addr} has received the message.") + recack_socket.close() + except socket.timeout: # If the second attempt fails, handle server failure + print(f"No response again. Server is unreachable. Triggering ring update.") + recack_socket.close() + members_IP.remove(right_neighbour) # Remove the failed server from the member list + # Check if the failed server was the leader + if len(members_IP) == 1: + # If only one server remains, it becomes the leader + if leader_ip != ip_address: # Check if the leader IP is its own; only change is_leader to True if the server was not the leader before + is_leader.set_value(True) + print("I am now the last server in this ring and therefore the leader.") + else: + # Update the ring and forward necessary messages + send_update_to_ring() + if right_neighbour == leader_ip: #check if failed server was the leader + # Start a new election since the leader is down + start_election() + #####################################################################Funktion Leader is Down einfügen##################################### + new_neighbour = neighbour.get_neighbour(members_IP, ip_address, 'right') # Determine the new right neighbor with updated ring + temp_send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) + temp_send_socket.sendto(msg,(new_neighbour,so)) # Forward the original message to the new neighbor + temp_send_socket.close() + receive_acknowledgement(msg, new_neighbour, so) # Wait for acknowledgment from the new neighbor +########################### End - Acknowledgement ########################### + +########################### Start - Update ring ########################### +def lausche_update_Ring(): + """ + Listens for ring updates via UDP broadcasts and handles updates to the members_IP list. + """ + global members_IP + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for communication + sock.bind((ip_address, ringport)) # Bind the socket to the servers IP address and ring port for listening + while True: + try: + data, addr = sock.recvfrom(1024) # Receive data from the socket + members_IP2 = json.loads(data.decode()) # Decode the received JSON data to update the members list + if members_IP2 == members_IP: # Check if the received members list matches the current members list + print(f"Ring update has traveled through the ring.") + send_acknowledgement() # Send an acknowledgment for the received update + else: + members_IP = members_IP2 # Update the local members list + print(f"Ring update received: {members_IP}") + send_acknowledgement() # Send an acknowledgment for the received update + send_update_to_ring() # Forward the updated member list to the next neighbor + except json.JSONDecodeError: + print("Error decoding the JSON data.") # Handle errors in decoding the JSON data + +def send_update_to_ring(): + """ + Sends the updated members list to the next server in the ring. + """ + global members_IP + right_neighbour = neighbour.get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + if not right_neighbour: # If no right neighbor exists, there is no one to send the update to + print("No left neighbour to send updates.") + return + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for sending updates + data = json.dumps(members_IP).encode() # Serialize the members list into JSON format + try: + sock.sendto(data, (right_neighbour , ringport)) # Send the members list to the right neighbor + receive_acknowledgement(data, right_neighbour , ringport) # Wait for acknowledgment from the right neighbor + except Exception as e: # Handle errors during data transmission + print(f"Error sending data: {e}") + sock.close() # Close the socket to free up resources +########################### End - Update ring ########################### + +########################### Start - Server Enters ########################### +def new_server_in_ring(): + """ + This function is executed by the Leader. It listens for incoming messages from servers attempting to join the network. + The Leader maintains the list of all IP addresses in the ring and updates the topology whenever a new server joins. + Topology Updates: The Leader ensures all servers in the network are aware of the latest ring structure. + """ + global members_UUID + global members_IP + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Set the socket to broadcast mode + sock.bind(("0.0.0.0", enter_port)) # Bind the socket to the address "0.0.0.0" and the enter_port + print("Server is running and waiting for broadcast messages from new servers.") + while True: + data, addr = sock.recvfrom(1024) # Listen for messages from other servers + print(f"Message received from {addr}: {data.decode()}") + new_server_ip, new_server_port = addr # Extract the IP and port of the new server from the received address + new_IP = data.decode() + new_IP = new_IP.split(": ")[1] # Extract the IP address from the message + if new_IP in members_IP: # Check if the IP already exists. This might happen if a server temporarily lost connection. #######Tritt das überhaupt ein? In Gruppe diskutieren + msg = json.dumps(members_IP).encode() # If the server already exists, send the updated member list + else: + members_IP.append(new_IP) # If the server is new, add its IP to the list + msg = f"There are already servers. I am the leader: {ip_address}" # Create a message for the new server + sock.sendto(msg.encode(), (new_IP, new_server_port)) # Send the greeting message back to the new server + print(f"The updated IP_Ring is: {members_IP}") + send_update_to_ring() # Update the ring topology + +def server_enters(): + """ + This function is used when a server wants to join the network. It sends a greeting message to the broadcast address and waits for a response from the Leader. + If no response is received, the server assumes the Leader role and starts managing the ring itself. + Broadcast Communication: This allows new servers to discover the Leader without knowing its specific IP address. + """ + global members_UUID + global leader_ip + msg = f"I am new: {ip_address}".encode() # Greeting message from the new server + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Set the socket to broadcast mode + sock.sendto(msg, (broadcast_ip, enter_port)) # Send the greeting message to the broadcast address using the enter_port + sock.settimeout(1) # Set a timeout to wait for a response --> ggf. anpassen, je nach geschwindigkeit mit Handy Internet + try: + data, addr = sock.recvfrom(1024) # receiving response + print(f"Antwort von {addr}: {data.decode()}") + sock.close() + my_leader = data.decode().split(": ")[1] # Extract the Leader's IP address from the response + leader_ip = my_leader # Set leder_ip to the received IP + except socket.timeout: + print(f"Keine Antwort erhalten. Ich bin jetzt der Leader. Meine IP: {ip_address}") # If no answer is received the server sets itself as leader + members_UUID.append(my_ID) # Add itself as a participant in the ring + members_IP.append(ip_address) # Add itself as a participant in the ring + sock.close() + is_leader.set_value(True) # Mark itself as the Leader + leader_ip = ip_address # Set leder_ip to own IP + new_server_in_ring() # Start the function to manage new servers in the ring --> Achtung nur für tests! ##################################################################### +########################### End - Server Enters ########################### + +########################### Start - Leader Election ########################### +def start_election(): + """ + Initiates an election by sending the server's UUID and IP address to its right neighbor in the ring. + Marks the server as participating in the election process and waits for acknowledgment. + """ + right_neighbour = neighbour.get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + print("{} is starting an election.".format(myuuid)) + participating = True # Server marks itself as participating in the election + msg = f"{ELECTION}: {myuuid}: {ip_address}".encode('utf-8') # Create the election message with the server's UUID and IP address + send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to send the election message + send_socket.sendto(msg,(right_neighbour,election_port)) # Send msg to the right neighbor usinf the election_port + send_socket.close() + receive_acknowledgement(msg, right_neighbour, election_port) # Wait for acknowledgment + +def accept(group,erhaltene_uuid,erhaltene_ip): + """ + Function to handle election messages and determine the next steps. + If the message is part of an election (ELECTION), the server compares UUIDs to either forward, update, or declare itself as the leader. + If the message is a new leader announcement (NEW_LEAD), it updates the local leader information and forwards the message. + Leader Election: Based on comparing UUIDs, the server with the highest UUID becomes the leader. + Acknowledgment Mechanism: Ensures that messages are received and processed reliably. + """ + global leader_ip + global is_leader + right_neighbour = neighbour.get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + if group == ELECTION: # If the received message is part of an election + # Compare the received UUID with the server's own UUID + if erhaltene_uuid > myuuid: # Received UUID is greater, so forward the message without changes + print("{} is forwarding without updates.".format(myuuid)) + participating = True + msg = f"{ELECTION}: {erhaltene_uuid}: {erhaltene_ip}".encode() + if erhaltene_uuid < myuuid: # Received UUID is smaller, update with the server's own UUID and forward + print("{} is updating and forwarding.".format(myuuid)) + participating = True + msg = f"{ELECTION}: {myuuid}: {ip_address}".encode() + if erhaltene_uuid == myuuid: # If the server receives its own UUID, it becomes the leader + print("{} starts acting as a leader!".format(myuuid)) + participating = False + if leader_ip != ip_address: # Update leadership status if server was not already the leader bevor the election + is_leader.set_value(True) + leader_ip = ip_address # Set leader_ip to own IP + leader = myuuid #Set leader to own uuid + msg = f"{NEW_LEAD}: {myuuid}: {ip_address}".encode() + if group == NEW_LEAD: # If the received message announces a new leader + if erhaltene_uuid == myuuid: # If the UUID matches, the server has already acknowledged + return + if erhaltene_uuid != myuuid: # Update the leader information and forward the new leader announcement + print("{} acknowledged new leader.".format(myuuid)) + if leader_ip == ip_address: # Check if this server was the Leader bevor the election and set is_leader to False + is_leader.set_value(False) + leader_ip = erhaltene_ip # Update leader_ip + leader = erhaltene_uuid # Update leader + msg = f"{NEW_LEAD}: {erhaltene_uuid}: {erhaltene_ip}".encode() + send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) + send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour + send_socket.close() + receive_acknowledgement(msg, right_neighbour , election_port) + +def zuhören_election(): + """ + Listens for incoming election or leader messages on the configured election socket. + Decodes the message, sends an acknowledgment to the sender, and processes the message via the accept() function. + """ + sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to listen for election messages + sock.bind((ip_address,election_port)) # Bind to the election socket + while True: + data,addr=sock.recvfrom(4096) # Receive data from other servers + übernahme = data.decode('utf-8') # Decode the received message + grouprec = int(übernahme.split(": ")[0]) # Extract group ID, UUID, and IP address from the message + erhaltene_ip = (übernahme.split(": ")[2]) + erhaltene_uuid2 = uuid.UUID((übernahme.split(": ")[1])) + send_acknowledgement() # Send acknowledgment back to the sender + accept(grouprec,erhaltene_uuid2,erhaltene_ip) # Process the election or new leader message +########################### End - Leader Election ########################### + +########################### Start - observation value changes leader ########################### +class VariableWatcher: # A utility class designed to monitor a variable's value and notify registered observers whenever the value changes. + def __init__(self): + self._value = None # Initializes the variable's value as None + self.observers = [] # Creates a list to store observers + def set_value(self, new_value): # Updates the variable and notifies observers. + self._value = new_value # Updates the variable's value + self.notify(new_value) # Notifies all observers about the new value + def add_observer(self, observer): + self.observers.append(observer) # Adds an observer to the list + def notify(self, new_value): + for observer in self.observers: # Iterates through all registered observers + observer(new_value) # Calls each observer with the new value + +def callback(new_value): + """ + This function is triggered when the `is_leader` value changes. + """ + print(f"Variable value changed to: {new_value}") # Prints the updated value + if new_value==True: + print("I am now taking over leader responsibilities.") + #new_server_in_ring() + else: + print("I am no longer the leader.") +########################### End - observation value changes leader ########################### + +########################### Only for Testing ########################## +####################################################################### +####################################################################### +def frage_benutzer(): # A test function that prompts the user to decide whether to execute the election process + antwort = input("Möchten Sie die Funktion ausführen? (Ja/Nein): ").strip().lower() + if antwort == 'ja': + start_election() + else: + print("Die Funktion wurde nicht ausgeführt.") +####################################################################### +####################################################################### +####################################################################### + +if __name__ == "__main__": + # Create threads for different server operations + thread1 = threading.Thread(target=server_enters) # Handles server entry to the ring + thread2 = threading.Thread(target=lausche_update_Ring) # Listens for ring updates + thread3 = threading.Thread(target=frage_benutzer) # Prompts the user for action --> Only for testing! + thread4 = threading.Thread(target=zuhören_election) # Listens for election messages + # Start all threads + thread1.start() + thread2.start() + thread3.start() + thread4.start() + + is_leader = VariableWatcher() # Create an instance of VariableWatcher to observe changes in leader status + is_leader.add_observer(callback) # Add the callback function as an observer for changes in `is_leader` +