From 6dbefadfe6afeca57c152ce791435b4dbf77d798 Mon Sep 17 00:00:00 2001
From: Michael Zanger
 <michaelzanger@74f559e5-e8f5-4c9f-b261-b23487556fc4.fritz.box>
Date: Wed, 17 Jan 2024 17:35:42 +0100
Subject: [PATCH] Leader Election

---
 chat_server.py |   2 +-
 main_server.py |   2 +-
 server.py      | 274 ++++++++++++++++++++++++++++++++-----------------
 3 files changed, 184 insertions(+), 94 deletions(-)

diff --git a/chat_server.py b/chat_server.py
index c3b9e23..026bbb9 100644
--- a/chat_server.py
+++ b/chat_server.py
@@ -5,7 +5,7 @@ import re
 
 if __name__ == '__main__':
     # Create an instance of the Server class
-    server_instance = Server(client_address="", server_id="", server_port=0, server_cache=dict(), clients_cache=dict())
+    server_instance = Server(client_address="", server_id="", server_port=0, server_cache=dict(), clients_cache=dict(), leader=False)
     
     # Get the broadcast address from the existing server_instance
     broadcast_address = server_instance.get_broadcast_address()
diff --git a/main_server.py b/main_server.py
index 3a460b3..634ab6c 100644
--- a/main_server.py
+++ b/main_server.py
@@ -9,7 +9,7 @@ if __name__ == '__main__':
     clients_cache = dict()
 
     # Create the Server process
-    server = Server(client_address, server_id, server_port, server_cache, clients_cache)
+    server = Server(client_address, server_id, server_port, server_cache, clients_cache, leader=True)
 
     # Start the Server process
     server.start()
diff --git a/server.py b/server.py
index d129148..65a3623 100644
--- a/server.py
+++ b/server.py
@@ -11,12 +11,12 @@ import uuid
 client_broadcast_listener_port = 49153
 server_broadcast_listener_port = 49154
 
-server_heartbeat_tcp_listener_port = 49152
+server_heartbeat_tcp_listener_port = 49160
 
 client_receive_chat_tcp_port = 50001
 client_forward_message_multicast_port = 51000
 
-leader_election_port = 49155
+leader_election_port = 49161
 
 multicast_group_ip = '224.0.1.1'
 
@@ -31,7 +31,7 @@ class Server(multiprocessing.Process):
     local_servers_cache = dict()
     local_clients_cache = dict()
 
-    def __init__(self, client_address, server_id, server_port, server_cache, clients_cache):
+    def __init__(self, client_address, server_id, server_port, server_cache, clients_cache, leader):
         super(Server, self).__init__()
         self.os = self.get_os_type()
         print(self.os)
@@ -39,16 +39,21 @@ class Server(multiprocessing.Process):
         self.server_address = self.get_local_ip_address()
         self.subnet_mask = self.get_subnet_mask(self.active_interface)
         print(self.active_interface)
+        self.leader = leader
         self.client_address = client_address
         self.server_id = server_id
         self.server_port = server_port
         self.local_servers_cache = server_cache
         self.local_clients_cache = clients_cache
         self.last_heartbeat_timestamp = last_heartbeat_timestamp
-        ring_socket.bind((self.server_address, leader_election_port))
+        self.ring_socket = ring_socket
+        if self.os == "macOS":
+            self.ring_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+        self.ring_socket.bind((self.server_address, leader_election_port))
         self.server_uuid = self.generate_server_uuid()
         print("My UUID: ", self.server_uuid)
-        self.participant = True
+        self.participant = False
+        self.keep_running_nonLeader = True
 
     @staticmethod
     def get_local_ip_address():
@@ -122,7 +127,7 @@ class Server(multiprocessing.Process):
 
     def run(self):
         print(self.server_id+": "+"Up and running")
-        if self.server_id == "MAIN":
+        if self.server_id == "MAIN" or self.leader == True:
             client_listener_thread = threading.Thread(target=self.listen_for_clients)
             client_listener_thread.start()
 
@@ -133,15 +138,17 @@ class Server(multiprocessing.Process):
             heartbeat_send_thread.start()
 
         else:
