Skip to content
Snippets Groups Projects
Select Git revision
  • 97eb848858c76c7362d35e1309f711a035585db5
  • main default protected
2 results

app.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    kserver.py 6.82 KiB
    import socket
    import multiprocessing
    import uuid
    import time
    import threading
    import os
    
    # Module-level setup: everything below runs at import time, i.e. once in
    # every process that imports or re-executes this module.
    #
    # Disabled experiment kept for reference. multiprocessing.set_start_method()
    # returns None by design (hence "result=none"); use
    # multiprocessing.get_start_method() to read the active start method.
    #current_method=multiprocessing.set_start_method('spawn', force=True)
    #print(f"Multiprocessing start method: {current_method}")
    
    # NOTE(review): the author observed this line printing 4 times. With the
    # 'spawn' start method each child process re-imports the main module, so
    # this module-level code would run once per process -- presumably that is
    # the cause; confirm on the target platform.
    print(f"Script started with PID: {os.getpid()}")
    # Random identifier used to tag outgoing messages and to recognise our own.
    # NOTE(review): if the module is re-imported in child processes (see above),
    # each process generates a different server_id here -- verify.
    server_id=str(uuid.uuid4())
    
    # Destination for all broadcast datagrams (limited-broadcast address).
    broadcast_ip = '255.255.255.255'
    broadcast_port = 55555
    
    # UDP socket used for both sending broadcasts and receiving on
    # broadcast_port. SO_BROADCAST is required to send to the broadcast
    # address; SO_REUSEADDR is set before bind(), presumably so that several
    # processes on one host can bind the same port -- TODO confirm.
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(('', broadcast_port))
    
    # Maps a server UUID (str) to the time.time() timestamp at which that
    # server was last heard from.
    active_servers = {}
    
    # broadcast-function
    def broadcast(message):
        """Send ``message`` to the broadcast address, tagged with this server's ID.

        The datagram payload is ``"[<server_id>] <message>"``, encoded with the
        default ``str.encode()`` codec, and is sent through the module-level
        ``server_socket`` to ``(broadcast_ip, broadcast_port)``.

        Args:
            message: Text to append after the ``[server_id]`` tag.
        """
        tagged = f"[{server_id}] {message}"
        destination = (broadcast_ip, broadcast_port)
        server_socket.sendto(tagged.encode(), destination)
    
    # listener-function
    def listen(queue):
        """Receive datagrams from the broadcast socket and dispatch them.

        Loops until a socket error occurs. Each datagram is handled as follows:

        * Datagrams that cannot be decoded as text are reported and dropped.
        * Messages starting with this server's own ``[server_id]`` tag are ignored.
        * ``REQUEST_LEADER`` is answered directly to the sender with
          ``LEADER: <id>`` when a leader is known in this process.
        * ``HEARTBEAT`` messages refresh the sender's timestamp in
          ``active_servers`` and are not forwarded.
        * Everything else (including ``REQUEST_LEADER``) is put on ``queue``.

        Args:
            queue: Queue-like object with a ``put`` method; the main block passes
                a ``multiprocessing.Queue`` that ``start_election`` reads from.
        """
        # NOTE(review): the main block starts this function in its own
        # multiprocessing.Process, so the active_servers dict updated here is
        # this process's copy; the monitor thread in the main process presumably
        # never sees these updates -- confirm.
        global active_servers
        own_tag = f"[{server_id}]"
        while True:
            try:
                message, client_address = server_socket.recvfrom(4096)

                # Any host can send arbitrary bytes to the broadcast port; a
                # single undecodable datagram must not kill the listener.
                try:
                    decoded_message = message.decode()
                except UnicodeDecodeError:
                    print(f"Ignoring undecodable datagram from {client_address}")
                    continue

                # ignore messages carrying our own server ID
                if decoded_message.startswith(own_tag):
                    continue

                if "REQUEST_LEADER" in decoded_message:
                    # current_leader is only assigned inside the __main__ guard,
                    # so it may not exist in this process (e.g. when the module
                    # is re-imported by a spawned child). Look it up defensively
                    # instead of raising NameError.
                    leader = globals().get("current_leader")
                    if leader:
                        response = f"LEADER: {leader}".encode()
                        server_socket.sendto(response, client_address)

                print(f"Received from {client_address}: {decoded_message}")

                # handle heartbeat messages
                if "HEARTBEAT" in decoded_message:
                    parts = decoded_message.split(": ")
                    if len(parts) < 2:
                        # No "<tag>: <uuid>" payload; drop it rather than crash.
                        print(f"Ignoring malformed heartbeat: {decoded_message}")
                        continue
                    active_servers[parts[1]] = time.time()
                    continue

                # forward everything else so the election process can react
                queue.put(decoded_message)

            except socket.error as e:
                print(f"Socket error occurred: {e}")
                break
    
    # leader election function
    def start_election(queue):
        """Run one leader-election round driven by messages taken from ``queue``.

        Broadcasts ``START_ELECTION: <server_id>`` and then processes queued
        messages for up to 20 seconds:

        * ``START_ELECTION: <uuid>`` -- a higher UUID than any seen so far is
          remembered and forwarded; a UUID lower than our own is answered with
          our own UUID; receiving our own UUID back while no higher UUID is
          known makes this server announce itself as leader.
        * ``LEADER: <uuid>`` -- the announced leader is stored in
          ``current_leader`` and the function returns.
        * ``REQUEST_LEADER`` and messages without a ``": "`` payload are skipped.

        If no ``LEADER`` message arrives before the timeout, the highest UUID
        seen becomes ``current_leader``; if that is our own UUID, it is
        announced with a ``LEADER: <server_id>`` broadcast.

        UUIDs are compared as strings (lexicographically).

        Args:
            queue: Queue-like object whose ``get(timeout=...)`` raises
                ``queue.Empty`` when nothing arrives in time; the main block
                passes a ``multiprocessing.Queue`` filled by ``listen``.
        """
        # NOTE(review): the main block runs this function in its own
        # multiprocessing.Process, so current_leader assigned here is presumably
        # only visible inside that process -- confirm.
        global current_leader
        # Imported locally (and by name) because the parameter is called
        # ``queue``; Queue.get() raises exactly this exception class on timeout.
        from queue import Empty

        print("Starting election...")
        # Our own broadcast is filtered out again by listen().
        broadcast(f"START_ELECTION: {server_id}")

        timeout = time.time() + 20  # wait 20 secs for answers
        highest_id = server_id

        while time.time() < timeout:
            try:
                message = queue.get(timeout=1)
            except Empty:
                # nothing arrived within a second; re-check the deadline
                continue

            # listen() also queues client REQUEST_LEADER messages. They contain
            # the substring "LEADER" but are not leader announcements.
            if "REQUEST_LEADER" in message:
                continue

            parts = message.split(": ")
            if len(parts) < 2:
                # No "<TAG>: <uuid>" payload; nothing to evaluate.
                continue
            sender_uuid = parts[1]

            if "START_ELECTION" in message:
                active_servers[sender_uuid] = time.time()
                print(f"Received UUID for election: {sender_uuid}")

                if sender_uuid > highest_id:
                    highest_id = sender_uuid
                    print(f"Received higher ID {sender_uuid}, forwarding...")
                    broadcast(f"START_ELECTION: {sender_uuid}")
                elif sender_uuid < server_id:
                    # Do NOT reset highest_id here: a higher ID seen earlier
                    # must still win the election.
                    print(f"Received lower ID {sender_uuid}, sending own ID...")
                    broadcast(f"START_ELECTION: {server_id}")
                elif sender_uuid == server_id and highest_id == server_id:
                    # Our own ID came back and nobody higher has shown up.
                    current_leader = server_id
                    broadcast(f"LEADER: {server_id}")
                    print(f"I am the leader: {server_id}")
                # Otherwise the ID lies between ours and the highest one seen:
                # it neither changes the outcome nor makes us the leader.

            elif "LEADER" in message:
                # leader was elected elsewhere
                current_leader = sender_uuid
                print(f"Leader elected: {current_leader}")
                broadcast(f"current leader is: {current_leader}")
                return

        # Timeout without a LEADER announcement: the highest ID seen wins.
        if highest_id == server_id:
            current_leader = server_id
            # Same "LEADER: <uuid>" format that receivers parse with split(": ").
            broadcast(f"LEADER: {server_id}")
            print(f"I am the leader: {server_id}")
        else:
            current_leader = highest_id
            print(f"Leader election finished, leader is {highest_id}")
    
    ############broadcastLEADER#####################
    #fct
    
    ################### Heartbeat-function not working###########################
    def send_heartbeat(interval=5):
        """Broadcast ``HEARTBEAT: <server_id>`` forever, pausing between sends.

        This function never returns; the main block runs it in a daemon thread.

        Args:
            interval: Seconds to sleep after each heartbeat. Defaults to 5, the
                value that was previously hard-coded.
        """
        # NOTE(review): monitor_heartbeats() drops servers not heard from for
        # 6 seconds, so keep ``interval`` below that threshold.
        while True:
            broadcast(f"HEARTBEAT: {server_id}")
            time.sleep(interval)
    
    #monitor heartbeats
    def monitor_heartbeats():
        """Prune stale entries from ``active_servers`` every 6 seconds, forever.

        After each 6-second sleep, ``active_servers`` is rebound to a new dict
        holding only the servers whose last-seen timestamp is less than
        6 seconds old, and the remaining server IDs are printed. This function
        never returns.
        """
        global active_servers
        while True:
            time.sleep(6)
            checked_at = time.time()
            still_alive = {}
            for sender, seen_at in active_servers.items():
                if checked_at - seen_at < 6:
                    still_alive[sender] = seen_at
            active_servers = still_alive
            print(f"Active servers: {list(active_servers.keys())}")
    ################################################################################
    
    # Main
    if __name__ == "__main__":
        # Script entry point: start the listener and election processes plus the
        # two heartbeat threads, then wait for the processes to finish.
        #
        # NOTE: earlier experiments (inspecting child processes with psutil, and
        # sharing server_id through a multiprocessing Manager dict created only
        # in the main process) were disabled by the author and are not active.

        # Per the multiprocessing docs this supports programs frozen into a
        # Windows executable and has no effect otherwise.
        multiprocessing.freeze_support()

        print(f"Server is running with ID {server_id} and broadcasting on port {broadcast_port}...")

        # No leader is known until an election has completed.
        current_leader = None

        # Queue through which listen() hands messages to start_election().
        message_queue = multiprocessing.Queue()

        listener_process = multiprocessing.Process(target=listen, args=(message_queue, ))
        election_process = multiprocessing.Process(target=start_election, args=(message_queue, ))
        child_processes = (listener_process, election_process)

        # Heartbeat sending and monitoring run in separate daemon threads of
        # the main process.
        heartbeat_thread = threading.Thread(target=send_heartbeat, daemon=True)
        heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True)
        background_threads = (heartbeat_thread, heartbeat_monitor_thread)

        # Start order: listener, election, heartbeat sender, heartbeat monitor.
        for task in child_processes + background_threads:
            task.start()

        try:
            # The main process waits for the child processes to finish.
            for child in child_processes:
                child.join()
        except KeyboardInterrupt:
            print("\nShutting down server...")
            for child in child_processes:
                child.terminate()