Determines the neighbor of a server in a circular ring topology based on the direction.
"""
current_member_index=members_IP.index(current_member_ip)ifcurrent_member_ipinmembers_IPelse-1# Find the index of the current member in the list. If not found, set to -1.
ifcurrent_member_index!=-1:# Determine the neighbor to the 'left'
ifdirection=='left':# Determine the neighbor to the 'left' (next in the ring)
ifcurrent_member_index+1==len(members_IP):# If the current member is the last in the list, wrap around to the first
returnmembers_IP[0]# Return the first member in the list
else:
returnmembers_IP[current_member_index+1]# Return the next member in the list
else:# Determine the neighbor to the 'right'
ifcurrent_member_index-1<0:# If the current member is the first in the list, wrap around to the last
returnmembers_IP[len(members_IP)-1]# Return the last member in the list
else:
returnmembers_IP[current_member_index-1]# Return the previous member in the list
else:
returnNone# If the current member IP is not found in the list, return None
########################### End - Neighbour ###########################
Function for sending an acknowledgment for a received message.
"""
iflen(members_IP)>1:
right_neighbour=get_neighbour(members_IP,ip_address,'left')# Determine the right neighbor based on the current ring structure
msg=("Your Message was received.")# Message sent as a receipt confirmation
send_ack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)# Create a UDP socket to send the acknowledgment
send_ack_socket.sendto(msg.encode('utf-8'),(right_neighbour,acknowledgement_port))# Send the acknowledgment message to the right neighbor using the acknowledgement_port
send_ack_socket.close()# Close the socket after sending the message
defreceive_acknowledgement(msg,ip,so):
"""
Function for receiving acknowledgments. This section includes the system's response to messages not received between servers.
"""
globalmembers_IP
globalis_leader
globalleader_ip
iflen(members_IP)>1:
right_neighbour=get_neighbour(members_IP,ip_address,'right')# Determine the right neighbor based on the current ring structure
recack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)# Create a socket to receive acknowledgment messages
data=json.dumps(members_IP).encode()# Serialize the members list into JSON format
sock.sendto(data,(right_neighbour,ringport))# Send the members list to the right neighbor
sock.close()# Close the socket to free up resources
receive_acknowledgement(data,right_neighbour,ringport)# Wait for acknowledgment from the right neighbor
exceptExceptionase:# Handle errors during data transmission
print(f"Error sending data to Ring: {e}")
########################### End - Update ring ###########################
########################### Start - Server Enters ###########################
defnew_server_in_ring():
"""
This function is executed by the Leader. It listens for incoming messages from servers attempting to join the network.
The Leader maintains the list of all IP addresses in the ring and updates the topology whenever a new server joins.
Topology Updates: The Leader ensures all servers in the network are aware of the latest ring structure.
"""
globalmembers_UUID
globalmembers_IP
sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)# Create a UDP socket
sock.setsockopt(socket.SOL_SOCKET,socket.SO_BROADCAST,1)# Set the socket to broadcast mode
sock.bind(("0.0.0.0",enter_port))# Bind the socket to the address "0.0.0.0" and the enter_port
print("Server is running and waiting for broadcast messages from new servers.")
whileTrue:
data,addr=sock.recvfrom(1024)# Listen for messages from other servers
print(f"Message received from {addr}: {data.decode()}")
new_server_ip,new_server_port=addr# Extract the IP and port of the new server from the received address
new_IP=data.decode()
new_IP=new_IP.split(": ")[1]# Extract the IP address from the message
ifnew_IPinmembers_IP:# Check if the IP already exists. This might happen if a server temporarily lost connection. #######Tritt das überhaupt ein? In Gruppe diskutieren
msg=json.dumps(members_IP).encode()# If the server already exists, send the updated member list
else:
members_IP.append(new_IP)# If the server is new, add its IP to the list
msg=f"There are already servers. I am your leader: {ip_address}".encode()# Create a message for the new server
#AttributeError: 'bytes' object has no attribute 'encode'. Did you mean: 'decode'?
sock.sendto(msg,(new_IP,new_server_port))# Send the greeting message back to the new server
print(f"The updated IP_Ring is: {members_IP}")
send_update_to_ring()# Update the ring topology
defserver_enters():
"""
This function is used when a server wants to join the network. It sends a greeting message to the broadcast address and waits for a response from the Leader.
If no response is received, the server assumes the Leader role and starts managing the ring itself.
Broadcast Communication: This allows new servers to discover the Leader without knowing its specific IP address.
"""
globalmembers_UUID
globalleader_ip
msg=f"I am new: {ip_address}".encode()# Greeting message from the new server
sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)# Create a UDP socket
sock.setsockopt(socket.SOL_SOCKET,socket.SO_BROADCAST,1)# Set the socket to broadcast mode
sock.sendto(msg,(broadcast_ip,enter_port))# Send the greeting message to the broadcast address using the enter_port
sock.settimeout(5)# Set a timeout to wait for a response --> ggf. anpassen, je nach geschwindigkeit mit Handy Internet
try:
data,addr=sock.recvfrom(1024)# receiving response
print(f"Antwort von {addr}: {data.decode()}")
sock.close()
my_leader=data.decode().split(": ")[1]# Extract the Leader's IP address from the response
leader_ip=my_leader# Set leder_ip to the received IP
exceptsocket.timeout:
print(f"Keine Antwort erhalten. Ich bin jetzt der Leader. Meine IP: {ip_address}")# If no answer is received the server sets itself as leader
members_UUID.append(my_ID)# Add itself as a participant in the ring
members_IP.append(ip_address)# Add itself as a participant in the ring
sock.close()
leader_ip=ip_address# Set leder_ip to own IP
print(leader_ip)
is_leader.set_value(True)# Mark itself as the Leader
########################### End - Server Enters ###########################
server_client_heartbeat.sendto(msg.encode('utf-8'),(broadcast_ip,heartbeat_client_broadcast_port))# # Send the broadcast message to the specified broadcast IP and port
server_client_heartbeat.close()# Close the socket to free up resources
time.sleep(5)# Wait for 15 seconds before sending the next heartbeat
########################### End - Client Heartbeat ###########################
########################### Start - observation value changes leader ###########################
classVariableWatcher:# A utility class designed to monitor a variable's value and notify registered observers whenever the value changes.
def__init__(self):
self._value=None# Initializes the variable's value as None
self.observers=[]# Creates a list to store observers
defset_value(self,new_value):# Updates the variable and notifies observers.
self._value=new_value# Updates the variable's value
self.notify(new_value)# Notifies all observers about the new value
defadd_observer(self,observer):
self.observers.append(observer)# Adds an observer to the list
defnotify(self,new_value):
forobserverinself.observers:# Iterates through all registered observers
observer(new_value)# Calls each observer with the new value
defcallback(new_value):
"""
This function is triggered when the `is_leader` value changes.
"""
ifnew_value:
print("I am now taking over leader responsibilities.")
thread7=threading.Thread(target=listen_client)# Listens to client messages
thread8=threading.Thread(target=new_server_in_ring)# Listens for election messages
thread9=threading.Thread(target=leader_send_Client_heartbeat)# Start sending heartbeats to the client