-            cache_update_listener_thread = threading.Thread(target=self.listen_for_cache_update)
-            client_message_listener_thread = threading.Thread(target=self.listen_for_client_messages)
-            heartbeat_receive_thread = threading.Thread(target=self.listen_for_heartbeats)
-            heartbeat_timeout_thread = threading.Thread(target=self.check_heartbeat_timeout)
-
-            cache_update_listener_thread.start()
-            client_message_listener_thread.start()
-            heartbeat_receive_thread.start()
-            heartbeat_timeout_thread.start()
+            self.cache_update_listener_thread = threading.Thread(target=self.listen_for_cache_update)
+            self.client_message_listener_thread = threading.Thread(target=self.listen_for_client_messages)
+            self.heartbeat_receive_thread = threading.Thread(target=self.listen_for_heartbeats)
+            self.heartbeat_timeout_thread = threading.Thread(target=self.check_heartbeat_timeout)
+            self.leader_election_thread = threading.Thread(target=self.leader_election)
+
+            self.cache_update_listener_thread.start()
+            self.client_message_listener_thread.start()
+            self.heartbeat_receive_thread.start()
+            self.heartbeat_timeout_thread.start()
+            self.leader_election_thread.start()
     
     def get_broadcast_address(self):
         IP = self.server_address
@@ -155,10 +162,13 @@ class Server(multiprocessing.Process):
         return broadcast_address
             
     def send_heartbeat(self):
+        print("Heartbeat Sending started")
+        self.local_servers_cache = Server.local_servers_cache
+        print("Local Server Cache:", self.local_servers_cache)
         while True:
             time.sleep(10)
             for server_id, server_address in self.local_servers_cache.items():
-                if server_id != self.server_id:
+                if server_address[0] != self.server_address:
                     acknowledgment_received = self.send_heartbeat_to_server(server_address[0], server_heartbeat_tcp_listener_port)
                     #acknowledgment_received = "YES"
                     if acknowledgment_received:
@@ -175,7 +185,7 @@ class Server(multiprocessing.Process):
                 s.settimeout(2)  # Timeout for the connection
                 # Combine server address and port into a tuple
                 server_address_with_port = (server_address, server_port)
-                print("server_address_with_port", server_address_with_port)
+                print("Send Heartbeat to: ", server_address_with_port)
                 s.connect(server_address_with_port)
                 s.sendall(b'HEARTBEAT')
                 acknowledgment = s.recv(1024)
@@ -186,30 +196,33 @@ class Server(multiprocessing.Process):
         return acknowledgment_received
     
     def listen_for_heartbeats(self):
-        while True:
+        while self.keep_running_nonLeader == True:
             try:
                 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-                    if self.os == "macOS":
-                        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
-                    s.bind((self.server_address, server_heartbeat_tcp_listener_port))
-                    actual_port = s.getsockname()[1]
-                    print(f"Heartbeat Listener Started on port {actual_port}")
-                    s.listen()
-                    conn, addr = s.accept()
-                    with conn:
-                        data = conn.recv(1024)
-                        if data == b'HEARTBEAT':
-                            # Handle the received heartbeat
-                            print(f"Heartbeat received from {addr}")
-                            # Update the timestamp of the last received heartbeat
-                            self.last_heartbeat_timestamp = time.time()
-                            # Send an acknowledgment
-                            conn.sendall(b'ACK')
+                    try:
+                        if self.os == "macOS":
+                            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+                        s.bind((self.server_address, server_heartbeat_tcp_listener_port))
+                        actual_port = s.getsockname()[1]
+                        print(f"Heartbeat Listener Started on port {actual_port}")
+                        s.listen()
+                        conn, addr = s.accept()
+                        with conn:
+                            data = conn.recv(1024)
+                            if data == b'HEARTBEAT':
+                                # Handle the received heartbeat
+                                print(f"Heartbeat received from {addr}")
+                                # Update the timestamp of the last received heartbeat
+                                self.last_heartbeat_timestamp = time.time()
+                                # Send an acknowledgment
+                                conn.sendall(b'ACK')
+                    except socket.timeout:
+                        pass #socket timeout
             except socket.error as e:
                 print(f"Error: {e}")
 
     def check_heartbeat_timeout(self):
-        while True:
+        while self.keep_running_nonLeader == True:
             time.sleep(5)  # Adjust the interval as needed
             if self.last_heartbeat_timestamp is not None:
                 current_time = time.time()
@@ -218,8 +231,9 @@ class Server(multiprocessing.Process):
                 if current_time - self.last_heartbeat_timestamp >= timeout_duration:
                     print(f"No heartbeats received for {timeout_duration} seconds. Initiating LCR...")
                     # Call a function to initiate the LCR algorithm here
-                    print(self.local_servers_cache)
-                    self.start_leader_election(self.server_uuid, self.local_servers_cache)
+                    print("local server cache:", self.local_servers_cache)
+                    Server.local_servers_cache = self.local_servers_cache
+                    self.start_leader_election()
     
     # find highest server ID in cache
     def get_last_server_id(self):      
@@ -445,19 +459,21 @@ class Server(multiprocessing.Process):
 
         print(self.server_id+": "+"Listening to cache update broadcast messages")
 
-        while True:
-            data, addr = listen_socket.recvfrom(1024)
-            if data:
-                message = data.decode('utf-8')
-                print(self.server_id+": "+"Received cache update broadcast message:")
-                splitted = message.split("_")
-                server_cache_json = json.loads(splitted[0])
-                client_cache_json = json.loads(splitted[1])
-                self.local_servers_cache = server_cache_json
-                self.local_clients_cache = client_cache_json
-
-                print(self.local_servers_cache)
-                print(self.local_clients_cache)
+        while self.keep_running_nonLeader == True:
+            try:
+                data, addr = listen_socket.recvfrom(1024)
+                if data:
+                    message = data.decode('utf-8')
+                    print(self.server_id+": "+"Received cache update broadcast message:")
+                    splitted = message.split("_")
+                    server_cache_json = json.loads(splitted[0])
+                    client_cache_json = json.loads(splitted[1])
+                    self.local_servers_cache = server_cache_json
+                    self.local_clients_cache = client_cache_json
+                    print("Server Cache:", self.local_servers_cache)
+                    print("Client Cache:", self.local_clients_cache)
+            except socket.timeout:
+                pass #Timeout reached
 
     def listen_for_client_messages(self):
 
@@ -514,63 +530,137 @@ class Server(multiprocessing.Process):
 
             multicast_socket.close()
 
-    def start_leader_election(self, server, neighbor_dict):
-        self.send_election_message(server, neighbor_dict['server_address'])
-        self.set_participant(True)
-    
-    def send_election_message(server, neighbor_address):
-        election_message = {"mid": server, "isLeader": False}
-        ring_socket.sendto(json.dumps(election_message).encode(), neighbor_address)
+    def start_leader_election(self):
+        #Reset last heartbeat timestamp
+        self.last_heartbeat_timestamp = None
 
-    def get_neighbour(servers, current_server, direction):
-        if not servers:
+        #Check if last remaining server -> instant self leader declaration
+        if len(self.local_servers_cache) == 1:
+            self.handle_leader_tasks()
+        else:
+            # Get the address of the neighboring server to the 'right' in the ring.
+            neighbor_info = self.get_neighbour('right')
+            if neighbor_info:
+                self.send_election_message(neighbor_info['server_address'], uuid=None, isLeader=False)
+
+    def send_election_message(self, neighbor_address, uuid, isLeader):
+        if isLeader == False and uuid is None:
+            # Create an election message with the server's UUID.
+            election_message = {"id": self.server_uuid, "isLeader": False}
+            # Send the election message to the neighbor's address.
+            self.ring_socket.sendto(json.dumps(election_message).encode(), (neighbor_address[0], leader_election_port))
+            print(election_message, "...sent to: ", neighbor_address, "...on port: ", leader_election_port)
+        elif isLeader == True and uuid != self.server_uuid: #Agreeing to someone else as leader
+            election_message = {"id": uuid, "isLeader": True}
+            # Send the election message to the neighbor's address.
+            self.ring_socket.sendto(json.dumps(election_message).encode(), (neighbor_address[0], leader_election_port))
+        elif isLeader == False and uuid == self.server_uuid: #Suggesting itself to leader
+            election_message = {"id": self.server_uuid, "isLeader": True}
+            # Send the election message to the neighbor's address.
+            self.ring_socket.sendto(json.dumps(election_message).encode(), (neighbor_address[0], leader_election_port))
+            print(election_message, "...sent to: ", neighbor_address, "...on port: ", leader_election_port)
+        else: #Sending pid 
+            election_message = {"id": uuid, "isLeader": False}
+            # Send the election message to the neighbor's address.
+            self.ring_socket.sendto(json.dumps(election_message).encode(), (neighbor_address[0], leader_election_port))
+            print(election_message, "...sent to: ", neighbor_address, "...on port: ", leader_election_port)
+
+    def get_neighbour(self, direction):
+        # First, convert the dictionary keys (server IDs) to a list.
+        server_ids = list(self.local_servers_cache.keys())
+
+        # Find the server ID corresponding to self.server_address.
+        current_server_id = None
+        for server_id, address in self.local_servers_cache.items():
+            if address[0] == self.server_address:
+                current_server_id = server_id
+                break
+
+        if current_server_id is None:
+            print(f"Server with IP address {self.server_address} not found in the cache.")
             return None
 
-        server_ids = list(servers.keys())
-        current_index = server_ids.index(current_server)
+        # Find the index of the current server ID in the list of server IDs.
+        current_index = server_ids.index(current_server_id)
 
-        if direction == 'left':
-            neighbor_index = (current_index - 1) % len(server_ids)
-        elif direction == 'right':
+        # Determine the index of the next or previous server in the ring.
+        if direction == 'right':
+            # Get the next server in the ring.
             neighbor_index = (current_index + 1) % len(server_ids)
