From 8f44f43f9153fbc3ac007b0083d3e8062c655b1a Mon Sep 17 00:00:00 2001
From: albrecht <flo@DESKTOP-ERC0T8S.localdomain>
Date: Mon, 19 Jun 2023 15:16:03 +0200
Subject: [PATCH] pdf 14 done

---
 src/banktransfer/main.go                 |  3 ++
 src/banktransfer/service/banktransfer.go | 52 +++++++++++++++++++++---
 src/myaktion/monitor.go                  |  2 +-
 3 files changed, 50 insertions(+), 7 deletions(-)

diff --git a/src/banktransfer/main.go b/src/banktransfer/main.go
index 0d1783a..a76a5a7 100644
--- a/src/banktransfer/main.go
+++ b/src/banktransfer/main.go
@@ -32,6 +32,9 @@ func main() {
 		log.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
 	}
 	grpcServer := grpc.NewServer()
+	transferService := service.NewBankTransferService()
+	transferService.Start()
+	defer transferService.Stop()
 	banktransfer.RegisterBankTransferServer(grpcServer, service.NewBankTransferService())
 	if err := grpcServer.Serve(lis); err != nil {
 		log.Fatalf("failed to serve: %v", err)
diff --git a/src/banktransfer/service/banktransfer.go b/src/banktransfer/service/banktransfer.go
index 92f50e2..5aef1c2 100644
--- a/src/banktransfer/service/banktransfer.go
+++ b/src/banktransfer/service/banktransfer.go
@@ -11,15 +11,23 @@ import (
 	"google.golang.org/protobuf/types/known/emptypb"
 )
 
+const retryTime = 5 * time.Second
+
 type BankTransferService struct {
 	banktransfer.BankTransferServer
-	counter int32
-	queue   chan banktransfer.Transaction
+	counter    int32
+	queue      chan banktransfer.Transaction
+	retryQueue chan banktransfer.Transaction
+	stop       chan struct{}
 }
 
 func NewBankTransferService() *BankTransferService {
 	rand.Seed(time.Now().UnixNano())
-	return &BankTransferService{counter: 1, queue: make(chan banktransfer.Transaction)}
+	return &BankTransferService{counter: 0,
+		queue:      make(chan banktransfer.Transaction),
+		retryQueue: make(chan banktransfer.Transaction),
+		stop:       make(chan struct{}),
+	}
 }
 func (s *BankTransferService) TransferMoney(_ context.Context, transaction *banktransfer.Transaction) (*emptypb.Empty, error) {
 	entry := log.WithField("transaction", transaction)
@@ -32,7 +40,7 @@ func (s *BankTransferService) processTransaction(transaction *banktransfer.Trans
 	entry := log.WithField("transaction", transaction)
 	go func(transaction banktransfer.Transaction) {
 		entry.Info("Start processing transaction")
-		atomic.AddInt32(&s.counter, 1)
+		transaction.Id = s.getUniqueId()
 		time.Sleep(time.Duration(rand.Intn(9)+1) * time.Second)
 		s.queue <- transaction
 		entry.Info("Processing transaction finished")
@@ -51,14 +59,14 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf
 				entry := log.WithField("transaction", transaction)
 				entry.Info("Sending transaction")
 				if err := stream.Send(&transaction); err != nil {
-					// TODO: we need to recover from this if communication is not up
+					s.requeueTransaction(&transaction)
 					entry.WithError(err).Error("Error sending transaction")
 					return err
 				}
 				entry.Info("Transaction sent. Waiting for processing response")
 				response, err := stream.Recv()
 				if err != nil {
-					// TODO: we need to recover from this if communication is not up
+					s.requeueTransaction(&transaction)
 					entry.WithError(err).Error("Error receiving processing response")
 					return err
 				}
@@ -72,3 +80,35 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf
 		}
 	}()
 }
+
+func (s *BankTransferService) requeueTransaction(transaction *banktransfer.Transaction) {
+	entry := log.WithField("transaction", transaction)
+	go func(transaction banktransfer.Transaction) {
+		entry.Infof("Requeuing transaction. Wait for %f seconds", retryTime.Seconds())
+		time.Sleep(retryTime)
+		s.retryQueue <- transaction
+		entry.Info("Requeued transaction")
+	}(*transaction)
+}
+
+func (s *BankTransferService) getUniqueId() int32 {
+	return atomic.AddInt32(&s.counter, 1)
+}
+func (s *BankTransferService) Start() {
+	log.Info("Starting banktransfer service")
+	go func() {
+		for {
+			select {
+			case <-s.stop:
+				break
+			case transaction := <-s.retryQueue:
+				s.queue <- transaction
+			}
+		}
+	}()
+}
+
+func (s *BankTransferService) Stop() {
+	log.Info("Stopping banktransfer service")
+	close(s.stop)
+}
diff --git a/src/myaktion/monitor.go b/src/myaktion/monitor.go
index 68aebdb..db1de6f 100644
--- a/src/myaktion/monitor.go
+++ b/src/myaktion/monitor.go
@@ -18,7 +18,7 @@ func monitortransactions() {
 }
 
 func connectandmonitor() {
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
 	defer cancel()
 	conn, err := client.GetBankTransferConnection(ctx)
 	if err != nil {
-- 
GitLab