Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
kserver.py 6.82 KiB
import socket
import multiprocessing
import uuid
import time
import threading
import os
#current_method=multiprocessing.set_start_method('spawn', force=True) #result=none (why??)
#print(f"Multiprocessing start method: {current_method}")
print(f"Script started with PID: {os.getpid()}") #is starting 4 times
server_id=str(uuid.uuid4())
broadcast_ip = '255.255.255.255'
broadcast_port = 55555
# Socket for broadcast
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('', broadcast_port))
##here
active_servers = {}
# broadcast-function
def broadcast(message):
"""Sendet Nachrichten an alle Server-IDs im Netzwerk"""
full_message = f"[{server_id}] {message}".encode()
server_socket.sendto(full_message, (broadcast_ip, broadcast_port))
# listener-function
def listen(queue):
"""Receives messages from other processes and forwards them/puts them in queue"""
global active_servers
#server_id = shared_data["server_id"]
while True:
try:
message, client_address = server_socket.recvfrom(4096)
decoded_message = message.decode()
# ignore messages from own server-ID/ also running for clients from this ip
if decoded_message.startswith(f"[{server_id}]"):
continue
if "REQUEST_LEADER" in decoded_message:
if current_leader:
response = f"LEADER: {current_leader}".encode()
server_socket.sendto(response, client_address)
print(f"Received from {client_address}: {decoded_message}")
# handle heartbeat messages
if "HEARTBEAT" in decoded_message:
sender_uuid = decoded_message.split(": ")[1]
active_servers[sender_uuid] = time.time()
continue
# put message in queue, that broadcast process can react
queue.put(decoded_message)
except socket.error as e:
print(f"Socket error occurred: {e}")
break
# leader election function
def start_election(queue):
"""Starts leader election based on received messages."""
global current_leader
#server_id = shared_data["server_id"]
print("Starting election...")
broadcast(f"START_ELECTION: {server_id}") #sends broadcast, ignores his own id only in listen function
timeout = time.time() + 20 # wait 20 secs for answers
highest_id = server_id
while time.time() < timeout:
# wait for messages from queue
try:
message = queue.get(timeout=1)
# processing messages
if "START_ELECTION" in message:
sender_uuid = message.split(": ")[1]
#print("extracted uuid?:", sender_uuid)
active_servers[sender_uuid] = time.time()
print(f"Received UUID for election: {sender_uuid}")
# compare UUIDs for leader election
if sender_uuid > highest_id:
highest_id=sender_uuid
print(f"Received higher ID {sender_uuid}, forwarding...")
broadcast(f"START_ELECTION: {sender_uuid}")
#####does this work????########
elif sender_uuid < server_id:
highest_id=server_id
print(f"Received lower ID {sender_uuid}, sending own ID...")
broadcast(f"START_ELECTION: {server_id}")
else:
# you are the leader:
current_leader = server_id
broadcast(f"LEADER: {server_id}")
print(f"I am the leader: {server_id}")
elif "LEADER" in message:
# leader was elected
leader_uuid = message.split(": ")[1]
current_leader = leader_uuid
print(f"Leader elected: {current_leader}")
broadcast(f"current leader is: {current_leader}")
return
#continue when queue is empty
except multiprocessing.queues.Empty:
continue
# after timeout: own server becomes leader, if no other has been chosen/higher ID
if highest_id == server_id:
current_leader = server_id
broadcast(f"LEADER {server_id}")
print(f"I am the leader: {server_id}")
else:
print(f"Leader election finished, leader is {highest_id}")
############broadcastLEADER#####################
#fct
################### Heartbeat-function not working###########################
def send_heartbeat():
"""Sends heartbeat messages regularly to keep the server ID active"""
while True:
broadcast(f"HEARTBEAT: {server_id}")
time.sleep(5) # sends every 3 seconds
#monitor heartbeats
def monitor_heartbeats():
"""Checks active servers based on heartbeat messages"""
global active_servers
while True:
time.sleep(6)
now = time.time()
active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6}
print(f"Active servers: {list(active_servers.keys())}")
################################################################################
# Main
if __name__ == "__main__":
# current_process = psutil.Process()
# print(f"DEBUG: Number of child processes: {len(current_process.children())}")
#manager = Manager()
#shared_data = manager.dict() # Gemeinsame Datenstruktur
#shared_data["server_id"] = str(uuid.uuid4()) # Nur im Hauptprozess erzeugen
multiprocessing.freeze_support() # important for Windows, to not start the main process three times
# queue for communication between processes
####################
print(f"Server is running with ID {server_id} and broadcasting on port {broadcast_port}...")
# global list for known server-IDs (which does not work atm)
current_leader = None
###################
message_queue = multiprocessing.Queue()
# create processes
listener_process = multiprocessing.Process(target=listen, args=(message_queue, ))
election_process = multiprocessing.Process(target=start_election, args=(message_queue, ))
# heartbeat in seperate thread###########
heartbeat_thread = threading.Thread(target=send_heartbeat, daemon=True)
heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True)
# start processes
listener_process.start()
election_process.start()
heartbeat_thread.start()
heartbeat_monitor_thread.start()
#test_thread.start()
try:
# main process is waiting for the sub processes to finish
listener_process.join()
election_process.join()
except KeyboardInterrupt:
print("\nShutting down server...")
listener_process.terminate()
election_process.terminate()