SSE
大约 3 分钟componentssegin
一个高性能的Go语言Server-Sent Events (SSE)服务器和客户端实现,支持事件单播和广播、断线重连、消息持久化等功能。
功能特性
- 🚀 高性能事件中心(Hub)管理客户端连接
- 🔌 支持断线自动重连和事件重发
- 📊 内置推送统计和性能监控
- 🔒 线程安全的客户端管理
- ⏱️ 支持超时重试和异步任务处理
- 💾 可选的持久化存储接口
- ❤️ 内置心跳检测机制
使用示例
服务端示例
package main
import (
"math/rand"
"net/http"
"strconv"
"time"
"github.com/gin-gonic/gin"
"github.com/go-dev-frame/sponge/pkg/sse"
)
func main() {
// 初始化SSE Hub
hub := sse.NewHub()
defer hub.Close()
// 创建Gin路由
r := gin.Default()
// SSE 事件流api接口
r.GET("/events", func(c *gin.Context) {
var uid string
u, isExists := c.Get("uid")
if !isExists {
uid = strconv.Itoa(rand.Intn(99) + 100)
} else {
uid, _ = u.(string)
}
hub.Serve(c, uid)
})
// 注册事件推送端点,支持指定用户推送和广播推送
// 指定用户推送
// curl -X POST -H "Content-Type: application/json" -d '{"uids":["u001"],"events":[{"event":"message","data":"hello_world"}]}' http://localhost:8080/push
// 广播推送,不指定用户表示向所有用户推送
// curl -X POST -H "Content-Type: application/json" -d '{"events":[{"event":"message","data":"hello_world"}]}' http://localhost:8080/push
r.POST("/push", hub.PushEventHandler())
// 模拟推送事件
go func() {
i := 0
for {
time.Sleep(time.Second * 5)
i++
e := &sse.Event{Event: sse.DefaultEventType, Data: "hello_world_" + strconv.Itoa(i)}
_ = hub.Push(nil, e) // 广播推送
//_ = hub.Push([]string{uid}, e) // 单点推送
}
}()
// 启动HTTP服务器
if err := http.ListenAndServe(":8080", r); err != nil {
panic(err)
}
}
客户端示例
package main
import (
"fmt"
"github.com/go-dev-frame/sponge/pkg/sse"
)
func main() {
url := "http://localhost:8080/events"
// 创建SSE客户端
client := sse.NewClient(url)
client.OnEvent(sse.DefaultEventType, func(event *sse.Event) {
fmt.Printf("Received: %#v\n", event)
})
err := client.Connect()
if err != nil {
fmt.Printf("连接失败: %v\n", err)
return
}
fmt.Println("SSE 客户端已启动,按 Ctrl+C 退出")
<-client.Wait()
}
高级配置
使用持久化存储
可以实现map、redis、mysql等存储,实现事件的持久化存储和查询,示例代码:
// 实现Store接口
type MyStore struct{}
func (s *MyStore) Save(ctx context.Context, e *sse.Event) error {
// 实现事件存储逻辑
return nil
}
func (s *MyStore) ListByLastID(ctx context.Context, eventType string, lastID string, pageSize int) ([]*sse.Event, string, error) {
// 实现事件查询逻辑,分页查询,返回事件列表、最后一个事件ID
return nil, nil
}
// 创建带存储的Hub
hub := sse.NewHub(sse.WithStore(&MyStore{}))
配置客户端断线重连时的是否需要事件重发未推送的事件
要开启此功能,需要与事件持久化存储一起使用,示例代码:
hub := sse.NewHub(
sse.WithStore(&MyStore{}),
sse.WithEnableResendEvents(),
)
自定义处理推送失败事件
代码示例:
fn := func(uid string, event *sse.Event) {
// 自定义处理推送失败逻辑,如记录日志或存入数据库
log.Printf("推送失败: 用户%s, 事件ID%s", uid, event.ID)
}
// 创建带推送失败处理的Hub
hub := sse.NewHub(sse.WithPushFailedHandleFn(fn))
API 参考
Hub 方法
NewHub(opts ...HubOption) *Hub
: 创建新的事件中心,支持设置自定义持久化、重发事件、日志、推送事件缓冲大小、并发推送事件的协程数选项。Push(uids []string, events ...*Event) error
: 推送事件到指定用户或所有用户OnlineClientsNum() int
: 获取在线客户端数量Close()
: 关闭事件中心PrintPushStats()
: 打印推送统计信息
Serve 方法
Serve(c *gin.Context, hub *Hub, uid string, opts...ServeOption)
: 处理SSE客户端连接请求,支持设置自定义请求头。
Client 方法
NewClient(url string) *SSEClient
: 创建新的SSE客户端,支持设置自定义请求头、重连时间间隔、日志选项。Connect() error
: 连接服务器Disconnect()
: 断开连接OnEvent(eventType string, callback EventCallback)
: 注册事件回调
性能调优
WithChanBufferSize(size int)
: 设置广播通道缓冲区大小WithWorkerNum(num int)
: 设置异步工作协程数