4.2 开始Saga事务
...大约 3 分钟
安装依赖包
- 提前准备:安装并启动Consul、RabbitMQ(可配置不启用)、数据库支持(SqlServer、Oracle、MySql、PostgreSql、Sqlite)
dotnet add package Wing.Saga.Client
dotnet add package Wing.RabbitMQ
dotnet add package Wing.Consul
dotnet add package Wing.SqlServer(可选Wing.MySql/Wing.Oracle/Wing.PostgreSQL)
Install-Package Wing.Saga.Client
Install-Package Wing.RabbitMQ
Install-Package Wing.Consul
dotnet add package Wing.SqlServer(可选Wing.MySql/Wing.Oracle/Wing.PostgreSQL)
事务单元模型
数据传输实体,需要继承基类事务单元模型Wing.Saga.Client.UnitModel
,并标记为可序列化。
[Serializable]
public class SampleUnitModel : UnitModel
{
public string HelloName { get; set; }
}
[Serializable]
public class UnitModel
{
// 事务名称,必填
public string Name { get; set; }
// 描述
public string Description { get; set; }
}
事务单元
子事务由一个个事务单元构成,子事务需要继承基类事务单元Wing.Saga.Client. SagaUnit<>
,Commit
:事务成功执行提交方法,Cancel
:事务执行失败补偿方法。
public class SampleSagaUnit1 : SagaUnit<SampleUnitModel>
{
public override Task<SagaResult> Cancel(SampleUnitModel model, SagaResult previousResult)
{
return Task.FromResult(new SagaResult { Success = true });
}
public override Task<SagaResult> Commit(SampleUnitModel model, SagaResult previousResult)
{
return Task.FromResult(new SagaResult { Success = true });
}
}
public class SagaResult
{
// 事务执行结果
public bool Success { get; set; }
// 消息提示
public string Msg { get; set; }
// 返回数据
public dynamic Data { get; set; }
}
配置
Saga:UseEventBus
是否启用事件总线,如果启用,需要安装包Wing.RabbitMQ
{
"ConnectionStrings": {
"Wing": "Data Source=192.168.56.96;User Id=sa;Password=wing123.;Initial Catalog=Wing;TrustServerCertificate=true;Pooling=true;Min Pool Size=1"
},
"Saga": {
"UseEventBus": false
}
}
完整事务
Start
:开始事务,给定事务名称,定义事务执行策略
Then
:执行子事务,定义子事务名称和传参
End
:事务结束
[HttpGet]
public string Get(string name)
{
Saga.Start("Saga分布式事务样例", new SagaOptions { TranPolicy = TranPolicy.Backward })
.Then(new SampleSagaUnit1(), new SampleUnitModel { Name = "事务单元1", HelloName = name })
.Then(new SampleSagaUnit2(), new SampleUnitModel { Name = "事务单元2", HelloName = name })
.Then(new SampleSagaUnit3(), new SampleUnitModel { Name = "事务单元3", HelloName = name })
.End();
return $"Hello {name}";
}
public class SagaOptions
{
/// <summary>
/// 事务执行策略
/// </summary>
public TranPolicy TranPolicy { get; set; }
/// <summary>
/// 熔断条件(重试指定次数失败后,则不再重试)
/// </summary>
public int BreakerCount { get; set; }
/// <summary>
/// 自定义策略条件(向前恢复指定次数,失败则向后恢复)
/// </summary>
public int CustomCount { get; set; }
/// <summary>
/// 事务描述
/// </summary>
public string Description { get; set; }
}
public enum TranPolicy : int
{
/// <summary>
/// 向前恢复
/// </summary>
Forward = 0,
/// <summary>
/// 向后恢复
/// </summary>
Backward = 1,
/// <summary>
/// 先前再后(向前恢复指定次数,如果失败,则向后恢复)
/// </summary>
Custom = 2
}
Program代码
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
}).AddWing(builder => builder.AddConsul());
using Wing;
var builder = WebApplication.CreateBuilder(args);
builder.Host.AddWing(builder => builder.AddConsul());
// Add services to the container.
builder.Services.AddControllers();
// 启用EventBus
// builder.Services.AddWing().AddSaga().AddEventBus().AddJwt();
// 不启用EventBus
builder.Services.AddWing().AddPersistence().AddSaga().AddJwt();
var app = builder.Build();
// Configure the HTTP request pipeline.
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
Startup代码
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
// 启用EventBus
// services.AddWing().AddSaga().AddEventBus().AddJwt();
// 不启用EventBus
services.AddWing().AddPersistence().AddSaga().AddJwt();
}
事务重试
此接口由事务协调器定时调用触发,定时回滚失败的事务或者重试向前执行事务。
http接口,示例4.2-1
(点击查看完整示例代码)
[ApiController]
[Route("Wing/Saga/[controller]/[action]")]
[Authorize]
public class TranRetryController : ControllerBase
{
private readonly ITranRetryService _tranRetryService;
public TranRetryController(ITranRetryService tranRetryService)
{
_tranRetryService = tranRetryService;
}
public Task<ResponseData> Commit(RetryData retryData)
{
return _tranRetryService.Commit(retryData);
}
public Task<ResponseData> Cancel(RetryData retryData)
{
return _tranRetryService.Cancel(retryData);
}
}
grpc接口,示例4.2-2
(点击查看完整示例代码)
[Authorize]
public class MyTranRetryService : TranRetryGrpcService
{
public MyTranRetryService(ITranRetryService tranRetryService) : base(tranRetryService)
{
}
}
查看运行结果
运行示例4.3
、4.2-1
,浏览器访问 http://localhost:4211/weatherforecast ,可以看到执行的事务Saga-Wing.Demo_4.2.1
,如下图: