From be4acda2ffb7f146d135127a2fe7d46a84dcf063 Mon Sep 17 00:00:00 2001 From: Katharina <katharina.willig@outlook.com> Date: Sun, 2 Feb 2025 13:06:12 +0100 Subject: [PATCH] delete and add comments --- Client_25-01-25.py | 10 ++-- Server_25-01-25.py | 114 ++++++++++++++++++++++----------------------- 2 files changed, 59 insertions(+), 65 deletions(-) diff --git a/Client_25-01-25.py b/Client_25-01-25.py index 36a35e9..02b01dc 100644 --- a/Client_25-01-25.py +++ b/Client_25-01-25.py @@ -19,7 +19,7 @@ last_heartbeat = time.time() processed_message_ids = set() # A set to track the IDs of messages that have already been processed. This helps avoid duplicate processing. listener_ready = threading.Event() -########################### Start - Receiving MSG ########################### +########################### Start - Receiving messages ########################### def listen_server(): """ Listens for messages broadcasted by the server and processes them. @@ -38,10 +38,7 @@ def listen_server(): received_uuid, rest_of_text = text.split(":", 1) # Split the message into UUID (unique identifier) and the rest of the text if received_uuid not in processed_message_ids: # Process the message if it hasn't been processed yet processed_message_ids.add(received_uuid) # Mark the message as processed - #print(f"Received {data.decode()} from {address}") # Only for Debugging - print(rest_of_text) # Display the message content - #else: ################### Only for debugging - # print("Message ist Doppelt") ################### Only for debugging + print(rest_of_text) # Display the message content except socket.error as e: # Handle and log any errors while listening for messages print(f"An error occurred while listening: {e}") continue @@ -61,7 +58,7 @@ def sender(): client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port)) #Send the "entered the chat" message to the broadcast address - processed_message_ids.add(message_id) # Mark the message ID as processed #######+ Können wir als Rückmeldung auch ausgeben lassen + processed_message_ids.add(message_id) # Mark the message ID as processed try: while True: message = input("") # Allow the user to enter a message @@ -96,7 +93,6 @@ def listen_to_heartbeat(): while True: try: # Wait for incoming heartbeat messages data, addr = client_heartbeat_socket.recvfrom(1024) # Receive data from the socket - #print("Server heartbeat received.") # Log that the server's heartbeat was received - Only for De bugging last_heartbeat = time.time() # Update the timestamp for the last received heartbeat except Exception as e: # Handle and log any errors during data reception print(f"Error sending data: {e}") diff --git a/Server_25-01-25.py b/Server_25-01-25.py index b6bf9da..90666a1 100644 --- a/Server_25-01-25.py +++ b/Server_25-01-25.py @@ -15,25 +15,29 @@ from collections import deque global members_UUID global members_IP global last_heartbeat_time + # Initialize lists to keep track of members members_UUID = [] # List for UUIDs of members in the ring members_IP = [] # List for IP addresses of members in the ring + # Network and port configurations broadcast_ip = "255.255.255.255" #Broadcast-adress in the Network enter_port = 12348 #Port that is used for the discovery of new server participants -ringport = 12343 -election_port = 12345 +ringport = 12343 #Port that is used for the ring updates +election_port = 12345 #Port that is used for the leader election client_broadcast_port = 55555 #client sends, server listens client_broadcast_port2 = 33333 #server sends, client listens -acknowledgement_port = 22222 -heartbeat_port = 44444 -heartbeat_client_broadcast_port = 11111 +acknowledgement_port = 22222 #port to send acknowledgements +heartbeat_port = 44444 #Port to send the leader heartbeat +heartbeat_client_broadcast_port = 11111 #Port to send heartbeat to the client + # Unique identification for this server myuuid = uuid.uuid4() #Creating a unique ip Adress using uuid4 my_ID = str(myuuid) #Creating a unique ip Adress using uuid4 -hostname = socket.gethostname() +hostname = socket.gethostname() #get the hostname of the current machine ip_address = socket.gethostbyname(hostname) # Retrieves the hostname of the current machine -#ip_address = "127.0.0.1" ########verwenden zum Testen auf einem Gerät +#ip_address = "127.0.0.1" #for testing at one host + # Leader election-related variables participating = False # Indicates whether the server is currently participating in an election is_leader = False # Boolean flag to indicate if the server is the leader @@ -41,11 +45,12 @@ Leader = False # Alternate flag to indicate leader status (can be consolidated w ELECTION = 0 # Message type for initiating an election NEW_LEAD = 1 # Message type for announcing a new leader leader_ip = 'unknown' # Stores the IP address of the current leader; default is 'unknown' + # Heartbeat related variables -last_three_messages = deque(maxlen=3) # Initialization of message storage (last 3 messages) -send_heartbeat = deque() -received_heartbeat = deque() -last_heartbeat_time = time.time() +last_three_messages = deque(maxlen=3) # Initialization of message storage (last 3 messages), double-ended queue +send_heartbeat = deque() #double-ended queue +received_heartbeat = deque() #double-ended queue +last_heartbeat_time = time.time() # variables for Listen to Client hold_back_queue = deque() # A double-ended queue (deque) to store messages that need to be temporarily held back before they are processed. @@ -81,7 +86,7 @@ def send_acknowledgement(): right_neighbour = get_neighbour(members_IP, ip_address, 'left') # Determine the right neighbor based on the current ring structure msg=("Your Message was received.") # Message sent as a receipt confirmation send_ack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a UDP socket to send the acknowledgment - send_ack_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + send_ack_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) #reuse of the address send_ack_socket.sendto(msg.encode('utf-8'),(right_neighbour,acknowledgement_port)) # Send the acknowledgment message to the right neighbor using the acknowledgement_port send_ack_socket.close() # Close the socket after sending the message @@ -96,9 +101,9 @@ def receive_acknowledgement(msg, ip, so): if len(members_IP) > 1: right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure recack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to receive acknowledgment messages - recack_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + recack_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) #reuse address recack_socket.bind((ip_address,acknowledgement_port)) - recack_socket.settimeout(2) # Set a timeout for receiving acknowledgment --> Notwendig in welcher Höhe???? Testen?? + recack_socket.settimeout(2) # Set a timeout for receiving acknowledgment try: data, addr = recack_socket.recvfrom(1024) # Attempt to receive a response print(f"{addr} has received the message.") @@ -129,7 +134,7 @@ def receive_acknowledgement(msg, ip, so): # Update the ring and forward necessary messages send_update_to_ring() if right_neighbour == leader_ip: #check if failed server was the leader - start_election() + start_election() #is it was the leader, then start a new election new_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the new right neighbor with updated ring temp_send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) temp_send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) @@ -158,7 +163,7 @@ def listen_update_Ring(): members_IP = members_IP2 # Update the local members list print(f"Ring update received: {members_IP}") send_acknowledgement() # Send an acknowledgment for the received update - send_update_to_ring() # Forward the updated member list to the next neighbor + send_update_to_ring() # Forward the updated members list to the next neighbor except json.JSONDecodeError: print("Error decoding the JSON data.") # Handle errors in decoding the JSON data @@ -168,7 +173,7 @@ def send_update_to_ring(): """ global members_IP right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure - if len(members_IP) > 1:# If no right neighbor exists, there is no one to send the update to + if len(members_IP) > 1: # If no right neighbor exists, there is no one to send the update to try: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for sending updates sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) @@ -199,12 +204,12 @@ def new_server_in_ring(): new_server_ip, new_server_port = addr # Extract the IP and port of the new server from the received address new_IP = data.decode() new_IP = new_IP.split(": ")[1] # Extract the IP address from the message - if new_IP in members_IP: # Check if the IP already exists. This might happen if a server temporarily lost connection. #######Tritt das überhaupt ein? In Gruppe diskutieren - f"There are already servers. I am your leader: {ip_address}".encode() # Create a message for the new server# If the server already exists, send the Welcome Message + if new_IP in members_IP: # Check if the IP already exists. This might happen if a server temporarily lost connection. + f"There are already servers. I am your leader: {ip_address}".encode() # Create a message for the new server. If the server already exists, send the Welcome Message else: members_IP.append(new_IP) # If the server is new, add its IP to the list msg = f"There are already servers. I am your leader: {ip_address}".encode() # Create a message for the new server - #AttributeError: 'bytes' object has no attribute 'encode'. Did you mean: 'decode'? + sock.sendto(msg, (new_IP, new_server_port)) # Send the greeting message back to the new server print(f"The updated IP_Ring is: {members_IP}") send_update_to_ring() # Update the ring topology @@ -217,15 +222,15 @@ def server_enters(): """ global members_UUID global leader_ip - max_retries = 3 + max_retries = 3 #retry of the sending for a maximum of 3 times success = False - intervall = 2 #time to wait fpr a response + intervall = 2 #time to wait for a response msg = f"I am new: {ip_address}".encode() # Greeting message from the new server for attempt in range(max_retries): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Set the socket to broadcast mode - sock.settimeout(intervall) # Set a timeout to wait for a response --> ggf. anpassen, je nach geschwindigkeit mit Handy Internet + sock.settimeout(intervall) # Set a timeout to wait for a response print(f"Sending discovery message (attempt{attempt+1}/{max_retries})") sock.sendto(msg, (broadcast_ip, enter_port)) # Send the greeting message to the broadcast address using the enter_port try: @@ -243,9 +248,9 @@ def server_enters(): break if not success: - print(f"Keine Antwort erhalten. Ich bin jetzt der Leader. Meine IP: {ip_address}") # If no answer is received the server sets itself as leader - members_UUID.append(my_ID) # Add itself as a participant in the ring - members_IP.append(ip_address) # Add itself as a participant in the ring + print(f"No response received. I am now the leader. My IP: {ip_address}") # If no answer is received the server sets itself as leader + members_UUID.append(my_ID) # Add itself as a participant in the ring, UUID + members_IP.append(ip_address) # Add itself as a participant in the ring, IP sock.close() leader_ip = ip_address # Set leder_ip to own IP print(leader_ip) @@ -266,11 +271,11 @@ def start_election(): msg = f"{ELECTION}: {myuuid}: {ip_address}".encode('utf-8') # Create the election message with the server's UUID and IP address send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to send the election message send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) - send_socket.sendto(msg,(right_neighbour,election_port)) # Send msg to the right neighbor usinf the election_port + send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to the right neighbor using the election_port send_socket.close() receive_acknowledgement(msg, right_neighbour, election_port) # Wait for acknowledgment -def accept(group,erhaltene_uuid,erhaltene_ip): +def accept(group,received_uuid,received_ip): """ Function to handle election messages and determine the next steps. If the message is part of an election (ELECTION), the server compares UUIDs to either forward, update, or declare itself as the leader. @@ -285,16 +290,16 @@ def accept(group,erhaltene_uuid,erhaltene_ip): right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure if group == ELECTION: # If the received message is part of an election # Compare the received UUID with the server's own UUID - if erhaltene_uuid > myuuid: # Received UUID is greater, so forward the message without changes + if received_uuid > myuuid: # Received UUID is greater, so forward the message without changes print("{ip_adress}:{} is forwarding without updates.".format(myuuid)) participating = True - msg = f"{ELECTION}: {erhaltene_uuid}: {erhaltene_ip}".encode() + msg = f"{ELECTION}: {received_uuid}: {received_ip}".encode() send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour send_socket.close() receive_acknowledgement(msg, right_neighbour , election_port) - elif erhaltene_uuid < myuuid and participating==False: # Received UUID is smaller, update with the server's own UUID and forward + elif received_uuid < myuuid and participating==False: # Received UUID is smaller, update with the server's own UUID and forward print("{ip_adress}: {} is updating and forwarding.".format(myuuid)) participating = True msg = f"{ELECTION}: {myuuid}: {ip_address}".encode() @@ -303,7 +308,7 @@ def accept(group,erhaltene_uuid,erhaltene_ip): send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour send_socket.close() receive_acknowledgement(msg, right_neighbour , election_port) - if erhaltene_uuid == myuuid: # If the server receives its own UUID, it becomes the leader + if received_uuid == myuuid: # If the server receives its own UUID, it becomes the leader print("{ip_adress}: {} starts acting as a leader!".format(myuuid)) participating = False leader_ip = ip_address # Set leader_ip to own IP @@ -318,15 +323,15 @@ def accept(group,erhaltene_uuid,erhaltene_ip): receive_acknowledgement(msg, right_neighbour , election_port) if group == NEW_LEAD: # If the received message announces a new leader - if erhaltene_uuid == myuuid: # If the UUID matches, the server has already acknowledged + if received_uuid == myuuid: # If the UUID matches, the server has already acknowledged return - if erhaltene_uuid != myuuid: # Update the leader information and forward the new leader announcement + if received_uuid != myuuid: # Update the leader information and forward the new leader announcement print("{} acknowledged new leader.".format(myuuid)) - if leader_ip == ip_address: # Check if this server was the Leader bevor the election and set is_leader to False + if leader_ip == ip_address: # Check if this server was the Leader before the election and set is_leader to False is_leader.set_value(False) - leader_ip = erhaltene_ip # Update leader_ip - leader = erhaltene_uuid # Update leader - msg = f"{NEW_LEAD}: {erhaltene_uuid}: {erhaltene_ip}".encode() + leader_ip = received_ip # Update leader_ip + leader = received_uuid # Update leader + msg = f"{NEW_LEAD}: {received_uuid}: {received_ip}".encode() send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour @@ -337,11 +342,8 @@ def accept(group,erhaltene_uuid,erhaltene_ip): def listen_election(): """ Listens for incoming election or leader messages on the configured election socket. - Decodes the message, sends an acknowledgment to the sender, and processes the message via the accept() function. + Decodes the message, sends an acknowledgement to the sender, and processes the message via the accept() function. """ - #sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to listen for election messages - #sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) - #sock.bind((ip_address,election_port)) # Bind to the election socket while True: sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to listen for election messages sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) @@ -349,11 +351,11 @@ def listen_election(): data,addr=sock.recvfrom(4096) # Receive data from other servers übernahme = data.decode('utf-8') # Decode the received message grouprec = int(übernahme.split(": ")[0]) # Extract group ID, UUID, and IP address from the message - erhaltene_ip = (übernahme.split(": ")[2]) - erhaltene_uuid2 = uuid.UUID((übernahme.split(": ")[1])) + received_ip = (übernahme.split(": ")[2]) + received_uuid2 = uuid.UUID((übernahme.split(": ")[1])) sock.close() - send_acknowledgement() # Send acknowledgment back to the sender - accept(grouprec,erhaltene_uuid2,erhaltene_ip) # Process the election or new leader message + send_acknowledgement() # Send acknowledgement back to the sender + accept(grouprec,received_uuid2,received_ip) # Process the election or new leader message ########################### End - Leader Election ########################### ########################### Start - Process client messages ########################### @@ -393,7 +395,6 @@ def listen_client(): message, client_address = server_socket.recvfrom(4096) # Receive a message from a client decoded_message = message.decode() # Decode the message last_three_messages.append(decoded_message) # Store the message for Leader heartbeat - #print(f"Message stored: {last_three_messages}") # Only for debugging message_id = decoded_message.split(":")[0] # Extract the unique message ID (UUID) from the decoded message if message_id in processed_message_ids: # Check if the message has already been processed continue # Skip if the message was already processed @@ -402,7 +403,7 @@ def listen_client(): except socket.error as e: # Handle socket errors print(f"An error occurred while listening: {e}") break - except KeyboardInterrupt: # Handle server shutdown via keyboard interrupt ????? Funktioniert das?? + except KeyboardInterrupt: # Handle server shutdown via keyboard interrupt, but does not work. print("\nShutting down server...") break ########################### End - Process client messages ########################### @@ -430,9 +431,7 @@ def listen_heartbeat(): """ global received_heartbeat global last_heartbeat_time - #sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for for receiving heartbeats - #sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) - #sock.bind((ip_address, heartbeat_port)) # Bind the socket to the servers IP address and heartbeat port for listening + while True: # Receive data from the socket sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for for receiving heartbeats sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) @@ -442,8 +441,8 @@ def listen_heartbeat(): sock.close() send_acknowledgement() # Send acknowledgment for the received message received_heartbeat = pickle.loads(data) # Deserialize the received data - #print(received_heartbeat) # Debugging. Print the received messages last_heartbeat_time = time.time() # Update the last heartbeat timestamp + if leader_ip == ip_address: # If the server is the leader if send_heartbeat==received_heartbeat: print(f"My heartbeat has traveled through the ring.") @@ -451,7 +450,7 @@ def listen_heartbeat(): print(f"Received a foreign heartbeat. I am forwarding the messages.") forward_received_heartbeat_to_clients() else: # Forward the heartbeat to the right neighbor - print("Heartbeat is forwarded to the neigbbor.") + print("Heartbeat is forwarded to the neighbor.") right_neighbour = get_neighbour(members_IP, ip_address, 'right') msg = pickle.dumps(received_heartbeat) try: @@ -463,7 +462,6 @@ def listen_heartbeat(): except Exception as e: # Handle errors during data transmission print(f"Error sending data lsiten heartbeat: {e}") - def leader_send_heartbeat(): """ @@ -475,7 +473,7 @@ def leader_send_heartbeat(): send_heartbeat = last_three_messages # Set the send_heartbeat as the last three messages msg = pickle.dumps(send_heartbeat) # Serialize the messages right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure - if len(members_IP) == 1: # If only this server is part of the Ring it does not have to send an Heartbeat + if len(members_IP) == 1: # If only this server is part of the Ring, it does not have to send an Heartbeat print("I am the only server. There is no other Server to send a heartbeat to.") last_heartbeat_time = time.time() # Update the last heartbeat timestamp else: # Send the heartbeat to the right neighbor @@ -490,7 +488,7 @@ def leader_send_heartbeat(): sock_heart_send.close() # Close the socket to free up resources time.sleep(3) # Wait before sending the next heartbeat -def monitor_heartbeat(): ################ Achtung Testen, ob auch der Leader in ein timeout kommen kann... ggf. vor removal f leader IP einfügen####### +def monitor_heartbeat(): """ Monitors whether the last heartbeat was received within the timeout period. """ @@ -523,9 +521,9 @@ def leader_send_Client_heartbeat(): print("I am sending a heartbeat to the client.") # Debugging message indicating that the server is sending a heartbeat server_client_heartbeat = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket server_client_heartbeat.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode - server_client_heartbeat.sendto(msg.encode('utf-8'), (broadcast_ip, heartbeat_client_broadcast_port)) # # Send the broadcast message to the specified broadcast IP and port + server_client_heartbeat.sendto(msg.encode('utf-8'), (broadcast_ip, heartbeat_client_broadcast_port)) # Send the broadcast message to the specified broadcast IP and port server_client_heartbeat.close()# Close the socket to free up resources - time.sleep(2) # Wait for 15 seconds before sending the next heartbeat + time.sleep(2) # Wait for 2 seconds before sending the next heartbeat ########################### End - Client Heartbeat ########################### ########################### Start - observation value changes leader ########################### -- GitLab