Skip to content
Snippets Groups Projects
Commit a36f0656 authored by Martin Schmollinger's avatar Martin Schmollinger
Browse files

Implemented retry queue

parent 470aa26f
No related branches found
No related tags found
No related merge requests found
......@@ -34,6 +34,9 @@ func main() {
log.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
grpcServer := grpc.NewServer()
transferService := service.NewBankTransferService()
transferService.Start()
defer transferService.Stop()
banktransfer.RegisterBankTransferServer(grpcServer, service.NewBankTransferService())
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
......
......@@ -11,14 +11,23 @@ import (
"google.golang.org/protobuf/types/known/emptypb"
)
const retryTime = 5 * time.Second
type BankTransferService struct {
banktransfer.BankTransferServer
counter int32
queue chan banktransfer.Transaction
retryQueue chan banktransfer.Transaction
stop chan struct{}
}
func NewBankTransferService() *BankTransferService {
return &BankTransferService{counter: 1, queue: make(chan banktransfer.Transaction)}
return &BankTransferService{
counter: 1,
queue: make(chan banktransfer.Transaction),
retryQueue: make(chan banktransfer.Transaction),
stop: make(chan struct{}),
}
}
func (s *BankTransferService) TransferMoney(_ context.Context, transaction *banktransfer.Transaction) (*emptypb.Empty, error) {
......@@ -29,8 +38,6 @@ func (s *BankTransferService) TransferMoney(_ context.Context, transaction *bank
}
func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransfer_ProcessTransactionsServer) error {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
return func() error {
for {
select {
......@@ -42,14 +49,14 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf
entry := log.WithField("transaction", transaction)
entry.Info("Sending transaction")
if err := stream.Send(&transaction); err != nil {
// TODO: we need to recover from this if communication is not up
s.requeueTransaction(&transaction)
entry.WithError(err).Error("Error sending transaction")
return err
}
entry.Info("Transaction sent. Waiting for processing response")
response, err := stream.Recv()
if err != nil {
// TODO: we need to recover from this if communication is not up
s.requeueTransaction(&transaction)
entry.WithError(err).Error("Error receiving processing response")
return err
}
......@@ -68,9 +75,42 @@ func (s *BankTransferService) processTransaction(transaction *banktransfer.Trans
entry := log.WithField("transaction", transaction)
go func(transaction banktransfer.Transaction) {
entry.Info("Start processing transaction")
transaction.Id = atomic.AddInt32(&s.counter, 1)
transaction.Id = s.getUniqueId()
time.Sleep(time.Duration(rand.Intn(9)+1) * time.Second) //Simulate time for transaction
s.queue <- transaction
entry.Info("Processing transaction finished")
}(*transaction)
}
func (s *BankTransferService) requeueTransaction(transaction *banktransfer.Transaction) {
entry := log.WithField("transaction", transaction)
go func(transaction banktransfer.Transaction) {
entry.Infof("Requeuing transaction. Wait for %f seconds", retryTime.Seconds())
time.Sleep(retryTime)
s.retryQueue <- transaction
entry.Info("Requeued transaction")
}(*transaction)
}
func (s *BankTransferService) getUniqueId() int32 {
return atomic.AddInt32(&s.counter, 1)
}
func (s *BankTransferService) Start() {
log.Info("Starting banktransfer service")
go func() {
for {
select {
case <-s.stop:
break
case transaction := <-s.retryQueue:
s.queue <- transaction
}
}
}()
}
func (s *BankTransferService) Stop() {
log.Info("Stopping banktransfer service")
close(s.stop)
}
......@@ -18,7 +18,7 @@ func monitortransactions() {
}
func connectandmonitor() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // normal 10 s
defer cancel()
conn, err := client.GetBankTransferConnection(ctx)
if err != nil {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment