Skip to content
Snippets Groups Projects
Commit be4acda2 authored by Katharina's avatar Katharina
Browse files

delete and add comments

parent 38448f8d
No related merge requests found
......@@ -19,7 +19,7 @@ last_heartbeat = time.time()
processed_message_ids = set() # A set to track the IDs of messages that have already been processed. This helps avoid duplicate processing.
listener_ready = threading.Event()
########################### Start - Receiving MSG ###########################
########################### Start - Receiving messages ###########################
def listen_server():
"""
Listens for messages broadcasted by the server and processes them.
......@@ -38,10 +38,7 @@ def listen_server():
received_uuid, rest_of_text = text.split(":", 1) # Split the message into UUID (unique identifier) and the rest of the text
if received_uuid not in processed_message_ids: # Process the message if it hasn't been processed yet
processed_message_ids.add(received_uuid) # Mark the message as processed
#print(f"Received {data.decode()} from {address}") # Only for Debugging
print(rest_of_text) # Display the message content
#else: ################### Only for debugging
# print("Message ist Doppelt") ################### Only for debugging
except socket.error as e: # Handle and log any errors while listening for messages
print(f"An error occurred while listening: {e}")
continue
......@@ -61,7 +58,7 @@ def sender():
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port)) #Send the "entered the chat" message to the broadcast address
processed_message_ids.add(message_id) # Mark the message ID as processed #######+ Können wir als Rückmeldung auch ausgeben lassen
processed_message_ids.add(message_id) # Mark the message ID as processed
try:
while True:
message = input("") # Allow the user to enter a message
......@@ -96,7 +93,6 @@ def listen_to_heartbeat():
while True:
try: # Wait for incoming heartbeat messages
data, addr = client_heartbeat_socket.recvfrom(1024) # Receive data from the socket
#print("Server heartbeat received.") # Log that the server's heartbeat was received - Only for De bugging
last_heartbeat = time.time() # Update the timestamp for the last received heartbeat
except Exception as e: # Handle and log any errors during data reception
print(f"Error sending data: {e}")
......
......@@ -15,25 +15,29 @@ from collections import deque
global members_UUID
global members_IP
global last_heartbeat_time
# Initialize lists to keep track of members
members_UUID = [] # List for UUIDs of members in the ring
members_IP = [] # List for IP addresses of members in the ring
# Network and port configurations
broadcast_ip = "255.255.255.255" #Broadcast-adress in the Network
enter_port = 12348 #Port that is used for the discovery of new server participants
ringport = 12343
election_port = 12345
ringport = 12343 #Port that is used for the ring updates
election_port = 12345 #Port that is used for the leader election
client_broadcast_port = 55555 #client sends, server listens
client_broadcast_port2 = 33333 #server sends, client listens
acknowledgement_port = 22222
heartbeat_port = 44444
heartbeat_client_broadcast_port = 11111
acknowledgement_port = 22222 #port to send acknowledgements
heartbeat_port = 44444 #Port to send the leader heartbeat
heartbeat_client_broadcast_port = 11111 #Port to send heartbeat to the client
# Unique identification for this server
myuuid = uuid.uuid4() #Creating a unique ip Adress using uuid4
my_ID = str(myuuid) #Creating a unique ip Adress using uuid4
hostname = socket.gethostname()
hostname = socket.gethostname() #get the hostname of the current machine
ip_address = socket.gethostbyname(hostname) # Retrieves the hostname of the current machine
#ip_address = "127.0.0.1" ########verwenden zum Testen auf einem Gerät
#ip_address = "127.0.0.1" #for testing at one host
# Leader election-related variables
participating = False # Indicates whether the server is currently participating in an election
is_leader = False # Boolean flag to indicate if the server is the leader
......@@ -41,10 +45,11 @@ Leader = False # Alternate flag to indicate leader status (can be consolidated w
ELECTION = 0 # Message type for initiating an election
NEW_LEAD = 1 # Message type for announcing a new leader
leader_ip = 'unknown' # Stores the IP address of the current leader; default is 'unknown'
# Heartbeat related variables
last_three_messages = deque(maxlen=3) # Initialization of message storage (last 3 messages)
send_heartbeat = deque()
received_heartbeat = deque()
last_three_messages = deque(maxlen=3) # Initialization of message storage (last 3 messages), double-ended queue
send_heartbeat = deque() #double-ended queue
received_heartbeat = deque() #double-ended queue
last_heartbeat_time = time.time()
# variables for Listen to Client
......@@ -81,7 +86,7 @@ def send_acknowledgement():
right_neighbour = get_neighbour(members_IP, ip_address, 'left') # Determine the right neighbor based on the current ring structure
msg=("Your Message was received.") # Message sent as a receipt confirmation
send_ack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a UDP socket to send the acknowledgment
send_ack_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
send_ack_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) #reuse of the address
send_ack_socket.sendto(msg.encode('utf-8'),(right_neighbour,acknowledgement_port)) # Send the acknowledgment message to the right neighbor using the acknowledgement_port
send_ack_socket.close() # Close the socket after sending the message
......@@ -96,9 +101,9 @@ def receive_acknowledgement(msg, ip, so):
if len(members_IP) > 1:
right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure
recack_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to receive acknowledgment messages
recack_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
recack_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) #reuse address
recack_socket.bind((ip_address,acknowledgement_port))
recack_socket.settimeout(2) # Set a timeout for receiving acknowledgment --> Notwendig in welcher Höhe???? Testen??
recack_socket.settimeout(2) # Set a timeout for receiving acknowledgment
try:
data, addr = recack_socket.recvfrom(1024) # Attempt to receive a response
print(f"{addr} has received the message.")
......@@ -129,7 +134,7 @@ def receive_acknowledgement(msg, ip, so):
# Update the ring and forward necessary messages
send_update_to_ring()
if right_neighbour == leader_ip: #check if failed server was the leader
start_election()
start_election() #is it was the leader, then start a new election
new_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the new right neighbor with updated ring
temp_send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
temp_send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
......@@ -158,7 +163,7 @@ def listen_update_Ring():
members_IP = members_IP2 # Update the local members list
print(f"Ring update received: {members_IP}")
send_acknowledgement() # Send an acknowledgment for the received update
send_update_to_ring() # Forward the updated member list to the next neighbor
send_update_to_ring() # Forward the updated members list to the next neighbor
except json.JSONDecodeError:
print("Error decoding the JSON data.") # Handle errors in decoding the JSON data
......@@ -199,12 +204,12 @@ def new_server_in_ring():
new_server_ip, new_server_port = addr # Extract the IP and port of the new server from the received address
new_IP = data.decode()
new_IP = new_IP.split(": ")[1] # Extract the IP address from the message
if new_IP in members_IP: # Check if the IP already exists. This might happen if a server temporarily lost connection. #######Tritt das überhaupt ein? In Gruppe diskutieren
f"There are already servers. I am your leader: {ip_address}".encode() # Create a message for the new server# If the server already exists, send the Welcome Message
if new_IP in members_IP: # Check if the IP already exists. This might happen if a server temporarily lost connection.
f"There are already servers. I am your leader: {ip_address}".encode() # Create a message for the new server. If the server already exists, send the Welcome Message
else:
members_IP.append(new_IP) # If the server is new, add its IP to the list
msg = f"There are already servers. I am your leader: {ip_address}".encode() # Create a message for the new server
#AttributeError: 'bytes' object has no attribute 'encode'. Did you mean: 'decode'?
sock.sendto(msg, (new_IP, new_server_port)) # Send the greeting message back to the new server
print(f"The updated IP_Ring is: {members_IP}")
send_update_to_ring() # Update the ring topology
......@@ -217,15 +222,15 @@ def server_enters():
"""
global members_UUID
global leader_ip
max_retries = 3
max_retries = 3 #retry of the sending for a maximum of 3 times
success = False
intervall = 2 #time to wait fpr a response
intervall = 2 #time to wait for a response
msg = f"I am new: {ip_address}".encode() # Greeting message from the new server
for attempt in range(max_retries):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Set the socket to broadcast mode
sock.settimeout(intervall) # Set a timeout to wait for a response --> ggf. anpassen, je nach geschwindigkeit mit Handy Internet
sock.settimeout(intervall) # Set a timeout to wait for a response
print(f"Sending discovery message (attempt{attempt+1}/{max_retries})")
sock.sendto(msg, (broadcast_ip, enter_port)) # Send the greeting message to the broadcast address using the enter_port
try:
......@@ -243,9 +248,9 @@ def server_enters():
break
if not success:
print(f"Keine Antwort erhalten. Ich bin jetzt der Leader. Meine IP: {ip_address}") # If no answer is received the server sets itself as leader
members_UUID.append(my_ID) # Add itself as a participant in the ring
members_IP.append(ip_address) # Add itself as a participant in the ring
print(f"No response received. I am now the leader. My IP: {ip_address}") # If no answer is received the server sets itself as leader
members_UUID.append(my_ID) # Add itself as a participant in the ring, UUID
members_IP.append(ip_address) # Add itself as a participant in the ring, IP
sock.close()
leader_ip = ip_address # Set leder_ip to own IP
print(leader_ip)
......@@ -266,11 +271,11 @@ def start_election():
msg = f"{ELECTION}: {myuuid}: {ip_address}".encode('utf-8') # Create the election message with the server's UUID and IP address
send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to send the election message
send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
send_socket.sendto(msg,(right_neighbour,election_port)) # Send msg to the right neighbor usinf the election_port
send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to the right neighbor using the election_port
send_socket.close()
receive_acknowledgement(msg, right_neighbour, election_port) # Wait for acknowledgment
def accept(group,erhaltene_uuid,erhaltene_ip):
def accept(group,received_uuid,received_ip):
"""
Function to handle election messages and determine the next steps.
If the message is part of an election (ELECTION), the server compares UUIDs to either forward, update, or declare itself as the leader.
......@@ -285,16 +290,16 @@ def accept(group,erhaltene_uuid,erhaltene_ip):
right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure
if group == ELECTION: # If the received message is part of an election
# Compare the received UUID with the server's own UUID
if erhaltene_uuid > myuuid: # Received UUID is greater, so forward the message without changes
if received_uuid > myuuid: # Received UUID is greater, so forward the message without changes
print("{ip_adress}:{} is forwarding without updates.".format(myuuid))
participating = True
msg = f"{ELECTION}: {erhaltene_uuid}: {erhaltene_ip}".encode()
msg = f"{ELECTION}: {received_uuid}: {received_ip}".encode()
send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour
send_socket.close()
receive_acknowledgement(msg, right_neighbour , election_port)
elif erhaltene_uuid < myuuid and participating==False: # Received UUID is smaller, update with the server's own UUID and forward
elif received_uuid < myuuid and participating==False: # Received UUID is smaller, update with the server's own UUID and forward
print("{ip_adress}: {} is updating and forwarding.".format(myuuid))
participating = True
msg = f"{ELECTION}: {myuuid}: {ip_address}".encode()
......@@ -303,7 +308,7 @@ def accept(group,erhaltene_uuid,erhaltene_ip):
send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour
send_socket.close()
receive_acknowledgement(msg, right_neighbour , election_port)
if erhaltene_uuid == myuuid: # If the server receives its own UUID, it becomes the leader
if received_uuid == myuuid: # If the server receives its own UUID, it becomes the leader
print("{ip_adress}: {} starts acting as a leader!".format(myuuid))
participating = False
leader_ip = ip_address # Set leader_ip to own IP
......@@ -318,15 +323,15 @@ def accept(group,erhaltene_uuid,erhaltene_ip):
receive_acknowledgement(msg, right_neighbour , election_port)
if group == NEW_LEAD: # If the received message announces a new leader
if erhaltene_uuid == myuuid: # If the UUID matches, the server has already acknowledged
if received_uuid == myuuid: # If the UUID matches, the server has already acknowledged
return
if erhaltene_uuid != myuuid: # Update the leader information and forward the new leader announcement
if received_uuid != myuuid: # Update the leader information and forward the new leader announcement
print("{} acknowledged new leader.".format(myuuid))
if leader_ip == ip_address: # Check if this server was the Leader bevor the election and set is_leader to False
if leader_ip == ip_address: # Check if this server was the Leader before the election and set is_leader to False
is_leader.set_value(False)
leader_ip = erhaltene_ip # Update leader_ip
leader = erhaltene_uuid # Update leader
msg = f"{NEW_LEAD}: {erhaltene_uuid}: {erhaltene_ip}".encode()
leader_ip = received_ip # Update leader_ip
leader = received_uuid # Update leader
msg = f"{NEW_LEAD}: {received_uuid}: {received_ip}".encode()
send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
send_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
send_socket.sendto(msg,(right_neighbour,election_port)) # Send message to neighbour
......@@ -337,11 +342,8 @@ def accept(group,erhaltene_uuid,erhaltene_ip):
def listen_election():
"""
Listens for incoming election or leader messages on the configured election socket.
Decodes the message, sends an acknowledgment to the sender, and processes the message via the accept() function.
Decodes the message, sends an acknowledgement to the sender, and processes the message via the accept() function.
"""
#sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to listen for election messages
#sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
#sock.bind((ip_address,election_port)) # Bind to the election socket
while True:
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # Create a socket to listen for election messages
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
......@@ -349,11 +351,11 @@ def listen_election():
data,addr=sock.recvfrom(4096) # Receive data from other servers
übernahme = data.decode('utf-8') # Decode the received message
grouprec = int(übernahme.split(": ")[0]) # Extract group ID, UUID, and IP address from the message
erhaltene_ip = (übernahme.split(": ")[2])
erhaltene_uuid2 = uuid.UUID((übernahme.split(": ")[1]))
received_ip = (übernahme.split(": ")[2])
received_uuid2 = uuid.UUID((übernahme.split(": ")[1]))
sock.close()
send_acknowledgement() # Send acknowledgment back to the sender
accept(grouprec,erhaltene_uuid2,erhaltene_ip) # Process the election or new leader message
send_acknowledgement() # Send acknowledgement back to the sender
accept(grouprec,received_uuid2,received_ip) # Process the election or new leader message
########################### End - Leader Election ###########################
########################### Start - Process client messages ###########################
......@@ -393,7 +395,6 @@ def listen_client():
message, client_address = server_socket.recvfrom(4096) # Receive a message from a client
decoded_message = message.decode() # Decode the message
last_three_messages.append(decoded_message) # Store the message for Leader heartbeat
#print(f"Message stored: {last_three_messages}") # Only for debugging
message_id = decoded_message.split(":")[0] # Extract the unique message ID (UUID) from the decoded message
if message_id in processed_message_ids: # Check if the message has already been processed
continue # Skip if the message was already processed
......@@ -402,7 +403,7 @@ def listen_client():
except socket.error as e: # Handle socket errors
print(f"An error occurred while listening: {e}")
break
except KeyboardInterrupt: # Handle server shutdown via keyboard interrupt ????? Funktioniert das??
except KeyboardInterrupt: # Handle server shutdown via keyboard interrupt, but does not work.
print("\nShutting down server...")
break
########################### End - Process client messages ###########################
......@@ -430,9 +431,7 @@ def listen_heartbeat():
"""
global received_heartbeat
global last_heartbeat_time
#sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for for receiving heartbeats
#sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
#sock.bind((ip_address, heartbeat_port)) # Bind the socket to the servers IP address and heartbeat port for listening
while True: # Receive data from the socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket for for receiving heartbeats
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
......@@ -442,8 +441,8 @@ def listen_heartbeat():
sock.close()
send_acknowledgement() # Send acknowledgment for the received message
received_heartbeat = pickle.loads(data) # Deserialize the received data
#print(received_heartbeat) # Debugging. Print the received messages
last_heartbeat_time = time.time() # Update the last heartbeat timestamp
if leader_ip == ip_address: # If the server is the leader
if send_heartbeat==received_heartbeat:
print(f"My heartbeat has traveled through the ring.")
......@@ -451,7 +450,7 @@ def listen_heartbeat():
print(f"Received a foreign heartbeat. I am forwarding the messages.")
forward_received_heartbeat_to_clients()
else: # Forward the heartbeat to the right neighbor
print("Heartbeat is forwarded to the neigbbor.")
print("Heartbeat is forwarded to the neighbor.")
right_neighbour = get_neighbour(members_IP, ip_address, 'right')
msg = pickle.dumps(received_heartbeat)
try:
......@@ -464,7 +463,6 @@ def listen_heartbeat():
except Exception as e: # Handle errors during data transmission
print(f"Error sending data lsiten heartbeat: {e}")
def leader_send_heartbeat():
"""
Sends the Heartbeat to the next server in the ring.
......@@ -475,7 +473,7 @@ def leader_send_heartbeat():
send_heartbeat = last_three_messages # Set the send_heartbeat as the last three messages
msg = pickle.dumps(send_heartbeat) # Serialize the messages
right_neighbour = get_neighbour(members_IP, ip_address, 'right') # Determine the right neighbor based on the current ring structure
if len(members_IP) == 1: # If only this server is part of the Ring it does not have to send an Heartbeat
if len(members_IP) == 1: # If only this server is part of the Ring, it does not have to send an Heartbeat
print("I am the only server. There is no other Server to send a heartbeat to.")
last_heartbeat_time = time.time() # Update the last heartbeat timestamp
else: # Send the heartbeat to the right neighbor
......@@ -490,7 +488,7 @@ def leader_send_heartbeat():
sock_heart_send.close() # Close the socket to free up resources
time.sleep(3) # Wait before sending the next heartbeat
def monitor_heartbeat(): ################ Achtung Testen, ob auch der Leader in ein timeout kommen kann... ggf. vor removal f leader IP einfügen#######
def monitor_heartbeat():
"""
Monitors whether the last heartbeat was received within the timeout period.
"""
......@@ -523,9 +521,9 @@ def leader_send_Client_heartbeat():
print("I am sending a heartbeat to the client.") # Debugging message indicating that the server is sending a heartbeat
server_client_heartbeat = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create a UDP socket
server_client_heartbeat.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # Enable broadcast mode
server_client_heartbeat.sendto(msg.encode('utf-8'), (broadcast_ip, heartbeat_client_broadcast_port)) # # Send the broadcast message to the specified broadcast IP and port
server_client_heartbeat.sendto(msg.encode('utf-8'), (broadcast_ip, heartbeat_client_broadcast_port)) # Send the broadcast message to the specified broadcast IP and port
server_client_heartbeat.close()# Close the socket to free up resources
time.sleep(2) # Wait for 15 seconds before sending the next heartbeat
time.sleep(2) # Wait for 2 seconds before sending the next heartbeat
########################### End - Client Heartbeat ###########################
########################### Start - observation value changes leader ###########################
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment