Skip to content
Snippets Groups Projects
Select Git revision
  • e092cdeeb0a1da7a79b1c93bc7244a93b182e958
  • main default protected
  • feat/final
  • feature/Michelle
  • feat/010125
  • Luca
  • feat/knew
  • feat/try
  • feat/katha2
  • feature/katharina
10 results

sync.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    2024-12-16_Server.py 12.16 KiB
    #Ring testebn
    #Forming a Ring
    
    import time
    import threading
    from uuid import uuid4
    import socket
    import uuid
    import json
    import neighbour
    import multiprocessing
    import os
    from multiprocessing import Manager
    from collections import deque
    
    broadcast_ip = '255.255.255.255'
    broadcast_port = 55555
    election_port= 55559
    
    
    #dict member = {"uuid": {"IP",}}
    #Definition Atribute
    global members_UUID
    global member_IP
    members_UUID = [] #List for members in the ring
    member_IP = [] #List for ip Adresses of the members
    broadcast_ip = "255.255.255.255"  #Broadcast-adress in the Network
    port = 12348  #Port that is used for the discovery of new server participants
    server_ip = "0.0.0.0" #IP that enables the server to receive all messages that are passed to the port
    myuuid = uuid.uuid4()
    my_ID = str(myuuid) #Creating a unique ip Adress using uuid4
    ringport = 12343
    election_socket = 12345
    hostname = socket.gethostname()
    ip_address = socket.gethostbyname(hostname) ##### Für Tests anpassen "127.0.0.1"
    participating = False
    is_leader = False
    Leader = False
    ELECTION = 0
    NEW_LEAD = 1
    
    #Muss dauerhaft erfolgen
    def lausche_update_Ring():
        global member_IP
       
        # Erstellen eines UDP-Sockets
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Setze den Socket in den Broadcast-Modus Wöfür Frage MF?????
        #sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        # Sende die Nachricht an die Broadcast-Adresse
        sock.bind((ip_address, ringport))
        while True:
            try:
                data, addr = sock.recvfrom(1024)  # Antwort empfangen
                member_IP2 = json.loads(data.decode())
                #Abgleich, ob Member_IP2 gleich Member_Ip ist
                if member_IP2 == member_IP:
                    print(f"Ring Update bereits erhalten: {member_IP}")
                else:
                    member_IP = member_IP2
                    print(f"Ring Update erhalten: {member_IP}")
                    send_update_to_ring()
                #nachricht_update_ring()
            except json.JSONDecodeError:
                print("Fehler beim Decodieren der JSON-Daten.")
    
                       
    #Function of Leader. Server is listeng to the Port to greet new servers and Update Ring Topology
    def new_server_in_ring():
        global members_UUID
        global member_IP
      
        # Erstellen eines UDP-Sockets
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Setze den Socket in den Broadcast-Modus
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        # Binde den Socket an die Adresse und den Port
        sock.bind(("0.0.0.0", port))
        print("Server läuft und wartet auf Broadcast-Nachrichten von neuen Servern.") 
        while True:
            #Lauscht ob es ene Nachricht von anderen servern erhält
            data, addr = sock.recvfrom(1024)  # Puffergröße 1024 Bytes
            print(f"Nachricht empfangen von {addr}: {data.decode()}")
            msg = 'Es gibt bereits server'
            new_server_ip, new_server_port = addr 
            new_IP = data.decode()
            new_IP = new_IP.split(": ")[1]
            
            # ??? notwendig ???
            #Prüfen ob die ID bereis vorhanden ist und Server nur kurz die Verbindung verloren hat. Kann eigemtlich nich eintreten, da ID immer neu generiert wird
            if new_IP in member_IP:
                #msg = f"Welcome Back. Aktueller Ring: {json.dumps(members)}".encode()
                Nachricht = json.dumps(member_IP).encode()
            else:
                #members_UUID.append(new_ID) ####UUID wird in Nachricht nicht mehr übergeben
                member_IP.append(new_IP)
            sock.sendto(msg.encode(), (new_IP, new_server_port))    
            #print(f"Members des Rings sind: {members_UUID}")
            print(f"Der neue IP_Ring ist: {member_IP}")
           
            #Ring Update losschicken
            send_update_to_ring()
    
            
    
    #Server tritt bei und macht sich für andere Server bemerkbar
    def server_enters():
        global members_UUID
        msg = f"Ich bin neu: {ip_address}".encode() #Greeting message of the server entering the chat
        # Erstellen eines UDP-Sockets
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Setze den Socket in den Broadcast-Modus
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        # Sende die Nachricht an die Broadcast-Adresse
        sock.sendto(msg, (broadcast_ip, port))
        # Warte etwas auf eine Antwort
        sock.settimeout(2)
        try:
            data, addr = sock.recvfrom(1024)  # Antwort empfangen
            print(f"Antwort von {addr}: {data.decode()}")
            #lausche_update_Ring() Nicht Notwendig, da Funktion dauerhaft im Hintergrund erfolgt....
            sock.close()
            #Liste der Members wird dem Neuzugang gesendet, Es wird geprüft, ob die json Datei ausgelesen werden kann. Json wird genutzt um Liste Vollständig zu übergeben
            #try:
                #members_UUID = json.loads(data.decode())
                #print(f"Der neue Ring ist: {members_UUID}")
                #print(f"Der neue IP_Ring ist: {member_IP}")
                
                #nachricht_update_ring()
            #except json.JSONDecodeError:
                #print("Fehler beim Decodieren der JSON-Daten.")
    
            #Übergabe der Ring Teilnehmer durch den Server an Alle anderen Teilnehnmer!!
        except socket.timeout:
            #If no answer is received the server sets itself as leader
            print(f"Keine Antwort erhalten. Ich bin jetzt der Leader. Meine IP: {ip_address}")
            #Leader fügt sich selbst als Teilnehmer in den Ring ein
            members_UUID.append(my_ID)
            member_IP.append(ip_address)
            sock.close()
            is_leader.set_value(True)
            new_server_in_ring()
            
    #Server tritt in den Ring ein/formt den Ring
    def send_update_to_ring():
        global member_IP
        right_neighbour = neighbour.get_neighbour(member_IP, ip_address, 'right')
        if not right_neighbour:
            print("No left neighbour to send updates.")
            return
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        data = json.dumps(member_IP).encode()
        try:
            sock.sendto(data, (right_neighbour , ringport))
        except Exception as e:
            print(f"Error sending data: {e}")
        sock.close()
    
    
    ##############################################################
    ##############################################################
    ##############################################################
    
    
    
    
    def start_election():
        right_neighbour = neighbour.get_neighbour(member_IP, ip_address, 'right')
        print("{} is starting an election.".format(myuuid))
        #Server nimmt an Election Teil
        participating = True
        #Nachricht wird an den Server neben an gesendet
        msg = f"{ELECTION}: {myuuid}".encode('utf-8')
        #Msg encoden? Variable setzen?
        send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        send_socket.sendto(msg,(right_neighbour,election_socket)) ########Problem an dieser Stelle??
        #data,addr=send_socket.recvfrom(4096)
        #Was passiert wenn diese Nachricht nicht ankommt?
        send_socket.close()
    
    
    def accept(group,erhaltene_uuid):
        right_neighbour = neighbour.get_neighbour(member_IP, ip_address, 'right')
        if group == ELECTION:
            #Abgleich der erhaltenen Uuid mit der eigenen uuid
            if erhaltene_uuid > myuuid:
                print("{} is forwarding without updates.".format(myuuid))
                participating = True
                msg = f"{ELECTION}: {erhaltene_uuid}".encode()
                #Weitergabe der höheren uuid
                send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
                send_socket.sendto(msg,(right_neighbour,election_socket))
                #Was passiert wenn diese Nachricht nicht ankommt?
                send_socket.close()
            if erhaltene_uuid < myuuid:
                print("{} is updating and forwarding.".format(myuuid))
                participating = True
                msg = f"{ELECTION}: {myuuid}".encode()
                #Weitergabe der höheren uuid
                send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
                send_socket.sendto(msg,(right_neighbour,election_socket))
                #Was passiert wenn diese Nachricht nicht ankommt?
                send_socket.close()
            if erhaltene_uuid == myuuid:
                print("{} starts acting as a leader!".format(myuuid))
                participating = False
                #is_leader = True
                is_leader.set_value(True)
                leader = myuuid
                msg = f"{NEW_LEAD}: {myuuid}".encode()
                #Weitergabe der höheren uuid
                send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
                send_socket.sendto(msg,(right_neighbour,election_socket))
                #Was passiert wenn diese Nachricht nicht ankommt?
                send_socket.close()
        if group == NEW_LEAD:
            if erhaltene_uuid == myuuid:
                return
            if erhaltene_uuid != myuuid:
                print("{} acknowledged new leader.".format(myuuid))
                is_leader.set_value(False)
                leader = myuuid
                msg = f"{NEW_LEAD}: {erhaltene_uuid}".encode()
                #Weitergabe der höheren uuid
                send_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
                send_socket.sendto(msg,(right_neighbour,election_socket))
                #Was passiert wenn diese Nachricht nicht ankommt?
                send_socket.close()
    
    
    #zuhören
    
    #start_election()
    def zuhören_election():
        sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        sock.bind((ip_address,election_socket))
        while True:
            data,addr=sock.recvfrom(4096)
            übernahme = data.decode('utf-8')
            #message = ("Hello i am server").encode('utf-8')
            #Für Piggiback Acn nutzen?
            #sock.sendto(message,addr)
            grouprec = int(übernahme.split(": ")[0])
            #print(übernahme)
            #print((übernahme.split(": ")[1]))
            erhaltene_uuid2 = uuid.UUID((übernahme.split(": ")[1]))
            accept(grouprec,erhaltene_uuid2)
    
    
    def frage_benutzer():
        # Den Benutzer fragen, ob er die Funktion ausführen möchte
        antwort = input("Möchten Sie die Funktion ausführen? (Ja/Nein): ").strip().lower()
        if antwort == 'ja':
            start_election()
        else:
            print("Die Funktion wurde nicht ausgeführt.")
    
    
    
    #######################################################################
    #######################################################################
    #######################################################################
    
    
    
    #####Mit dieser Funktion wird beobachtet, ob sich ein Wert verändert
    class VariableWatcher:
        def __init__(self):
            self._value = None  # Initialisiert den Wert als None
            self.observers = []  # Erstellt eine Liste, um die Beobachter zu speichern
    
        def set_value(self, new_value):
            self._value = new_value  # Setzt den neuen Wert
            self.notify(new_value)  # Benachrichtigt alle Beobachter über den neuen Wert
    
        def add_observer(self, observer):
            self.observers.append(observer)  # Fügt einen Beobachter zur Liste hinzu
    
        def notify(self, new_value):
            for observer in self.observers:  # Iteriert durch alle Beobachter
                observer(new_value)  # Ruft die Beobachter mit dem neuen Wert auf
    
    
    
    #Wenn sich der Wert des is_leaders verändert wird diese veränderung hier geprüft
    def callback(new_value):
        print(f"Variable value changed to: {new_value}")  # Callback-Funktion, die aufgerufen wird, wenn sich der Wert ändert
        if new_value==True:
            print("Ich übernehme die Aufgaben des leaders")
            #new_server_in_ring()
        else:
            print("Leider bin ich nicht mehr leader")
    
    
    
    
    
    
    if __name__ == "__main__":
        # Thread 1 und 2 erstellen
        #thread1 = f"Thread1-{my_ID}"
        thread1 = threading.Thread(target=server_enters)  #Achtung eigentlich nur Leader!!
        thread2 = threading.Thread(target=lausche_update_Ring)
        thread3 = threading.Thread(target=frage_benutzer)
        thread4 = threading.Thread(target=zuhören_election)
    
        # Starten der Threads
        thread1.start()
        thread2.start()
        thread3.start()
        thread4.start()
     
        is_leader = VariableWatcher()
        is_leader.add_observer(callback) # Fügt die Callback-Funktion als Beobachter hinzu
    
    # Schließe den Socket an welchem Punkt??
    #sock.close()