Skip to content
Snippets Groups Projects
Commit d6f49be7 authored by Katharina's avatar Katharina
Browse files

add request leader

parent e667f98b
No related branches found
No related tags found
No related merge requests found
import socket
import threading
import time
broadcast_ip = '255.255.255.255'#change ip??? #hard coded?
broadcast_port = 55555
......@@ -9,40 +10,80 @@ MY_HOST = socket.gethostname()
#socket.gethostbyname(socket.gethostname()) #getip
MY_IP = socket.gethostbyname(MY_HOST)
#print(f"host:{MY_HOST} and ip: {MY_IP}")
print(f"Listening for leader response on {broadcast_ip}:{broadcast_port}...")
print(MY_HOST,MY_IP)
# create client-socket for broadcast
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
client_socket.bind(('', broadcast_port)) # listen on broadcast-port
#nickname = input("Enter your nickname: ")
# global var for leader address
leader_address = None
def listen():
"""receives messages from server and clients."""
global leader_address
while True:
try:
data, address = client_socket.recvfrom(4096)
decoded_message = data.decode()
# Ignoriere Nachrichten mit Server-Kennung
if decoded_message.startswith("["):
continue
#ignores broadcast messages with own ip
if address[0]==MY_IP:
continue
#if decoded_message.startswith("["):
# continue
#ignores broadcast messages with own ip #################please enable it after testing!!!!!!!!!!!!!!!!!!!S
#if address[0]==MY_IP:
# continue
#####identifyLEADER############
# Erkennen von LEADER-Nachrichten
if "LEADER:" in decoded_message:
leader_uuid = decoded_message.split(": ")[1]
print(f"Leader discovered: {leader_uuid} at {address[0]}:{address[1]}")
leader_address = (address[0], broadcast_port) # IP-Adresse des Leaders speichern
continue
#################################
print(decoded_message)
except socket.error as e:
print(f"An error occurred: {e}")
break
############requestleader############
def request_leader():
"""Sendet eine Anfrage, um den aktuellen Leader zu ermitteln."""
global leader_address
print("Requesting current leader...")
client_socket.sendto("REQUEST_LEAD".encode(), (broadcast_ip, broadcast_port))
timeout = time.time() + 5 # 10 Sekunden auf Antwort warten
while time.time() < timeout:
if leader_address:
return
time.sleep(0.5) # Warten, bis eine Antwort vom Leader eintrifft
#print("No leader found. Unable to send messages.")
####################################
def sender():
"""Ermöglicht dem Benutzer, Nachrichten zu schreiben und zu senden."""
global leader_address
nickname = input("Enter your nickname: ")
print("Requesting leader before starting...")
request_leader() # Versuche, den Leader zu finden
#########print(leader_address)###########debug: answer: None
if not leader_address:
print("No leader found. Exiting...")
return
just_nickname= f"{nickname} entered the chat".encode()
client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port))
#client_socket.sendto(just_nickname, (broadcast_ip, broadcast_port))
client_socket.sendto(just_nickname, leader_address)
while True:
#allows the client to send any message
......@@ -51,8 +92,11 @@ def sender():
#checks for whitemarks
if message.strip():
full_message = f"{nickname}: {message}".encode()
client_socket.sendto(full_message, (broadcast_ip, broadcast_port))
#(broadcast_ip, broadcast_port) #for sendto
if leader_address:
client_socket.sendto(full_message, leader_address)
else:
print("No leader available. Unable to send message.")
# starts listen-thread
listen_thread = threading.Thread(target=listen)
......
......@@ -3,25 +3,25 @@ import multiprocessing
import uuid
import time
import threading
import os
#current_method=multiprocessing.set_start_method('spawn', force=True) #result=none (why??)
#print(f"Multiprocessing start method: {current_method}")
print(f"Script started with PID: {os.getpid()}") #is starting 4 times
server_id=str(uuid.uuid4())
broadcast_ip = '255.255.255.255'
broadcast_port = 55555
# creates unique server-ID
server_id = str(uuid.uuid4())
# Socket for broadcast
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('', broadcast_port))
print(f"Server is running with ID {server_id} and broadcasting on port {broadcast_port}...")
# global list for known server-IDs (which does not work atm)
##here
active_servers = {}
current_leader = None
# broadcast-function
def broadcast(message):
......@@ -33,6 +33,7 @@ def broadcast(message):
def listen(queue):
"""Receives messages from other processes and forwards them/puts them in queue"""
global active_servers
#server_id = shared_data["server_id"]
while True:
try:
message, client_address = server_socket.recvfrom(4096)
......@@ -42,6 +43,11 @@ def listen(queue):
if decoded_message.startswith(f"[{server_id}]"):
continue
if "REQUEST_LEADER" in decoded_message:
if current_leader:
response = f"LEADER: {current_leader}".encode()
server_socket.sendto(response, client_address)
print(f"Received from {client_address}: {decoded_message}")
# handle heartbeat messages
......@@ -61,6 +67,7 @@ def listen(queue):
def start_election(queue):
"""Starts leader election based on received messages."""
global current_leader
#server_id = shared_data["server_id"]
print("Starting election...")
broadcast(f"START_ELECTION: {server_id}") #sends broadcast, ignores his own id only in listen function
......@@ -116,15 +123,8 @@ def start_election(queue):
else:
print(f"Leader election finished, leader is {highest_id}")
# send message to the current leader
def send_message_to_leader(message):
"""Sends a message directly to the current leader."""
global current_leader
if current_leader is None:
print("No leader available to send the message.")
return
broadcast(f"TO_LEAD: {message}")
############broadcastLEADER#####################
#fct
################### Heartbeat-function not working###########################
def send_heartbeat():
......@@ -144,34 +144,42 @@ def monitor_heartbeats():
print(f"Active servers: {list(active_servers.keys())}")
################################################################################
def test_send_to_leader():
"""Tests sending a message to the current leader."""
time.sleep(15) # Wait for leader election to complete
send_message_to_leader("This is a message for the leader.")
# Main
if __name__ == "__main__":
# current_process = psutil.Process()
# print(f"DEBUG: Number of child processes: {len(current_process.children())}")
#manager = Manager()
#shared_data = manager.dict() # Gemeinsame Datenstruktur
#shared_data["server_id"] = str(uuid.uuid4()) # Nur im Hauptprozess erzeugen
multiprocessing.freeze_support() # important for Windows, to not start the main process three times
# queue for communication between processes
####################
print(f"Server is running with ID {server_id} and broadcasting on port {broadcast_port}...")
# global list for known server-IDs (which does not work atm)
current_leader = None
###################
message_queue = multiprocessing.Queue()
# create processes
listener_process = multiprocessing.Process(target=listen, args=(message_queue,))
#time.sleep(10)
election_process = multiprocessing.Process(target=start_election, args=(message_queue,))
listener_process = multiprocessing.Process(target=listen, args=(message_queue, ))
election_process = multiprocessing.Process(target=start_election, args=(message_queue, ))
# heartbeat in seperate thread###########
heartbeat_thread = threading.Thread(target=send_heartbeat, daemon=True)
heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True)
# test sending to leader in a separate thread
test_thread = threading.Thread(target=test_send_to_leader, daemon=True)
# start processes
listener_process.start()
election_process.start()
heartbeat_thread.start()
heartbeat_monitor_thread.start()
test_thread.start()
#test_thread.start()
try:
# main process is waiting for the sub processes to finish
......
import socket
import multiprocessing
import uuid
import time
import threading
broadcast_ip = '255.255.255.255'
broadcast_port = 55555
# creates unique server-ID
server_id = str(uuid.uuid4())
# Socket for broadcast
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('', broadcast_port))
print(f"Server is running with ID {server_id} and broadcasting on port {broadcast_port}...")
# global list for known server-IDs (which does not work atm)
active_servers = {}
current_leader = None
# broadcast-function
def broadcast(message):
"""Sendet Nachrichten an alle Server-IDs im Netzwerk"""
full_message = f"[{server_id}] {message}".encode()
server_socket.sendto(full_message, (broadcast_ip, broadcast_port))
# listener-function
def listen(queue):
"""Receives messages from other processes and forwards them/puts them in queue"""
global active_servers
while True:
try:
message, client_address = server_socket.recvfrom(4096)
decoded_message = message.decode()
# ignore messages from own server-ID/ also running for clients from this ip
if decoded_message.startswith(f"[{server_id}]"):
continue
print(f"Received from {client_address}: {decoded_message}")
# handle heartbeat messages
if "HEARTBEAT" in decoded_message:
sender_uuid = decoded_message.split(": ")[1]
active_servers[sender_uuid] = time.time()
continue
# put message in queue, that broadcast process can react
queue.put(decoded_message)
except socket.error as e:
print(f"Socket error occurred: {e}")
break
# leader election function
def start_election(queue):
"""Starts leader election based on received messages."""
global current_leader
print("Starting election...")
broadcast(f"START_ELECTION: {server_id}") #sends broadcast, ignores his own id only in listen function
timeout = time.time() + 20 # wait 20 secs for answers
highest_id = server_id
while time.time() < timeout:
# wait for messages from queue
try:
message = queue.get(timeout=1)
# processing messages
if "START_ELECTION" in message:
sender_uuid = message.split(": ")[1]
#print("extracted uuid?:", sender_uuid)
active_servers[sender_uuid] = time.time()
print(f"Received UUID for election: {sender_uuid}")
# compare UUIDs for leader election
if sender_uuid > highest_id:
highest_id=sender_uuid
print(f"Received higher ID {sender_uuid}, forwarding...")
broadcast(f"START_ELECTION: {sender_uuid}")
#####does this work????########
elif sender_uuid < server_id:
highest_id=server_id
print(f"Received lower ID {sender_uuid}, sending own ID...")
broadcast(f"START_ELECTION: {server_id}")
else:
# you are the leader:
current_leader = server_id
broadcast(f"LEADER: {server_id}")
print(f"I am the leader: {server_id}")
elif "LEADER" in message:
# leader was elected
leader_uuid = message.split(": ")[1]
current_leader = leader_uuid
print(f"Leader elected: {current_leader}")
broadcast(f"current leader is: {current_leader}")
return
#continue when queue is empty
except multiprocessing.queues.Empty:
continue
# after timeout: own server becomes leader, if no other has been chosen/higher ID
if highest_id == server_id:
current_leader = server_id
broadcast(f"LEADER {server_id}")
print(f"I am the leader: {server_id}")
else:
print(f"Leader election finished, leader is {highest_id}")
############broadcastLEADER#####################
#fct
################### Heartbeat-function not working###########################
def send_heartbeat():
"""Sends heartbeat messages regularly to keep the server ID active"""
while True:
broadcast(f"HEARTBEAT: {server_id}")
time.sleep(5) # sends every 3 seconds
#monitor heartbeats
def monitor_heartbeats():
"""Checks active servers based on heartbeat messages"""
global active_servers
while True:
time.sleep(6)
now = time.time()
active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6}
print(f"Active servers: {list(active_servers.keys())}")
################################################################################
# Main
if __name__ == "__main__":
# queue for communication between processes
message_queue = multiprocessing.Queue()
# create processes
listener_process = multiprocessing.Process(target=listen, args=(message_queue,))
#time.sleep(10)
election_process = multiprocessing.Process(target=start_election, args=(message_queue,))
# heartbeat in seperate thread###########
heartbeat_thread = threading.Thread(target=send_heartbeat, daemon=True)
heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True)
# start processes
listener_process.start()
election_process.start()
heartbeat_thread.start()
heartbeat_monitor_thread.start()
#test_thread.start()
try:
# main process is waiting for the sub processes to finish
listener_process.join()
election_process.join()
except KeyboardInterrupt:
print("\nShutting down server...")
listener_process.terminate()
election_process.terminate()
server.py 0 → 100644
import socket
import multiprocessing
import uuid
import time
import threading
import os
from multiprocessing import Manager
broadcast_ip = '255.255.255.255'
broadcast_port = 55555
# Socket for broadcast
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('', broadcast_port))
# Global list for active servers
active_servers = {}
# Broadcast function
def broadcast(message):
"""Sends messages to all server IDs in the network."""
full_message = f"{message}".encode()
server_socket.sendto(full_message, (broadcast_ip, broadcast_port))
# Listener function
def listen(queue, shared_data):
"""Receives messages and updates shared state."""
global active_servers
while True:
try:
message, client_address = server_socket.recvfrom(4096)
decoded_message = message.decode()
# Ignore messages from this server
if decoded_message.startswith(f"[{shared_data['server_id']}]"):
continue
print(f"Received from {client_address}: {decoded_message}")
# Handle heartbeat messages
if "HEARTBEAT" in decoded_message:
sender_uuid = decoded_message.split(": ")[1]
active_servers[sender_uuid] = time.time()
continue
if "REQUEST_LEAD" in decoded_message:
if shared_data['current_leader']:
while True:
response = f"LEADER: {shared_data['current_leader']}".encode()
time.sleep(2) #wait before sending response that client can prepare for answer
server_socket.sendto(response, (broadcast_ip, broadcast_port))
#broadcast(response, client_address)
print(f"Sent leader information to {client_address}: {response.decode()}") #for debug
else:
print("No leader set, unable to respond to REQUEST_LEAD")
#continue
# Put message in the queue for election process
queue.put(decoded_message)
except socket.error as e:
print(f"Socket error occurred: {e}")
break
# Leader election function
def start_election(queue, shared_data):
"""Starts leader election based on received messages."""
#global active_servers
#current_leader = None
print("Starting election...")
broadcast(f"START_ELECTION: {shared_data['server_id']}")
timeout = time.time() + 20 # 20-second timeout
highest_id = shared_data['server_id']
while time.time() < timeout:
try:
message = queue.get(timeout=1)
if "START_ELECTION" in message:
sender_uuid = message.split(": ")[1]
active_servers[sender_uuid] = time.time()
print(f"Received UUID for election: {sender_uuid}")
if sender_uuid > highest_id:
highest_id = sender_uuid
print(f"Received higher ID {sender_uuid}, forwarding...")
broadcast(f"START_ELECTION: {sender_uuid}")
elif "LEADER" in message:
leader_uuid = message.split(": ")[1]
shared_data['current_leader'] = leader_uuid
print(f"Leader elected: {shared_data['current_leader']}")
return
except multiprocessing.queues.Empty:
continue
if highest_id == shared_data['server_id']:
shared_data['current_leader'] = shared_data['server_id']
broadcast(f"LEADER: {shared_data['current_leader']}")
print(f"I am the leader: {shared_data['current_leader']}")
else:
print(f"Leader election finished, leader is {highest_id}")
# Heartbeat function
def send_heartbeat(shared_data):
"""Sends heartbeat messages to keep the server active."""
time.sleep(30) #waiting for leader election #set to 20s
while True:
broadcast(f"HEARTBEAT: {shared_data['server_id']}")
time.sleep(5)
# Monitor heartbeats
def monitor_heartbeats():
"""Checks active servers based on heartbeat messages."""
global active_servers
while True:
time.sleep(6)
now = time.time()
active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6}
print(f"Active servers: {list(active_servers.keys())}")
# Main function
if __name__ == "__main__":
multiprocessing.freeze_support()
print(f"Script started with PID: {os.getpid()}")
# Manager for shared data
manager = Manager()
shared_data = manager.dict()
shared_data['server_id'] = str(uuid.uuid4()) # Generate server ID once
shared_data['current_leader'] = None
print(f"Server is running with ID {shared_data['server_id']} and broadcasting on port {broadcast_port}...")
message_queue = multiprocessing.Queue()
# Create processes
listener_process = multiprocessing.Process(target=listen, args=(message_queue, shared_data))
election_process = multiprocessing.Process(target=start_election, args=(message_queue, shared_data))
# Heartbeat threads
heartbeat_thread = threading.Thread(target=send_heartbeat, args=(shared_data,), daemon=True)
heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True)
# Start processes and threads
listener_process.start()
election_process.start()
heartbeat_thread.start()
heartbeat_monitor_thread.start()
try:
listener_process.join()
election_process.join()
except KeyboardInterrupt:
print("\nShutting down server...")
listener_process.terminate()
election_process.terminate()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment