From dedabc4999b0c522614894ca0dfb6dd42c8ecc22 Mon Sep 17 00:00:00 2001
From: Katharina <katharina.willig@outlook.com>
Date: Tue, 10 Dec 2024 22:14:20 +0100
Subject: [PATCH] add uuid to messages
---
kclient.py | 16 ++++---
server.py | 123 ++++++++++++++++++++++++++++++++---------------------
2 files changed, 84 insertions(+), 55 deletions(-)
diff --git a/kclient.py b/kclient.py
index 90f2428..3650707 100644
--- a/kclient.py
+++ b/kclient.py
@@ -4,7 +4,7 @@ import time
import uuid
broadcast_ip = '255.255.255.255'#change ip??? #hard coded?
-broadcast_port = 55559
+broadcast_port = 55555
#local host information
MY_HOST = socket.gethostname()
@@ -21,7 +21,7 @@ client_socket.bind(('', broadcast_port)) # listen on broadcast #socket is bind
listener_ready = threading.Event()
#listen for server?
-def listen():
+def listen_server():
"""receives messages from server."""
#listener_ready.set() #makes sure that listener is ready
while True:
@@ -30,11 +30,12 @@ def listen():
decoded_message = data.decode()
#print(f"Received {data.decode()} from {address}") #debug
#ignores broadcast messages with own ip #################please enable it after testing!!!!!!!!!!!!!!!!!!!S
+
#does not work
- #if address[0]==MY_IP:
- # continue
+ if address[0]==MY_IP:
+ continue
- # print("this decoded msg", decoded_message) #debug
+ #print("this decoded msg", decoded_message) #debug
except socket.error as e:
print(f"An error occurred: {e}")
@@ -52,16 +53,17 @@ def sender():
message_id = str(uuid.uuid4())
#allows the client to send any message
message = input("")
-
+ #print("send message to", broadcast_port)
#checks for whitemarks
if message.strip():
full_message = f"{message_id}: {nickname}: {message}".encode()
client_socket.sendto(full_message, (broadcast_ip, broadcast_port))
+ #print("message sended to", broadcast_port)
###############main#################################
if __name__ == "__main__":
# Start listener thread
- listen_thread = threading.Thread(target=listen)
+ listen_thread = threading.Thread(target=listen_server)
listen_thread.daemon = True
listen_thread.start()
#listener_ready.wait()
diff --git a/server.py b/server.py
index c3e3253..8e23e62 100644
--- a/server.py
+++ b/server.py
@@ -49,55 +49,71 @@ def listen_broadcast_election(queue, shared_data):
"""Receives messages and updates shared state."""
print(f"Listener from {shared_data['server_id']} is listening...")
global active_servers
+ server_socket.setblocking(False) # Set socket to non-blocking mode
#global sender_uuid
while True:
try:
- message, client_address = election_socket.recvfrom(4096)
- decoded_message = message.decode()
+ message, sender_address = election_socket.recvfrom(4096)
+ decoded_sender_message = message.decode()
- print(f"Received from {client_address}: {decoded_message}")
+ print(f"Received from {sender_address}: {decoded_sender_message}")
# Handle heartbeat messages
- if "HEARTBEAT" in decoded_message:
- shared_data['server_id'] = decoded_message.split(": ")[1]
+ if "HEARTBEAT" in decoded_sender_message:
+ shared_data['sender_uuid'] = decoded_sender_message.split(": ")[1]
#print("sender_uuid", sender_uuid) #debug
- active_servers[shared_data['server_id']] = time.time() #update active server dictionary
+ active_servers[shared_data['sender_uuid']] = time.time() #update active server dictionary
#send to neighbour
continue
# Put message in the queue for election process
- queue.put(decoded_message)
+ queue.put(decoded_sender_message)
except socket.error as e:
print(f"Socket error occurred: {e}")
break
+ time.sleep(0.5) # Small delay to prevent busy waiting
#listener function
##################TypeError: listen_client() takes 0 positional arguments but 2 were given############
-def listen_client():
+def listen_client(shared_data):
"""receives messages from clients and broadcast them."""
- manager=Manager()
- save_last_messages = manager.dict()
- message_queue = multiprocessing.Queue(maxsize=3)
+ #manager=Manager()
+
+ #global received_messages
+ received_messages = set()
+
+ #save_last_messages = manager.dict()
+ #message_queue = multiprocessing.Queue(maxsize=3)
+
#global save_last_messages
+ #only listen if current leader
+ #if shared_data['current_leader'] == shared_data['server_id']:
while True:
try:
message, client_address = server_socket.recvfrom(4096)
decoded_message = message.decode()
#create a dictionary for message uuid
- message_uuid = decoded_message.split(": ")[0]
- print("message id:", message_uuid) #debug
+ # message_uuid = decoded_message.split(": ")[0]
+ #print("message id:", message_uuid) #debug
+
+ # Extract message UUID and check if already processed
+ message_id = decoded_message.split(":")[0]
+ if message_id in received_messages:
+ continue
+ received_messages.add(message_id)
# Enqueue messages and maintain a fixed size
- if not any(decoded_message.startswith(msg) for msg in list(message_queue.queue)):
+ """if not any(decoded_message.startswith(msg) for msg in list(message_queue.queue)):
if message_queue.full():
message_queue.get() # Remove oldest message
message_queue.put(decoded_message)
# Print all messages
print("here are your last messages:", list(message_queue.queue)) # Debug
-
+ """
+
"""
if message_uuid not in save_last_messages:
if len(save_last_messages) <= 3:
@@ -108,7 +124,7 @@ def listen_client():
#save_last_messages.update(f"last message: {message}: {message_uuid}")
print("here are your last messages:", dict(save_last_messages)) #debug
"""
-
+
#broadcast_election(save_last_messages) #send to neighbour
# checks wheter the message comes from the server itself
@@ -142,33 +158,36 @@ def start_election(queue, shared_data):
#global sender_uuid
#current_leader = None
print("Starting election...")
- broadcast_election(f"START_ELECTION: {shared_data['server_id']}")
+ broadcast_election(f"START_ELECTION: {shared_data['server_id']}") #every server which broadcasts his id is going to be part of the election
timeout = time.time() + 20 # 20-second timeout, waiting for other servers
- highest_id = shared_data['server_id']
+ highest_id = shared_data['server_id'] #highest id is own id
+ leader= False
#while current time is smaller than current time plus 20s:
while time.time() < timeout:
try:
message = queue.get(timeout=1) #waits 1 sec for a message
- if shared_data['server_id']==shared_data['server_id']:
+ if shared_data['server_id']==shared_data['sender_uuid']: #wtf??????????????
continue
if "START_ELECTION" in message:
- shared_data['server_id'] = message.split(": ")[1]
- active_servers[shared_data['server_id']] = time.time()
- print(f"Received UUID for election: {shared_data['server_id']}")
+ shared_data['sender_uuid'] = message.split(": ")[1]
+ active_servers[shared_data['sender_uuid']] = time.time()
+ print(f"Received UUID for election: {shared_data['sender_uuid']}")
################dumblogic###onlyworkswith21erver################################
- if shared_data['server_id'] > highest_id:
- highest_id = shared_data['server_id']
- print(f"Received higher ID {shared_data['server_id']}, forwarding...")
- broadcast_election(f"START_ELECTION: {shared_data['server_id']}")
-
- elif shared_data['server_id'] == highest_id:
- shared_data['current_leader'] = shared_data['server_id']
- print(f"(sender) Leader elected: {shared_data['current_leader']}")
- broadcast_election(f"LEADER: {shared_data['current_leader']}")
-
+ if leader == False:
+ if highest_id>shared_data['sender_uuid']:
+ leader == True
+ elif highest_id<shared_data['sender_uuid']:
+ highest_id=shared_data['sender_uuid']
+ leader == True
+
+ if leader == True:
+ highest_id=shared_data['current_leader']
+ print(f"(sender) Leader elected: {shared_data['current_leader']}")
+ broadcast_election(f"LEADER: {shared_data['current_leader']}")
+
else:
shared_data['current_leader'] = shared_data['server_id']
print(f"(official, its me) Leader elected: {shared_data['current_leader']}")
@@ -211,20 +230,24 @@ def start_election(queue, shared_data):
# Heartbeat function
def send_heartbeat(shared_data):
"""Sends heartbeat messages to keep the server active."""
- time.sleep(30) #waiting for leader election #set to 20s
- while True:
- broadcast_election(f"HEARTBEAT: {shared_data['server_id']}") #please change that to right neighbour after correct leader election
- time.sleep(5)
+ #only if you are the current leader
+ if shared_data['current_leader'] == shared_data['server_id']:
+ time.sleep(30) #waiting for leader election #set to 20s
+ while True:
+ broadcast_election(f"HEARTBEAT: {shared_data['server_id']}") #please change that to right neighbour after correct leader election
+ time.sleep(5)
# Monitor heartbeats
def monitor_heartbeats():
"""Checks active servers based on heartbeat messages."""
- global active_servers
- while True:
- time.sleep(6)
- now = time.time()
- active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6}
- print(f"Active servers: {list(active_servers.keys())}")
+ #only if you are the current leader
+ if shared_data['current_leader'] == shared_data['server_id']:
+ global active_servers
+ while True:
+ time.sleep(6)
+ now = time.time()
+ active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6}
+ print(f"Active servers: {list(active_servers.keys())}")
# Main function
@@ -248,18 +271,22 @@ if __name__ == "__main__":
save_last_messages={}
# Create processes
- listener_election_process = multiprocessing.Process(target=listen_broadcast_election, args=(message_queue, shared_data))
- listener_client_process = multiprocessing.Process(target=listen_client, )
+ #listener_election_process = multiprocessing.Process(target=listen_broadcast_election, args=(message_queue, shared_data))
+ listener_client_process = multiprocessing.Process(target=listen_client, args=(shared_data,))
election_process = multiprocessing.Process(target=start_election, args=(message_queue, shared_data))
# Heartbeat threads
heartbeat_thread = threading.Thread(target=send_heartbeat, args=(shared_data,), daemon=True)
heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True)
+ # Start the listener for clients in a thread
+ listener_election_thread = threading.Thread(target=listen_broadcast_election, args=(message_queue, shared_data), daemon=True)
+
# Start processes and threads
- listener_election_process.start()
+ #listener_election_process.start()
listener_client_process.start()
-
+
+ listener_election_thread.start()
election_process.start()
heartbeat_thread.start()
heartbeat_monitor_thread.start()
@@ -268,12 +295,12 @@ if __name__ == "__main__":
#heartbeat_monitor_thread.join()
try:
- listener_election_process.join()
+ #listener_election_process.join()
listener_client_process.join()
election_process.join()
except KeyboardInterrupt:
print("\nShutting down server...")
- listener_election_process.terminate()
+ #listener_election_process.terminate()
listener_client_process.terminate()
election_process.terminate()
--
GitLab