Skip to content
Snippets Groups Projects
Commit e667f98b authored by Katharina's avatar Katharina
Browse files

improve heartbeats & try sendonlytoleader

parent adf52f7c
No related branches found
No related tags found
No related merge requests found
......@@ -19,7 +19,7 @@ server_socket.bind(('', broadcast_port))
print(f"Server is running with ID {server_id} and broadcasting on port {broadcast_port}...")
# global list for known server-IDs (which does not work atm)
active_servers = set()
active_servers = {}
current_leader = None
......@@ -31,7 +31,8 @@ def broadcast(message):
# listener-function
def listen(queue):
"""Empfängt Nachrichten von anderen Prozessen und leitet sie weiter."""
"""Receives messages from other processes and forwards them/puts them in queue"""
global active_servers
while True:
try:
message, client_address = server_socket.recvfrom(4096)
......@@ -43,6 +44,12 @@ def listen(queue):
print(f"Received from {client_address}: {decoded_message}")
# handle heartbeat messages
if "HEARTBEAT" in decoded_message:
sender_uuid = decoded_message.split(": ")[1]
active_servers[sender_uuid] = time.time()
continue
# put message in queue, that broadcast process can react
queue.put(decoded_message)
......@@ -52,13 +59,14 @@ def listen(queue):
# leader election function
def start_election(queue):
"""Startet die Leader Election basierend auf empfangenen Nachrichten."""
"""Starts leader election based on received messages."""
global current_leader
print("Starting election...")
broadcast(f"START_ELECTION: {server_id}") #sends broadcast, ignores his own id only in listen function
timeout = time.time() + 20 # wait 20 secs for answers
highest_id = server_id
while time.time() < timeout:
# wait for messages from queue
try:
......@@ -68,7 +76,7 @@ def start_election(queue):
if "START_ELECTION" in message:
sender_uuid = message.split(": ")[1]
#print("extracted uuid?:", sender_uuid)
active_servers.add(sender_uuid)
active_servers[sender_uuid] = time.time()
print(f"Received UUID for election: {sender_uuid}")
# compare UUIDs for leader election
......@@ -77,6 +85,7 @@ def start_election(queue):
print(f"Received higher ID {sender_uuid}, forwarding...")
broadcast(f"START_ELECTION: {sender_uuid}")
#####does this work????########
elif sender_uuid < server_id:
highest_id=server_id
print(f"Received lower ID {sender_uuid}, sending own ID...")
......@@ -95,10 +104,11 @@ def start_election(queue):
broadcast(f"current leader is: {current_leader}")
return
#continue when queue is empty
except multiprocessing.queues.Empty:
continue
# after timeout: own server becomes leader, if no other has been chosen
# after timeout: own server becomes leader, if no other has been chosen/higher ID
if highest_id == server_id:
current_leader = server_id
broadcast(f"LEADER {server_id}")
......@@ -106,23 +116,39 @@ def start_election(queue):
else:
print(f"Leader election finished, leader is {highest_id}")
# send message to the current leader
def send_message_to_leader(message):
"""Sends a message directly to the current leader."""
global current_leader
if current_leader is None:
print("No leader available to send the message.")
return
broadcast(f"TO_LEAD: {message}")
################### Heartbeat-function not working###########################
def send_heartbeat():
"""Sendet regelmäßig Heartbeat-Nachrichten, um die Server-ID aktiv zu halten."""
"""Sends heartbeat messages regularly to keep the server ID active"""
while True:
broadcast(f"HEARTBEAT: {server_id}")
time.sleep(3) # sends every 3 seconds
time.sleep(5) # sends every 3 seconds
#monitor heartbeats
def monitor_heartbeats():
"""Überprüft die aktiven Server anhand von Heartbeat-Nachrichten."""
"""Checks active servers based on heartbeat messages"""
global active_servers
while True:
time.sleep(6) # checks every 6 seconds
time.sleep(6)
now = time.time()
active_servers = {server for server, last_seen in active_servers.items() if now - last_seen < 6}
print(f"Active servers: {active_servers}")
active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6}
print(f"Active servers: {list(active_servers.keys())}")
################################################################################
def test_send_to_leader():
"""Tests sending a message to the current leader."""
time.sleep(15) # Wait for leader election to complete
send_message_to_leader("This is a message for the leader.")
# Main
if __name__ == "__main__":
# queue for communication between processes
......@@ -135,10 +161,17 @@ if __name__ == "__main__":
# heartbeat in seperate thread###########
heartbeat_thread = threading.Thread(target=send_heartbeat, daemon=True)
heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True)
# test sending to leader in a separate thread
test_thread = threading.Thread(target=test_send_to_leader, daemon=True)
# start processes
listener_process.start()
election_process.start()
heartbeat_thread.start()
heartbeat_monitor_thread.start()
test_thread.start()
try:
# main process is waiting for the sub processes to finish
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment