Kafka
Overview
kafka is a Kafka client library built on top of sarama. The producer supports synchronous and asynchronous message production, the consumer supports consuming messages by group or by partition, and usage remains fully compatible with sarama.
Producer
Sync Producer
A synchronous producer blocks after sending a message until it receives a response from the Kafka broker, which ensures reliable message delivery. kafka.InitSyncProducer initializes a synchronous producer. By default it requires all ISRs (In-Sync Replicas) to acknowledge the message before returning, providing the strongest durability guarantee.
package main

import (
    "fmt"

    "github.com/IBM/sarama"
    "github.com/go-dev-frame/sponge/pkg/kafka"
)

func main() {
    testTopic := "my-topic"
    addrs := []string{"localhost:9092"}

    // default config is requiredAcks=WaitForAll, partitionerConstructor=NewHashPartitioner, returnSuccesses=true
    p, err := kafka.InitSyncProducer(addrs, kafka.SyncProducerWithVersion(sarama.V3_6_0_0))
    if err != nil {
        fmt.Println(err)
        return
    }
    defer p.Close()

    // testData is defined at https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/producer_test.go#L19
    // var testData = []interface{}{...}

    // Case 1: send a sarama.ProducerMessage type message
    msg := testData[0].(*sarama.ProducerMessage)
    partition, offset, err := p.SendMessage(msg)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("partition:", partition, "offset:", offset)

    // Case 2: send messages of multiple types
    for _, data := range testData {
        partition, offset, err := p.SendData(testTopic, data)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Println("partition:", partition, "offset:", offset)
    }
}
The code example shows two ways to send messages:
- p.SendMessage: directly sends a sarama.ProducerMessage. This gives finer control over the message, such as setting its Key, Headers, and so on; a short sketch follows this list.
- p.SendData: a convenience method that accepts data of various types, automatically wraps the data in a sarama.ProducerMessage, and sends it to the specified topic.
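As a minimal sketch of the finer control p.SendMessage allows, here is a hand-built message with a custom key and headers, reusing the producer p from the example above (the key, value, and header contents are illustrative, not part of the library):

// Illustrative sketch: a hand-built message with a custom key and headers.
msg := &sarama.ProducerMessage{
    Topic: "my-topic",
    // the default hash partitioner routes messages with the same key to the same partition
    Key:   sarama.StringEncoder("order-123"),
    Value: sarama.StringEncoder(`{"id":123,"status":"paid"}`),
    Headers: []sarama.RecordHeader{
        {Key: []byte("trace-id"), Value: []byte("abc-001")},
    },
}
partition, offset, err := p.SendMessage(msg)
if err != nil {
    fmt.Println(err)
    return
}
fmt.Println("partition:", partition, "offset:", offset)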
Async Producer
An asynchronous producer sends messages in a fire-and-forget manner without waiting for broker confirmation, so it achieves very high throughput. kafka.InitAsyncProducer initializes an asynchronous producer. Options let you configure the acknowledgment level (acks), the number of messages to buffer before flushing, the flush frequency, and so on. Messages that fail to be sent are reported back through an error channel.
package main

import (
    "fmt"
    "time"

    "github.com/IBM/sarama"
    "github.com/go-dev-frame/sponge/pkg/kafka"
)

func main() {
    testTopic := "my-topic"
    addrs := []string{"localhost:9092"}

    p, err := kafka.InitAsyncProducer(addrs,
        kafka.AsyncProducerWithVersion(sarama.V3_6_0_0),
        kafka.AsyncProducerWithRequiredAcks(sarama.WaitForLocal),
        kafka.AsyncProducerWithFlushMessages(50),
        kafka.AsyncProducerWithFlushFrequency(500*time.Millisecond),
    )
    if err != nil {
        fmt.Println(err)
        return
    }
    defer p.Close()

    // testData is defined at https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/producer_test.go#L19
    // var testData = []interface{}{...}

    // Case 1: send sarama.ProducerMessage type messages, supports multiple messages
    msg := testData[0].(*sarama.ProducerMessage)
    err = p.SendMessage(msg, msg)
    if err != nil {
        fmt.Println(err)
        return
    }

    // Case 2: send messages of multiple types, supports multiple messages
    err = p.SendData(testTopic, testData...)
    if err != nil {
        fmt.Println(err)
        return
    }

    <-time.After(time.Second) // wait for all messages to be sent
}
The code example also shows two ways of sending:
- p.SendMessage: asynchronously sends one or more sarama.ProducerMessage messages.
- p.SendData: asynchronously sends one or more messages of different types to the specified topic.
Because sending is asynchronous, wait appropriately before the program exits to ensure that all messages have actually been sent.
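The error channel mentioned above is a sarama mechanism. For reference, here is a hedged sketch of how send failures surface in raw sarama (plain sarama usage, not the sponge wrapper's API):

package main

import (
    "fmt"

    "github.com/IBM/sarama"
)

func main() {
    // Sketch of raw sarama async error handling (not the sponge wrapper's API).
    config := sarama.NewConfig()
    config.Producer.Return.Errors = true // failed sends are delivered on Errors()

    producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer producer.Close()

    go func() {
        // drain the error channel; each ProducerError carries the failed message
        for pErr := range producer.Errors() {
            fmt.Println("send failed:", pErr.Err, "topic:", pErr.Msg.Topic)
        }
    }()

    producer.Input() <- &sarama.ProducerMessage{
        Topic: "my-topic",
        Value: sarama.StringEncoder("hello"),
    }
}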
Consumer
Consumer Group
A consumer group is Kafka's core mechanism for message broadcasting and load-balanced consumption. kafka.InitConsumerGroup initializes a consumer group. Multiple consumer instances within the same group coordinate to consume the partitions of the subscribed topics, and Kafka automatically rebalances partitions among them.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/IBM/sarama"
    "github.com/go-dev-frame/sponge/pkg/kafka"
)

func main() {
    testTopic := "my-topic"
    groupID := "my-group"
    addrs := []string{"localhost:9092"}

    // default config is offsetsInitial=OffsetOldest, autoCommitEnable=true, autoCommitInterval=time.Second
    cg, err := kafka.InitConsumerGroup(addrs, groupID, kafka.ConsumerWithVersion(sarama.V3_6_0_0))
    if err != nil {
        fmt.Println(err)
        return
    }
    defer cg.Close()

    // Case 1: consume messages with the default handler function
    go cg.Consume(context.Background(), []string{testTopic}, handleMsgFn)

    // Case 2: consume messages with a custom handler
    go cg.ConsumeCustom(context.Background(), []string{testTopic}, &myConsumerGroupHandler{
        autoCommitEnable: true, // keep in sync with the group's auto-commit setting (true by default)
    })

    <-time.After(time.Minute) // wait before exiting
}

var handleMsgFn = func(msg *sarama.ConsumerMessage) error {
    fmt.Printf("received msg: topic=%s, partition=%d, offset=%d, key=%s, val=%s\n",
        msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
    return nil
}

type myConsumerGroupHandler struct {
    autoCommitEnable bool
}

func (h *myConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
    return nil
}

func (h *myConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
    return nil
}

func (h *myConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("received msg: topic=%s, partition=%d, offset=%d, key=%s, val=%s\n",
            msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
        session.MarkMessage(msg, "")
        if !h.autoCommitEnable {
            session.Commit()
        }
    }
    return nil
}
The code example shows two ways of consuming:
- cg.Consume: consumes messages with a simple handler function, handleMsgFn. This is the convenient form, suited to simple message-processing logic.
- cg.ConsumeCustom: consumes with a custom implementation of the sarama.ConsumerGroupHandler interface. This gives complete control over the consumption process, including the consumer session lifecycle (Setup, Cleanup) and the message commit flow (ConsumeClaim), for example to implement manual offset commits. A graceful-shutdown sketch follows this list.
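The fixed one-minute wait in the example is only for demonstration. In a long-running service you would typically drive cg.Consume with a cancellable context, assuming the wrapper stops consuming when that context is cancelled, as its context parameter suggests. A minimal sketch (the signal handling and the context, os/signal, and syscall imports are additions for illustration):

// Illustrative sketch: stop consuming on SIGINT/SIGTERM by cancelling the
// context passed to cg.Consume, rather than sleeping for a fixed duration.
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

go cg.Consume(ctx, []string{testTopic}, handleMsgFn)

<-ctx.Done()   // block until a shutdown signal arrives
_ = cg.Close() // then release the consumer group's resources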
Partition Consumer
In addition to using consumer groups, you can consume specified partitions directly. kafka.InitConsumer initializes a standalone consumer. This approach does not participate in group rebalancing: it gives precise control over which partition to consume, but partition assignment and failover must be handled manually.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/IBM/sarama"
    "github.com/go-dev-frame/sponge/pkg/kafka"
)

func main() {
    testTopic := "my-topic"
    addrs := []string{"localhost:9092"}

    c, err := kafka.InitConsumer(addrs, kafka.ConsumerWithVersion(sarama.V3_6_0_0))
    if err != nil {
        fmt.Println(err)
        return
    }
    defer c.Close()

    // Case 1: consume one partition
    go c.ConsumePartition(context.Background(), testTopic, 0, sarama.OffsetNewest, handleMsgFn)

    // Case 2: consume all partitions
    c.ConsumeAllPartition(context.Background(), testTopic, sarama.OffsetNewest, handleMsgFn)

    <-time.After(time.Minute) // wait before exiting
}

var handleMsgFn = func(msg *sarama.ConsumerMessage) error {
    fmt.Printf("received msg: topic=%s, partition=%d, offset=%d, key=%s, val=%s\n",
        msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
    return nil
}
The code example shows two ways of consuming:
- c.ConsumePartition: consumes a single partition of the specified topic; the partition number and starting offset must be provided (see the sketch after this list).
- c.ConsumeAllPartition: consumes all partitions of the specified topic. It discovers every partition of the topic and starts a goroutine per partition to consume it.
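Besides the sentinel values sarama.OffsetNewest and sarama.OffsetOldest, a concrete starting offset can be passed, which is how a consumer resumes from a position your application persisted. A minimal sketch, where loadOffset is a hypothetical helper, not part of this package:

// Illustrative sketch: resume partition 0 from an offset your application stored.
// loadOffset is a hypothetical helper returning the next offset to consume, or -1.
startOffset := loadOffset(testTopic, 0) // e.g. last processed offset + 1
if startOffset < 0 {
    startOffset = sarama.OffsetOldest // no saved position yet: start from the beginning
}
go c.ConsumePartition(context.Background(), testTopic, 0, startOffset, handleMsgFn)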
Topic Backlog
Monitoring how a consumer group keeps up with a topic, especially the message lag, is crucial for assessing system health. Message lag is the difference between the latest message offset in a partition (the Log End Offset) and the offset the consumer group has committed.
package main

import (
    "fmt"

    "github.com/go-dev-frame/sponge/pkg/kafka"
)

func main() {
    brokerList := []string{"localhost:9092"}
    groupID := "my-group"
    topic := "my-topic"

    m, err := kafka.InitClientManager(brokerList, groupID)
    if err != nil {
        panic(err)
    }
    defer m.Close()

    total, backlogs, err := m.GetBacklog(topic)
    if err != nil {
        panic(err)
    }

    fmt.Println("total backlog:", total)
    for _, backlog := range backlogs {
        fmt.Printf("partition=%d, backlog=%d, next_consume_offset=%d\n", backlog.Partition, backlog.Backlog, backlog.NextConsumeOffset)
    }
}
Initialize a client manager with kafka.InitClientManager, then call the m.GetBacklog method to obtain the total backlog of a specified topic relative to a specified consumer group, along with detailed backlog information for each partition. This is very useful for operations and monitoring; the sketch below shows how such a lag figure can be derived.
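For background on what GetBacklog computes, here is a hedged sketch of the same per-partition lag calculation in raw sarama. It mirrors the idea (Log End Offset minus committed offset) but is not the wrapper's actual implementation; the broker, group, and topic names match the examples above:

package main

import (
    "fmt"

    "github.com/IBM/sarama"
)

func main() {
    client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
    if err != nil {
        panic(err)
    }
    defer client.Close()

    // the offset manager reads the offsets committed by the consumer group
    om, err := sarama.NewOffsetManagerFromClient("my-group", client)
    if err != nil {
        panic(err)
    }
    defer om.Close()

    partitions, err := client.Partitions("my-topic")
    if err != nil {
        panic(err)
    }

    var total int64
    for _, p := range partitions {
        // Log End Offset: the offset the next produced message will receive
        leo, err := client.GetOffset("my-topic", p, sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }
        pom, err := om.ManagePartition("my-topic", p)
        if err != nil {
            panic(err)
        }
        next, _ := pom.NextOffset() // next offset the group will consume; negative if nothing committed yet
        pom.Close()
        if next < 0 {
            next = 0 // no committed offset: treat the whole partition as backlog
        }
        total += leo - next
        fmt.Printf("partition=%d, backlog=%d\n", p, leo-next)
    }
    fmt.Println("total backlog:", total)
}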