From 00febf8b49b12e3aea72e6bbd6ee37dbc885bb8a Mon Sep 17 00:00:00 2001 From: Martin Schmollinger <martin.schmollinger@reutlingen-university.de> Date: Wed, 10 May 2023 22:48:55 +0200 Subject: [PATCH] Banktransfer Service Part 2: Bank Transaction Processing --- src/banktransfer/service/banktransfer.go | 34 +++++++++++++++++------- src/myaktion/monitor.go | 9 ++++++- src/myaktion/service/donation.go | 20 ++++++++++++++ 3 files changed, 53 insertions(+), 10 deletions(-) diff --git a/src/banktransfer/service/banktransfer.go b/src/banktransfer/service/banktransfer.go index a76ac70..f119300 100644 --- a/src/banktransfer/service/banktransfer.go +++ b/src/banktransfer/service/banktransfer.go @@ -2,6 +2,8 @@ package service import ( "context" + "math/rand" + "sync/atomic" "time" log "github.com/sirupsen/logrus" @@ -12,46 +14,60 @@ import ( type BankTransferService struct { banktransfer.BankTransferServer counter int32 + queue chan banktransfer.Transaction } func NewBankTransferService() *BankTransferService { - return &BankTransferService{counter: 1} + rand.Seed(time.Now().UnixNano()) + return &BankTransferService{counter: 1, queue: make(chan banktransfer.Transaction)} } func (s *BankTransferService) TransferMoney(_ context.Context, transaction *banktransfer.Transaction) (*emptypb.Empty, error) { - log.Infof("-------> Received transaction <-------: %v", transaction) + entry := log.WithField("transaction", transaction) + entry.Info(("Received transaction")) + s.processTransaction(transaction) return &emptypb.Empty{}, nil } +func (s *BankTransferService) processTransaction(transaction *banktransfer.Transaction) { + entry := log.WithField("transaction", transaction) + go func(transaction banktransfer.Transaction) { + entry.Info("Start processing transaction") + transaction.Id = atomic.AddInt32(&s.counter, 1) + time.Sleep(time.Duration(rand.Intn(9)+1) * time.Second) + s.queue <- transaction + entry.Info("Processing transaction finished") + }(*transaction) +} + func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransfer_ProcessTransactionsServer) error { - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() return func() error { for { select { case <-stream.Context().Done(): log.Info("Watching transactions cancelled from the client side") return nil - case <-ticker.C: - transaction := &banktransfer.Transaction{Id: s.counter, Amount: 20} + case transaction := <-s.queue: + id := transaction.Id entry := log.WithField("transaction", transaction) entry.Info("Sending transaction") - if err := stream.Send(transaction); err != nil { + if err := stream.Send(&transaction); err != nil { + // TODO: we need to recover from this if communication is not up entry.WithError(err).Error("Error sending transaction") return err } entry.Info("Transaction sent. Waiting for processing response") response, err := stream.Recv() if err != nil { + // TODO: we need to recover from this if communication is not up entry.WithError(err).Error("Error receiving processing response") return err } - if response.Id != s.counter { + if response.Id != id { // NOTE: this is just a guard and not happening as transaction is local per connection entry.Error("Received processing response of a different transaction") } else { entry.Info("Processing response received") - s.counter++ } } } diff --git a/src/myaktion/monitor.go b/src/myaktion/monitor.go index 16ae2f4..64ad852 100644 --- a/src/myaktion/monitor.go +++ b/src/myaktion/monitor.go @@ -7,6 +7,7 @@ import ( log "github.com/sirupsen/logrus" "gitlab.reutlingen-university.de/go-exercises/myaktion-go-ss2023/src/myaktion/client" "gitlab.reutlingen-university.de/go-exercises/myaktion-go-ss2023/src/myaktion/client/banktransfer" + "gitlab.reutlingen-university.de/go-exercises/myaktion-go-ss2023/src/myaktion/service" ) func monitortransactions() { @@ -42,7 +43,13 @@ func connectandmonitor() { continue } entry := log.WithField("transaction", transaction) - entry.Info("Received transaction. Sending processing response") + entry.Info("Received transaction") + err = service.MarkDonation(uint(transaction.DonationId)) + if err != nil { + entry.WithError(err).Error("error changing donation status") + continue + } + entry.Info("Sending processing response") err = watcher.Send(&banktransfer.ProcessingResponse{Id: transaction.Id}) if err != nil { entry.WithError(err).Error("error sending processing response") diff --git a/src/myaktion/service/donation.go b/src/myaktion/service/donation.go index aefb75e..fe939ec 100644 --- a/src/myaktion/service/donation.go +++ b/src/myaktion/service/donation.go @@ -74,3 +74,23 @@ func deleteDonation(donation *model.Donation) error { entry.Info("Successfully deleted campaign.") return nil } + +func MarkDonation(id uint) error { + entry := log.WithField("donationId", id) + donation := new(model.Donation) + result := db.DB.First(donation, id) + if result.Error != nil { + entry.WithError(result.Error).Error("Can't retrieve donation") + return result.Error + } + entry = entry.WithField("donation", donation) + entry.Trace("Retrieved donation") + donation.Status = model.TRANSFERRED + result = db.DB.Save(donation) + if result.Error != nil { + entry.WithError(result.Error).Error("Can't update donation") + return result.Error + } + entry.Info("Successfully updated status of donation.") + return nil +} -- GitLab