diff --git a/src/banktransfer/main.go b/src/banktransfer/main.go index 0d1783afda80616fd97d3389505d585873f1493c..a76a5a7ff81aca9f67e4441cd71b9882ad294eb5 100644 --- a/src/banktransfer/main.go +++ b/src/banktransfer/main.go @@ -32,6 +32,9 @@ func main() { log.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } grpcServer := grpc.NewServer() + transferService := service.NewBankTransferService() + transferService.Start() + defer transferService.Stop() banktransfer.RegisterBankTransferServer(grpcServer, service.NewBankTransferService()) if err := grpcServer.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) diff --git a/src/banktransfer/service/banktransfer.go b/src/banktransfer/service/banktransfer.go index 92f50e2e65bffde8b5640d75dda297e804c8d38c..5aef1c2b41f4e52ce23d4ecf00534a2cc4b8fd41 100644 --- a/src/banktransfer/service/banktransfer.go +++ b/src/banktransfer/service/banktransfer.go @@ -11,15 +11,23 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) +const retryTime = 5 * time.Second + type BankTransferService struct { banktransfer.BankTransferServer - counter int32 - queue chan banktransfer.Transaction + counter int32 + queue chan banktransfer.Transaction + retryQueue chan banktransfer.Transaction + stop chan struct{} } func NewBankTransferService() *BankTransferService { rand.Seed(time.Now().UnixNano()) - return &BankTransferService{counter: 1, queue: make(chan banktransfer.Transaction)} + return &BankTransferService{counter: 0, + queue: make(chan banktransfer.Transaction), + retryQueue: make(chan banktransfer.Transaction), + stop: make(chan struct{}), + } } func (s *BankTransferService) TransferMoney(_ context.Context, transaction *banktransfer.Transaction) (*emptypb.Empty, error) { entry := log.WithField("transaction", transaction) @@ -32,7 +40,7 @@ func (s *BankTransferService) processTransaction(transaction *banktransfer.Trans entry := log.WithField("transaction", transaction) go func(transaction banktransfer.Transaction) { entry.Info("Start processing transaction") - atomic.AddInt32(&s.counter, 1) + transaction.Id = s.getUniqueId() time.Sleep(time.Duration(rand.Intn(9)+1) * time.Second) s.queue <- transaction entry.Info("Processing transaction finished") @@ -51,14 +59,14 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf entry := log.WithField("transaction", transaction) entry.Info("Sending transaction") if err := stream.Send(&transaction); err != nil { - // TODO: we need to recover from this if communication is not up + s.requeueTransaction(&transaction) entry.WithError(err).Error("Error sending transaction") return err } entry.Info("Transaction sent. Waiting for processing response") response, err := stream.Recv() if err != nil { - // TODO: we need to recover from this if communication is not up + s.requeueTransaction(&transaction) entry.WithError(err).Error("Error receiving processing response") return err } @@ -72,3 +80,35 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf } }() } + +func (s *BankTransferService) requeueTransaction(transaction *banktransfer.Transaction) { + entry := log.WithField("transaction", transaction) + go func(transaction banktransfer.Transaction) { + entry.Infof("Requeuing transaction. Wait for %f seconds", retryTime.Seconds()) + time.Sleep(retryTime) + s.retryQueue <- transaction + entry.Info("Requeued transaction") + }(*transaction) +} + +func (s *BankTransferService) getUniqueId() int32 { + return atomic.AddInt32(&s.counter, 1) +} +func (s *BankTransferService) Start() { + log.Info("Starting banktransfer service") + go func() { + for { + select { + case <-s.stop: + break + case transaction := <-s.retryQueue: + s.queue <- transaction + } + } + }() +} + +func (s *BankTransferService) Stop() { + log.Info("Stopping banktransfer service") + close(s.stop) +} diff --git a/src/myaktion/monitor.go b/src/myaktion/monitor.go index 68aebdb1ae9b418d0b4935616f62edca0067dfe4..db1de6f2caad8e62d1192d8d96bc525fb6853e82 100644 --- a/src/myaktion/monitor.go +++ b/src/myaktion/monitor.go @@ -18,7 +18,7 @@ func monitortransactions() { } func connectandmonitor() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() conn, err := client.GetBankTransferConnection(ctx) if err != nil {