Asynq
大约 5 分钟componentasynq
sasynq
是基于开源库 asynq的简单封装实现。它在保持与原生 asynq 使用模式完全兼容的同时,提供了更简洁易用的 SDK,其主要特性包括:
- 支持 Redis Cluster 和 Sentinel 模式,实现高可用与水平扩展能力。
- 分布式任务队列支持优先级队列、延迟队列、唯一任务(防止重复执行)和定时任务调度。
- 内置任务重试机制(可自定义重试次数)、超时控制和截止时间保障。
- 支持立即执行、延迟执行和指定时间点执行的灵活调度方式。
- 统一使用 zap 打印日志。
sasynq
简化了 Go 语言中的异步分布式任务处理,帮助开发者快速编写清晰可维护的后台任务代码。
任务队列
任务载荷与处理器定义
以下展示定义任务载荷和处理器,提供了三种方式。
定义任务载荷示例代码
// example/common/task.go
package common
import (
"context"
"encoding/json"
"fmt"
"github.com/hibiken/asynq"
"github.com/go-dev-frame/sponge/pkg/sasynq"
)
// ----------------------------- 方式一 (推荐!) ----------------------------------
const TypeEmailSend = "email:send"
type EmailPayload struct {
UserID int `json:"user_id"`
Message string `json:"message"`
}
func HandleEmailTask(ctx context.Context, p *EmailPayload) error { // 方法名和参数p类型可自定义
fmt.Printf("[Email] Task for UserID %d completed successfully\n", p.UserID)
return nil
}
// ----------------------------- 方式二 ----------------------------------
const TypeSMSSend = "sms:send"
type SMSPayload struct {
UserID int `json:"user_id"`
Message string `json:"message"`
}
func (p *SMSPayload) ProcessTask(ctx context.Context, t *asynq.Task) error { // 方法名和参数类型固定
fmt.Printf("[SMS] Task for UserID %d completed successfully\n", p.UserID)
return nil
}
// ----------------------------- 方式三 ----------------------------------
const TypeMsgNotification = "msg:notification"
type MsgNotificationPayload struct {
UserID int `json:"user_id"`
Message string `json:"message"`
}
func HandleMsgNotificationTask(ctx context.Context, t *asynq.Task) error { // 方法名可自定,参数类型固定
var p MsgNotificationPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("failed to unmarshal payload: %w", err)
}
fmt.Printf("[MSG] Task for UserID %d completed successfully\n", p.UserID)
return nil
}
生产者示例
生产者可以通过多种选项来入列任务,包括设置优先级、延迟时间、截止时间和唯一ID等。
生产者示例代码
// example/producer/main.go
package main
import (
"fmt"
"time"
"github.com/go-dev-frame/sponge/pkg/sasynq"
"example/common"
)
func runProducer(client *sasynq.Client) error {
// 立刻执行!还是十万火急的那种!
userPayload1 := &common.EmailPayload{
UserID: 101,
Message: "This is a message that is immediately queued, with critical priority",
}
_, info, err := client.EnqueueNow(common.TypeEmailSend, userPayload1,
sasynq.WithQueue("critical"),
sasynq.WithRetry(5),
)
if err != nil {
return err
}
fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeEmailSend, info.ID, info.Queue)
// 5秒后执行
userPayload2 := &common.SMSPayload{
UserID: 202,
Message: "This is a message added to the queue after a 5-second delay, with default priority",
}
_, info, err = client.EnqueueIn(5*time.Second, common.TypeSMSSend, userPayload2,
sasynq.WithQueue("default"),
sasynq.WithRetry(3),
)
if err != nil {
return err
}
fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeSMSSend, info.ID, info.Queue)
// 指定时间执行
userPayload3 := &common.MsgNotificationPayload{
UserID: 303,
Message: "This is a message scheduled to run at a specific time, with low priority",
}
_, info, err = client.EnqueueAt(time.Now().Add(10*time.Second), common.TypeMsgNotification, userPayload3,
sasynq.WithQueue("low"),
sasynq.WithRetry(1),
)
if err != nil {
return err
}
fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeMsgNotification, info.ID, info.Queue)
// 独一无二的任务,别重复执行!
userPayload4 := &common.EmailPayload{
UserID: 404,
Message: "This is a test message, with low priority, a 15-second deadline, and a unique ID",
}
task, err := sasynq.NewTask(common.TypeEmailSend, userPayload4)
if err != nil {
return err
}
info, err = client.Enqueue(task,
sasynq.WithQueue("low"),
sasynq.WithRetry(1),
sasynq.WithDeadline(time.Now().Add(15*time.Second)),
sasynq.WithUniqueID("unique-id-xxxx-xxxx"),
)
if err != nil {
return err
}
fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeEmailSend, info.ID, info.Queue)
return nil
}
func main() {
cfg := sasynq.RedisConfig{
Addr: "localhost:6379",
}
client := sasynq.NewClient(cfg)
err := runProducer(client)
if err != nil {
panic(err)
}
defer client.Close()
fmt.Println("All tasks enqueued.")
}
通过链式调用的 sasynq.WithXXX
选项,设置队列、重试次数、截止时间、唯一ID等,直观简洁。
消费者示例
消费者服务端支持三种处理器注册方式。
消费者示例代码
package main
import (
"github.com/go-dev-frame/sponge/pkg/sasynq"
"example/common"
)
func runConsumer(redisCfg sasynq.RedisConfig) (*sasynq.Server, error) {
serverCfg := sasynq.DefaultServerConfig() // Uses critical, default, and low queues by default
srv := sasynq.NewServer(redisCfg, serverCfg)
// 使用中间件打印日志
srv.Use(sasynq.LoggingMiddleware())
// 注册处理器,注册的三种方式分别对应定义payload的三种定义
sasynq.RegisterTaskHandler(srv.Mux(), common.TypeEmailSend, sasynq.HandleFunc(common.HandleEmailTask)) // Method 1
srv.Register(common.TypeSMSSend, &common.SMSPayload{}) // Method 2
srv.RegisterFunc(common.TypeMsgNotification, common.HandleMsgNotificationTask) // Method 3
srv.Run()
return srv, nil
}
func main() {
cfg := sasynq.RedisConfig{
Addr: "localhost:6379",
}
srv, err := runConsumer(cfg)
if err != nil {
panic(err)
}
srv.WaitShutdown()
}
定时任务
分布式的周期性任务的调度变得极其简单。
定时任务示例代码
package main
import (
"context"
"fmt"
"github.com/go-dev-frame/sponge/pkg/sasynq"
)
const TypeScheduledGet = "scheduled:get"
type ScheduledGetPayload struct {
URL string `json:"url"`
}
func handleScheduledGetTask(ctx context.Context, p *ScheduledGetPayload) error {
fmt.Printf("[Get] Task for URL %s completed successfully\n", p.URL)
return nil
}
// -----------------------------------------------------------------------
func registerSchedulerTasks(scheduler *sasynq.Scheduler) error {
payload1 := &ScheduledGetPayload{URL: "https://google.com"}
entryID1, err := scheduler.RegisterTask("@every 2s", TypeScheduledGet, payload1)
if err != nil {
return err
}
fmt.Printf("Registered periodic task with entry ID: %s\n", entryID1)
payload2 := &ScheduledGetPayload{URL: "https://bing.com"}
entryID2, err := scheduler.RegisterTask("@every 3s", TypeScheduledGet, payload2)
if err != nil {
return err
}
fmt.Printf("Registered periodic task with entry ID: %s\n", entryID2)
scheduler.Run()
return nil
}
func runServer(redisCfg sasynq.RedisConfig) (*sasynq.Server, error) {
serverCfg := sasynq.DefaultServerConfig()
srv := sasynq.NewServer(redisCfg, serverCfg)
srv.Use(sasynq.LoggingMiddleware())
// Register handler for scheduled tasks
sasynq.RegisterTaskHandler(srv.Mux(), TypeScheduledGet, sasynq.HandleFunc(handleScheduledGetTask))
srv.Run()
return srv, nil
}
func main() {
cfg := sasynq.RedisConfig{
Addr: "localhost:6379",
}
scheduler := sasynq.NewScheduler(cfg)
err := registerSchedulerTasks(scheduler)
if err != nil {
panic(err)
}
srv, err := runServer(cfg)
if err != nil {
panic(err)
}
srv.Shutdown()
}