diff --git a/src/gopraha/README.md b/src/gopraha/README.md new file mode 100644 index 0000000000000000000000000000000000000000..107118c36274363eba67444ab62b894f61c2d9c7 --- /dev/null +++ b/src/gopraha/README.md @@ -0,0 +1,22 @@ +# gopraha + +Producer / Consumer example using Kafka. + +## Run the example + +To run the example, first start the Kafka using docker: + + # docker compose up + +Test starts up Kafka and Zookeeper. Check if a client can connect to the Kafka server: + + # nc -z localhost 29092 + +If so, you can run the go example: + + # go mod download + # go run . + +To shut down the Kafka server call: + + # docker compose down diff --git a/src/gopraha/docker-compose.yml b/src/gopraha/docker-compose.yml new file mode 100644 index 0000000000000000000000000000000000000000..453bb2f7528f0bdf85a2f01dc46f1c293b1867da --- /dev/null +++ b/src/gopraha/docker-compose.yml @@ -0,0 +1,24 @@ +# see https://www.baeldung.com/ops/kafka-docker-setup +version: '2' +services: + zookeeper: + image: confluentinc/cp-zookeeper:6.1.1 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - 22181:2181 + + kafka: + image: confluentinc/cp-kafka:6.1.1 + depends_on: + - zookeeper + ports: + - 29092:29092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 \ No newline at end of file diff --git a/src/gopraha/main.go b/src/gopraha/main.go new file mode 100644 index 0000000000000000000000000000000000000000..c7018fdf1b36ac2556288f4eabb2cdf736084955 --- /dev/null +++ b/src/gopraha/main.go @@ -0,0 +1,115 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "sync" + + "github.com/segmentio/kafka-go" +) + +const ( + MESSAGES = 10 + PRODUCERS = 3 + CONSUMERS = 5 + + KAFKA_HOST = "localhost:29092" + TOPIC = "gopraha" +) + +var ( + wg sync.WaitGroup +) + +func main() { + ensureTopic(TOPIC, CONSUMERS) + ctx := createCancelContext() + for i := 1; i <= PRODUCERS; i++ { + if i != PRODUCERS { + createProducer(ctx, TOPIC, i, MESSAGES/PRODUCERS) + } else { + createProducer(ctx, TOPIC, i, MESSAGES/PRODUCERS+MESSAGES%PRODUCERS) + } + } + for i := 1; i <= CONSUMERS; i++ { + createConsumer(ctx, TOPIC, i) + } + + // wait for producer and consumers + wg.Wait() + log.Println("Consumers and producers are finished.") +} + +func createCancelContext() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill) + go func() { + select { + case <-c: + log.Println("Signal catched. Calling cancel.") + cancel() + case <-ctx.Done(): + } + signal.Stop(c) + log.Println("Signal watcher successfully removed.") + }() + return ctx +} + +func createConsumer(ctx context.Context, topic string, i int) { + log.Printf("Starting consumer %d\n", i) + wg.Add(1) + go func() { + defer wg.Done() + r := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{KAFKA_HOST}, + GroupID: "consumer-group-id", + Topic: topic, + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + }) + for { + msg, err := r.ReadMessage(ctx) + if err != nil { + log.Printf("Consumer %d failed to read message: %v\n", i, err) + break + } + log.Printf("Consumer %d received message at topic/partition/offset %v/%v/%v: %s = %s\n", i, msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) + } + if err := r.Close(); err != nil { + log.Printf("Consumer %d could not be closed: %v\n", i, err) + } + log.Printf("Consumer %d successfully closed\n", i) + }() +} + +func createProducer(ctx context.Context, topic string, i, nr int) { + log.Printf("Starting producer %d\n", i) + wg.Add(1) + go func() { + defer wg.Done() + w := &kafka.Writer{ + Addr: kafka.TCP(KAFKA_HOST), + Topic: topic, + Balancer: &kafka.LeastBytes{}, + } + var messages []kafka.Message + for m := 1; m <= nr; m++ { + messages = append(messages, kafka.Message{ + Value: []byte(fmt.Sprintf("Msg %d from producer %d", m, i)), + }) + } + if err := w.WriteMessages(ctx, messages...); err != nil { + log.Printf("Producer %d failed to write nr: %v\n", i, err) + } + log.Printf("Producer %d successfully sent messages\n", i) + if err := w.Close(); err != nil { + log.Printf("Producer %d could not be closed: %v\n", i, err) + } + log.Printf("Producer %d successfully closed\n", i) + }() +} diff --git a/src/gopraha/topic.go b/src/gopraha/topic.go new file mode 100644 index 0000000000000000000000000000000000000000..3590cf9c662c4083f9d21e62932f9a604b8de8dd --- /dev/null +++ b/src/gopraha/topic.go @@ -0,0 +1,47 @@ +package main + +import ( + "log" + "net" + "strconv" + + "github.com/segmentio/kafka-go" +) + +func ensureTopic(topic string, numPartitions int) { + conn, err := kafka.Dial("tcp", KAFKA_HOST) + if err != nil { + panic(err.Error()) + } + defer conn.Close() + + controller, err := conn.Controller() + if err != nil { + panic(err.Error()) + } + var controllerConn *kafka.Conn + controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + panic(err.Error()) + } + defer controllerConn.Close() + + topicConfigs := []kafka.TopicConfig{ + { + Topic: topic, + NumPartitions: numPartitions, + ReplicationFactor: 1, + }, + } + + err = controllerConn.CreateTopics(topicConfigs...) + if err != nil { + panic(err.Error()) + } + + log.Printf( + "Topic '%s' with %d partitions successfully created\n", + topic, + numPartitions, + ) +}