From 77373cfb2ffcfe8bd5e9a409d4ecb281092406c2 Mon Sep 17 00:00:00 2001 From: Quoc Dao <quoc.dao@student.reutlingen-university.de> Date: Sun, 4 Feb 2024 23:01:09 +0100 Subject: [PATCH] back to functioning code --- client.py | 53 +---------------------------------------------- server.py | 61 ++++++++++++++++++++----------------------------------- 2 files changed, 23 insertions(+), 91 deletions(-) diff --git a/client.py b/client.py index 9ddf7a8..4f1cae8 100644 --- a/client.py +++ b/client.py @@ -1,8 +1,6 @@ import socket import threading from datetime import datetime -import time -import struct MULTICAST_GROUP_IP = '224.1.1.1' @@ -66,50 +64,6 @@ class Client(): except socket.timeout: pass - except ConnectionResetError: - # Verbindung zum Leader wurde zurückgesetzt - print("Connection to the leader server was reset. Attempting to reconnect...") - self.server_socket.close() # Schließe die vorhandene Verbindung - time.sleep(1) # Warte eine Sekunde, bevor eine neue Verbindung hergestellt wird - self.connect_to_leader() # Methode aufrufen, um eine Verbindung zum aktuellen Leader herzustellen - - - def connect_to_leader(self): - try: - # Verbindung zum aktuellen Leader herstellen - self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.server_socket.connect((self.currentLeader, 5555)) - print("Reconnected to the leader server.") - except Exception as e: - print(f"Failed to reconnect to the leader server: {str(e)}") - - - def listen_for_new_leader(self): - # Socket zum Empfangen von Multicast-Nachrichten erstellen - multicast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) - multicast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - multicast_socket.bind(('', CLIENT_MULTICAST_PORT)) - - # Multicast-Gruppe beitreten - group = socket.inet_aton(MULTICAST_GROUP_IP) - mreq = struct.pack('4sL', group, socket.INADDR_ANY) - multicast_socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) - - while True: - try: - # Nachrichten von potenziellen neuen Leadern empfangen - data, address = multicast_socket.recvfrom(1024) - if data: - new_leader_address = data.decode() - if new_leader_address != self.currentLeader: - print(f"New leader detected: {new_leader_address}") - self.currentLeader = new_leader_address - # Verbindung zum neuen Leader herstellen - self.connect_to_leader() - - except socket.timeout: - pass - def receive_messages(self): while True: @@ -129,10 +83,5 @@ if __name__ == "__main__": client = Client() thread1 = threading.Thread(target = client.MulticastSendAndReceive) - thread2 = threading.Thread(target=client.listen_for_new_leader) - thread1.start() - thread2.start() - - thread1.join() - thread2.join() \ No newline at end of file + thread1.join() \ No newline at end of file diff --git a/server.py b/server.py index dc11804..6e1c1eb 100644 --- a/server.py +++ b/server.py @@ -278,19 +278,6 @@ class Server(): finally: ring_socket.close() - def handle_new_leader(self): - # Funktion, um den neuen Leader zu behandeln und seine IP-Adresse zu senden - multicast_group = (MULTICAST_GROUP_IP, SERVER_MULTICAST_PORT) - multicast_send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - - while True: - # Überprüfe, ob ein neuer Leader gewählt wurde - if self.is_leader: - # Sende die IP-Adresse des Leaders an die Multicast-Gruppe - multicast_send_sock.sendto(self.leader_ip.encode(), multicast_group) - time.sleep(1) # Warte kurz, bevor du die IP-Adresse erneut sendest - - def init_heartbeat(self): self.leader_heartbeat_last_received = time.time() self.heartbeat_interval = 1 # seconds @@ -324,33 +311,32 @@ class Server(): multicast_socket.settimeout(3) while True: - if not self.is_leader: - try: - data, address = multicast_socket.recvfrom(1024) - if data: - self.leader_heartbeat_last_received = time.time() - sender_ip = address[0] - self.leader_ip = sender_ip - if sender_ip != MY_IP: - print(f"Received heartbeat from {sender_ip}: {data.decode()}") + try: + data, address = multicast_socket.recvfrom(1024) + if data: + self.leader_heartbeat_last_received = time.time() + sender_ip = address[0] + self.leader_ip = sender_ip + if sender_ip != MY_IP: + print(f"Received heartbeat from {sender_ip}: {data.decode()}") # Wenn kein Heartbeat empfangen wurde - except socket.timeout: - time_since_last_heartbeat = time.time() - self.leader_heartbeat_last_received - if time_since_last_heartbeat > self.heartbeat_interval: - self.missed_heartbeats += 1 - print(f"Missed heartbeat detected. Count: {self.missed_heartbeats}") - if self.missed_heartbeats >= self.missed_heartbeats_limit: - print("Missed heartbeats limit reached. Initiating leader election.") + except socket.timeout: + time_since_last_heartbeat = time.time() - self.leader_heartbeat_last_received + if time_since_last_heartbeat > self.heartbeat_interval: + self.missed_heartbeats += 1 + print(f"Missed heartbeat detected. Count: {self.missed_heartbeats}") + if self.missed_heartbeats >= self.missed_heartbeats_limit: + print("Missed heartbeats limit reached. Initiating leader election.") - self.serverList.remove(self.leader_ip) - self.basic_lcr() + self.serverList.remove(self.leader_ip) + self.basic_lcr() - # Reset missed heartbeats count - self.missed_heartbeats = 0 - # Code to initiate a new voting process to elect a new leader - # This could involve calling a function from voting.py or similar logic - time.sleep(self.heartbeat_interval) + # Reset missed heartbeats count + self.missed_heartbeats = 0 + # Code to initiate a new voting process to elect a new leader + # This could involve calling a function from voting.py or similar logic + time.sleep(self.heartbeat_interval) # starting all simultaneously working procedures @@ -372,9 +358,6 @@ if __name__== '__main__': thread_heartbeat = threading.Thread(target = server.init_heartbeat) thread_heartbeat.start() - thread_new_leader = threading.Thread(target=server.handle_new_leader) - thread_new_leader.start() - # Socket erstellen und binden server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind((MY_IP, 5555)) -- GitLab