diff --git a/src/hello-kafka/docker-compose.yml b/src/hello-kafka/docker-compose.yml new file mode 100644 index 0000000000000000000000000000000000000000..453bb2f7528f0bdf85a2f01dc46f1c293b1867da --- /dev/null +++ b/src/hello-kafka/docker-compose.yml @@ -0,0 +1,24 @@ +# see https://www.baeldung.com/ops/kafka-docker-setup +version: '2' +services: + zookeeper: + image: confluentinc/cp-zookeeper:6.1.1 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - 22181:2181 + + kafka: + image: confluentinc/cp-kafka:6.1.1 + depends_on: + - zookeeper + ports: + - 29092:29092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 \ No newline at end of file diff --git a/src/hello-kafka/go.mod b/src/hello-kafka/go.mod new file mode 100644 index 0000000000000000000000000000000000000000..cd9be8dc1a38c6a36596929132b44c8a84e1684d --- /dev/null +++ b/src/hello-kafka/go.mod @@ -0,0 +1,5 @@ +module github.com/turngeek/examples-go/src/hello-kafka + +go 1.16 + +require github.com/segmentio/kafka-go v0.4.16 // indirect diff --git a/src/hello-kafka/main.go b/src/hello-kafka/main.go new file mode 100644 index 0000000000000000000000000000000000000000..32716422182187d8f5d2431d0085545edecf1d1d --- /dev/null +++ b/src/hello-kafka/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "context" + "log" + + "github.com/segmentio/kafka-go" +) + +const ( + KafkaHost = "localhost:29092" + Topic = "hello-kafka" +) + +func main() { + ensureTopic(Topic, 1) + writeMessage(Topic, "Hello Kafka!") + readMessage(Topic) +} + +func readMessage(topic string) { + log.Printf("Start reading message from topic '%s'", topic) + r := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{KafkaHost}, + Topic: topic, + }) + defer r.Close() + msg, err := r.ReadMessage(context.Background()) + if err != nil { + log.Printf("Failed to read message: %v\n", err) + return + } + log.Printf("Received message from topic '%s' at partition %v, offset %v: %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value)) +} + +func writeMessage(topic, msg string) { + log.Printf("Send message '%s' to topic '%s'\n", msg, topic) + w := &kafka.Writer{ + Addr: kafka.TCP(KafkaHost), + Topic: topic, + Balancer: &kafka.LeastBytes{}, + } + defer w.Close() + if err := w.WriteMessages(context.Background(), kafka.Message{ + Value: []byte(msg), + }); err != nil { + log.Printf("Failed to write message %s: %v\n", msg, err) + return + } + log.Printf("Successfully send message '%s' to topic '%s'\n", msg, topic) +} diff --git a/src/hello-kafka/topic.go b/src/hello-kafka/topic.go new file mode 100644 index 0000000000000000000000000000000000000000..bfcc0c0e97f4143a61172d198c7bc5666196f975 --- /dev/null +++ b/src/hello-kafka/topic.go @@ -0,0 +1,47 @@ +package main + +import ( + "log" + "net" + "strconv" + + "github.com/segmentio/kafka-go" +) + +func ensureTopic(topic string, numPartitions int) { + conn, err := kafka.Dial("tcp", KafkaHost) + if err != nil { + panic(err.Error()) + } + defer conn.Close() + + controller, err := conn.Controller() + if err != nil { + panic(err.Error()) + } + var controllerConn *kafka.Conn + controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + panic(err.Error()) + } + defer controllerConn.Close() + + topicConfigs := []kafka.TopicConfig{ + { + Topic: topic, + NumPartitions: numPartitions, + ReplicationFactor: 1, + }, + } + + err = controllerConn.CreateTopics(topicConfigs...) + if err != nil { + panic(err.Error()) + } + + log.Printf( + "Topic '%s' with %d partitions successfully created\n", + topic, + numPartitions, + ) +}