Kafka
概述
Kafka
是一个基于 sarama封装的 Kafka 客户端库,生产者支持同步和异步生产消息,消费者支持组和分区消费消息,完全兼容 sarama 的用法。
生产者
同步生产者
同步生产者在发送消息后会阻塞,直到收到 Kafka broker 的响应,这确保了消息发送的可靠性。kafka.InitSyncProducer
用于初始化一个同步生产者。默认配置要求所有 ISR(In-Sync Replicas)都确认收到消息后才返回,以提供最高等级的持久性保证。
点击查看代码示例
package main
import (
"fmt"
"github.com/IBM/sarama"
"github.com/go-dev-frame/sponge/pkg/kafka"
)
func main() {
testTopic := "my-topic"
addrs := []string{"localhost:9092"}
// default config are requiredAcks=WaitForAll, partitionerConstructor=NewHashPartitioner, returnSuccesses=true
p, err := kafka.InitSyncProducer(addrs, kafka.SyncProducerWithVersion(sarama.V3_6_0_0))
if err != nil {
fmt.Println(err)
return
}
defer p.Close()
// testData is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/producer_test.go#L19
// var testData = []interface{}{...}
// Case 1: send sarama.ProducerMessage type message
msg := testData[0].(*sarama.ProducerMessage)
partition, offset, err := p.SendMessage(msg)
if err != nil {
fmt.Println(err)
return
}
fmt.Println("partition:", partition, "offset:", offset)
// Case 2: send multiple types message
for _, data := range testData {
partition, offset, err := p.SendData(testTopic, data)
if err != nil {
fmt.Println(err)
return
}
fmt.Println("partition:", partition, "offset:", offset)
}
}
代码示例展示了两种发送消息的方式:
p.SendMessage
: 直接发送一个sarama.ProducerMessage
类型的消息。这种方式提供了更精细的控制,例如可以自定义消息的 Key、Headers 等。p.SendData
: 一个便捷方法,可以发送多种类型的数据。它会自动将数据封装成sarama.ProducerMessage
并发送到指定的主题(topic)。
异步生产者
异步生产者以“即发即忘”的方式发送消息,不会等待 broker 的确认,因此具有非常高的吞吐量。kafka.InitAsyncProducer
用于初始化异步生产者,可以通过选项配置确认级别(acks)、刷新消息数量(flush messages)和刷新频率(flush frequency)等参数。发送失败的消息会通过错误通道返回。
点击查看代码示例
package main
import (
"fmt"
"time"
"github.com/IBM/sarama"
"github.com/go-dev-frame/sponge/pkg/kafka"
)
func main() {
testTopic := "my-topic"
addrs := []string{"localhost:9092"}
p, err := kafka.InitAsyncProducer(addrs,
kafka.AsyncProducerWithVersion(sarama.V3_6_0_0),
kafka.AsyncProducerWithRequiredAcks(sarama.WaitForLocal),
kafka.AsyncProducerWithFlushMessages(50),
kafka.AsyncProducerWithFlushFrequency(time.milliseconds*500),
)
if err != nil {
fmt.Println(err)
return
}
defer p.Close()
// testData is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/producer_test.go#L19
// var testData = []interface{}{...}
// Case 1: send sarama.ProducerMessage type message, supports multiple messages
msg := testData[0].(*sarama.ProducerMessage)
err = p.SendMessage(msg, msg)
if err != nil {
fmt.Println(err)
return
}
// Case 2: send multiple types message, supports multiple messages
err = p.SendData(testTopic, testData...)
if err != nil {
fmt.Println(err)
return
}
<-time.After(time.Second) // wait for all messages to be sent
}
代码示例同样展示了两种发送方式:
p.SendMessage
: 异步发送一个或多个sarama.ProducerMessage
消息。p.SendData
: 异步发送一个或多个不同类型的消息到指定主题。
由于是异步操作,在程序退出前需要适当等待,以确保所有消息都已成功发送。
消费者
消费者组
消费者组是 Kafka 实现消息广播和负载均衡消费的核心机制。kafka.InitConsumerGroup
用于初始化一个消费者组。组内的多个消费者实例会协调消费订阅主题的分区,Kafka 会自动进行分区再均衡(rebalance)。
点击查看代码示例
package main
import (
"fmt"
"time"
"github.com/IBM/sarama"
"github.com/go-dev-frame/sponge/pkg/kafka"
)
func main() {
testTopic := "my-topic"
groupID := "my-group"
addrs := []string{"localhost:9092"}
// default config are offsetsInitial=OffsetOldest, autoCommitEnable=true, autoCommitInterval=time.Second
cg, err := kafka.InitConsumerGroup(addrs, groupID, kafka.ConsumerWithVersion(sarama.V3_6_0_0))
if err != nil {
fmt.Println(err)
return
}
defer cg.Close()
// Case 1: consume default handle message
go cg.Consume(context.Background(), []string{testTopic}, handleMsgFn)
// Case 2: consume custom handle message
go cg.ConsumeCustom(context.Background(), []string{testTopic}, &myConsumerGroupHandler{
autoCommitEnable: cg.autoCommitEnable,
})
<-time.After(time.Minute) // wait exit
}
var handleMsgFn = func(msg *sarama.ConsumerMessage) error {
fmt.Printf("received msg: topic=%s, partition=%d, offset=%d, key=%s, val=%s\n",
msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
return nil
}
type myConsumerGroupHandler struct {
autoCommitEnable bool
}
func (h *myConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
return nil
}
func (h *myConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
return nil
}
func (h *myConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("received msg: topic=%s, partition=%d, offset=%d, key=%s, val=%s\n",
msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
session.MarkMessage(msg, "")
if !h.autoCommitEnable {
session.Commit()
}
}
return nil
}
代码示例展示了两种消费方式:
cg.Consume
: 使用一个简单的处理函数handleMsgFn
来消费消息。这是一种便捷的用法,适用于简单的消息处理逻辑。cg.ConsumeCustom
: 使用一个自定义的sarama.ConsumerGroupHandler
接口实现。这提供了对消费过程更完整的控制,可以管理消费会话的生命周期(Setup
,Cleanup
)和消息的提交流程(ConsumeClaim
),例如实现手动提交位移。
分区消费
除了使用消费者组,还可以直接消费指定分区。kafka.InitConsumer
用于初始化一个简单的消费者。这种方式不会被再均衡,它提供了对消费哪个分区的精确控制,但需要手动处理分区的分配和故障转移。
点击查看代码示例
package main
import (
"fmt"
"github.com/IBM/sarama"
"github.com/go-dev-frame/sponge/pkg/kafka"
"time"
)
func main() {
testTopic := "my-topic"
addrs := []string{"localhost:9092"}
c, err := kafka.InitConsumer(addrs, kafka.ConsumerWithVersion(sarama.V3_6_0_0))
if err != nil {
fmt.Println(err)
return
}
defer c.Close()
// Case 1: consume one partition
go c.ConsumePartition(context.Background(), testTopic, 0, sarama.OffsetNewest, handleMsgFn)
// Case 2: consume all partition
c.ConsumeAllPartition(context.Background(), testTopic, sarama.OffsetNewest, handleMsgFn)
<-time.After(time.Minute) // wait exit
}
var handleMsgFn = func(msg *sarama.ConsumerMessage) error {
fmt.Printf("received msg: topic=%s, partition=%d, offset=%d, key=%s, val=%s\n",
msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
return nil
}
代码示例展示了两种消费方式:
c.ConsumePartition
: 消费指定主题的单个分区,需要提供分区号和起始位移。c.ConsumeAllPartition
: 消费指定主题的所有分区。该方法会发现主题的所有分区并为每个分区启动一个 goroutine 进行消费。
Topic 积压
监控消费者组对某个 Topic 的消费情况,特别是消息积压(lag),对于评估系统健康状况至关重要。消息积压指的是分区中最新的消息位移(Log End Offset)与消费者组已提交的位移之间的差距。
点击查看代码示例
package main
import (
"fmt"
"github.com/go-dev-frame/sponge/pkg/kafka"
)
func main() {
m, err := kafka.InitClientManager(brokerList, groupID)
if err != nil {
panic(err)
}
defer m.Close()
total, backlogs, err := m.GetBacklog(topic)
if err != nil {
panic(err)
}
fmt.Println("total backlog:", total)
for _, backlog := range backlogs {
fmt.Printf("partation=%d, backlog=%d, next_consume_offset=%d\n", backlog.Partition, backlog.Backlog, backlog.NextConsumeOffset)
}
}
通过 kafka.InitClientManager
初始化一个客户端管理器,然后调用 m.GetBacklog
方法,即可获取指定 Topic 相对于指定消费者组的总积压量以及每个分区的详细积压信息。这对于运维和监控非常有用。