Distributed Transaction (DTM)
--- START OF FILE dtm.md ---
What is DTM?
DTM (Distributed Transaction Manager) is an open-source distributed transaction manager designed to solve data consistency problems across databases, services, and language stacks. DTM supports multiple distributed transaction protocols, including:
- Workflow mode
- Saga mode
- TCC mode
- XA protocol
- Two-Phase Message
- Outbox mode
DTM Usage Guide
Configure and Start the DTM Service
DTM supports multiple storage engines, including MySQL, Redis, MongoDB, etc.
Storage Configuration (Using MySQL as an Example)
For example, to use MySQL as the storage engine, modify the DTM configuration file as follows (see the sample configuration file):
Store: # specify which engine to store trans status
    Driver: 'mysql'
    Host: '192.168.3.37'
    User: 'root'
    Password: '123456'
    Port: 3306
    Db: 'dtm'
# Use service registration and discovery (Sponge's dtm driver supports etcd, consul, nacos)
#MicroService:
    #Driver: 'dtm-driver-sponge'
    #Target: 'etcd://127.0.0.1:2379/dtmservice'
    #EndPoint: 'grpc://127.0.0.1:36790'
Start the DTM Service
# Default ports: HTTP-36789, gRPC-36790
./dtm -c conf.yml
Using DTM in gRPC Services
1. Configure DTM in the gRPC Service
Sponge's DTM driver has been integrated into the DTM (v1.19.0+) framework, enabling service discovery through simple registry center configuration:
app:
  registryDiscoveryType: "etcd"          # Registry and discovery type: consul, etcd, nacos. If empty, service registration and discovery is disabled
grpcClient:
  - name: "dtmservice"                   # dtm service name
    registryDiscoveryType: "etcd"        # Registry and discovery type: consul, etcd, nacos. If empty, service registration and discovery is disabled
    host: "127.0.0.1"                    # dtm service IP or domain; ignored when service registration and discovery is enabled
    port: 36790                          # dtm service gRPC port; ignored when service registration and discovery is enabled
etcd:
  addrs: ["127.0.0.1:2379"]
Note: The DTM service itself must also have its MicroService field configured.
2. Get the DTM Service Address
Call InitDtmServerResolver() during service initialization, and get the DTM service address in the business logic using GetDtmEndpoint():
Click to view the full code
package rpcclient
import (
    "fmt"
    "strings"
    "sync"
    "time"
    _ "github.com/zhufuyi/dtmdriver-sponge"
    "google.golang.org/grpc/resolver"
    "github.com/go-dev-frame/sponge/pkg/consulcli"
    "github.com/go-dev-frame/sponge/pkg/etcdcli"
    "github.com/go-dev-frame/sponge/pkg/logger"
    "github.com/go-dev-frame/sponge/pkg/nacoscli"
    "github.com/go-dev-frame/sponge/pkg/servicerd/discovery"
    "github.com/go-dev-frame/sponge/pkg/servicerd/registry"
    "github.com/go-dev-frame/sponge/pkg/servicerd/registry/consul"
    "github.com/go-dev-frame/sponge/pkg/servicerd/registry/etcd"
    "github.com/go-dev-frame/sponge/pkg/servicerd/registry/nacos"
    "your-server-module/internal/config"
)
var (
    dtmServerEndPoint string
    dtmServerOnce     sync.Once
)
// InitDtmServerResolver prepares the connection target for the dtm service.
//
// It looks up the grpcClient configuration entry whose name matches "dtmservice"
// (case-insensitively) and panics if there is none. Depending on the entry's
// registryDiscoveryType it then either:
//   - creates a consul, etcd or nacos client (panicking if that fails), stores a
//     "discovery:///..." endpoint in dtmServerEndPoint and registers a gRPC
//     resolver builder backed by that registry, or
//   - stores a plain "host:port" endpoint in dtmServerEndPoint for a direct
//     connection when no supported registry type is configured.
func InitDtmServerResolver() {
    const dtmName = "dtmservice"
    cfg := config.Get()

    // Locate the dtm entry among the configured grpc clients.
    var dtmCfg config.GrpcClient
    for _, c := range cfg.GrpcClient {
        if strings.EqualFold(c.Name, dtmName) {
            dtmCfg = c
            break
        }
    }
    if dtmCfg.Name == "" {
        panic(fmt.Sprintf("not found grpc service name '%v' in configuration file(yaml), "+
            "please add gprc service configuration in the configuration file(yaml) under the field grpcClient", dtmName))
    }

    // disc is only populated by the registry-backed branches below.
    var disc registry.Discovery

    switch dtmCfg.RegistryDiscoveryType {
    case "consul":
        // the service is addressed by name and resolved through consul
        dtmServerEndPoint = "discovery:///" + dtmCfg.Name
        client, err := consulcli.Init(cfg.Consul.Addr, consulcli.WithWaitTime(time.Second*5))
        if err != nil {
            panic(fmt.Sprintf("consulcli.Init error: %v, addr: %s", err, cfg.Consul.Addr))
        }
        disc = consul.New(client)

    case "etcd":
        // the service is addressed by name and resolved through etcd
        dtmServerEndPoint = "discovery:///" + dtmCfg.Name
        client, err := etcdcli.Init(cfg.Etcd.Addrs, etcdcli.WithDialTimeout(time.Second*5))
        if err != nil {
            panic(fmt.Sprintf("etcdcli.Init error: %v, addr: %v", err, cfg.Etcd.Addrs))
        }
        disc = etcd.New(client)

    case "nacos":
        // nacos endpoints carry a scheme suffix: "discovery:///serverName.scheme"
        dtmServerEndPoint = "discovery:///" + dtmCfg.Name + ".grpc"
        client, err := nacoscli.NewNamingClient(cfg.NacosRd.IPAddr, cfg.NacosRd.Port, cfg.NacosRd.NamespaceID)
        if err != nil {
            panic(fmt.Sprintf("nacoscli.NewNamingClient error: %v, ipAddr: %s, port: %d",
                err, cfg.NacosRd.IPAddr, cfg.NacosRd.Port))
        }
        disc = nacos.New(client)

    default:
        // no registry configured: connect straight to the configured ip and port
        dtmServerEndPoint = fmt.Sprintf("%s:%d", dtmCfg.Host, dtmCfg.Port)
        logger.Infof("[dtm] using address direct connection, endpoint = %s", dtmServerEndPoint)
        return
    }

    logger.Infof("[dtm] using service discovery, type = %s, endpoint = %s", dtmCfg.RegistryDiscoveryType, dtmServerEndPoint)
    resolver.Register(discovery.NewBuilder(disc, discovery.WithInsecure(true), discovery.DisableDebugLog()))
}
// GetDtmEndpoint get dtm service endpoint
func GetDtmEndpoint() string {
    if dtmServerEndPoint == "" {
        dtmServerOnce.Do(func() {
            InitDtmServerResolver()
        })
    }
    return dtmServerEndPoint
}
Note: Replace your-server-module with the module name of the actual service.
3. Get the gRPC Service Address
Call InitEndpointsForDtm() during service initialization, and get the current service address using GetYourServerNameEndpoint():
Click to view the full code
package rpcclient
import (
    "fmt"
    "strings"
    "sync"
    "github.com/go-dev-frame/sponge/pkg/logger"
    "your-server-module/internal/config"
)
var (
    yourServerNameEndPoint string
    yourServerNameOnce     sync.Once
)
// getEndpoint builds the gRPC endpoint string for the service named serverName.
//
// When serverName equals the configured application name, the endpoint is built
// from the application's own host, gRPC port and registry type. Otherwise the
// grpcClient configuration entries are searched (case-insensitively) for a
// matching name. An error is returned when no configuration is found.
//
// The result is "discovery:///<name>" for consul and etcd,
// "discovery:///<name>.grpc" for nacos, and "<host>:<port>" otherwise.
func getEndpoint(serverName string) (string, error) {
    cfg := config.Get()

    var target config.GrpcClient
    if serverName != cfg.App.Name {
        // remote service: take the first grpcClient entry with a matching name
        for _, c := range cfg.GrpcClient {
            if strings.EqualFold(c.Name, serverName) {
                target = c
                break
            }
        }
    } else {
        // local service: describe it using the application's own settings
        target = config.GrpcClient{
            Name:                  serverName,
            Host:                  cfg.App.Host,
            Port:                  cfg.Grpc.Port,
            RegistryDiscoveryType: cfg.App.RegistryDiscoveryType,
            EnableLoadBalance:     true,
        }
    }

    if target.Name == "" {
        return "", fmt.Errorf("not found grpc service name '%v' in configuration file(yaml), "+
            "please add gprc service configuration in the configuration file(yaml) under the field grpcClient", serverName)
    }

    var discovered string
    switch target.RegistryDiscoveryType {
    case "consul", "etcd":
        discovered = "discovery:///" + target.Name
    case "nacos":
        discovered = "discovery:///" + target.Name + ".grpc"
    default:
        // no supported registry type: fall back to a direct host:port address
        direct := fmt.Sprintf("%s:%d", target.Host, target.Port)
        logger.Infof("[dtm] connects directly to the [%s] service through IP address, endpoint = %s", serverName, direct)
        return direct, nil
    }

    logger.Infof("[dtm] connects to the [%s] service through service discovery, type = %s, endpoint = %s", serverName, target.RegistryDiscoveryType, discovered)
    return discovered, nil
}
// InitEndpointsForDtm resolves this service's own gRPC endpoint ahead of time by
// calling GetYourServerNameEndpoint and discarding the returned value.
func InitEndpointsForDtm() {
    _ = GetYourServerNameEndpoint()
}
// GetYourServerNameEndpoint get grpc endpoint
func GetYourServerNameEndpoint() string {
    if yourServerNameEndPoint == "" {
        yourServerNameOnce.Do(func() {
            endpoint, err := getEndpoint(config.Get().App.Name)
            if err != nil {
                panic(err)
            }
            yourServerNameEndPoint = endpoint
        })
    }
    return yourServerNameEndPoint
}
Note:
- Replace your-server-module with the module name of the actual service.
- Replace yourServerName and YourServerName with the actual service name (case-sensitive).
4. Create a DTM Transaction
After obtaining the endpoint addresses for the DTM service and the gRPC service, you can use distributed transaction protocols such as Workflow, Saga, TCC, XA, and Two-Phase Message in the services created with Sponge to handle distributed transactions.
Click to view the Two-Phase Message example code
func (s *transfer) Transfer(ctx context.Context, req *transferV1.TransferRequest) (*transferV1.TransferReply, error) {
    err := req.Validate()
    if err != nil {
        logger.Warn("req.Validate error", logger.Err(err), logger.Any("req", req), interceptor.ServerCtxRequestIDField(ctx))
        return nil, ecode.StatusInvalidParams.Err()
    }
    var (
        // Whether to use ip direct connection or service discovery is configured in the configuration file
        dtmServer      = rpcclient.GetDtmEndpoint()
        transferServer = rpcclient.GetTransferEndpoint()
    )
    transOutURL := transferServer + transferV1.Transfer_TransOut_FullMethodName
    transOutBody := &transferV1.TransOutRequest{
        Amount: req.Amount,
        UserId: req.FromUserId,
    }
    transInURL := transferServer + transferV1.Transfer_TransIn_FullMethodName
    transInData := &transferV1.TransInRequest{
        Amount: req.Amount,
        UserId: req.ToUserId,
    }
    headers := map[string]string{interceptor.ContextRequestIDKey: interceptor.ServerCtxRequestID(ctx)}
    gid := dtmgrpc.MustGenGid(dtmServer)
    msg := dtmgrpc.NewMsgGrpc(dtmServer, gid, dtmgrpc.WithBranchHeaders(headers))
    msg.Add(transOutURL, transOutBody)
    msg.Add(transInURL, transInData)
    msg.WaitResult = true
    err = msg.Submit()
    if err != nil {
        logger.Error("Transfer error", logger.Err(err), logger.Any("req", req), interceptor.ServerCtxRequestIDField(ctx))
        return nil, ecode.StatusInternalServerError.Err()
    }
    logger.Info("submit dtm transaction success", logger.String("gid", gid), interceptor.ServerCtxRequestIDField(ctx))
    return &transferV1.TransferReply{}, nil
}
Complete Example of Integrating DTM in a Service Created with Sponge
- gRPC Service - Click to view the Two-Phase Message implementation in a flash sale scenario.
 
- Web Service - Click to view the Two-Phase Message implementation in a flash sale scenario.