Skip to content
Snippets Groups Projects
Commit 32fe957a authored by Robin Leber's avatar Robin Leber
Browse files

Merge branch 'Robin' into 'main'

leader election und heartbeat

See merge request !2
parents cd437941 8a42591b
No related branches found
No related tags found
1 merge request!2leader election und heartbeat
File added
...@@ -2,16 +2,16 @@ import socket ...@@ -2,16 +2,16 @@ import socket
import threading import threading
from datetime import datetime from datetime import datetime
MULTICAST_GROUP_IP = '224.1.1.1'
# Broadcast address and port # Broadcast address and port
BROADCAST_IP = "192.168.0.255"
BROADCAST_PORT = 5973 CLIENT_MULTICAST_PORT = 5973
# Local host information # Local host information
MY_HOST = socket.gethostname() MY_HOST = socket.gethostname()
MY_IP = socket.gethostbyname(MY_HOST) MY_IP = socket.gethostbyname(MY_HOST)
class Client(): class Client():
def __init__(self): def __init__(self):
self.currentLeader = '' self.currentLeader = ''
...@@ -23,23 +23,23 @@ class Client(): ...@@ -23,23 +23,23 @@ class Client():
print(f'[{current_date_time}] {msg}') print(f'[{current_date_time}] {msg}')
# dynamic discoverey: client sends request to server group and gets the IP of server as reply # dynamic discoverey: client sends request to server group and gets the IP of server as reply
def BroadcastSendAndReceive(self): def MulticastSendAndReceive(self):
message = 'New client wants to connect: ' + MY_IP
# Create a UDP socket # Create a UDP socket
broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) multicast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Send message on broadcast address
broadcast_socket.sendto(str.encode(message), (BROADCAST_IP, BROADCAST_PORT))
# Send broadcast message # Set the time-to-live for messages to 1 so they do not go past the local network segment
multicast_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
# Send message on multicast address
message = 'New client wants to connect: ' + MY_IP
multicast_socket.sendto(str.encode(message), (MULTICAST_GROUP_IP, CLIENT_MULTICAST_PORT))
self.printwt("Sent my IP to server group") self.printwt("Sent my IP to server group")
while True: while True:
try: try:
# receive reply data (server IP) from the other participants # receive reply data (server IP) from the other participants
reply, addr = broadcast_socket.recvfrom(1024) reply, addr = multicast_socket.recvfrom(1024)
if reply: if reply:
# decode received data # decode received data
...@@ -82,6 +82,6 @@ class Client(): ...@@ -82,6 +82,6 @@ class Client():
if __name__ == "__main__": if __name__ == "__main__":
client = Client() client = Client()
thread1 = threading.Thread(target = client.BroadcastSendAndReceive) thread1 = threading.Thread(target = client.MulticastSendAndReceive)
thread1.start() thread1.start()
thread1.join() thread1.join()
\ No newline at end of file
import socket import socket
import threading import threading
from datetime import datetime from datetime import datetime
import time
import struct
import uuid
import json
from voting import form_ring, get_neighbour
# Listening port
BROADCAST_PORT = 5973 MULTICAST_GROUP_IP = '224.1.1.1'
# Listening port Server Discovery
SERVER_MULTICAST_PORT = 5974
# Listening port Client Discovery
CLIENT_MULTICAST_PORT = 5973
# Listening port ring
RING_PORT = 5972
# Local host information # Local host information
MY_HOST = socket.gethostname() MY_HOST = socket.gethostname()
MY_IP = socket.gethostbyname(MY_HOST) MY_IP = socket.gethostbyname(MY_HOST)
class Server(): class Server():
def __init__(self): def __init__(self):
self.leader_IP = '' # fix the leader IP
self.clients = [] self.clients = []
self.informServer = False
self.serverList = [] # list if servers and their addresses
self.leader_uuid = '' # fix the leader IP
self.isLeader = False # New variable to track if the server is the leader
self.uuid = str(uuid.uuid4())
self.participant = False
def printwt(self, msg): def printwt(self, msg):
current_date_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') current_date_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'[{current_date_time}] {msg}') print(f'[{current_date_time}] {msg}')
def print_group_view(self):
print("Group view is:", self.serverList)
#This function enables the server to listen to the server multicast port and reply the ip address
def MulticastListenAndReply(self):
# if my IP is not in the server list add it
if MY_IP not in self.serverList:
self.serverList.append(MY_IP)
# create socket bind to server address
multicast_listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
multicast_listen_sock.bind(('', SERVER_MULTICAST_PORT))
# tell the os to add the socket to the multicast group
multicast_group = socket.inet_aton(MULTICAST_GROUP_IP)
mreg = struct.pack('4sL', multicast_group, socket.INADDR_ANY)
multicast_listen_sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreg)
while True:
data, address = multicast_listen_sock.recvfrom(1024)
if data:
newServer_address = data.decode()
self.printwt(f'New participant wants to connect: {newServer_address}')
self.isLeader = False
# if the decoded address is not in the server list add it and print the list
if newServer_address not in self.serverList:
self.serverList.append(newServer_address)
reply_message = MY_IP
multicast_listen_sock.sendto(str.encode(reply_message), address)
self.printwt('Replied my IP to new participant')
time.sleep(1)
self.print_group_view()
#this function enables the server to send a multicast to the server group and receive the answers of existing members
def MulticastSendAndReceive(self):
# create socket
multicast_group = (MULTICAST_GROUP_IP, SERVER_MULTICAST_PORT)
multicast_send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Set a timeout so the socket does not block indefinitely when trying to receive data.
multicast_send_sock.settimeout(2)
# Set the time-to-live for messages to 1 so they do not go past the local network segment.
multicast_send_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
message = MY_IP
multicast_send_sock.sendto(message.encode(), multicast_group)
self.printwt("Sent my IP to server group")
# if my IP is not in the server list add it
if MY_IP not in self.serverList:
self.serverList.append(MY_IP)
# listen for IPs from existing servers
maxLoop = 5
currentLoop = 0
# Listen to client broadcast (request) and reply with Server IP # Anzahl der eingehenden Antworten initialisieren
num_responses = 0
while currentLoop < maxLoop:
print("Waiting for responses...") # Debug-Ausgabe
while True:
currentLoop += 1
try:
# receive reply data from the other participants
reply, address = multicast_send_sock.recvfrom(1024)
if reply:
reply_address = reply.decode()
# Debug-Ausgabe
print(f"Received response from: {reply_address}")
# if reply address is not in the server list, add it
if reply_address not in self.serverList:
self.serverList.append(reply_address)
# Erhöhe die Anzahl der eingehenden Antworten
num_responses += 1
print(f"Current server list: {self.serverList}") # Debug-Ausgabe
except socket.timeout:
break
if num_responses == 1:
multicast_send_sock.close()
self.isLeader = True
self.leader_uuid = self.uuid # Hier wird die uuid-Adresse des Leaders zugewiesens
time.sleep(1)
self.print_group_view()
# Listen to client multicast (request) and reply with Server IP
def ListenForClientAndReply(self): def ListenForClientAndReply(self):
# Create a UDP socket # Create a UDP socket
listen_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) listen_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Set the socket to broadcast and enable reusing addresses # Enable reusing addresses
listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Bind socket to address and port # Bind socket to address and port
listen_socket.bind((MY_IP, BROADCAST_PORT)) listen_socket.bind((MY_IP, CLIENT_MULTICAST_PORT))
# tell the os to add the socket to the multicast group
multicast_group = socket.inet_aton(MULTICAST_GROUP_IP)
mreg = struct.pack('4sL', multicast_group, socket.INADDR_ANY)
listen_socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreg)
print("Listening to broadcast messages") # print("Listening to multicast messages")
# Receiving broadcast massage # Receiving multicast massage
while True: while True:
data, addr = listen_socket.recvfrom(1024) data, addr = listen_socket.recvfrom(1024)
...@@ -38,8 +173,7 @@ class Server(): ...@@ -38,8 +173,7 @@ class Server():
self.printwt(data.decode()) self.printwt(data.decode())
# if Iam the leader, answer the client including my IP # if Iam the leader, answer the client including my IP
# if MY_IP == self.leader_IP: if self.uuid == self.leader_uuid:
if data:
reply_message = MY_IP reply_message = MY_IP
listen_socket.sendto(str.encode(reply_message), addr) listen_socket.sendto(str.encode(reply_message), addr)
self.printwt('Replied my IP to new client') self.printwt('Replied my IP to new client')
...@@ -52,28 +186,141 @@ class Server(): ...@@ -52,28 +186,141 @@ class Server():
data = client_socket.recv(1024) data = client_socket.recv(1024)
if not data: if not data:
break break
self.broadcast(data, client_socket) self.multicast(data, client_socket)
except: except:
self.clients.remove(client_socket) self.clients.remove(client_socket)
break break
def broadcast(self, message, sender_socket): def multicast(self, message, sender_socket):
for client in self.clients: for client in self.clients:
try: try:
if client != sender_socket: if client != sender_socket:
client.send(message) client.send(message)
except: except:
self.clients.remove(client) self.clients.remove(client)
def basic_lcr(self):
time.sleep(3)
# bind to ring socket
ring_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
ring_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ring_socket.bind((MY_IP, RING_PORT))
print('++++++++++++++++++++++++++')
print('New leader election starts')
print('Servers')
print(self.serverList)
neighbour = get_neighbour(form_ring(self.serverList), MY_IP, 'left')
print('Neighbour')
print(neighbour)
neighbour_address = (neighbour, RING_PORT)
first_message = {
"mid": self.uuid,
"isLeader": False
}
self.participant = True
print('sending first leader election message to neighbour')
ring_socket.sendto(json.dumps(first_message).encode('utf-8'), neighbour_address)
while True:
neighbour = get_neighbour(form_ring(self.serverList), MY_IP, 'left')
neighbour_address = (neighbour, RING_PORT)
print('\nWaiting to receive election message...\n')
data, address = ring_socket.recvfrom(1024)
election_message = json.loads(data.decode())
print('There is a election message:')
print(election_message)
if election_message.get('isLeader') and self.participant:
print('leader info weitergeben')
self.leader_uuid = election_message['mid']
# forward received election message to left neighbour
self.participant = False
ring_socket.sendto(json.dumps(election_message).encode('utf-8'), neighbour_address)
print(f'Leader is {self.leader_uuid}')
elif election_message.get('mid') < self.uuid and not self.participant:
print('mich vorschlagen')
new_election_message = {
"mid": self.uuid,
"isLeader": False
}
self.participant = True
# send received election message to left neighbour
ring_socket.sendto(json.dumps(new_election_message).encode('utf-8'), neighbour_address)
elif election_message.get('mid') > self.uuid:
# send received election message to left neighbour
print('Jemand anderes vorschlagen')
self.participant = True
ring_socket.sendto(json.dumps(election_message).encode('utf-8'), neighbour_address)
elif election_message.get('mid') == self.uuid and self.participant:
print('Ich wurde als Leader definiert')
self.leader_uid = self.uuid
self.isLeader = True
new_election_message = {
"mid": self.uuid,
"isLeader": True
}
# send new election message to left neighbour
self.participant = False
ring_socket.sendto(json.dumps(new_election_message).encode('utf-8'), neighbour_address)
print(f'I am Leader {self.leader_uuid}')
elif election_message.get('isLeader') and not self.participant:
print('Leader ist gewählt, Nachricht wurde weiteregeben, ELECTION beenden')
except Exception as e:
print(f"An error occurred: {e}")
finally:
ring_socket.close()
def init_heartbeat(self):
self.leader_heartbeat_last_received = time.time()
self.is_leader = False
self.heartbeat_interval = 5 # seconds
self.missed_heartbeats_limit = 5
self.missed_heartbeats = 0
threading.Thread(target=self.heartbeat_monitor).start()
if self.is_leader:
threading.Thread(target=self.emit_heartbeat).start()
def emit_heartbeat(self):
while self.is_leader:
# Code to sends a heartbeat message
print("Heartbeat emitted by leader")
time.sleep(self.heartbeat_interval)
def heartbeat_monitor(self):
while True:
time_since_last_heartbeat = time.time() - self.leader_heartbeat_last_received
if time_since_last_heartbeat > self.heartbeat_interval:
self.missed_heartbeats += 1
print(f"Missed heartbeat detected. Count: {self.missed_heartbeats}")
if self.missed_heartbeats >= self.missed_heartbeats_limit:
print("Missed heartbeats limit reached. Initiating leader election.")
# Reset missed heartbeats count
self.missed_heartbeats = 0
# Code to initiate a new voting process to elect a new leader
# This could involve calling a function from voting.py or similar logic
time.sleep(self.heartbeat_interval)
# starting all simultaneously working procedures # starting all simultaneously working procedures
if __name__== '__main__': if __name__== '__main__':
server = Server() server = Server()
thread2 = threading.Thread(target = server.MulticastListenAndReply)
thread2.start()
thread3 = threading.Thread(target = server.MulticastSendAndReceive)
thread3.start()
thread1 = threading.Thread(target = server.ListenForClientAndReply) thread1 = threading.Thread(target = server.ListenForClientAndReply)
thread1.start() thread1.start()
thread_election = threading.Thread(target = server.basic_lcr)
thread_election.start()
# Socket erstellen und binden # Socket erstellen und binden
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((MY_IP, 5555)) server_socket.bind((MY_IP, 5555))
......
import socket
def form_ring(members):
sorted_binary_ring = sorted([socket.inet_aton(member) for member in members])
sorted_ip_ring = [socket.inet_ntoa(node) for node in sorted_binary_ring]
return sorted_ip_ring
def get_neighbour(ring, current_node_ip, direction='left'):
current_node_index = ring.index(current_node_ip) if current_node_ip in ring else -1
if current_node_index != -1:
if direction == 'left':
if current_node_index + 1 == len(ring):
return ring[0]
else:
return ring[current_node_index + 1]
else:
if current_node_index == 0:
return ring[len(ring) - 1]
else:
return ring[current_node_index - 1]
else:
return None
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment