Skip to content
Snippets Groups Projects
Commit 61126f94 authored by Katharina Willig's avatar Katharina Willig
Browse files

Merge branch 'feat/finalfinal' into 'main'

Feat/finalfinal

See merge request !1
parents 282f133e d88370dd
No related branches found
No related tags found
1 merge request!1Feat/finalfinal
import socket
import threading
import time
import uuid
import sys
import os
broadcast_ip = '255.255.255.255' #Broadcast-adress in the Network
broadcast_port = 55555 #client sends
broadcast_port2 = 33333 #client listens
heartbeat_client_broadcast_port = 11111 # Defined heartbeat port
#local host information
MY_HOST = socket.gethostname()
MY_IP = socket.gethostbyname(MY_HOST)
#Variables for server heartbeat
is_server_available = True
last_heartbeat = time.time()
processed_message_ids = set() # A set to track the IDs of messages that have already been processed. This helps avoid duplicate processing.
listener_ready = threading.Event()
########################### Start - Receiving MSG ###########################
def listen_server():
"""
Listens for messages broadcasted by the server and processes them.
"""
client_socket2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket to listen for incoming messages
client_socket2.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode for the socket
client_socket2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Allow the socket to reuse the same address
client_socket2.bind(('', broadcast_port2)) # Bind the socket to the broadcast port and listen on all available IP addresses
"""receives messages from server."""
global message_id # Tracks the ID of the current message
while True:
try:
data, address = client_socket2.recvfrom(4096) # Receive data from the socket
decoded_message = data.decode() # Decode the received message
text = decoded_message
received_uuid, rest_of_text = text.split(":", 1) # Split the message into UUID (unique identifier) and the rest of the text
if received_uuid not in processed_message_ids: # Process the message if it hasn't been processed yet
processed_message_ids.add(received_uuid) # Mark the message as processed
#print(f"Received {data.decode()} from {address}") # Only for Debugging
print(rest_of_text) # Display the message content
#else: ################### Only for debugging
# print("Message ist Doppelt") ################### Only for debugging
except socket.error as e: # Handle and log any errors while listening for messages
print(f"An error occurred while listening: {e}")
continue
########################### End - Receiving MSG ###########################
########################### Start - Sending MSG ###########################
def sender():
"""
Allows the user to compose and send messages to the chat.
"""
global message_id # Tracks the unique ID of each message
message_id = str(uuid.uuid4()) # Generate a unique ID for the user's entry message
nickname = input("Enter your nickname: ") # Prompt the user to enter their nickname
just_nickname= f"{message_id}:{nickname} entered the chat".encode() # Create the initial "entered the chat" message
# create client-socket for broadcast
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port)) #Send the "entered the chat" message to the broadcast address
processed_message_ids.add(message_id) # Mark the message ID as processed #######+ Können wir als Rückmeldung auch ausgeben lassen
try:
while True:
message = input("") # Allow the user to enter a message
if message.strip(): # Ensure the message is not empty or whitespace
message_id = str(uuid.uuid4()) # Generate a new unique ID for this message
full_message = f"{message_id}:{nickname}: {message}".encode() # Format the message with the user's nickname and content
client_socket.sendto(full_message, (broadcast_ip, broadcast_port)) # Send the message to the broadcast address
processed_message_ids.add(message_id) # Mark the message ID as processed
except KeyboardInterrupt: #does not work
# Handle when the user presses Ctrl+C
print(f"\n{nickname} left the chat.")
# Notify others that this client left the chat
message_id = str(uuid.uuid4()) # New message ID
leave_message = f"{nickname} left the chat".encode()
client_socket.sendto(leave_message, (broadcast_ip, broadcast_port))
time.sleep(2) # Wait briefly before closing the socket
client_socket.close() # Close the socket
sys.exit() # Exit the program
########################### End - Sending MSG ###########################
########################### Start - heartbeat ###########################
def listen_to_heartbeat():
"""
Listens for heartbeat messages from the server and updates the timestamp of the last received heartbeat.
"""
global last_heartbeat # Tracks the time of the last received heartbeat
global is_server_available # Indicates whether the server is available
client_heartbeat_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket to listen for incoming heartbeat messages
client_heartbeat_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode for the socket
client_heartbeat_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Allow the socket to reuse the same address
client_heartbeat_socket.bind(('', heartbeat_client_broadcast_port)) # Bind the socket to the broadcast port and listen on all available IP addresses
while True:
try: # Wait for incoming heartbeat messages
data, addr = client_heartbeat_socket.recvfrom(1024) # Receive data from the socket
#print("Server heartbeat received.") # Log that the server's heartbeat was received - Only for De bugging
last_heartbeat = time.time() # Update the timestamp for the last received heartbeat
except Exception as e: # Handle and log any errors during data reception
print(f"Error sending data: {e}")
def monitor_heartbeat():
"""
Monitors whether the last heartbeat was received within the timeout period and updates server availability status.
"""
global last_heartbeat # Tracks the time of the last received heartbeat
global is_server_available # Indicates whether the server is available
while True:
time_since_last_heartbeat = time.time() - last_heartbeat # Calculate the time since the last received heartbeat
if time_since_last_heartbeat > 7: # Check if timeout is reached
if is_server_available == True: # If the server was previously available, log that it is now unavailable
print(f"ERROR: Server is Unavailable! Waiting for connection...")
is_server_available = False # Update the server status to unavailable
else: # If the server becomes available again
if is_server_available == False:
is_server_available = True # Update the server status to available
print("Server is up and Running again.")
time.sleep(1) # Check the heartbeat status every second
########################### End - heartbeat ###########################
###############main#################################
if __name__ == "__main__":
# Start listener thread
listen_thread = threading.Thread(target=listen_server)
listen_thread.daemon = True
listen_thread.start()
#listener_ready.wait()
heartbeat_thread = threading.Thread(target=listen_to_heartbeat)
heartbeat_thread.start()
listen_heartbeat_thread = threading.Thread(target=monitor_heartbeat)
listen_heartbeat_thread.start()
sender_thread = threading.Thread(target=sender)
sender_thread.start()
# Distributed-Systems
## Getting started
To make it easy for you to get started with GitLab, here's a list of recommended next steps.
Already a pro? Just edit this README.md and make it your own. Want to make it easy? [Use the template at the bottom](#editing-this-readme)!
## Add your files
- [ ] [Create](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#create-a-file) or [upload](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#upload-a-file) files
- [ ] [Add files using the command line](https://docs.gitlab.com/ee/gitlab-basics/add-file.html#add-a-file-using-the-command-line) or push an existing Git repository with the following command:
```
cd existing_repo
git remote add origin https://gitlab.reutlingen-university.de/wikatg24/distributed-systems.git
git branch -M main
git push -uf origin main
```
## Integrate with your tools
- [ ] [Set up project integrations](https://gitlab.reutlingen-university.de/wikatg24/distributed-systems/-/settings/integrations)
## Collaborate with your team
- [ ] [Invite team members and collaborators](https://docs.gitlab.com/ee/user/project/members/)
- [ ] [Create a new merge request](https://docs.gitlab.com/ee/user/project/merge_requests/creating_merge_requests.html)
- [ ] [Automatically close issues from merge requests](https://docs.gitlab.com/ee/user/project/issues/managing_issues.html#closing-issues-automatically)
- [ ] [Enable merge request approvals](https://docs.gitlab.com/ee/user/project/merge_requests/approvals/)
- [ ] [Set auto-merge](https://docs.gitlab.com/ee/user/project/merge_requests/merge_when_pipeline_succeeds.html)
## Test and Deploy
Use the built-in continuous integration in GitLab.
- [ ] [Get started with GitLab CI/CD](https://docs.gitlab.com/ee/ci/quick_start/index.html)
- [ ] [Analyze your code for known vulnerabilities with Static Application Security Testing (SAST)](https://docs.gitlab.com/ee/user/application_security/sast/)
- [ ] [Deploy to Kubernetes, Amazon EC2, or Amazon ECS using Auto Deploy](https://docs.gitlab.com/ee/topics/autodevops/requirements.html)
- [ ] [Use pull-based deployments for improved Kubernetes management](https://docs.gitlab.com/ee/user/clusters/agent/)
- [ ] [Set up protected environments](https://docs.gitlab.com/ee/ci/environments/protected_environments.html)
***
# Editing this README
When you're ready to make this README your own, just edit this file and use the handy template below (or feel free to structure it however you want - this is just a starting point!). Thanks to [makeareadme.com](https://www.makeareadme.com/) for this template.
## Suggestions for a good README
Every project is different, so consider which of these sections apply to yours. The sections used in the template are suggestions for most open source projects. Also keep in mind that while a README can be too long and detailed, too long is better than too short. If you think your README is too long, consider utilizing another form of documentation rather than cutting out information.
## Name
Choose a self-explaining name for your project.
## Description
Let people know what your project can do specifically. Provide context and add a link to any reference visitors might be unfamiliar with. A list of Features or a Background subsection can also be added here. If there are alternatives to your project, this is a good place to list differentiating factors.
## Badges
On some READMEs, you may see small images that convey metadata, such as whether or not all the tests are passing for the project. You can use Shields to add some to your README. Many services also have instructions for adding a badge.
## Visuals
Depending on what you are making, it can be a good idea to include screenshots or even a video (you'll frequently see GIFs rather than actual videos). Tools like ttygif can help, but check out Asciinema for a more sophisticated method.
## Installation
Within a particular ecosystem, there may be a common way of installing things, such as using Yarn, NuGet, or Homebrew. However, consider the possibility that whoever is reading your README is a novice and would like more guidance. Listing specific steps helps remove ambiguity and gets people to using your project as quickly as possible. If it only runs in a specific context like a particular programming language version or operating system or has dependencies that have to be installed manually, also add a Requirements subsection.
## Usage
Use examples liberally, and show the expected output if you can. It's helpful to have inline the smallest example of usage that you can demonstrate, while providing links to more sophisticated examples if they are too long to reasonably include in the README.
## Support
Tell people where they can go to for help. It can be any combination of an issue tracker, a chat room, an email address, etc.
## Roadmap
If you have ideas for releases in the future, it is a good idea to list them in the README.
## Contributing
State if you are open to contributions and what your requirements are for accepting them.
For people who want to make changes to your project, it's helpful to have some documentation on how to get started. Perhaps there is a script that they should run or some environment variables that they need to set. Make these steps explicit. These instructions could also be useful to your future self.
You can also document commands to lint the code or run tests. These steps help to ensure high code quality and reduce the likelihood that the changes inadvertently break something. Having instructions for running tests is especially helpful if it requires external setup, such as starting a Selenium server for testing in a browser.
## Authors and acknowledgment
Show your appreciation to those who have contributed to the project.
## License
For open source projects, say how it is licensed.
## Project status
If you have run out of energy or time for your project, put a note at the top of the README saying that development has slowed down or stopped completely. Someone may choose to fork your project or volunteer to step in as a maintainer or owner, allowing your project to keep going. You can also make an explicit request for maintainers.
from inspect import _empty
import time
import threading
from uuid import uuid4
import socket
import uuid
import pickle
import json
import multiprocessing
import os
from multiprocessing import Manager
from collections import deque
# Global variables to manage ring members and their information
global members_UUID
global members_IP
global last_heartbeat_time
# Initialize lists to keep track of members
members_UUID = [] # List for UUIDs of members in the ring
members_IP = [] # List for IP addresses of members in the ring
# Network and port configurations
broadcast_ip = "255.255.255.255" #Broadcast-adress in the Network
enter_port = 12348 #Port that is used for the discovery of new server participants
ringport = 12343
election_port = 12345
client_broadcast_port = 55555 #client sends, server listens
client_broadcast_port2 = 33333 #server sends, client listens
acknowledgement_port = 22222
heartbeat_port = 44444
heartbeat_client_broadcast_port = 11111
# Unique identification for this server
myuuid = uuid.uuid4() #Creating a unique ip Adress using uuid4
my_ID = str(myuuid) #Creating a unique ip Adress using uuid4
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname) # Retrieves the hostname of the current machine
#ip_address = "127.0.0.1" ########verwenden zum Testen auf einem Gerät
# Leader election-related variables
participating = False # Indicates whether the server is currently participating in an election
is_leader = False # Boolean flag to indicate if the server is the leader
Leader = False # Alternate flag to indicate leader status (can be consolidated with is_leader)
ELECTION = 0 # Message type for initiating an election
NEW_LEAD = 1 # Message type for announcing a new leader
leader_ip = 'unknown' # Stores the IP address of the current leader; default is 'unknown'
# Heartbeat related variables
last_three_messages = deque(maxlen=3) # Initialization of message storage (last 3 messages)
send_heartbeat = deque()
received_heartbeat = deque()
last_heartbeat_time = time.time()
# variables for Listen to Client
hold_back_queue = deque() # A double-ended queue (deque) to store messages that need to be temporarily held back before they are processed.
processed_message_ids = set() # A set to track the IDs of messages that have already been processed. This helps avoid duplicate processing.
########################### Start - Neighbour ###########################
def get_neighbour(members_IP, current_member_ip, direction='left'):
"""
Determines the neighbor of a server in a circular ring topology based on the direction.
"""
current_member_index = members_IP.index(current_member_ip) if current_member_ip in members_IP else -1 # Find the index of the current member in the list. If not found, set to -1.
if current_member_index != -1: # Determine the neighbor to the 'left'
if direction == 'left': # Determine the neighbor to the 'left' (next in the ring)
if current_member_index + 1 == len(members_IP): # If the current member is the last in the list, wrap around to the first
return members_IP[0] # Return the first member in the list
else:
return members_IP[current_member_index + 1] # Return the next member in the list
else: # Determine the neighbor to the 'right'
if current_member_index - 1 < 0: # If the current member is the first in the list, wrap around to the last
return members_IP[len(members_IP) - 1] # Return the last member in the list
else:
return members_IP[current_member_index - 1] # Return the previous member in the list
else:
return None # If the current member IP is not found in the list, return None
########################### End - Neighbour ###########################
########################### Start - Acknowledgement ############################
def send_acknowledgement():
"""
Function for sending an acknowledgment for a received message.
"""
if len(members_IP) > 1:
right_neighbour = get_neighbour(members_IP, ip_address, 'left') # Determine the right neighbor based on the current ring structure
msg=("Your Message was received.") # Message sent as a receipt confirmation
send_ack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a UDP socket to send the acknowledgment
send_ack_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
send_ack_socket.sendto(msg.encode('utf-8'),(right_neighbour,acknowledgement_port)) # Send the acknowledgment message to the right neighbor using the acknowledgement_port
send_ack_socket.close() # Close the socket after sending the message
def receive_acknowledgement(msg, ip, so):
"""
Function for receiving acknowledgments. This section includes the system's response to messages not received between servers.
"""
global members_IP
global is_leader
global leader_ip
if len(members_IP) > 1:
right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure
recack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to receive acknowledgment messages
recack_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
recack_socket.bind((ip_address,acknowledgement_port))
recack_socket.settimeout(2) # Set a timeout for receiving acknowledgment --> Notwendig in welcher Höhe???? Testen??
try:
data, addr = recack_socket.recvfrom(1024) # Attempt to receive a response
print(f"{addr} has received the message.")
recack_socket.close()
except socket.timeout:
print(f"No response received. Retrying message delivery.") # If no response is received, resend the message
temp_send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Open a temporary socket for resending the original message
temp_send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
temp_send_socket.sendto(msg,(ip,so))
temp_send_socket.close()
recack_socket.settimeout(2) # Set a new timeout for acknowledgment
try:
data, addr = recack_socket.recvfrom(1024) # Second attempt to receive a response
print(f"{addr} has received the message.")
recack_socket.close()
except socket.timeout: # If the second attempt fails, handle server failure
print(f"No response again. Server is unreachable. Triggering ring update.")
recack_socket.close()
members_IP.remove(right_neighbour) # Remove the failed server from the member list
# Check if the failed server was the leader
if len(members_IP) == 1:
# If only one server remains, it becomes the leader
if leader_ip != ip_address: # Check if the leader IP is its own; only change is_leader to True if the server was not the leader before
leader_ip = ip_address # Set leder_ip to own IP
is_leader.set_value(True)
print("I am now the last server in this ring and therefore the leader.")
else:
# Update the ring and forward necessary messages
send_update_to_ring()
if right_neighbour == leader_ip: #check if failed server was the leader
start_election()
new_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the new right neighbor with updated ring
temp_send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
temp_send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
temp_send_socket.sendto(msg,(new_neighbour,so)) # Forward the original message to the new neighbor
temp_send_socket.close()
receive_acknowledgement(msg, new_neighbour, so) # Wait for acknowledgment from the new neighbor
########################### End - Acknowledgement ###########################
########################### Start - Update ring ###########################
def lausche_update_Ring():
"""
Listens for ring updates via UDP broadcasts and handles updates to the members_IP list.
"""
global members_IP
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for communication
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
sock.bind((ip_address, ringport)) # Bind the socket to the servers IP address and ring port for listening
while True:
try:
data, addr = sock.recvfrom(1024) # Receive data from the socket
members_IP2 = json.loads(data.decode()) # Decode the received JSON data to update the members list
if members_IP2 == members_IP: # Check if the received members list matches the current members list
print(f"Ring update has traveled through the ring.")
send_acknowledgement() # Send an acknowledgment for the received update
else:
members_IP = members_IP2 # Update the local members list
print(f"Ring update received: {members_IP}")
send_acknowledgement() # Send an acknowledgment for the received update
send_update_to_ring() # Forward the updated member list to the next neighbor
except json.JSONDecodeError:
print("Error decoding the JSON data.") # Handle errors in decoding the JSON data
def send_update_to_ring():
"""
Sends the updated members list to the next server in the ring.
"""
global members_IP
right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure
if len(members_IP) > 1:# If no right neighbor exists, there is no one to send the update to
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for sending updates
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
data = json.dumps(members_IP).encode() # Serialize the members list into JSON format
sock.sendto(data, (right_neighbour , ringport)) # Send the members list to the right neighbor
sock.close()# Close the socket to free up resources
receive_acknowledgement(data, right_neighbour , ringport) # Wait for acknowledgment from the right neighbor
except Exception as e: # Handle errors during data transmission
print(f"Error sending data to Ring: {e}")
########################### End - Update ring ###########################
########################### Start - Server Enters ###########################
def new_server_in_ring():
"""
This function is executed by the Leader. It listens for incoming messages from servers attempting to join the network.
The Leader maintains the list of all IP addresses in the ring and updates the topology whenever a new server joins.
Topology Updates: The Leader ensures all servers in the network are aware of the latest ring structure.
"""
global members_UUID
global members_IP
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Set the socket to broadcast mode
sock.bind(("0.0.0.0", enter_port)) # Bind the socket to the address "0.0.0.0" and the enter_port
print("Server is running and waiting for broadcast messages from new servers.")
while True:
data, addr = sock.recvfrom(1024) # Listen for messages from other servers
print(f"Message received from {addr}: {data.decode()}")
new_server_ip, new_server_port = addr # Extract the IP and port of the new server from the received address
new_IP = data.decode()
new_IP = new_IP.split(": ")[1] # Extract the IP address from the message
if new_IP in members_IP: # Check if the IP already exists. This might happen if a server temporarily lost connection. #######Tritt das überhaupt ein? In Gruppe diskutieren
f"There are already servers. I am your leader: {ip_address}".encode() # Create a message for the new server# If the server already exists, send the Welcome Message
else:
members_IP.append(new_IP) # If the server is new, add its IP to the list
msg = f"There are already servers. I am your leader: {ip_address}".encode() # Create a message for the new server
#AttributeError: 'bytes' object has no attribute 'encode'. Did you mean: 'decode'?
sock.sendto(msg, (new_IP, new_server_port)) # Send the greeting message back to the new server
print(f"The updated IP_Ring is: {members_IP}")
send_update_to_ring() # Update the ring topology
def server_enters():
"""
This function is used when a server wants to join the network. It sends a greeting message to the broadcast address and waits for a response from the Leader.
If no response is received, the server assumes the Leader role and starts managing the ring itself.
Broadcast Communication: This allows new servers to discover the Leader without knowing its specific IP address.
"""
global members_UUID
global leader_ip
max_retries = 3
success = False
intervall = 2 #time to wait fpr a response
msg = f"I am new: {ip_address}".encode() # Greeting message from the new server
for attempt in range(max_retries):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Set the socket to broadcast mode
sock.settimeout(intervall) # Set a timeout to wait for a response --> ggf. anpassen, je nach geschwindigkeit mit Handy Internet
print(f"Sending discovery message (attempt{attempt+1}/{max_retries})")
sock.sendto(msg, (broadcast_ip, enter_port)) # Send the greeting message to the broadcast address using the enter_port
try:
while True:
data, addr = sock.recvfrom(1024) # receiving response
print(f"Answer from {addr}: {data.decode()}")
success = True
#sock.close()
my_leader = data.decode().split(": ")[1] # Extract the Leader's IP address from the response
leader_ip = my_leader # Set leder_ip to the received IP
except socket.timeout:
print(f"No response received for attempt {attempt+1}.")
if success:
print("Response received. Stopping retries.")
break
if not success:
print(f"Keine Antwort erhalten. Ich bin jetzt der Leader. Meine IP: {ip_address}") # If no answer is received the server sets itself as leader
members_UUID.append(my_ID) # Add itself as a participant in the ring
members_IP.append(ip_address) # Add itself as a participant in the ring
sock.close()
leader_ip = ip_address # Set leder_ip to own IP
print(leader_ip)
is_leader.set_value(True) # Mark itself as the Leader
sock.close()
########################### End - Server Enters ###########################
########################### Start - Leader Election ###########################
def start_election():
"""
Initiates an election by sending the server's UUID and IP address to its right neighbor in the ring.
Marks the server as participating in the election process and waits for acknowledgment.
"""
global participating
right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure
print("{} is starting an election.".format(myuuid))
participating = True # Server marks itself as participating in the election
msg = f"{ELECTION}: {myuuid}: {ip_address}".encode('utf-8') # Create the election message with the server's UUID and IP address
send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to send the election message
send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
send_socket.sendto(msg,(right_neighbour,election_port)) # Send msg to the right neighbor usinf the election_port
send_socket.close()
receive_acknowledgement(msg, right_neighbour, election_port) # Wait for acknowledgment
def accept(group,erhaltene_uuid,erhaltene_ip):
"""
Function to handle election messages and determine the next steps.
If the message is part of an election (ELECTION), the server compares UUIDs to either forward, update, or declare itself as the leader.
If the message is a new leader announcement (NEW_LEAD), it updates the local leader information and forwards the message.
Leader Election: Based on comparing UUIDs, the server with the highest UUID becomes the leader.
Acknowledgment Mechanism: Ensures that messages are received and processed reliably.
"""
global leader_ip
global is_leader
global last_heartbeat_time
global participating
right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure
if group == ELECTION: # If the received message is part of an election
# Compare the received UUID with the server's own UUID
if erhaltene_uuid > myuuid: # Received UUID is greater, so forward the message without changes
print("{ip_adress}:{} is forwarding without updates.".format(myuuid))
participating = True
msg = f"{ELECTION}: {erhaltene_uuid}: {erhaltene_ip}".encode()
send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour
send_socket.close()
receive_acknowledgement(msg, right_neighbour , election_port)
elif erhaltene_uuid < myuuid and participating==False: # Received UUID is smaller, update with the server's own UUID and forward
print("{ip_adress}: {} is updating and forwarding.".format(myuuid))
participating = True
msg = f"{ELECTION}: {myuuid}: {ip_address}".encode()
send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour
send_socket.close()
receive_acknowledgement(msg, right_neighbour , election_port)
if erhaltene_uuid == myuuid: # If the server receives its own UUID, it becomes the leader
print("{ip_adress}: {} starts acting as a leader!".format(myuuid))
participating = False
leader_ip = ip_address # Set leader_ip to own IP
leader = myuuid #Set leader to own uuid
msg = f"{NEW_LEAD}: {myuuid}: {ip_address}".encode()
#if leader_ip != ip_address: # Update leadership status if server was not already the leader bevor the election
is_leader.set_value(True)
send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour
send_socket.close()
receive_acknowledgement(msg, right_neighbour , election_port)
if group == NEW_LEAD: # If the received message announces a new leader
if erhaltene_uuid == myuuid: # If the UUID matches, the server has already acknowledged
return
if erhaltene_uuid != myuuid: # Update the leader information and forward the new leader announcement
print("{} acknowledged new leader.".format(myuuid))
if leader_ip == ip_address: # Check if this server was the Leader bevor the election and set is_leader to False
is_leader.set_value(False)
leader_ip = erhaltene_ip # Update leader_ip
leader = erhaltene_uuid # Update leader
msg = f"{NEW_LEAD}: {erhaltene_uuid}: {erhaltene_ip}".encode()
send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour
send_socket.close()
receive_acknowledgement(msg, right_neighbour , election_port)
last_heartbeat_time = time.time()
def zuhören_election():
"""
Listens for incoming election or leader messages on the configured election socket.
Decodes the message, sends an acknowledgment to the sender, and processes the message via the accept() function.
"""
#sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to listen for election messages
#sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
#sock.bind((ip_address,election_port)) # Bind to the election socket
while True:
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to listen for election messages
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
sock.bind((ip_address,election_port))
data,addr=sock.recvfrom(4096) # Receive data from other servers
übernahme = data.decode('utf-8') # Decode the received message
grouprec = int(übernahme.split(": ")[0]) # Extract group ID, UUID, and IP address from the message
erhaltene_ip = (übernahme.split(": ")[2])
erhaltene_uuid2 = uuid.UUID((übernahme.split(": ")[1]))
sock.close()
send_acknowledgement() # Send acknowledgment back to the sender
accept(grouprec,erhaltene_uuid2,erhaltene_ip) # Process the election or new leader message
########################### End - Leader Election ###########################
########################### Start - Process client messages ###########################
def process_hold_back_queue():
"""
Processes messages in the hold-back queue in the correct order.
"""
while hold_back_queue:
message = hold_back_queue.popleft() # Remove the oldest message from the queue
message_id, decoded_message = message
if message_id not in processed_message_ids:
# Process or forward the message
print(f"Processing message: {decoded_message}")
broadcast(decoded_message) # Forward the message to all participants
processed_message_ids.add(message_id) # Mark the message as processed
def broadcast(message):
"""
Sends a message to all participants in the network via broadcast.
"""
server_socket2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket
server_socket2.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode
full_message = f"{message}".encode() # Encode the message
server_socket2.sendto(full_message, (broadcast_ip, client_broadcast_port2)) # Send the broadcast message
server_socket2.close()
def listen_client():
"""
Listens for messages from clients, processes them, and broadcasts them to other participants.
"""
global last_three_messages
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode
server_socket.bind(('', client_broadcast_port)) # Bind to the broadcast port
while True: # Wait to receive a message from a client
try:
message, client_address = server_socket.recvfrom(4096) # Receive a message from a client
decoded_message = message.decode() # Decode the message
last_three_messages.append(decoded_message) # Store the message for Leader heartbeat
#print(f"Message stored: {last_three_messages}") # Only for debugging
message_id = decoded_message.split(":")[0] # Extract the unique message ID (UUID) from the decoded message
if message_id in processed_message_ids: # Check if the message has already been processed
continue # Skip if the message was already processed
hold_back_queue.append((message_id, decoded_message)) # Add the message to the hold-back queue
process_hold_back_queue() # Process messages in the hold-back queue
except socket.error as e: # Handle socket errors
print(f"An error occurred while listening: {e}")
break
except KeyboardInterrupt: # Handle server shutdown via keyboard interrupt ????? Funktioniert das??
print("\nShutting down server...")
break
########################### End - Process client messages ###########################
########################### Start - Leader Heartbeat #################################
def forward_received_heartbeat_to_clients():
"""
Forwards the last 3 received heartbeat messages separately to clients via UDP broadcast.
"""
server_socket3 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket
server_socket3.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode
print("Starting to send the messages from the heartbeat.")
for message in received_heartbeat:
try:
print(f"Forwarding message: {message}") # Print the message being forwarded for debugging
full_message = f"{message}".encode() # Encode the message
server_socket3.sendto(full_message, (broadcast_ip, client_broadcast_port2)) # Send the broadcast message
except Exception as e:
print(f"Error forwarding message: {e}")
server_socket3.close() # Close the socket to free up resources
def listen_heartbeat():
"""
Listens for the leader's heartbeat and forwards it to the right neighbor in the ring.
"""
global received_heartbeat
global last_heartbeat_time
#sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for for receiving heartbeats
#sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
#sock.bind((ip_address, heartbeat_port)) # Bind the socket to the servers IP address and heartbeat port for listening
while True: # Receive data from the socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for for receiving heartbeats
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
sock.bind((ip_address, heartbeat_port)) # Bind the socket to the servers IP address and heartbeat port for listening
data, addr = sock.recvfrom(1024) # Receive data from the socket
print("Leader heartbeat received.")
sock.close()
send_acknowledgement() # Send acknowledgment for the received message
received_heartbeat = pickle.loads(data) # Deserialize the received data
#print(received_heartbeat) # Debugging. Print the received messages
last_heartbeat_time = time.time() # Update the last heartbeat timestamp
if leader_ip == ip_address: # If the server is the leader
if send_heartbeat==received_heartbeat:
print(f"My heartbeat has traveled through the ring.")
else:
print(f"Received a foreign heartbeat. I am forwarding the messages.")
forward_received_heartbeat_to_clients()
else: # Forward the heartbeat to the right neighbor
print("Heartbeat is forwarded to the neigbbor.")
right_neighbour = get_neighbour(members_IP, ip_address, 'right')
msg = pickle.dumps(received_heartbeat)
try:
sock_heart = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for sending Hearbeat
sock_heart.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
sock_heart.sendto(msg, (right_neighbour , heartbeat_port)) # Send heartbeat to the right neighbor
sock_heart.close() # Close the socket to free up resources
receive_acknowledgement(msg, right_neighbour , heartbeat_port) # Wait for acknowledgment from the right neighbor
except Exception as e: # Handle errors during data transmission
print(f"Error sending data lsiten heartbeat: {e}")
def leader_send_heartbeat():
"""
Sends the Heartbeat to the next server in the ring.
"""
global send_heartbeat
global last_heartbeat_time
while True:
send_heartbeat = last_three_messages # Set the send_heartbeat as the last three messages
msg = pickle.dumps(send_heartbeat) # Serialize the messages
right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure
if len(members_IP) == 1: # If only this server is part of the Ring it does not have to send an Heartbeat
print("I am the only server. There is no other Server to send a heartbeat to.")
last_heartbeat_time = time.time() # Update the last heartbeat timestamp
else: # Send the heartbeat to the right neighbor
try:
sock_heart_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for sending updates
sock_heart_send.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
sock_heart_send.sendto(msg, (right_neighbour , heartbeat_port)) # Send the members list to the right neighbor
sock_heart_send.close()
receive_acknowledgement(msg, right_neighbour , heartbeat_port) # Wait for acknowledgment from the right neighbor
except Exception as e: # Handle errors during data transmission
print(f"Error sending data Leader_send_heartbeat: {e}")
sock_heart_send.close() # Close the socket to free up resources
time.sleep(3) # Wait before sending the next heartbeat
def monitor_heartbeat(): ################ Achtung Testen, ob auch der Leader in ein timeout kommen kann... ggf. vor removal f leader IP einfügen#######
"""
Monitors whether the last heartbeat was received within the timeout period.
"""
global members_IP
global leader_ip
global last_heartbeat_time
while True:
time_since_last_heartbeat = time.time() - last_heartbeat_time # Calculate the time since the last heartbeat
if time_since_last_heartbeat > 11:
print(f"ERROR: No heartbeat received for 11 seconds! Leader is down.")
members_IP.remove(leader_ip) # Remove the Leader server from the member list
if len(members_IP) == 1: # If only one server remains, it becomes the leader
print("I am now the last server in this ring and therefore the leader.")
leader_ip = ip_address # Update the leader IP to the current server
is_leader.set_value(True)
else:
send_update_to_ring() # Update the ring and forward necessary messages
start_election() # Start leader election to determine new Leader
last_heartbeat_time = time.time() # Update the last heartbeat timestamp
time.sleep(1) # Check for heartbeat status every second
########################### End - Leader Heartbeat ############################
########################### Start - Client Heartbeat ###########################
def leader_send_Client_heartbeat():
"""
Broadcasts a heartbeat message to the clients, indicating the server is available.
"""
while True:
msg = ("Server is available.") # Define the heartbeat message
print("I am sending a heartbeat to the client.") # Debugging message indicating that the server is sending a heartbeat
server_client_heartbeat = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket
server_client_heartbeat.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode
server_client_heartbeat.sendto(msg.encode('utf-8'), (broadcast_ip, heartbeat_client_broadcast_port)) # # Send the broadcast message to the specified broadcast IP and port
server_client_heartbeat.close()# Close the socket to free up resources
time.sleep(2) # Wait for 15 seconds before sending the next heartbeat
########################### End - Client Heartbeat ###########################
########################### Start - observation value changes leader ###########################
class VariableWatcher: # A utility class designed to monitor a variable's value and notify registered observers whenever the value changes.
def __init__(self):
self._value = None # Initializes the variable's value as None
self.observers = [] # Creates a list to store observers
def set_value(self, new_value): # Updates the variable and notifies observers.
self._value = new_value # Updates the variable's value
self.notify(new_value) # Notifies all observers about the new value
def add_observer(self, observer):
self.observers.append(observer) # Adds an observer to the list
def notify(self, new_value):
for observer in self.observers: # Iterates through all registered observers
observer(new_value) # Calls each observer with the new value
def callback(new_value):
"""
This function is triggered when the `is_leader` value changes.
"""
if new_value:
print("I am now taking over leader responsibilities.")
thread7 = threading.Thread(target=listen_client) # Listens to client messages
thread8 = threading.Thread(target=new_server_in_ring) # Listens for election messages
thread9 = threading.Thread(target= leader_send_Client_heartbeat) # Start sending heartbeats to the client
thread10 = threading.Thread(target= leader_send_heartbeat)
thread7.start()
thread8.start()
thread9.start()
if len(received_heartbeat) > 0: # Check if there are values in received_heartbeat
print("I am sending the last 3 messages of the leader heartbeat to the clients.")
forward_received_heartbeat_to_clients() # Forward the heartbeat to the clients.
thread10.start() # Leader starts sending its heartbeat to its right neighbour.
else:
print("I am no longer the leader.")
########################### End - observation value changes leader ###########################
#######################################################################
if __name__ == "__main__":
# Create threads for different server operations
thread1 = threading.Thread(target=server_enters) # Handles server entry to the ring
thread2 = threading.Thread(target=lausche_update_Ring) # Listens for ring updates
thread4 = threading.Thread(target=zuhören_election) # Listens for election messages
thread5 = threading.Thread(target=listen_heartbeat) # Listens for leader heartbeat
thread6 = threading.Thread(target=monitor_heartbeat) # Monitord the leader heartbeat
# Start all threads
thread1.start()
thread2.start()
thread4.start()
thread5.start()
thread6.start()
is_leader = VariableWatcher() # Create an instance of VariableWatcher to observe changes in leader status
is_leader.add_observer(callback) # Add the callback function as an observer for changes in `is_leader`
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment