Skip to content

zjn-zjn/fisher

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

45 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Fisher

Go Reference License

简介

Fisher 是一个高可用的分布式资产转移系统,基于本地事务+状态表模式,实现N个账户扣除资产,M个账户接收的操作。系统保证资产转移的一致性和可追溯性,适用于各类需要跨账户资产转移的业务场景。

特色

  • 多商户隔离设计:支持多商户(mid)数据隔离,每个商户独立管理账户和官方账户
  • 官方账户负数设计:官方账户使用负数ID(如 -1, -10),与正数用户账户天然隔离,允许透支
  • 完美的零和对账系统:任何时刻系统中所有账户余额之和严格为0,这一数学特性提供了强大的自检机制
  • 热点账户避免机制:官方账户采用区间设计,交易时动态分散,有效避免热点账户问题
  • 半成功状态支持:源账户扣减完成即视为半成功,目标账户增加操作可持续推进
  • 分库分表支持:内置分库分表能力,支持跨库事务处理
  • 自动恢复机制:提供自检和自动推进功能,确保系统最终一致性

架构原理

Fisher 采用基于SAGA模式的分布式事务实现,通过"本地事务+补偿事务"的组合确保跨账户资产转移的一致性:

  1. 资产扣减阶段:首先执行源账户资产扣减的本地事务
  2. 资产增加阶段:执行目标账户资产增加的本地事务
  3. 状态管理阶段:根据执行结果更新转移状态为成功、半成功或失败
  4. 补偿机制
    • 如果任一阶段失败,则触发对应的补偿事务,恢复系统一致性
    • 对于半成功状态,系统会持续推进资产增加操作,不执行补偿
    • 提供自动检查机制,确保长时间未完成的转移被适当处理

使用场景

  • 充值场景: 官方账户(-1)扣减,用户账户增加
  • 买卖场景: A账户扣减,B账户增加,C账户收取分成,官方账户收取手续费
  • 合买场景: A账户和B账户扣减,C账户收取款项,官方账户收取手续费

系统架构

系统采用三层架构:

  • Model层:定义数据模型和请求结构
  • DAO层:负责数据访问和事务处理
  • Service层:实现业务逻辑和交易流程

数据模型

系统使用五张核心表:

  • merchant表:商户信息
  • official_account表:官方账户定义(负数区间)
  • state表:记录转移状态和过程
  • record表:记录具体的转移记录和补偿操作
  • account表:记录账户资产信息和余额变更

状态流转

转移操作经历以下状态流转:

  1. StateStatusDoing (1):转移进行中,执行源账户资产扣减和目标账户增加操作
  2. StateStatusRollbackDoing (2):回滚进行中,执行补偿操作恢复账户状态
  3. StateStatusHalfSuccess (3):半成功状态,源账户扣减成功,等待目标账户增加
  4. StateStatusSuccess (4):转移成功,所有源账户扣减和目标账户增加均完成
  5. StateStatusRollbackDone (5):回滚完成,所有资产变更已撤销

记录状态定义:

  1. RecordStatusNormal (1):正常记录状态
  2. RecordStatusRollback (2):回滚记录状态
  3. RecordStatusEmptyRollback (3):空回滚记录状态(未执行原操作的回滚)

金额方向定义:

  1. DirectionAdd (1):增加
  2. DirectionDeduct (2):扣减

安装与依赖

依赖

  • Go 1.22+
  • MySQL 5.7+

安装

go get github.com/zjn-zjn/fisher

快速使用

表结构

数据库表结构定义在 ddl.sql 文件中。

初始化

两种初始化方式:

// 方式一:使用默认配置初始化(单表)
err := basic.InitWithDefault(dbs []*gorm.DB)

// 方式二:使用自定义配置初始化
err := basic.InitWithConf(&basic.TransferConf{
    DBs:             dbs,  // 数据库列表
    StateSplitNum:   3,    // 转移状态分表数量
    RecordSplitNum:  3,    // 转移记录分表数量
    AccountSplitNum: 3,    // 账户分表数量
})

// 初始化官方账户缓存(可选,用于预热)
dao.InitOfficialAccountCache(ctx)

⚠️ 重要提示:数据库连接参数

禁止在数据库连接字符串中使用 clientFoundRows=true 参数!

Fisher 大量使用 RowsAffected 来判断数据库操作是否成功(如状态更新、记录更新等)。 clientFoundRows=true 会改变 MySQL 的 RowsAffected 返回行为(返回匹配行数而非实际修改行数), 导致业务逻辑判断错误。

// ❌ 错误
dsn := "user:pass@tcp(host:3306)/db?clientFoundRows=true"

// ✅ 正确
dsn := "user:pass@tcp(host:3306)/db"

创建商户和官方账户

// 创建商户
merchant := &model.Merchant{
    ID:     1,
    Name:   "测试商户",
    Status: model.MerchantStatusNormal,
}
dao.CreateMerchant(ctx, merchant)

// 创建官方账户(区间长度10,对应 -1 ~ -10)
account, err := service.CreateOfficialAccount(ctx, &service.CreateOfficialAccountRequest{
    Mid:         1,
    Name:        "官方充值账户",
    Length:      10,  // 区间长度,1表示单个账户
    Description: "用于充值的官方账户",
})
// 创建后:account.OfficeID = -10, 区间范围 [-10, -1]

使用示例

以下是一个完整的资产转移示例:

// 定义常量
const (
    ItemTypeGold basic.ItemType = 1  // 资产类型:金币
    
    TransferSceneBuyGoods        basic.TransferScene = 1  // 转账场景:购买商品
    ChangeTypeSpend              basic.ChangeType    = 1  // 变更类型:消费支出
    ChangeTypeSellGoodsIncome    basic.ChangeType    = 2  // 变更类型:商品销售收入
    ChangeTypeSellGoodsCopyright basic.ChangeType    = 3  // 变更类型:版权分成收入
)

// 执行资产转移
ctx := context.Background()
mid := int64(1)                           // 商户ID
buyerAccountId := int64(100000000001)     // 买家账户ID
sellerAccountId := int64(100000000002)    // 卖家账户ID

err := service.Transfer(ctx, &model.TransferReq{
    Mid:            mid,                      // 商户ID
    TransferId:     12345,                    // 转移ID(业务方保证mid下唯一)
    TransferScene:  TransferSceneBuyGoods,    // 转账场景
    UseHalfSuccess: true,                     // 启用半成功机制
    Extra:          "购买数字商品",             // 扩展信息
    
    // 资金来源账户
    FromAccounts: []*model.TransferItem{
        {
            AccountId:  -1,                   // 官方账户(负数,会自动离散到区间内)
            ItemType:   ItemTypeGold,
            Amount:     100,
            ChangeType: ChangeTypeSpend,
            Extra:      "官方充值支出",
        },
    },
    
    // 资金目标账户
    ToAccounts: []*model.TransferItem{
        {
            AccountId:  sellerAccountId,
            ItemType:   ItemTypeGold,
            Amount:     90,
            ChangeType: ChangeTypeSellGoodsIncome,
            Extra:      "商品销售收入",
        },
        {
            AccountId:  buyerAccountId,
            ItemType:   ItemTypeGold,
            Amount:     10,
            ChangeType: ChangeTypeSellGoodsCopyright,
            Extra:      "版权分成收入",
        },
    },
})

if err != nil {
    log.Println("转账失败:", err)
    return
}
log.Println("转账成功")

官方账户说明

官方账户使用负数ID,支持两种使用方式:

  1. 区间入口ID:传入区间的起始ID(如 -1),系统会自动随机离散到区间内的某个ID
  2. 具体ID:传入区间内的具体ID(如 -5),系统直接使用该ID,不做离散
// 假设官方账户区间为 [-10, -1]

// 方式1:传入 -1(区间入口),系统随机离散到 -1 ~ -10 中的某个值
FromAccounts: []*model.TransferItem{{AccountId: -1, ...}}

// 方式2:传入 -5(具体ID),直接使用 -5
FromAccounts: []*model.TransferItem{{AccountId: -5, ...}}

更多示例请参考:demo_test.go

操作接口详情

Transfer 转移接口

  • 入参
    • req
      • Mid 商户ID
      • TransferId 转移ID(业务方保证mid下唯一)
      • TransferScene 转移场景
      • UseHalfSuccess 是否使用半成功
      • Extra 扩展信息
      • FromAccounts 转移发起者
        • AccountId 发起账户ID(负数为官方账户)
        • ItemType 转移物品类型
        • Amount 扣减数量
        • ChangeType 扣减类型
        • Extra 扩展信息
      • ToAccounts 转移接收者
        • AccountId 接收账户ID
        • ItemType 转移物品类型
        • Amount 增加数量
        • ChangeType 增加类型
        • Extra 扩展信息
  • 返回
    • error 转移错误原因

Rollback 回滚转移

  • 入参
    • req
      • Mid 商户ID
      • TransferId 回滚的转移ID
      • TransferScene 回滚的转移场景
  • 返回
    • error 回滚错误原因

Inspection 检查推进

  • 入参
    • lastTime 这个时间之前的所有历史交易状态检查与推进
  • 返回
    • []error 推进产生错误的列表

最佳实践

  • 唯一性保证:业务方保证 transfer_id 在 mid 下唯一
  • 事务隔离级别:推荐使用数据库事务隔离级别:READ-COMMITTED
  • 定期检查:建议设置定时任务执行 Inspection 接口,处理半成功状态的转移
  • 官方账户管理:合理设置官方账户区间长度,避免热点
  • 异常监控:对系统错误和半成功状态进行监控
  • 分表策略:根据业务量合理配置分表数量

常见错误

错误 说明
InsufficientAmountErr 账户余额不足
AlreadyRolledBackErr 转移已被回滚
StateMutationErr 状态变更错误
MerchantNotFoundErr 商户不存在
OfficialAccountNotFoundErr 官方账户不存在
OfficialAccountDisabledErr 官方账户已禁用

许可证

本项目采用 Apache License 2.0 许可证 - 详见 LICENSE 文件

About

transfer with saga

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages