Skip to content
Snippets Groups Projects
Commit 241d2f92 authored by Simon Flaisch's avatar Simon Flaisch
Browse files

I commented the entire code, so it is understandable for everyone!

parent f3ff164b
No related branches found
No related tags found
No related merge requests found
......@@ -16,60 +16,64 @@ from fastapi.responses import StreamingResponse
import imutils
# Ignorierung der Warnings
warnings.filterwarnings("ignore")
# Initialisiere Text-to-Speech
# Initialisiere Text-to-Speech Engine
engine = pyttsx3.init('sapi5')
# Initialisierung einer Instanz von FastAPI für die Bereitstellung des Videostreams
app = FastAPI()
# Definiere Kamera und starte Videoaufnahme
# Try and expect Block, um den Videostream des Video Capturing Services einzufangen. Hierbei wird versucht die
# Verbindung herzustellen und wenn keine hergestellt werden kann, wird das Programm abgebrochen!
try:
video_capture =VideoStream("http://localhost:80/video_feed").start()
except Exception as e:
print(f"We couldn't reach the camera")
exit()
# Definiere, welche Cascade OpenCV verwenden soll
# Definiere, welche Cascade OpenCV verwenden soll. Dabei können verschiedene ausgewählt werden,
# für unterschiedlichste Usecases
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
# Wartet, um sicherzugehen, dass die Verbindung mit dem Videostream established ist.
time.sleep(2.0)
#Main Klasse in dem, die meiste Logik unseres Skripts abläuft
class Main:
#Initialisierung der benötigten Variablen für unser Skript
def __init__(self):
self.outputFrame = None
self.lock = threading.Lock()
self.wait_in_thread = None
self.streamThread = None
self.analysis_results = None
self.analysis_thread = None
self.speaking_thread = None # Verfolgt den aktuellen Sprech-Thread
self.main_time = 0
self.last_chat_time = 0 # Zeitpunkt des letzten Sprechens API kosten beachten
self.currentclient = None
self.unacked_publish = set()
self.flag = False
self.selection = 3
self.outputFrame = None # Outputframe für den Stream.
self.lock = threading.Lock() #Instanz der Threading.lock Klasse, welches ein primitives Lock object darstellt. Nur ein Thread kann auf dieses immer zugreifen und sperrt es für die anderen
self.streamThread = None #Variable für den Streamthread
self.analysis_results = None # Analysergebnisse
self.analysis_thread = None #Variable für den Analysethread
self.speaking_thread = None # Variable für den aktuellen Speakingthread
self.main_time = 0 # Zeitpunkt der letzten Gesichtsanalyse
self.last_chat_time = 0 # Zeitpunkt des letzten Sprechens - API kosten beachten
self.currentclient = None # Variable für den Mqtt client
self.unacked_publish = set() # Nicht rausgesendete MQTT Nachrichten
self.flag = False # Eine normale Flag für den Stream
self.selection = 3 #Variablen für die Logik des animierten Maskottchen
self.it = 0
# Funktion für den Stream - öffnet eine Fastapi auf vorgegebenen IP und Port
def stream(self):
uvicorn.run(host="localhost", port=89, app=app)
uvicorn.run(host="192.168.1.1", port=89, app=app)
# Thread für die Streaming-Funktion
def streamingThread(self):
self.streamThread = threading.Thread(target=self.stream)
self.streamThread.daemon = True
self.streamThread.daemon = True # Als Daemon gesetzt, damit der Code sich nach Terminierung der Main beendet
self.streamThread.start()
# Funktion für die Generierung des Streams
def generate(self):
# grab global references to the output frame and lock variables
# loop over frames from the output stream
while True:
# wait until the lock is acquired
with self.lock:
with self.lock: #Dieser Block wartet das ein Lock aquired wird und released es mit erfolgreichen Ausführen.
# check if the output frame is available, otherwise skip
# the iteration of the loop
if self.outputFrame is None:
......@@ -82,50 +86,54 @@ class Main:
# yield the output frame in the byte format
yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
bytearray(encodedImage) + b'\r\n')
#Funktion wird mit der erfolgreichen Verbindung zum MQTT ausgeführt - Subscribed den Client zu Topic 1
def on_connect(self,client, userdata, flags, reason_code, properties):
print(f"Connected with result code {reason_code}\n")
client.subscribe("Topic1")
#Funktion, die bei einem Publish in ein Topic ausgeführt wird.
def on_publish(self,client, userdata, mid, reason_code, properties):
try:
userdata.remove(mid)
userdata.remove(mid) #Removed die gesendete Nachricht vom Speicher, um eine doppelte Sendung zu verhindern
except KeyError:
"Something went wrong. Please check if you have removed the mid from the userdata"
# Funktion zur Verbindung mit MQTT
def connectwithmqtt(self, adress:str, targetport:int):
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, userdata=None)
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, userdata=None) #Intitalisierung des Clients
mqttc.on_connect = self.on_connect
mqttc.on_publish = self.on_publish
mqttc.user_data_set(self.unacked_publish)
mqttc.username_pw_set(username="standardUser", password="GreatHHZ4Ever!")
mqttc.user_data_set(self.unacked_publish) # Den lokalen Speicher als User data setzen
mqttc.username_pw_set(username="standardUser", password="GreatHHZ4Ever!") # Username und Passwort - vorgegeben vom MQTT Server
mqttc.connect(host=adress, port=targetport)
while not mqttc.is_connected():
mqttc.loop()
print("Waiting for connection...")
return mqttc
#Funktion zum senden einer Nachricht
def sendMessage(self, mqttc, topic, message):
mqttc.loop_start()
mqttc.loop_start() # Started einen gethreaded connection loop zum senden der Nachrichten.
msg = mqttc.publish(topic, message, qos=1)
self.unacked_publish.add(msg.mid)
self.unacked_publish.add(msg.mid) # Mid sind die Daten der aktuellen Nachricht
while len(self.unacked_publish):
time.sleep(0.1)
time.sleep(0.1) # Wartet bis alle gequeten Nachrichten gepublished werden
msg.wait_for_publish()
# Funktion für Text to speeech.
def speak(self, text):
engine.say(text)
engine.runAndWait()
#Funktion fürs Threading von Text to Speech
def speak_in_thread(self, text):
if (self.speaking_thread is None or not self.speaking_thread.is_alive()):
if (self.speaking_thread is None or not self.speaking_thread.is_alive()): #Abfrage, ob ein Thread am leben oder defifniert ist, wenn ja keine Ausführung!
self.speaking_thread = threading.Thread(target=self.speak, args=(text,))
self.speaking_thread.daemon = True
self.speaking_thread.start()
# Funktion fürs Threading von der Gesichtsanalyse
def facial_analysis_thread(self, faceframe):
if (self.analysis_thread is None or not self.analysis_thread.is_alive()):
if (self.analysis_thread is None or not self.analysis_thread.is_alive()): #Abfrage, ob ein Thread am leben oder definiert ist, wenn ja keine Ausführung!
self.analysis_thread = threading.Thread(target=self.facial_analysis, args=(faceframe,))
self.analysis_thread.daemon = True
self.analysis_thread.start()
# Funktion Gesichtsanalyse
def facial_analysis(self, faceframe):
self.analysis_results = DeepFace.analyze(faceframe, actions=["emotion", "age", "gender"], enforce_detection=False)
......@@ -134,19 +142,20 @@ class Main:
def welcome_message(self):
self.speak_in_thread('Welcome to the face recognition and prediction tool of the herman hollerith center')
#Threading des Senden des ChatGPT prompts
def send_emotion_to_chatgpt_and_speak(self, emotion):
current_time = time.time()
if current_time - self.last_chat_time < 10:
current_time = time.time() #Setzen der aktuellen Zeit
if current_time - self.last_chat_time < 10: #Timer für 10 Sekunden
return
self.last_chat_time = current_time
self.last_chat_time = current_time # Reset des Timers
request_thread = threading.Thread(target=self.perform_request, args=(emotion,))
request_thread.daemon = True
request_thread.start()
# Methode des senden eines ChatGPT prompts
def perform_request(self, emotion):
headers = {
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiMmY4YTA2ODgtODIzNC00N2RhLTgwNjAtMjJhYjdlOWExNzE0IiwidHlwZSI6InNhbmRib3hfYXBpX3Rva2VuIn0.RbkSarkirWu8MyOved0gmafiGa0XauxoSaf1flSg3s4"
}
} #Übermitteln des Autorisierungsschlüssels zur Identifikation des Clients
payload = {
"providers": "openai",
"text": f"Die Person fühlt sich {emotion} an. Antworte auf Deutsch ohne Komma und in einem kurzen Satz. Motiviere die Person, erwähne die {emotion} Emotion. Du bist in einem Face detection Programm versuche immer Variation in neuen Sätzen zu bringen.",
......@@ -155,31 +164,31 @@ class Main:
"temperature": 0.0,
"max_tokens": 150,
"fallback_providers": ""
}
} # Prompt, welcher an Eden AI gesendet wird
try:
response = requests.post("https://api.edenai.run/v2/text/chat", json=payload, headers=headers)
result = response.json()
generated_text = result['openai']['generated_text']
print(generated_text)
self.speak_in_thread(generated_text)
response = requests.post("https://api.edenai.run/v2/text/chat", json=payload, headers=headers) #Senden der JSON an EdenAI
result = response.json() #Annahme der AI response in einer variable
generated_text = result['openai']['generated_text'] #Entnahme der relevanten Teile aus der JSON
print(generated_text) # Debug Print
self.speak_in_thread(generated_text) #Aufruf Speaking Thread mit dem generierten Tet
except Exception as e:
print(f"Fehler beim Senden/Empfangen der Daten zu/von ChatGPT: {e}")
#Hauptfunktion - Gesichtserkennung
def face_recognition(self, mqttc):
#Unendliche Schleife für den Dauerzustand, welches Videofeed benötigt
while True:
frame = video_capture.read()
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
faces = face_cascade.detectMultiScale(gray, scaleFactor=1.3, minNeighbors=10)
current_time = time.time()
frame = video_capture.read() # Einlesen des Camerafeeds
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) #Konvertierung von dem echten Colorspace zu dem idealen für Computervision.Damit sollen die Ergebnisse genauer werden
faces = face_cascade.detectMultiScale(gray, scaleFactor=1.3, minNeighbors=5) #Erkennung der Gesichter anhand mehrerer Parameter
current_time = time.time() #Aktueller Zeitpunkt
try:
if current_time - self.main_time > 2:
self.main_time = current_time
self.facial_analysis_thread(frame)
analyze = self.analysis_results[0] if self.analysis_results else None
analysis_results_dict = {"age": analyze.get("age", "N/A"), "gender": analyze.get("dominant_gender", "N/A"), "emotion": analyze.get("dominant_emotion", "N/A")}
self.sendMessage(mqttc=mqttc, topic="Topic1", message=json.dumps(analysis_results_dict))
if current_time - self.main_time > 5: #Timer
self.main_time = current_time #Reset Timer
self.facial_analysis_thread(frame) # Ausführung der Gesichtsanalyse
analyze = self.analysis_results[0] if self.analysis_results else None #Schaut, ob Analyseergebnisse vorliegen
analysis_results_dict = {"age": analyze.get("age", "N/A"), "gender": analyze.get("dominant_gender", "N/A"), "emotion": analyze.get("dominant_emotion", "N/A")} #Analyseergebnisse in ein Dict für Konvertierung in JSON notwendig
self.sendMessage(mqttc=mqttc, topic="Topic1", message=json.dumps(analysis_results_dict)) # Sendung der Ergebnisse durch MQTT
except Exception as e:
print("Error in DeepFace Analysis:", e)
......@@ -192,13 +201,13 @@ class Main:
if emotion in ["happy", "neutral", "sad", "fear", "suprise", "angry"]:
self.send_emotion_to_chatgpt_and_speak(emotion)
maskottchen_images = {
maskottchen_images = { #Bilder Maskotchen
"mas1": cv2.imread("emoji_folder/mas1.png", cv2.IMREAD_UNCHANGED),
"mas2": cv2.imread("emoji_folder/mas2.png", cv2.IMREAD_UNCHANGED),
"mas3": cv2.imread("emoji_folder/mas3.png", cv2.IMREAD_UNCHANGED),
}
emoji_images = {
emoji_images = { #Bilder Emojis
"happy": cv2.imread("emoji_folder/happy.png", cv2.IMREAD_UNCHANGED),
"neutral": cv2.imread("emoji_folder/neutral.png", cv2.IMREAD_UNCHANGED),
"sad": cv2.imread("emoji_folder/sad.png", cv2.IMREAD_UNCHANGED),
......@@ -207,76 +216,80 @@ class Main:
"angry": cv2.imread("emoji_folder/angry.png", cv2.IMREAD_UNCHANGED)
}
for (x, y, w, h) in faces:
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 1)
if analyze:
for (x, y, w, h) in faces: #Schleife, welches gleichzeitig das Gesichtsframe in seine Bestandteile (Länge, Breite, Höhe, ...) herunterbricht
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 1) #Setzt über das Gesicht ein Rectangle
if analyze: # Wenn Analyse Ergebnisse vorliegen, werden diese als Text über das Gesicht gelegt
cv2.putText(frame, f'Approx. Age: {analyze.get("age", "N/A")}', (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
cv2.putText(frame, f'Approx. Gender: {analyze.get("dominant_gender", "N/A")}', (x, y - 40), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 255), 2)
cv2.putText(frame, f'Current emotion: {analyze.get("dominant_emotion", "N/A")}', (x, y - 70), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 0), 2)
emotion = analyze.get("dominant_emotion", "N/A")
if emotion in emoji_images:
if emotion in emoji_images: #Abfrage, ob Emotion durch Emojis realiserbar
frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) # Änderungen des Farbraum für Verwendung von Pillows
emoji_pil = Image.fromarray(cv2.cvtColor(emoji_images[emotion], cv2.COLOR_BGRA2RGBA))
emoji_resized = emoji_pil.resize((w+20, h+20))
emoji_resized = emoji_pil.resize((w+20, h+20)) #Emoji wird vergrößert
x_offset = x-10
x_offset = x-10 # Positionsvariablen für die Setzung des Emojis
y_offset = y-10
frame_pil.paste(emoji_resized, (x_offset, y_offset), mask=emoji_resized)
frame_pil.paste(emoji_resized, (x_offset, y_offset), mask=emoji_resized) # Emoji wird ins Frame gesetzt
frame = cv2.cvtColor(np.array(frame_pil), cv2.COLOR_RGB2BGR)
frame = cv2.cvtColor(np.array(frame_pil), cv2.COLOR_RGB2BGR) # Colorspace des Frames wird für Pillow angepasst
if self.speaking_thread and self.speaking_thread.is_alive():
if self.speaking_thread and self.speaking_thread.is_alive(): #Abfrage über Tezt to speech
self.it = self.it + 1
#Einfügen von Pillows
frame_pillow = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))# Auslagern
mas_pil = Image.fromarray(cv2.cvtColor(maskottchen_images["mas1"], cv2.COLOR_BGRA2RGBA))# Auslagern
mas_pil2 = Image.fromarray(cv2.cvtColor(maskottchen_images["mas2"], cv2.COLOR_BGRA2RGBA))# Auslagern
mas_pil3 = Image.fromarray(cv2.cvtColor(maskottchen_images["mas3"], cv2.COLOR_BGRA2RGBA))# Auslagern
#Resize Pillows
mas_resized = mas_pil.resize((200, 200))
mas_resized2 = mas_pil2.resize((200, 200))
mas_resized3 = mas_pil3.resize((200, 200))
#Position
x_offset = 480
y_offset = 300
#Animationslogik mit Modulo
if self.it % self.selection == 0:
frame_pillow.paste(mas_resized, (x_offset, y_offset), mask=mas_resized)
if self.it % self.selection == 1:
frame_pillow.paste(mas_resized2, (x_offset, y_offset), mask=mas_resized)
if self.it % self.selection == 2:
frame_pillow.paste(mas_resized3, (x_offset, y_offset), mask=mas_resized)
#Zurück ins normale Format
frame = cv2.cvtColor(np.array(frame_pillow), cv2.COLOR_RGB2BGR)
# Frame für Stream wird kopiert
with self.lock:
self.outputFrame = frame.copy()
cv2.imshow("video_capture", frame)
#Anzeigen des Videostreams für Debuggen
#cv2.imshow("video_capture", frame)
# Wenn q gedrückt wird, shut down program
if cv2.waitKey(1) & 0xFF == ord('q'):
print("Goodbye!")
mqttc.disconnect()
mqttc.loop_stop()
break
#Endet Videocapturing und zerstört die Videos
video_capture.stop()
cv2.destroyAllWindows()
#Main
if __name__ == '__main__':
main = Main()
main.welcome_message()
main.streamingThread()
#Setzung des Endpoints für Videostream
@app.get("/video_feed")
def video_feed():
# return the response generated along with the specific media
# type (mime type)
return StreamingResponse(main.generate(),
media_type="multipart/x-mixed-replace; boundary=frame")
media_type="multipart/x-mixed-replace; boundary=frame") # HTTP-Response mit Videostream
main.currentclient =main.connectwithmqtt(adress="localhost", targetport=1883)
main.face_recognition(mqttc=main.currentclient)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment