分布式事務(wù) | 使用DTM 的Saga 模式

DTM 簡介

前面章節(jié)提及的MassTransit枯夜、dotnetcore/CAP都提供了分布式事務(wù)的處理能力示罗,但也僅局限于Saga和本地消息表模式的實現(xiàn)桂塞。那有沒有一個獨立的分布式事務(wù)解決方案虑椎,涵蓋多種分布式事務(wù)處理模式瘟芝,如Saga遥昧、TCC覆醇、XA模式等朵纷。有,目前業(yè)界主要有兩種開源方案永脓,其一是阿里開源的Seata袍辞,另一個就是DTM。其中Seata僅支持Java常摧、Go和Python語言搅吁,因此不在.NET 的選擇范圍。DTM則通過提供簡單易用的HTTP和gRPC接口落午,屏蔽了語言的無關(guān)性谎懦,因此支持任何開發(fā)語言接入,目前提供了Go溃斋、Python界拦、NodeJs、Ruby盐类、Java和C#等語言的SDK寞奸。
DTM,全稱Distributed Transaction Manager在跳,是一個分布式事務(wù)管理器枪萄,解決跨數(shù)據(jù)庫、跨服務(wù)猫妙、跨語言更新數(shù)據(jù)的一致性問題瓷翻。它提供了Saga、TCC割坠、 XA和二階段消息模式以滿足不同應(yīng)用場景的需求齐帚,同時其首創(chuàng)的子事務(wù)屏障技術(shù)可以有效解決冪等、懸掛和空補償?shù)犬惓栴}彼哼。

DTM 事務(wù)處理過程及架構(gòu)

那DTM是如何處理分布式事務(wù)的呢对妄?以一個經(jīng)典的跨行轉(zhuǎn)賬業(yè)務(wù)為例來看下事務(wù)處理過程。對于跨行轉(zhuǎn)賬業(yè)務(wù)而言敢朱,很顯然是跨庫跨服務(wù)的應(yīng)用場景剪菱,不能簡單通過本地事務(wù)解決,可以使用Saga模式拴签,以下是基于DTM提供的Saga事務(wù)模式成功轉(zhuǎn)賬的的時序圖:

從以上時序圖可以看出孝常,DTM整個全局事務(wù)分為如下幾步:

  1. 用戶定義好全局事務(wù)所有的事務(wù)分支(全局事務(wù)的組成部分稱為事務(wù)分支),然后提交給DTM蚓哩,DTM持久化全局事務(wù)信息后构灸,立即返回
  2. DTM取出第一個事務(wù)分支,這里是TransOut岸梨,調(diào)用該服務(wù)并成功返回
  3. DTM取出第二個事務(wù)分支喜颁,這里是TransIn稠氮,調(diào)用該服務(wù)并成功返回
  4. DTM已完成所有的事務(wù)分支,將全局事務(wù)的狀態(tài)修改為已完成

基于以上這個時序圖的基礎(chǔ)上洛巢,再來看下DTM的架構(gòu):

整個DTM架構(gòu)中括袒,一共有三個角色次兆,分別承擔了不同的職責:

  • RM-資源管理器:RM是一個應(yīng)用服務(wù)稿茉,通常連接到獨立的數(shù)據(jù)庫,負責處理全局事務(wù)中的本地事務(wù)芥炭,執(zhí)行相關(guān)數(shù)據(jù)的修改漓库、提交、回滾园蝠、補償?shù)炔僮髅燧铩@缭谇懊娴倪@個Saga事務(wù)時序圖中,步驟2彪薛、3中被調(diào)用的TransIn和TransOut方法所在的服務(wù)都是RM茂装。
  • AP-應(yīng)用程序:AP是一個應(yīng)用服務(wù),負責全局事務(wù)的編排善延,他會注冊全局事務(wù)少态,注冊子事務(wù),調(diào)用RM接口易遣。例如在前面的這個SAGA事務(wù)中彼妻,發(fā)起步驟1的是AP,它編排了一個包含TransOut豆茫、TransIn的全局事務(wù)侨歉,然后提交給TM
  • TM-事務(wù)管理器:TM就是DTM服務(wù),負責全局事務(wù)的管理揩魂,作為一個獨立的服務(wù)而存在幽邓。每個全局事務(wù)都注冊到TM,每個事務(wù)分支也注冊到TM火脉。TM會協(xié)調(diào)所有的RM來執(zhí)行不同的事務(wù)分支牵舵,并根據(jù)執(zhí)行結(jié)果決定是否提交或回滾事務(wù)。例如在前面的Saga事務(wù)時序圖中忘分,TM在步驟2棋枕、3中調(diào)用了各個RM,在步驟4中妒峦,完成這個全局事務(wù)重斑。

總體而言,AP-應(yīng)用程序充當全局事務(wù)編排器的角色通過DTM提供的開箱即用的SDK進行全局事務(wù)和子事務(wù)的注冊肯骇。TM-事務(wù)管理器接收到注冊的全局事務(wù)和子事務(wù)后窥浪,負責調(diào)用RM-資源管理器來執(zhí)行對應(yīng)的事務(wù)分支祖很,TM-事務(wù)管理器根據(jù)事務(wù)分支的執(zhí)行結(jié)果決定是否提及或回滾事務(wù)。

快速上手

百聞不如一見漾脂,接下來就來實際上手體驗下如何基于DTM來實際應(yīng)用Saga進行分布式跨行轉(zhuǎn)賬事務(wù)的處理假颇。

創(chuàng)建示例項目

接下來就來創(chuàng)建一個示例項目:

  1. 使用dotnet new webapi -n DtmDemo.Webapi創(chuàng)建示例項目。
  2. 添加Nuget包:DtmcliPomelo.EntityFrameworkCore.MySql骨稿。
  3. 添加DTM配置項:
{
  "dtm": {
    "DtmUrl": "http://localhost:36789",
    "DtmTimeout": 10000,
    "BranchTimeout": 10000,
    "DBType": "mysql",
    "BarrierTableName": "dtm_barrier.barrier",
  }
}
  1. 定義銀行賬戶BankAccount實體類:
namespace DtmDemo.WebApi.Models
{
    public class BankAccount
    {
        public int Id { get; set; }
        public decimal Balance { get; set; }
    }
}
  1. 定義DtmDemoWebApiContext數(shù)據(jù)庫上下文:
using Microsoft.EntityFrameworkCore;

namespace DtmDemo.WebApi.Data
{
    public class DtmDemoWebApiContext : DbContext
    {
        public DtmDemoWebApiContext (DbContextOptions<DtmDemoWebApiContext> options)
            : base(options)
        {
        }

        public DbSet<DtmDemo.WebApi.Models.BankAccount> BankAccount { get; set; } = default!;
    }
}
  1. 注冊DbContext 和DTM服務(wù):
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using Dtmcli;

var builder = WebApplication.CreateBuilder(args);
var connectionStr = builder.Configuration.GetConnectionString("DtmDemoWebApiContext");
// 注冊DbContext
builder.Services.AddDbContext<DtmDemoWebApiContext>(options =>
{
    options.UseMySql(connectionStr, ServerVersion.AutoDetect(connectionStr));
});

// 注冊DTM
builder.Services.AddDtmcli(builder.Configuration, "dtm");
  1. 執(zhí)行dotnet ef migrations add 'Initial' 創(chuàng)建遷移笨鸡。
  2. 為便于初始化演示數(shù)據(jù),定義BankAccountController如下坦冠,其中PostBankAccount接口添加了await _context.Database.MigrateAsync();用于自動應(yīng)用遷移形耗。
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using DtmDemo.WebApi.Models;
using Dtmcli;

namespace DtmDemo.WebApi.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class BankAccountsController : ControllerBase
    {
        private readonly DtmDemoWebApiContext _context;

        public BankAccountsController(DtmDemoWebApiContext context)
        {
            _context = context;
        }
        [HttpGet]
        public async Task<ActionResult<IEnumerable<BankAccount>>> GetBankAccount()
        {
            return await _context.BankAccount.ToListAsync();
        }

        [HttpPost]
        public async Task<ActionResult<BankAccount>> PostBankAccount(BankAccount bankAccount)
        {
            await _context.Database.MigrateAsync();
            _context.BankAccount.Add(bankAccount);
            await _context.SaveChangesAsync();

            return Ok(bankAccount);
        }
}

應(yīng)用Saga模式

接下來定義SagaDemoController來使用DTM的Saga模式來模擬跨行轉(zhuǎn)賬分布式事務(wù):

using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using DtmDemo.WebApi.Models;
using Dtmcli;
using DtmCommon;

namespace DtmDemo.WebApi.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class SagaDemoController : ControllerBase
    {
        private readonly DtmDemoWebApiContext _context;
        private readonly IConfiguration _configuration;
        private readonly IDtmClient _dtmClient;
        private readonly IDtmTransFactory _transFactory;

        private readonly IBranchBarrierFactory _barrierFactory;
        private readonly ILogger<BankAccountsController> _logger;

        public SagaDemoController(DtmDemoWebApiContext context, IConfiguration configuration, IDtmClient dtmClient, IDtmTransFactory transFactory, ILogger<BankAccountsController> logger, IBranchBarrierFactory barrierFactory)
        {
            this._context = context;
            this._configuration = configuration;
            this._dtmClient = dtmClient;
            this._transFactory = transFactory;
            this._logger = logger;
            this._barrierFactory = barrierFactory;
        }
}

