Distributed Transaction (DTM)
What is DTM?
DTM (Distributed Transaction Manager) is an open-source distributed transaction manager designed to solve data consistency problems across databases, services, and language stacks. DTM supports multiple distributed transaction protocols, including:
- Workflow mode
- Saga mode
- TCC mode
- XA protocol
- Two-Phase Message
- Outbox mode
DTM Usage Guide
Configure and Start the DTM Service
DTM supports multiple storage engines, including MySQL, Redis, MongoDB, etc.
Storage Configuration (Using MySQL as an Example)
To use MySQL as the storage engine, edit the DTM configuration file (see the sample configuration file) as follows:
Store: # specify which engine to store trans status
  Driver: 'mysql'
  Host: '192.168.3.37'
  User: 'root'
  Password: '123456'
  Port: 3306
  Db: 'dtm'

# Use service registration and discovery (Sponge's dtm driver supports etcd, consul, nacos)
#MicroService:
#  Driver: 'dtm-driver-sponge'
#  Target: 'etcd://127.0.0.1:2379/dtmservice'
#  EndPoint: 'grpc://127.0.0.1:36790'
Start the DTM Service
# Default ports: HTTP-36789, gRPC-36790
./dtm -c conf.yml
Using DTM in gRPC Services
1. Configure DTM in the gRPC Service
Sponge's DTM driver is integrated into DTM (v1.19.0+), so service discovery only requires configuring the registry in the gRPC service's configuration file:
app:
  registryDiscoveryType: "etcd" # registry and discovery type: consul, etcd, nacos; if empty, service registration and discovery is disabled

grpcClient:
  - name: "dtmservice" # dtm service name
    registryDiscoveryType: "etcd" # registry and discovery type: consul, etcd, nacos; if empty, service registration and discovery is disabled
    host: "127.0.0.1" # dtm service IP or domain; ignored when service discovery is enabled
    port: 36790 # dtm service's gRPC port; ignored when service discovery is enabled

etcd:
  addrs: ["127.0.0.1:2379"]
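Note: When service discovery is used, the MicroService field must also be configured in the DTM service's own configuration file so that DTM registers itself with the same registry.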
2. Get the DTM Service Address
Call InitDtmServerResolver() during service initialization, then call GetDtmEndpoint() in the business logic to get the DTM service address. The full code is as follows:
package rpcclient
import (
"fmt"
"strings"
"sync"
"time"
_ "github.com/zhufuyi/dtmdriver-sponge"
"google.golang.org/grpc/resolver"
"github.com/go-dev-frame/sponge/pkg/consulcli"
"github.com/go-dev-frame/sponge/pkg/etcdcli"
"github.com/go-dev-frame/sponge/pkg/logger"
"github.com/go-dev-frame/sponge/pkg/nacoscli"
"github.com/go-dev-frame/sponge/pkg/servicerd/discovery"
"github.com/go-dev-frame/sponge/pkg/servicerd/registry"
"github.com/go-dev-frame/sponge/pkg/servicerd/registry/consul"
"github.com/go-dev-frame/sponge/pkg/servicerd/registry/etcd"
"github.com/go-dev-frame/sponge/pkg/servicerd/registry/nacos"
"your-server-module/internal/config"
)
var (
dtmServerEndPoint string
dtmServerOnce sync.Once
)
// InitDtmServerResolver init dtm resolver
func InitDtmServerResolver() {
cfg := config.Get()
serverName := "dtmservice"
var grpcClientCfg config.GrpcClient
for _, cli := range cfg.GrpcClient {
if strings.EqualFold(cli.Name, serverName) {
grpcClientCfg = cli
break
}
}
if grpcClientCfg.Name == "" {
panic(fmt.Sprintf("grpc service name '%v' not found in configuration file (yaml), "+
"please add the grpc service configuration under the grpcClient field", serverName))
}
var (
isUseDiscover bool
iDiscovery registry.Discovery
)
switch grpcClientCfg.RegistryDiscoveryType {
// discovering services using consul
case "consul":
dtmServerEndPoint = "discovery:///" + grpcClientCfg.Name // connecting to grpc services by service name
cli, err := consulcli.Init(cfg.Consul.Addr, consulcli.WithWaitTime(time.Second*5))
if err != nil {
panic(fmt.Sprintf("consulcli.Init error: %v, addr: %s", err, cfg.Consul.Addr))
}
iDiscovery = consul.New(cli)
isUseDiscover = true
// discovering services using etcd
case "etcd":
dtmServerEndPoint = "discovery:///" + grpcClientCfg.Name // Connecting to grpc services by service name
cli, err := etcdcli.Init(cfg.Etcd.Addrs, etcdcli.WithDialTimeout(time.Second*5))
if err != nil {
panic(fmt.Sprintf("etcdcli.Init error: %v, addr: %v", err, cfg.Etcd.Addrs))
}
iDiscovery = etcd.New(cli)
isUseDiscover = true
// discovering services using nacos
case "nacos":
// example: endpoint = "discovery:///serverName.scheme"
dtmServerEndPoint = "discovery:///" + grpcClientCfg.Name + ".grpc" // Connecting to grpc services by service name
cli, err := nacoscli.NewNamingClient(
cfg.NacosRd.IPAddr,
cfg.NacosRd.Port,
cfg.NacosRd.NamespaceID)
if err != nil {
panic(fmt.Sprintf("nacoscli.NewNamingClient error: %v, ipAddr: %s, port: %d",
err, cfg.NacosRd.IPAddr, cfg.NacosRd.Port))
}
iDiscovery = nacos.New(cli)
isUseDiscover = true
default:
// if service discovery is not used, connect directly to the grpc service using the ip and port
dtmServerEndPoint = fmt.Sprintf("%s:%d", grpcClientCfg.Host, grpcClientCfg.Port)
isUseDiscover = false
}
if isUseDiscover {
logger.Infof("[dtm] using service discovery, type = %s, endpoint = %s", grpcClientCfg.RegistryDiscoveryType, dtmServerEndPoint)
builder := discovery.NewBuilder(iDiscovery, discovery.WithInsecure(true), discovery.DisableDebugLog())
resolver.Register(builder)
} else {
logger.Infof("[dtm] using address direct connection, endpoint = %s", dtmServerEndPoint)
}
}
// GetDtmEndpoint get dtm service endpoint
func GetDtmEndpoint() string {
if dtmServerEndPoint == "" {
dtmServerOnce.Do(func() {
InitDtmServerResolver()
})
}
return dtmServerEndPoint
}
Note: Replace your-server-module with the module name of the actual service.
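GetDtmEndpoint() reads dtmServerEndPoint before entering sync.Once. That is harmless when InitDtmServerResolver() has already run during startup, but if the first call can come from several goroutines at once, the unsynchronized read may race with the write inside Do. One alternative is to route every access through sync.Once. The sketch below is a standalone illustration of that pattern, not Sponge's implementation. resolveEndpoint, lazyEndpoint, and the returned address are placeholders standing in for the resolver setup shown above:

```go
package main

import (
	"fmt"
	"sync"
)

var (
	endpoint     string
	endpointOnce sync.Once
	initCalls    int // counts initializations, only to demonstrate the behavior
)

// resolveEndpoint is a placeholder for the real resolver setup
// (InitDtmServerResolver in the code above); the address is made up.
func resolveEndpoint() string {
	initCalls++
	return "127.0.0.1:36790"
}

// lazyEndpoint always goes through sync.Once, so the first caller initializes
// the value and every later caller observes the completed write.
func lazyEndpoint() string {
	endpointOnce.Do(func() {
		endpoint = resolveEndpoint()
	})
	return endpoint
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = lazyEndpoint()
		}()
	}
	wg.Wait()
	// The initializer ran exactly once despite eight concurrent callers.
	fmt.Println(lazyEndpoint(), initCalls) // prints "127.0.0.1:36790 1"
}
```

After the first call, Do returns quickly without running the function again, so dropping the empty-string check costs little in practice.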
3. Get the gRPC Service Address
Call InitEndpointsForDtm() during service initialization, then call GetYourServerNameEndpoint() to get the address of the current service. The full code is as follows:
package rpcclient
import (
"fmt"
"strings"
"sync"
"github.com/go-dev-frame/sponge/pkg/logger"
"your-server-module/internal/config"
)
var (
yourServerNameEndPoint string
yourServerNameOnce sync.Once
)
// get endpoint for dtm service
func getEndpoint(serverName string) (string, error) {
cfg := config.Get()
var grpcClientCfg config.GrpcClient
// local service
if serverName == cfg.App.Name {
grpcClientCfg = config.GrpcClient{
Name: serverName,
Host: cfg.App.Host,
Port: cfg.Grpc.Port,
RegistryDiscoveryType: cfg.App.RegistryDiscoveryType,
EnableLoadBalance: true,
}
} else {
// remote service
for _, cli := range cfg.GrpcClient {
if strings.EqualFold(cli.Name, serverName) {
grpcClientCfg = cli
break
}
}
}
if grpcClientCfg.Name == "" {
return "", fmt.Errorf("grpc service name '%v' not found in configuration file (yaml), "+
"please add the grpc service configuration under the grpcClient field", serverName)
}
var endpoint string
var isUseDiscover bool
switch grpcClientCfg.RegistryDiscoveryType {
case "consul", "etcd":
endpoint = "discovery:///" + grpcClientCfg.Name
isUseDiscover = true
case "nacos":
endpoint = "discovery:///" + grpcClientCfg.Name + ".grpc"
isUseDiscover = true
default:
endpoint = fmt.Sprintf("%s:%d", grpcClientCfg.Host, grpcClientCfg.Port)
}
if isUseDiscover {
logger.Infof("[dtm] connects to the [%s] service through service discovery, type = %s, endpoint = %s", serverName, grpcClientCfg.RegistryDiscoveryType, endpoint)
} else {
logger.Infof("[dtm] connects directly to the [%s] service through IP address, endpoint = %s", serverName, endpoint)
}
return endpoint, nil
}
// InitEndpointsForDtm init endpoints for dtm service
func InitEndpointsForDtm() {
GetYourServerNameEndpoint()
}
// GetYourServerNameEndpoint get grpc endpoint
func GetYourServerNameEndpoint() string {
if yourServerNameEndPoint == "" {
yourServerNameOnce.Do(func() {
endpoint, err := getEndpoint(config.Get().App.Name)
if err != nil {
panic(err)
}
yourServerNameEndPoint = endpoint
})
}
return yourServerNameEndPoint
}
Note:
- Replace your-server-module with the module name of the actual service.
- Replace yourServerName and YourServerName with the actual service name (case-sensitive).
4. Create a DTM Transaction
After obtaining the endpoint addresses of the DTM service and the gRPC service, you can use distributed transaction protocols such as Workflow, Saga, TCC, XA, and Two-Phase Message in services created with Sponge.
Two-Phase Message example code:
func (s *transfer) Transfer(ctx context.Context, req *transferV1.TransferRequest) (*transferV1.TransferReply, error) {
err := req.Validate()
if err != nil {
logger.Warn("req.Validate error", logger.Err(err), logger.Any("req", req), interceptor.ServerCtxRequestIDField(ctx))
return nil, ecode.StatusInvalidParams.Err()
}
var (
// Whether to use ip direct connection or service discovery is configured in the configuration file
dtmServer = rpcclient.GetDtmEndpoint()
transferServer = rpcclient.GetTransferEndpoint()
)
transOutURL := transferServer + transferV1.Transfer_TransOut_FullMethodName
transOutBody := &transferV1.TransOutRequest{
Amount: req.Amount,
UserId: req.FromUserId,
}
transInURL := transferServer + transferV1.Transfer_TransIn_FullMethodName
transInData := &transferV1.TransInRequest{
Amount: req.Amount,
UserId: req.ToUserId,
}
headers := map[string]string{interceptor.ContextRequestIDKey: interceptor.ServerCtxRequestID(ctx)}
gid := dtmgrpc.MustGenGid(dtmServer)
msg := dtmgrpc.NewMsgGrpc(dtmServer, gid, dtmgrpc.WithBranchHeaders(headers))
msg.Add(transOutURL, transOutBody)
msg.Add(transInURL, transInData)
msg.WaitResult = true
err = msg.Submit()
if err != nil {
logger.Error("Transfer error", logger.Err(err), logger.Any("req", req), interceptor.ServerCtxRequestIDField(ctx))
return nil, ecode.StatusInternalServerError.Err()
}
logger.Info("submit dtm transaction success", logger.String("gid", gid), interceptor.ServerCtxRequestIDField(ctx))
return &transferV1.TransferReply{}, nil
}
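Each branch added to the message is addressed by concatenating the service endpoint with the gRPC full method name, as transOutURL and transInURL do above. The standalone sketch below prints the resulting strings for both connection modes. The helper branchURL, the port 8282, the service name transfer, and the method path /api.transfer.v1.Transfer/TransOut are assumed values for illustration; the real constant is generated from your .proto file:

```go
package main

import (
	"fmt"
	"strings"
)

// branchURL joins an endpoint and a gRPC full method name ("/package.Service/Method").
// Hypothetical helper; the original code concatenates the two strings directly.
func branchURL(endpoint, fullMethod string) (string, error) {
	if endpoint == "" {
		return "", fmt.Errorf("empty endpoint")
	}
	if !strings.HasPrefix(fullMethod, "/") {
		return "", fmt.Errorf("full method name %q must start with '/'", fullMethod)
	}
	return endpoint + fullMethod, nil
}

func main() {
	const method = "/api.transfer.v1.Transfer/TransOut" // assumed method path

	endpoints := []string{
		"127.0.0.1:8282",        // direct connection by IP and port (assumed port)
		"discovery:///transfer", // service discovery via consul or etcd
	}
	for _, ep := range endpoints {
		u, err := branchURL(ep, method)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(u)
	}
}
```

Because the endpoint comes from the configuration file, the same handler code works with either direct connection or service discovery without changes.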
Complete Example of Integrating DTM in a Service Created with Sponge
gRPC Service
- Click to view the Two-Phase Message implementation in a flash sale scenario.
Web Service
- Click to view the Two-Phase Message implementation in a flash sale scenario.