Skip to content
Snippets Groups Projects
Commit 8f44f43f authored by albrecht's avatar albrecht
Browse files

pdf 14 done

parent 99de2825
No related branches found
No related tags found
1 merge request!3Master
...@@ -32,6 +32,9 @@ func main() { ...@@ -32,6 +32,9 @@ func main() {
log.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) log.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
} }
grpcServer := grpc.NewServer() grpcServer := grpc.NewServer()
transferService := service.NewBankTransferService()
transferService.Start()
defer transferService.Stop()
banktransfer.RegisterBankTransferServer(grpcServer, service.NewBankTransferService()) banktransfer.RegisterBankTransferServer(grpcServer, service.NewBankTransferService())
if err := grpcServer.Serve(lis); err != nil { if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err) log.Fatalf("failed to serve: %v", err)
......
...@@ -11,15 +11,23 @@ import ( ...@@ -11,15 +11,23 @@ import (
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
) )
const retryTime = 5 * time.Second
type BankTransferService struct { type BankTransferService struct {
banktransfer.BankTransferServer banktransfer.BankTransferServer
counter int32 counter int32
queue chan banktransfer.Transaction queue chan banktransfer.Transaction
retryQueue chan banktransfer.Transaction
stop chan struct{}
} }
func NewBankTransferService() *BankTransferService { func NewBankTransferService() *BankTransferService {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
return &BankTransferService{counter: 1, queue: make(chan banktransfer.Transaction)} return &BankTransferService{counter: 0,
queue: make(chan banktransfer.Transaction),
retryQueue: make(chan banktransfer.Transaction),
stop: make(chan struct{}),
}
} }
func (s *BankTransferService) TransferMoney(_ context.Context, transaction *banktransfer.Transaction) (*emptypb.Empty, error) { func (s *BankTransferService) TransferMoney(_ context.Context, transaction *banktransfer.Transaction) (*emptypb.Empty, error) {
entry := log.WithField("transaction", transaction) entry := log.WithField("transaction", transaction)
...@@ -32,7 +40,7 @@ func (s *BankTransferService) processTransaction(transaction *banktransfer.Trans ...@@ -32,7 +40,7 @@ func (s *BankTransferService) processTransaction(transaction *banktransfer.Trans
entry := log.WithField("transaction", transaction) entry := log.WithField("transaction", transaction)
go func(transaction banktransfer.Transaction) { go func(transaction banktransfer.Transaction) {
entry.Info("Start processing transaction") entry.Info("Start processing transaction")
atomic.AddInt32(&s.counter, 1) transaction.Id = s.getUniqueId()
time.Sleep(time.Duration(rand.Intn(9)+1) * time.Second) time.Sleep(time.Duration(rand.Intn(9)+1) * time.Second)
s.queue <- transaction s.queue <- transaction
entry.Info("Processing transaction finished") entry.Info("Processing transaction finished")
...@@ -51,14 +59,14 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf ...@@ -51,14 +59,14 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf
entry := log.WithField("transaction", transaction) entry := log.WithField("transaction", transaction)
entry.Info("Sending transaction") entry.Info("Sending transaction")
if err := stream.Send(&transaction); err != nil { if err := stream.Send(&transaction); err != nil {
// TODO: we need to recover from this if communication is not up s.requeueTransaction(&transaction)
entry.WithError(err).Error("Error sending transaction") entry.WithError(err).Error("Error sending transaction")
return err return err
} }
entry.Info("Transaction sent. Waiting for processing response") entry.Info("Transaction sent. Waiting for processing response")
response, err := stream.Recv() response, err := stream.Recv()
if err != nil { if err != nil {
// TODO: we need to recover from this if communication is not up s.requeueTransaction(&transaction)
entry.WithError(err).Error("Error receiving processing response") entry.WithError(err).Error("Error receiving processing response")
return err return err
} }
...@@ -72,3 +80,35 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf ...@@ -72,3 +80,35 @@ func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransf
} }
}() }()
} }
func (s *BankTransferService) requeueTransaction(transaction *banktransfer.Transaction) {
entry := log.WithField("transaction", transaction)
go func(transaction banktransfer.Transaction) {
entry.Infof("Requeuing transaction. Wait for %f seconds", retryTime.Seconds())
time.Sleep(retryTime)
s.retryQueue <- transaction
entry.Info("Requeued transaction")
}(*transaction)
}
func (s *BankTransferService) getUniqueId() int32 {
return atomic.AddInt32(&s.counter, 1)
}
func (s *BankTransferService) Start() {
log.Info("Starting banktransfer service")
go func() {
for {
select {
case <-s.stop:
break
case transaction := <-s.retryQueue:
s.queue <- transaction
}
}
}()
}
func (s *BankTransferService) Stop() {
log.Info("Stopping banktransfer service")
close(s.stop)
}
...@@ -18,7 +18,7 @@ func monitortransactions() { ...@@ -18,7 +18,7 @@ func monitortransactions() {
} }
func connectandmonitor() { func connectandmonitor() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel() defer cancel()
conn, err := client.GetBankTransferConnection(ctx) conn, err := client.GetBankTransferConnection(ctx)
if err != nil { if err != nil {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment