Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
D
Distributed-Systems
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Katharina Willig
Distributed-Systems
Commits
d0015059
Commit
d0015059
authored
8 months ago
by
Katharina
Browse files
Options
Downloads
Patches
Plain Diff
multiprocessing
parent
03bf6bae
No related branches found
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
kserver.py
+71
-57
71 additions, 57 deletions
kserver.py
with
71 additions
and
57 deletions
kserver.py
+
71
−
57
View file @
d0015059
import
socket
import
thread
ing
import
multiprocess
ing
import
uuid
broadcast_ip
=
'
255.255.255.255
'
broadcast_port
=
55555
#
Create a uniqu
e
s
erver
ID
#
Erstelle eine eindeutig
e
S
erver
-
ID
server_id
=
str
(
uuid
.
uuid4
())
#
Create server s
ocket f
o
r
b
roadcast
ing
#
S
ocket f
ü
r
B
roadcast
erstellen
server_socket
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_DGRAM
)
server_socket
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_BROADCAST
,
1
)
server_socket
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
,
1
)
...
...
@@ -16,74 +16,88 @@ server_socket.bind(('', broadcast_port))
print
(
f
"
Server is running with ID
{
server_id
}
and broadcasting on port
{
broadcast_port
}
...
"
)
# Store the current leader's ID
current_leader
=
None
# Broadcast function
# Broadcast-Funktion
def
broadcast
(
message
):
"""
Send
messages to all other nodes in the netwo
rk
.
"""
"""
Send
et Nachrichten an alle Server-IDs im Netzwe
rk
"""
full_message
=
f
"
[
{
server_id
}
]
{
message
}
"
.
encode
()
server_socket
.
sendto
(
full_message
,
(
broadcast_ip
,
broadcast_port
))
# Listener function
def
listen
():
"""
Receive messages and implement LCR logic.
"""
global
current_leader
# Listener-Funktion
def
listen
(
queue
):
"""
Empfängt Nachrichten von anderen Prozessen und leitet sie weiter.
"""
while
True
:
try
:
message
,
client_address
=
server_socket
.
recvfrom
(
4096
)
decoded_message
=
message
.
decode
()
# Extract sender UUID and payload
if
decoded_message
.
startswith
(
"
[
"
)
and
"
]
"
in
decoded_message
:
sender_uuid
=
decoded_message
.
split
(
"
]
"
)[
0
][
1
:]
payload
=
decoded_message
.
split
(
"
]
"
)[
1
].
strip
()
# Ignore messages from self
if
sender_uuid
==
server_id
:
continue
# LCR logic
if
payload
==
"
START_ELECTION
"
:
print
(
f
"
Received election start from
{
sender_uuid
}
"
)
if
sender_uuid
>
server_id
:
# Forward the sender's ID
broadcast
(
f
"
START_ELECTION
{
sender_uuid
}
"
)
elif
sender_uuid
<
server_id
:
# Send own ID into the ring
broadcast
(
f
"
START_ELECTION
{
server_id
}
"
)
else
:
# If sender_uuid == server_id, we are the leader
current_leader
=
server_id
broadcast
(
f
"
LEADER
{
server_id
}
"
)
print
(
f
"
I am the leader:
{
server_id
}
"
)
elif
payload
.
startswith
(
"
LEADER
"
):
leader_id
=
payload
.
split
(
"
"
)[
1
]
current_leader
=
leader_id
print
(
f
"
Leader has been elected:
{
leader_id
}
"
)
# Ignoriere Nachrichten vom eigenen Server
if
decoded_message
.
startswith
(
f
"
[
{
server_id
}
]
"
):
continue
except
socket
.
error
as
e
:
print
(
f
"
An error occurred:
{
e
}
"
)
break
print
(
f
"
Received from
{
client_address
}
:
{
decoded_message
}
"
)
# Nachricht in die Queue legen, damit der Broadcast-Prozess darauf reagieren kann
queue
.
put
(
decoded_message
)
except
KeyboardInterrupt
:
print
(
"
\n
Shutting down server...
"
)
except
socket
.
error
as
e
:
print
(
f
"
Socket error occurred:
{
e
}
"
)
break
#
Start the elec
tion
def
start_election
():
"""
Initiate the LCR election process
.
"""
#
Leader Election Funk
tion
def
start_election
(
queue
):
"""
Startet die Leader Election basierend auf empfangenen Nachrichten
.
"""
print
(
"
Starting election...
"
)
broadcast
(
f
"
START_ELECTION
{
server_id
}
"
)
current_leader
=
None
while
True
:
# Auf Nachrichten aus der Queue warten
try
:
message
=
queue
.
get
()
# Nachricht verarbeiten
if
"
START_ELECTION
"
in
message
:
sender_uuid
=
message
.
split
(
"
"
)[
1
]
# Vergleich der UUIDs für Leader Election
if
sender_uuid
>
server_id
:
print
(
f
"
Received higher ID
{
sender_uuid
}
, forwarding...
"
)
broadcast
(
f
"
START_ELECTION
{
sender_uuid
}
"
)
elif
sender_uuid
<
server_id
:
print
(
f
"
Received lower ID
{
sender_uuid
}
, sending own ID...
"
)
broadcast
(
f
"
START_ELECTION
{
server_id
}
"
)
else
:
# Wir sind der Leader
current_leader
=
server_id
broadcast
(
f
"
LEADER
{
server_id
}
"
)
print
(
f
"
I am the leader:
{
server_id
}
"
)
elif
"
LEADER
"
in
message
:
# Leader wurde gewählt
current_leader
=
message
.
split
(
"
"
)[
1
]
print
(
f
"
Leader elected:
{
current_leader
}
"
)
except
Exception
as
e
:
print
(
f
"
Error during election:
{
e
}
"
)
break
# Main
if
__name__
==
"
__main__
"
:
# Start the listener thread
listen_thread
=
threading
.
Thread
(
target
=
listen
)
listen_thread
.
start
()
# Start election after a short delay
import
time
time
.
sleep
(
2
)
start_election
()
# Queue für die Kommunikation zwischen Prozessen
message_queue
=
multiprocessing
.
Queue
()
# Prozesse erstellen
listener_process
=
multiprocessing
.
Process
(
target
=
listen
,
args
=
(
message_queue
,))
election_process
=
multiprocessing
.
Process
(
target
=
start_election
,
args
=
(
message_queue
,))
# Prozesse starten
listener_process
.
start
()
election_process
.
start
()
try
:
# Hauptprozess wartet auf Beendigung der Subprozesse
listener_process
.
join
()
election_process
.
join
()
except
KeyboardInterrupt
:
print
(
"
\n
Shutting down server...
"
)
listener_process
.
terminate
()
election_process
.
terminate
()
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment