Skip to content
Snippets Groups Projects
Commit af3b2f96 authored by Marcus Schiesser's avatar Marcus Schiesser
Browse files

added hello example for kafka

parent bd256747
No related branches found
No related tags found
No related merge requests found
# see https://www.baeldung.com/ops/kafka-docker-setup
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.1.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:6.1.1
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
\ No newline at end of file
module github.com/turngeek/examples-go/src/hello-kafka
go 1.16
require github.com/segmentio/kafka-go v0.4.16 // indirect
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
)
const (
KafkaHost = "localhost:29092"
Topic = "hello-kafka"
)
func main() {
ensureTopic(Topic, 1)
writeMessage(Topic, "Hello Kafka!")
readMessage(Topic)
}
func readMessage(topic string) {
log.Printf("Start reading message from topic '%s'", topic)
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{KafkaHost},
Topic: topic,
})
defer r.Close()
msg, err := r.ReadMessage(context.Background())
if err != nil {
log.Printf("Failed to read message: %v\n", err)
return
}
log.Printf("Received message from topic '%s' at partition %v, offset %v: %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
}
func writeMessage(topic, msg string) {
log.Printf("Send message '%s' to topic '%s'\n", msg, topic)
w := &kafka.Writer{
Addr: kafka.TCP(KafkaHost),
Topic: topic,
Balancer: &kafka.LeastBytes{},
}
defer w.Close()
if err := w.WriteMessages(context.Background(), kafka.Message{
Value: []byte(msg),
}); err != nil {
log.Printf("Failed to write message %s: %v\n", msg, err)
return
}
log.Printf("Successfully send message '%s' to topic '%s'\n", msg, topic)
}
package main
import (
"log"
"net"
"strconv"
"github.com/segmentio/kafka-go"
)
func ensureTopic(topic string, numPartitions int) {
conn, err := kafka.Dial("tcp", KafkaHost)
if err != nil {
panic(err.Error())
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{
{
Topic: topic,
NumPartitions: numPartitions,
ReplicationFactor: 1,
},
}
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
panic(err.Error())
}
log.Printf(
"Topic '%s' with %d partitions successfully created\n",
topic,
numPartitions,
)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment