Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
D
Distributed-Systems
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Katharina Willig
Distributed-Systems
Commits
adf52f7c
Commit
adf52f7c
authored
11 months ago
by
Katharina
Browse files
Options
Downloads
Patches
Plain Diff
comments
parent
e7caf57c
Loading
Loading
No related merge requests found
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
kserver.py
+24
-24
24 additions, 24 deletions
kserver.py
with
24 additions
and
24 deletions
kserver.py
+
24
−
24
View file @
adf52f7c
...
...
@@ -7,10 +7,10 @@ import threading
broadcast_ip
=
'
255.255.255.255
'
broadcast_port
=
55555
#
Erstelle eine eindeutig
e
S
erver-ID
#
creates uniqu
e
s
erver-ID
server_id
=
str
(
uuid
.
uuid4
())
# Socket f
ü
r
B
roadcast
erstellen
# Socket f
o
r
b
roadcast
server_socket
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_DGRAM
)
server_socket
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_BROADCAST
,
1
)
server_socket
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
,
1
)
...
...
@@ -18,18 +18,18 @@ server_socket.bind(('', broadcast_port))
print
(
f
"
Server is running with ID
{
server_id
}
and broadcasting on port
{
broadcast_port
}
...
"
)
#
G
lobal
e L
ist
e
f
ü
r
bekannte Server-IDs
#
g
lobal
l
ist f
o
r
known server-IDs (which does not work atm)
active_servers
=
set
()
current_leader
=
None
#
B
roadcast-
F
un
k
tion
#
b
roadcast-
f
un
c
tion
def
broadcast
(
message
):
"""
Sendet Nachrichten an alle Server-IDs im Netzwerk
"""
full_message
=
f
"
[
{
server_id
}
]
{
message
}
"
.
encode
()
server_socket
.
sendto
(
full_message
,
(
broadcast_ip
,
broadcast_port
))
#
L
istener-
F
un
k
tion
#
l
istener-
f
un
c
tion
def
listen
(
queue
):
"""
Empfängt Nachrichten von anderen Prozessen und leitet sie weiter.
"""
while
True
:
...
...
@@ -37,42 +37,41 @@ def listen(queue):
message
,
client_address
=
server_socket
.
recvfrom
(
4096
)
decoded_message
=
message
.
decode
()
#
I
gnor
iere Nachrichten vom eigene
n
S
erver/ also running for clients from this ip
#
i
gnor
e messages from ow
n
s
erver
-ID
/ also running for clients from this ip
if
decoded_message
.
startswith
(
f
"
[
{
server_id
}
]
"
):
continue
print
(
f
"
Received from
{
client_address
}
:
{
decoded_message
}
"
)
#
Nachricht in die Queue legen, damit der B
roadcast
-P
ro
z
ess
darauf reagieren kann
#
put message in queue, that b
roadcast
p
ro
c
ess
can react
queue
.
put
(
decoded_message
)
except
socket
.
error
as
e
:
print
(
f
"
Socket error occurred:
{
e
}
"
)
break
#
L
eader
E
lection
F
un
k
tion
#
l
eader
e
lection
f
un
c
tion
def
start_election
(
queue
):
"""
Startet die Leader Election basierend auf empfangenen Nachrichten.
"""
global
current_leader
print
(
"
Starting election...
"
)
broadcast
(
f
"
START_ELECTION:
{
server_id
}
"
)
#sends broadcast, ignores his own id only in listen function
#current_leader = None
timeout
=
time
.
time
()
+
15
# 5 Sekunden auf Antworten warten
timeout
=
time
.
time
()
+
20
# wait 20 secs for answers
highest_id
=
server_id
while
time
.
time
()
<
timeout
:
#
Auf Nachrichten aus der Queue warten
#
wait for messages from queue
try
:
message
=
queue
.
get
(
timeout
=
1
)
#
Nachricht verarbeiten
#
processing messages
if
"
START_ELECTION
"
in
message
:
sender_uuid
=
message
.
split
(
"
:
"
)[
1
]
#print("extracted uuid?:", sender_uuid)
active_servers
.
add
(
sender_uuid
)
print
(
f
"
Received UUID for election:
{
sender_uuid
}
"
)
#
Vergleich der
UUIDs f
ü
r
L
eader
E
lection
#
compare
UUIDs f
o
r
l
eader
e
lection
if
sender_uuid
>
highest_id
:
highest_id
=
sender_uuid
print
(
f
"
Received higher ID
{
sender_uuid
}
, forwarding...
"
)
...
...
@@ -83,13 +82,13 @@ def start_election(queue):
print
(
f
"
Received lower ID
{
sender_uuid
}
, sending own ID...
"
)
broadcast
(
f
"
START_ELECTION:
{
server_id
}
"
)
else
:
#
Wir sind der L
eader
#
you are the l
eader
:
current_leader
=
server_id
broadcast
(
f
"
LEADER:
{
server_id
}
"
)
print
(
f
"
I am the leader:
{
server_id
}
"
)
elif
"
LEADER
"
in
message
:
#
L
eader w
urde gewählt
#
l
eader w
as elected
leader_uuid
=
message
.
split
(
"
:
"
)[
1
]
current_leader
=
leader_uuid
print
(
f
"
Leader elected:
{
current_leader
}
"
)
...
...
@@ -99,7 +98,7 @@ def start_election(queue):
except
multiprocessing
.
queues
.
Empty
:
continue
#
Nach T
imeout:
Eigener Server wird Leader, falls niemand anderes gewählt wurde
#
after t
imeout:
own server becomes leader, if no other has been chosen
if
highest_id
==
server_id
:
current_leader
=
server_id
broadcast
(
f
"
LEADER
{
server_id
}
"
)
...
...
@@ -107,41 +106,42 @@ def start_election(queue):
else
:
print
(
f
"
Leader election finished, leader is
{
highest_id
}
"
)
################### Heartbeat-
F
un
k
tion
################### Heartbeat-
f
un
c
tion
not working###########################
def
send_heartbeat
():
"""
Sendet regelmäßig Heartbeat-Nachrichten, um die Server-ID aktiv zu halten.
"""
while
True
:
broadcast
(
f
"
HEARTBEAT:
{
server_id
}
"
)
time
.
sleep
(
3
)
#
Alle 3 Sekunden
time
.
sleep
(
3
)
#
sends every 3 seconds
def
monitor_heartbeats
():
"""
Überprüft die aktiven Server anhand von Heartbeat-Nachrichten.
"""
global
active_servers
while
True
:
time
.
sleep
(
6
)
#
Alle 6 Sekunden überprüfen
time
.
sleep
(
6
)
#
checks every 6 seconds
now
=
time
.
time
()
active_servers
=
{
server
for
server
,
last_seen
in
active_servers
.
items
()
if
now
-
last_seen
<
6
}
print
(
f
"
Active servers:
{
active_servers
}
"
)
################################################################################
# Main
if
__name__
==
"
__main__
"
:
#
Q
ueue f
ü
r
die K
ommuni
k
ation
zwisch
en
P
ro
z
esse
n
#
q
ueue f
o
r
c
ommuni
c
ation
betwe
en
p
ro
c
esse
s
message_queue
=
multiprocessing
.
Queue
()
#
Prozesse erstellen
#
create processes
listener_process
=
multiprocessing
.
Process
(
target
=
listen
,
args
=
(
message_queue
,))
#time.sleep(10)
election_process
=
multiprocessing
.
Process
(
target
=
start_election
,
args
=
(
message_queue
,))
#
H
eartbeat in
einem
sep
a
rate
n T
hread
#
h
eartbeat in sep
e
rate
t
hread
###########
heartbeat_thread
=
threading
.
Thread
(
target
=
send_heartbeat
,
daemon
=
True
)
#
P
ro
z
esse
starten
#
start p
ro
c
esse
s
listener_process
.
start
()
election_process
.
start
()
try
:
#
Haupt
pro
z
ess
wartet auf Beendigung der S
ubpro
z
esse
#
main
pro
c
ess
is waiting for the s
ub
pro
c
esse
s to finish
listener_process
.
join
()
election_process
.
join
()
except
KeyboardInterrupt
:
...
...
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
sign in
to comment