diff --git a/src/banktransfer/main.go b/src/banktransfer/main.go index d89673c08c66d886c9b3298396a3abf1728c6e7f..0d1783afda80616fd97d3389505d585873f1493c 100644 --- a/src/banktransfer/main.go +++ b/src/banktransfer/main.go @@ -1,5 +1,39 @@ package main +import ( + "fmt" + "net" + "os" + + log "github.com/sirupsen/logrus" + "gitlab.reutlingen-university.de/albrecht/myaktion-go/src/banktransfer/grpc/banktransfer" + "gitlab.reutlingen-university.de/albrecht/myaktion-go/src/banktransfer/service" + "google.golang.org/grpc" +) + +func init() { + log.SetFormatter(&log.TextFormatter{}) + log.SetReportCaller(true) + level, err := log.ParseLevel(os.Getenv("LOG_LEVEL")) + if err != nil { + log.Info("Log level not specified, set default to: INFO") + log.SetLevel(log.InfoLevel) + return + } + log.SetLevel(level) +} + +var grpcPort = 9111 + func main() { - //test + log.Info("Starting Banktransfer server") + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort)) + if err != nil { + log.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) + } + grpcServer := grpc.NewServer() + banktransfer.RegisterBankTransferServer(grpcServer, service.NewBankTransferService()) + if err := grpcServer.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } } diff --git a/src/banktransfer/service/banktransfer.go b/src/banktransfer/service/banktransfer.go index 50472e99a6927b5f2175189be7ba3ca05836983a..491b479d17529ee4a2b45c2c375d3646dd2b94ce 100644 --- a/src/banktransfer/service/banktransfer.go +++ b/src/banktransfer/service/banktransfer.go @@ -2,6 +2,7 @@ package service import ( "context" + "time" log "github.com/sirupsen/logrus" "gitlab.reutlingen-university.de/albrecht/myaktion-go/src/banktransfer/grpc/banktransfer" @@ -20,3 +21,38 @@ func (s *BankTransferService) TransferMoney(_ context.Context, transaction *bank log.Infof("Received transaction: %v", transaction) return &emptypb.Empty{}, nil } + +func (s *BankTransferService) ProcessTransactions(stream banktransfer.BankTransfer_ProcessTransactionsServer) error { + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + return func() error { + for { + select { + case <-stream.Context().Done(): + log.Info("Watching transactions cancelled from the client side") + return nil + case <-ticker.C: + transaction := &banktransfer.Transaction{Id: s.counter, Amount: 20} + entry := log.WithField("transaction", transaction) + entry.Info("Sending transaction") + if err := stream.Send(transaction); err != nil { + entry.WithError(err).Error("Error sending transaction") + return err + } + entry.Info("Transaction sent. Waiting for processing response") + response, err := stream.Recv() + if err != nil { + entry.WithError(err).Error("Error receiving processing response") + return err + } + if response.Id != s.counter { + // NOTE: this is just a guard and not happening as transaction is local per connection + entry.Error("Received processing response of a different transaction") + } else { + entry.Info("Processing response received") + s.counter++ + } + } + } + }() +}