gRPC and Interceptors
About 6 min · component · gRPC · interceptor
gRPC Configuration
Server-Side Configuration
Configure gRPC server parameters in the YAML configuration file located in the configs
directory:
grpc:
port: 8282 # Listening port
httpPort: 8283 # HTTP listening port for profile and metrics
# Security parameters setting
# If type="", it means no secure connection, no need to fill in any parameters
# If type="one-way", it means server-side authentication, only "certFile" and "keyFile" fields need to be filled
# If type="two-way", it means both client and server-side authentication, all fields should be filled
serverSecure:
type: "" # Type: "", "one-way", "two-way"
caFile: "" # CA file, only effective for "two-way", absolute path
certFile: "" # Server cert file, absolute path
keyFile: "" # Server key file, absolute path
Client-Side Configuration
Configure gRPC client connection parameters in the YAML configuration file located in the configs
directory:
grpcClient:
- name: "serverNameExample" # gRPC service name, used for service discovery
host: "127.0.0.1" # gRPC service IP, used for direct connection
port: 8282 # gRPC service port
timeout: 0 # Request timeout (s), 0 means no timeout control
registryDiscoveryType: "" # Registry and discovery type: Consul, etcd, nacos; if empty, connect to the server directly using host and port
enableLoadBalance: true # Whether to enable load balancing
# Secure connection settings
# type="", this means no secure connection, no need to fill in any parameters
# type="one-way", means server-side authentication, only "serverName" and "certFile" fields need to be filled
# type="two-way", means both client and server-side authentication, please fill in all fields
clientSecure:
type: "" # Like: "", "one-way", "two-way"
serverName: "" # Domain name, e.g. *.foo.com
caFile: "" # Client CA file, only effective in "two-way", absolute path
certFile: "" # Client certificate file, absolute path; if type="one-way", fill in the server certificate file here
keyFile: "" # Client key file, only effective in "two-way", absolute path
clientToken:
enable: false # Whether to enable token authentication
appID: "" # app id
appKey: "" # app key
gRPC Example
Server-Side Code
Click to view example code
package main
import (
"context"
"fmt"
"github.com/go-dev-frame/sponge/pkg/grpc/server"
"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
)
// greeterServer is the example implementation of the helloworld Greeter service.
// It embeds pb.UnimplementedGreeterServer and overrides SayHello.
type greeterServer struct {
	pb.UnimplementedGreeterServer
}

// SayHello prints the received name to stdout and replies with "Hello <name>".
func (s *greeterServer) SayHello(_ context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	name := in.GetName()
	fmt.Printf("Received: %v\n", name)

	reply := &pb.HelloReply{Message: "Hello " + name}
	return reply, nil
}
func main() {
port := 8282
registerFn := func(s *grpc.Server) {
pb.RegisterGreeterServer(s, &greeterServer{})
// Register other services here
}
fmt.Printf("Starting server on port %d\n", port)
srv, err := server.Run(port, registerFn,
//server.WithSecure(credentials),
//server.WithUnaryInterceptor(unaryInterceptors...),
//server.WithStreamInterceptor(streamInterceptors...),
//server.WithServiceRegister(func() {}),
//server.WithStatConnections(metrics.WithConnectionsLogger(logger.Get()), metrics.WithConnectionsGauge()),
)
if err != nil {
panic(err)
}
defer srv.Stop()
select {}
}
Client-Side Code
Click to view example code
package main
import (
"context"
"fmt"
"github.com/go-dev-frame/sponge/pkg/grpc/client"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
)
// main connects to the example Greeter server on localhost:8282, sends a
// single SayHello request for "Alice" and prints the greeting it receives.
// Any connection or RPC error aborts the program with a panic.
func main() {
	conn, err := client.NewClient("localhost:8282",
		//client.WithServiceDiscover(buildDiscovery(), false),
		//client.WithLoadBalance(),
		//client.WithSecure(credentials),
		//client.WithUnaryInterceptor(unaryInterceptors...),
		//client.WithStreamInterceptor(streamInterceptors...),
	)
	if err != nil {
		panic(err)
	}
	// Deferred so the connection is released on every exit path, including
	// the panic below when the RPC fails.
	defer conn.Close()

	greeterClient := pb.NewGreeterClient(conn)
	reply, err := greeterClient.SayHello(context.Background(), &pb.HelloRequest{Name: "Alice"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("Greeting: %s\n", reply.GetMessage())
}
Built-in gRPC Interceptors
Services created by sponge come with the following gRPC interceptors pre-integrated (implementation code can be found in internal/server/grpc.go
). You can choose to use them as needed (configuration for some interceptors can be adjusted through the YAML configuration file under the configs
directory).
Category | Feature List |
---|---|
Basic Functions | Logging, Request ID, Timeout, Recovery |
Security Authentication | JWT Authentication, Token Verification |
Traffic Control | Adaptive Rate Limiting, Circuit Breaker, Retry Policy |
Observability | Distributed Tracing, Metrics Collection |
Logging Interceptor
Server-side interceptor example code
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"github.com/go-dev-frame/sponge/pkg/logger"
"google.golang.org/grpc"
)
// setServerOptions builds the gRPC server options that install the unary
// server logging interceptor, using the logger returned by logger.Get().
func setServerOptions() []grpc.ServerOption {
var options []grpc.ServerOption
option := grpc.ChainUnaryInterceptor(
// if you don't want to log reply data, you can use interceptor.UnaryServerSimpleLog instead of interceptor.UnaryServerLog
interceptor.UnaryServerLog( // set unary server logging
logger.Get(),
interceptor.WithReplaceGRPCLogger(),
//interceptor.WithMarshalFn(fn), // customised marshal function, default is jsonpb.Marshal
//interceptor.WithLogIgnoreMethods(fullMethodNames), // ignore methods logging
//interceptor.WithMaxLen(400), // logging max length, default 300
),
)
options = append(options, option)
return options
}
// you can also set stream server logging
Client-side interceptor example code
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setDialOptions returns the dial options that install the unary client
// logging interceptor, using the logger returned by logger.Get().
func setDialOptions() []grpc.DialOption {
	// set unary client logging
	clientLog := interceptor.UnaryClientLog(
		logger.Get(),
		interceptor.WithReplaceGRPCLogger(),
	)

	return []grpc.DialOption{grpc.WithChainUnaryInterceptor(clientLog)}
}
// you can also set stream client logging
Request ID Interceptor
Server-side interceptor example code
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setServerOptions returns the server options that install the unary
// server request ID interceptor.
func setServerOptions() []grpc.ServerOption {
	requestID := interceptor.UnaryServerRequestID()
	return []grpc.ServerOption{grpc.ChainUnaryInterceptor(requestID)}
}
Client-side interceptor example code
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setDialOptions returns dial options for an insecure connection with the
// unary client request ID interceptor installed.
func setDialOptions() []grpc.DialOption {
	return []grpc.DialOption{
		// use insecure transfer
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// request ID
		grpc.WithChainUnaryInterceptor(interceptor.UnaryClientRequestID()),
	}
}
Timeout Interceptor
Client-side interceptor example code
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setDialOptions returns dial options for an insecure connection with the
// unary client timeout interceptor installed, configured with one second.
func setDialOptions() []grpc.DialOption {
	return []grpc.DialOption{
		// use insecure transfer
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// set timeout
		grpc.WithChainUnaryInterceptor(interceptor.UnaryClientTimeout(time.Second)),
	}
}
Recovery Interceptor
Server-side interceptor example code
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setServerOptions returns the server options that install the unary
// server recovery interceptor.
func setServerOptions() []grpc.ServerOption {
	recovery := interceptor.UnaryServerRecovery()
	return []grpc.ServerOption{grpc.ChainUnaryInterceptor(recovery)}
}
Client-side interceptor example code
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setDialOptions returns the dial options that install the unary client
// recovery interceptor.
func setDialOptions() []grpc.DialOption {
	recovery := interceptor.UnaryClientRecovery()
	return []grpc.DialOption{grpc.WithChainUnaryInterceptor(recovery)}
}
JWT Authentication Interceptor
Server-side interceptor example code
package main
import (
"context"
"net"
"time"
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"github.com/go-dev-frame/sponge/pkg/jwt"
"google.golang.org/grpc"
userV1 "user/api/user/v1"
)
// main listens on TCP port 8282, creates a gRPC server with the options from
// getUnaryServerOptions, registers the user service and serves requests.
// It panics if the listener cannot be opened or if serving fails.
func main() {
	list, err := net.Listen("tcp", ":8282")
	if err != nil {
		panic(err)
	}

	server := grpc.NewServer(getUnaryServerOptions()...)
	userV1.RegisterUserServer(server, &user{})

	// Serve blocks while the server is running, so no extra blocking
	// statement is needed after it.
	if err = server.Serve(list); err != nil {
		panic(err)
	}
}
// getUnaryServerOptions builds the server options that install the JWT
// authentication unary interceptor. It shows two ways of configuring it:
// with defaults (Case 1) and with a custom sign key, an extra verification
// callback and a list of methods that skip token verification (Case 2).
//
// NOTE(review): both cases append a grpc.UnaryInterceptor option; presumably
// they are meant as alternatives and only one should be kept in real code,
// since grpc-go documents that only one unary interceptor can be installed
// this way — confirm before copying both.
func getUnaryServerOptions() []grpc.ServerOption {
var options []grpc.ServerOption
// Case 1: default options
{
options = append(options, grpc.UnaryInterceptor(
interceptor.UnaryServerJwtAuth(),
))
}
// Case 2: custom options, signKey, extra verify function, rpc method
{
options = append(options, grpc.UnaryInterceptor(
interceptor.UnaryServerJwtAuth(
interceptor.WithSignKey([]byte("your_secret_key")),
interceptor.WithExtraVerify(extraVerifyFn),
interceptor.WithAuthIgnoreMethods(// specify the gRPC API to ignore token verification(full path)
"/api.user.v1.User/Register",
"/api.user.v1.User/Login",
),
),
))
}
return options
}
// user is the example implementation of the user service; it embeds
// userV1.UnimplementedUserServer and overrides the methods shown below.
type user struct {
	userV1.UnimplementedUserServer
}

// Login generates a JWT for the (already verified) user and returns it in the
// reply. Two ways of generating the token are shown; each one assigns the
// shared token variable, so in real code keep only the case you need.
// An error from token generation is returned to the caller.
func (s *user) Login(ctx context.Context, req *userV1.LoginRequest) (*userV1.LoginReply, error) {
	// check user and password success

	uid := "100"
	fields := map[string]interface{}{"name": "bob", "age": 10, "is_vip": true}

	// Declared outside the case blocks so the token is still in scope for the
	// final return statement.
	var (
		token string
		err   error
	)

	// Case 1: default jwt options, signKey, signMethod(HS256), expiry time(24 hour)
	{
		_, token, err = jwt.GenerateToken(uid)
		if err != nil {
			return nil, err
		}
	}

	// Case 2: custom jwt options, signKey, signMethod(HS384), expiry time(12 hour), fields, claims
	{
		_, token, err = jwt.GenerateToken(
			uid,
			jwt.WithGenerateTokenSignKey([]byte("your_secret_key")),
			jwt.WithGenerateTokenSignMethod(jwt.HS384),
			jwt.WithGenerateTokenFields(fields),
			jwt.WithGenerateTokenClaims([]jwt.RegisteredClaimsOption{
				jwt.WithExpires(time.Hour * 12),
				// jwt.WithIssuedAt(now),
				// jwt.WithSubject("123"),
				// jwt.WithIssuer("https://auth.example.com"),
				// jwt.WithAudience("https://api.example.com"),
				// jwt.WithNotBefore(now),
				// jwt.WithJwtID("abc1234xxx"),
			}...),
		)
		if err != nil {
			return nil, err
		}
	}

	return &userV1.LoginReply{Token: token}, nil
}
// extraVerifyFn is the extra verification callback passed to
// interceptor.WithExtraVerify. As written it accepts every token (returns
// nil); the commented lines sketch the checks that could be added.
func extraVerifyFn(ctx context.Context, claims *jwt.Claims) error {
	// Example: reject disabled users by looking up the jwt id in a blacklist.
	//if CheckBlackList(uid, claims.ID) {
	//	return errors.New("user is disabled")
	//}

	// Example: read fields from the claims.
	//uid := claims.UID
	//name, _ := claims.GetString("name")
	//age, _ := claims.GetInt("age")
	//isVip, _ := claims.GetBool("is_vip")

	return nil
}
// GetByID is a sketch of a handler that reads the JWT claims from the request
// context via interceptor.GetJwtClaims. The parts marked "......" are elided
// in this example and must be filled in by the implementer.
func (s *user) GetByID(ctx context.Context, req *userV1.GetByIDRequest) (*userV1.GetByIDReply, error) {
// ......
claims,ok := interceptor.GetJwtClaims(ctx) // if necessary, the claims can be read from the gRPC request context
// ......
}
Client-side interceptor example code
package main
import (
"context"
"github.com/go-dev-frame/sponge/pkg/grpc/grpccli"
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
userV1 "user/api/user/v1"
)
// main connects to the user service on 127.0.0.1:8282 and calls GetByID with
// authentication information attached to the request context. Two ways of
// attaching it are shown; in real code keep only the case you need.
// The string values below are placeholders and must be replaced with real
// credentials.
func main() {
	conn, err := grpccli.NewClient("127.0.0.1:8282")
	if err != nil {
		panic(err)
	}
	cli := userV1.NewUserClient(conn)

	ctx := context.Background()

	// Case 1: the authorization value comes from a request header whose key is
	// "authorization" and whose value is "Bearer xxx"
	{
		authorization := "Bearer xxx" // placeholder
		ctx = interceptor.SetAuthToCtx(ctx, authorization)
	}

	// Case 2: the token comes from a gRPC server response (e.g. the login reply)
	{
		token := "xxx" // placeholder
		ctx = interceptor.SetJwtTokenToCtx(ctx, token)
	}

	if _, err = cli.GetByID(ctx, &userV1.GetUserByIDRequest{Id: 100}); err != nil {
		panic(err)
	}
}
Rate Limiter Interceptor
Adaptive throttling interceptor is used to limit the request rate of the client and prevent the server from overloading.
Server-side interceptor example code
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setServerOptions returns the server options that install the unary server
// rate limit interceptor.
//
// The rate limiter is a server-side interceptor: grpc.ChainUnaryInterceptor
// produces a grpc.ServerOption, so the options are collected in a
// []grpc.ServerOption (not dial options) and no transport credentials are
// configured here.
func setServerOptions() []grpc.ServerOption {
	var options []grpc.ServerOption

	// rate limiter
	option := grpc.ChainUnaryInterceptor(
		interceptor.UnaryServerRateLimit(
		//interceptor.WithWindow(time.Second*5), // default 10s
		//interceptor.WithBucket(500), // default 100
		//interceptor.WithCPUThreshold(900), // default 80%
		),
	)
	options = append(options, option)

	return options
}
Circuit Breaker Interceptor
Circuit breaker interceptor is used to protect the server from running out of resources due to excessive client requests.
Server-side interceptor example code
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setServerOptions returns the server options that install the unary server
// circuit breaker interceptor.
//
// The circuit breaker is a server-side interceptor: grpc.ChainUnaryInterceptor
// produces a grpc.ServerOption, so the options are collected in a
// []grpc.ServerOption (not dial options) and no transport credentials are
// configured here.
func setServerOptions() []grpc.ServerOption {
	var options []grpc.ServerOption

	// circuit breaker
	option := grpc.ChainUnaryInterceptor(
		interceptor.UnaryServerCircuitBreaker(
		//interceptor.WithValidCode(codes.DeadlineExceeded), // add error code for circuit breaker
		//interceptor.WithUnaryServerDegradeHandler(handler), // add custom degrade handler
		//interceptor.WithBreakerOption(
		//	circuitbreaker.WithSuccess(75),            // default 60
		//	circuitbreaker.WithRequest(200),           // default 100
		//	circuitbreaker.WithBucket(20),             // default 10
		//	circuitbreaker.WithWindow(time.Second*5),  // default 3s
		//),
		),
	)
	options = append(options, option)

	return options
}
Retry Interceptor
Client-side interceptor example code
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setDialOptions returns dial options for an insecure connection with the
// unary client retry interceptor installed. The commented options show how
// the retry behaviour can be customised.
func setDialOptions() []grpc.DialOption {
	// use insecure transfer
	creds := grpc.WithTransportCredentials(insecure.NewCredentials())

	// retry
	retry := grpc.WithChainUnaryInterceptor(
		interceptor.UnaryClientRetry(
		//interceptor.WithRetryTimes(5), // change the number of retries, default is 3
		//interceptor.WithRetryInterval(100*time.Millisecond), // change the retry interval, default is 50 milliseconds
		//interceptor.WithRetryErrCodes(), // add error codes that trigger a retry, default is codes.Internal, codes.DeadlineExceeded, codes.Unavailable
		),
	)

	return []grpc.DialOption{creds, retry}
}
Tracing Interceptor
Initialize Tracing
import (
"github.com/go-dev-frame/sponge/pkg/tracer"
"go.opentelemetry.io/otel"
)
// InitTrace initializes tracing for serviceName. It creates a Jaeger agent
// exporter for 192.168.3.37:6831 and a resource tagged with the service name,
// the "dev" environment and the "demo" version, then passes both to
// tracer.Init. It panics if the exporter cannot be created.
func InitTrace(serviceName string) {
	exporter, err := tracer.NewJaegerAgentExporter("192.168.3.37", "6831")
	if err != nil {
		panic(err)
	}

	// collect all by default
	tracer.Init(exporter, tracer.NewResource(
		tracer.WithServiceName(serviceName),
		tracer.WithEnvironment("dev"),
		tracer.WithServiceVersion("demo"),
	))
}
// SpanDemo shows how to create a span manually when one is needed. The span
// is started on the tracer named serviceName, carries one custom attribute
// (key spanName, value the current time as a string) and is ended when the
// function returns.
func SpanDemo(serviceName string, spanName string, ctx context.Context) {
	tr := otel.Tracer(serviceName)

	// customised attributes
	attrs := trace.WithAttributes(attribute.String(spanName, time.Now().String()))

	_, span := tr.Start(ctx, spanName, attrs)
	defer span.End()

	// ......
}
Server-side interceptor example code
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setServerOptions returns the server options that install the unary server
// tracing interceptor.
func setServerOptions() []grpc.ServerOption {
	// use tracing
	tracing := grpc.UnaryInterceptor(interceptor.UnaryServerTracing())
	return []grpc.ServerOption{tracing}
}
Client-side interceptor example code
import (
"github.com/go-dev-frame/sponge/pkg/grpc/interceptor"
"google.golang.org/grpc"
)
// setDialOptions returns dial options for an insecure connection with the
// unary client tracing interceptor installed.
func setDialOptions() []grpc.DialOption {
	return []grpc.DialOption{
		// use insecure transfer
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// use tracing
		grpc.WithUnaryInterceptor(interceptor.UnaryClientTracing()),
	}
}
Metrics Interceptor
Server-side interceptor example code
import "github.com/go-dev-frame/sponge/pkg/grpc/metrics"
// UnaryServerLabels is a unary server interceptor that stores the server name
// and environment label values as tags in the request context (used as
// custom prometheus labels) and then invokes the handler with that context.
func UnaryServerLabels(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	// set up prometheus custom labels
	labels := grpc_ctxtags.NewTags().
		Set(serverNameLabelKey, serverNameLabelValue).
		Set(envLabelKey, envLabelValue)

	return handler(grpc_ctxtags.SetInContext(ctx, labels), req)
}
// getServerOptions returns the server options that install the metrics
// interceptors for both unary and streaming RPCs.
func getServerOptions() []grpc.ServerOption {
	// metrics interceptor for unary rpc
	unary := grpc.ChainUnaryInterceptor(
		//UnaryServerLabels, // tag
		metrics.UnaryServerMetrics(
		// metrics.WithCounterMetrics(customizedCounterMetric) // adding custom metrics
		),
	)

	// metrics interceptor for streaming rpc
	stream := grpc.ChainStreamInterceptor(
		metrics.StreamServerMetrics(),
	)

	return []grpc.ServerOption{unary, stream}
}
// main starts the example gRPC server on :8282 with the metrics interceptors
// installed and starts the metrics HTTP service on :8283. It panics if the
// listener cannot be opened or if serving fails.
func main() {
	rand.Seed(time.Now().UnixNano())

	const addr = ":8282"
	fmt.Println("start rpc server", addr)

	list, err := net.Listen("tcp", addr)
	if err != nil {
		panic(err)
	}

	server := grpc.NewServer(getServerOptions()...)
	serverNameV1.RegisterGreeterServer(server, &GreeterServer{})

	// start the metrics HTTP service for this gRPC server
	// NOTE(review): the original comment suggests gRPC and Go metrics are
	// collected by default — confirm against metrics.ServerHTTPService.
	metrics.ServerHTTPService(":8283", server)
	fmt.Println("start metrics server", ":8283")

	if err = server.Serve(list); err != nil {
		panic(err)
	}
}
Client-side interceptor example code
import "github.com/go-dev-frame/sponge/pkg/grpc/metrics"
// getDialOptions returns dial options for an insecure connection with the
// client metrics interceptors installed for unary and streaming RPCs.
func getDialOptions() []grpc.DialOption {
	return []grpc.DialOption{
		// use insecure transfer
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// Metrics
		grpc.WithUnaryInterceptor(metrics.UnaryClientMetrics()),
		grpc.WithStreamInterceptor(metrics.StreamClientMetrics()),
	}
}
// main creates a gRPC client for 127.0.0.1:8282 with the metrics interceptors
// installed, starts the client metrics HTTP service on :8284, and then calls
// sayHello every 500ms forever, printing any error it returns.
// It panics if the client cannot be created.
func main() {
	conn, err := grpc.NewClient("127.0.0.1:8282", getDialOptions()...)
	if err != nil {
		// Without a usable connection every request below would fail.
		panic(err)
	}

	metrics.ClientHTTPService(":8284")
	fmt.Println("start metrics server", ":8284")

	client := serverNameV1.NewGreeterClient(conn)
	i := 0
	for {
		i++
		time.Sleep(time.Millisecond * 500) // qps is 2
		err = sayHello(client, i)
		if err != nil {
			fmt.Println(err)
		}
	}
}