Skip to content
Snippets Groups Projects
Commit 6aeff073 authored by Martin Schmollinger's avatar Martin Schmollinger
Browse files

Banktransfer Service Part 3: Simple Retry-Mechanism

parent 00febf8b
No related branches found
No related tags found
No related merge requests found
...@@ -33,6 +33,9 @@ func main() { ...@@ -33,6 +33,9 @@ func main() {
log.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) log.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
} }
grpcServer := grpc.NewServer() grpcServer := grpc.NewServer()
transferService := service.NewBankTransferService()
transferService.Start()
defer transferService.Stop()
banktransfer.RegisterBankTransferServer(grpcServer, service.NewBankTransferService()) banktransfer.RegisterBankTransferServer(grpcServer, service.NewBankTransferService())
if err := grpcServer.Serve(lis); err != nil { if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err) log.Fatalf("failed to serve: %v", err)
......
...@@ -11,15 +11,23 @@ import ( ...@@ -11,15 +11,23 @@ import (
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
) )
const retryTime = 5 * time.Second
type BankTransferService struct { type BankTransferService struct {
banktransfer.BankTransferServer banktransfer.BankTransferServer
counter int32 counter int32
queue chan banktransfer.Transaction queue chan banktransfer.Transaction
retryQueue chan banktransfer.Transaction
stop chan struct{}
} }
func NewBankTransferService() *BankTransferService { func NewBankTransferService() *BankTransferService {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
return &BankTransferService{counter: 1, queue: make(chan banktransfer.Transaction)} return &BankTransferService{counter: 0,
queue: make(chan banktransfer.Transaction),
retryQueue: make(chan banktransfer.Transaction),
stop: make(chan struct{}),
}
} }
func (s *BankTransferService) TransferMoney(_ context.Context, transaction *banktransfer.Transaction) (*emptypb.Empty, error) { func (s *BankTransferService) TransferMoney(_ context.Context, transaction *banktransfer.Transaction) (*emptypb.Empty, error) {
...@@ -33,13 +41,17 @@ func (s *BankTransferService) processTransaction(transaction *banktransfer.Trans ...@@ -33,13 +41,17 @@ func (s *BankTransferService) processTransaction(transaction *banktransfer.Trans
entry := log.WithField("transaction", transaction) entry := log.WithField("transaction", transaction)
go func(transaction banktransfer.Transaction) { go func(transaction banktransfer.Transaction) {
entry.Info("Start processing transaction") entry.Info("Start processing transaction")
transaction.Id = atomic.AddInt32(&s.counter, 1) transaction.Id = s.getUniqueId()
time.Sleep(time.Duration(rand.Intn(9)+1) * time.Second) time.Sleep(time.Duration(rand.Intn(9)+1) * time.Second)
s.queue <- transaction s.queue <- transaction
entry.Info("Processing transaction finished") entry.Info("Processing transaction finished")
}(*transaction) }(*transaction)
} }
func (s *BankTransferService) getUniqueId() int32 {
return atomic.AddInt32(&s.counter, 1)
}
func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransfer_ProcessTransactionsServer) error { func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransfer_ProcessTransactionsServer) error {
return func() error { return func() error {
for { for {
...@@ -52,14 +64,14 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf ...@@ -52,14 +64,14 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf
entry := log.WithField("transaction", transaction) entry := log.WithField("transaction", transaction)
entry.Info("Sending transaction") entry.Info("Sending transaction")
if err := stream.Send(&transaction); err != nil { if err := stream.Send(&transaction); err != nil {
// TODO: we need to recover from this if communication is not up s.requeueTransaction(&transaction)
entry.WithError(err).Error("Error sending transaction") entry.WithError(err).Error("Error sending transaction")
return err return err
} }
entry.Info("Transaction sent. Waiting for processing response") entry.Info("Transaction sent. Waiting for processing response")
response, err := stream.Recv() response, err := stream.Recv()
if err != nil { if err != nil {
// TODO: we need to recover from this if communication is not up s.requeueTransaction(&transaction)
entry.WithError(err).Error("Error receiving processing response") entry.WithError(err).Error("Error receiving processing response")
return err return err
} }
...@@ -73,3 +85,32 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf ...@@ -73,3 +85,32 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf
} }
}() }()
} }
func (s *BankTransferService) requeueTransaction(transaction *banktransfer.Transaction) {
entry := log.WithField("transaction", transaction)
go func(transaction banktransfer.Transaction) {
entry.Infof("Requeuing transaction. Wait for %f seconds", retryTime.Seconds())
time.Sleep(retryTime)
s.retryQueue <- transaction
entry.Info("Requeued transaction")
}(*transaction)
}
func (s *BankTransferService) Start() {
log.Info("Starting banktransfer service")
go func() {
for {
select {
case <-s.stop:
break
case transaction := <-s.retryQueue:
s.queue <- transaction
}
}
}()
}
func (s *BankTransferService) Stop() {
log.Info("Stopping banktransfer service")
close(s.stop)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment