diff --git a/Client_25-01-25.py b/Client_25-01-25.py new file mode 100644 index 0000000000000000000000000000000000000000..36a35e9448d8a47093c3cdabc8316d563df2d73d --- /dev/null +++ b/Client_25-01-25.py @@ -0,0 +1,136 @@ +import socket +import threading +import time +import uuid +import sys +import os + +broadcast_ip = '255.255.255.255' #Broadcast-adress in the Network +broadcast_port = 55555 #client sends +broadcast_port2 = 33333 #client listens +heartbeat_client_broadcast_port = 11111 # Defined heartbeat port +#local host information +MY_HOST = socket.gethostname() +MY_IP = socket.gethostbyname(MY_HOST) +#Variables for server heartbeat +is_server_available = True +last_heartbeat = time.time() + +processed_message_ids = set() # A set to track the IDs of messages that have already been processed. This helps avoid duplicate processing. +listener_ready = threading.Event() + +########################### Start - Receiving MSG ########################### +def listen_server(): + """ + Listens for messages broadcasted by the server and processes them. + """ + client_socket2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket to listen for incoming messages + client_socket2.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode for the socket + client_socket2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Allow the socket to reuse the same address + client_socket2.bind(('', broadcast_port2)) # Bind the socket to the broadcast port and listen on all available IP addresses + """receives messages from server.""" + global message_id # Tracks the ID of the current message + while True: + try: + data, address = client_socket2.recvfrom(4096) # Receive data from the socket + decoded_message = data.decode() # Decode the received message + text = decoded_message + received_uuid, rest_of_text = text.split(":", 1) # Split the message into UUID (unique identifier) and the rest of the text + if received_uuid not in processed_message_ids: # Process the message if it hasn't been processed yet + processed_message_ids.add(received_uuid) # Mark the message as processed + #print(f"Received {data.decode()} from {address}") # Only for Debugging + print(rest_of_text) # Display the message content + #else: ################### Only for debugging + # print("Message ist Doppelt") ################### Only for debugging + except socket.error as e: # Handle and log any errors while listening for messages + print(f"An error occurred while listening: {e}") + continue +########################### End - Receiving MSG ########################### + +########################### Start - Sending MSG ########################### +def sender(): + """ + Allows the user to compose and send messages to the chat. + """ + global message_id # Tracks the unique ID of each message + message_id = str(uuid.uuid4()) # Generate a unique ID for the user's entry message + nickname = input("Enter your nickname: ") # Prompt the user to enter their nickname + just_nickname= f"{message_id}:{nickname} entered the chat".encode() # Create the initial "entered the chat" message + # create client-socket for broadcast + client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port)) #Send the "entered the chat" message to the broadcast address + processed_message_ids.add(message_id) # Mark the message ID as processed #######+ Können wir als Rückmeldung auch ausgeben lassen + try: + while True: + message = input("") # Allow the user to enter a message + if message.strip(): # Ensure the message is not empty or whitespace + message_id = str(uuid.uuid4()) # Generate a new unique ID for this message + full_message = f"{message_id}:{nickname}: {message}".encode() # Format the message with the user's nickname and content + client_socket.sendto(full_message, (broadcast_ip, broadcast_port)) # Send the message to the broadcast address + processed_message_ids.add(message_id) # Mark the message ID as processed + except KeyboardInterrupt: #does not work + # Handle when the user presses Ctrl+C + print(f"\n{nickname} left the chat.") + # Notify others that this client left the chat + message_id = str(uuid.uuid4()) # New message ID + leave_message = f"{nickname} left the chat".encode() + client_socket.sendto(leave_message, (broadcast_ip, broadcast_port)) + time.sleep(2) # Wait briefly before closing the socket + client_socket.close() # Close the socket + sys.exit() # Exit the program +########################### End - Sending MSG ########################### + +########################### Start - heartbeat ########################### +def listen_to_heartbeat(): + """ + Listens for heartbeat messages from the server and updates the timestamp of the last received heartbeat. + """ + global last_heartbeat # Tracks the time of the last received heartbeat + global is_server_available # Indicates whether the server is available + client_heartbeat_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket to listen for incoming heartbeat messages + client_heartbeat_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode for the socket + client_heartbeat_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Allow the socket to reuse the same address + client_heartbeat_socket.bind(('', heartbeat_client_broadcast_port)) # Bind the socket to the broadcast port and listen on all available IP addresses + while True: + try: # Wait for incoming heartbeat messages + data, addr = client_heartbeat_socket.recvfrom(1024) # Receive data from the socket + #print("Server heartbeat received.") # Log that the server's heartbeat was received - Only for De bugging + last_heartbeat = time.time() # Update the timestamp for the last received heartbeat + except Exception as e: # Handle and log any errors during data reception + print(f"Error sending data: {e}") + +def monitor_heartbeat(): + """ + Monitors whether the last heartbeat was received within the timeout period and updates server availability status. + """ + global last_heartbeat # Tracks the time of the last received heartbeat + global is_server_available # Indicates whether the server is available + while True: + time_since_last_heartbeat = time.time() - last_heartbeat # Calculate the time since the last received heartbeat + if time_since_last_heartbeat > 7: # Check if timeout is reached + if is_server_available == True: # If the server was previously available, log that it is now unavailable + print(f"ERROR: Server is Unavailable! Waiting for connection...") + is_server_available = False # Update the server status to unavailable + else: # If the server becomes available again + if is_server_available == False: + is_server_available = True # Update the server status to available + print("Server is up and Running again.") + time.sleep(1) # Check the heartbeat status every second +########################### End - heartbeat ########################### + +###############main################################# +if __name__ == "__main__": + # Start listener thread + listen_thread = threading.Thread(target=listen_server) + listen_thread.daemon = True + listen_thread.start() + #listener_ready.wait() + heartbeat_thread = threading.Thread(target=listen_to_heartbeat) + heartbeat_thread.start() + listen_heartbeat_thread = threading.Thread(target=monitor_heartbeat) + listen_heartbeat_thread.start() + + sender_thread = threading.Thread(target=sender) + sender_thread.start() diff --git a/README.md b/README.md deleted file mode 100644 index 45f25d423890c559ccfbf39de036614487664d0c..0000000000000000000000000000000000000000 --- a/README.md +++ /dev/null @@ -1,93 +0,0 @@ -# Distributed-Systems - - - -## Getting started - -To make it easy for you to get started with GitLab, here's a list of recommended next steps. - -Already a pro? Just edit this README.md and make it your own. Want to make it easy? [Use the template at the bottom](#editing-this-readme)! - -## Add your files - -- [ ] [Create](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#create-a-file) or [upload](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#upload-a-file) files -- [ ] [Add files using the command line](https://docs.gitlab.com/ee/gitlab-basics/add-file.html#add-a-file-using-the-command-line) or push an existing Git repository with the following command: - -``` -cd existing_repo -git remote add origin https://gitlab.reutlingen-university.de/wikatg24/distributed-systems.git -git branch -M main -git push -uf origin main -``` - -## Integrate with your tools - -- [ ] [Set up project integrations](https://gitlab.reutlingen-university.de/wikatg24/distributed-systems/-/settings/integrations) - -## Collaborate with your team - -- [ ] [Invite team members and collaborators](https://docs.gitlab.com/ee/user/project/members/) -- [ ] [Create a new merge request](https://docs.gitlab.com/ee/user/project/merge_requests/creating_merge_requests.html) -- [ ] [Automatically close issues from merge requests](https://docs.gitlab.com/ee/user/project/issues/managing_issues.html#closing-issues-automatically) -- [ ] [Enable merge request approvals](https://docs.gitlab.com/ee/user/project/merge_requests/approvals/) -- [ ] [Set auto-merge](https://docs.gitlab.com/ee/user/project/merge_requests/merge_when_pipeline_succeeds.html) - -## Test and Deploy - -Use the built-in continuous integration in GitLab. - -- [ ] [Get started with GitLab CI/CD](https://docs.gitlab.com/ee/ci/quick_start/index.html) -- [ ] [Analyze your code for known vulnerabilities with Static Application Security Testing (SAST)](https://docs.gitlab.com/ee/user/application_security/sast/) -- [ ] [Deploy to Kubernetes, Amazon EC2, or Amazon ECS using Auto Deploy](https://docs.gitlab.com/ee/topics/autodevops/requirements.html) -- [ ] [Use pull-based deployments for improved Kubernetes management](https://docs.gitlab.com/ee/user/clusters/agent/) -- [ ] [Set up protected environments](https://docs.gitlab.com/ee/ci/environments/protected_environments.html) - -*** - -# Editing this README - -When you're ready to make this README your own, just edit this file and use the handy template below (or feel free to structure it however you want - this is just a starting point!). Thanks to [makeareadme.com](https://www.makeareadme.com/) for this template. - -## Suggestions for a good README - -Every project is different, so consider which of these sections apply to yours. The sections used in the template are suggestions for most open source projects. Also keep in mind that while a README can be too long and detailed, too long is better than too short. If you think your README is too long, consider utilizing another form of documentation rather than cutting out information. - -## Name -Choose a self-explaining name for your project. - -## Description -Let people know what your project can do specifically. Provide context and add a link to any reference visitors might be unfamiliar with. A list of Features or a Background subsection can also be added here. If there are alternatives to your project, this is a good place to list differentiating factors. - -## Badges -On some READMEs, you may see small images that convey metadata, such as whether or not all the tests are passing for the project. You can use Shields to add some to your README. Many services also have instructions for adding a badge. - -## Visuals -Depending on what you are making, it can be a good idea to include screenshots or even a video (you'll frequently see GIFs rather than actual videos). Tools like ttygif can help, but check out Asciinema for a more sophisticated method. - -## Installation -Within a particular ecosystem, there may be a common way of installing things, such as using Yarn, NuGet, or Homebrew. However, consider the possibility that whoever is reading your README is a novice and would like more guidance. Listing specific steps helps remove ambiguity and gets people to using your project as quickly as possible. If it only runs in a specific context like a particular programming language version or operating system or has dependencies that have to be installed manually, also add a Requirements subsection. - -## Usage -Use examples liberally, and show the expected output if you can. It's helpful to have inline the smallest example of usage that you can demonstrate, while providing links to more sophisticated examples if they are too long to reasonably include in the README. - -## Support -Tell people where they can go to for help. It can be any combination of an issue tracker, a chat room, an email address, etc. - -## Roadmap -If you have ideas for releases in the future, it is a good idea to list them in the README. - -## Contributing -State if you are open to contributions and what your requirements are for accepting them. - -For people who want to make changes to your project, it's helpful to have some documentation on how to get started. Perhaps there is a script that they should run or some environment variables that they need to set. Make these steps explicit. These instructions could also be useful to your future self. - -You can also document commands to lint the code or run tests. These steps help to ensure high code quality and reduce the likelihood that the changes inadvertently break something. Having instructions for running tests is especially helpful if it requires external setup, such as starting a Selenium server for testing in a browser. - -## Authors and acknowledgment -Show your appreciation to those who have contributed to the project. - -## License -For open source projects, say how it is licensed. - -## Project status -If you have run out of energy or time for your project, put a note at the top of the README saying that development has slowed down or stopped completely. Someone may choose to fork your project or volunteer to step in as a maintainer or owner, allowing your project to keep going. You can also make an explicit request for maintainers. diff --git a/Server_25-01-25.py b/Server_25-01-25.py new file mode 100644 index 0000000000000000000000000000000000000000..78e9c4e21284ae3f13b80208947cc6c4f337943a --- /dev/null +++ b/Server_25-01-25.py @@ -0,0 +1,584 @@ + +from inspect import _empty +import time +import threading +from uuid import uuid4 +import socket +import uuid +import pickle +import json +import multiprocessing +import os +from multiprocessing import Manager +from collections import deque + +# Global variables to manage ring members and their information +global members_UUID +global members_IP +global last_heartbeat_time +# Initialize lists to keep track of members +members_UUID = [] # List for UUIDs of members in the ring +members_IP = [] # List for IP addresses of members in the ring +# Network and port configurations +broadcast_ip = "255.255.255.255" #Broadcast-adress in the Network +enter_port = 12348 #Port that is used for the discovery of new server participants +ringport = 12343 +election_port = 12345 +client_broadcast_port = 55555 #client sends, server listens +client_broadcast_port2 = 33333 #server sends, client listens +acknowledgement_port = 22222 +heartbeat_port = 44444 +heartbeat_client_broadcast_port = 11111 +# Unique identification for this server +myuuid = uuid.uuid4() #Creating a unique ip Adress using uuid4 +my_ID = str(myuuid) #Creating a unique ip Adress using uuid4 +hostname = socket.gethostname() +ip_address = socket.gethostbyname(hostname) # Retrieves the hostname of the current machine +#ip_address = "127.0.0.1" ########verwenden zum Testen auf einem Gerät +# Leader election-related variables +participating = False # Indicates whether the server is currently participating in an election +is_leader = False # Boolean flag to indicate if the server is the leader +Leader = False # Alternate flag to indicate leader status (can be consolidated with is_leader) +ELECTION = 0 # Message type for initiating an election +NEW_LEAD = 1 # Message type for announcing a new leader +leader_ip = 'unknown' # Stores the IP address of the current leader; default is 'unknown' +# Heartbeat related variables +last_three_messages = deque(maxlen=3) # Initialization of message storage (last 3 messages) +send_heartbeat = deque() +received_heartbeat = deque() +last_heartbeat_time = time.time() + +# variables for Listen to Client +hold_back_queue = deque() # A double-ended queue (deque) to store messages that need to be temporarily held back before they are processed. +processed_message_ids = set() # A set to track the IDs of messages that have already been processed. This helps avoid duplicate processing. + +########################### Start - Neighbour ########################### +def get_neighbour(members_IP, current_member_ip, direction='left'): + """ + Determines the neighbor of a server in a circular ring topology based on the direction. + """ + current_member_index = members_IP.index(current_member_ip) if current_member_ip in members_IP else -1 # Find the index of the current member in the list. If not found, set to -1. + if current_member_index != -1: # Determine the neighbor to the 'left' + if direction == 'left': # Determine the neighbor to the 'left' (next in the ring) + if current_member_index + 1 == len(members_IP): # If the current member is the last in the list, wrap around to the first + return members_IP[0] # Return the first member in the list + else: + return members_IP[current_member_index + 1] # Return the next member in the list + else: # Determine the neighbor to the 'right' + if current_member_index - 1 < 0: # If the current member is the first in the list, wrap around to the last + return members_IP[len(members_IP) - 1] # Return the last member in the list + else: + return members_IP[current_member_index - 1] # Return the previous member in the list + else: + return None # If the current member IP is not found in the list, return None +########################### End - Neighbour ########################### + +########################### Start - Acknowledgement ############################ +def send_acknowledgement(): + """ + Function for sending an acknowledgment for a received message. + """ + if len(members_IP) > 1: + right_neighbour = get_neighbour(members_IP, ip_address, 'left') # Determine the right neighbor based on the current ring structure + msg=("Your Message was received.") # Message sent as a receipt confirmation + send_ack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a UDP socket to send the acknowledgment + send_ack_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + send_ack_socket.sendto(msg.encode('utf-8'),(right_neighbour,acknowledgement_port)) # Send the acknowledgment message to the right neighbor using the acknowledgement_port + send_ack_socket.close() # Close the socket after sending the message + +def receive_acknowledgement(msg, ip, so): + """ + Function for receiving acknowledgments. This section includes the system's response to messages not received between servers. + """ + global members_IP + global is_leader + global leader_ip + + if len(members_IP) > 1: + right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + recack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to receive acknowledgment messages + recack_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + recack_socket.bind((ip_address,acknowledgement_port)) + recack_socket.settimeout(2) # Set a timeout for receiving acknowledgment --> Notwendig in welcher Höhe???? Testen?? + try: + data, addr = recack_socket.recvfrom(1024) # Attempt to receive a response + print(f"{addr} has received the message.") + recack_socket.close() + except socket.timeout: + print(f"No response received. Retrying message delivery.") # If no response is received, resend the message + temp_send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Open a temporary socket for resending the original message + temp_send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + temp_send_socket.sendto(msg,(ip,so)) + temp_send_socket.close() + recack_socket.settimeout(2) # Set a new timeout for acknowledgment + try: + data, addr = recack_socket.recvfrom(1024) # Second attempt to receive a response + print(f"{addr} has received the message.") + recack_socket.close() + except socket.timeout: # If the second attempt fails, handle server failure + print(f"No response again. Server is unreachable. Triggering ring update.") + recack_socket.close() + members_IP.remove(right_neighbour) # Remove the failed server from the member list + # Check if the failed server was the leader + if len(members_IP) == 1: + # If only one server remains, it becomes the leader + if leader_ip != ip_address: # Check if the leader IP is its own; only change is_leader to True if the server was not the leader before + leader_ip = ip_address # Set leder_ip to own IP + is_leader.set_value(True) + print("I am now the last server in this ring and therefore the leader.") + else: + # Update the ring and forward necessary messages + send_update_to_ring() + if right_neighbour == leader_ip: #check if failed server was the leader + start_election() + new_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the new right neighbor with updated ring + temp_send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) + temp_send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + temp_send_socket.sendto(msg,(new_neighbour,so)) # Forward the original message to the new neighbor + temp_send_socket.close() + receive_acknowledgement(msg, new_neighbour, so) # Wait for acknowledgment from the new neighbor +########################### End - Acknowledgement ########################### + +########################### Start - Update ring ########################### +def lausche_update_Ring(): + """ + Listens for ring updates via UDP broadcasts and handles updates to the members_IP list. + """ + global members_IP + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for communication + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + sock.bind((ip_address, ringport)) # Bind the socket to the servers IP address and ring port for listening + while True: + try: + data, addr = sock.recvfrom(1024) # Receive data from the socket + members_IP2 = json.loads(data.decode()) # Decode the received JSON data to update the members list + if members_IP2 == members_IP: # Check if the received members list matches the current members list + print(f"Ring update has traveled through the ring.") + send_acknowledgement() # Send an acknowledgment for the received update + else: + members_IP = members_IP2 # Update the local members list + print(f"Ring update received: {members_IP}") + send_acknowledgement() # Send an acknowledgment for the received update + send_update_to_ring() # Forward the updated member list to the next neighbor + except json.JSONDecodeError: + print("Error decoding the JSON data.") # Handle errors in decoding the JSON data + +def send_update_to_ring(): + """ + Sends the updated members list to the next server in the ring. + """ + global members_IP + right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + if len(members_IP) > 1:# If no right neighbor exists, there is no one to send the update to + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for sending updates + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + data = json.dumps(members_IP).encode() # Serialize the members list into JSON format + sock.sendto(data, (right_neighbour , ringport)) # Send the members list to the right neighbor + sock.close()# Close the socket to free up resources + receive_acknowledgement(data, right_neighbour , ringport) # Wait for acknowledgment from the right neighbor + except Exception as e: # Handle errors during data transmission + print(f"Error sending data to Ring: {e}") +########################### End - Update ring ########################### + +########################### Start - Server Enters ########################### +def new_server_in_ring(): + """ + This function is executed by the Leader. It listens for incoming messages from servers attempting to join the network. + The Leader maintains the list of all IP addresses in the ring and updates the topology whenever a new server joins. + Topology Updates: The Leader ensures all servers in the network are aware of the latest ring structure. + """ + global members_UUID + global members_IP + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Set the socket to broadcast mode + sock.bind(("0.0.0.0", enter_port)) # Bind the socket to the address "0.0.0.0" and the enter_port + print("Server is running and waiting for broadcast messages from new servers.") + while True: + data, addr = sock.recvfrom(1024) # Listen for messages from other servers + print(f"Message received from {addr}: {data.decode()}") + new_server_ip, new_server_port = addr # Extract the IP and port of the new server from the received address + new_IP = data.decode() + new_IP = new_IP.split(": ")[1] # Extract the IP address from the message + if new_IP in members_IP: # Check if the IP already exists. This might happen if a server temporarily lost connection. #######Tritt das überhaupt ein? In Gruppe diskutieren + f"There are already servers. I am your leader: {ip_address}".encode() # Create a message for the new server# If the server already exists, send the Welcome Message + else: + members_IP.append(new_IP) # If the server is new, add its IP to the list + msg = f"There are already servers. I am your leader: {ip_address}".encode() # Create a message for the new server + #AttributeError: 'bytes' object has no attribute 'encode'. Did you mean: 'decode'? + sock.sendto(msg, (new_IP, new_server_port)) # Send the greeting message back to the new server + print(f"The updated IP_Ring is: {members_IP}") + send_update_to_ring() # Update the ring topology + +def server_enters(): + """ + This function is used when a server wants to join the network. It sends a greeting message to the broadcast address and waits for a response from the Leader. + If no response is received, the server assumes the Leader role and starts managing the ring itself. + Broadcast Communication: This allows new servers to discover the Leader without knowing its specific IP address. + """ + global members_UUID + global leader_ip + max_retries = 3 + success = False + intervall = 2 #time to wait fpr a response + msg = f"I am new: {ip_address}".encode() # Greeting message from the new server + + for attempt in range(max_retries): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Set the socket to broadcast mode + sock.settimeout(intervall) # Set a timeout to wait for a response --> ggf. anpassen, je nach geschwindigkeit mit Handy Internet + print(f"Sending discovery message (attempt{attempt+1}/{max_retries})") + sock.sendto(msg, (broadcast_ip, enter_port)) # Send the greeting message to the broadcast address using the enter_port + try: + while True: + data, addr = sock.recvfrom(1024) # receiving response + print(f"Answer from {addr}: {data.decode()}") + success = True + #sock.close() + my_leader = data.decode().split(": ")[1] # Extract the Leader's IP address from the response + leader_ip = my_leader # Set leder_ip to the received IP + except socket.timeout: + print(f"No response received for attempt {attempt+1}.") + if success: + print("Response received. Stopping retries.") + + break + if not success: + print(f"Keine Antwort erhalten. Ich bin jetzt der Leader. Meine IP: {ip_address}") # If no answer is received the server sets itself as leader + members_UUID.append(my_ID) # Add itself as a participant in the ring + members_IP.append(ip_address) # Add itself as a participant in the ring + sock.close() + leader_ip = ip_address # Set leder_ip to own IP + print(leader_ip) + is_leader.set_value(True) # Mark itself as the Leader + sock.close() +########################### End - Server Enters ########################### + +########################### Start - Leader Election ########################### +def start_election(): + """ + Initiates an election by sending the server's UUID and IP address to its right neighbor in the ring. + Marks the server as participating in the election process and waits for acknowledgment. + """ + global participating + right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + print("{} is starting an election.".format(myuuid)) + participating = True # Server marks itself as participating in the election + msg = f"{ELECTION}: {myuuid}: {ip_address}".encode('utf-8') # Create the election message with the server's UUID and IP address + send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to send the election message + send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + send_socket.sendto(msg,(right_neighbour,election_port)) # Send msg to the right neighbor usinf the election_port + send_socket.close() + receive_acknowledgement(msg, right_neighbour, election_port) # Wait for acknowledgment + +def accept(group,erhaltene_uuid,erhaltene_ip): + """ + Function to handle election messages and determine the next steps. + If the message is part of an election (ELECTION), the server compares UUIDs to either forward, update, or declare itself as the leader. + If the message is a new leader announcement (NEW_LEAD), it updates the local leader information and forwards the message. + Leader Election: Based on comparing UUIDs, the server with the highest UUID becomes the leader. + Acknowledgment Mechanism: Ensures that messages are received and processed reliably. + """ + global leader_ip + global is_leader + global last_heartbeat_time + global participating + right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + if group == ELECTION: # If the received message is part of an election + # Compare the received UUID with the server's own UUID + if erhaltene_uuid > myuuid: # Received UUID is greater, so forward the message without changes + print("{ip_adress}:{} is forwarding without updates.".format(myuuid)) + participating = True + msg = f"{ELECTION}: {erhaltene_uuid}: {erhaltene_ip}".encode() + send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) + send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour + send_socket.close() + receive_acknowledgement(msg, right_neighbour , election_port) + elif erhaltene_uuid < myuuid and participating==False: # Received UUID is smaller, update with the server's own UUID and forward + print("{ip_adress}: {} is updating and forwarding.".format(myuuid)) + participating = True + msg = f"{ELECTION}: {myuuid}: {ip_address}".encode() + send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) + send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour + send_socket.close() + receive_acknowledgement(msg, right_neighbour , election_port) + if erhaltene_uuid == myuuid: # If the server receives its own UUID, it becomes the leader + print("{ip_adress}: {} starts acting as a leader!".format(myuuid)) + participating = False + leader_ip = ip_address # Set leader_ip to own IP + leader = myuuid #Set leader to own uuid + msg = f"{NEW_LEAD}: {myuuid}: {ip_address}".encode() + #if leader_ip != ip_address: # Update leadership status if server was not already the leader bevor the election + is_leader.set_value(True) + send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) + send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour + send_socket.close() + receive_acknowledgement(msg, right_neighbour , election_port) + + if group == NEW_LEAD: # If the received message announces a new leader + if erhaltene_uuid == myuuid: # If the UUID matches, the server has already acknowledged + return + if erhaltene_uuid != myuuid: # Update the leader information and forward the new leader announcement + print("{} acknowledged new leader.".format(myuuid)) + if leader_ip == ip_address: # Check if this server was the Leader bevor the election and set is_leader to False + is_leader.set_value(False) + leader_ip = erhaltene_ip # Update leader_ip + leader = erhaltene_uuid # Update leader + msg = f"{NEW_LEAD}: {erhaltene_uuid}: {erhaltene_ip}".encode() + send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) + send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour + send_socket.close() + receive_acknowledgement(msg, right_neighbour , election_port) + last_heartbeat_time = time.time() + +def zuhören_election(): + """ + Listens for incoming election or leader messages on the configured election socket. + Decodes the message, sends an acknowledgment to the sender, and processes the message via the accept() function. + """ + #sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to listen for election messages + #sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + #sock.bind((ip_address,election_port)) # Bind to the election socket + while True: + sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to listen for election messages + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + sock.bind((ip_address,election_port)) + data,addr=sock.recvfrom(4096) # Receive data from other servers + übernahme = data.decode('utf-8') # Decode the received message + grouprec = int(übernahme.split(": ")[0]) # Extract group ID, UUID, and IP address from the message + erhaltene_ip = (übernahme.split(": ")[2]) + erhaltene_uuid2 = uuid.UUID((übernahme.split(": ")[1])) + sock.close() + send_acknowledgement() # Send acknowledgment back to the sender + accept(grouprec,erhaltene_uuid2,erhaltene_ip) # Process the election or new leader message +########################### End - Leader Election ########################### + +########################### Start - Process client messages ########################### +def process_hold_back_queue(): + """ + Processes messages in the hold-back queue in the correct order. + """ + while hold_back_queue: + message = hold_back_queue.popleft() # Remove the oldest message from the queue + message_id, decoded_message = message + if message_id not in processed_message_ids: + # Process or forward the message + print(f"Processing message: {decoded_message}") + broadcast(decoded_message) # Forward the message to all participants + processed_message_ids.add(message_id) # Mark the message as processed + +def broadcast(message): + """ + Sends a message to all participants in the network via broadcast. + """ + server_socket2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket + server_socket2.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode + full_message = f"{message}".encode() # Encode the message + server_socket2.sendto(full_message, (broadcast_ip, client_broadcast_port2)) # Send the broadcast message + server_socket2.close() + +def listen_client(): + """ + Listens for messages from clients, processes them, and broadcasts them to other participants. + """ + global last_three_messages + server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode + server_socket.bind(('', client_broadcast_port)) # Bind to the broadcast port + while True: # Wait to receive a message from a client + try: + message, client_address = server_socket.recvfrom(4096) # Receive a message from a client + decoded_message = message.decode() # Decode the message + last_three_messages.append(decoded_message) # Store the message for Leader heartbeat + #print(f"Message stored: {last_three_messages}") # Only for debugging + message_id = decoded_message.split(":")[0] # Extract the unique message ID (UUID) from the decoded message + if message_id in processed_message_ids: # Check if the message has already been processed + continue # Skip if the message was already processed + hold_back_queue.append((message_id, decoded_message)) # Add the message to the hold-back queue + process_hold_back_queue() # Process messages in the hold-back queue + except socket.error as e: # Handle socket errors + print(f"An error occurred while listening: {e}") + break + except KeyboardInterrupt: # Handle server shutdown via keyboard interrupt ????? Funktioniert das?? + print("\nShutting down server...") + break +########################### End - Process client messages ########################### + +########################### Start - Leader Heartbeat ################################# +def forward_received_heartbeat_to_clients(): + """ + Forwards the last 3 received heartbeat messages separately to clients via UDP broadcast. + """ + server_socket3 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket + server_socket3.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode + print("Starting to send the messages from the heartbeat.") + for message in received_heartbeat: + try: + print(f"Forwarding message: {message}") # Print the message being forwarded for debugging + full_message = f"{message}".encode() # Encode the message + server_socket3.sendto(full_message, (broadcast_ip, client_broadcast_port2)) # Send the broadcast message + except Exception as e: + print(f"Error forwarding message: {e}") + server_socket3.close() # Close the socket to free up resources + +def listen_heartbeat(): + """ + Listens for the leader's heartbeat and forwards it to the right neighbor in the ring. + """ + global received_heartbeat + global last_heartbeat_time + #sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for for receiving heartbeats + #sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + #sock.bind((ip_address, heartbeat_port)) # Bind the socket to the servers IP address and heartbeat port for listening + while True: # Receive data from the socket + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for for receiving heartbeats + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + sock.bind((ip_address, heartbeat_port)) # Bind the socket to the servers IP address and heartbeat port for listening + data, addr = sock.recvfrom(1024) # Receive data from the socket + print("Leader heartbeat received.") + sock.close() + send_acknowledgement() # Send acknowledgment for the received message + received_heartbeat = pickle.loads(data) # Deserialize the received data + #print(received_heartbeat) # Debugging. Print the received messages + last_heartbeat_time = time.time() # Update the last heartbeat timestamp + if leader_ip == ip_address: # If the server is the leader + if send_heartbeat==received_heartbeat: + print(f"My heartbeat has traveled through the ring.") + else: + print(f"Received a foreign heartbeat. I am forwarding the messages.") + forward_received_heartbeat_to_clients() + else: # Forward the heartbeat to the right neighbor + print("Heartbeat is forwarded to the neigbbor.") + right_neighbour = get_neighbour(members_IP, ip_address, 'right') + msg = pickle.dumps(received_heartbeat) + try: + sock_heart = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for sending Hearbeat + sock_heart.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + sock_heart.sendto(msg, (right_neighbour , heartbeat_port)) # Send heartbeat to the right neighbor + sock_heart.close() # Close the socket to free up resources + receive_acknowledgement(msg, right_neighbour , heartbeat_port) # Wait for acknowledgment from the right neighbor + + except Exception as e: # Handle errors during data transmission + print(f"Error sending data lsiten heartbeat: {e}") + + +def leader_send_heartbeat(): + """ + Sends the Heartbeat to the next server in the ring. + """ + global send_heartbeat + global last_heartbeat_time + while True: + send_heartbeat = last_three_messages # Set the send_heartbeat as the last three messages + msg = pickle.dumps(send_heartbeat) # Serialize the messages + right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure + if len(members_IP) == 1: # If only this server is part of the Ring it does not have to send an Heartbeat + print("I am the only server. There is no other Server to send a heartbeat to.") + last_heartbeat_time = time.time() # Update the last heartbeat timestamp + else: # Send the heartbeat to the right neighbor + try: + sock_heart_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for sending updates + sock_heart_send.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) + sock_heart_send.sendto(msg, (right_neighbour , heartbeat_port)) # Send the members list to the right neighbor + sock_heart_send.close() + receive_acknowledgement(msg, right_neighbour , heartbeat_port) # Wait for acknowledgment from the right neighbor + except Exception as e: # Handle errors during data transmission + print(f"Error sending data Leader_send_heartbeat: {e}") + sock_heart_send.close() # Close the socket to free up resources + time.sleep(3) # Wait before sending the next heartbeat + +def monitor_heartbeat(): ################ Achtung Testen, ob auch der Leader in ein timeout kommen kann... ggf. vor removal f leader IP einfügen####### + """ + Monitors whether the last heartbeat was received within the timeout period. + """ + global members_IP + global leader_ip + global last_heartbeat_time + while True: + time_since_last_heartbeat = time.time() - last_heartbeat_time # Calculate the time since the last heartbeat + if time_since_last_heartbeat > 11: + print(f"ERROR: No heartbeat received for 11 seconds! Leader is down.") + members_IP.remove(leader_ip) # Remove the Leader server from the member list + if len(members_IP) == 1: # If only one server remains, it becomes the leader + print("I am now the last server in this ring and therefore the leader.") + leader_ip = ip_address # Update the leader IP to the current server + is_leader.set_value(True) + else: + send_update_to_ring() # Update the ring and forward necessary messages + start_election() # Start leader election to determine new Leader + last_heartbeat_time = time.time() # Update the last heartbeat timestamp + time.sleep(1) # Check for heartbeat status every second +########################### End - Leader Heartbeat ############################ + +########################### Start - Client Heartbeat ########################### +def leader_send_Client_heartbeat(): + """ + Broadcasts a heartbeat message to the clients, indicating the server is available. + """ + while True: + msg = ("Server is available.") # Define the heartbeat message + print("I am sending a heartbeat to the client.") # Debugging message indicating that the server is sending a heartbeat + server_client_heartbeat = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket + server_client_heartbeat.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode + server_client_heartbeat.sendto(msg.encode('utf-8'), (broadcast_ip, heartbeat_client_broadcast_port)) # # Send the broadcast message to the specified broadcast IP and port + server_client_heartbeat.close()# Close the socket to free up resources + time.sleep(2) # Wait for 15 seconds before sending the next heartbeat +########################### End - Client Heartbeat ########################### + +########################### Start - observation value changes leader ########################### +class VariableWatcher: # A utility class designed to monitor a variable's value and notify registered observers whenever the value changes. + def __init__(self): + self._value = None # Initializes the variable's value as None + self.observers = [] # Creates a list to store observers + def set_value(self, new_value): # Updates the variable and notifies observers. + self._value = new_value # Updates the variable's value + self.notify(new_value) # Notifies all observers about the new value + def add_observer(self, observer): + self.observers.append(observer) # Adds an observer to the list + def notify(self, new_value): + for observer in self.observers: # Iterates through all registered observers + observer(new_value) # Calls each observer with the new value + +def callback(new_value): + """ + This function is triggered when the `is_leader` value changes. + """ + if new_value: + print("I am now taking over leader responsibilities.") + thread7 = threading.Thread(target=listen_client) # Listens to client messages + thread8 = threading.Thread(target=new_server_in_ring) # Listens for election messages + thread9 = threading.Thread(target= leader_send_Client_heartbeat) # Start sending heartbeats to the client + thread10 = threading.Thread(target= leader_send_heartbeat) + thread7.start() + thread8.start() + thread9.start() + if len(received_heartbeat) > 0: # Check if there are values in received_heartbeat + print("I am sending the last 3 messages of the leader heartbeat to the clients.") + forward_received_heartbeat_to_clients() # Forward the heartbeat to the clients. + thread10.start() # Leader starts sending its heartbeat to its right neighbour. + else: + print("I am no longer the leader.") +########################### End - observation value changes leader ########################### + +####################################################################### + +if __name__ == "__main__": + # Create threads for different server operations + thread1 = threading.Thread(target=server_enters) # Handles server entry to the ring + thread2 = threading.Thread(target=lausche_update_Ring) # Listens for ring updates + thread4 = threading.Thread(target=zuhören_election) # Listens for election messages + thread5 = threading.Thread(target=listen_heartbeat) # Listens for leader heartbeat + thread6 = threading.Thread(target=monitor_heartbeat) # Monitord the leader heartbeat + # Start all threads + thread1.start() + thread2.start() + thread4.start() + thread5.start() + thread6.start() + + is_leader = VariableWatcher() # Create an instance of VariableWatcher to observe changes in leader status + is_leader.add_observer(callback) # Add the callback function as an observer for changes in `is_leader` \ No newline at end of file