跳至主要內容

4.2 开始Saga事务

linguicheng...大约 3 分钟

安装依赖包

  • 提前准备:安装并启动Consul、RabbitMQ(可配置不启用)、数据库支持(SqlServer、Oracle、MySql、PostgreSql、Sqlite)
dotnet add package Wing.Saga.Client

dotnet add package Wing.RabbitMQ

dotnet add package Wing.Consul

dotnet add package Wing.SqlServer(可选Wing.MySql/Wing.Oracle/Wing.PostgreSQL)

事务单元模型

数据传输实体,需要继承基类事务单元模型Wing.Saga.Client.UnitModel,并标记为可序列化。

[Serializable]
public class SampleUnitModel : UnitModel
{
    public string HelloName { get; set; }
}

[Serializable]
public class UnitModel
{
    // 事务名称,必填
    public string Name { get; set; }

    // 描述
    public string Description { get; set; }
}

事务单元

子事务由一个个事务单元构成,子事务需要继承基类事务单元Wing.Saga.Client. SagaUnit<>Commit:事务成功执行提交方法,Cancel:事务执行失败补偿方法。

public class SampleSagaUnit1 : SagaUnit<SampleUnitModel>
{
    public override Task<SagaResult> Cancel(SampleUnitModel model, SagaResult previousResult)
    {
        return Task.FromResult(new SagaResult { Success = true });
    }

    public override Task<SagaResult> Commit(SampleUnitModel model, SagaResult previousResult)
    {
        return Task.FromResult(new SagaResult { Success = true });
    }
}

public class SagaResult
{
    // 事务执行结果
    public bool Success { get; set; }

    // 消息提示

    public string Msg { get; set; }
   
    // 返回数据

    public dynamic Data { get; set; }
}

配置

Saga:UseEventBus 是否启用事件总线,如果启用,需要安装包Wing.RabbitMQ

{
    "ConnectionStrings": {
        "Wing": "Data Source=192.168.56.96;User Id=sa;Password=wing123.;Initial Catalog=Wing;TrustServerCertificate=true;Pooling=true;Min Pool Size=1"
    },
    "Saga": {
        "UseEventBus": false
    }
}

完整事务

Start:开始事务,给定事务名称,定义事务执行策略

Then:执行子事务,定义子事务名称和传参

End:事务结束

[HttpGet]
public string Get(string name)
{
    Saga.Start("Saga分布式事务样例", new SagaOptions { TranPolicy = TranPolicy.Backward })
            .Then(new SampleSagaUnit1(), new SampleUnitModel { Name = "事务单元1", HelloName = name })
            .Then(new SampleSagaUnit2(), new SampleUnitModel { Name = "事务单元2", HelloName = name })
            .Then(new SampleSagaUnit3(), new SampleUnitModel { Name = "事务单元3", HelloName = name })
            .End();
    return $"Hello {name}";
}

public class SagaOptions
{
    /// <summary>
    /// 事务执行策略
    /// </summary>
    public TranPolicy TranPolicy { get; set; }
    /// <summary>
    /// 熔断条件(重试指定次数失败后,则不再重试)
    /// </summary>
    public int BreakerCount { get; set; }
    /// <summary>
    /// 自定义策略条件(向前恢复指定次数,失败则向后恢复)
    /// </summary>
    public int CustomCount { get; set; }
    /// <summary>
    /// 事务描述
    /// </summary>
    public string Description { get; set; }
}

public enum TranPolicy : int
{
    /// <summary>
    /// 向前恢复
    /// </summary>
    Forward = 0,
    /// <summary>
    /// 向后恢复
    /// </summary>
    Backward = 1,
    /// <summary>
    /// 先前再后(向前恢复指定次数,如果失败,则向后恢复)
    /// </summary>
    Custom = 2
}

Program代码

public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.UseStartup<Startup>();
                }).AddWing(builder => builder.AddConsul());





 

Startup代码

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();
    // 启用EventBus
    // services.AddWing().AddSaga().AddEventBus().AddJwt();
    // 不启用EventBus
    services.AddWing().AddPersistence().AddSaga().AddJwt();
}



 




事务重试

此接口由事务协调器定时调用触发,定时回滚失败的事务或者重试向前执行事务。

http接口,示例4.2-1(点击查看完整示例代码open in new window)

[ApiController]
[Route("Wing/Saga/[controller]/[action]")]
[Authorize]
public class TranRetryController : ControllerBase
{
    private readonly ITranRetryService _tranRetryService;
    public TranRetryController(ITranRetryService tranRetryService)
    {
        _tranRetryService = tranRetryService;
    }

    public Task<ResponseData> Commit(RetryData retryData)
    {
        return  _tranRetryService.Commit(retryData);
    }

    public Task<ResponseData> Cancel(RetryData retryData)
    {
        return _tranRetryService.Cancel(retryData);
    }
}

grpc接口,示例4.2-2(点击查看完整示例代码open in new window)

[Authorize]
public class MyTranRetryService : TranRetryGrpcService
{
    public MyTranRetryService(ITranRetryService tranRetryService) : base(tranRetryService)
    {
    }
}

查看运行结果

运行示例4.34.2-1,浏览器访问 http://localhost:4211/weatherforecastopen in new window ,可以看到执行的事务Saga-Wing.Demo_4.2.1,如下图:

上次编辑于:
贡献者: linguicheng