From 0f360c068d7a5ed424c89c129668e6f672426c5e Mon Sep 17 00:00:00 2001 From: Alexander Schulz <alexander.schulz@student.reutlingen-university.de> Date: Sun, 21 Jan 2024 21:57:12 +0100 Subject: [PATCH] changed server/client status design --- client.py | 21 ++++++++++---------- server.py | 58 +++++++++++++++++++++++++++++++------------------------ 2 files changed, 44 insertions(+), 35 deletions(-) diff --git a/client.py b/client.py index 5421f94..47858ce 100644 --- a/client.py +++ b/client.py @@ -13,10 +13,10 @@ class Client(multiprocessing.Process): self.run() def run(self): - print("Client up and running") - action = input("Enter type of action ('r' to register): ") + print("Client: Up and running") + action = input("Client: Enter type of action ('r' to register): ") if (action == "r"): - group = input("Which group dou you want to join? ") + group = input("Client: Which group dou you want to join? ") self.register("register", group) send_thread = threading.Thread(target=self.send_message) @@ -32,7 +32,7 @@ class Client(multiprocessing.Process): receive_thread.join() else: - message = input("Enter message:") + message = input("Client: Enter message:") print(message) def register(self, message_type, message_group): @@ -47,7 +47,7 @@ class Client(multiprocessing.Process): data, server = broadcast_socket.recvfrom(1024) - print('Received message from server: ', data.decode('utf-8')) + print('Client: Received message from server: ', data.decode('utf-8')) # search for server ip_address in message from server ip_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b' @@ -57,7 +57,7 @@ class Client(multiprocessing.Process): self.registered_server_address = matches[1] #self.registered_server = server - print("My server: ", self.registered_server_address) + print("Client: My server: ", self.registered_server_address) broadcast_socket.close() @@ -81,12 +81,13 @@ class Client(multiprocessing.Process): client_receive_message_socket.bind((self.client_address, PORT)) client_receive_message_socket.listen() - print("Listening for groupchat messages") + print("Client: Listening for groupchat messages") while True: connection, addr = client_receive_message_socket.accept() message = connection.recv(1024) - print(f"GC message: {message.decode('utf-8')}") + #print(f"GC message: {message.decode('utf-8')}") + print(message.decode('utf-8')) def receive_new_server(self): PORT = 52000 @@ -95,10 +96,10 @@ class Client(multiprocessing.Process): client_receive_message_socket.bind((self.client_address, PORT)) client_receive_message_socket.listen() - print("Listening for server address update messages") + print("Client: Listening for server address update messages") while True: connection, addr = client_receive_message_socket.accept() message = connection.recv(1024) - print(f"New server: {message.decode('utf-8')}") + print(f"Client: New server: {message.decode('utf-8')}") self.registered_server_address = message.decode('utf-8') diff --git a/server.py b/server.py index b8e7724..f454c59 100644 --- a/server.py +++ b/server.py @@ -36,7 +36,7 @@ class Server(multiprocessing.Process): def __init__(self): super(Server, self).__init__() self.os = self.get_os_type() - print(self.os) + print("Server running on OS: ", self.os) self.active_interface = self.get_active_interface() self.server_address = self.get_local_ip_address() self.subnet_mask = self.get_subnet_mask(self.active_interface) @@ -165,7 +165,7 @@ class Server(multiprocessing.Process): if server_response: match = re.search(r'\b([A-Za-z])\b$', message.decode('utf-8')) self.server_id = match.group(1) - print('Received message from server: ', message.decode('utf-8')) + print('Received message from MAIN server: ', message.decode('utf-8')) received_response = True self.run_funcs() break @@ -178,7 +178,7 @@ class Server(multiprocessing.Process): self.run_funcs() def run_funcs(self): - print(self.server_id+": "+"Up and running") + #print(self.server_id+": "+"Up and running") if self.server_id == "MAIN": client_listener_thread = threading.Thread(target=self.listen_for_clients) client_listener_thread.start() @@ -222,8 +222,8 @@ class Server(multiprocessing.Process): return broadcast_address def send_heartbeat(self): - print("Heartbeat Sending started") - print("Local Server Cache:", self.local_servers_cache) + print(self.server_id+": "+"Heartbeat Sending started") + #print("Local Server Cache:", self.local_servers_cache) while True: time.sleep(10) failed_group_server = [] @@ -234,7 +234,7 @@ class Server(multiprocessing.Process): acknowledgment_received = self.send_heartbeat_to_server(server_address[0], server_heartbeat_tcp_listener_port) #acknowledgment_received = "YES" if acknowledgment_received: - print(f"Heartbeat acknowledgment received from {server_id}") + print(self.server_id+": "+"Heartbeat acknowledgment received from "+server_id) break else: count = count + 1 @@ -256,7 +256,8 @@ class Server(multiprocessing.Process): s.settimeout(2) # Timeout for the connection # Combine server address and port into a tuple server_address_with_port = (server_address, server_port) - print("Send Heartbeat to: ", server_address_with_port) + #print(self.server_id+": "+"Send Heartbeat to: "+str(server_address_with_port[0])+":"+str(server_address_with_port[1])) + print(self.server_id+": "+"Send Heartbeat to: "+str(server_address_with_port)) s.connect(server_address_with_port) s.sendall(b'HEARTBEAT') acknowledgment = s.recv(1024) @@ -267,6 +268,7 @@ class Server(multiprocessing.Process): return acknowledgment_received def listen_for_heartbeats(self): + print(self.server_id+": "+"Heartbeat listener started") while self.keep_running_nonLeader == True: try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: @@ -275,14 +277,14 @@ class Server(multiprocessing.Process): s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) s.bind((self.server_address, server_heartbeat_tcp_listener_port)) actual_port = s.getsockname()[1] - print(f"Heartbeat Listener Started on port {actual_port}") + #print(self.server_id+": "+"Heartbeat Listener Started on port "+str(actual_port)) s.listen() conn, addr = s.accept() with conn: data = conn.recv(1024) if data == b'HEARTBEAT': # Handle the received heartbeat - print(f"Heartbeat received from {addr}") + print(self.server_id+": "+"Heartbeat received from "+str(addr)) # Update the timestamp of the last received heartbeat self.last_heartbeat_timestamp = time.time() # Send an acknowledgment @@ -348,9 +350,9 @@ class Server(multiprocessing.Process): # Define the timeout period (15 seconds) timeout_duration = 15 if current_time - self.last_heartbeat_timestamp >= timeout_duration: - print(f"No heartbeats received for {timeout_duration} seconds. Initiating LCR...") + print(self.server_id+": "+"No heartbeats received for "+str(timeout_duration)+" seconds. Initiating LCR...") # Call a function to initiate the LCR algorithm here - print("local server cache:", self.local_servers_cache) + print("Server cache:", self.local_servers_cache) Server.local_servers_cache = self.local_servers_cache self.start_leader_election() @@ -393,7 +395,7 @@ class Server(multiprocessing.Process): #new_server_id = last_server_id + 1 self.local_servers_cache[new_server_id] = addr self.local_group_cache[new_server_id] = new_server_id - print("GroupCache: ", self.local_group_cache) + #print("GroupCache: ", self.local_group_cache) print(self.server_id+": "+"Received server register broadcast message:", message) @@ -412,7 +414,6 @@ class Server(multiprocessing.Process): server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) server_socket.connect((addr[0], addr[1])) server_socket.sendto(str.encode(message), addr) - print(server_socket) server_socket.close() def listen_for_clients(self): @@ -541,7 +542,6 @@ class Server(multiprocessing.Process): def listen_for_cache_update(self): BROADCAST_ADDRESS = self.get_broadcast_address() BROADCAST_PORT = 5980 - print("listen for cache update") # Local host information # MY_HOST = socket.gethostname() @@ -574,7 +574,7 @@ class Server(multiprocessing.Process): self.local_servers_cache = server_cache_json self.local_clients_cache = client_cache_json self.local_group_cache = group_cache_json - print("Group Cache: ", self.local_group_cache) + print("Group Cache: ", self.local_group_cache) print("Server Cache: ", self.local_servers_cache) print("Client Cache: ", self.local_clients_cache) except socket.timeout: @@ -593,11 +593,10 @@ class Server(multiprocessing.Process): while True: connection, addr = server_socket.accept() - print(addr) - print(f"Connection established with {addr}") message = connection.recv(1024) - print(f"Received message from client: {message.decode('utf-8')}") + + print(self.server_id+": "+"Received message from client: "+message.decode('utf-8')) #response = "Hello, client! I received your message." #connection.sendall(bytes(response, 'utf-8')) @@ -616,21 +615,27 @@ class Server(multiprocessing.Process): if group in key: if addr[0] != self.local_clients_cache[key][0]: receiver_list.append(self.local_clients_cache[key][0]) - print("Receiver list", receiver_list) + print(self.server_id+": "+"Group receiver list "+str(receiver_list)) + elif addr[0] == self.local_clients_cache[key][0]: + sender = key - distribute_chat_thread = threading.Thread(target=self.send_chat_message_to_clients(message, receiver_list)) + distribute_chat_thread = threading.Thread(target=self.send_chat_message_to_clients(message, receiver_list, sender)) distribute_chat_thread.start() # distribute the received client chat message to all members of the group - def send_chat_message_to_clients(self, message, receiver_list): + def send_chat_message_to_clients(self, message, receiver_list, sender): PORT = 51000 + decoded_message = message.decode('utf-8') + new_message = sender + ": " + decoded_message + encoded_message = new_message.encode('utf-8') + for client in receiver_list: try: server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.connect((client, PORT)) - server_socket.sendall(message) + server_socket.sendall(encoded_message) server_socket.close() except (ConnectionRefusedError, TimeoutError): print(f'Unable to send to {client}') @@ -719,7 +724,7 @@ class Server(multiprocessing.Process): #ring_socket.bind((self.server_address, leader_election_port)) # Bind to the server's IP and leader election port while self.keep_running_nonLeader == True: # Receive election messages from neighbors. - print(self.ring_socket) + #print(self.ring_socket) data, address = self.ring_socket.recvfrom(4096) #Reset last heartbeat timestamp self.last_heartbeat_timestamp = None @@ -760,18 +765,21 @@ class Server(multiprocessing.Process): victory_message = {"mid": self.server_uuid, "isLeader": True} neighbor_info = self.get_neighbour('right') self.ring_socket.sendto(json.dumps(victory_message).encode(), neighbor_info['server_address']) - print(self.ring_socket) + #print(self.ring_socket) #if neighbor_info: # self.send_election_message(neighbor_info['server_address'], self.server_uuid, True) def handle_leader_tasks(self): # Perform leader-specific tasks here - print(self.server_address, " is now the leader.") + print(self.server_id+": "+"++++++++++++++++++++++++++++++++++++") + print(self.server_id+": "+str(self.server_address)+" is now the leader") + print(self.server_id+": "+"++++++++++++++++++++++++++++++++++++") print("Server Cache:", self.local_servers_cache) del self.local_servers_cache[self.server_id] print("Server Cache:", self.local_servers_cache) old_server_id = self.server_id self.server_id = "MAIN" + print(self.server_id+": "+"Server ID was changed from: "+str(old_server_id)+" to "+str(self.server_id)) # check if new MAIN server was leader of any groupchats and reassign these to serverID MAIN self.reassign_chat_groups(old_server_id) -- GitLab