RabbitMQ
概述
rabbitmq
是一个基于 amqp091-go 的 rabbitmq 客户端库,支持自动重连和自定义设置参数,包括 direct
、topic
、fanout
、headers
、delayed message
、publisher subscriber
共六种消息类型,并支持死信。
使用示例
下面是direct
、topic
、fanout
、headers
、delayed message
、publisher subscriber
六种核心消息模型使用示例。每种模型都包含了生产者(Producer)发送消息和消费者(Consumer)接收消息的完整流程。代码通过 main
函数中的注释来切换执行不同的示例,例如 directExample
、topicExample
等。在每个示例中,程序首先初始化一个生产者,发送100条消息,然后启动一个或多个消费者来处理这些消息,并最后统计收发消息的数量以验证通信的正确性。这有助于开发者快速理解和上手不同消息类型的使用方法。
点击查看示例代码
package main
import (
"context"
"fmt"
"strconv"
"sync/atomic"
"time"
"github.com/go-dev-frame/sponge/pkg/logger"
"github.com/go-dev-frame/sponge/pkg/rabbitmq"
)
var (
producerCount int32
consumerCount int32
)
func main() {
url := "amqp://guest:guest@127.0.0.1:5672/"
directExample(url)
//topicExample(url)
//fanoutExample(url)
//headersExample(url)
//delayedMessageExample(url)
//publisherSubscriberExample(url)
}
func directExample(url string) {
exchangeName := "direct-exchange-demo"
queueName := "direct-queue-1"
routeKey := "direct-key-1"
exchange := rabbitmq.NewDirectExchange(exchangeName, routeKey)
var queueArgs map[string]interface{}
fmt.Printf("\n\n-------------------- direct --------------------\n")
// producer-side direct message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
for i := 1; i <= 100; i++ {
err = p.PublishDirect(context.Background(), []byte("[direct] message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side direct message
{
c := runConsume(url, exchange, queueName, queueArgs)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c.Count()))
}
printStat()
}
func topicExample(url string) {
exchangeName := "topic-exchange-demo"
queueName := "topic-queue-1"
routingKey := "key1.key2.*"
exchange := rabbitmq.NewTopicExchange(exchangeName, routingKey)
var queueArgs map[string]interface{}
fmt.Printf("\n\n-------------------- topic --------------------\n")
// producer-side topic message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
for i := 1; i <= 100; i++ {
key := "key1.key2.key" + strconv.Itoa(i)
err = p.PublishTopic(context.Background(), key, []byte("[topic] "+key+" message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side topic message
{
c := runConsume(url, exchange, queueName, queueArgs)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c.Count()))
}
printStat()
}
func fanoutExample(url string) {
exchangeName := "fanout-exchange-demo"
queueName := "fanout-queue-1"
exchange := rabbitmq.NewFanoutExchange(exchangeName)
var queueArgs map[string]interface{}
fmt.Printf("\n\n-------------------- fanout --------------------\n")
// producer-side fanout message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
for i := 1; i <= 100; i++ {
err = p.PublishFanout(context.Background(), []byte("[fanout] message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side fanout message
{
queueName = "fanout-queue-1"
c1 := runConsume(url, exchange, queueName, queueArgs)
queueName = "fanout-queue-2"
c2 := runConsume(url, exchange, queueName, queueArgs)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c1.Count()))
fmt.Println("\n\nconsumer 2 count:", c2.Count())
}
printStat()
}
func headersExample(url string) {
exchangeName := "headers-exchange-demo"
queueName := "headers-queue-1"
headersKeys := map[string]interface{}{"hello": "world", "foo": "bar"}
exchange := rabbitmq.NewHeadersExchange(exchangeName, rabbitmq.HeadersTypeAll, headersKeys) // all, you can set HeadersTypeAny type
var queueArgs map[string]interface{}
fmt.Printf("\n\n-------------------- headers --------------------\n")
// producer-side headers message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
ctx := context.Background()
for i := 1; i <= 100; i++ {
headersKeys1 := headersKeys
err = p.PublishHeaders(ctx, headersKeys1, []byte("[headers] key1 message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
// because of x-match: all, headersKeys2 will not match the same queue, so drop it
headersKeys2 := map[string]interface{}{"foo": "bar"}
err = p.PublishHeaders(ctx, headersKeys2, []byte("[headers] key2 message "+strconv.Itoa(i)))
checkErr(err)
}
}
// consumer-side headers message
{
c := runConsume(url, exchange, queueName, queueArgs)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c.Count()))
}
printStat()
}
func delayedMessageExample(url string) {
exchangeName := "delayed-message-exchange-demo"
queueName := "delayed-message-queue"
routingKey := "delayed-key"
exchange := rabbitmq.NewDelayedMessageExchange(exchangeName, rabbitmq.NewDirectExchange("", routingKey))
var queueArgs map[string]interface{}
fmt.Printf("\n\n-------------------- delayed message --------------------\n")
// producer-side delayed message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
ctx := context.Background()
datetimeLayout := "2006-01-02 15:04:05.000"
for i := 1; i <= 100; i++ {
err = p.PublishDelayedMessage(ctx, time.Second*3, []byte("[delayed] message "+strconv.Itoa(i)+" at "+time.Now().Format(datetimeLayout)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side delayed message
{
c := runConsume(url, exchange, queueName, queueArgs)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c.Count()))
}
printStat()
}
func publisherSubscriberExample(url string) {
channelName := "pub-sub"
fmt.Printf("\n\n-------------------- publisher subscriber --------------------\n")
// publisher-side message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewPublisher(channelName, connection)
checkErr(err)
defer p.Close()
for i := 1; i <= 100; i++ {
err = p.Publish(context.Background(), []byte("[pub-sub] message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// subscriber-side message
{
identifier := "pub-sub-queue-1"
s1 := runSubscriber(url, channelName, identifier)
identifier = "pub-sub-queue-2"
s2 := runSubscriber(url, channelName, identifier)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(s1.Count()))
fmt.Println("\n\nsubscriber 2 count:", s2.Count())
}
printStat()
}
func runConsume(url string, exchange *rabbitmq.Exchange, queueName string, queueArgs map[string]interface{}) *rabbitmq.Consumer {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
c, err := rabbitmq.NewConsumer(exchange, queueName, connection,
rabbitmq.WithConsumerAutoAck(false),
rabbitmq.WithConsumerQueueDeclareOptions(
rabbitmq.WithQueueDeclareArgs(queueArgs),
),
)
checkErr(err)
c.Consume(context.Background(), handler)
return c
}
func runSubscriber(url string, channelName string, identifier string) *rabbitmq.Subscriber {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
s, err := rabbitmq.NewSubscriber(channelName, identifier, connection, rabbitmq.WithConsumerAutoAck(false))
checkErr(err)
s.Subscribe(context.Background(), handler)
return s
}
var handler = func(ctx context.Context, data []byte, tagID string) error {
logger.Info("received message", logger.String("tagID", tagID), logger.String("data", string(data)))
return nil
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
func printStat() {
fmt.Println("\n\n-------------------- stat --------------------")
fmt.Println("producer count:", atomic.LoadInt32(&producerCount))
fmt.Println("consumer count:", atomic.LoadInt32(&consumerCount))
fmt.Println("----------------------------------------------\n")
atomic.StoreInt32(&producerCount, 0)
atomic.StoreInt32(&consumerCount, 0)
}
死信队列
下面是为 direct
, topic
, fanout
, headers
, delayed message
这五种消息类型配置死信队列(Dead-Letter Queue)示例。通过在创建生产者时使用 rabbitmq.WithDeadLetterOptions
功能选项,可以方便地为常规队列绑定一个死信交换机和死信队列。示例中,通过设置队列参数(如 x-max-length
限制队列长度和 x-message-ttl
设置消息存活时间),当消息在原队列中过期或因队列已满而被丢弃时,这些消息(即死信)会被自动路由到指定的死信队列中。代码随后会启动两个消费者,一个用于消费正常消息,另一个用于消费死信队列中的消息,从而验证死信机制是否生效。
点击查看示例代码
package main
import (
"context"
"fmt"
"strconv"
"sync/atomic"
"time"
"github.com/go-dev-frame/sponge/pkg/logger"
"github.com/go-dev-frame/sponge/pkg/rabbitmq"
)
var (
producerCount int32
consumerCount int32
deadLetterConsumerCount int32
)
func main() {
url := "amqp://guest:guest@127.0.0.1:5672/"
directExample(url)
//topicExample(url)
//fanoutExample(url)
//headersExample(url)
//delayedMessageExample(url)
}
func directExample(url string) {
exchangeName := "direct-exchange-demo-2"
queueName := "direct-queue-2"
routingKey := "direct-key-2"
exchange := rabbitmq.NewDirectExchange(exchangeName, routingKey)
queueArgs := map[string]interface{}{
"x-max-length": 60,
"x-message-ttl": 3000, // milliseconds
}
deadLetterQueueName := "dl-" + queueName
deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-"+routingKey)
fmt.Printf("\n\n-------------------- direct --------------------\n")
// producer-side direct message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection,
// set queue args
rabbitmq.WithProducerQueueDeclareOptions(
rabbitmq.WithQueueDeclareArgs(queueArgs),
),
// add dead letter
rabbitmq.WithDeadLetterOptions(
rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
),
)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs() // get producer queue args
for i := 1; i <= 100; i++ {
err = p.PublishDirect(context.Background(), []byte("[direct] say hello"+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side direct message
{
c1 := runConsume(url, exchange, queueName, queueArgs)
c2 := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c1.Count()))
atomic.AddInt32(&deadLetterConsumerCount, int32(c2.Count()))
}
printStat()
}
func topicExample(url string) {
exchangeName := "topic-exchange-demo-2"
queueName := "topic-queue-2"
routingKey := "dl-key1.key2.*"
exchange := rabbitmq.NewTopicExchange(exchangeName, routingKey)
queueArgs := map[string]interface{}{
"x-max-length": 60,
"x-message-ttl": 3000, // milliseconds
}
deadLetterQueueName := "dl-" + queueName
deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-"+routingKey)
fmt.Printf("\n\n-------------------- topic --------------------\n")
// producer-side topic message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection,
// set queue args
rabbitmq.WithProducerQueueDeclareOptions(
rabbitmq.WithQueueDeclareArgs(queueArgs),
),
// add dead letter
rabbitmq.WithDeadLetterOptions(
rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
),
)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
for i := 1; i <= 100; i++ {
key := "dl-key1.key2.key" + strconv.Itoa(i)
err = p.PublishTopic(context.Background(), key, []byte("[topic] "+key+" message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side topic message
{
c1 := runConsume(url, exchange, queueName, queueArgs)
c2 := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c1.Count()))
atomic.AddInt32(&deadLetterConsumerCount, int32(c2.Count()))
}
printStat()
}
func fanoutExample(url string) {
exchangeName := "fanout-exchange-demo-2"
queueName := "fanout-queue-3"
exchange := rabbitmq.NewFanoutExchange(exchangeName)
queueArgs := map[string]interface{}{
"x-max-length": 60,
"x-message-ttl": 3000, // milliseconds
}
deadLetterQueueName := "dl-" + queueName
deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-direct-key")
fmt.Printf("\n\n-------------------- fanout --------------------\n")
// producer-side fanout message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection,
// set queue args
rabbitmq.WithProducerQueueDeclareOptions(
rabbitmq.WithQueueDeclareArgs(queueArgs),
),
// add dead letter
rabbitmq.WithDeadLetterOptions(
rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
),
)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
for i := 1; i <= 100; i++ {
err = p.PublishFanout(context.Background(), []byte("[fanout] message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side fanout message
{
queueName = "fanout-queue-3"
c1 := runConsume(url, exchange, queueName, queueArgs)
queueName = "fanout-queue-4"
c2 := runConsume(url, exchange, queueName, queueArgs)
c3 := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c1.Count()))
atomic.AddInt32(&consumerCount, int32(c2.Count()))
atomic.AddInt32(&deadLetterConsumerCount, int32(c3.Count()))
}
printStat()
}
func headersExample(url string) {
exchangeName := "headers-exchange-demo-2"
queueName := "headers-queue-2"
headersKeys := map[string]interface{}{"hello": "world", "foo": "bar"}
exchange := rabbitmq.NewHeadersExchange(exchangeName, rabbitmq.HeadersTypeAll, headersKeys) // all, you can set HeadersTypeAny type
queueArgs := map[string]interface{}{
"x-max-length": 60,
"x-message-ttl": 3000, // milliseconds
}
deadLetterQueueName := "dl-" + queueName
deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-headers-key")
fmt.Printf("\n\n-------------------- headers --------------------\n")
// producer-side headers message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection,
// set queue args
rabbitmq.WithProducerQueueDeclareOptions(
rabbitmq.WithQueueDeclareArgs(queueArgs),
),
// add dead letter
rabbitmq.WithDeadLetterOptions(
rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
),
)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
ctx := context.Background()
for i := 1; i <= 100; i++ {
headersKeys1 := headersKeys
err = p.PublishHeaders(ctx, headersKeys1, []byte("[headers] message "+strconv.Itoa(i)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
// because of x-match: all, headersKeys2 will not match the same queue, so drop it
headersKeys2 := map[string]interface{}{"foo": "bar"}
err = p.PublishHeaders(ctx, headersKeys2, []byte("[headers] key2 message"))
checkErr(err)
}
}
// consumer-side headers message
{
c1 := runConsume(url, exchange, queueName, queueArgs)
c2 := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)
<-time.After(time.Second * 5)
atomic.AddInt32(&consumerCount, int32(c1.Count()))
atomic.AddInt32(&deadLetterConsumerCount, int32(c2.Count()))
}
printStat()
}
func delayedMessageExample(url string) {
exchangeName := "delayed-message-exchange-demo-2"
queueName := "delayed-message-queue-2"
routingKey := "delayed-key-2"
exchange := rabbitmq.NewDelayedMessageExchange(exchangeName, rabbitmq.NewDirectExchange("", routingKey))
queueArgs := map[string]interface{}{
"x-max-length": 60,
"x-message-ttl": 3000, // milliseconds
}
deadLetterQueueName := "dl-" + queueName
deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-"+routingKey)
fmt.Printf("\n\n-------------------- delayed message --------------------\n")
// producer-side delayed message
{
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection,
// set queue args
rabbitmq.WithProducerQueueDeclareOptions(
rabbitmq.WithQueueDeclareArgs(queueArgs),
),
// add dead letter
rabbitmq.WithDeadLetterOptions(
rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
),
)
checkErr(err)
defer p.Close()
queueArgs = p.QueueArgs()
ctx := context.Background()
datetimeLayout := "2006-01-02 15:04:05.000"
for i := 1; i <= 200; i++ {
delayTime := time.Second
if i > 100 {
delayTime = time.Second * 2
}
err = p.PublishDelayedMessage(ctx, delayTime, []byte("[delayed] message "+strconv.Itoa(i)+" at "+time.Now().Format(datetimeLayout)))
checkErr(err)
atomic.AddInt32(&producerCount, 1)
}
}
// consumer-side delayed message
{
time.Sleep(time.Second * 3) // wait for all messages to be sent
c1 := runConsume(url, exchange, queueName, queueArgs)
c2 := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)
<-time.After(time.Second * 10)
atomic.AddInt32(&consumerCount, int32(c1.Count()))
atomic.AddInt32(&deadLetterConsumerCount, int32(c2.Count()))
}
printStat()
}
func runConsume(url string, exchange *rabbitmq.Exchange, queueName string, queueArgs map[string]interface{}) *rabbitmq.Consumer {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
c, err := rabbitmq.NewConsumer(exchange, queueName, connection,
rabbitmq.WithConsumerAutoAck(false),
rabbitmq.WithConsumerQueueDeclareOptions(
rabbitmq.WithQueueDeclareArgs(queueArgs),
),
)
checkErr(err)
c.Consume(context.Background(), handler)
return c
}
func runConsumeForDeadLetter(url string, exchange *rabbitmq.Exchange, queueName string) *rabbitmq.Consumer {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
c, err := rabbitmq.NewConsumer(exchange, queueName, connection, rabbitmq.WithConsumerAutoAck(false))
checkErr(err)
c.Consume(context.Background(), handler)
return c
}
var handler = func(ctx context.Context, data []byte, tagID string) error {
logger.Info("received message", logger.String("tagID", tagID), logger.String("data", string(data)))
return nil
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
func printStat() {
fmt.Println("\n\n-------------------- stat --------------------")
fmt.Println("producer count:", producerCount)
fmt.Println("consumer count:", consumerCount)
fmt.Println("dead letter consumer count:", deadLetterConsumerCount)
fmt.Println("----------------------------------------------\n")
atomic.StoreInt32(&producerCount, 0)
atomic.StoreInt32(&consumerCount, 0)
atomic.StoreInt32(&deadLetterConsumerCount, 0)
}
自动重连 RabbitMQ 服务
下面示例代码展示了 rabbitmq
库的客户端在面对网络中断或 RabbitMQ 服务重启时的自动重连能力。runProduce
函数则演示了生产者的手动重连逻辑。在发送消息的循环中,如果 PublishDirect
方法返回 rabbitmq.ErrClosed
错误,表示连接已断开。此时,程序会进入一个内部循环,通过 connection.CheckConnected()
方法检测连接状态。一旦连接恢复,它会重新创建一个新的生产者实例,并继续发送消息,从而保证了消息生产的健壮性。
点击查看示例代码
package main
import (
"context"
"errors"
"strconv"
"time"
"github.com/go-dev-frame/sponge/pkg/logger"
"github.com/go-dev-frame/sponge/pkg/rabbitmq"
)
var url = "amqp://guest:guest@127.0.0.1:5672/"
func main() {
ctx, _ := context.WithTimeout(context.Background(), time.Hour)
exchangeName := "direct-exchange-demo"
queueName := "direct-queue"
routeKey := "info"
exchange := rabbitmq.NewDirectExchange(exchangeName, routeKey)
err := runConsume(ctx, exchange, queueName)
if err != nil {
logger.Error("runConsume failed", logger.Err(err))
return
}
err = runProduce(ctx, exchange, queueName)
if err != nil {
logger.Error("runProduce failed", logger.Err(err))
return
}
}
func runProduce(ctx context.Context, exchange *rabbitmq.Exchange, queueName string) error {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
if err != nil {
return err
}
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
if err != nil {
return err
}
defer p.Close()
count := 0
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
count++
data := []byte("direct say hello" + strconv.Itoa(count))
err = p.PublishDirect(ctx, data)
if err != nil {
if errors.Is(err, rabbitmq.ErrClosed) {
for {
if !connection.CheckConnected() { // check connection
time.Sleep(time.Second * 2)
continue
}
p, err = rabbitmq.NewProducer(exchange, queueName, connection)
if err != nil {
logger.Warn("reconnect failed", logger.Err(err))
time.Sleep(time.Second * 2)
continue
}
break
}
} else {
logger.Warn("publish failed", logger.Err(err))
}
}
logger.Info("publish message", logger.String("data", string(data)))
time.Sleep(time.Second * 5) // mock send message frequency
}
}
}
func runConsume(ctx context.Context, exchange *rabbitmq.Exchange, queueName string) error {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
if err != nil {
return err
}
c, err := rabbitmq.NewConsumer(exchange, queueName, connection, rabbitmq.WithConsumerAutoAck(false))
if err != nil {
return err
}
c.Consume(ctx, handler)
return nil
}
var handler = func(ctx context.Context, data []byte, tagID string) error {
logger.Info("received message", logger.String("tagID", tagID), logger.String("data", string(data)))
return nil
}