Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
D
Distributed-Systems
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Katharina Willig
Distributed-Systems
Commits
3b7ee165
Commit
3b7ee165
authored
7 months ago
by
Katharina
Browse files
Options
Downloads
Patches
Plain Diff
election for multiple servers in network
parent
51e073f4
No related branches found
Branches containing commit
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
kserver.py
+58
-13
58 additions, 13 deletions
kserver.py
with
58 additions
and
13 deletions
kserver.py
+
58
−
13
View file @
3b7ee165
import
socket
import
socket
import
multiprocessing
import
multiprocessing
import
uuid
import
uuid
import
time
import
threading
broadcast_ip
=
'
255.255.255.255
'
broadcast_ip
=
'
255.255.255.255
'
broadcast_port
=
55555
broadcast_port
=
55555
...
@@ -16,6 +18,11 @@ server_socket.bind(('', broadcast_port))
...
@@ -16,6 +18,11 @@ server_socket.bind(('', broadcast_port))
print
(
f
"
Server is running with ID
{
server_id
}
and broadcasting on port
{
broadcast_port
}
...
"
)
print
(
f
"
Server is running with ID
{
server_id
}
and broadcasting on port
{
broadcast_port
}
...
"
)
# Globale Liste für bekannte Server-IDs
active_servers
=
set
()
current_leader
=
None
# Broadcast-Funktion
# Broadcast-Funktion
def
broadcast
(
message
):
def
broadcast
(
message
):
"""
Sendet Nachrichten an alle Server-IDs im Netzwerk
"""
"""
Sendet Nachrichten an alle Server-IDs im Netzwerk
"""
...
@@ -30,7 +37,7 @@ def listen(queue):
...
@@ -30,7 +37,7 @@ def listen(queue):
message
,
client_address
=
server_socket
.
recvfrom
(
4096
)
message
,
client_address
=
server_socket
.
recvfrom
(
4096
)
decoded_message
=
message
.
decode
()
decoded_message
=
message
.
decode
()
# Ignoriere Nachrichten vom eigenen Server
# Ignoriere Nachrichten vom eigenen Server
/ also running for clients from this ip
if
decoded_message
.
startswith
(
f
"
[
{
server_id
}
]
"
):
if
decoded_message
.
startswith
(
f
"
[
{
server_id
}
]
"
):
continue
continue
...
@@ -46,40 +53,74 @@ def listen(queue):
...
@@ -46,40 +53,74 @@ def listen(queue):
# Leader Election Funktion
# Leader Election Funktion
def
start_election
(
queue
):
def
start_election
(
queue
):
"""
Startet die Leader Election basierend auf empfangenen Nachrichten.
"""
"""
Startet die Leader Election basierend auf empfangenen Nachrichten.
"""
global
current_leader
print
(
"
Starting election...
"
)
print
(
"
Starting election...
"
)
broadcast
(
f
"
START_ELECTION:
{
server_id
}
"
)
broadcast
(
f
"
START_ELECTION:
{
server_id
}
"
)
#sends broadcast, ignores his own id only in listen function
current_leader
=
None
#current_leader = None
while
True
:
timeout
=
time
.
time
()
+
15
# 5 Sekunden auf Antworten warten
highest_id
=
server_id
while
time
.
time
()
<
timeout
:
# Auf Nachrichten aus der Queue warten
# Auf Nachrichten aus der Queue warten
try
:
try
:
message
=
queue
.
get
()
message
=
queue
.
get
(
timeout
=
1
)
# Nachricht verarbeiten
# Nachricht verarbeiten
if
"
START_ELECTION
"
in
message
:
if
"
START_ELECTION
"
in
message
:
sender_uuid
=
message
.
split
(
"
:
"
)[
1
]
sender_uuid
=
message
.
split
(
"
:
"
)[
1
]
print
(
"
extracted uuid?:
"
,
sender_uuid
)
print
(
"
extracted uuid?:
"
,
sender_uuid
)
active_servers
.
add
(
sender_uuid
)
print
(
f
"
Received UUID for election:
{
sender_uuid
}
"
)
# Vergleich der UUIDs für Leader Election
# Vergleich der UUIDs für Leader Election
if
sender_uuid
>
server_id
:
if
sender_uuid
>
highest_id
:
highest_id
=
sender_uuid
print
(
f
"
Received higher ID
{
sender_uuid
}
, forwarding...
"
)
print
(
f
"
Received higher ID
{
sender_uuid
}
, forwarding...
"
)
broadcast
(
f
"
START_ELECTION
{
sender_uuid
}
"
)
broadcast
(
f
"
START_ELECTION:
{
sender_uuid
}
"
)
elif
sender_uuid
<
server_id
:
elif
sender_uuid
<
server_id
:
highest_id
=
server_id
print
(
f
"
Received lower ID
{
sender_uuid
}
, sending own ID...
"
)
print
(
f
"
Received lower ID
{
sender_uuid
}
, sending own ID...
"
)
broadcast
(
f
"
START_ELECTION
{
server_id
}
"
)
broadcast
(
f
"
START_ELECTION
:
{
server_id
}
"
)
else
:
else
:
# Wir sind der Leader
# Wir sind der Leader
current_leader
=
server_id
current_leader
=
server_id
broadcast
(
f
"
LEADER
{
server_id
}
"
)
broadcast
(
f
"
LEADER
:
{
server_id
}
"
)
print
(
f
"
I am the leader:
{
server_id
}
"
)
print
(
f
"
I am the leader:
{
server_id
}
"
)
elif
"
LEADER
"
in
message
:
elif
"
LEADER
"
in
message
:
# Leader wurde gewählt
# Leader wurde gewählt
current_leader
=
message
.
split
(
"
"
)[
1
]
leader_uuid
=
message
.
split
(
"
:
"
)[
1
]
current_leader
=
leader_uuid
print
(
f
"
Leader elected:
{
current_leader
}
"
)
print
(
f
"
Leader elected:
{
current_leader
}
"
)
return
except
multiprocessing
.
queues
.
Empty
:
continue
# Nach Timeout: Eigener Server wird Leader, falls niemand anderes gewählt wurde
if
highest_id
==
server_id
:
current_leader
=
server_id
broadcast
(
f
"
LEADER
{
server_id
}
"
)
print
(
f
"
I am the leader:
{
server_id
}
"
)
else
:
print
(
f
"
Leader election finished, leader is
{
highest_id
}
"
)
################### Heartbeat-Funktion
def
send_heartbeat
():
"""
Sendet regelmäßig Heartbeat-Nachrichten, um die Server-ID aktiv zu halten.
"""
while
True
:
broadcast
(
f
"
HEARTBEAT:
{
server_id
}
"
)
time
.
sleep
(
3
)
# Alle 3 Sekunden
except
Exception
as
e
:
def
monitor_heartbeats
():
print
(
f
"
Error during election:
{
e
}
"
)
"""
Überprüft die aktiven Server anhand von Heartbeat-Nachrichten.
"""
break
global
active_servers
while
True
:
time
.
sleep
(
6
)
# Alle 6 Sekunden überprüfen
now
=
time
.
time
()
active_servers
=
{
server
for
server
,
last_seen
in
active_servers
.
items
()
if
now
-
last_seen
<
6
}
print
(
f
"
Active servers:
{
active_servers
}
"
)
# Main
# Main
if
__name__
==
"
__main__
"
:
if
__name__
==
"
__main__
"
:
...
@@ -88,8 +129,12 @@ if __name__ == "__main__":
...
@@ -88,8 +129,12 @@ if __name__ == "__main__":
# Prozesse erstellen
# Prozesse erstellen
listener_process
=
multiprocessing
.
Process
(
target
=
listen
,
args
=
(
message_queue
,))
listener_process
=
multiprocessing
.
Process
(
target
=
listen
,
args
=
(
message_queue
,))
#time.sleep(10)
election_process
=
multiprocessing
.
Process
(
target
=
start_election
,
args
=
(
message_queue
,))
election_process
=
multiprocessing
.
Process
(
target
=
start_election
,
args
=
(
message_queue
,))
# Heartbeat in einem separaten Thread
heartbeat_thread
=
threading
.
Thread
(
target
=
send_heartbeat
,
daemon
=
True
)
# Prozesse starten
# Prozesse starten
listener_process
.
start
()
listener_process
.
start
()
election_process
.
start
()
election_process
.
start
()
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment