gRPC 与拦截器
大约 8 分钟 | component | gRPC | interceptor
gRPC 配置
gRPC 服务端配置
在 configs
目录的 YAML 配置文件中配置 gRPC 服务端参数:
grpc:
port: 8282 # 监听端口
httpPort: 8283 # profile 和 metrics 的 http 监听端口
# 安全参数设置
# 如果 type="",表示无安全连接,无需填写任何参数
# 如果 type="one-way",表示服务器端认证,只需填写 "certFile" 和 "keyFile" 字段
# 如果 type="two-way",表示同时进行客户端和服务器端认证,应填写所有字段
serverSecure:
type: "" # 类型,"", "one-way", "two-way"
caFile: "" # ca 文件,仅在"two-way"时有效,绝对路径
certFile: "" # 服务端 cert 文件,绝对路径
keyFile: "" # 服务端 key 文件,绝对路径
gRPC 客户端配置
在 configs
目录的 YAML 配置文件中配置 gRPC 客户端连接参数:
grpcClient:
- name: "serverNameExample" # grpc 服务名称,用于服务发现
host: "127.0.0.1" # grpc 服务 ip, 用于直接连接
port: 8282 # grpc 服务端口
timeout: 0 # 请求超时时间(s),如果为0表示关闭超时控制
registryDiscoveryType: "" # 注册和发现类型:Consul、etcd、nacos;如果为空,则使用 host 和 port 直接连接服务器
enableLoadBalance: true # 是否开启负载均衡
# 安全连接设置
# type="", 这意味着没有安全连接,无需填写任何参数
# type="one-way", 表示服务器端认证,只需填写 "serverName" 和 "certFile" 字段
# type="two-way", 是指客户端和服务器端认证,请填写所有字段
clientSecure:
type: "" # 类型:"", "one-way", "two-way"
serverName: "" # 域名, e.g. *.foo.com
caFile: "" # 客户端 ca 文件,仅在 "two-way"中有效,绝对路径
certFile: "" # 客户端证书文件,绝对路径,如果 type="one-way",请在此处填写服务器端证书文件
keyFile: "" # 客户端密钥文件,仅在 "two-way"情况下有效,绝对路径
clientToken:
enable: false # 是否启用令牌身份验证
appID: "" # app id
appKey: "" # app key
gRPC 示例
服务端示例代码
点击查看示例代码
package main
import (
"context"
"fmt"
"github.com/go-dev-frame/sponge/pkg/grpc/server"
"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
)
// greeterServer is the demo Greeter service implementation. It embeds
// pb.UnimplementedGreeterServer and provides its own SayHello method.
type greeterServer struct {
pb.UnimplementedGreeterServer
}
// SayHello prints the name carried by the request and replies with
// "Hello " followed by that name. It never returns an error.
func (s *greeterServer) SayHello(_ context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	name := in.GetName()
	fmt.Printf("Received: %v\n", name)

	reply := &pb.HelloReply{Message: "Hello " + name}
	return reply, nil
}
// main starts the demo gRPC server on port 8282, registers the Greeter
// service on it and then blocks the main goroutine forever.
// It panics if server.Run reports an error.
func main() {
	port := 8282

	// register attaches every gRPC service to the freshly created server.
	register := func(s *grpc.Server) {
		pb.RegisterGreeterServer(s, &greeterServer{})
		// Register other services here
	}

	fmt.Printf("Starting server on port %d\n", port)

	srv, err := server.Run(
		port,
		register,
		//server.WithSecure(credentials),
		//server.WithUnaryInterceptor(unaryInterceptors...),
		//server.WithStreamInterceptor(streamInterceptors...),
		//server.WithServiceRegister(func() {}),
		//server.WithStatConnections(metrics.WithConnectionsLogger(logger.Get()), metrics.WithConnectionsGauge()), // show connections or set prometheus metrics
	)
	if err != nil {
		panic(err)
	}
	defer srv.Close()

	// Block forever so the process does not exit.
	select {}
}
客户端示例代码
点击查看示例代码
package main
import (
"context"
"fmt"
"github.com/go-dev-frame/sponge/pkg/grpc/client"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
)
// main dials the Greeter server at 127.0.0.1:8282, sends one SayHello
// request for "Alice" and prints the greeting from the reply.
// It panics if the connection cannot be created or the call fails.
func main() {
	conn, err := client.NewClient("127.0.0.1:8282",
		//client.WithServiceDiscover(buildDiscovery(), false),
		//client.WithLoadBalance(),
		//client.WithSecure(credentials),
		//client.WithUnaryInterceptor(unaryInterceptors...),
		//client.WithStreamInterceptor(streamInterceptors...),
	)
	if err != nil {
		panic(err)
	}
	// Deferred right after the error check so the connection is closed on
	// every exit path, including the panic below.
	defer conn.Close()

	greeterClient := pb.NewGreeterClient(conn)

	reply, err := greeterClient.SayHello(context.Background(), &pb.HelloRequest{Name: "Alice"})
	if err != nil {
		panic(err)
	}

	fmt.Printf("Greeting: %s\n", reply.GetMessage())
}
内置 gRPC 拦截器
sponge 创建的服务已预集成以下 gRPC 拦截器(实现代码见 internal/server/grpc.go
),可根据需要选择使用(部分拦截器的配置可通过 configs
目录下的 YAML 配置文件进行调整)。
类别 | 功能列表 |
---|---|
基础功能 | 日志、Request ID、超时、Recovery |
安全认证 | JWT 鉴权、Token 验证 |
流量控制 | 自适应限流、熔断降级、重试策略 |
可观测性 | 链路追踪、Metrics 采集 |
日志拦截器
服务端拦截器设置示例代码
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"github.com/go-dev-frame/sponge/pkg/logger"
"google.golang.org/grpc"
)
// setServerOptions returns the gRPC server options that install the unary
// logging interceptor.
func setServerOptions() []grpc.ServerOption {
	// For simpler log output, UnaryServerSimpleLog can be used instead of UnaryServerLog.
	logInterceptor := interceptor.UnaryServerLog(
		logger.Get(),
		interceptor.WithReplaceGRPCLogger(),
		//interceptor.WithMarshalFn(fn), // custom marshal function, default is jsonpb.Marshal
		//interceptor.WithLogIgnoreMethods(fullMethodNames), // skip logging for the given methods
		//interceptor.WithMaxLen(400), // maximum log length, default 400
	)

	return []grpc.ServerOption{
		grpc.ChainUnaryInterceptor(logInterceptor),
	}
}
// 同理,设置 interceptor.StreamServerLog
客户端拦截器示例代码
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setDialOptions returns the gRPC dial options that install the unary
// client logging interceptor.
func setDialOptions() []grpc.DialOption {
	logInterceptor := interceptor.UnaryClientLog(
		logger.Get(),
		interceptor.WithReplaceGRPCLogger(),
	)

	return []grpc.DialOption{
		grpc.WithChainUnaryInterceptor(logInterceptor),
	}
}
// 同理,设置 interceptor.StreamClientLog
Request ID 拦截器
服务端拦截器示例代码
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setServerOptions returns the gRPC server options that install the unary
// request ID interceptor.
func setServerOptions() []grpc.ServerOption {
	requestID := grpc.ChainUnaryInterceptor(interceptor.UnaryServerRequestID())

	return []grpc.ServerOption{requestID}
}
客户端拦截器示例代码
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setDialOptions returns the gRPC dial options for an insecure connection
// with the unary client request ID interceptor installed.
func setDialOptions() []grpc.DialOption {
	return []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithChainUnaryInterceptor(
			interceptor.UnaryClientRequestID(),
		),
	}
}
超时拦截器
客户端拦截器示例代码
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setDialOptions returns the gRPC dial options for an insecure connection
// with the unary client timeout interceptor configured with time.Second.
func setDialOptions() []grpc.DialOption {
	return []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithChainUnaryInterceptor(
			interceptor.UnaryClientTimeout(time.Second),
		),
	}
}
Recovery 拦截器
服务端拦截器示例代码
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setServerOptions returns the gRPC server options that install the unary
// recovery interceptor.
func setServerOptions() []grpc.ServerOption {
	recovery := grpc.ChainUnaryInterceptor(interceptor.UnaryServerRecovery())

	return []grpc.ServerOption{recovery}
}
客户端拦截器示例代码
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setDialOptions returns the gRPC dial options that install the unary
// client recovery interceptor.
func setDialOptions() []grpc.DialOption {
	recovery := grpc.WithChainUnaryInterceptor(interceptor.UnaryClientRecovery())

	return []grpc.DialOption{recovery}
}
JWT 认证拦截器
服务端拦截器示例代码
package main
import (
"context"
"net"
"time"
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"github.com/go-dev-frame/sponge/pkg/jwt"
"google.golang.org/grpc"
userV1 "user/api/user/v1"
)
// main listens on :8282, creates a gRPC server with the JWT interceptor
// options, registers the user service and serves until the server stops.
// It panics if listening or serving fails.
func main() {
	list, err := net.Listen("tcp", ":8282")
	if err != nil {
		panic(err)
	}

	server := grpc.NewServer(getUnaryServerOptions()...)
	userV1.RegisterUserServer(server, &user{})

	// Serve blocks until the server stops, so no extra blocking statement is
	// needed afterwards; a returned error is surfaced instead of ignored.
	if err = server.Serve(list); err != nil {
		panic(err)
	}
}
// getUnaryServerOptions returns the server options that install the JWT
// authentication interceptor.
//
// The two examples are alternatives. grpc.UnaryInterceptor may be set only
// once per server (grpc panics when it is set a second time), so exactly one
// example must be enabled.
func getUnaryServerOptions() []grpc.ServerOption {
	var options []grpc.ServerOption

	// Example 1: default settings
	//options = append(options, grpc.UnaryInterceptor(
	//	interceptor.UnaryServerJwtAuth(),
	//))

	// Example 2: custom settings: signKey, extra verify function, ignored rpc methods
	options = append(options, grpc.UnaryInterceptor(
		interceptor.UnaryServerJwtAuth(
			interceptor.WithSignKey([]byte("your_secret_key")),
			interceptor.WithExtraVerify(extraVerifyFn),
			interceptor.WithAuthIgnoreMethods( // gRPC methods (full path) that skip token verification
				"/api.user.v1.User/Register",
				"/api.user.v1.User/Login",
			),
		),
	))

	return options
}
// user is the example user service implementation. It embeds
// userV1.UnimplementedUserServer and defines the Login and GetByID methods.
type user struct {
userV1.UnimplementedUserServer
}
// Login generates a JWT for the user and returns it in the reply.
// The credential check is left as a placeholder in this example.
// It returns an error if the token cannot be generated.
//
// The two examples are alternatives; enable exactly one so that a single
// token variable is in scope for the reply.
func (s *user) Login(ctx context.Context, req *userV1.LoginRequest) (*userV1.LoginReply, error) {
	// check username and password here

	uid := "100"
	fields := map[string]interface{}{"name": "bob", "age": 10, "is_vip": true}

	// Example 1: default jwt options, signKey, signMethod(HS256), expiry time(24 hour)
	//_, token, err := jwt.GenerateToken(uid)

	// Example 2: custom jwt options, signKey, signMethod(HS384), expiry time(12 hour), fields, claims
	_, token, err := jwt.GenerateToken(
		uid,
		jwt.WithGenerateTokenSignKey([]byte("your_secret_key")),
		jwt.WithGenerateTokenSignMethod(jwt.HS384),
		jwt.WithGenerateTokenFields(fields),
		jwt.WithGenerateTokenClaims([]jwt.RegisteredClaimsOption{
			jwt.WithExpires(time.Hour * 12),
			// jwt.WithIssuedAt(now),
			// jwt.WithSubject("123"),
			// jwt.WithIssuer("https://auth.example.com"),
			// jwt.WithAudience("https://api.example.com"),
			// jwt.WithNotBefore(now),
			// jwt.WithJwtID("abc1234xxx"),
		}...),
	)
	if err != nil {
		return nil, err
	}

	return &userV1.LoginReply{Token: token}, nil
}
// extraVerifyFn is the extra verification hook passed to
// interceptor.WithExtraVerify. In this example it accepts every token and
// returns nil; the commented-out lines sketch possible checks.
func extraVerifyFn(ctx context.Context, claims *jwt.Claims) error {
// check whether the user is disabled, e.g. look up the jwt id in a blacklist
//if CheckBlackList(uid, claims.ID) {
// return errors.New("user is disabled")
//}
// get fields from claims
//uid := claims.UID
//name, _ := claims.GetString("name")
//age, _ := claims.GetInt("age")
//isVip, _ := claims.GetBool("is_vip")
return nil
}
// GetByID shows how a handler can read the JWT claims from the request
// context via interceptor.GetJwtClaims.
// NOTE(review): the body is elided ("......") in this example; a real handler
// must use claims/ok and return a reply and an error.
func (s *user) GetByID(ctx context.Context, req *userV1.GetByIDRequest) (*userV1.GetByIDReply, error) {
// ......
claims,ok := interceptor.GetJwtClaims(ctx) // if needed, the claims can be read from the context
// ......
}
客户端拦截器示例代码
package main
import (
"context"
"github.com/go-dev-frame/sponge/pkg/grpc/grpccli"
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
userV1 "user/api/user/v1"
)
// main connects to the user service at 127.0.0.1:8282, attaches a JWT to the
// outgoing context and calls GetByID.
// It panics if the connection cannot be created or the call fails.
func main() {
	conn, err := grpccli.NewClient("127.0.0.1:8282")
	if err != nil {
		panic(err)
	}

	cli := userV1.NewUserClient(conn)

	// Placeholder: replace with the token returned by the Login rpc.
	token := "xxx"
	ctx := context.Background()

	// Example 1: set the "Authorization":"Bearer xxx" header into the context
	{
		authorization := "Bearer " + token
		ctx = interceptor.SetAuthToCtx(ctx, authorization)
	}

	// Example 2: set the token into the context
	{
		ctx = interceptor.SetJwtTokenToCtx(ctx, token)
	}

	// The request type matches the server-side GetByID signature.
	if _, err = cli.GetByID(ctx, &userV1.GetByIDRequest{Id: 100}); err != nil {
		panic(err)
	}
}
限流拦截器
自适应限流拦截器用于限制客户端的请求速率,防止服务端过载。
服务端拦截器示例代码
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setServerOptions returns the gRPC server options that install the unary
// rate-limit interceptor.
//
// The rate limiter is a server-side interceptor: grpc.ChainUnaryInterceptor
// yields a grpc.ServerOption, so the options are collected in a
// []grpc.ServerOption and no dial/transport credential option is involved.
func setServerOptions() []grpc.ServerOption {
	var options []grpc.ServerOption

	// rate limiter
	option := grpc.ChainUnaryInterceptor(
		interceptor.UnaryServerRateLimit(
			//interceptor.WithWindow(time.Second*5), // default 10s
			//interceptor.WithBucket(500), // default 100
			//interceptor.WithCPUThreshold(900), // CPU usage threshold, default 80%
		),
	)
	options = append(options, option)

	return options
}
熔断拦截器
熔断拦截器用于保护服务端,防止因客户端请求过多而导致服务端资源耗尽。
服务端拦截器示例代码
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setServerOptions returns the gRPC server options that install the unary
// circuit-breaker interceptor.
//
// The circuit breaker is a server-side interceptor: grpc.ChainUnaryInterceptor
// yields a grpc.ServerOption, so the options are collected in a
// []grpc.ServerOption and no dial/transport credential option is involved.
func setServerOptions() []grpc.ServerOption {
	var options []grpc.ServerOption

	// circuit breaker
	option := grpc.ChainUnaryInterceptor(
		interceptor.UnaryServerCircuitBreaker(
			//interceptor.WithBreakerOption(
			//	circuitbreaker.WithSuccess(75), // default 60
			//	circuitbreaker.WithRequest(100), // default 100
			//	circuitbreaker.WithBucket(10), // default 10
			//	circuitbreaker.WithWindow(time.Second*3), // default 3s
			//),
			//interceptor.WithUnaryServerDegradeHandler(handler), // set the degrade handler
			//interceptor.WithValidCode(codes.DeadlineExceeded), // add a custom error code for the circuit breaker
		),
	)
	options = append(options, option)

	return options
}
重试拦截器
客户端拦截器示例代码
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setDialOptions returns the gRPC dial options for an insecure connection
// with the unary client retry interceptor installed.
func setDialOptions() []grpc.DialOption {
	// use insecure transfer
	creds := grpc.WithTransportCredentials(insecure.NewCredentials())

	// retry
	retry := grpc.WithChainUnaryInterceptor(
		interceptor.UnaryClientRetry(
			//interceptor.WithRetryTimes(5), // number of retries, default is 3
			//interceptor.WithRetryInterval(100*time.Millisecond), // retry interval, default is 50 milliseconds
			//interceptor.WithRetryErrCodes(xxx), // extra error codes that trigger a retry, default is codes.Internal, codes.DeadlineExceeded, codes.Unavailable
		),
	)

	return []grpc.DialOption{creds, retry}
}
链路追踪拦截器
初始化示例代码
import (
"github.com/go-dev-frame/sponge/pkg/tracer"
"go.opentelemetry.io/otel"
)
// InitTrace initializes tracing for serviceName using a Jaeger agent
// exporter created for host "192.168.3.37" and port "6831".
// It panics if the exporter cannot be created.
func InitTrace(serviceName string) {
	exporter, err := tracer.NewJaegerAgentExporter("192.168.3.37", "6831")
	if err != nil {
		panic(err)
	}

	// collect all by default
	tracer.Init(
		exporter,
		tracer.NewResource(
			tracer.WithServiceName(serviceName),
			tracer.WithEnvironment("dev"),
			tracer.WithServiceVersion("demo"),
		),
	)
}
// SpanDemo shows how to create a new span when one is needed. The span is
// started from ctx under the tracer named serviceName, carries one string
// attribute keyed by spanName with the current time as its value, and is
// ended when the function returns.
func SpanDemo(serviceName string, spanName string, ctx context.Context) {
	tr := otel.Tracer(serviceName)

	// customised attributes
	attrs := trace.WithAttributes(attribute.String(spanName, time.Now().String()))

	_, span := tr.Start(ctx, spanName, attrs)
	defer span.End()
	// ......
}
服务端拦截器示例代码
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setServerOptions returns the gRPC server options that install the unary
// tracing interceptor.
func setServerOptions() []grpc.ServerOption {
	// use tracing
	tracing := grpc.UnaryInterceptor(interceptor.UnaryServerTracing())

	return []grpc.ServerOption{tracing}
}
客户端拦截器示例代码
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setDialOptions returns the gRPC dial options for an insecure connection
// with the unary client tracing interceptor installed.
func setDialOptions() []grpc.DialOption {
	return []grpc.DialOption{
		// use insecure transfer
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// use tracing
		grpc.WithUnaryInterceptor(interceptor.UnaryClientTracing()),
	}
}
监控指标拦截器
服务端拦截器示例代码
import "github.com/go-dev-frame/sponge/pkg/grpc/metrics"
// UnaryServerLabels is a unary server interceptor that stores the server
// name and environment tags in the context and then invokes the handler
// with that context. The tags serve as prometheus custom labels.
func UnaryServerLabels(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	// set up prometheus custom labels
	labels := grpc_ctxtags.NewTags()
	labels = labels.Set(serverNameLabelKey, serverNameLabelValue)
	labels = labels.Set(envLabelKey, envLabelValue)

	return handler(grpc_ctxtags.SetInContext(ctx, labels), req)
}
// getServerOptions returns the gRPC server options that install the metrics
// interceptors for unary and streaming rpcs.
func getServerOptions() []grpc.ServerOption {
	// metrics interceptor for unary rpc
	unaryMetrics := grpc.ChainUnaryInterceptor(
		//UnaryServerLabels, // tag
		metrics.UnaryServerMetrics(
		// metrics.WithCounterMetrics(customizedCounterMetric) // adding custom metrics
		),
	)

	// metrics interceptor for streaming rpc
	streamMetrics := grpc.ChainStreamInterceptor(
		metrics.StreamServerMetrics(),
	)

	return []grpc.ServerOption{unaryMetrics, streamMetrics}
}
// main starts the gRPC server on :8282 with the metrics interceptors, starts
// the metrics HTTP service on :8283 and serves until the server stops.
// It panics if listening or serving fails.
func main() {
	rand.Seed(time.Now().UnixNano())

	addr := ":8282"
	fmt.Println("start rpc server", addr)

	lis, listenErr := net.Listen("tcp", addr)
	if listenErr != nil {
		panic(listenErr)
	}

	server := grpc.NewServer(getServerOptions()...)
	serverNameV1.RegisterGreeterServer(server, &GreeterServer{})

	// start metrics server, collect grpc metrics by default, turn on, go metrics
	metrics.ServerHTTPService(":8283", server)
	fmt.Println("start metrics server", ":8283")

	if serveErr := server.Serve(lis); serveErr != nil {
		panic(serveErr)
	}
}
客户端拦截器示例代码
import "github.com/go-dev-frame/sponge/pkg/grpc/metrics"
// getDialOptions returns the gRPC dial options for an insecure connection
// with the unary and stream client metrics interceptors installed.
func getDialOptions() []grpc.DialOption {
	return []grpc.DialOption{
		// use insecure transfer
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// Metrics
		grpc.WithUnaryInterceptor(metrics.UnaryClientMetrics()),
		grpc.WithStreamInterceptor(metrics.StreamClientMetrics()),
	}
}
// main creates a client connection to 127.0.0.1:8282, starts the client
// metrics HTTP service on :8284 and then calls sayHello every 500ms forever,
// printing any error a call returns.
// It panics if the client connection cannot be created.
func main() {
	conn, err := grpc.NewClient("127.0.0.1:8282", getDialOptions()...)
	if err != nil {
		// Without a connection every later call would fail, so stop here.
		panic(err)
	}

	metrics.ClientHTTPService(":8284")
	fmt.Println("start metrics server", ":8284")

	client := serverNameV1.NewGreeterClient(conn)

	for i := 1; ; i++ {
		time.Sleep(time.Millisecond * 500) // qps is 2

		if err := sayHello(client, i); err != nil {
			fmt.Println(err)
		}
	}
}