From 2e7ce602aa5f316aa202d8ccd5f4f871ec986d96 Mon Sep 17 00:00:00 2001 From: Katharina <katharina.willig@outlook.com> Date: Mon, 9 Dec 2024 16:20:16 +0100 Subject: [PATCH] remove unused files --- extract.py | 4 - form_ring.py | 47 ----------- kserver.py | 191 -------------------------------------------- leader_heartbeat.py | 168 -------------------------------------- 4 files changed, 410 deletions(-) delete mode 100644 extract.py delete mode 100644 form_ring.py delete mode 100644 kserver.py delete mode 100644 leader_heartbeat.py diff --git a/extract.py b/extract.py deleted file mode 100644 index 926a83b..0000000 --- a/extract.py +++ /dev/null @@ -1,4 +0,0 @@ - - # checks wheter the message comes from the server itself - if decoded_message.startswith(f"[{server_id}]"): - continue # if...ignores it \ No newline at end of file diff --git a/form_ring.py b/form_ring.py deleted file mode 100644 index c89f340..0000000 --- a/form_ring.py +++ /dev/null @@ -1,47 +0,0 @@ -import socket -import kserver - -my_list = kserver.server_uuids -print(my_list) -result = ", ".join([str(x) for x in my_list]) # Listen-Komprehension -# Alternativ: result = ", ".join(map(str, my_list)) -print(result) # Ausgabe: 1, 2, 3 - - -def form_ring(result): - #uuid, address = kserver.server_socket.recvfrom(4096) - #print("here uuid or what:", uuid, "and address:", address) - #message = uuid.decode() - #Convert a 32-bit packed IPv4 address to its standard dotted-quad string representation - #sorted()=returns a sorted list of the specified iterable object - sorted_binary_ring= sorted([socket.inet_aton(member) for member in result]) - sorted_ip_ring= [socket.inet_ntoa(node) for node in sorted_binary_ring] - return sorted_ip_ring - -#dynamic! -#1.broadcast server uuid everyone -#listen to broadcast here and -#members = ['192.168.0.1', '130.234.204.2', '130.234.203.2', '130.234.204.1', '182.4.3.111'] -ring = form_ring(result) - -print(ring) - -#getting neighbour: -def get_neighbour(ring, current_node_ip, direction='left'): - current_node_index = ring.index(current_node_ip) if current_node_ip in ring else -1 - if current_node_index != -1: - if direction == 'left': - if current_node_index + 1 == len(ring): - return ring[0] - else: - return ring[current_node_index + 1] - else: - if current_node_index == 0: - return ring[len(ring) - 1] - else: - return ring[current_node_index- 1] - else: - return None - -neighbour = get_neighbour(ring, '130.234.203.2', 'left') -print(neighbour) \ No newline at end of file diff --git a/kserver.py b/kserver.py deleted file mode 100644 index 3d9f160..0000000 --- a/kserver.py +++ /dev/null @@ -1,191 +0,0 @@ -import socket -import multiprocessing -import uuid -import time -import threading -import os - -#current_method=multiprocessing.set_start_method('spawn', force=True) #result=none (why??) -#print(f"Multiprocessing start method: {current_method}") - -print(f"Script started with PID: {os.getpid()}") #is starting 4 times -server_id=str(uuid.uuid4()) - -broadcast_ip = '255.255.255.255' -broadcast_port = 55555 - -# Socket for broadcast -server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) -server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) -server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -server_socket.bind(('', broadcast_port)) - -##here -active_servers = {} - -# broadcast-function -def broadcast(message): - """Sendet Nachrichten an alle Server-IDs im Netzwerk""" - full_message = f"[{server_id}] {message}".encode() - server_socket.sendto(full_message, (broadcast_ip, broadcast_port)) - -# listener-function -def listen(queue): - """Receives messages from other processes and forwards them/puts them in queue""" - global active_servers - #server_id = shared_data["server_id"] - while True: - try: - message, client_address = server_socket.recvfrom(4096) - decoded_message = message.decode() - - # ignore messages from own server-ID/ also running for clients from this ip - if decoded_message.startswith(f"[{server_id}]"): - continue - - if "REQUEST_LEADER" in decoded_message: - if current_leader: - response = f"LEADER: {current_leader}".encode() - server_socket.sendto(response, client_address) - - print(f"Received from {client_address}: {decoded_message}") - - # handle heartbeat messages - if "HEARTBEAT" in decoded_message: - sender_uuid = decoded_message.split(": ")[1] - active_servers[sender_uuid] = time.time() - continue - - # put message in queue, that broadcast process can react - queue.put(decoded_message) - - except socket.error as e: - print(f"Socket error occurred: {e}") - break - -# leader election function -def start_election(queue): - """Starts leader election based on received messages.""" - global current_leader - #server_id = shared_data["server_id"] - print("Starting election...") - broadcast(f"START_ELECTION: {server_id}") #sends broadcast, ignores his own id only in listen function - - timeout = time.time() + 20 # wait 20 secs for answers - highest_id = server_id - - while time.time() < timeout: - # wait for messages from queue - try: - message = queue.get(timeout=1) - - # processing messages - if "START_ELECTION" in message: - sender_uuid = message.split(": ")[1] - #print("extracted uuid?:", sender_uuid) - active_servers[sender_uuid] = time.time() - print(f"Received UUID for election: {sender_uuid}") - - # compare UUIDs for leader election - if sender_uuid > highest_id: - highest_id=sender_uuid - print(f"Received higher ID {sender_uuid}, forwarding...") - broadcast(f"START_ELECTION: {sender_uuid}") - - #####does this work????######## - elif sender_uuid < server_id: - highest_id=server_id - print(f"Received lower ID {sender_uuid}, sending own ID...") - broadcast(f"START_ELECTION: {server_id}") - else: - # you are the leader: - current_leader = server_id - broadcast(f"LEADER: {server_id}") - print(f"I am the leader: {server_id}") - - elif "LEADER" in message: - # leader was elected - leader_uuid = message.split(": ")[1] - current_leader = leader_uuid - print(f"Leader elected: {current_leader}") - broadcast(f"current leader is: {current_leader}") - return - - #continue when queue is empty - except multiprocessing.queues.Empty: - continue - - # after timeout: own server becomes leader, if no other has been chosen/higher ID - if highest_id == server_id: - current_leader = server_id - broadcast(f"LEADER {server_id}") - print(f"I am the leader: {server_id}") - else: - print(f"Leader election finished, leader is {highest_id}") - -############broadcastLEADER##################### -#fct - -################### Heartbeat-function not working########################### -def send_heartbeat(): - """Sends heartbeat messages regularly to keep the server ID active""" - while True: - broadcast(f"HEARTBEAT: {server_id}") - time.sleep(5) # sends every 3 seconds - -#monitor heartbeats -def monitor_heartbeats(): - """Checks active servers based on heartbeat messages""" - global active_servers - while True: - time.sleep(6) - now = time.time() - active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6} - print(f"Active servers: {list(active_servers.keys())}") -################################################################################ - -# Main -if __name__ == "__main__": - - # current_process = psutil.Process() - # print(f"DEBUG: Number of child processes: {len(current_process.children())}") - #manager = Manager() - #shared_data = manager.dict() # Gemeinsame Datenstruktur - #shared_data["server_id"] = str(uuid.uuid4()) # Nur im Hauptprozess erzeugen - - multiprocessing.freeze_support() # important for Windows, to not start the main process three times - # queue for communication between processes - -#################### - print(f"Server is running with ID {server_id} and broadcasting on port {broadcast_port}...") - - # global list for known server-IDs (which does not work atm) - - current_leader = None -################### - - message_queue = multiprocessing.Queue() - - # create processes - listener_process = multiprocessing.Process(target=listen, args=(message_queue, )) - election_process = multiprocessing.Process(target=start_election, args=(message_queue, )) - - # heartbeat in seperate thread########### - heartbeat_thread = threading.Thread(target=send_heartbeat, daemon=True) - heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True) - - # start processes - listener_process.start() - election_process.start() - heartbeat_thread.start() - heartbeat_monitor_thread.start() - #test_thread.start() - - try: - # main process is waiting for the sub processes to finish - listener_process.join() - election_process.join() - except KeyboardInterrupt: - print("\nShutting down server...") - listener_process.terminate() - election_process.terminate() diff --git a/leader_heartbeat.py b/leader_heartbeat.py deleted file mode 100644 index b686bf8..0000000 --- a/leader_heartbeat.py +++ /dev/null @@ -1,168 +0,0 @@ -import socket -import multiprocessing -import uuid -import time -import threading - -broadcast_ip = '255.255.255.255' -broadcast_port = 55555 - -# creates unique server-ID -server_id = str(uuid.uuid4()) - -# Socket for broadcast -server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) -server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) -server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -server_socket.bind(('', broadcast_port)) - -print(f"Server is running with ID {server_id} and broadcasting on port {broadcast_port}...") - -# global list for known server-IDs (which does not work atm) -active_servers = {} -current_leader = None - - -# broadcast-function -def broadcast(message): - """Sendet Nachrichten an alle Server-IDs im Netzwerk""" - full_message = f"[{server_id}] {message}".encode() - server_socket.sendto(full_message, (broadcast_ip, broadcast_port)) - -# listener-function -def listen(queue): - """Receives messages from other processes and forwards them/puts them in queue""" - global active_servers - while True: - try: - message, client_address = server_socket.recvfrom(4096) - decoded_message = message.decode() - - # ignore messages from own server-ID/ also running for clients from this ip - if decoded_message.startswith(f"[{server_id}]"): - continue - - print(f"Received from {client_address}: {decoded_message}") - - # handle heartbeat messages - if "HEARTBEAT" in decoded_message: - sender_uuid = decoded_message.split(": ")[1] - active_servers[sender_uuid] = time.time() - continue - - # put message in queue, that broadcast process can react - queue.put(decoded_message) - - except socket.error as e: - print(f"Socket error occurred: {e}") - break - -# leader election function -def start_election(queue): - """Starts leader election based on received messages.""" - global current_leader - print("Starting election...") - broadcast(f"START_ELECTION: {server_id}") #sends broadcast, ignores his own id only in listen function - - timeout = time.time() + 20 # wait 20 secs for answers - highest_id = server_id - - while time.time() < timeout: - # wait for messages from queue - try: - message = queue.get(timeout=1) - - # processing messages - if "START_ELECTION" in message: - sender_uuid = message.split(": ")[1] - #print("extracted uuid?:", sender_uuid) - active_servers[sender_uuid] = time.time() - print(f"Received UUID for election: {sender_uuid}") - - # compare UUIDs for leader election - if sender_uuid > highest_id: - highest_id=sender_uuid - print(f"Received higher ID {sender_uuid}, forwarding...") - broadcast(f"START_ELECTION: {sender_uuid}") - - #####does this work????######## - elif sender_uuid < server_id: - highest_id=server_id - print(f"Received lower ID {sender_uuid}, sending own ID...") - broadcast(f"START_ELECTION: {server_id}") - else: - # you are the leader: - current_leader = server_id - broadcast(f"LEADER: {server_id}") - print(f"I am the leader: {server_id}") - - elif "LEADER" in message: - # leader was elected - leader_uuid = message.split(": ")[1] - current_leader = leader_uuid - print(f"Leader elected: {current_leader}") - broadcast(f"current leader is: {current_leader}") - return - - #continue when queue is empty - except multiprocessing.queues.Empty: - continue - - # after timeout: own server becomes leader, if no other has been chosen/higher ID - if highest_id == server_id: - current_leader = server_id - broadcast(f"LEADER {server_id}") - print(f"I am the leader: {server_id}") - else: - print(f"Leader election finished, leader is {highest_id}") - -############broadcastLEADER##################### -#fct - -################### Heartbeat-function not working########################### -def send_heartbeat(): - """Sends heartbeat messages regularly to keep the server ID active""" - while True: - broadcast(f"HEARTBEAT: {server_id}") - time.sleep(5) # sends every 3 seconds - -#monitor heartbeats -def monitor_heartbeats(): - """Checks active servers based on heartbeat messages""" - global active_servers - while True: - time.sleep(6) - now = time.time() - active_servers = {uuid: last_seen for uuid, last_seen in active_servers.items() if now - last_seen < 6} - print(f"Active servers: {list(active_servers.keys())}") -################################################################################ - -# Main -if __name__ == "__main__": - # queue for communication between processes - message_queue = multiprocessing.Queue() - - # create processes - listener_process = multiprocessing.Process(target=listen, args=(message_queue,)) - #time.sleep(10) - election_process = multiprocessing.Process(target=start_election, args=(message_queue,)) - - # heartbeat in seperate thread########### - heartbeat_thread = threading.Thread(target=send_heartbeat, daemon=True) - heartbeat_monitor_thread = threading.Thread(target=monitor_heartbeats, daemon=True) - - # start processes - listener_process.start() - election_process.start() - heartbeat_thread.start() - heartbeat_monitor_thread.start() - #test_thread.start() - - try: - # main process is waiting for the sub processes to finish - listener_process.join() - election_process.join() - except KeyboardInterrupt: - print("\nShutting down server...") - listener_process.terminate() - election_process.terminate() -- GitLab