Skip to content
Snippets Groups Projects
Commit 1c4b6802 authored by Katharina's avatar Katharina
Browse files

exercise done

parent 5083fa3d
No related branches found
No related tags found
No related merge requests found
##########unsicher, ob heartbeat funktioniert################ ##########save content of the received message############does only saves the message id atm#########
###########deutsch englisch korrigieren###################
################liste mit message ids auf 3-5 setzen########
############active servers dic shared data################
import socket import socket
import multiprocessing import multiprocessing
...@@ -10,6 +7,7 @@ import time ...@@ -10,6 +7,7 @@ import time
import threading import threading
import os import os
from multiprocessing import Manager from multiprocessing import Manager
from collections import deque
broadcast_ip = '255.255.255.255' broadcast_ip = '255.255.255.255'
broadcast_port = 55555 broadcast_port = 55555
...@@ -27,7 +25,6 @@ election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) ...@@ -27,7 +25,6 @@ election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
election_socket.bind(('', election_port)) election_socket.bind(('', election_port))
#does not work, use shared data??
#global leader_election_done #global leader_election_done
leader_election_done = threading.Event() leader_election_done = threading.Event()
active_servers={} active_servers={}
...@@ -51,7 +48,6 @@ def broadcast_election(message): ...@@ -51,7 +48,6 @@ def broadcast_election(message):
def start_election(queue, shared_data): def start_election(queue, shared_data):
"""Starts leader election based on received messages.""" """Starts leader election based on received messages."""
global active_servers global active_servers
#global leader_election_done
print("Starting election...") print("Starting election...")
broadcast_election(f"START_ELECTION: {shared_data['server_id']}") #every server which broadcasts his id is going to be part of the election broadcast_election(f"START_ELECTION: {shared_data['server_id']}") #every server which broadcasts his id is going to be part of the election
...@@ -116,20 +112,11 @@ def listen_broadcast_election(queue, shared_data): ...@@ -116,20 +112,11 @@ def listen_broadcast_election(queue, shared_data):
print(f"Received from {sender_address}: {decoded_sender_message}") print(f"Received from {sender_address}: {decoded_sender_message}")
leader_election_done.set() leader_election_done.set()
# Handle heartbeat messages
#if "HEARTBEAT" in decoded_sender_message:
# shared_data['sender_uuid'] = decoded_sender_message.split(": ")[1]
#print("sender_uuid", sender_uuid) #debug
# active_servers[shared_data['sender_uuid']] = time.time() #update active server dictionary
#send to neighbour######################
# continue
if "HEARTBEAT" in decoded_sender_message: if "HEARTBEAT" in decoded_sender_message:
sender_uuid = decoded_sender_message.split(": ")[1] sender_uuid = decoded_sender_message.split(": ")[1]
active_servers[sender_uuid] = time.time() active_servers[sender_uuid] = time.time()
continue continue
# Put message in the queue for election process # Put message in the queue for election process
queue.put(decoded_sender_message) queue.put(decoded_sender_message)
...@@ -143,18 +130,9 @@ def listen_client(shared_data): ...@@ -143,18 +130,9 @@ def listen_client(shared_data):
"""receives messages from clients and broadcast them.""" """receives messages from clients and broadcast them."""
#global received_messages #global received_messages
received_messages = set() received_message_uuid = set()
MAX_MESSAGES = 3
#save_last_messages = manager.dict() message_queue = deque(maxlen=MAX_MESSAGES)
#message_queue = multiprocessing.Queue(maxsize=3)
#global save_last_messages
#only listen if current leader
###################NameError: name 'leader_election_done' is not defined ############
#global leader_election_done
#leader_election_done.wait() # blocks until the leader election is done
print("current leader for listening is:", shared_data['current_leader']) #debug print("current leader for listening is:", shared_data['current_leader']) #debug
...@@ -166,11 +144,21 @@ def listen_client(shared_data): ...@@ -166,11 +144,21 @@ def listen_client(shared_data):
# Extract message UUID and check if already processed # Extract message UUID and check if already processed
message_id = decoded_message.split(":")[0] message_id = decoded_message.split(":")[0]
if message_id in received_messages: if message_id in received_message_uuid:
continue continue
received_messages.add(message_id)
#broadcast_election(save_last_messages) #send to neighbour # mark message as proccessed
message_queue.append(message_id)
received_message_uuid.add(message_id)
# Entferne die älteste Nachricht aus dem Set, wenn die Queue sie entfernt
if len(message_queue) == MAX_MESSAGES:
removed_id = message_queue.popleft()
received_message_uuid.remove(removed_id)
print("erhaltene msg uuid:", received_message_uuid)
#broadcast_election(sreceived_message_uuid) #send to neighbour
# checks wheter the message comes from the server itself # checks wheter the message comes from the server itself
if decoded_message.startswith(f"{shared_data['server_id']}"): if decoded_message.startswith(f"{shared_data['server_id']}"):
...@@ -223,7 +211,7 @@ def monitor_heartbeats(): ...@@ -223,7 +211,7 @@ def monitor_heartbeats():
print(f"Active servers: {list(active_servers.keys())}") print(f"Active servers: {list(active_servers.keys())}")
# Main function #**************************************Main function*************************************************
if __name__ == "__main__": if __name__ == "__main__":
multiprocessing.freeze_support() multiprocessing.freeze_support()
......
...@@ -36,7 +36,7 @@ ...@@ -36,7 +36,7 @@
# message_uuid = decoded_message.split(": ")[0] # message_uuid = decoded_message.split(": ")[0]
#print("message id:", message_uuid) #debug #print("message id:", message_uuid) #debug
# Enqueue messages and maintain a fixed size # Enqueue messages and maintain a fixed size
"""if not any(decoded_message.startswith(msg) for msg in list(message_queue.queue)): """if not any(decoded_message.startswith(msg) for msg in list(message_queue.queue)):
if message_queue.full(): if message_queue.full():
...@@ -57,3 +57,12 @@ ...@@ -57,3 +57,12 @@
#save_last_messages.update(f"last message: {message}: {message_uuid}") #save_last_messages.update(f"last message: {message}: {message_uuid}")
print("here are your last messages:", dict(save_last_messages)) #debug print("here are your last messages:", dict(save_last_messages)) #debug
""" """
# Handle heartbeat messages
#if "HEARTBEAT" in decoded_sender_message:
# shared_data['sender_uuid'] = decoded_sender_message.split(": ")[1]
#print("sender_uuid", sender_uuid) #debug
# active_servers[shared_data['sender_uuid']] = time.time() #update active server dictionary
#send to neighbour######################
# continue
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment