Skip to content
Snippets Groups Projects
Commit 99de2825 authored by albrecht's avatar albrecht
Browse files

Pdf 12 done

parent dd11eef4
No related branches found
No related tags found
1 merge request!3Master
...@@ -2,6 +2,8 @@ package service ...@@ -2,6 +2,8 @@ package service
import ( import (
"context" "context"
"math/rand"
"sync/atomic"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
...@@ -12,45 +14,59 @@ import ( ...@@ -12,45 +14,59 @@ import (
type BankTransferService struct { type BankTransferService struct {
banktransfer.BankTransferServer banktransfer.BankTransferServer
counter int32 counter int32
queue chan banktransfer.Transaction
} }
func NewBankTransferService() *BankTransferService { func NewBankTransferService() *BankTransferService {
return &BankTransferService{counter: 1} rand.Seed(time.Now().UnixNano())
return &BankTransferService{counter: 1, queue: make(chan banktransfer.Transaction)}
} }
func (s *BankTransferService) TransferMoney(_ context.Context, transaction *banktransfer.Transaction) (*emptypb.Empty, error) { func (s *BankTransferService) TransferMoney(_ context.Context, transaction *banktransfer.Transaction) (*emptypb.Empty, error) {
log.Infof("Received transaction: %v", transaction) entry := log.WithField("transaction", transaction)
entry.Info("Received transaction")
s.processTransaction(transaction)
return &emptypb.Empty{}, nil return &emptypb.Empty{}, nil
} }
func (s *BankTransferService) processTransaction(transaction *banktransfer.Transaction) {
entry := log.WithField("transaction", transaction)
go func(transaction banktransfer.Transaction) {
entry.Info("Start processing transaction")
atomic.AddInt32(&s.counter, 1)
time.Sleep(time.Duration(rand.Intn(9)+1) * time.Second)
s.queue <- transaction
entry.Info("Processing transaction finished")
}(*transaction)
}
func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransfer_ProcessTransactionsServer) error { func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransfer_ProcessTransactionsServer) error {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
return func() error { return func() error {
for { for {
select { select {
case <-stream.Context().Done(): case <-stream.Context().Done():
log.Info("Watching transactions cancelled from the client side") log.Info("Watching transactions cancelled from the client side")
return nil return nil
case <-ticker.C: case transaction := <-s.queue:
transaction := &banktransfer.Transaction{Id: s.counter, Amount: 20} id := transaction.Id
entry := log.WithField("transaction", transaction) entry := log.WithField("transaction", transaction)
entry.Info("Sending transaction") entry.Info("Sending transaction")
if err := stream.Send(transaction); err != nil { if err := stream.Send(&transaction); err != nil {
// TODO: we need to recover from this if communication is not up
entry.WithError(err).Error("Error sending transaction") entry.WithError(err).Error("Error sending transaction")
return err return err
} }
entry.Info("Transaction sent. Waiting for processing response") entry.Info("Transaction sent. Waiting for processing response")
response, err := stream.Recv() response, err := stream.Recv()
if err != nil { if err != nil {
// TODO: we need to recover from this if communication is not up
entry.WithError(err).Error("Error receiving processing response") entry.WithError(err).Error("Error receiving processing response")
return err return err
} }
if response.Id != s.counter { if response.Id != id {
// NOTE: this is just a guard and not happening as transaction is local per connection // NOTE: this is just a guard and not happening as transaction is local per connection
entry.Error("Received processing response of a different transaction") entry.Error("Received processing response of a different transaction")
} else { } else {
entry.Info("Processing response received") entry.Info("Processing response received")
s.counter++
} }
} }
} }
......
#!/bin/sh #!/bin/sh
# Abort on any error (including if wait-for-it fails).
set -e set -e
# Wait for DB # Wait for DB
...@@ -10,8 +9,7 @@ fi ...@@ -10,8 +9,7 @@ fi
# Wait for banktransfer # Wait for banktransfer
if [ -n "$BANKTRANSFER_CONNECT" ]; then if [ -n "$BANKTRANSFER_CONNECT" ]; then
/go/src/app/wait-for-it.sh "$BANKTRANSFER_CONNECT" -t 20 /go/src/app/wait-for-it.sh "$BANKTRANSFER_CONNECT" -t 40
fi fi
# Run the main container command. # Run the
exec "$@"
\ No newline at end of file
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"gitlab.reutlingen-university.de/albrecht/myaktion-go/src/myaktion/client" "gitlab.reutlingen-university.de/albrecht/myaktion-go/src/myaktion/client"
"gitlab.reutlingen-university.de/albrecht/myaktion-go/src/myaktion/client/banktransfer" "gitlab.reutlingen-university.de/albrecht/myaktion-go/src/myaktion/client/banktransfer"
"gitlab.reutlingen-university.de/albrecht/myaktion-go/src/myaktion/service"
) )
func monitortransactions() { func monitortransactions() {
...@@ -41,6 +42,12 @@ func connectandmonitor() { ...@@ -41,6 +42,12 @@ func connectandmonitor() {
continue continue
} }
entry := log.WithField("transaction", transaction) entry := log.WithField("transaction", transaction)
entry.Info("Received transaction")
err = service.MarkDonation(uint(transaction.DonationId))
if err != nil {
entry.WithError(err).Error("error changing donation status")
continue
}
entry.Info("Received transaction. Sending processing response") entry.Info("Received transaction. Sending processing response")
err = watcher.Send(&banktransfer.ProcessingResponse{Id: transaction.Id}) err = watcher.Send(&banktransfer.ProcessingResponse{Id: transaction.Id})
if err != nil { if err != nil {
......
...@@ -71,3 +71,23 @@ func deleteDonation(donation *model.Donation) error { ...@@ -71,3 +71,23 @@ func deleteDonation(donation *model.Donation) error {
entry.Info("Successfully deleted campaign.") entry.Info("Successfully deleted campaign.")
return nil return nil
} }
func MarkDonation(id uint) error {
entry := log.WithField("donationId", id)
donation := new(model.Donation)
result := db.DB.First(donation, id)
if result.Error != nil {
entry.WithError(result.Error).Error("Can't retrieve donation")
return result.Error
}
entry = entry.WithField("donation", donation)
entry.Trace("Retrieved donation")
donation.Status = model.TRANSFERRED
result = db.DB.Save(donation)
if result.Error != nil {
entry.WithError(result.Error).Error("Can't update donation")
return result.Error
}
entry.Info("Successfully updated status of donation.")
return nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment