diff --git a/kclient.py b/kclient.py index 90f2428953667f311289da9492a1da61350f484c..36507079db3e3c9afd814c8d2c2555a77126e9ff 100644 --- a/kclient.py +++ b/kclient.py @@ -4,7 +4,7 @@ import time import uuid broadcast_ip = '255.255.255.255'#change ip??? #hard coded? -broadcast_port = 55559 +broadcast_port = 55555 #local host information MY_HOST = socket.gethostname() @@ -21,7 +21,7 @@ client_socket.bind(('', broadcast_port)) # listen on broadcast #socket is bind listener_ready = threading.Event() #listen for server? -def listen(): +def listen_server(): """receives messages from server.""" #listener_ready.set() #makes sure that listener is ready while True: @@ -30,11 +30,12 @@ def listen(): decoded_message = data.decode() #print(f"Received {data.decode()} from {address}") #debug #ignores broadcast messages with own ip #################please enable it after testing!!!!!!!!!!!!!!!!!!!S + #does not work - #if address[0]==MY_IP: - # continue + if address[0]==MY_IP: + continue - # print("this decoded msg", decoded_message) #debug + #print("this decoded msg", decoded_message) #debug except socket.error as e: print(f"An error occurred: {e}") @@ -52,16 +53,17 @@ def sender(): message_id = str(uuid.uuid4()) #allows the client to send any message message = input("") - + #print("send message to", broadcast_port) #checks for whitemarks if message.strip(): full_message = f"{message_id}: {nickname}: {message}".encode() client_socket.sendto(full_message, (broadcast_ip, broadcast_port)) + #print("message sended to", broadcast_port) ###############main################################# if __name__ == "__main__": # Start listener thread - listen_thread = threading.Thread(target=listen) + listen_thread = threading.Thread(target=listen_server) listen_thread.daemon = True listen_thread.start() #listener_ready.wait() diff --git a/server.py b/server.py index c3e3253c536f76768046b41de0660e2277d81b4b..8e23e62d8cd9238f48812e0500e5d9a63525ac3e 100644 --- a/server.py +++ b/server.py @@ -49,55 +49,71 @@ def listen_broadcast_election(queue, shared_data): """Receives messages and updates shared state.""" print(f"Listener from {shared_data['server_id']} is listening...") global active_servers + server_socket.setblocking(False) # Set socket to non-blocking mode #global sender_uuid while True: try: - message, client_address = election_socket.recvfrom(4096) - decoded_message = message.decode() + message, sender_address = election_socket.recvfrom(4096) + decoded_sender_message = message.decode() - print(f"Received from {client_address}: {decoded_message}") + print(f"Received from {sender_address}: {decoded_sender_message}") # Handle heartbeat messages - if "HEARTBEAT" in decoded_message: - shared_data['server_id'] = decoded_message.split(": ")[1] + if "HEARTBEAT" in decoded_sender_message: + shared_data['sender_uuid'] = decoded_sender_message.split(": ")[1] #print("sender_uuid", sender_uuid) #debug - active_servers[shared_data['server_id']] = time.time() #update active server dictionary + active_servers[shared_data['sender_uuid']] = time.time() #update active server dictionary #send to neighbour continue # Put message in the queue for election process - queue.put(decoded_message) + queue.put(decoded_sender_message) except socket.error as e: print(f"Socket error occurred: {e}") break + time.sleep(0.5) # Small delay to prevent busy waiting #listener function ##################TypeError: listen_client() takes 0 positional arguments but 2 were given############ -def listen_client(): +def listen_client(shared_data): """receives messages from clients and broadcast them.""" - manager=Manager() - save_last_messages = manager.dict() - message_queue = multiprocessing.Queue(maxsize=3) + #manager=Manager() + + #global received_messages + received_messages = set() + + #save_last_messages = manager.dict() + #message_queue = multiprocessing.Queue(maxsize=3) + #global save_last_messages + #only listen if current leader + #if shared_data['current_leader'] == shared_data['server_id']: while True: try: message, client_address = server_socket.recvfrom(4096) decoded_message = message.decode() #create a dictionary for message uuid - message_uuid = decoded_message.split(": ")[0] - print("message id:", message_uuid) #debug + # message_uuid = decoded_message.split(": ")[0] + #print("message id:", message_uuid) #debug + + # Extract message UUID and check if already processed + message_id = decoded_message.split(":")[0] + if message_id in received_messages: + continue + received_messages.add(message_id) # Enqueue messages and maintain a fixed size - if not any(decoded_message.startswith(msg) for msg in list(message_queue.queue)): + """if not any(decoded_message.startswith(msg) for msg in list(message_queue.queue)): if message_queue.full(): message_queue.get() # Remove oldest message message_queue.put(decoded_message) # Print all messages print("here are your last messages:", list(message_queue.queue)) # Debug - + """ + """ if message_uuid not in save_last_messages: if len(save_last_messages) <= 3: @@ -108,7 +124,7 @@ def listen_client(): #save_last_messages.update(f"last message: {message}: {message_uuid}") print("here are your last messages:", dict(save_last_messages)) #debug """ - + #broadcast_election(save_last_messages) #send to neighbour # checks wheter the message comes from the server itself @@ -142,33 +158,36 @@ def start_election(queue, shared_data): #global sender_uuid #current_leader = None print("Starting election...") - broadcast_election(f"START_ELECTION: {shared_data['server_id']}") + broadcast_election(f"START_ELECTION: {shared_data['server_id']}") #every server which broadcasts his id is going to be part of the election timeout = time.time() + 20 # 20-second timeout, waiting for other servers - highest_id = shared_data['server_id'] + highest_id = shared_data['server_id'] #highest id is own id + leader= False #while current time is smaller than current time plus 20s: while time.time() < timeout: try: message = queue.get(timeout=1) #waits 1 sec for a message - if shared_data['server_id']==shared_data['server_id']: + if shared_data['server_id']==shared_data['sender_uuid']: #wtf?????????????? continue if "START_ELECTION" in message: - shared_data['server_id'] = message.split(": ")[1] - active_servers[shared_data['server_id']] = time.time() - print(f"Received UUID for election: {shared_data['server_id']}") + shared_data['sender_uuid'] = message.split(": ")[1] + active_servers[shared_data['sender_uuid']] = time.time() + print(f"Received UUID for election: {shared_data['sender_uuid']}") ################dumblogic###onlyworkswith21erver################################ - if shared_data['server_id'] > highest_id: - highest_id = shared_data['server_id'] - print(f"Received higher ID {shared_data['server_id']}, forwarding...") - broadcast_election(f"START_ELECTION: {shared_data['server_id']}") - - elif shared_data['server_id'] == highest_id: - shared_data['current_leader'] = shared_data['server_id'] - print(f"(sender) Leader elected: {shared_data['current_leader']}") - broadcast_election(f"LEADER: {shared_data['current_leader']}") - + if leader == False: + if highest_id>shared_data['sender_uuid']: + leader == True + elif highest_id<shared_data['sender_uuid']: + highest_id=shared_data['sender_uuid'] + leader == True + + if leader == True: + highest_id=shared_data['current_leader'] + print(f"(sender) Leader elected: {shared_data['current_leader']}") + broadcast_election(f"LEADER: {shared_data['current_leader']}") + else: shared_data['current_leader'] = shared_data['server_id'] print(f"(official, its me) Leader elected: {shared_data['current_leader']}") @@ -211,20 +230,24 @@ def start_election(queue, shared_data): # Heartbeat function def send_heartbeat(shared_data): """Sends heartbeat messages to keep the server active.""" - time.sleep(30) #waiting for leader election #set to 20s - while True: - broadcast_election(f"HEARTBEAT: {shared_data['server_id']}") #please change that to right neighbour after correct leader election - time.sleep(5) + #only if you are the current leader + if shared_data['current_leader'] == shared_data['server_id']: + time.sleep(30) #waiting for leader election #set to 20s + while True: + broadcast_election(f"HEARTBEAT: {shared_data['server_id']}") #please change that to right neighbour after correct leader election + time.sleep(5) # Monitor heartbeats def monitor_heartbeats(): """Checks active servers based on heartbeat messages.""" - global active_servers - while True: - time.sleep(6) - now = time.time() - active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6} - print(f"Active servers: {list(active_servers.keys())}") + #only if you are the current leader + if shared_data['current_leader'] == shared_data['server_id']: + global active_servers + while True: + time.sleep(6) + now = time.time() + active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6} + print(f"Active servers: {list(active_servers.keys())}") # Main function @@ -248,18 +271,22 @@ if __name__ == "__main__": save_last_messages={} # Create processes - listener_election_process = multiprocessing.Process(target=listen_broadcast_election, args=(message_queue, shared_data)) - listener_client_process = multiprocessing.Process(target=listen_client, ) + #listener_election_process = multiprocessing.Process(target=listen_broadcast_election, args=(message_queue, shared_data)) + listener_client_process = multiprocessing.Process(target=listen_client, args=(shared_data,)) election_process = multiprocessing.Process(target=start_election, args=(message_queue, shared_data)) # Heartbeat threads heartbeat_thread = threading.Thread(target=send_heartbeat, args=(shared_data,), daemon=True) heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True) + # Start the listener for clients in a thread + listener_election_thread = threading.Thread(target=listen_broadcast_election, args=(message_queue, shared_data), daemon=True) + # Start processes and threads - listener_election_process.start() + #listener_election_process.start() listener_client_process.start() - + + listener_election_thread.start() election_process.start() heartbeat_thread.start() heartbeat_monitor_thread.start() @@ -268,12 +295,12 @@ if __name__ == "__main__": #heartbeat_monitor_thread.join() try: - listener_election_process.join() + #listener_election_process.join() listener_client_process.join() election_process.join() except KeyboardInterrupt: print("\nShutting down server...") - listener_election_process.terminate() + #listener_election_process.terminate() listener_client_process.terminate() election_process.terminate()