diff --git a/server.py b/server.py index 7a2eb5e95d18ac5df93693dc22e90a76bc689df6..9bf30029c7e4b10c57122b238166bf274d195721 100644 --- a/server.py +++ b/server.py @@ -19,6 +19,9 @@ CLIENT_MULTICAST_PORT = 5973 # Listening port ring RING_PORT = 5972 +# Listening port hearthbeat +HEARTBEAT_PORT = 5971 + # Local host information MY_HOST = socket.gethostname() MY_IP = socket.gethostbyname(MY_HOST) @@ -30,7 +33,7 @@ class Server(): self.informServer = False self.serverList = [] # list if servers and their addresses self.leader_uuid = '' # fix the leader IP - self.isLeader = False # New variable to track if the server is the leader + self.is_leader = False # New variable to track if the server is the leader self.uuid = str(uuid.uuid4()) self.participant = False @@ -66,7 +69,7 @@ class Server(): if data: newServer_address = data.decode() self.printwt(f'New participant wants to connect: {newServer_address}') - self.isLeader = False + self.is_leader = False # if the decoded address is not in the server list add it and print the list @@ -141,8 +144,8 @@ class Server(): if num_responses == 1: multicast_send_sock.close() - self.isLeader = True - self.leader_uuid = self.uuid # Hier wird die uuid-Adresse des Leaders zugewiesens + # self.is_leader = True + # self.leader_uuid = self.uuid # Hier wird die uuid-Adresse des Leaders zugewiesens time.sleep(1) @@ -219,7 +222,7 @@ class Server(): first_message = { "mid": self.uuid, - "isLeader": False + "is_leader": False } self.participant = True print('sending first leader election message to neighbour') @@ -235,7 +238,7 @@ class Server(): print('There is a election message:') print(election_message) - if election_message.get('isLeader') and self.participant: + if election_message.get('is_leader') and self.participant: print('leader info weitergeben') self.leader_uuid = election_message['mid'] # forward received election message to left neighbour @@ -246,7 +249,7 @@ class Server(): print('mich vorschlagen') new_election_message = { "mid": self.uuid, - "isLeader": False + "is_leader": False } self.participant = True # send received election message to left neighbour @@ -258,17 +261,17 @@ class Server(): ring_socket.sendto(json.dumps(election_message).encode('utf-8'), neighbour_address) elif election_message.get('mid') == self.uuid and self.participant: print('Ich wurde als Leader definiert') - self.leader_uid = self.uuid - self.isLeader = True + self.leader_uuid = self.uuid + self.is_leader = True new_election_message = { "mid": self.uuid, - "isLeader": True + "is_leader": True } # send new election message to left neighbour self.participant = False ring_socket.sendto(json.dumps(new_election_message).encode('utf-8'), neighbour_address) print(f'I am Leader {self.leader_uuid}') - elif election_message.get('isLeader') and not self.participant: + elif election_message.get('is_leader') and not self.participant: print('Leader ist gewählt, Nachricht wurde weiteregeben, ELECTION beenden') except Exception as e: print(f"An error occurred: {e}") @@ -277,33 +280,56 @@ class Server(): def init_heartbeat(self): self.leader_heartbeat_last_received = time.time() - self.is_leader = False self.heartbeat_interval = 5 # seconds self.missed_heartbeats_limit = 5 self.missed_heartbeats = 0 - threading.Thread(target=self.heartbeat_monitor).start() - if self.is_leader: - threading.Thread(target=self.emit_heartbeat).start() - def emit_heartbeat(self): - while self.is_leader: - # Code to sends a heartbeat message - print("Heartbeat emitted by leader") - time.sleep(self.heartbeat_interval) + # multicast_group = (MULTICAST_GROUP_IP, HEARTBEAT_PORT) + heartbeat_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + # Set the time-to-live for messages to 1 so they do not go past the local network segment. + # heartbeat_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1) + heartbeat_socket.bind(('', HEARTBEAT_PORT)) + + multicast_group = socket.inet_aton(MULTICAST_GROUP_IP) + mreg = struct.pack('4sL', multicast_group, socket.INADDR_ANY) + heartbeat_socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreg) + + threading.Thread(target=self.heartbeat_subscriber, args=(heartbeat_socket, multicast_group)).start() + threading.Thread(target=self.heartbeat_publisher, args=(heartbeat_socket, multicast_group)).start() - def heartbeat_monitor(self): + def heartbeat_publisher(self, heartbeat_socket, multicast_group): while True: - time_since_last_heartbeat = time.time() - self.leader_heartbeat_last_received - if time_since_last_heartbeat > self.heartbeat_interval: - self.missed_heartbeats += 1 - print(f"Missed heartbeat detected. Count: {self.missed_heartbeats}") - if self.missed_heartbeats >= self.missed_heartbeats_limit: - print("Missed heartbeats limit reached. Initiating leader election.") - # Reset missed heartbeats count - self.missed_heartbeats = 0 - # Code to initiate a new voting process to elect a new leader - # This could involve calling a function from voting.py or similar logic - time.sleep(self.heartbeat_interval) + if self.is_leader: + print('\n I am Leader, sending heartbeat\n') + # Code to sends a heartbeat message + msg = '#' + heartbeat_socket.sendto(msg.encode(), (MULTICAST_GROUP_IP, HEARTBEAT_PORT)) + time.sleep(self.heartbeat_interval) + + def heartbeat_subscriber(self, heartbeat_socket, multicast_group): + while True: + if not self.is_leader: + print('\nWaiting to receive heartbeat...\n') + data, address = heartbeat_socket.recvfrom(1024) + if data: + print(data.decode()) + heartbeat_socket.sendto('#'.encode(), address) + + + + + # time_since_last_heartbeat = time.time() - self.leader_heartbeat_last_received + # if time_since_last_heartbeat > self.heartbeat_interval: + # self.missed_heartbeats += 1 + # print(f"Missed heartbeat detected. Count: {self.missed_heartbeats}") + # if self.missed_heartbeats >= self.missed_heartbeats_limit: + # print("Missed heartbeats limit reached. Initiating leader election.") + # # Reset missed heartbeats count + # self.missed_heartbeats = 0 + # # Code to initiate a new voting process to elect a new leader + # # This could involve calling a function from voting.py or similar logic + # time.sleep(self.heartbeat_interval) # starting all simultaneously working procedures if __name__== '__main__': @@ -320,6 +346,9 @@ if __name__== '__main__': thread_election = threading.Thread(target = server.basic_lcr) thread_election.start() + + thread_heartbeat = threading.Thread(target = server.init_heartbeat) + thread_heartbeat.start() # Socket erstellen und binden server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)