Select Git revision
stringCalculator.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
server.py 12.74 KiB
##########unsicher, ob heartbeat funktioniert################
###########deutsch englisch korrigieren###################
################liste mit message ids auf 3-5 setzen########
import socket
import multiprocessing
import uuid
import time
import threading
import os
from multiprocessing import Manager
# TCP Socket erstellen
#tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#tcp_server_socket.bind(('', 55556)) # Listen auf einem separaten Port
#tcp_server_socket.listen(5)
broadcast_ip = '255.255.255.255'
broadcast_port = 55555
election_port= 55559
# Socket for broadcast
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('', broadcast_port))
# Socket for broadcast
election_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
election_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
election_socket.bind(('', election_port))
# Global list for active servers
active_servers = {}
# Broadcast function
def broadcast(message):
"""Sends messages to all server IDs in the network."""
full_message = f"{message}".encode()
#full_message = f"[{shared_data['server_id']}] {message}".encode() #sends serverid with every message
server_socket.sendto(full_message, (broadcast_ip, broadcast_port))
# Broadcast function
def broadcast_election(message):
"""Sends messages to all server IDs in the network."""
full_message = f"{message}".encode()
#full_message = f"[{shared_data['server_id']}] {message}".encode() #sends serverid with every message
election_socket.sendto(full_message, (broadcast_ip, election_port))
# Listener function
def listen_broadcast_election(queue, shared_data):
"""Receives messages and updates shared state."""
print(f"Listener from {shared_data['server_id']} is listening...")
global active_servers
server_socket.setblocking(False) # Set socket to non-blocking mode
#global sender_uuid
while True:
try:
message, sender_address = election_socket.recvfrom(4096)
decoded_sender_message = message.decode()
print(f"Received from {sender_address}: {decoded_sender_message}")
# Handle heartbeat messages
if "HEARTBEAT" in decoded_sender_message:
shared_data['sender_uuid'] = decoded_sender_message.split(": ")[1]
#print("sender_uuid", sender_uuid) #debug
active_servers[shared_data['sender_uuid']] = time.time() #update active server dictionary
#send to neighbour
continue
# Put message in the queue for election process
queue.put(decoded_sender_message)
except socket.error as e:
print(f"Socket error occurred: {e}")
break
time.sleep(0.5) # Small delay to prevent busy waiting
#listener function
##################TypeError: listen_client() takes 0 positional arguments but 2 were given############
def listen_client(shared_data):
"""receives messages from clients and broadcast them."""
#manager=Manager()
#global received_messages
received_messages = set()
#save_last_messages = manager.dict()
#message_queue = multiprocessing.Queue(maxsize=3)
#global save_last_messages
#only listen if current leader
#if shared_data['current_leader'] == shared_data['server_id']:
while True:
try:
message, client_address = server_socket.recvfrom(4096)
decoded_message = message.decode()
#create a dictionary for message uuid
# message_uuid = decoded_message.split(": ")[0]
#print("message id:", message_uuid) #debug
# Extract message UUID and check if already processed
message_id = decoded_message.split(":")[0]
if message_id in received_messages:
continue
received_messages.add(message_id)
# Enqueue messages and maintain a fixed size
"""if not any(decoded_message.startswith(msg) for msg in list(message_queue.queue)):
if message_queue.full():
message_queue.get() # Remove oldest message
message_queue.put(decoded_message)
# Print all messages
print("here are your last messages:", list(message_queue.queue)) # Debug
"""
"""
if message_uuid not in save_last_messages:
if len(save_last_messages) <= 3:
oldest_key = next(iter(save_last_messages)) # get oldest key
del save_last_messages[oldest_key] # delete it
#save_last_messages["last message"] = message_uuid
save_last_messages[message_uuid] = decoded_message
#save_last_messages.update(f"last message: {message}: {message_uuid}")
print("here are your last messages:", dict(save_last_messages)) #debug
"""
#broadcast_election(save_last_messages) #send to neighbour
# checks wheter the message comes from the server itself
if decoded_message.startswith(f"{shared_data['server_id']}"):
continue # if...ignores it
#checks if the message containts the string "entered"
if decoded_message.__contains__("entered"):
print(f"{client_address} entered the chat.") #client_address is the nickname
print(f"Received from {client_address}: {decoded_message}")
broadcast(decoded_message)
#exceptions
except socket.error as e:
print(f"An error occurred: {e}")
break
#does not work
except KeyboardInterrupt:
print("\nServer wird beendet...")
break
# Leader election function
def start_election(queue, shared_data):
"""Starts leader election based on received messages."""
global active_servers
#sender_uuid = None
#global sender_uuid
#current_leader = None
print("Starting election...")
broadcast_election(f"START_ELECTION: {shared_data['server_id']}") #every server which broadcasts his id is going to be part of the election
timeout = time.time() + 20 # 20-second timeout, waiting for other servers
highest_id = shared_data['server_id'] #highest id is own id
leader= False
#while current time is smaller than current time plus 20s:
while time.time() < timeout:
try:
message = queue.get(timeout=1) #waits 1 sec for a message
if shared_data['server_id']==shared_data['sender_uuid']: #wtf??????????????
continue
if "START_ELECTION" in message:
shared_data['sender_uuid'] = message.split(": ")[1]
active_servers[shared_data['sender_uuid']] = time.time()
print(f"Received UUID for election: {shared_data['sender_uuid']}")
################dumblogic###onlyworkswith21erver################################
if leader == False:
if highest_id>shared_data['sender_uuid']:
leader == True
elif highest_id<shared_data['sender_uuid']:
highest_id=shared_data['sender_uuid']
leader == True
if leader == True:
highest_id=shared_data['current_leader']
print(f"(sender) Leader elected: {shared_data['current_leader']}")
broadcast_election(f"LEADER: {shared_data['current_leader']}")
else:
shared_data['current_leader'] = shared_data['server_id']
print(f"(official, its me) Leader elected: {shared_data['current_leader']}")
broadcast_election(f"LEADER: {shared_data['current_leader']}")
elif "LEADER" in message:
leader_uuid = message.split(": ")[1]
shared_data['current_leader'] = leader_uuid
#time.sleep(1) # Delay to ensure all processes are aware of the leader
print(f"(leader) Leader elected: {shared_data['current_leader']}")
leader_election_done.set() # Signal, dass die Leaderwahl abgeschlossen ist
return
except multiprocessing.queues.Empty:
continue
#if highest_id == shared_data['server_id']:
# shared_data['current_leader'] = shared_data['server_id']
# broadcast(f"LEADER: {shared_data['current_leader']}")
# print(f"I am the leader: {shared_data['current_leader']}")
#else:
# print(f"Leader election finished, leader is {highest_id}")
#################tcpclient#################################
#def handle_tcp_client(conn, addr):
"""Sendet die Leader-Information über eine TCP-Verbindung."""
# print(f"TCP connection established with {addr}")
# if shared_data['current_leader']:
# conn.send(f"LEADER: {shared_data['current_leader']}".encode())
# else:
# conn.send("NO_LEADER".encode())
# conn.close()
#def tcp_listener():
"""Wartet auf TCP-Verbindungen und bearbeitet sie."""
# while True:
# conn, addr = tcp_server_socket.accept()
# threading.Thread(target=handle_tcp_client, args=(conn, addr)).start()
######################endtcp#################################################
# Heartbeat function
def send_heartbeat(shared_data):
"""Sends heartbeat messages to keep the server active."""
leader_election_done.wait() # Blockiert, bis die Leaderwahl abgeschlossen ist
print(f"Heartbeat function started. Leader: {shared_data['current_leader']}, Server ID: {shared_data['server_id']}")
#only if you are the current leader
if shared_data['current_leader'] == shared_data['server_id']:
#time.sleep(10) #waiting for leader election #set to 20s
while True:
try:
broadcast_election(f"HEARTBEAT: {shared_data['server_id']}") #please change that to right neighbour after correct leader election
time.sleep(5)
except socket.error as e:
print(f"Failed to send heartbeat: {e}")
# Monitor heartbeats
def monitor_heartbeats():
"""Checks active servers based on heartbeat messages."""
#only if you are the current leader
if shared_data['current_leader'] == shared_data['server_id']:
global active_servers
while True:
time.sleep(6)
now = time.time()
active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6}
print(f"Active servers: {list(active_servers.keys())}")
# Main function
if __name__ == "__main__":
multiprocessing.freeze_support()
print(f"Script started with PID: {os.getpid()}")
leader_election_done = threading.Event()
# Manager for shared data
manager = Manager()
shared_data = manager.dict()
shared_data['server_id'] = str(uuid.uuid4()) # Generate server ID once
shared_data['current_leader'] = None
shared_data['sender_uuid'] = None
#sender_uuid = None
print(f"Server is running with ID {shared_data['server_id']} and broadcasting on port {broadcast_port}...")
message_queue = multiprocessing.Queue()
save_last_messages={}
# Create processes
#listener_election_process = multiprocessing.Process(target=listen_broadcast_election, args=(message_queue, shared_data))
listener_client_process = multiprocessing.Process(target=listen_client, args=(shared_data,))
election_process = multiprocessing.Process(target=start_election, args=(message_queue, shared_data))
# Start the listener for clients in a thread
listener_election_thread = threading.Thread(target=listen_broadcast_election, args=(message_queue, shared_data), daemon=True)
# Heartbeat threads
heartbeat_thread = threading.Thread(target=send_heartbeat, args=(shared_data,), daemon=True)
heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True)
# Start processes and threads
#listener_election_process.start()
listener_client_process.start()
listener_election_thread.start()
election_process.start()
heartbeat_thread.start()
heartbeat_monitor_thread.start()
#heartbeat_thread.join()
#heartbeat_monitor_thread.join()
try:
#listener_election_process.join()
listener_client_process.join()
election_process.join()
except KeyboardInterrupt:
print("\nShutting down server...")
#listener_election_process.terminate()
listener_client_process.terminate()
election_process.terminate()