對于跨行轉(zhuǎn)賬業(yè)務(wù),使用DTM的Saga模式辙浑,首先要進行事務(wù)拆分激涤,可以拆分為以下4個子事務(wù),并分別實現(xiàn):

轉(zhuǎn)出子事務(wù)(TransferOut)

    [HttpPost("TransferOut")]
    public async Task<IActionResult> TransferOut([FromBody] TransferRequest request)
    {
        var msg = $"用戶{request.UserId}轉(zhuǎn)出{request.Amount}元";
        _logger.LogInformation($"轉(zhuǎn)出子事務(wù)-啟動:{msg}");
        // 1. 創(chuàng)建子事務(wù)屏障
        var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
        try
        {
            using (var conn = _context.Database.GetDbConnection())
            {
                // 2. 在子事務(wù)屏障內(nèi)執(zhí)行事務(wù)操作
                await branchBarrier.Call(conn, async (tx) =>
                {
                    _logger.LogInformation($"轉(zhuǎn)出子事務(wù)-執(zhí)行:{msg}");
                    await _context.Database.UseTransactionAsync(tx);
                    var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
                    if (bankAccount == null || bankAccount.Balance < request.Amount)
                        throw new InvalidDataException("賬戶不存在或余額不足判呕!");
                    bankAccount.Balance -= request.Amount;
                    await _context.SaveChangesAsync();
                });
            }
        }
        catch (InvalidDataException ex)
        {
            _logger.LogInformation($"轉(zhuǎn)出子事務(wù)-失斁胩摺:{ex.Message}");
            // 3. 按照接口協(xié)議,返回409侠草,以表示子事務(wù)失敗
            return new StatusCodeResult(StatusCodes.Status409Conflict);
        }
        _logger.LogInformation($"轉(zhuǎn)出子事務(wù)-成功:{msg}");
        return Ok();
    }

以上代碼中有幾點需要額外注意:

  1. 使用Saga模式辱挥,必須開啟子事務(wù)屏障:_barrierFactory.CreateBranchBarrier(Request.Query),其中Request.Query中的參數(shù)由DTM 生成梦抢,類似:?branch_id=01&gid=XTzKHgxemLyL8EXtMTLvzK&op=action&trans_type=saga般贼,主要包含四個參數(shù):
    1. gid:全局事務(wù)Id
    2. trans_type:事務(wù)類型,是saga奥吩、msg哼蛆、xa或者是tcc。
    3. branch_id:子事務(wù)的Id
    4. op:當前操作霞赫,對于Saga事務(wù)模式腮介,要么為action(正向操作),要么為compensate(補償操作)端衰。
  2. 必須在子事務(wù)屏障內(nèi)執(zhí)行事務(wù)操作:branchBarrier.Call(conn, async (tx) =>{}
  3. 對于Saga正向操作而言叠洗,業(yè)務(wù)上的失敗與異常是需要做嚴格區(qū)分的,例如前面的余額不足旅东,是業(yè)務(wù)上的失敗灭抑,必須回滾。而對于網(wǎng)絡(luò)抖動等其他外界原因?qū)е碌氖聞?wù)失敗抵代,屬于業(yè)務(wù)異常腾节,則需要重試。因此若因業(yè)務(wù)失敗(這里是賬戶不存在或余額不足)而導(dǎo)致子事務(wù)失敗案腺,則必須通過拋異常的方式并返回**409**狀態(tài)碼以告知DTM 子事務(wù)失敗庆冕。
  4. 以上通過拋出異常的方式中斷子事務(wù)執(zhí)行并在外圍捕獲特定異常返回409狀態(tài)碼。在外圍捕獲異常時切忌放大異常捕獲劈榨,比如直接catch(Exception)访递,如此會捕獲由于網(wǎng)絡(luò)等其他原因?qū)е碌漠惓#鴮?dǎo)致DTM 不再自動處理該異常同辣,比如業(yè)務(wù)異常時的自動重試拷姿。

轉(zhuǎn)出補償子事務(wù)(TransferOut_Compensate)

