kafka
[TOC]
安装
# 使用docker: https://github.com/wurstmeister/kafka-docker.git
# 修改配置文件: vim docker-compose-single-broker.yml
# 启动 docker-compose -f docker-compose-single-broker.yml up -d
# 需要在本地启动 (KAFKA_ADVERTISED_HOST_NAME 配置为 127.0.0.1, 客户端只能从本机连接)
# Single-broker Kafka setup: one ZooKeeper container and one Kafka container.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    # Built from the Dockerfile in the kafka-docker checkout.
    build: .
    ports:
      - "9092:9092"
    environment:
      # Address the broker advertises to clients; 127.0.0.1 limits it to local clients.
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      # Format is name:partitions:replicas, i.e. topic "log", 3 partitions, 1 replica.
      KAFKA_CREATE_TOPICS: "log:3:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
go client
sarama
producer
package main
import (
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
)
// MyMsg is the sample payload that the producer encodes as JSON.
type MyMsg struct {
	M, S int32
	G    string
}
// main connects a synchronous sarama producer to the broker at
// 127.0.0.1:9092, sends one message built by genMsg to the "log" topic, and
// prints the partition and offset reported for it.
func main() {
	config := sarama.NewConfig()
	// Wait until the leader and the followers have all acknowledged the write.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Pick a random partition for each message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Successfully delivered messages are reported back on the success channel.
	config.Producer.Return.Successes = true

	// Connect to kafka.
	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		// The producer was never created here, so report a creation failure
		// rather than a closed producer.
		fmt.Println("create producer failed, err:", err)
		return
	}
	defer func() {
		if err := client.Close(); err != nil {
			fmt.Println("close producer failed, err:", err)
		}
	}()

	// Build the message.
	msg := &sarama.ProducerMessage{
		Topic: "log",
		Value: sarama.StringEncoder(genMsg()),
	}

	// Send the message.
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
// genMsg returns the JSON encoding of a fixed sample MyMsg.
func genMsg() string {
	// Marshal cannot fail for a struct holding only integers and a string,
	// so the error is deliberately discarded.
	encoded, _ := json.Marshal(MyMsg{M: 1, S: 2, G: "xxx"})
	return string(encoded)
}
consumer
package main
import (
"fmt"
"sync"
"github.com/Shopify/sarama"
)
// Kafka consumer example.
// wg counts the per-partition consumer goroutines started by main so that
// main can block until all of them have finished.
var wg sync.WaitGroup
// main starts one partition consumer for every partition of the "log" topic
// on the broker at 127.0.0.1:9092 and prints each message it receives,
// starting from the newest offset.
func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}

	// Look up every partition of the topic.
	partitionList, err := consumer.Partitions("log")
	if err != nil {
		fmt.Printf("fail to get list of partition, err:%v\n", err)
		return
	}
	fmt.Println(partitionList)

	// Range over the partition IDs themselves. A single-variable range would
	// yield slice indices, which only match the IDs by coincidence.
	for _, partition := range partitionList {
		// Create a dedicated consumer for this partition.
		pc, err := consumer.ConsumePartition("log", partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}

		// Consume each partition asynchronously in its own goroutine.
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer func() {
				pc.AsyncClose()
				wg.Done()
			}()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s \n", msg.Partition, msg.Offset, string(msg.Key),
					string(msg.Value))
			}
		}(pc)
	}
	wg.Wait()

	// Every partition consumer has been closed by its goroutine at this
	// point, so the parent consumer can be released.
	if err := consumer.Close(); err != nil {
		fmt.Printf("fail to close consumer, err:%v\n", err)
	}
}
segmentio
producer
package main
import (
"context"
"log"
"time"
kafka "github.com/segmentio/kafka-go"
)
// main dials the leader of partition 0 of the "log" topic on the broker at
// 127.0.0.1:9092, writes three messages under a 10-second write deadline,
// and closes the connection. Any failure terminates the program.
func main() {
	// to produce messages
	topic := "log"
	partition := 0

	log.Println("开始建立链接")
	conn, err := kafka.DialLeader(context.Background(), "tcp", "127.0.0.1:9092", topic, partition)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}
	log.Println("建立链接成功")

	// Bound the write below; the returned error is checked, not discarded.
	if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
		log.Fatal("failed to set write deadline:", err)
	}

	_, err = conn.WriteMessages(
		kafka.Message{Value: []byte("one!")},
		kafka.Message{Value: []byte("two!")},
		kafka.Message{Value: []byte("three!")},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	if err := conn.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}
consumer
// TODO
reference
最后更新于
这有帮助吗?