diff --git a/src/banktransfer/service/banktransfer.go b/src/banktransfer/service/banktransfer.go index 491b479d17529ee4a2b45c2c375d3646dd2b94ce..92f50e2e65bffde8b5640d75dda297e804c8d38c 100644 --- a/src/banktransfer/service/banktransfer.go +++ b/src/banktransfer/service/banktransfer.go @@ -2,6 +2,8 @@ package service import ( "context" + "math/rand" + "sync/atomic" "time" log "github.com/sirupsen/logrus" @@ -12,45 +14,59 @@ import ( type BankTransferService struct { banktransfer.BankTransferServer counter int32 + queue chan banktransfer.Transaction } func NewBankTransferService() *BankTransferService { - return &BankTransferService{counter: 1} + rand.Seed(time.Now().UnixNano()) + return &BankTransferService{counter: 1, queue: make(chan banktransfer.Transaction)} } func (s *BankTransferService) TransferMoney(_ context.Context, transaction *banktransfer.Transaction) (*emptypb.Empty, error) { - log.Infof("Received transaction: %v", transaction) + entry := log.WithField("transaction", transaction) + entry.Info("Received transaction") + s.processTransaction(transaction) return &emptypb.Empty{}, nil } +func (s *BankTransferService) processTransaction(transaction *banktransfer.Transaction) { + entry := log.WithField("transaction", transaction) + go func(transaction banktransfer.Transaction) { + entry.Info("Start processing transaction") + atomic.AddInt32(&s.counter, 1) + time.Sleep(time.Duration(rand.Intn(9)+1) * time.Second) + s.queue <- transaction + entry.Info("Processing transaction finished") + }(*transaction) +} + func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransfer_ProcessTransactionsServer) error { - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() return func() error { for { select { case <-stream.Context().Done(): log.Info("Watching transactions cancelled from the client side") return nil - case <-ticker.C: - transaction := &banktransfer.Transaction{Id: s.counter, Amount: 20} + case transaction := <-s.queue: + id := transaction.Id entry := log.WithField("transaction", transaction) entry.Info("Sending transaction") - if err := stream.Send(transaction); err != nil { + if err := stream.Send(&transaction); err != nil { + // TODO: we need to recover from this if communication is not up entry.WithError(err).Error("Error sending transaction") return err } entry.Info("Transaction sent. Waiting for processing response") response, err := stream.Recv() if err != nil { + // TODO: we need to recover from this if communication is not up entry.WithError(err).Error("Error receiving processing response") return err } - if response.Id != s.counter { + if response.Id != id { // NOTE: this is just a guard and not happening as transaction is local per connection entry.Error("Received processing response of a different transaction") } else { entry.Info("Processing response received") - s.counter++ } } } diff --git a/src/myaktion/docker-entrypoint.sh b/src/myaktion/docker-entrypoint.sh index 928de3806ae06c5ebee1d6be24726a4ed45ef18d..1e411c6ff4aa62cefd39b4ba5e12aa1531878f56 100644 --- a/src/myaktion/docker-entrypoint.sh +++ b/src/myaktion/docker-entrypoint.sh @@ -1,6 +1,5 @@ #!/bin/sh -# Abort on any error (including if wait-for-it fails). set -e # Wait for DB @@ -10,8 +9,7 @@ fi # Wait for banktransfer if [ -n "$BANKTRANSFER_CONNECT" ]; then - /go/src/app/wait-for-it.sh "$BANKTRANSFER_CONNECT" -t 20 + /go/src/app/wait-for-it.sh "$BANKTRANSFER_CONNECT" -t 40 fi -# Run the main container command. -exec "$@" \ No newline at end of file +# Run the diff --git a/src/myaktion/monitor.go b/src/myaktion/monitor.go index ad81999e7e0c08b69d961a64096ae2f6188a9fb8..68aebdb1ae9b418d0b4935616f62edca0067dfe4 100644 --- a/src/myaktion/monitor.go +++ b/src/myaktion/monitor.go @@ -7,6 +7,7 @@ import ( log "github.com/sirupsen/logrus" "gitlab.reutlingen-university.de/albrecht/myaktion-go/src/myaktion/client" "gitlab.reutlingen-university.de/albrecht/myaktion-go/src/myaktion/client/banktransfer" + "gitlab.reutlingen-university.de/albrecht/myaktion-go/src/myaktion/service" ) func monitortransactions() { @@ -41,6 +42,12 @@ func connectandmonitor() { continue } entry := log.WithField("transaction", transaction) + entry.Info("Received transaction") + err = service.MarkDonation(uint(transaction.DonationId)) + if err != nil { + entry.WithError(err).Error("error changing donation status") + continue + } entry.Info("Received transaction. Sending processing response") err = watcher.Send(&banktransfer.ProcessingResponse{Id: transaction.Id}) if err != nil { diff --git a/src/myaktion/service/donation.go b/src/myaktion/service/donation.go index d0104f4cda8909e1dd5ae743429da72ce0dc77d6..dc82d9f6845abc007e5dc04a4e5d0d0f8b55e414 100644 --- a/src/myaktion/service/donation.go +++ b/src/myaktion/service/donation.go @@ -71,3 +71,23 @@ func deleteDonation(donation *model.Donation) error { entry.Info("Successfully deleted campaign.") return nil } + +func MarkDonation(id uint) error { + entry := log.WithField("donationId", id) + donation := new(model.Donation) + result := db.DB.First(donation, id) + if result.Error != nil { + entry.WithError(result.Error).Error("Can't retrieve donation") + return result.Error + } + entry = entry.WithField("donation", donation) + entry.Trace("Retrieved donation") + donation.Status = model.TRANSFERRED + result = db.DB.Save(donation) + if result.Error != nil { + entry.WithError(result.Error).Error("Can't update donation") + return result.Error + } + entry.Info("Successfully updated status of donation.") + return nil +}