Skip to content
Snippets Groups Projects
Commit 08ae87ef authored by Robin Leber's avatar Robin Leber
Browse files

Merge branch 'main' into Rafael

parents bea39962 77373cfb
No related branches found
No related tags found
No related merge requests found
File added
File added
......@@ -2,16 +2,16 @@ import socket
import threading
from datetime import datetime
MULTICAST_GROUP_IP = '224.1.1.1'
# Broadcast address and port
BROADCAST_IP = "192.168.0.255"
BROADCAST_PORT = 5973
CLIENT_MULTICAST_PORT = 5973
# Local host information
MY_HOST = socket.gethostname()
MY_IP = socket.gethostbyname(MY_HOST)
class Client():
def __init__(self):
self.currentLeader = ''
......@@ -23,23 +23,23 @@ class Client():
print(f'[{current_date_time}] {msg}')
# dynamic discoverey: client sends request to server group and gets the IP of server as reply
def BroadcastSendAndReceive(self):
message = 'New client wants to connect: ' + MY_IP
def MulticastSendAndReceive(self):
# Create a UDP socket
broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Send message on broadcast address
broadcast_socket.sendto(str.encode(message), (BROADCAST_IP, BROADCAST_PORT))
multicast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Send broadcast message
# Set the time-to-live for messages to 1 so they do not go past the local network segment
multicast_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
# Send message on multicast address
message = 'New client wants to connect: ' + MY_IP
multicast_socket.sendto(str.encode(message), (MULTICAST_GROUP_IP, CLIENT_MULTICAST_PORT))
self.printwt("Sent my IP to server group")
while True:
try:
# receive reply data (server IP) from the other participants
reply, addr = broadcast_socket.recvfrom(1024)
reply, addr = multicast_socket.recvfrom(1024)
if reply:
# decode received data
......@@ -82,6 +82,6 @@ class Client():
if __name__ == "__main__":
client = Client()
thread1 = threading.Thread(target = client.BroadcastSendAndReceive)
thread1 = threading.Thread(target = client.MulticastSendAndReceive)
thread1.start()
thread1.join()
\ No newline at end of file
import socket
import threading
from datetime import datetime
import time
import struct
import uuid
import json
from voting import form_ring, get_neighbour
# Listening port
BROADCAST_PORT = 5973
MULTICAST_GROUP_IP = '224.1.1.1'
# Listening port Server Discovery
SERVER_MULTICAST_PORT = 5974
# Listening port Client Discovery
CLIENT_MULTICAST_PORT = 5973
# Listening port ring
RING_PORT = 5972
# Listening port hearthbeat
HEARTBEAT_PORT = 5971
# Local host information
MY_HOST = socket.gethostname()
MY_IP = socket.gethostbyname(MY_HOST)
class Server():
def __init__(self):
self.leader_IP = '' # fix the leader IP
self.clients = []
self.informServer = False
self.serverList = [] # list if servers and their addresses
self.leader_uuid = '' # fix the leader IP
self.leader_ip = ''
self.is_leader = False # New variable to track if the server is the leader
self.uuid = str(uuid.uuid4())
self.participant = False
def printwt(self, msg):
current_date_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'[{current_date_time}] {msg}')
def print_group_view(self):
print("Group view is:", self.serverList)
#This function enables the server to listen to the server multicast port and reply the ip address
def MulticastListenAndReply(self):
# if my IP is not in the server list add it
if MY_IP not in self.serverList:
self.serverList.append(MY_IP)
# create socket bind to server address
multicast_listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
multicast_listen_sock.bind(('', SERVER_MULTICAST_PORT))
# tell the os to add the socket to the multicast group
multicast_group = socket.inet_aton(MULTICAST_GROUP_IP)
mreg = struct.pack('4sL', multicast_group, socket.INADDR_ANY)
multicast_listen_sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreg)
while True:
data, address = multicast_listen_sock.recvfrom(1024)
if data:
newServer_address = data.decode()
self.printwt(f'New participant wants to connect: {newServer_address}')
self.is_leader = False
# if the decoded address is not in the server list add it and print the list
if newServer_address not in self.serverList:
self.serverList.append(newServer_address)
reply_message = MY_IP
multicast_listen_sock.sendto(str.encode(reply_message), address)
self.printwt('Replied my IP to new participant')
time.sleep(1)
self.print_group_view()
#this function enables the server to send a multicast to the server group and receive the answers of existing members
def MulticastSendAndReceive(self):
# create socket
multicast_group = (MULTICAST_GROUP_IP, SERVER_MULTICAST_PORT)
multicast_send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Set a timeout so the socket does not block indefinitely when trying to receive data.
multicast_send_sock.settimeout(2)
# Set the time-to-live for messages to 1 so they do not go past the local network segment.
multicast_send_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
message = MY_IP
multicast_send_sock.sendto(message.encode(), multicast_group)
self.printwt("Sent my IP to server group")
# if my IP is not in the server list add it
if MY_IP not in self.serverList:
self.serverList.append(MY_IP)
# listen for IPs from existing servers
maxLoop = 5
currentLoop = 0
# Anzahl der eingehenden Antworten initialisieren
num_responses = 0
while currentLoop < maxLoop:
print("Waiting for responses...") # Debug-Ausgabe
while True:
currentLoop += 1
try:
# receive reply data from the other participants
reply, address = multicast_send_sock.recvfrom(1024)
if reply:
reply_address = reply.decode()
# Debug-Ausgabe
print(f"Received response from: {reply_address}")
# if reply address is not in the server list, add it
if reply_address not in self.serverList:
self.serverList.append(reply_address)
# Listen to client broadcast (request) and reply with Server IP
# Erhöhe die Anzahl der eingehenden Antworten
num_responses += 1
print(f"Current server list: {self.serverList}") # Debug-Ausgabe
except socket.timeout:
break
if num_responses == 1:
multicast_send_sock.close()
# self.is_leader = True
# self.leader_uuid = self.uuid # Hier wird die uuid-Adresse des Leaders zugewiesens
time.sleep(1)
self.print_group_view()
# Listen to client multicast (request) and reply with Server IP
def ListenForClientAndReply(self):
# Create a UDP socket
listen_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Set the socket to broadcast and enable reusing addresses
listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# Enable reusing addresses
listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Bind socket to address and port
listen_socket.bind((MY_IP, BROADCAST_PORT))
listen_socket.bind((MY_IP, CLIENT_MULTICAST_PORT))
print("Listening to broadcast messages")
# tell the os to add the socket to the multicast group
multicast_group = socket.inet_aton(MULTICAST_GROUP_IP)
mreg = struct.pack('4sL', multicast_group, socket.INADDR_ANY)
listen_socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreg)
# Receiving broadcast massage
# print("Listening to multicast messages")
# Receiving multicast massage
while True:
data, addr = listen_socket.recvfrom(1024)
......@@ -38,8 +176,7 @@ class Server():
self.printwt(data.decode())
# if Iam the leader, answer the client including my IP
# if MY_IP == self.leader_IP:
if data:
if self.uuid == self.leader_uuid:
reply_message = MY_IP
listen_socket.sendto(str.encode(reply_message), addr)
self.printwt('Replied my IP to new client')
......@@ -52,28 +189,175 @@ class Server():
data = client_socket.recv(1024)
if not data:
break
self.broadcast(data, client_socket)
self.multicast(data, client_socket)
except:
self.clients.remove(client_socket)
break
def broadcast(self, message, sender_socket):
def multicast(self, message, sender_socket):
for client in self.clients:
try:
if client != sender_socket:
client.send(message)
except:
self.clients.remove(client)
def basic_lcr(self):
time.sleep(3)
# bind to ring socket
ring_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
ring_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ring_socket.bind((MY_IP, RING_PORT))
print('++++++++++++++++++++++++++')
print('New leader election starts')
print('Servers')
print(self.serverList)
neighbour = get_neighbour(form_ring(self.serverList), MY_IP, 'left')
print('Neighbour')
print(neighbour)
neighbour_address = (neighbour, RING_PORT)
first_message = {
"mid": self.uuid,
"is_leader": False
}
self.participant = True
print('sending first leader election message to neighbour')
ring_socket.sendto(json.dumps(first_message).encode('utf-8'), neighbour_address)
while True:
neighbour = get_neighbour(form_ring(self.serverList), MY_IP, 'left')
neighbour_address = (neighbour, RING_PORT)
print('\nWaiting to receive election message...\n')
data, address = ring_socket.recvfrom(1024)
election_message = json.loads(data.decode())
print('There is a election message:')
print(election_message)
if election_message.get('is_leader') and self.participant:
print('leader info weitergeben')
self.leader_uuid = election_message['mid']
# forward received election message to left neighbour
self.participant = False
ring_socket.sendto(json.dumps(election_message).encode('utf-8'), neighbour_address)
print(f'Leader is {self.leader_uuid}')
elif election_message.get('mid') < self.uuid and not self.participant:
print('mich vorschlagen')
new_election_message = {
"mid": self.uuid,
"is_leader": False
}
self.participant = True
# send received election message to left neighbour
ring_socket.sendto(json.dumps(new_election_message).encode('utf-8'), neighbour_address)
elif election_message.get('mid') > self.uuid:
# send received election message to left neighbour
print('Jemand anderes vorschlagen')
self.participant = True
ring_socket.sendto(json.dumps(election_message).encode('utf-8'), neighbour_address)
elif election_message.get('mid') == self.uuid and self.participant:
print('Ich wurde als Leader definiert')
self.leader_uuid = self.uuid
self.is_leader = True
new_election_message = {
"mid": self.uuid,
"is_leader": True
}
# send new election message to left neighbour
self.participant = False
ring_socket.sendto(json.dumps(new_election_message).encode('utf-8'), neighbour_address)
print(f'I am Leader {self.leader_uuid}')
elif election_message.get('is_leader') and not self.participant:
print('Leader ist gewählt, Nachricht wurde weiteregeben, ELECTION beenden')
except Exception as e:
print(f"An error occurred: {e}")
finally:
ring_socket.close()
def init_heartbeat(self):
self.leader_heartbeat_last_received = time.time()
self.heartbeat_interval = 1 # seconds
self.missed_heartbeats_limit = 5
self.missed_heartbeats = 0
# Erstelle und binde den Multicast-Socket für Heartbeats
multicast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
multicast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
multicast_socket.bind((MY_IP, HEARTBEAT_PORT))
multicast_group = socket.inet_aton(MULTICAST_GROUP_IP)
mreq = struct.pack('4sL', multicast_group, socket.INADDR_ANY)
multicast_socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
# Starte Heartbeat-Sender- und Empfänger-Threads
threading.Thread(target=self.heartbeat_publisher, args=(multicast_socket,)).start()
threading.Thread(target=self.heartbeat_subscriber, args=(multicast_socket,)).start()
def heartbeat_publisher(self, multicast_socket):
while True:
if self.is_leader:
print('\n I am Leader, sending heartbeat\n')
heartbeat_message = "#"
multicast_socket.sendto(heartbeat_message.encode(), (MULTICAST_GROUP_IP, HEARTBEAT_PORT))
time.sleep(self.heartbeat_interval)
def heartbeat_subscriber(self, multicast_socket):
multicast_socket.settimeout(3)
while True:
try:
data, address = multicast_socket.recvfrom(1024)
if data:
self.leader_heartbeat_last_received = time.time()
sender_ip = address[0]
self.leader_ip = sender_ip
if sender_ip != MY_IP:
print(f"Received heartbeat from {sender_ip}: {data.decode()}")
# Wenn kein Heartbeat empfangen wurde
except socket.timeout:
time_since_last_heartbeat = time.time() - self.leader_heartbeat_last_received
if time_since_last_heartbeat > self.heartbeat_interval:
self.missed_heartbeats += 1
print(f"Missed heartbeat detected. Count: {self.missed_heartbeats}")
if self.missed_heartbeats >= self.missed_heartbeats_limit:
print("Missed heartbeats limit reached. Initiating leader election.")
self.serverList.remove(self.leader_ip)
self.basic_lcr()
# Reset missed heartbeats count
self.missed_heartbeats = 0
# Code to initiate a new voting process to elect a new leader
# This could involve calling a function from voting.py or similar logic
time.sleep(self.heartbeat_interval)
# starting all simultaneously working procedures
if __name__== '__main__':
server = Server()
thread2 = threading.Thread(target = server.MulticastListenAndReply)
thread2.start()
thread3 = threading.Thread(target = server.MulticastSendAndReceive)
thread3.start()
thread1 = threading.Thread(target = server.ListenForClientAndReply)
thread1.start()
thread_election = threading.Thread(target = server.basic_lcr)
thread_election.start()
thread_heartbeat = threading.Thread(target = server.init_heartbeat)
thread_heartbeat.start()
# Socket erstellen und binden
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((MY_IP, 5555))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment