diff --git a/src/banktransfer/main.go b/src/banktransfer/main.go index 6ddf1f1308f7c08e49a73fad3d79dcd31ece5dfc..b7c9b9fcd178d0de83d626a2c26b1802932f9d08 100644 --- a/src/banktransfer/main.go +++ b/src/banktransfer/main.go @@ -33,6 +33,9 @@ func main() { log.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } grpcServer := grpc.NewServer() + transferService := service.NewBankTransferService() + transferService.Start() + defer transferService.Stop() banktransfer.RegisterBankTransferServer(grpcServer, service.NewBankTransferService()) if err := grpcServer.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) diff --git a/src/banktransfer/service/banktransfer.go b/src/banktransfer/service/banktransfer.go index f11930091fa9477d220d7f630fb2218344a56aea..bf10af900314a434df857ac4c715f3532b39fdab 100644 --- a/src/banktransfer/service/banktransfer.go +++ b/src/banktransfer/service/banktransfer.go @@ -11,15 +11,23 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) +const retryTime = 5 * time.Second + type BankTransferService struct { banktransfer.BankTransferServer - counter int32 - queue chan banktransfer.Transaction + counter int32 + queue chan banktransfer.Transaction + retryQueue chan banktransfer.Transaction + stop chan struct{} } func NewBankTransferService() *BankTransferService { rand.Seed(time.Now().UnixNano()) - return &BankTransferService{counter: 1, queue: make(chan banktransfer.Transaction)} + return &BankTransferService{counter: 0, + queue: make(chan banktransfer.Transaction), + retryQueue: make(chan banktransfer.Transaction), + stop: make(chan struct{}), + } } func (s *BankTransferService) TransferMoney(_ context.Context, transaction *banktransfer.Transaction) (*emptypb.Empty, error) { @@ -33,13 +41,17 @@ func (s *BankTransferService) processTransaction(transaction *banktransfer.Trans entry := log.WithField("transaction", transaction) go func(transaction banktransfer.Transaction) { entry.Info("Start processing transaction") - transaction.Id = atomic.AddInt32(&s.counter, 1) + transaction.Id = s.getUniqueId() time.Sleep(time.Duration(rand.Intn(9)+1) * time.Second) s.queue <- transaction entry.Info("Processing transaction finished") }(*transaction) } +func (s *BankTransferService) getUniqueId() int32 { + return atomic.AddInt32(&s.counter, 1) +} + func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransfer_ProcessTransactionsServer) error { return func() error { for { @@ -52,14 +64,14 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf entry := log.WithField("transaction", transaction) entry.Info("Sending transaction") if err := stream.Send(&transaction); err != nil { - // TODO: we need to recover from this if communication is not up + s.requeueTransaction(&transaction) entry.WithError(err).Error("Error sending transaction") return err } entry.Info("Transaction sent. Waiting for processing response") response, err := stream.Recv() if err != nil { - // TODO: we need to recover from this if communication is not up + s.requeueTransaction(&transaction) entry.WithError(err).Error("Error receiving processing response") return err } @@ -73,3 +85,32 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf } }() } + +func (s *BankTransferService) requeueTransaction(transaction *banktransfer.Transaction) { + entry := log.WithField("transaction", transaction) + go func(transaction banktransfer.Transaction) { + entry.Infof("Requeuing transaction. Wait for %f seconds", retryTime.Seconds()) + time.Sleep(retryTime) + s.retryQueue <- transaction + entry.Info("Requeued transaction") + }(*transaction) +} + +func (s *BankTransferService) Start() { + log.Info("Starting banktransfer service") + go func() { + for { + select { + case <-s.stop: + break + case transaction := <-s.retryQueue: + s.queue <- transaction + } + } + }() +} + +func (s *BankTransferService) Stop() { + log.Info("Stopping banktransfer service") + close(s.stop) +}