From 6aeff0731b50d8023680bb10ba76eb0b23c649c9 Mon Sep 17 00:00:00 2001
From: Martin Schmollinger <martin.schmollinger@reutlingen-university.de>
Date: Wed, 10 May 2023 23:24:52 +0200
Subject: [PATCH] Banktransfer Service Part 3: Simple Retry-Mechanism

---
 src/banktransfer/main.go                 |  3 ++
 src/banktransfer/service/banktransfer.go | 53 +++++++++++++++++++++---
 2 files changed, 50 insertions(+), 6 deletions(-)

diff --git a/src/banktransfer/main.go b/src/banktransfer/main.go
index 6ddf1f1..b7c9b9f 100644
--- a/src/banktransfer/main.go
+++ b/src/banktransfer/main.go
@@ -33,6 +33,9 @@ func main() {
 		log.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
 	}
 	grpcServer := grpc.NewServer()
+	transferService := service.NewBankTransferService()
+	transferService.Start()
+	defer transferService.Stop()
 	banktransfer.RegisterBankTransferServer(grpcServer, service.NewBankTransferService())
 	if err := grpcServer.Serve(lis); err != nil {
 		log.Fatalf("failed to serve: %v", err)
diff --git a/src/banktransfer/service/banktransfer.go b/src/banktransfer/service/banktransfer.go
index f119300..bf10af9 100644
--- a/src/banktransfer/service/banktransfer.go
+++ b/src/banktransfer/service/banktransfer.go
@@ -11,15 +11,23 @@ import (
 	"google.golang.org/protobuf/types/known/emptypb"
 )
 
+const retryTime = 5 * time.Second
+
 type BankTransferService struct {
 	banktransfer.BankTransferServer
-	counter int32
-	queue   chan banktransfer.Transaction
+	counter    int32
+	queue      chan banktransfer.Transaction
+	retryQueue chan banktransfer.Transaction
+	stop       chan struct{}
 }
 
 func NewBankTransferService() *BankTransferService {
 	rand.Seed(time.Now().UnixNano())
-	return &BankTransferService{counter: 1, queue: make(chan banktransfer.Transaction)}
+	return &BankTransferService{counter: 0,
+		queue:      make(chan banktransfer.Transaction),
+		retryQueue: make(chan banktransfer.Transaction),
+		stop:       make(chan struct{}),
+	}
 }
 
 func (s *BankTransferService) TransferMoney(_ context.Context, transaction *banktransfer.Transaction) (*emptypb.Empty, error) {
@@ -33,13 +41,17 @@ func (s *BankTransferService) processTransaction(transaction *banktransfer.Trans
 	entry := log.WithField("transaction", transaction)
 	go func(transaction banktransfer.Transaction) {
 		entry.Info("Start processing transaction")
-		transaction.Id = atomic.AddInt32(&s.counter, 1)
+		transaction.Id = s.getUniqueId()
 		time.Sleep(time.Duration(rand.Intn(9)+1) * time.Second)
 		s.queue <- transaction
 		entry.Info("Processing transaction finished")
 	}(*transaction)
 }
 
+func (s *BankTransferService) getUniqueId() int32 {
+	return atomic.AddInt32(&s.counter, 1)
+}
+
 func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransfer_ProcessTransactionsServer) error {
 	return func() error {
 		for {
@@ -52,14 +64,14 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf
 				entry := log.WithField("transaction", transaction)
 				entry.Info("Sending transaction")
 				if err := stream.Send(&transaction); err != nil {
-					// TODO: we need to recover from this if communication is not up
+					s.requeueTransaction(&transaction)
 					entry.WithError(err).Error("Error sending transaction")
 					return err
 				}
 				entry.Info("Transaction sent. Waiting for processing response")
 				response, err := stream.Recv()
 				if err != nil {
-					// TODO: we need to recover from this if communication is not up
+					s.requeueTransaction(&transaction)
 					entry.WithError(err).Error("Error receiving processing response")
 					return err
 				}
@@ -73,3 +85,32 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf
 		}
 	}()
 }
+
+func (s *BankTransferService) requeueTransaction(transaction *banktransfer.Transaction) {
+	entry := log.WithField("transaction", transaction)
+	go func(transaction banktransfer.Transaction) {
+		entry.Infof("Requeuing transaction. Wait for %f seconds", retryTime.Seconds())
+		time.Sleep(retryTime)
+		s.retryQueue <- transaction
+		entry.Info("Requeued transaction")
+	}(*transaction)
+}
+
+func (s *BankTransferService) Start() {
+	log.Info("Starting banktransfer service")
+	go func() {
+		for {
+			select {
+			case <-s.stop:
+				break
+			case transaction := <-s.retryQueue:
+				s.queue <- transaction
+			}
+		}
+	}()
+}
+
+func (s *BankTransferService) Stop() {
+	log.Info("Stopping banktransfer service")
+	close(s.stop)
+}
-- 
GitLab