轉(zhuǎn)出補償,就是回滾轉(zhuǎn)出操作邑闺,進行賬戶余額歸還跌前,實現(xiàn)如下:

    [HttpPost("TransferOut_Compensate")]
    public async Task<IActionResult> TransferOut_Compensate([FromBody] TransferRequest request)
    {
        var msg = $"用戶{request.UserId}回滾轉(zhuǎn)出{request.Amount}元";
        _logger.LogInformation($"轉(zhuǎn)出補償子事務(wù)-啟動:{msg}");
        // 1. 創(chuàng)建子事務(wù)屏障
        var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
        using (var conn = _context.Database.GetDbConnection())
        {
            // 在子事務(wù)屏障內(nèi)執(zhí)行事務(wù)操作
            await branchBarrier.Call(conn, async (tx) =>
            {
                _logger.LogInformation($"轉(zhuǎn)出補償子事務(wù)-執(zhí)行:{msg}");
                await _context.Database.UseTransactionAsync(tx);
                var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
                if (bankAccount == null)
                    return; //對于補償操作,可直接返回陡舅,中斷后續(xù)操作
                bankAccount.Balance += request.Amount;
                await _context.SaveChangesAsync();
            });
        }
        _logger.LogInformation($"轉(zhuǎn)出補償子事務(wù)-成功!");
        // 2. 因補償操作必須成功伴挚,所以必須返回200靶衍。
        return Ok();
    }

由于DTM設(shè)計為總是執(zhí)行補償,也就是說即使正向操作子事務(wù)失敗時茎芋,DTM 仍舊會執(zhí)行補償邏輯颅眶。但子事務(wù)屏障會在執(zhí)行時判斷正向操作的執(zhí)行狀態(tài),當子事務(wù)失敗時田弥,并不會執(zhí)行補償邏輯涛酗。
另外DTM的補償操作,是要求最終成功的偷厦,只要還沒成功商叹,就會不斷進行重試,直到成功只泼。因此在補償子事務(wù)中剖笙,即使補償子事務(wù)中出現(xiàn)業(yè)務(wù)失敗時,也必須返回**200**请唱。因此當出現(xiàn)bankAccount==null時可以直接 return弥咪。

轉(zhuǎn)入子事務(wù)(TransferIn)

