From 73e012aedd31f7ff05e7ccf354d9a6d9359bb0f4 Mon Sep 17 00:00:00 2001 From: Dominik Fuhrmann <dominik.fuhrmann1@gmail.com> Date: Tue, 24 Dec 2024 14:00:46 +0100 Subject: [PATCH] new scripts --- communicationScripts/client.py | 191 ++++++++++++++++++++++++---- communicationScripts/server.py | 223 ++++++++++++++++++++++++++------- 2 files changed, 350 insertions(+), 64 deletions(-) diff --git a/communicationScripts/client.py b/communicationScripts/client.py index 222cfcc..8a72fc7 100644 --- a/communicationScripts/client.py +++ b/communicationScripts/client.py @@ -1,31 +1,178 @@ -import wolfssl +#!/usr/bin/env python +# +# -*- coding: utf-8 -*- +# +# client.py +# +# Copyright (C) 2006-2020 wolfSSL Inc. +# +# This file is part of wolfSSL. (formerly known as CyaSSL) +# +# wolfSSL is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# wolfSSL is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA + +# pylint: disable=missing-docstring, invalid-name, import-error + +import sys import socket -import logging +import argparse + +try: + import wolfssl +except ImportError: + print("You must run 'python setup.py install' to use the examples") + sys.exit() + +def build_arg_parser(): + parser = argparse.ArgumentParser(add_help=False) + + parser.add_argument( + "-?", "--help", action="help", + help="show this help message and exit" + ) + + parser.add_argument( + "-h", metavar="host", default="127.0.0.1", + help="Host to connect to, default 127.0.0.1" + ) + + parser.add_argument( + "-p", metavar="port", type=int, default=11111, + help="Port to connect on, not 0, default 11111" + ) + + parser.add_argument( + "-v", metavar="version", type=int, choices=[0, 1, 2, 3, 4, 5], + default=5, + help="SSL version [0-5]" + "(SSLv3, TLSv1, TLSv1.1, TLSv1.2, TLSv1.3, SSLv23)" + ) + + parser.add_argument( + "-u", action="store_true", + help="Use UDP DTLS, add -v 0 for DTLSv1, -v 1 for DTLSv1.2 (default)" + ) + + parser.add_argument( + "-l", metavar="ciphers", type=str, default="", + help="Cipher suite list (: delimited)" + ) + + parser.add_argument( + "-c", metavar="certificate", default="./certs/client-cert.pem", + help="Certificate file, default ./certs/client-cert.pem" + ) + + parser.add_argument( + "-k", metavar="key", default="./certs/client-key.pem", + help="Key file, default ./certs/client-key.pem" + ) + + parser.add_argument( + "-A", metavar="ca_file", default="./certs/ca-cert.pem", + help="Certificate Authority file, default ./certs/ca-cert.pem" + ) + + parser.add_argument( + "-d", action="store_true", + help="Disable client cert check" + ) + + parser.add_argument( + "-g", action="store_true", + help="Send server HTTP GET" + ) + + parser.add_argument( + "-C", action="store_true", + help="Disable CRL" + ) + + parser.add_argument( + "-r", metavar="crl_file", default="./certs/crl.pem", + help="CRL file, default ./certs/crl.pem" + ) + + + return parser + + +def get_SSLmethod(index): + return ( + wolfssl.PROTOCOL_SSLv3, + wolfssl.PROTOCOL_TLSv1, + wolfssl.PROTOCOL_TLSv1_1, + wolfssl.PROTOCOL_TLSv1_2, + wolfssl.PROTOCOL_TLSv1_3, + wolfssl.PROTOCOL_SSLv23 + )[index] + +def get_DTLSmethod(index): + return ( + wolfssl.PROTOCOL_DTLSv1, + wolfssl.PROTOCOL_DTLSv1_2, + wolfssl.PROTOCOL_DTLSv1_3 + )[index] + +def main(): + args = build_arg_parser().parse_args() + + # DTLS connection over UDP + if args.u: + bind_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) + context = wolfssl.SSLContext(get_DTLSmethod(args.v)) + # SSL/TLS connection over TCP + else: + bind_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + context = wolfssl.SSLContext(get_SSLmethod(args.v)) + + # enable debug, if native wolfSSL has been compiled with '--enable-debug' + wolfssl.WolfSSL.enable_debug() + + context.load_cert_chain(args.c, args.k) + + if args.d: + context.verify_mode = wolfssl.CERT_NONE + else: + context.verify_mode = wolfssl.CERT_REQUIRED + context.load_verify_locations(args.A) -# Logging konfigurieren -logging.basicConfig(level=logging.DEBUG) + if args.l: + context.set_ciphers(args.l) -# SSLContext für den Client -context = wolfssl.SSLContext(wolfssl.SSLv23_METHOD) + try: + secure_socket = context.wrap_socket(bind_socket) + + if not args.C: + secure_socket.enable_crl(1) + secure_socket.load_crl_file(args.r, 1); + + secure_socket.connect((args.h, args.p)) -# Client-Socket einrichten -client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if args.g: + secure_socket.write(b"GET / HTTP/1.1\n\n") + else: + secure_socket.write(b"hello wolfssl") -# SSL-Verbindung zum Server aufbauen (IPv4-Adresse verwenden) -ssl_connection = context.wrap_socket(client_socket, server_side=False) -ssl_connection.connect(('192.168.178.63', 10023)) # IPv4-Adresse des Servers + print("\n", secure_socket.read(), "\n") -logging.info("SSL-Verbindung zum Server aufgebaut.") + except KeyboardInterrupt: + print() -# Nachricht senden -message = "Hallo Server!" -ssl_connection.sendall(message.encode()) -logging.info(f"Nachricht gesendet: {message}") + finally: + secure_socket.close() -# Empfangene Nachricht lesen (Echo) -response = ssl_connection.recv(1024) -logging.info(f"Empfangene Antwort: {response.decode()}") -# Verbindung schließen -ssl_connection.close() -logging.info("Verbindung geschlossen.") +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/communicationScripts/server.py b/communicationScripts/server.py index 1b46f2b..d0addc3 100644 --- a/communicationScripts/server.py +++ b/communicationScripts/server.py @@ -1,43 +1,182 @@ -import wolfssl +#!/usr/bin/env python +# +# -*- coding: utf-8 -*- +# +# server.py +# +# Copyright (C) 2006-2020 wolfSSL Inc. +# +# This file is part of wolfSSL. (formerly known as CyaSSL) +# +# wolfSSL is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# wolfSSL is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA + +# pylint: disable=missing-docstring, invalid-name, import-error + +import sys import socket -import logging - -# Logging konfigurieren -logging.basicConfig(level=logging.DEBUG) - -# SSLContext für den Server -context = wolfssl.SSLContext(wolfssl.SSLv23_METHOD) -context.use_certificate_file('server-cert.pem') # Server-Zertifikat -context.use_privatekey_file('server-key.pem') # Server-Schlüssel - -# Server-Socket einrichten -server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -server_socket.bind(('192.168.178.63', 10023)) # IPv4-Adresse verwenden -server_socket.listen(1) - -logging.info("Server läuft und wartet auf Verbindungen...") - -while True: - # Akzeptiere eingehende Verbindungen - client_socket, client_address = server_socket.accept() - logging.info(f"Verbindung von {client_address} akzeptiert.") - - # SSL-Verbindung aufbauen - ssl_connection = context.wrap_socket(client_socket, server_side=True) - logging.info("SSL-Verbindung aufgebaut.") - - try: - # Empfangene Nachricht lesen - data = ssl_connection.recv(1024) - logging.info(f"Empfangene Nachricht: {data.decode()}") - - # Nachricht zurückschicken (Echo) - ssl_connection.sendall(data) - logging.info(f"Nachricht zurückgeschickt: {data.decode()}") - - except Exception as e: - logging.error(f"Fehler: {e}") - - finally: - ssl_connection.close() - logging.info("Verbindung geschlossen.") +import argparse + +try: + import wolfssl +except ImportError: + print("You must run 'python setup.py install' to use the examples") + sys.exit() + +def build_arg_parser(): + parser = argparse.ArgumentParser(add_help=False) + + parser.add_argument( + "-?", "--help", action="help", + help="show this help message and exit" + ) + + parser.add_argument( + "-p", metavar="port", type=int, default=11111, + help="Port to listen on, not 0, default 11111" + ) + + parser.add_argument( + "-v", metavar="version", type=int, choices=[0, 1, 2, 3, 4, 5], + default=5, + help="SSL version [0-5]" + "(SSLv3, TLSv1, TLSv1.1, TLSv1.2, TLSv1_3, SSLv23)" + ) + + parser.add_argument( + "-u", action="store_true", + help="Use UDP DTLS, add -v 0 for DTLSv1, -v 1 for DTLSv1.2" + ) + + parser.add_argument( + "-l", metavar="ciphers", type=str, default="", + help="Cipher suite list (: delimited)" + ) + + parser.add_argument( + "-c", metavar="certificate", default="./certs/server-cert.pem", + help="Certificate file, default ./certs/server-cert.pem" + ) + + parser.add_argument( + "-k", metavar="key", default="./certs/server-key.pem", + help="Key file, default ./certs/server-key.pem" + ) + + parser.add_argument( + "-A", metavar="ca_file", default="./certs/client-cert.pem", + help="Certificate Authority file, default ./certs/client-cert.pem" + ) + + parser.add_argument( + "-d", action="store_true", + help="Disable client cert check" + ) + + parser.add_argument( + "-b", action="store_true", + help="Bind to any interface instead of localhost only" + ) + + parser.add_argument( + "-i", action="store_true", + help="Loop indefinitely (allow repeated connections)" + ) + + return parser + + +def get_SSLmethod(index): + return ( + wolfssl.PROTOCOL_SSLv3, + wolfssl.PROTOCOL_TLSv1, + wolfssl.PROTOCOL_TLSv1_1, + wolfssl.PROTOCOL_TLSv1_2, + wolfssl.PROTOCOL_TLSv1_3, + wolfssl.PROTOCOL_SSLv23 + )[index] + +def get_DTLSmethod(index): + return ( + wolfssl.PROTOCOL_DTLSv1, + wolfssl.PROTOCOL_DTLSv1_2, + wolfssl.PROTOCOL_DTLSv1_3 + )[index] + + +def main(): + args = build_arg_parser().parse_args() + # DTLS connection over UDP + if args.u: + # Set DTLSv1.2 as default if unspecified + if args.v == 5: + args.v = 1 + bind_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) + bind_socket.bind(("" if args.b else "localhost", args.p)) + data, from_addr = bind_socket.recvfrom(1) + context = wolfssl.SSLContext(get_DTLSmethod(args.v), server_side=True) + # SSL/TLS connection over TCP + else: + bind_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + bind_socket.bind(("" if args.b else "localhost", args.p)) + bind_socket.listen(5) + context = wolfssl.SSLContext(get_SSLmethod(args.v), server_side=True) + + print("Server listening on port", bind_socket.getsockname()[1]) + + # enable debug, if native wolfSSL has been compiled with '--enable-debug' + wolfssl.WolfSSL.enable_debug() + + context.load_cert_chain(args.c, args.k) + + if args.d: + context.verify_mode = wolfssl.CERT_NONE + else: + context.verify_mode = wolfssl.CERT_REQUIRED + context.load_verify_locations(args.A) + + if args.l: + context.set_ciphers(args.l) + + while True: + try: + secure_socket = None + if args.u: + secure_socket = context.wrap_socket(bind_socket) + else: + new_socket, from_addr = bind_socket.accept() + secure_socket = context.wrap_socket(new_socket) + + print("Connection received from", from_addr) + + print("\n", secure_socket.read(), "\n") + secure_socket.write(b"I hear you fa shizzle!") + + except KeyboardInterrupt: + print() + break + + finally: + if secure_socket: + secure_socket.shutdown(socket.SHUT_RDWR) + secure_socket.close() + + if not args.i: + break + + bind_socket.close() + + +if __name__ == '__main__': + main() \ No newline at end of file -- GitLab