分布式事务(DTM)
大约 5 分钟 | 标签:component、dtm、distributed transaction
什么是 DTM?
DTM(Distributed Transaction Manager)是一款开源的分布式事务管理器,专为解决跨数据库、跨服务、跨语言栈的数据一致性问题而设计。DTM 支持多种分布式事务协议,包括:
- 工作流模式
- Saga 模式
- TCC 模式
- XA 协议
- 两阶段消息
- Outbox 模式
DTM 使用指南
配置与启动 DTM 服务
DTM 支持多种存储引擎,包括 MySQL、Redis、MongoDB 等。
存储配置(以 MySQL 为例)
以 MySQL 作为存储引擎为例,修改 DTM 配置文件 (示例配置文件):
Store: # specify which engine to store trans status
Driver: 'mysql'
Host: '192.168.3.37'
User: 'root'
Password: '123456'
Port: 3306
Db: 'dtm'
# 使用注册与发现(sponge 的 dtm 驱动已支持 etcd、consul、nacos)
#MicroService:
#Driver: 'dtm-driver-sponge'
#Target: 'etcd://127.0.0.1:2379/dtmservice'
#EndPoint: 'grpc://127.0.0.1:36790'
启动 DTM 服务
# 默认端口:HTTP-36789, gRPC-36790
./dtm -c conf.yml
在 gRPC 服务中使用 DTM
1. 在 gRPC 服务配置 DTM
Sponge 的 DTM 驱动已集成到 DTM (v1.19.0+) 框架中,通过简单的注册中心配置即可实现服务发现:
app:
registryDiscoveryType: "etcd" # 注册与发现类型: consul, etcd, nacos, 如果为空,表示禁止使用注册与发现
grpcClient:
- name: "dtmservice" # dtm 服务名称
registryDiscoveryType: "etcd" # 注册与发现类型: consul, etcd, nacos, 如果为空,表示禁止使用注册与发现
host: "127.0.0.1" # dtm 服务 ip 或域名, 如果开启注册与发现,此字段无效
port: 36790 # dtm 服务的 grpc 端口,如果开启注册与发现,此字段无效
etcd:
addrs: ["127.0.0.1:2379"]
注:DTM 服务同时需要配置 MicroService 字段。
2. 获取 DTM 服务地址
在服务初始化时调用 `InitDtmServerResolver()`,业务逻辑中通过 `GetDtmEndpoint()` 获取 DTM 服务地址:
点击查看完整代码
package rpcclient
import (
"fmt"
"strings"
"sync"
"time"
_ "github.com/zhufuyi/dtmdriver-sponge"
"google.golang.org/grpc/resolver"
"github.com/go-dev-frame/sponge/pkg/consulcli"
"github.com/go-dev-frame/sponge/pkg/etcdcli"
"github.com/go-dev-frame/sponge/pkg/logger"
"github.com/go-dev-frame/sponge/pkg/nacoscli"
"github.com/go-dev-frame/sponge/pkg/servicerd/discovery"
"github.com/go-dev-frame/sponge/pkg/servicerd/registry"
"github.com/go-dev-frame/sponge/pkg/servicerd/registry/consul"
"github.com/go-dev-frame/sponge/pkg/servicerd/registry/etcd"
"github.com/go-dev-frame/sponge/pkg/servicerd/registry/nacos"
"your-server-module/internal/config"
)
// Package-level state shared by InitDtmServerResolver and GetDtmEndpoint.
var (
// dtmServerEndPoint is the DTM server endpoint; it is assigned by
// InitDtmServerResolver and returned by GetDtmEndpoint.
dtmServerEndPoint string
// dtmServerOnce guards the lazy call to InitDtmServerResolver made from
// GetDtmEndpoint when the endpoint is still empty.
dtmServerOnce sync.Once
)
// InitDtmServerResolver computes the DTM server endpoint from the grpcClient
// configuration entry named "dtmservice" (matched case-insensitively) and
// stores it in dtmServerEndPoint.
//
// When the entry's RegistryDiscoveryType is "consul", "etcd" or "nacos", a
// "discovery:///" endpoint is built from the service name, the matching
// registry client is created and a discovery resolver builder is registered
// with the grpc resolver package. For any other value the endpoint is the
// plain "host:port" of the entry and nothing is registered.
//
// It panics if the configuration entry is missing or if the registry client
// cannot be created.
func InitDtmServerResolver() {
	const serverName = "dtmservice"
	cfg := config.Get()

	// Pick the first grpcClient entry whose name matches the DTM service name.
	var clientCfg config.GrpcClient
	for _, item := range cfg.GrpcClient {
		if strings.EqualFold(item.Name, serverName) {
			clientCfg = item
			break
		}
	}
	if clientCfg.Name == "" {
		msg := fmt.Sprintf("not found grpc service name '%v' in configuration file(yaml), "+
			"please add gprc service configuration in the configuration file(yaml) under the field grpcClient", serverName)
		panic(msg)
	}

	var iDiscovery registry.Discovery
	switch clientCfg.RegistryDiscoveryType {
	case "consul":
		// Service discovery through consul: connect by service name.
		dtmServerEndPoint = "discovery:///" + clientCfg.Name
		client, err := consulcli.Init(cfg.Consul.Addr, consulcli.WithWaitTime(time.Second*5))
		if err != nil {
			panic(fmt.Sprintf("consulcli.Init error: %v, addr: %s", err, cfg.Consul.Addr))
		}
		iDiscovery = consul.New(client)

	case "etcd":
		// Service discovery through etcd: connect by service name.
		dtmServerEndPoint = "discovery:///" + clientCfg.Name
		client, err := etcdcli.Init(cfg.Etcd.Addrs, etcdcli.WithDialTimeout(time.Second*5))
		if err != nil {
			panic(fmt.Sprintf("etcdcli.Init error: %v, addr: %v", err, cfg.Etcd.Addrs))
		}
		iDiscovery = etcd.New(client)

	case "nacos":
		// Service discovery through nacos: the endpoint has the form
		// "discovery:///serverName.scheme".
		dtmServerEndPoint = "discovery:///" + clientCfg.Name + ".grpc"
		client, err := nacoscli.NewNamingClient(
			cfg.NacosRd.IPAddr,
			cfg.NacosRd.Port,
			cfg.NacosRd.NamespaceID)
		if err != nil {
			panic(fmt.Sprintf("nacoscli.NewNamingClient error: %v, ipAddr: %s, port: %d",
				err, cfg.NacosRd.IPAddr, cfg.NacosRd.Port))
		}
		iDiscovery = nacos.New(client)

	default:
		// No service discovery: connect directly using the configured ip and port.
		dtmServerEndPoint = fmt.Sprintf("%s:%d", clientCfg.Host, clientCfg.Port)
		logger.Infof("[dtm] using address direct connection, endpoint = %s", dtmServerEndPoint)
		return
	}

	// Only the service-discovery cases reach this point.
	logger.Infof("[dtm] using service discovery, type = %s, endpoint = %s", clientCfg.RegistryDiscoveryType, dtmServerEndPoint)
	builder := discovery.NewBuilder(iDiscovery, discovery.WithInsecure(true), discovery.DisableDebugLog())
	resolver.Register(builder)
}
// GetDtmEndpoint returns the DTM server endpoint. If the endpoint has not
// been set yet, InitDtmServerResolver is invoked through dtmServerOnce
// before the value is returned.
func GetDtmEndpoint() string {
	if dtmServerEndPoint != "" {
		return dtmServerEndPoint
	}
	dtmServerOnce.Do(InitDtmServerResolver)
	return dtmServerEndPoint
}
注:把 `your-server-module` 替换为实际的服务的 module 名称。
3. 获取 gRPC 服务地址
服务初始化时调用 `InitEndpointsForDtm()`,通过 `GetYourServerNameEndpoint()` 获取当前服务地址:
点击查看完整代码
package rpcclient
import (
"fmt"
"strings"
"sync"
"github.com/go-dev-frame/sponge/pkg/logger"
"your-server-module/internal/config"
)
// Package-level state used by GetYourServerNameEndpoint.
var (
// yourServerNameEndPoint caches the endpoint computed by getEndpoint for the
// current service; it is assigned inside GetYourServerNameEndpoint.
yourServerNameEndPoint string
// yourServerNameOnce guards the lazy endpoint computation in
// GetYourServerNameEndpoint.
yourServerNameOnce sync.Once
)
// getEndpoint builds the grpc endpoint that is handed to DTM for the service
// named serverName.
//
// If serverName equals the application's own name, the endpoint is derived
// from the local app/grpc configuration; otherwise the grpcClient entries are
// searched (case-insensitively) for a matching name. For discovery types
// "consul" and "etcd" the result is "discovery:///<name>", for "nacos" it is
// "discovery:///<name>.grpc", and for anything else it is "<host>:<port>".
//
// An error is returned when no configuration with a non-empty name is found.
func getEndpoint(serverName string) (string, error) {
	cfg := config.Get()

	var target config.GrpcClient
	if serverName != cfg.App.Name {
		// Remote service: take the first grpcClient entry with a matching name.
		for _, item := range cfg.GrpcClient {
			if strings.EqualFold(item.Name, serverName) {
				target = item
				break
			}
		}
	} else {
		// Local service: assemble the settings from this application's own config.
		target = config.GrpcClient{
			Name:                  serverName,
			Host:                  cfg.App.Host,
			Port:                  cfg.Grpc.Port,
			RegistryDiscoveryType: cfg.App.RegistryDiscoveryType,
			EnableLoadBalance:     true,
		}
	}

	if target.Name == "" {
		return "", fmt.Errorf("not found grpc service name '%v' in configuration file(yaml), "+
			"please add gprc service configuration in the configuration file(yaml) under the field grpcClient", serverName)
	}

	var endpoint string
	switch target.RegistryDiscoveryType {
	case "nacos":
		endpoint = "discovery:///" + target.Name + ".grpc"
	case "consul", "etcd":
		endpoint = "discovery:///" + target.Name
	default:
		// No service discovery configured: address the service by host and port.
		endpoint = fmt.Sprintf("%s:%d", target.Host, target.Port)
		logger.Infof("[dtm] connects directly to the [%s] service through IP address, endpoint = %s", serverName, endpoint)
		return endpoint, nil
	}

	logger.Infof("[dtm] connects to the [%s] service through service discovery, type = %s, endpoint = %s", serverName, target.RegistryDiscoveryType, endpoint)
	return endpoint, nil
}
// InitEndpointsForDtm eagerly resolves the endpoint of the current service by
// calling GetYourServerNameEndpoint and discarding the result.
func InitEndpointsForDtm() {
	_ = GetYourServerNameEndpoint()
}
// GetYourServerNameEndpoint returns the grpc endpoint of the current service
// (the one named by config.Get().App.Name). The endpoint is computed through
// getEndpoint on first use, guarded by yourServerNameOnce, and cached in
// yourServerNameEndPoint. It panics if getEndpoint returns an error.
func GetYourServerNameEndpoint() string {
	if yourServerNameEndPoint != "" {
		return yourServerNameEndPoint
	}
	yourServerNameOnce.Do(func() {
		addr, err := getEndpoint(config.Get().App.Name)
		if err != nil {
			panic(err)
		}
		yourServerNameEndPoint = addr
	})
	return yourServerNameEndPoint
}
注:
- 把 `your-server-module` 替换为实际的服务的 module 名称。
- 把 `YourServerName` 和 `yourServerName` 替换为实际的服务名称(区分大小写)。
4. 创建 DTM 事务
获取到 DTM 服务和 gRPC 服务的 endpoint 地址之后,就可以在 Sponge 创建的服务中使用工作流、Saga、TCC、XA、两阶段消息等分布式事务协议进行分布式事务处理。
点击查看使用两阶段消息示例代码
// Transfer handles a money transfer request by submitting a DTM message
// transaction with two branches: a TransOut call for req.FromUserId and a
// TransIn call for req.ToUserId, both carrying req.Amount.
//
// It returns ecode.StatusInvalidParams when request validation fails and
// ecode.StatusInternalServerError when submitting the transaction fails.
func (s *transfer) Transfer(ctx context.Context, req *transferV1.TransferRequest) (*transferV1.TransferReply, error) {
	if err := req.Validate(); err != nil {
		logger.Warn("req.Validate error", logger.Err(err), logger.Any("req", req), interceptor.ServerCtxRequestIDField(ctx))
		return nil, ecode.StatusInvalidParams.Err()
	}

	// Whether these endpoints are direct ip:port addresses or service-discovery
	// targets depends on the configuration file.
	dtmServer := rpcclient.GetDtmEndpoint()
	transferServer := rpcclient.GetTransferEndpoint()

	// Branch 1: deduct the amount from the sender.
	outURL := transferServer + transferV1.Transfer_TransOut_FullMethodName
	outReq := &transferV1.TransOutRequest{
		Amount: req.Amount,
		UserId: req.FromUserId,
	}

	// Branch 2: credit the amount to the receiver.
	inURL := transferServer + transferV1.Transfer_TransIn_FullMethodName
	inReq := &transferV1.TransInRequest{
		Amount: req.Amount,
		UserId: req.ToUserId,
	}

	// Pass the incoming request id along as a branch header.
	branchHeaders := map[string]string{
		interceptor.ContextRequestIDKey: interceptor.ServerCtxRequestID(ctx),
	}

	gid := dtmgrpc.MustGenGid(dtmServer)
	msg := dtmgrpc.NewMsgGrpc(dtmServer, gid, dtmgrpc.WithBranchHeaders(branchHeaders))
	msg.Add(outURL, outReq)
	msg.Add(inURL, inReq)
	// NOTE(review): WaitResult presumably makes Submit wait for the branch results — confirm against the DTM docs.
	msg.WaitResult = true

	if err := msg.Submit(); err != nil {
		logger.Error("Transfer error", logger.Err(err), logger.Any("req", req), interceptor.ServerCtxRequestIDField(ctx))
		return nil, ecode.StatusInternalServerError.Err()
	}

	logger.Info("submit dtm transaction success", logger.String("gid", gid), interceptor.ServerCtxRequestIDField(ctx))
	return &transferV1.TransferReply{}, nil
}
在 Sponge 创建的服务中集成 DTM 完整示例
gRPC 服务
- 点击查看秒杀场景中的两阶段消息实现。
web 服务
- 点击查看秒杀场景中的两阶段消息实现。