Skip to content
Snippets Groups Projects
Commit f260e147 authored by Michelle Fahrner's avatar Michelle Fahrner
Browse files

Upload New File

parent ba56a077
1 merge request!1Feat/finalfinal
import time
import threading
from uuid import uuid4
import socket
import uuid
import json
import neighbour
import multiprocessing
import os
from multiprocessing import Manager
from collections import deque
# Global variables to manage ring members and their information
global members_UUID
global members_IP
# Initialize lists to keep track of members
members_UUID = [] # List for UUIDs of members in the ring
members_IP = [] # List for IP addresses of members in the ring
# Network and port configurations
broadcast_ip = "255.255.255.255" #Broadcast-adress in the Network
enter_port = 12348 #Port that is used for the discovery of new server participants
ringport = 12343
election_port = 12345
acknowledgement_port = 22222
# Unique identification for this server
myuuid = uuid.uuid4() #Creating a unique ip Adress using uuid4
my_ID = str(myuuid) #Creating a unique ip Adress using uuid4
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname) # Retrieves the hostname of the current machine
#ip_address = "127.0.0.2" ########verwenden zum Testen auf einem Gerät
# Leader election-related variables
participating = False # Indicates whether the server is currently participating in an election
is_leader = False # Boolean flag to indicate if the server is the leader
Leader = False # Alternate flag to indicate leader status (can be consolidated with is_leader)
ELECTION = 0 # Message type for initiating an election
NEW_LEAD = 1 # Message type for announcing a new leader
leader_ip = 'unknown' # Stores the IP address of the current leader; default is 'unknown'
########################### Start - Acknowledgement ############################
def send_acknowledgement():
"""
Function for sending an acknowledgment for a received message.
"""
right_neighbour = neighbour.get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure
msg=("Your Message was received.") # Message sent as a receipt confirmation
send_ack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a UDP socket to send the acknowledgment
send_ack_socket.sendto(msg.encode('utf-8'),(right_neighbour,acknowledgement_port)) # Send the acknowledgment message to the right neighbor using the acknowledgement_port
send_ack_socket.close() # Close the socket after sending the message
def receive_acknowledgement(msg, ip, so):
"""
Function for receiving acknowledgments. This section includes the system's response to messages not received between servers.
"""
global members_IP
global is_leader
right_neighbour = neighbour.get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure
recack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to receive acknowledgment messages
recack_socket.bind((ip_address,acknowledgement_port))
recack_socket.settimeout(1) # Set a timeout for receiving acknowledgment --> Notwendig in welcher Höhe???? Testen??
try:
data, addr = recack_socket.recvfrom(1024) # Attempt to receive a response
print(f"{addr} has received the message.")
recack_socket.close()
except socket.timeout:
print(f"No response received. Retrying message delivery.") # If no response is received, resend the message
temp_send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Open a temporary socket for resending the original message
temp_send_socket.sendto(msg,(ip,so))
temp_send_socket.close()
recack_socket.settimeout(1) # Set a new timeout for acknowledgment
try:
data, addr = recack_socket.recvfrom(1024) # Second attempt to receive a response
print(f"{addr} has received the message.")
recack_socket.close()
except socket.timeout: # If the second attempt fails, handle server failure
print(f"No response again. Server is unreachable. Triggering ring update.")
recack_socket.close()
members_IP.remove(right_neighbour) # Remove the failed server from the member list
# Check if the failed server was the leader
if len(members_IP) == 1:
# If only one server remains, it becomes the leader
if leader_ip != ip_address: # Check if the leader IP is its own; only change is_leader to True if the server was not the leader before
is_leader.set_value(True)
print("I am now the last server in this ring and therefore the leader.")
else:
# Update the ring and forward necessary messages
send_update_to_ring()
if right_neighbour == leader_ip: #check if failed server was the leader
# Start a new election since the leader is down
start_election()
#####################################################################Funktion Leader is Down einfügen#####################################
new_neighbour = neighbour.get_neighbour(members_IP, ip_address, 'right') # Determine the new right neighbor with updated ring
temp_send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
temp_send_socket.sendto(msg,(new_neighbour,so)) # Forward the original message to the new neighbor
temp_send_socket.close()
receive_acknowledgement(msg, new_neighbour, so) # Wait for acknowledgment from the new neighbor
########################### End - Acknowledgement ###########################
########################### Start - Update ring ###########################
def lausche_update_Ring():
"""
Listens for ring updates via UDP broadcasts and handles updates to the members_IP list.
"""
global members_IP
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for communication
sock.bind((ip_address, ringport)) # Bind the socket to the servers IP address and ring port for listening
while True:
try:
data, addr = sock.recvfrom(1024) # Receive data from the socket
members_IP2 = json.loads(data.decode()) # Decode the received JSON data to update the members list
if members_IP2 == members_IP: # Check if the received members list matches the current members list
print(f"Ring update has traveled through the ring.")
send_acknowledgement() # Send an acknowledgment for the received update
else:
members_IP = members_IP2 # Update the local members list
print(f"Ring update received: {members_IP}")
send_acknowledgement() # Send an acknowledgment for the received update
send_update_to_ring() # Forward the updated member list to the next neighbor
except json.JSONDecodeError:
print("Error decoding the JSON data.") # Handle errors in decoding the JSON data
def send_update_to_ring():
"""
Sends the updated members list to the next server in the ring.
"""
global members_IP
right_neighbour = neighbour.get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure
if not right_neighbour: # If no right neighbor exists, there is no one to send the update to
print("No left neighbour to send updates.")
return
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for sending updates
data = json.dumps(members_IP).encode() # Serialize the members list into JSON format
try:
sock.sendto(data, (right_neighbour , ringport)) # Send the members list to the right neighbor
receive_acknowledgement(data, right_neighbour , ringport) # Wait for acknowledgment from the right neighbor
except Exception as e: # Handle errors during data transmission
print(f"Error sending data: {e}")
sock.close() # Close the socket to free up resources
########################### End - Update ring ###########################
########################### Start - Server Enters ###########################
def new_server_in_ring():
"""
This function is executed by the Leader. It listens for incoming messages from servers attempting to join the network.
The Leader maintains the list of all IP addresses in the ring and updates the topology whenever a new server joins.
Topology Updates: The Leader ensures all servers in the network are aware of the latest ring structure.
"""
global members_UUID
global members_IP
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Set the socket to broadcast mode
sock.bind(("0.0.0.0", enter_port)) # Bind the socket to the address "0.0.0.0" and the enter_port
print("Server is running and waiting for broadcast messages from new servers.")
while True:
data, addr = sock.recvfrom(1024) # Listen for messages from other servers
print(f"Message received from {addr}: {data.decode()}")
new_server_ip, new_server_port = addr # Extract the IP and port of the new server from the received address
new_IP = data.decode()
new_IP = new_IP.split(": ")[1] # Extract the IP address from the message
if new_IP in members_IP: # Check if the IP already exists. This might happen if a server temporarily lost connection. #######Tritt das überhaupt ein? In Gruppe diskutieren
msg = json.dumps(members_IP).encode() # If the server already exists, send the updated member list
else:
members_IP.append(new_IP) # If the server is new, add its IP to the list
msg = f"There are already servers. I am the leader: {ip_address}" # Create a message for the new server
sock.sendto(msg.encode(), (new_IP, new_server_port)) # Send the greeting message back to the new server
print(f"The updated IP_Ring is: {members_IP}")
send_update_to_ring() # Update the ring topology
def server_enters():
"""
This function is used when a server wants to join the network. It sends a greeting message to the broadcast address and waits for a response from the Leader.
If no response is received, the server assumes the Leader role and starts managing the ring itself.
Broadcast Communication: This allows new servers to discover the Leader without knowing its specific IP address.
"""
global members_UUID
global leader_ip
msg = f"I am new: {ip_address}".encode() # Greeting message from the new server
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Set the socket to broadcast mode
sock.sendto(msg, (broadcast_ip, enter_port)) # Send the greeting message to the broadcast address using the enter_port
sock.settimeout(1) # Set a timeout to wait for a response --> ggf. anpassen, je nach geschwindigkeit mit Handy Internet
try:
data, addr = sock.recvfrom(1024) # receiving response
print(f"Antwort von {addr}: {data.decode()}")
sock.close()
my_leader = data.decode().split(": ")[1] # Extract the Leader's IP address from the response
leader_ip = my_leader # Set leder_ip to the received IP
except socket.timeout:
print(f"Keine Antwort erhalten. Ich bin jetzt der Leader. Meine IP: {ip_address}") # If no answer is received the server sets itself as leader
members_UUID.append(my_ID) # Add itself as a participant in the ring
members_IP.append(ip_address) # Add itself as a participant in the ring
sock.close()
is_leader.set_value(True) # Mark itself as the Leader
leader_ip = ip_address # Set leder_ip to own IP
new_server_in_ring() # Start the function to manage new servers in the ring --> Achtung nur für tests! #####################################################################
########################### End - Server Enters ###########################
########################### Start - Leader Election ###########################
def start_election():
"""
Initiates an election by sending the server's UUID and IP address to its right neighbor in the ring.
Marks the server as participating in the election process and waits for acknowledgment.
"""
right_neighbour = neighbour.get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure
print("{} is starting an election.".format(myuuid))
participating = True # Server marks itself as participating in the election
msg = f"{ELECTION}: {myuuid}: {ip_address}".encode('utf-8') # Create the election message with the server's UUID and IP address
send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to send the election message
send_socket.sendto(msg,(right_neighbour,election_port)) # Send msg to the right neighbor usinf the election_port
send_socket.close()
receive_acknowledgement(msg, right_neighbour, election_port) # Wait for acknowledgment
def accept(group,erhaltene_uuid,erhaltene_ip):
"""
Function to handle election messages and determine the next steps.
If the message is part of an election (ELECTION), the server compares UUIDs to either forward, update, or declare itself as the leader.
If the message is a new leader announcement (NEW_LEAD), it updates the local leader information and forwards the message.
Leader Election: Based on comparing UUIDs, the server with the highest UUID becomes the leader.
Acknowledgment Mechanism: Ensures that messages are received and processed reliably.
"""
global leader_ip
global is_leader
right_neighbour = neighbour.get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure
if group == ELECTION: # If the received message is part of an election
# Compare the received UUID with the server's own UUID
if erhaltene_uuid > myuuid: # Received UUID is greater, so forward the message without changes
print("{} is forwarding without updates.".format(myuuid))
participating = True
msg = f"{ELECTION}: {erhaltene_uuid}: {erhaltene_ip}".encode()
if erhaltene_uuid < myuuid: # Received UUID is smaller, update with the server's own UUID and forward
print("{} is updating and forwarding.".format(myuuid))
participating = True
msg = f"{ELECTION}: {myuuid}: {ip_address}".encode()
if erhaltene_uuid == myuuid: # If the server receives its own UUID, it becomes the leader
print("{} starts acting as a leader!".format(myuuid))
participating = False
if leader_ip != ip_address: # Update leadership status if server was not already the leader bevor the election
is_leader.set_value(True)
leader_ip = ip_address # Set leader_ip to own IP
leader = myuuid #Set leader to own uuid
msg = f"{NEW_LEAD}: {myuuid}: {ip_address}".encode()
if group == NEW_LEAD: # If the received message announces a new leader
if erhaltene_uuid == myuuid: # If the UUID matches, the server has already acknowledged
return
if erhaltene_uuid != myuuid: # Update the leader information and forward the new leader announcement
print("{} acknowledged new leader.".format(myuuid))
if leader_ip == ip_address: # Check if this server was the Leader bevor the election and set is_leader to False
is_leader.set_value(False)
leader_ip = erhaltene_ip # Update leader_ip
leader = erhaltene_uuid # Update leader
msg = f"{NEW_LEAD}: {erhaltene_uuid}: {erhaltene_ip}".encode()
send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour
send_socket.close()
receive_acknowledgement(msg, right_neighbour , election_port)
def zuhören_election():
"""
Listens for incoming election or leader messages on the configured election socket.
Decodes the message, sends an acknowledgment to the sender, and processes the message via the accept() function.
"""
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to listen for election messages
sock.bind((ip_address,election_port)) # Bind to the election socket
while True:
data,addr=sock.recvfrom(4096) # Receive data from other servers
übernahme = data.decode('utf-8') # Decode the received message
grouprec = int(übernahme.split(": ")[0]) # Extract group ID, UUID, and IP address from the message
erhaltene_ip = (übernahme.split(": ")[2])
erhaltene_uuid2 = uuid.UUID((übernahme.split(": ")[1]))
send_acknowledgement() # Send acknowledgment back to the sender
accept(grouprec,erhaltene_uuid2,erhaltene_ip) # Process the election or new leader message
########################### End - Leader Election ###########################
########################### Start - observation value changes leader ###########################
class VariableWatcher: # A utility class designed to monitor a variable's value and notify registered observers whenever the value changes.
def __init__(self):
self._value = None # Initializes the variable's value as None
self.observers = [] # Creates a list to store observers
def set_value(self, new_value): # Updates the variable and notifies observers.
self._value = new_value # Updates the variable's value
self.notify(new_value) # Notifies all observers about the new value
def add_observer(self, observer):
self.observers.append(observer) # Adds an observer to the list
def notify(self, new_value):
for observer in self.observers: # Iterates through all registered observers
observer(new_value) # Calls each observer with the new value
def callback(new_value):
"""
This function is triggered when the `is_leader` value changes.
"""
print(f"Variable value changed to: {new_value}") # Prints the updated value
if new_value==True:
print("I am now taking over leader responsibilities.")
#new_server_in_ring()
else:
print("I am no longer the leader.")
########################### End - observation value changes leader ###########################
########################### Only for Testing ##########################
#######################################################################
#######################################################################
def frage_benutzer(): # A test function that prompts the user to decide whether to execute the election process
antwort = input("Möchten Sie die Funktion ausführen? (Ja/Nein): ").strip().lower()
if antwort == 'ja':
start_election()
else:
print("Die Funktion wurde nicht ausgeführt.")
#######################################################################
#######################################################################
#######################################################################
if __name__ == "__main__":
# Create threads for different server operations
thread1 = threading.Thread(target=server_enters) # Handles server entry to the ring
thread2 = threading.Thread(target=lausche_update_Ring) # Listens for ring updates
thread3 = threading.Thread(target=frage_benutzer) # Prompts the user for action --> Only for testing!
thread4 = threading.Thread(target=zuhören_election) # Listens for election messages
# Start all threads
thread1.start()
thread2.start()
thread3.start()
thread4.start()
is_leader = VariableWatcher() # Create an instance of VariableWatcher to observe changes in leader status
is_leader.add_observer(callback) # Add the callback function as an observer for changes in `is_leader`
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment