kafka

[TOC]

安装

# 使用docker:  https://github.com/wurstmeister/kafka-docker.git
# 修改 vim docker-compose-single-broker.yml
# 启动 docker-compose -f docker-compose-single-broker.yml up -d
# 需要在本地启动
# docker-compose definition: a single-node Kafka broker plus the
# ZooKeeper instance it depends on (wurstmeister images).
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"  # ZooKeeper client port
  kafka:
    build: .
    ports:
      - "9092:9092"  # Kafka broker port used by the Go examples below
    environment:
      # Address advertised to clients; must be reachable from the host.
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      # Auto-created topic; presumably "name:partitions:replicas" — confirm
      # against the wurstmeister/kafka image documentation.
      KAFKA_CREATE_TOPICS: "log:3:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

go client

sarama

producer

package main

import (
    "encoding/json"
    "fmt"

    "github.com/Shopify/sarama"
)

// MyMsg is the sample payload that genMsg serializes to JSON and the
// producer publishes to the "log" topic.
type MyMsg struct {
    M int32
    S int32
    G string
}

func main() {
    // Producer configuration: require acknowledgement from the full ISR,
    // pick a random partition for each message, and report successes on
    // the Successes channel (mandatory for the sync producer).
    cfg := sarama.NewConfig()
    cfg.Producer.RequiredAcks = sarama.WaitForAll
    cfg.Producer.Partitioner = sarama.NewRandomPartitioner
    cfg.Producer.Return.Successes = true

    // Build the message to publish.
    msg := &sarama.ProducerMessage{
        Topic: "log",
        Value: sarama.StringEncoder(genMsg()),
    }

    // Connect to the broker.
    producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, cfg)
    if err != nil {
        fmt.Println("producer closed, err:", err)
        return
    }
    defer producer.Close()

    // Send synchronously; on success the broker returns the partition
    // and offset the message landed at.
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        fmt.Println("send msg failed, err:", err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n", partition, offset)
}

// genMsg serializes a sample MyMsg value to a JSON string for use as
// the message payload.
func genMsg() string {
    m := MyMsg{M: 1, S: 2, G: "xxx"}
    b, err := json.Marshal(m)
    if err != nil {
        // Marshaling a struct of ints and a string cannot fail in
        // practice, but report it rather than silently discarding
        // the error as the original did.
        fmt.Println("marshal msg failed, err:", err)
        return ""
    }
    return string(b)
}

consumer

package main

import (
    "fmt"
    "sync"

    "github.com/Shopify/sarama"
)

// kafka consumer

// wg waits for one consuming goroutine per partition.
var wg sync.WaitGroup

func main() {
    // Connect to the broker with default configuration.
    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
    if err != nil {
        fmt.Printf("fail to start consumer, err:%v\n", err)
        return
    }
    // Fetch the IDs of all partitions of the "log" topic.
    partitionList, err := consumer.Partitions("log")
    if err != nil {
        fmt.Printf("fail to get list of partition:err%v\n", err)
        return
    }
    fmt.Println(partitionList)
    // BUG FIX: the original ranged over slice *indexes* (`for partition :=
    // range partitionList`), which only works when partition IDs happen to
    // be 0..n-1. Iterate over the actual int32 partition IDs instead.
    for _, partition := range partitionList {
        // One partition consumer per partition, starting at the newest offset.
        pc, err := consumer.ConsumePartition("log", partition, sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
            return
        }
        // Consume each partition asynchronously. The parameter is named so
        // the passed `pc` is actually used instead of being discarded (the
        // original declared `func(sarama.PartitionConsumer)` and relied on
        // closure capture).
        wg.Add(1)
        go func(pc sarama.PartitionConsumer) {
            defer func() {
                pc.AsyncClose()
                wg.Done()
            }()
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s \n", msg.Partition, msg.Offset, string(msg.Key),
                    string(msg.Value))
            }
        }(pc)
    }

    wg.Wait()
}

segmentio

producer

package main

import (
    "context"
    "log"
    "time"

    kafka "github.com/segmentio/kafka-go"
)

func main() {
    // Topic and partition to produce to.
    topic := "log"
    partition := 0

    log.Println("开始建立链接")
    // DialLeader looks up and connects directly to the current leader of
    // the given topic/partition.
    conn, err := kafka.DialLeader(context.Background(), "tcp", "127.0.0.1:9092", topic, partition)
    if err != nil {
        log.Fatal("failed to dial leader:", err)
    }

    log.Println("建立链接成功")
    // Bound the write so a stalled broker cannot hang the program.
    // BUG FIX: the original ignored this error.
    if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
        log.Fatal("failed to set write deadline:", err)
    }
    _, err = conn.WriteMessages(
        kafka.Message{Value: []byte("one!")},
        kafka.Message{Value: []byte("two!")},
        kafka.Message{Value: []byte("three!")},
    )
    if err != nil {
        log.Fatal("failed to write messages:", err)
    }

    if err := conn.Close(); err != nil {
        log.Fatal("failed to close writer:", err)
    }
}

consumer

// TODO

reference

Kafka 概述:深入理解架构

使用docker-compose部署kafka(wurstmeister/kafka)以及常见问题解决

Kafka原理剖析

Kafka 从基础到高级(附图讲解)

最后更新于

这有帮助吗?