Skip to content
Snippets Groups Projects
Select Git revision
  • ec78444ea71bb78809fbbd8ccc70dc938cd912e8
  • main default protected
  • develop
  • feature2YildirimHatice
  • feature2RafehDaniel
  • feature2GotsisWasilios
  • feature2BerishaAlma
  • feature2AliciMuhamed
  • featureYildirimHatice
  • featureAliciMuhamed
  • featureRafehDaniel
  • featureBer
  • feature7
  • feature6
  • feature5
  • feature4
  • feature3
  • feature2
  • feature1
19 results

stringCalculator.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    server.py 12.74 KiB
    ##########unsicher, ob heartbeat funktioniert################
    ###########deutsch englisch korrigieren###################
    ################liste mit message ids auf 3-5 setzen########
    
    import socket
    import multiprocessing
    import uuid
    import time
    import threading
    import os
    from multiprocessing import Manager
    
    # TCP Socket erstellen
    #tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    #tcp_server_socket.bind(('', 55556))  # Listen auf einem separaten Port
    #tcp_server_socket.listen(5)
    
    broadcast_ip = '255.255.255.255'
    broadcast_port = 55555
    election_port= 55559
    
    # Socket for broadcast
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(('', broadcast_port))
    
    # Socket for broadcast
    election_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    election_socket.bind(('', election_port))
    
    # Global list for active servers
    active_servers = {}
    
    # Broadcast function
    def broadcast(message):
        """Sends messages to all server IDs in the network."""
        full_message = f"{message}".encode()
        #full_message = f"[{shared_data['server_id']}] {message}".encode() #sends serverid with every message
        server_socket.sendto(full_message, (broadcast_ip, broadcast_port))
    
    # Broadcast function
    def broadcast_election(message):
        """Sends messages to all server IDs in the network."""
        full_message = f"{message}".encode()
        #full_message = f"[{shared_data['server_id']}] {message}".encode() #sends serverid with every message
        election_socket.sendto(full_message, (broadcast_ip, election_port))
    
    # Listener function
    def listen_broadcast_election(queue, shared_data):
        """Receives messages and updates shared state."""
        print(f"Listener from {shared_data['server_id']} is listening...")
        global active_servers
        server_socket.setblocking(False)  # Set socket to non-blocking mode
        #global sender_uuid
        while True:
            try:
                message, sender_address = election_socket.recvfrom(4096)
                decoded_sender_message = message.decode()
    
                print(f"Received from {sender_address}: {decoded_sender_message}")
    
                # Handle heartbeat messages
                if "HEARTBEAT" in decoded_sender_message:
                    shared_data['sender_uuid'] = decoded_sender_message.split(": ")[1]
                    #print("sender_uuid", sender_uuid) #debug
                    active_servers[shared_data['sender_uuid']] = time.time() #update active server dictionary
                    #send to neighbour
                    continue
    
                # Put message in the queue for election process
                queue.put(decoded_sender_message)
    
            except socket.error as e:
                print(f"Socket error occurred: {e}")
                break
            time.sleep(0.5)  # Small delay to prevent busy waiting
    
    #listener function
    ##################TypeError: listen_client() takes 0 positional arguments but 2 were given############
    def listen_client(shared_data):
        """receives messages from clients and broadcast them."""
        #manager=Manager()
    
        #global received_messages
        received_messages = set()
    
        #save_last_messages = manager.dict()
        #message_queue = multiprocessing.Queue(maxsize=3)
        
        #global save_last_messages
        #only listen if current leader
        #if shared_data['current_leader'] == shared_data['server_id']:
        while True:
            try:
                message, client_address = server_socket.recvfrom(4096)
                decoded_message = message.decode()
    
                #create a dictionary for message uuid
               # message_uuid = decoded_message.split(": ")[0]
                #print("message id:", message_uuid) #debug
    
                # Extract message UUID and check if already processed
                message_id = decoded_message.split(":")[0]
                if message_id in received_messages:
                    continue
                received_messages.add(message_id)
    
                # Enqueue messages and maintain a fixed size
                """if not any(decoded_message.startswith(msg) for msg in list(message_queue.queue)):
                    if message_queue.full():
                        message_queue.get()  # Remove oldest message
                    message_queue.put(decoded_message)
    
                # Print all messages
                print("here are your last messages:", list(message_queue.queue))  # Debug
                """
                
                """
                if message_uuid not in save_last_messages:
                    if len(save_last_messages) <= 3:
                        oldest_key = next(iter(save_last_messages))  # get oldest key
                        del save_last_messages[oldest_key]          # delete it
                    #save_last_messages["last message"] = message_uuid
                    save_last_messages[message_uuid] = decoded_message
                        #save_last_messages.update(f"last message: {message}: {message_uuid}")
                print("here are your last messages:", dict(save_last_messages)) #debug
                """
    
                #broadcast_election(save_last_messages) #send to neighbour
    
                # checks wheter the message comes from the server itself
                if decoded_message.startswith(f"{shared_data['server_id']}"):
                    continue  # if...ignores it
                
                #checks if the message containts the string "entered"
                if decoded_message.__contains__("entered"):
                    print(f"{client_address} entered the chat.") #client_address is the nickname
    
                print(f"Received from {client_address}: {decoded_message}")
    
                broadcast(decoded_message)
                
            #exceptions
            except socket.error as e:
                print(f"An error occurred: {e}")
                break
    
            #does not work
            except KeyboardInterrupt:
                print("\nServer wird beendet...")
                break
    
    
    # Leader election function
    def start_election(queue, shared_data):
        """Starts leader election based on received messages."""
        global active_servers
        #sender_uuid = None
        #global sender_uuid
        #current_leader = None
        print("Starting election...")
        broadcast_election(f"START_ELECTION: {shared_data['server_id']}") #every server which broadcasts his id is going to be part of the election
    
        timeout = time.time() + 20  # 20-second timeout, waiting for other servers
        highest_id = shared_data['server_id'] #highest id is own id
        leader= False
    
        #while current time is smaller than current time plus 20s:
        while time.time() < timeout:
            try:
                message = queue.get(timeout=1) #waits 1 sec for a message
                if shared_data['server_id']==shared_data['sender_uuid']: #wtf??????????????
                 continue
                if "START_ELECTION" in message:
                    shared_data['sender_uuid'] = message.split(": ")[1]
                    active_servers[shared_data['sender_uuid']] = time.time()
                    print(f"Received UUID for election: {shared_data['sender_uuid']}")
    
    ################dumblogic###onlyworkswith21erver################################
                    if leader == False:
                        if highest_id>shared_data['sender_uuid']:
                            leader == True
                        elif highest_id<shared_data['sender_uuid']:
                            highest_id=shared_data['sender_uuid']
                            leader == True
                    
                    if leader == True:
                        highest_id=shared_data['current_leader']
                        print(f"(sender) Leader elected: {shared_data['current_leader']}")
                        broadcast_election(f"LEADER: {shared_data['current_leader']}")
                        
                    else:
                            shared_data['current_leader'] = shared_data['server_id']
                            print(f"(official, its me) Leader elected: {shared_data['current_leader']}")
                            broadcast_election(f"LEADER: {shared_data['current_leader']}")
    
                elif "LEADER" in message:
                    leader_uuid = message.split(": ")[1]
                    shared_data['current_leader'] = leader_uuid
                    #time.sleep(1)  # Delay to ensure all processes are aware of the leader
                    print(f"(leader) Leader elected: {shared_data['current_leader']}")
                    leader_election_done.set()  # Signal, dass die Leaderwahl abgeschlossen ist
                    return
    
            except multiprocessing.queues.Empty:
                continue
    
            #if highest_id == shared_data['server_id']:
            #    shared_data['current_leader'] = shared_data['server_id']
            #    broadcast(f"LEADER: {shared_data['current_leader']}")
            #    print(f"I am the leader: {shared_data['current_leader']}")
            #else:
            #    print(f"Leader election finished, leader is {highest_id}")
    
    #################tcpclient#################################
    #def handle_tcp_client(conn, addr):
        """Sendet die Leader-Information über eine TCP-Verbindung."""
    #    print(f"TCP connection established with {addr}")
    #    if shared_data['current_leader']:
    #        conn.send(f"LEADER: {shared_data['current_leader']}".encode())
    #    else:
    #        conn.send("NO_LEADER".encode())
    #    conn.close()
    
    #def tcp_listener():
        """Wartet auf TCP-Verbindungen und bearbeitet sie."""
    #    while True:
    #        conn, addr = tcp_server_socket.accept()
    #        threading.Thread(target=handle_tcp_client, args=(conn, addr)).start()
    
    ######################endtcp#################################################
    
    # Heartbeat function
    def send_heartbeat(shared_data):
        """Sends heartbeat messages to keep the server active."""
        leader_election_done.wait()  # Blockiert, bis die Leaderwahl abgeschlossen ist
        print(f"Heartbeat function started. Leader: {shared_data['current_leader']}, Server ID: {shared_data['server_id']}")
        #only if you are the current leader
        if shared_data['current_leader'] == shared_data['server_id']:
            #time.sleep(10) #waiting for leader election #set to 20s
            while True:
                try:
                    broadcast_election(f"HEARTBEAT: {shared_data['server_id']}") #please change that to right neighbour after correct leader election
                    time.sleep(5)
                except socket.error as e:
                    print(f"Failed to send heartbeat: {e}")
    
    # Monitor heartbeats
    def monitor_heartbeats():
        """Checks active servers based on heartbeat messages."""
        #only if you are the current leader
        if shared_data['current_leader'] == shared_data['server_id']:
            global active_servers
            while True:
                time.sleep(6)
                now = time.time()
                active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6}
                print(f"Active servers: {list(active_servers.keys())}")
    
    
    # Main function
    if __name__ == "__main__":
        multiprocessing.freeze_support()
    
        print(f"Script started with PID: {os.getpid()}")
        leader_election_done = threading.Event()
    
        # Manager for shared data
        manager = Manager()
        shared_data = manager.dict()
        shared_data['server_id'] = str(uuid.uuid4())  # Generate server ID once
        shared_data['current_leader'] = None
        shared_data['sender_uuid'] = None
        #sender_uuid = None
    
        print(f"Server is running with ID {shared_data['server_id']} and broadcasting on port {broadcast_port}...")
    
        message_queue = multiprocessing.Queue()
    
        save_last_messages={}
    
        # Create processes
        #listener_election_process = multiprocessing.Process(target=listen_broadcast_election, args=(message_queue, shared_data))
        listener_client_process = multiprocessing.Process(target=listen_client, args=(shared_data,))
        election_process = multiprocessing.Process(target=start_election, args=(message_queue, shared_data))
        # Start the listener for clients in a thread
        listener_election_thread = threading.Thread(target=listen_broadcast_election, args=(message_queue, shared_data), daemon=True)
    
        # Heartbeat threads
        heartbeat_thread = threading.Thread(target=send_heartbeat, args=(shared_data,), daemon=True)
        heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True)
    
        # Start processes and threads
        #listener_election_process.start()
        listener_client_process.start()
       
        listener_election_thread.start()
        election_process.start()
        heartbeat_thread.start()
        heartbeat_monitor_thread.start()
    
        #heartbeat_thread.join()
        #heartbeat_monitor_thread.join()
        
        try:
            #listener_election_process.join()
            listener_client_process.join()
            election_process.join()
        except KeyboardInterrupt:
            print("\nShutting down server...")
            #listener_election_process.terminate()
            listener_client_process.terminate()
            election_process.terminate()