轉(zhuǎn)入子事務(wù)和轉(zhuǎn)出子事務(wù)的實現(xiàn)基本類似,都是開啟子事務(wù)屏障后十绑,在branchBarrier.Call(conn, async tx => {}中實現(xiàn)事務(wù)邏輯聚至,并通過拋異常的方式并最終返回409狀態(tài)碼來顯式告知DTM 子事務(wù)執(zhí)行失敗。

    [HttpPost("TransferIn")]
    public async Task<IActionResult> TransferIn([FromBody] TransferRequest request)
    {
        var msg = $"用戶{request.UserId}轉(zhuǎn)入{request.Amount}元";
        _logger.LogInformation($"轉(zhuǎn)入子事務(wù)-啟動:{msg}");
        var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
        try
        {
            using (var conn = _context.Database.GetDbConnection())
            {
                await branchBarrier.Call(conn, async (tx) =>
                {
                    _logger.LogInformation($"轉(zhuǎn)入子事務(wù)-執(zhí)行:{msg}");
                    await _context.Database.UseTransactionAsync(tx);
                    var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
                    if (bankAccount == null)
                        throw new InvalidDataException("賬戶不存在本橙!");
                    bankAccount.Balance += request.Amount;
                    await _context.SaveChangesAsync();
                });
            }
        }
        catch (InvalidDataException ex)
        {
            _logger.LogInformation($"轉(zhuǎn)入子事務(wù)-失敯夤:{ex.Message}");
            return new StatusCodeResult(StatusCodes.Status409Conflict);
        }
        _logger.LogInformation($"轉(zhuǎn)入子事務(wù)-成功:{msg}");
        return Ok();
    }

轉(zhuǎn)入補償子事務(wù)(TransferIn_Compensate)

轉(zhuǎn)入補償子事務(wù)和轉(zhuǎn)出補償子事務(wù)的實現(xiàn)也基本類似,都是開啟子事務(wù)屏障后,在branchBarrier.Call(conn, async tx => {}中實現(xiàn)事務(wù)邏輯坦报,并最終返回200狀態(tài)碼來告知DTM 補償子事務(wù)執(zhí)行成功库说。

    [HttpPost("TransferIn_Compensate")]
    public async Task<IActionResult> TransferIn_Compensate([FromBody] TransferRequest request)
    {
        var msg = "用戶{request.UserId}回滾轉(zhuǎn)入{request.Amount}元";
        _logger.LogInformation($"轉(zhuǎn)入補償子事務(wù)-啟動:{msg}");
        var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
        using (var conn = _context.Database.GetDbConnection())
        {
            await branchBarrier.Call(conn, async (tx) =>
            {
                _logger.LogInformation($"轉(zhuǎn)入補償子事務(wù)-執(zhí)行:{msg}");
                await _context.Database.UseTransactionAsync(tx);
                var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
                if (bankAccount == null) return;
                bankAccount.Balance -= request.Amount;
                await _context.SaveChangesAsync();
            });
        }
        _logger.LogInformation($"轉(zhuǎn)入補償子事務(wù)-成功!");
        return Ok();
    }

編排Saga事務(wù)

拆分完子事務(wù)片择,最后就可以進行Saga事務(wù)編排了潜的,其代碼如下所示:

    [HttpPost("Transfer")]
    public async Task<IActionResult> Transfer(int fromUserId, int toUserId, decimal amount,
        CancellationToken cancellationToken)
    {
        try
        {
            _logger.LogInformation($"轉(zhuǎn)賬事務(wù)-啟動:用戶{fromUserId}轉(zhuǎn)賬{amount}元到用戶{toUserId}");
            //1. 生成全局事務(wù)ID
            var gid = await _dtmClient.GenGid(cancellationToken);
            var bizUrl = _configuration.GetValue<string>("TransferBaseURL");
            //2. 創(chuàng)建Saga
            var saga = _transFactory.NewSaga(gid);
            //3. 添加子事務(wù)
            saga.Add(bizUrl + "/TransferOut", bizUrl + "/TransferOut_Compensate",
                    new TransferRequest(fromUserId, amount))
                .Add(bizUrl + "/TransferIn", bizUrl + "/TransferIn_Compensate",
                    new TransferRequest(toUserId, amount))
                .EnableWaitResult(); // 4. 按需啟用是否等待事務(wù)執(zhí)行結(jié)果

            //5. 提交Saga事務(wù)
            await saga.Submit(cancellationToken);
        }
        catch (DtmException ex) // 6. 如果開啟了`EnableWaitResult()`,則可通過捕獲異常的方式字管,捕獲事務(wù)失敗的結(jié)果啰挪。
        {
            _logger.LogError($"轉(zhuǎn)賬事務(wù)-失敗:用戶{fromUserId}轉(zhuǎn)賬{amount}元到用戶{toUserId}失敵笆濉亡呵!");
            return new BadRequestObjectResult($"轉(zhuǎn)賬失敗:{ex.Message}");
        }

        _logger.LogError($"轉(zhuǎn)賬事務(wù)-完成:用戶{fromUserId}轉(zhuǎn)賬{amount}元到用戶{toUserId}成功!");
        return Ok($"轉(zhuǎn)賬事務(wù)-完成:用戶{fromUserId}轉(zhuǎn)賬{amount}元到用戶{toUserId}成功硫戈!");
    }

主要步驟如下:

  1. 生成全局事務(wù)Id:var gid =await _dtmClient.GenGid(cancellationToken);
  2. 創(chuàng)建Saga全局事務(wù):_transFactory.NewSaga(gid);
  3. 添加子事務(wù):saga.Add(string action, string compensate, object postData);包含正向和反向子事務(wù)锰什。
  4. 如果依賴事務(wù)執(zhí)行結(jié)果,可通過EnableWaitResult()開啟事務(wù)結(jié)果等待丁逝。
  5. 提交Saga全局事務(wù):saga.Submit(cancellationToken);
  6. 若開啟了事務(wù)結(jié)果等待汁胆,可以通過try...catch..來捕獲DtmExcepiton異常來獲取事務(wù)執(zhí)行異常信息。

運行項目

既然DTM作為一個獨立的服務(wù)存在霜幼,其負責通過HTTPgRPC協(xié)議發(fā)起子事務(wù)的調(diào)用嫩码,因此首先需要啟動一個DTM實例,又由于本項目依賴MySQL罪既,因此我們采用Docker Compose的方式來啟動項目铸题。在Visual Studio中通過右鍵項目->Add->Docker Support->Linux 即可添加Dockerfile如下所示:

FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443

FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
WORKDIR /src
COPY ["DtmDemo.WebApi/DtmDemo.WebApi.csproj", "DtmDemo.WebApi/"]
RUN dotnet restore "DtmDemo.WebApi/DtmDemo.WebApi.csproj"
COPY . .
WORKDIR "/src/DtmDemo.WebApi"
RUN dotnet build "DtmDemo.WebApi.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "DtmDemo.WebApi.csproj" -c Release -o /app/publish

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "DtmDemo.WebApi.dll"]

在Visual Studio中通過右鍵項目->Add Container Orchestrator Support->Docker Compose即可添加docker-compose.yml,由于整個項目依賴mysqlDTM琢感,修改docker-compose.yml如下所示丢间,其中定義了三個服務(wù):db,dtm和dtmdemo.webapi猩谊。

version: '3.4'
services:
  db:
    image: 'mysql:5.7'
    container_name: dtm-mysql
    environment:
      MYSQL_ROOT_PASSWORD: 123456 # 指定MySQL初始密碼
    volumes:
      - ./docker/mysql/scripts:/docker-entrypoint-initdb.d  # 掛載用于初始化數(shù)據(jù)庫的腳本
    ports:
      - '3306:3306'
  dtm:
    depends_on: ["db"]
    image: 'yedf/dtm:latest'
    container_name: dtm-svc
    environment:
      IS_DOCKER: '1'
      STORE_DRIVER: mysql  # 指定使用MySQL持久化DTM事務(wù)數(shù)據(jù)
      STORE_HOST: db   # 指定MySQL服務(wù)名千劈,這里是db
      STORE_USER: root
      STORE_PASSWORD: '123456'
      STORE_PORT: 3306
      STORE_DB: "dtm" # 指定DTM 數(shù)據(jù)庫名
    ports:
      - '36789:36789' # DTM HTTP 端口
      - '36790:36790' # DTM gRPC 端口
  dtmdemo.webapi:
    depends_on: ["dtm", "db"]
    image: ${DOCKER_REGISTRY-}dtmdemowebapi
    environment:
      ASPNETCORE_ENVIRONMENT: docker # 設(shè)定啟動環(huán)境為docker
    container_name: dtm-webapi-demo
    build:
      context: .
      dockerfile: DtmDemo.WebApi/Dockerfile
    ports:
      - '31293:80'   # 映射Demo:80端口到本地31293端口
      - '31294:443'  # 映射Demo:443端口到本地31294端口

其中dtmdemo.webapi服務(wù)通過ASPNETCORE_ENVIRONMENT: docker指定啟動環(huán)境為docker,因此需要在項目下添加appsettings.docker.json以配置應(yīng)用參數(shù):

{
  "ConnectionStrings": {
    "DtmDemoWebApiContext": "Server=db;port=3306;database=dtm_barrier;user id=root;password=123456;AllowLoadLocalInfile=true"
  },
  "TransferBaseURL": "http://dtmdemo.webapi/api/SagaDemo",
  "dtm": {
    "DtmUrl": "http://dtm:36789",
    "DtmTimeout": 10000,
    "BranchTimeout": 10000,
    "DBType": "mysql",
    "BarrierTableName": "dtm_barrier.barrier"
  }
}

另外db服務(wù)中通過volumes: ["./docker/mysql/scripts:/docker-entrypoint-initdb.d"]來掛載初始化腳本牌捷,以創(chuàng)建DTM依賴的MySQL 存儲數(shù)據(jù)庫dtm和示例項目使用子事務(wù)屏障需要的barrier數(shù)據(jù)表墙牌。腳本如下:

CREATE DATABASE IF NOT EXISTS dtm
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table IF EXISTS dtm.trans_global;
CREATE TABLE if not EXISTS dtm.trans_global (
  `id` bigint(22) NOT NULL AUTO_INCREMENT,
  `gid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `trans_type` varchar(45) not null COMMENT 'transaction type: saga | xa | tcc | msg',
  `status` varchar(12) NOT NULL COMMENT 'tranaction status: prepared | submitted | aborting | finished | rollbacked',
  `query_prepared` varchar(1024) NOT NULL COMMENT 'url to check for msg|workflow',
  `protocol` varchar(45) not null comment 'protocol: http | grpc | json-rpc',
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `finish_time` datetime DEFAULT NULL,
  `rollback_time` datetime DEFAULT NULL,
  `options` varchar(1024) DEFAULT 'options for transaction like: TimeoutToFail, RequestTimeout',
  `custom_data` varchar(1024) DEFAULT '' COMMENT 'custom data for transaction',
  `next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job',
  `next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job',
  `owner` varchar(128) not null default '' comment 'who is locking this trans',
  `ext_data` TEXT comment 'result for this trans. currently used in workflow pattern',
  `result` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
  `rollback_reason` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
  PRIMARY KEY (`id`),
  UNIQUE KEY `gid` (`gid`),
  key `owner`(`owner`),
  key `status_next_cron_time` (`status`, `next_cron_time`) comment 'cron job will use this index to query trans'
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
drop table IF EXISTS dtm.trans_branch_op;
CREATE TABLE IF NOT EXISTS dtm.trans_branch_op (
  `id` bigint(22) NOT NULL AUTO_INCREMENT,
  `gid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `url` varchar(1024) NOT NULL COMMENT 'the url of this op',
  `data` TEXT COMMENT 'request body, depreceated',
  `bin_data` BLOB COMMENT 'request body',
  `branch_id` VARCHAR(128) NOT NULL COMMENT 'transaction branch ID',
  `op` varchar(45) NOT NULL COMMENT 'transaction operation type like: action | compensate | try | confirm | cancel',
  `status` varchar(45) NOT NULL COMMENT 'transaction op status: prepared | succeed | failed',
  `finish_time` datetime DEFAULT NULL,
  `rollback_time` datetime DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
drop table IF EXISTS dtm.kv;
CREATE TABLE IF NOT EXISTS dtm.kv (
  `id` bigint(22) NOT NULL AUTO_INCREMENT,
  `cat` varchar(45) NOT NULL COMMENT 'the category of this data',
  `k` varchar(128) NOT NULL,
  `v` TEXT,
  `version` bigint(22) default 1 COMMENT 'version of the value',
  create_time datetime default NULL,
  update_time datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE key `uniq_k`(`cat`, `k`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(
  id bigint(22) PRIMARY KEY AUTO_INCREMENT,
  trans_type varchar(45) default '',
  gid varchar(128) default '',
  branch_id varchar(128) default '',
  op varchar(45) default '',
  barrier_id varchar(45) default '',
  reason varchar(45) default '' comment 'the branch type who insert this record',
  create_time datetime DEFAULT now(),
  update_time datetime DEFAULT now(),
  key(create_time),
  key(update_time),
  UNIQUE key(gid, branch_id, op, barrier_id)
);

準備完畢,即可通過點擊Visual Studio工具欄的Docker Compose的啟動按鈕暗甥,啟動后可以在Containers窗口看到啟動了dtm-mysql喜滨、dtm-svcdtm-webapi-demo三個容器,并在瀏覽器中打開了 http://localhost:31293/swagger/index.html Swagger 網(wǎng)頁撤防。該種方式啟動項目是支持斷點調(diào)試項目虽风,如下圖所示:

通過BankAccouts控制器的POST接口,初始化用戶1和用戶2各100元。再通過SagaDemo控制器的/api/Transfer接口辜膝,進行Saga事務(wù)測試无牵。

  1. 用戶1轉(zhuǎn)賬10元到用戶2

由于用戶1和用戶2已存在,且用戶1余額足夠厂抖, 因此該筆轉(zhuǎn)賬合法因此會成功茎毁,其執(zhí)行路徑為:轉(zhuǎn)出(成功)->轉(zhuǎn)入(成功)-> 事務(wù)完成,執(zhí)行日志如下圖所示:

  1. 用戶3轉(zhuǎn)賬10元到用戶1

由于用戶3不存在忱辅,因此執(zhí)行路徑為:轉(zhuǎn)出(失斊咧)->轉(zhuǎn)出補償(成功)->事務(wù)完成。從下圖的執(zhí)行日志可以看出墙懂,轉(zhuǎn)出子事務(wù)失敗橡卤,還是會調(diào)用對應(yīng)的轉(zhuǎn)出補償操作,但子事務(wù)屏障會過進行過濾损搬,因此實際上并不會執(zhí)行真正的轉(zhuǎn)出補償邏輯碧库,其中紅線框住的部分就是證明。

  1. 用戶1轉(zhuǎn)賬10元到用戶3

由于用戶3不存在场躯,因此執(zhí)行路徑為:轉(zhuǎn)出(成功)->轉(zhuǎn)入(失斕肝)->轉(zhuǎn)入補償(成功)->轉(zhuǎn)出補償(成功)->事務(wù)完成。從下圖的執(zhí)行日志可以看出踢关,轉(zhuǎn)入子事務(wù)失敗,還是會調(diào)用對應(yīng)的轉(zhuǎn)入補償操作粘茄,但子事務(wù)屏障會過進行過濾签舞,因此實際上并不會執(zhí)行真正的轉(zhuǎn)入補償邏輯,其中紅線框住的部分就是證明柒瓣。

子事務(wù)屏障

在以上的示例中儒搭,重復(fù)提及子事務(wù)屏障,那子事務(wù)屏障具體是什么芙贫,這里有必要重點說明下搂鲫。以上面用戶1轉(zhuǎn)賬10元到用戶3為例,整個事務(wù)流轉(zhuǎn)過程中磺平,即轉(zhuǎn)出(成功)->轉(zhuǎn)入(失敾耆浴)->轉(zhuǎn)入補償(成功)->轉(zhuǎn)出補償(成功)->事務(wù)完成。
在提交事務(wù)之后拣挪,首先是全局事務(wù)的落庫擦酌,主要由DTM 服務(wù)負責,主要包括兩張表:trans_globaltrans_branch_op菠劝,DTM 依此進行子事務(wù)分支的協(xié)調(diào)赊舶。其中trans_global會插入一條全局事務(wù)記錄,用于記錄全局事務(wù)的狀態(tài)信息,如下圖1所示笼平。trans_branch_op表為trans_global的子表园骆,記錄四條子事務(wù)分支數(shù)據(jù),如下圖2所示:

具體的服務(wù)再接收到來自Dtm的子事務(wù)分支調(diào)用時寓调,每次都會往子事務(wù)屏障表barrier中插入一條數(shù)據(jù)锌唾,如下圖所示。業(yè)務(wù)服務(wù)就是依賴此表來完成子事務(wù)的控制捶牢。

而子事務(wù)屏障的核心就是子事務(wù)屏障表唯一鍵的設(shè)計鸠珠,以gidbranch_id秋麸、opbarrier_id為唯一索引渐排,利用唯一索引,“以改代查”來避免競態(tài)條件灸蟆。在跨行轉(zhuǎn)賬的Saga示例中驯耻,子事務(wù)分支的執(zhí)行步驟如下所示:

  1. 開啟本地事務(wù)
  2. 對于當前操作op(action|compensate),使用inster ignore into barrier(trans_type, gid, branch_id, op, barrier_id, reason)向子事務(wù)屏障表插入一條數(shù)據(jù)炒考,有幾種情況:
    1. 插入成功且影響條數(shù)大于0可缚,則繼續(xù)向下執(zhí)行。
    2. 插入成功但影響條數(shù)等于0斋枢,說明觸發(fā)唯一鍵約束帘靡,此時會進行空補償、懸掛和重復(fù)請求判斷瓤帚,若是則直接返回描姚,跳過后續(xù)子事務(wù)分支邏輯的執(zhí)行。
  3. 第2步插入成功戈次,則可以繼續(xù)執(zhí)行子事務(wù)分支邏輯轩勘,執(zhí)行業(yè)務(wù)數(shù)據(jù)表操作,結(jié)果分兩種請求
    1. 子事務(wù)成功怯邪,子事務(wù)屏障表操作和業(yè)務(wù)數(shù)據(jù)表操作由于共享同一個本地事務(wù)绊寻,提交本地事務(wù),因此可實現(xiàn)強一致性悬秉,當前子事務(wù)分支完成澄步。
    2. 子事務(wù)失敗,回滾本地事務(wù)

每個子事務(wù)分支通過以上步驟搂捧,即可實現(xiàn)下圖的效果:

小結(jié)

本文主要介紹了DTM的Saga模式的應(yīng)用驮俗,基于DTM 首創(chuàng)的子事務(wù)屏障技術(shù),使得開發(fā)者基于DTM 提供的SDK能夠輕松開發(fā)出更可靠的分布式應(yīng)用允跑,徹底將開發(fā)人員從網(wǎng)絡(luò)異常的處理中解放出來王凑,再也不用擔心空補償搪柑、防懸掛、冪等等分布式問題索烹。如果要進行分布式事務(wù)框架的選型工碾,DTM 將是不二之選。

本文由mdnice多平臺發(fā)布

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末百姓,一起剝皮案震驚了整個濱河市渊额,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌垒拢,老刑警劉巖旬迹,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異求类,居然都是意外死亡奔垦,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進店門尸疆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來椿猎,“玉大人,你說我怎么就攤上這事寿弱》该撸” “怎么了?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵症革,是天一觀的道長筐咧。 經(jīng)常有香客問我,道長噪矛,這世上最難降的妖魔是什么嗜浮? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任,我火速辦了婚禮摩疑,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘畏铆。我一直安慰自己雷袋,他們只是感情好,可當我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布辞居。 她就那樣靜靜地躺著楷怒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪瓦灶。 梳的紋絲不亂的頭發(fā)上鸠删,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天,我揣著相機與錄音贼陶,去河邊找鬼刃泡。 笑死巧娱,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的烘贴。 我是一名探鬼主播禁添,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼桨踪!你這毒婦竟也來了老翘?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤锻离,失蹤者是張志新(化名)和其女友劉穎铺峭,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體汽纠,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡卫键,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了疏虫。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片永罚。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖卧秘,靈堂內(nèi)的尸體忽然破棺而出呢袱,到底是詐尸還是另有隱情,我是刑警寧澤翅敌,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布羞福,位于F島的核電站,受9級特大地震影響蚯涮,放射性物質(zhì)發(fā)生泄漏治专。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一遭顶、第九天 我趴在偏房一處隱蔽的房頂上張望张峰。 院中可真熱鬧,春花似錦棒旗、人聲如沸喘批。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽饶深。三九已至,卻和暖如春逛拱,著一層夾襖步出監(jiān)牢的瞬間敌厘,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工朽合, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留俱两,地道東北人饱狂。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像锋华,于是被迫代替她去往敵國和親嗡官。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,724評論 2 351

推薦閱讀更多精彩內(nèi)容