Asynq
Overview
sasynq
is a wrapper implementation based on the open-source library asynq. While maintaining full compatibility with the native asynq usage patterns, it provides a more concise and user-friendly SDK. Its main features include:
- Support for Redis Cluster and Sentinel modes, enabling high availability and horizontal scaling.
- The distributed task queue supports priority queues, delayed queues, unique tasks (to prevent duplicate execution), and scheduled task dispatching.
- Built-in task retry mechanism (with customizable retry counts), cancel tasks, timeout control, and deadline guarantees.
- Flexible scheduling options including immediate execution, delayed execution, and execution at a specific time.
- Unified logging using zap.
sasynq
simplifies asynchronous distributed task processing in Go, helping developers quickly write clear and maintainable background task code.
Task Queue
Task Payload and Handler Definition
The following shows three ways to define task payloads and handlers.
Example Code for Defining Task Payloads
// example/common/task.go
package common
import (
"context"
"encoding/json"
"fmt"
"github.com/hibiken/asynq"
"github.com/go-dev-frame/sponge/pkg/sasynq"
)
// ----------------------------- Definition Method 1 (recommended)----------------------------------
const TypeEmailSend = "email:send"
type EmailPayload struct {
UserID int `json:"user_id"`
Message string `json:"message"`
}
func HandleEmailTask(ctx context.Context, p *EmailPayload) error {
fmt.Printf("[Email] Task for UserID %d completed successfully\n", p.UserID)
return nil
}
// ----------------------------- Definition Method 2 ----------------------------------
const TypeSMSSend = "sms:send"
type SMSPayload struct {
UserID int `json:"user_id"`
Message string `json:"message"`
}
func (p *SMSPayload) ProcessTask(ctx context.Context, t *asynq.Task) error {
fmt.Printf("[SMS] Task for UserID %d completed successfully\n", p.UserID)
return nil
}
// ----------------------------- Definition Method 3 ----------------------------------
const TypeMsgNotification = "msg:notification"
type MsgNotificationPayload struct {
UserID int `json:"user_id"`
Message string `json:"message"`
}
func HandleMsgNotificationTask(ctx context.Context, t *asynq.Task) error {
var p MsgNotificationPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("failed to unmarshal payload: %w", err)
}
fmt.Printf("[MSG] Task for UserID %d completed successfully\n", p.UserID)
return nil
}
const TypeUniqueEmailSend = "unique:email:send"
Producer Example
Producers can enqueue tasks with various options, including setting priority, delay duration, deadline, and a unique ID.
Producer Example Code
// example/producer/main.go
package main
import (
"fmt"
"time"
"github.com/go-dev-frame/sponge/pkg/sasynq"
"example/common"
)
func runProducer(client *sasynq.Client) error {
// Immediate enqueue with critical priority
userPayload1 := &common.EmailPayload{
UserID: 101,
Message: "This is a message that is immediately queued, with critical priority",
}
_, info, err := client.EnqueueNow(common.TypeEmailSend, userPayload1,
sasynq.WithQueue("critical"),
sasynq.WithRetry(5),
)
if err != nil {
return err
}
fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeEmailSend, info.ID, info.Queue)
// Enqueue after a 5-second delay
userPayload2 := &common.SMSPayload{
UserID: 202,
Message: "This is a message added to the queue after a 5-second delay, with default priority",
}
_, info, err = client.EnqueueIn(5*time.Second, common.TypeSMSSend, userPayload2,
sasynq.WithQueue("default"),
sasynq.WithRetry(3),
)
if err != nil {
return err
}
fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeSMSSend, info.ID, info.Queue)
// Enqueue to run at a specific time
userPayload3 := &common.MsgNotificationPayload{
UserID: 303,
Message: "This is a message scheduled to run at a specific time, with low priority",
}
_, info, err = client.EnqueueAt(time.Now().Add(10*time.Second), common.TypeMsgNotification, userPayload3,
sasynq.WithQueue("low"),
sasynq.WithRetry(1),
)
if err != nil {
return err
}
fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeMsgNotification, info.ID, info.Queue)
// Example of using NewTask directly
userPayload4 := &common.EmailPayload{
UserID: 404,
Message: "This is a test message, with low priority, a 15-second deadline, and a unique ID",
}
task, err := sasynq.NewTask(common.TypeEmailSend, userPayload4)
if err != nil {
return err
}
info, err = client.Enqueue(task,
sasynq.WithQueue("low"),
sasynq.WithMaxRetry(1),
sasynq.WithDeadline(time.Now().Add(15*time.Second)),
sasynq.WithTaskID("unique-id-xxxx-xxxx"),
)
if err != nil {
return err
}
fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeEmailSend, info.ID, info.Queue)
// Example of using EnqueueUnique
userPayload5 := &common.EmailPayload{
UserID: 505,
Message: "This is a unique task, with default priority, a 1-minute deadline",
}
userPayload5 := &EmailPayload{UserID: 505, Message: "unique task"}
_, info5, err := client.EnqueueUnique(time.Minute, common.TypeUniqueEmailSend, userPayload5,
sasynq.WithQueue("default"),
sasynq.WithMaxRetry(3))
if err != nil {
return err
}
fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeUniqueEmailSend, info5.ID, info5.Queue)
return nil
}
func main() {
cfg := sasynq.RedisConfig{
Addr: "localhost:6379",
}
client := sasynq.NewClient(cfg)
err := runProducer(client)
if err != nil {
panic(err)
}
defer client.Close()
fmt.Println("All tasks enqueued.")
}
Use the sasynq.WithXXX
option to set queues, retry times, deadlines, unique IDs, etc., which is intuitive and concise.
Consumer Example
The consumer server supports three ways to register handlers.
Consumer Example Code
// example/consumer/main.go
package main
import (
"github.com/go-dev-frame/sponge/pkg/sasynq"
"github.com/go-dev-frame/sponge/pkg/logger"
"example/common"
)
func runConsumer(redisCfg sasynq.RedisConfig) (*sasynq.Server, error) {
serverCfg := sasynq.DefaultServerConfig(sasynq.WithLogger(logger.Get())) // Uses critical, default, and low queues by default
srv := sasynq.NewServer(redisCfg, serverCfg)
// Attach logging middleware
srv.Use(sasynq.LoggingMiddleware(sasynq.WithLogger(logger.Get())))
// Register task handlers (three methods available):
sasynq.RegisterTaskHandler(srv.Mux(), common.TypeEmailSend, sasynq.HandleFunc(common.HandleEmailTask)) // Method 1 (recommended)
srv.Register(common.TypeSMSSend, &common.SMSPayload{}) // Method 2: register struct as payload
srv.RegisterFunc(common.TypeMsgNotification, common.HandleMsgNotificationTask) // Method 3: register function directly
sasynq.RegisterTaskHandler(srv.Mux(), common.TypeUniqueEmailSend, sasynq.HandleFunc(common.HandleEmailTask))
srv.Run()
return srv, nil
}
func main() {
cfg := sasynq.RedisConfig{
Addr: "localhost:6379",
}
srv, err := runConsumer(cfg)
if err != nil {
panic(err)
}
srv.WaitShutdown()
}
Scheduled Tasks
Scheduling distributed periodic tasks becomes extremely simple.
Scheduled Tasks Example Code
package main
import (
"context"
"fmt"
"github.com/go-dev-frame/sponge/pkg/sasynq"
"github.com/go-dev-frame/sponge/pkg/logger"
)
const TypeScheduledGet = "scheduled:get"
type ScheduledGetPayload struct {
URL string `json:"url"`
}
func handleScheduledGetTask(ctx context.Context, p *ScheduledGetPayload) error {
fmt.Printf("[Get] Task for URL %s completed successfully\n", p.URL)
return nil
}
// -----------------------------------------------------------------------
func registerSchedulerTasks(scheduler *sasynq.Scheduler) error {
payload1 := &ScheduledGetPayload{URL: "https://google.com"}
entryID1, err := scheduler.RegisterTask("@every 2s", TypeScheduledGet, payload1)
if err != nil {
return err
}
fmt.Printf("Registered periodic task with entry ID: %s\n", entryID1)
payload2 := &ScheduledGetPayload{URL: "https://bing.com"}
entryID2, err := scheduler.RegisterTask("@every 3s", TypeScheduledGet, payload2)
if err != nil {
return err
}
fmt.Printf("Registered periodic task with entry ID: %s\n", entryID2)
scheduler.Run()
return nil
}
func runServer(redisCfg sasynq.RedisConfig) (*sasynq.Server, error) {
serverCfg := sasynq.DefaultServerConfig(sasynq.WithLogger(logger.Get()))
srv := sasynq.NewServer(redisCfg, serverCfg)
srv.Use(sasynq.LoggingMiddleware())
// Register handler for scheduled tasks
sasynq.RegisterTaskHandler(srv.Mux(), TypeScheduledGet, sasynq.HandleFunc(handleScheduledGetTask))
srv.Run()
return srv, nil
}
func main() {
cfg := sasynq.RedisConfig{
Addr: "localhost:6379",
}
scheduler := sasynq.NewScheduler(cfg, sasynq.WithSchedulerLogger(sasynq.WithLogger(logger.Get())))
err := registerSchedulerTasks(scheduler)
if err != nil {
panic(err)
}
srv, err := runServer(cfg)
if err != nil {
panic(err)
}
srv.Shutdown()
}
Cancel Tasks
Cancel Unexecuted One-time Tasks
For unexecuted one-time tasks, they can be cancelled using the inspector.CancelTask(queue, taskID)
method.
Cancel task example code
package main
import (
"fmt"
"github.com/go-dev-frame/sponge/pkg/sasynq"
)
var inspector = sasynq.NewInspector(sasynq.DefaultServerConfig())
func main() {
queue := "default"
taskID := "task-id-xxxx-xxxx"
isScheduled := false // set to true for scheduled tasks
err := cancelTask(queue, taskID, isScheduled)
if err != nil {
fmt.Printf("Failed to cancel task: %v\n", err)
}
}
func cancelTask(queue string, taskID string, isScheduled bool) error{
var err error
if isScheduled {
err = inspector.CancelTask(queue, taskID)
} else {
err = inspector.CancelTask("", taskID) // queue is empty string for non-scheduled tasks
}
if err != nil {
return err
}
return nil
}
Cancel Periodic Tasks
For periodic tasks, they can be cancelled using the scheduler.Unregister(entryID)
method.
Cancel task example code
package main
import (
"fmt"
"github.com/go-dev-frame/sponge/pkg/sasynq"
)
var scheduler = sasynq.NewScheduler(sasynq.DefaultServerConfig())
func main() {
entryID := "entry-id-xxxx-xxxx" // scheduler.RegisterTask() returns this ID
err := scheduler.Unregister(entryID)
if err != nil {
fmt.Printf("Failed to unregister periodic scheduled tasks: %v\n", err)
}
}