+        elif direction == 'left':
+            # Get the previous server in the ring.
+            neighbor_index = (current_index - 1) % len(server_ids)
         else:
             return None
 
+        # Get the neighbor's server ID.
         neighbor_server_id = server_ids[neighbor_index]
-        neighbor_server_info = servers[neighbor_server_id]
-
-        return {'server_id': neighbor_server_id, 'server_address': neighbor_server_info}
 
+        # Return the neighbor's address as a tuple (IP, port).
+        neighbor_address = self.local_servers_cache[neighbor_server_id]
+        return {'server_address': (neighbor_address[0], neighbor_address[1])}
+    
     def set_participant(p):
         global participant
         participant = p
     
     def leader_election(self):
-        while True:
-            data, address = ring_socket.recvfrom(4096)
+        self.local_servers_cache = Server.local_servers_cache
+        #ring_socket.bind((self.server_address, leader_election_port))  # Bind to the server's IP and leader election port
+        while self.keep_running_nonLeader == True:
+            # Receive election messages from neighbors.
+            print(self.ring_socket)
+            data, address = self.ring_socket.recvfrom(4096)
+            #Reset last heartbeat timestamp
+            self.last_heartbeat_timestamp = None
+            print("first yes")
             if data:
                 election_message = json.loads(data.decode())
-                received_mid = election_message.get('mid')
-                received_mid_id = received_mid['server_uuid']  # Correctly access the server UUID
-                
-                # Compare received_mid_id with self.server_uuid (UUID comparison)
-                if received_mid_id > self.server_uuid:
-                    self.send_election_message(self.server_uuid, self.get_neighbour(self.local_servers_cache, self.server_uuid, 'left')['server_address'])
-                elif received_mid_id == self.server_uuid and self.participant:
-                    self.send_election_message(self.server_uuid, self.get_neighbour(self.local_servers_cache, self.server_uuid, 'left')['server_address'])
-                elif received_mid_id < self.server_uuid and not self.participant:
-                    self.send_election_message(self.server_uuid, self.get_neighbour(self.local_servers_cache, self.server_uuid, 'left')['server_address'])
-                
-                # Determine participant status and leader
-                self.participant = True  # You may adjust this logic
-                if election_message['isLeader']:
-                    self.set_leader(election_message['mid'])
-                    if self.server_uuid != received_mid_id:
-                        self.set_is_leader(False)
-                        self.send_election_message(self.server_uuid, self.get_neighbour(self.local_servers_cache, self.server_uuid, 'left')['server_address'])
+                received_id = election_message.get('id')
+                received_isLeader = election_message.get('isLeader')
+                print("Received UUID:", received_id)
+                print("Own UUID:", self.server_uuid)
+                print("Received isLeader:", received_isLeader)
+                if received_isLeader == False:
+                    # Logic to handle the election process based on the received UUID.
+                    if not self.participant:
+                        # Forward the message to the next server in the ring.
+                        neighbor_info = self.get_neighbour('right')
+                        print("got neighbor", neighbor_info)
+                        if neighbor_info:
+                            self.send_election_message(neighbor_info['server_address'], received_id, received_isLeader)
+                    self.participant = True
+                elif received_isLeader == True:
+                    if received_id == self.server_uuid:
+                        self.handle_leader_tasks()
+                    elif received_id > self.server_uuid:
+                        # Agree to leader
+                        neighbor_info = self.get_neighbour('right')
+                        print("got neighbor", neighbor_info)
+                        if neighbor_info:
+                            self.send_election_message(neighbor_info['server_address'], received_id, received_isLeader)
                     else:
-                        self.set_is_leader(True)
-                    self.participant = False  # You may set this as not a participant
-
-
-
+                        print("Failed")
+                    self.participant = False
+                else:
+                    print("Leader Election failed")
+
+
+    def declare_victory(self):
+        # Declare this server as the leader and notify neighbors.
+        victory_message = {"mid": self.server_uuid, "isLeader": True}
+        neighbor_info = self.get_neighbour('right')
+        self.ring_socket.sendto(json.dumps(victory_message).encode(), neighbor_info['server_address'])
+        print(self.ring_socket)
+        #if neighbor_info:
+        #    self.send_election_message(neighbor_info['server_address'], self.server_uuid, True)
+
+    def handle_leader_tasks(self):
+        # Perform leader-specific tasks here
+        print(self.server_address, " is now the leader.")
+        self.leader = True
+        self.stop_threads()
+        self.run()
+
+    def stop_threads(self):
+        self.keep_running_nonLeader = False
-- 
GitLab