Skip to content
Snippets Groups Projects
Commit 00febf8b authored by Martin Schmollinger's avatar Martin Schmollinger
Browse files

Banktransfer Service Part 2: Bank Transaction Processing

parent 999c1eb0
No related branches found
No related tags found
No related merge requests found
......@@ -2,6 +2,8 @@ package service
import (
"context"
"math/rand"
"sync/atomic"
"time"
log "github.com/sirupsen/logrus"
......@@ -12,46 +14,60 @@ import (
type BankTransferService struct {
banktransfer.BankTransferServer
counter int32
queue chan banktransfer.Transaction
}
func NewBankTransferService() *BankTransferService {
return &BankTransferService{counter: 1}
rand.Seed(time.Now().UnixNano())
return &BankTransferService{counter: 1, queue: make(chan banktransfer.Transaction)}
}
func (s *BankTransferService) TransferMoney(_ context.Context, transaction *banktransfer.Transaction) (*emptypb.Empty, error) {
log.Infof("-------> Received transaction <-------: %v", transaction)
entry := log.WithField("transaction", transaction)
entry.Info(("Received transaction"))
s.processTransaction(transaction)
return &emptypb.Empty{}, nil
}
func (s *BankTransferService) processTransaction(transaction *banktransfer.Transaction) {
entry := log.WithField("transaction", transaction)
go func(transaction banktransfer.Transaction) {
entry.Info("Start processing transaction")
transaction.Id = atomic.AddInt32(&s.counter, 1)
time.Sleep(time.Duration(rand.Intn(9)+1) * time.Second)
s.queue <- transaction
entry.Info("Processing transaction finished")
}(*transaction)
}
func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransfer_ProcessTransactionsServer) error {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
return func() error {
for {
select {
case <-stream.Context().Done():
log.Info("Watching transactions cancelled from the client side")
return nil
case <-ticker.C:
transaction := &banktransfer.Transaction{Id: s.counter, Amount: 20}
case transaction := <-s.queue:
id := transaction.Id
entry := log.WithField("transaction", transaction)
entry.Info("Sending transaction")
if err := stream.Send(transaction); err != nil {
if err := stream.Send(&transaction); err != nil {
// TODO: we need to recover from this if communication is not up
entry.WithError(err).Error("Error sending transaction")
return err
}
entry.Info("Transaction sent. Waiting for processing response")
response, err := stream.Recv()
if err != nil {
// TODO: we need to recover from this if communication is not up
entry.WithError(err).Error("Error receiving processing response")
return err
}
if response.Id != s.counter {
if response.Id != id {
// NOTE: this is just a guard and not happening as transaction is local per connection
entry.Error("Received processing response of a different transaction")
} else {
entry.Info("Processing response received")
s.counter++
}
}
}
......
......@@ -7,6 +7,7 @@ import (
log "github.com/sirupsen/logrus"
"gitlab.reutlingen-university.de/go-exercises/myaktion-go-ss2023/src/myaktion/client"
"gitlab.reutlingen-university.de/go-exercises/myaktion-go-ss2023/src/myaktion/client/banktransfer"
"gitlab.reutlingen-university.de/go-exercises/myaktion-go-ss2023/src/myaktion/service"
)
func monitortransactions() {
......@@ -42,7 +43,13 @@ func connectandmonitor() {
continue
}
entry := log.WithField("transaction", transaction)
entry.Info("Received transaction. Sending processing response")
entry.Info("Received transaction")
err = service.MarkDonation(uint(transaction.DonationId))
if err != nil {
entry.WithError(err).Error("error changing donation status")
continue
}
entry.Info("Sending processing response")
err = watcher.Send(&banktransfer.ProcessingResponse{Id: transaction.Id})
if err != nil {
entry.WithError(err).Error("error sending processing response")
......
......@@ -74,3 +74,23 @@ func deleteDonation(donation *model.Donation) error {
entry.Info("Successfully deleted campaign.")
return nil
}
func MarkDonation(id uint) error {
entry := log.WithField("donationId", id)
donation := new(model.Donation)
result := db.DB.First(donation, id)
if result.Error != nil {
entry.WithError(result.Error).Error("Can't retrieve donation")
return result.Error
}
entry = entry.WithField("donation", donation)
entry.Trace("Retrieved donation")
donation.Status = model.TRANSFERRED
result = db.DB.Save(donation)
if result.Error != nil {
entry.WithError(result.Error).Error("Can't update donation")
return result.Error
}
entry.Info("Successfully updated status of donation.")
return nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment