diff --git a/__pycache__/voting.cpython-311.pyc b/__pycache__/voting.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..1cb1d5643b195e4afeb653d927acc31a9e23d574 Binary files /dev/null and b/__pycache__/voting.cpython-311.pyc differ diff --git a/__pycache__/voting.cpython-312.pyc b/__pycache__/voting.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..08e29fc07a41cba4e1a12db4a9d952b7d7fc9226 Binary files /dev/null and b/__pycache__/voting.cpython-312.pyc differ diff --git a/client.py b/client.py index d2952bfaf54e5bd15d6871cd474add50c7c1d6f4..4f1cae88ebba0e4adcf251656ac4cc28a4aef39a 100644 --- a/client.py +++ b/client.py @@ -2,16 +2,16 @@ import socket import threading from datetime import datetime +MULTICAST_GROUP_IP = '224.1.1.1' + # Broadcast address and port -BROADCAST_IP = "192.168.0.255" -BROADCAST_PORT = 5973 + +CLIENT_MULTICAST_PORT = 5973 # Local host information MY_HOST = socket.gethostname() MY_IP = socket.gethostbyname(MY_HOST) - - class Client(): def __init__(self): self.currentLeader = '' @@ -23,23 +23,23 @@ class Client(): print(f'[{current_date_time}] {msg}') # dynamic discoverey: client sends request to server group and gets the IP of server as reply - def BroadcastSendAndReceive(self): - - message = 'New client wants to connect: ' + MY_IP - + def MulticastSendAndReceive(self): # Create a UDP socket - broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - # Send message on broadcast address - broadcast_socket.sendto(str.encode(message), (BROADCAST_IP, BROADCAST_PORT)) + multicast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - # Send broadcast message + # Set the time-to-live for messages to 1 so they do not go past the local network segment + multicast_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1) + + # Send message on multicast address + message = 'New client wants to connect: ' + MY_IP + multicast_socket.sendto(str.encode(message), (MULTICAST_GROUP_IP, CLIENT_MULTICAST_PORT)) self.printwt("Sent my IP to server group") while True: try: # receive reply data (server IP) from the other participants - reply, addr = broadcast_socket.recvfrom(1024) + reply, addr = multicast_socket.recvfrom(1024) if reply: # decode received data @@ -82,6 +82,6 @@ class Client(): if __name__ == "__main__": client = Client() - thread1 = threading.Thread(target = client.BroadcastSendAndReceive) + thread1 = threading.Thread(target = client.MulticastSendAndReceive) thread1.start() thread1.join() \ No newline at end of file diff --git a/server.py b/server.py index b09e59b3a4e6b095bf77479f87c869606a747365..6e1c1ebc38e300cccdbb63ae3c2d48f51517af80 100644 --- a/server.py +++ b/server.py @@ -1,36 +1,174 @@ import socket import threading from datetime import datetime +import time +import struct +import uuid +import json +from voting import form_ring, get_neighbour -# Listening port -BROADCAST_PORT = 5973 + +MULTICAST_GROUP_IP = '224.1.1.1' + +# Listening port Server Discovery +SERVER_MULTICAST_PORT = 5974 + +# Listening port Client Discovery +CLIENT_MULTICAST_PORT = 5973 + +# Listening port ring +RING_PORT = 5972 + +# Listening port hearthbeat +HEARTBEAT_PORT = 5971 # Local host information MY_HOST = socket.gethostname() MY_IP = socket.gethostbyname(MY_HOST) + class Server(): def __init__(self): - self.leader_IP = '' # fix the leader IP self.clients = [] - + self.informServer = False + self.serverList = [] # list if servers and their addresses + self.leader_uuid = '' # fix the leader IP + self.leader_ip = '' + self.is_leader = False # New variable to track if the server is the leader + self.uuid = str(uuid.uuid4()) + self.participant = False + def printwt(self, msg): current_date_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f'[{current_date_time}] {msg}') + + def print_group_view(self): + print("Group view is:", self.serverList) + + #This function enables the server to listen to the server multicast port and reply the ip address + def MulticastListenAndReply(self): + + # if my IP is not in the server list add it + if MY_IP not in self.serverList: + self.serverList.append(MY_IP) + + + # create socket bind to server address + multicast_listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + multicast_listen_sock.bind(('', SERVER_MULTICAST_PORT)) + + # tell the os to add the socket to the multicast group + multicast_group = socket.inet_aton(MULTICAST_GROUP_IP) + mreg = struct.pack('4sL', multicast_group, socket.INADDR_ANY) + multicast_listen_sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreg) + + + while True: + + data, address = multicast_listen_sock.recvfrom(1024) + + if data: + newServer_address = data.decode() + self.printwt(f'New participant wants to connect: {newServer_address}') + self.is_leader = False + + # if the decoded address is not in the server list add it and print the list + + if newServer_address not in self.serverList: + self.serverList.append(newServer_address) + + reply_message = MY_IP + multicast_listen_sock.sendto(str.encode(reply_message), address) + + self.printwt('Replied my IP to new participant') + + + + time.sleep(1) + self.print_group_view() + + + #this function enables the server to send a multicast to the server group and receive the answers of existing members + def MulticastSendAndReceive(self): + + # create socket + multicast_group = (MULTICAST_GROUP_IP, SERVER_MULTICAST_PORT) + multicast_send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # Set a timeout so the socket does not block indefinitely when trying to receive data. + multicast_send_sock.settimeout(2) + + # Set the time-to-live for messages to 1 so they do not go past the local network segment. + multicast_send_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1) + + message = MY_IP + multicast_send_sock.sendto(message.encode(), multicast_group) + self.printwt("Sent my IP to server group") + + # if my IP is not in the server list add it + if MY_IP not in self.serverList: + self.serverList.append(MY_IP) + + + # listen for IPs from existing servers + maxLoop = 5 + currentLoop = 0 + + # Anzahl der eingehenden Antworten initialisieren + num_responses = 0 + + while currentLoop < maxLoop: + print("Waiting for responses...") # Debug-Ausgabe + while True: + currentLoop += 1 + + try: + # receive reply data from the other participants + reply, address = multicast_send_sock.recvfrom(1024) + + if reply: + reply_address = reply.decode() + + # Debug-Ausgabe + print(f"Received response from: {reply_address}") + + # if reply address is not in the server list, add it + if reply_address not in self.serverList: + self.serverList.append(reply_address) - # Listen to client broadcast (request) and reply with Server IP + # Erhöhe die Anzahl der eingehenden Antworten + num_responses += 1 + print(f"Current server list: {self.serverList}") # Debug-Ausgabe + + except socket.timeout: + break + + if num_responses == 1: + multicast_send_sock.close() + # self.is_leader = True + # self.leader_uuid = self.uuid # Hier wird die uuid-Adresse des Leaders zugewiesens + time.sleep(1) + + + self.print_group_view() + + + # Listen to client multicast (request) and reply with Server IP def ListenForClientAndReply(self): # Create a UDP socket listen_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - # Set the socket to broadcast and enable reusing addresses - listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + # Enable reusing addresses listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Bind socket to address and port - listen_socket.bind((MY_IP, BROADCAST_PORT)) + listen_socket.bind((MY_IP, CLIENT_MULTICAST_PORT)) - print("Listening to broadcast messages") + # tell the os to add the socket to the multicast group + multicast_group = socket.inet_aton(MULTICAST_GROUP_IP) + mreg = struct.pack('4sL', multicast_group, socket.INADDR_ANY) + listen_socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreg) - # Receiving broadcast massage + # print("Listening to multicast messages") + + # Receiving multicast massage while True: data, addr = listen_socket.recvfrom(1024) @@ -38,8 +176,7 @@ class Server(): self.printwt(data.decode()) # if Iam the leader, answer the client including my IP - # if MY_IP == self.leader_IP: - if data: + if self.uuid == self.leader_uuid: reply_message = MY_IP listen_socket.sendto(str.encode(reply_message), addr) self.printwt('Replied my IP to new client') @@ -52,28 +189,175 @@ class Server(): data = client_socket.recv(1024) if not data: break - self.broadcast(data, client_socket) + self.multicast(data, client_socket) except: self.clients.remove(client_socket) break - def broadcast(self, message, sender_socket): + def multicast(self, message, sender_socket): for client in self.clients: try: if client != sender_socket: client.send(message) except: self.clients.remove(client) + + def basic_lcr(self): + time.sleep(3) + # bind to ring socket + ring_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + ring_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + ring_socket.bind((MY_IP, RING_PORT)) + print('++++++++++++++++++++++++++') + print('New leader election starts') + print('Servers') + print(self.serverList) + + neighbour = get_neighbour(form_ring(self.serverList), MY_IP, 'left') + print('Neighbour') + print(neighbour) + neighbour_address = (neighbour, RING_PORT) + + first_message = { + "mid": self.uuid, + "is_leader": False + } + self.participant = True + print('sending first leader election message to neighbour') + ring_socket.sendto(json.dumps(first_message).encode('utf-8'), neighbour_address) + + while True: + neighbour = get_neighbour(form_ring(self.serverList), MY_IP, 'left') + neighbour_address = (neighbour, RING_PORT) + + print('\nWaiting to receive election message...\n') + data, address = ring_socket.recvfrom(1024) + election_message = json.loads(data.decode()) + print('There is a election message:') + print(election_message) + + if election_message.get('is_leader') and self.participant: + print('leader info weitergeben') + self.leader_uuid = election_message['mid'] + # forward received election message to left neighbour + self.participant = False + ring_socket.sendto(json.dumps(election_message).encode('utf-8'), neighbour_address) + print(f'Leader is {self.leader_uuid}') + elif election_message.get('mid') < self.uuid and not self.participant: + print('mich vorschlagen') + new_election_message = { + "mid": self.uuid, + "is_leader": False + } + self.participant = True + # send received election message to left neighbour + ring_socket.sendto(json.dumps(new_election_message).encode('utf-8'), neighbour_address) + elif election_message.get('mid') > self.uuid: + # send received election message to left neighbour + print('Jemand anderes vorschlagen') + self.participant = True + ring_socket.sendto(json.dumps(election_message).encode('utf-8'), neighbour_address) + elif election_message.get('mid') == self.uuid and self.participant: + print('Ich wurde als Leader definiert') + self.leader_uuid = self.uuid + self.is_leader = True + new_election_message = { + "mid": self.uuid, + "is_leader": True + } + # send new election message to left neighbour + self.participant = False + ring_socket.sendto(json.dumps(new_election_message).encode('utf-8'), neighbour_address) + print(f'I am Leader {self.leader_uuid}') + elif election_message.get('is_leader') and not self.participant: + print('Leader ist gewählt, Nachricht wurde weiteregeben, ELECTION beenden') + except Exception as e: + print(f"An error occurred: {e}") + finally: + ring_socket.close() + + def init_heartbeat(self): + self.leader_heartbeat_last_received = time.time() + self.heartbeat_interval = 1 # seconds + self.missed_heartbeats_limit = 5 + self.missed_heartbeats = 0 + + + # Erstelle und binde den Multicast-Socket für Heartbeats + multicast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + multicast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + multicast_socket.bind((MY_IP, HEARTBEAT_PORT)) + multicast_group = socket.inet_aton(MULTICAST_GROUP_IP) + mreq = struct.pack('4sL', multicast_group, socket.INADDR_ANY) + multicast_socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + + # Starte Heartbeat-Sender- und Empfänger-Threads + threading.Thread(target=self.heartbeat_publisher, args=(multicast_socket,)).start() + threading.Thread(target=self.heartbeat_subscriber, args=(multicast_socket,)).start() + + + def heartbeat_publisher(self, multicast_socket): + while True: + if self.is_leader: + print('\n I am Leader, sending heartbeat\n') + heartbeat_message = "#" + multicast_socket.sendto(heartbeat_message.encode(), (MULTICAST_GROUP_IP, HEARTBEAT_PORT)) + time.sleep(self.heartbeat_interval) + + def heartbeat_subscriber(self, multicast_socket): + + multicast_socket.settimeout(3) + + while True: + try: + data, address = multicast_socket.recvfrom(1024) + if data: + self.leader_heartbeat_last_received = time.time() + sender_ip = address[0] + self.leader_ip = sender_ip + if sender_ip != MY_IP: + print(f"Received heartbeat from {sender_ip}: {data.decode()}") + + # Wenn kein Heartbeat empfangen wurde + except socket.timeout: + time_since_last_heartbeat = time.time() - self.leader_heartbeat_last_received + if time_since_last_heartbeat > self.heartbeat_interval: + self.missed_heartbeats += 1 + print(f"Missed heartbeat detected. Count: {self.missed_heartbeats}") + if self.missed_heartbeats >= self.missed_heartbeats_limit: + print("Missed heartbeats limit reached. Initiating leader election.") + + self.serverList.remove(self.leader_ip) + self.basic_lcr() + + # Reset missed heartbeats count + self.missed_heartbeats = 0 + # Code to initiate a new voting process to elect a new leader + # This could involve calling a function from voting.py or similar logic + time.sleep(self.heartbeat_interval) # starting all simultaneously working procedures if __name__== '__main__': server = Server() + thread2 = threading.Thread(target = server.MulticastListenAndReply) + thread2.start() + + thread3 = threading.Thread(target = server.MulticastSendAndReceive) + thread3.start() + thread1 = threading.Thread(target = server.ListenForClientAndReply) thread1.start() + thread_election = threading.Thread(target = server.basic_lcr) + thread_election.start() + + thread_heartbeat = threading.Thread(target = server.init_heartbeat) + thread_heartbeat.start() + # Socket erstellen und binden server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind((MY_IP, 5555))