Skip to content
Snippets Groups Projects
Commit 2d192dab authored by Alexander Schulz's avatar Alexander Schulz
Browse files

reassign groups when group server down

parent 4dc515fd
No related branches found
No related tags found
No related merge requests found
...@@ -21,9 +21,11 @@ class Client(multiprocessing.Process): ...@@ -21,9 +21,11 @@ class Client(multiprocessing.Process):
send_thread = threading.Thread(target=self.send_message) send_thread = threading.Thread(target=self.send_message)
receive_thread = threading.Thread(target=self.receive_messages) receive_thread = threading.Thread(target=self.receive_messages)
receive_new_server_thread = threading.Thread(target=self.receive_new_server)
send_thread.start() send_thread.start()
receive_thread.start() receive_thread.start()
receive_new_server_thread.start()
# waiting for thread to stop = prevent the programm from shutdown before thread is stopped # waiting for thread to stop = prevent the programm from shutdown before thread is stopped
send_thread.join() send_thread.join()
...@@ -85,3 +87,18 @@ class Client(multiprocessing.Process): ...@@ -85,3 +87,18 @@ class Client(multiprocessing.Process):
connection, addr = client_receive_message_socket.accept() connection, addr = client_receive_message_socket.accept()
message = connection.recv(1024) message = connection.recv(1024)
print(f"GC message: {message.decode('utf-8')}") print(f"GC message: {message.decode('utf-8')}")
def receive_new_server(self):
PORT = 52000
client_receive_message_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_receive_message_socket.bind((self.client_address, PORT))
client_receive_message_socket.listen()
print("Listening for server address update messages")
while True:
connection, addr = client_receive_message_socket.accept()
message = connection.recv(1024)
print(f"New server: {message.decode('utf-8')}")
self.registered_server = message.decode('utf-8')
\ No newline at end of file
...@@ -4,4 +4,4 @@ import time ...@@ -4,4 +4,4 @@ import time
if __name__ == '__main__': if __name__ == '__main__':
client = Client() client = Client()
time.sleep(20) time.sleep(100)
...@@ -14,11 +14,5 @@ if __name__ == '__main__': ...@@ -14,11 +14,5 @@ if __name__ == '__main__':
# Start the Server process # Start the Server process
server.start() server.start()
# Optionally, if you want the main program to wait for a certain duration # Wait indefinitely until the server process completes its execution
# and then terminate the server, you can use:
# time.sleep(20)
# server.terminate() # This will terminate the server process after 20 seconds
# If you want the main program to wait indefinitely until the server process
# completes its execution, use:
server.join() server.join()
...@@ -30,6 +30,7 @@ class Server(multiprocessing.Process): ...@@ -30,6 +30,7 @@ class Server(multiprocessing.Process):
client_cache_key_offset = 0 client_cache_key_offset = 0
local_servers_cache = dict() local_servers_cache = dict()
local_clients_cache = dict() local_clients_cache = dict()
local_group_cache = dict()
def __init__(self, client_address, server_id, server_port, server_cache, clients_cache, leader): def __init__(self, client_address, server_id, server_port, server_cache, clients_cache, leader):
super(Server, self).__init__() super(Server, self).__init__()
...@@ -55,6 +56,7 @@ class Server(multiprocessing.Process): ...@@ -55,6 +56,7 @@ class Server(multiprocessing.Process):
print("My UUID: ", self.server_uuid) print("My UUID: ", self.server_uuid)
self.participant = False self.participant = False
self.keep_running_nonLeader = True self.keep_running_nonLeader = True
self.is_admin_of_groupchat = False
@staticmethod @staticmethod
def get_local_ip_address(): def get_local_ip_address():
...@@ -128,7 +130,7 @@ class Server(multiprocessing.Process): ...@@ -128,7 +130,7 @@ class Server(multiprocessing.Process):
def run(self): def run(self):
print(self.server_id+": "+"Up and running") print(self.server_id+": "+"Up and running")
if self.server_id == "MAIN" or self.leader == True: if self.server_id == "MAIN":
client_listener_thread = threading.Thread(target=self.listen_for_clients) client_listener_thread = threading.Thread(target=self.listen_for_clients)
client_listener_thread.start() client_listener_thread.start()
...@@ -151,6 +153,8 @@ class Server(multiprocessing.Process): ...@@ -151,6 +153,8 @@ class Server(multiprocessing.Process):
self.heartbeat_timeout_thread.start() self.heartbeat_timeout_thread.start()
self.leader_election_thread.start() self.leader_election_thread.start()
self.is_admin_of_groupchat = True
def get_broadcast_address(self): def get_broadcast_address(self):
IP = self.server_address IP = self.server_address
MASK = self.subnet_mask MASK = self.subnet_mask
...@@ -167,14 +171,26 @@ class Server(multiprocessing.Process): ...@@ -167,14 +171,26 @@ class Server(multiprocessing.Process):
print("Local Server Cache:", self.local_servers_cache) print("Local Server Cache:", self.local_servers_cache)
while True: while True:
time.sleep(10) time.sleep(10)
failed_group_server = []
for server_id, server_address in self.local_servers_cache.items(): for server_id, server_address in self.local_servers_cache.items():
if server_address[0] != self.server_address: if server_address[0] != self.server_address:
count = 0
for i in range(0,3):
acknowledgment_received = self.send_heartbeat_to_server(server_address[0], server_heartbeat_tcp_listener_port) acknowledgment_received = self.send_heartbeat_to_server(server_address[0], server_heartbeat_tcp_listener_port)
#acknowledgment_received = "YES" #acknowledgment_received = "YES"
if acknowledgment_received: if acknowledgment_received:
print(f"Heartbeat acknowledgment received from {server_id}") print(f"Heartbeat acknowledgment received from {server_id}")
break
else: else:
print(f"No acknowledgment received from {server_id}. Server may be down.") count = count + 1
print(f"No acknowledgment received from {server_id}. Server may be down. Error Count : {count}")
if count == 3:
failed_group_server.append(server_id)
for server_id in failed_group_server:
del self.local_servers_cache[server_id]
self.reassign_chat_groups(server_id)
def send_heartbeat_to_server(self, server_address, server_port): def send_heartbeat_to_server(self, server_address, server_port):
acknowledgment_received = False acknowledgment_received = False
...@@ -221,6 +237,49 @@ class Server(multiprocessing.Process): ...@@ -221,6 +237,49 @@ class Server(multiprocessing.Process):
except socket.error as e: except socket.error as e:
print(f"Error: {e}") print(f"Error: {e}")
# find every group where the dead server was admin and reassign group to (new) MAIN server
def reassign_chat_groups(self, dead_server_id):
reassigned_groups = []
for group in self.local_group_cache:
if self.local_group_cache[group] == dead_server_id:
self.local_group_cache[group] = self.server_id
reassigned_groups.append(group)
update_cache_thread2 = threading.Thread(target=self.updateCacheList)
update_group_server_of_client_thread = threading.Thread(target=self.update_group_server_of_client(reassigned_groups))
update_cache_thread2.start()
update_group_server_of_client_thread.start()
# find out which clients need to be informed about their groupchat server change
def update_group_server_of_client(self, reassigned_groups):
for group in reassigned_groups:
clients_to_inform = []
for client in self.local_clients_cache:
if client[0] == group:
clients_to_inform.append(client)
new_group_server_addr = self.server_address
self.send_client_new_group_server_address(new_group_server_addr, clients_to_inform)
# inform clients about the address of their new groupchat server
def send_client_new_group_server_address(self, addr, clients_to_inform):
PORT = 52000
for client in clients_to_inform:
client_addr = self.local_clients_cache[client]
try:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.connect((client_addr[0], PORT))
server_socket.sendall(addr.encode('utf-8'))
server_socket.close()
except (ConnectionRefusedError, TimeoutError):
print(f'Unable to send to {client_addr}')
def check_heartbeat_timeout(self): def check_heartbeat_timeout(self):
while self.keep_running_nonLeader == True: while self.keep_running_nonLeader == True:
time.sleep(5) # Adjust the interval as needed time.sleep(5) # Adjust the interval as needed
...@@ -237,9 +296,8 @@ class Server(multiprocessing.Process): ...@@ -237,9 +296,8 @@ class Server(multiprocessing.Process):
# find highest server ID in cache # find highest server ID in cache
def get_last_server_id(self): def get_last_server_id(self):
if self.local_servers_cache: if self.local_group_cache:
return ord(max(self.local_servers_cache, key=lambda k: ord(k))) return ord(max(self.local_group_cache, key=lambda k: ord(k)))
#return max(self.local_servers_cache)
else: else:
# ascii value before A # ascii value before A
return 64 return 64
...@@ -270,6 +328,8 @@ class Server(multiprocessing.Process): ...@@ -270,6 +328,8 @@ class Server(multiprocessing.Process):
new_server_id = chr(last_server_id + 1) new_server_id = chr(last_server_id + 1)
#new_server_id = last_server_id + 1 #new_server_id = last_server_id + 1
self.local_servers_cache[new_server_id] = addr self.local_servers_cache[new_server_id] = addr
self.local_group_cache[new_server_id] = new_server_id
print("GroupCache: ", self.local_group_cache)
print(self.server_id+": "+"Received server register broadcast message:", message) print(self.server_id+": "+"Received server register broadcast message:", message)
...@@ -420,9 +480,10 @@ class Server(multiprocessing.Process): ...@@ -420,9 +480,10 @@ class Server(multiprocessing.Process):
BROADCAST_ADDRESS = self.get_broadcast_address() BROADCAST_ADDRESS = self.get_broadcast_address()
servers_cache_as_string = json.dumps(self.local_servers_cache, indent=2).encode('utf-8') servers_cache_as_string = json.dumps(self.local_servers_cache, indent=2).encode('utf-8')
clients_cache_as_string = json.dumps(self.local_clients_cache, indent=2).encode('utf-8') clients_cache_as_string = json.dumps(self.local_clients_cache, indent=2).encode('utf-8')
group_cache_as_string = json.dumps(self.local_group_cache, indent=2).encode('utf-8')
separator = "_" separator = "_"
MSG = servers_cache_as_string + separator.encode('utf-8') + clients_cache_as_string MSG = servers_cache_as_string + separator.encode('utf-8') + clients_cache_as_string + separator.encode('utf-8') + group_cache_as_string
broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
...@@ -468,8 +529,11 @@ class Server(multiprocessing.Process): ...@@ -468,8 +529,11 @@ class Server(multiprocessing.Process):
splitted = message.split("_") splitted = message.split("_")
server_cache_json = json.loads(splitted[0]) server_cache_json = json.loads(splitted[0])
client_cache_json = json.loads(splitted[1]) client_cache_json = json.loads(splitted[1])
group_cache_json = json.loads(splitted[2])
self.local_servers_cache = server_cache_json self.local_servers_cache = server_cache_json
self.local_clients_cache = client_cache_json self.local_clients_cache = client_cache_json
self.local_group_cache = group_cache_json
print("Group Cache: ", self.local_group_cache)
print("Server Cache: ", self.local_servers_cache) print("Server Cache: ", self.local_servers_cache)
print("Client Cache: ", self.local_clients_cache) print("Client Cache: ", self.local_clients_cache)
except socket.timeout: except socket.timeout:
...@@ -659,7 +723,7 @@ class Server(multiprocessing.Process): ...@@ -659,7 +723,7 @@ class Server(multiprocessing.Process):
def handle_leader_tasks(self): def handle_leader_tasks(self):
# Perform leader-specific tasks here # Perform leader-specific tasks here
print(self.server_address, " is now the leader.") print(self.server_address, " is now the leader.")
self.leader = True self.server_id = "MAIN"
self.stop_threads() self.stop_threads()
self.run() self.run()
... ...
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment