永续合约 09 - 撮合引擎原理与 Go 实现

本文介绍撮合引擎的原理和实现. 从红黑树, BTree, 跳表的选型对比入手, 用 Go 完整实现一个 Price-Time Priority 撮合引擎, 然后讨论分片, Lock-Free 队列, Event Sourcing 等生产级扩容方案.


一、目录

# 章节 内容
- 术语表 撮合核心, 数据结构, 扩容相关术语
3 撮合引擎架构 整体架构, Price-Time Priority, 核心数据结构需求
4 数据结构选型: 红黑树 vs BTree 有序树需求, 红黑树, BTree, 选型对比
5 Go 实现: 撮合引擎 Order ID, 排序器, 数据结构, 订单簿操作, 撮合算法, 性能分析
6 扩容方案 扩容挑战, 方案对比, 分片 + Event Sourcing, Go 优化, DEX 考量
7 小结 核心要点, 延伸阅读

二、术语表

2.1 撮合核心

术语 英文 含义
撮合引擎 Matching Engine 按规则将买单和卖单配对成交的核心组件
价格-时间优先 Price-Time Priority 撮合规则: 最优价格优先; 同价格下先到的订单优先成交
订单簿 Order Book 维护当前所有未成交订单的数据结构, 分 bid (买) 和 ask (卖) 两侧
价格档位 Price Level 同一价格下所有订单的聚合, 包含该价格的总挂单量
成交记录 Trade / Fill 一次撮合的结果, 包含价格, 数量, 买卖双方信息
吃单 Taker 主动与订单簿中已有订单成交的一方
挂单 Maker 订单进入订单簿等待成交的一方

2.2 数据结构

术语 英文 含义
红黑树 Red-Black Tree 自平衡二叉搜索树, 保证 O(log n) 的插入/删除/查找
B-Tree B-Tree 多路平衡搜索树, 每个节点存多个 key, 对缓存友好
跳表 Skip List 多层链表, 概率平衡, Redis 的有序集合使用此结构
FIFO 队列 FIFO Queue 先进先出队列, 同一价格档位内的订单按时间排序

2.3 扩容

术语 英文 含义
分片 Sharding 按交易对 (symbol) 将订单簿分配到不同引擎实例
无锁队列 Lock-Free Queue 基于 CAS (Compare-And-Swap) 的并发队列, 避免锁竞争
事件溯源 Event Sourcing 所有状态变更以事件流形式记录, 可回放恢复状态
热备 Hot Standby 备用引擎实时同步主引擎状态, 主引擎故障时秒级切换

三、撮合引擎架构

3.1 整体架构

撮合引擎架构 (Matching Engine Architecture) 订单入口 (Order Gateway) REST / gRPC API WebSocket 推送 FIX Protocol (传统金融) P2P Gossip (dYdX) 链上 Tx (Hyperliquid) 排序器 (Sequencer): 按到达时间分配序号 撮合引擎核心 (Matching Engine Core) Order Book (订单簿) Bids (买单) 价格从高到低排列 $3000 × 50 ETH $2999 × 120 ETH $2998 × 200 ETH Asks (卖单) 价格从低到高排列 $3001 × 30 ETH $3002 × 80 ETH $3003 × 150 ETH 撮合算法 Price-Time Priority 价格档位: 红黑树 / BTree 同价订单: FIFO 链表 成交输出 Trade Events 输出层 (Output) 成交推送 (WS/MQ) 行情更新 (L1/L2) 持仓/余额更新 Event Log (审计) CEX 目标延迟: < 1ms | dYdX: ~1-2s (出块) | Hyperliquid: ~200ms (HyperBFT)

3.2 撮合规则: Price-Time Priority

Price-Time Priority (价格-时间优先) 规则 1: 价格优先 (Price Priority) 买单 (Bids): 出价高者优先 Alice: Buy @ $3002 ← 最优先 Bob: Buy @ $3001 Carol: Buy @ $3000 卖单 (Asks): 要价低者优先 Dave: Sell @ $3003 ← 最优先 Eve: Sell @ $3005 规则 2: 时间优先 (Time Priority) 同一价格下, 先到的订单先成交 (FIFO) 价格 $3001 的买单队列: Bob 10:00:01 × 50 ← 先成交 Fan 10:00:03 × 30 ← 后成交 Gina 10:00:05 × 20 ← 最后 Bob 和 Fan 出价一样, 但 Bob 先到 → Bob 先成交 撮合条件 (Match Condition) Best Bid (最高买价) >= Best Ask (最低卖价) → 可以成交! 成交价 = Maker 的挂单价 (谁先挂的, 用谁的价格) 所有主流交易所 (CEX + DEX 订单簿) 使用相同规则: Binance, dYdX, Hyperliquid, NYSE, NASDAQ

3.3 核心数据结构需求

Order Book 核心操作与复杂度要求 # 操作 说明 目标复杂度 频率 1 插入订单 (Insert) 新订单到达, 按价格插入对应档位 O(log n) 极高 (每秒万次) 2 删除订单 (Delete) 订单取消或全部成交, 移除 O(1) 极高 (做市商撤单) 3 查找最优价 (Min/Max) 买方最高价 (max) / 卖方最低价 (min) O(log n) 极高 (每次撮合) 4 遍历价格档 (Traverse) 从最优价开始逐档吃单 (大单跨档) O(k) 中 (大单才跨档) 5 修改订单 (Modify) 改价 = 删除 + 插入 (改价后时间重置) O(log n) 高 (做市商改价) n = 价格档位数 (不是订单数), 典型值: 活跃市场约 1,000 ~ 10,000 档 k = 吃掉的档数, 通常 1~5 (除非极大单) 关键瓶颈: 操作 1+2+3 占 99% 的调用量, 数据结构选型主要优化这三个

为什么选择树 (tree) 而不是数组或哈希表? 从上表可以看出, Order Book 对数据结构有三个同时成立的硬性要求:

  1. 有序 (ordered): 买方需要最高价优先, 卖方需要最低价优先. 撮合的第一步就是找到最优价 — 这要求数据结构天然维护排序, 排除哈希表 (无序).
  2. 插入/删除快 (O(log n)): 活跃市场每秒有成千上万笔订单新增和取消. 排序数组虽然有序, 但插入/删除是 O(n) (需要移动元素), 在万级档位下无法承受.
  3. 可顺序遍历 (traversable): 大单吃穿当前最优价时, 需要从最优价开始逐档向下遍历. 堆 (Heap) 虽然能快速找到最大/最小值, 但不支持按序遍历下一个档位.

同时满足这三个条件的数据结构只有平衡有序树 (BST): 插入/删除/查找均为 O(log n), 天然有序, 支持 in-order 遍历. 红黑树, B-Tree, 跳表 (Skip List) 都属于这一类. 这就是为什么几乎所有撮合引擎的 Order Book 都基于树结构.


四、数据结构选型: 红黑树 vs BTree

4.1 为什么需要有序树?

Order Book 的价格档位必须有序: 买方需要快速找到最高价 (降序), 卖方需要快速找到最低价 (升序). 哪种数据结构能同时满足有序, 快速增删, 可遍历?

候选数据结构对比: 谁能胜任 Order Book? ① 排序数组 (Sorted Array) 2999 3000 3001 3002 ↓ 插入 3005 3008 ... 插入 3002 后, 后续元素全部右移! ✓ 有序, 查找 O(log n) (二分查找) ✗ 插入/删除 O(n) — 移动元素太慢 淘汰 ✗ ② 哈希表 (Hash Map) [0] 3005 [1] 2999 [2] 3008 [3] 3001 [4] ... 最高价是哪个? 散列无序, 必须遍历全部! → 找最优价 O(n), 每次撮合都要扫一遍 ✓ 插入/删除 O(1), 极快 ✗ 无序 — 无法快速找最优价 淘汰 ✗ ③ 堆 (Max-Heap) 3008 3005 3001 2999 3000 堆顶 = 最高价 找到它: O(1)! 但 3005 的 "下一个" 是 3001? 3000? 堆不保证兄弟顺序 → 无法逐档遍历 ✓ 找最优价 O(1), 插入 O(log n) ✗ 不支持按价格顺序遍历 — 大单跨档时无法逐档走 淘汰 ✗ ④ 平衡有序树 (RBTree / BTree / SkipList) 3001 2999 3005 2998 3008 中序遍历 = 价格天然有序! 2998 → 2999 → 3001 → 3005 → 3008 左子树全部 < 根 右子树全部 > 根 ✓ 插入/删除/查找: O(log n) ✓ 找最优价 + 顺序遍历: 全部支持 胜出 ✓ 汇总: Order Book 三大需求 vs 四种候选 数据结构 有序? 插入/删除 找最优价 顺序遍历 结论 排序数组 O(n) ✗ O(1) 淘汰 哈希表 O(1) O(n) ✗ 淘汰 堆 (Heap) 部分 O(log n) O(1) 淘汰 平衡有序树 O(log n) O(log n) 胜出 ✓ 平衡有序树家族: 红黑树 (Red-Black Tree), B-Tree, 跳表 (Skip List) 它们在渐进复杂度上等价, 区别在于: 常数因子, 缓存友好度, 实现复杂度 为什么堆差一点就能用? 堆的 "找最优价 O(1)" 看似诱人, 但 Order Book 需要逐档遍历 (大单跨档吃单), 堆只能拿到堆顶, 拿不到 "第二优" → 出局 为什么哈希表差一点就能用? 哈希表的 "插入 O(1)" 极快, 但每次撮合都要找最优价, O(n) 遍历在每秒万笔订单的高频场景下不可接受 → 出局

4.2 红黑树 (Red-Black Tree)

红黑树 (Red-Black Tree) 结构示意 $3000 $2998 $3002 $2997 $2999 $3001 $3003 = Black 节点 = Red 节点 每个节点 = 一个价格档位, 节点内挂 FIFO 订单队列 找最高买价: 一路往右走 → O(log n) | 找最低卖价: 一路往左走 → O(log n)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
红黑树特点:

结构: 自平衡二叉搜索树, 每个节点有颜色 (红/黑)
平衡保证: 最长路径 ≤ 2 × 最短路径 (通过旋转维护)

优点:
- 严格 O(log n) 所有操作
- 中序遍历天然有序 (从最优价逐档吃单)
- Go 标准库没有, 第三方库维护普遍停滞 (emirpasic/gods 可用)
- 适合频繁插入/删除 (旋转操作最多 3 次)

缺点:
- 每个节点一个 key → 树高较大
- 指针跳转多 → 缓存不友好 (cache miss 多)
- 实现复杂 (旋转 + 重着色逻辑)

4.3 BTree

BTree (degree=4) 结构示意 实际撮合引擎用 degree=64, 这里用 4 方便展示 $2999 $3001 $3003 ← 一个节点存 3 个 key $2997 $2998 $3000 $3002 $3004 $3005 vs 红黑树: 每个节点只有 1 个 key, 10 万档需要 ~17 层 BTree: 每个节点最多 2×degree 个 key, degree=64 时 10 万档只需 ~3 层 缓存优势: 一个节点的 key 连续存储在内存中 CPU 一次 cache line (64 bytes) 可以读取多个 key → 比红黑树逐个指针跳转快得多
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
BTree 特点:

结构: 多路平衡搜索树, 每个节点存多个 key (如 64 个)
平衡保证: 所有叶子在同一层

优点:
- 节点内多个 key 连续存储 → 缓存友好 (一次 cache line 读多个 key)
- 树高更矮 (degree=64 时, 10 万个 key 只需 3 层)
- Go 生态成熟: tidwall/btree 库 (泛型, 活跃维护)
- 批量操作友好 (节点内线性扫描, 利用 CPU 预取)

缺点:
- 节点分裂/合并比红黑树旋转更重 (但频率更低)
- 内存使用可能略高 (节点预分配空间)
- 范围删除需要额外处理

4.4 选型对比

红黑树 vs BTree: 撮合引擎选型 红黑树 (Red-Black Tree) 每个节点: 1 个 key (价格) 树高: ~17 层 (10 万档位) 查找: O(log₂ n) = ~17 次比较 ✓ 插入/删除极快 (最多 3 次旋转) ✓ 最坏情况性能稳定 ✓ 内存精确, 无预分配浪费 ✗ 指针跳转多, cache miss 高 ✗ Go 无标准库实现 ✗ 实现复杂度高 适合: 极端低延迟, 订单变动极频繁的场景 BTree (B-Tree) 每个节点: 最多 64 个 key 树高: ~3 层 (10 万档位, degree=64) 查找: O(log₆₄ n) = ~3 次节点访问 ✓ 缓存友好 (节点内连续内存) ✓ 树高极矮, 实际性能更快 ✓ Go 成熟库: tidwall/btree (泛型) ✗ 节点分裂/合并开销较大 ✗ 内存有预分配浪费 ✗ 并发控制需要额外设计 适合: 通用场景, 缓存命中率优先, 快速开发 本文选型: BTree (tidwall/btree): 缓存友好 + 泛型 + 活跃维护 + 实际基准测试更快
维度 红黑树 BTree (degree=64) 胜出
理论复杂度 O(log₂ n) O(log₆₄ n) BTree (树高更矮)
实际查找性能 ~17 次指针跳转 ~3 次节点访问 BTree (缓存友好)
插入/删除 旋转 (最多 3 次) 节点分裂/合并 红黑树 (略快)
缓存命中率 低 (指针分散) 高 (连续内存) BTree
Go 生态 需自行实现 tidwall/btree (泛型) BTree
内存效率 精确分配 有预分配浪费 红黑树
有序遍历 中序遍历, O(n) 叶节点链式遍历, O(n) 平手

选型结论: 本文使用 BTree (tidwall/btree). 原因:

1
2
3
4
1. 在 Go 中实际基准测试, BTree 通常比红黑树快 2-5x (缓存效应主导)
2. tidwall/btree 支持 Go 泛型, 无需类型断言; 活跃维护 (google/btree 已归档)
3. 原生 Ascend/Descend 范围查询 + Path Hinting, 天然适合 Order Book 的价格区间扫描
4. 红黑树的优势 (频繁单点插入/删除) 在现代 CPU 缓存体系下被 BTree 的局部性碾压

补充: 其他选择

  • 跳表 (Skip List): Redis 的选择, 实现简单, 并发友好 (可以细粒度加锁). 适合需要高并发但不追求极致性能的场景.
  • 数组 + 二分查找: 如果价格档位很少 (< 100), 简单数组可能是最快的, 但不适合活跃市场.
  • 实际生产: Binance 据传使用 LMAX Disruptor 模式 + 自定义内存结构, 不依赖通用树.

五、Go 实现: 撮合引擎

5.1 Order ID 生成

订单 ID 看似简单, 实际是个工程难题: 需要同时满足: 唯一, 有序, 高性能, 分布式安全.

Order ID 生成方案对比 方案 1: 原子自增 atomic.AddUint64(&counter, 1) ✓ 极快 (~1ns), 严格有序 ✓ 单机唯一, 实现最简单 ✗ 多实例会冲突 (需要分段) ✗ 重启后需要恢复计数器 适合: 单机撮合引擎 (大多数场景) 方案 2: Snowflake (雪花算法) 时间戳 + 机器 ID + 序列号 ✓ 分布式唯一, 无需协调 ✓ 时间有序 (同毫秒内序列号递增) ✗ 依赖时钟 (时钟回拨会出问题) ✗ 比纯自增慢 (~10-50ns) 适合: 多机分片, 分布式系统 方案 3: UUID v7 128-bit, 时间有序的 UUID ✓ 全球唯一, 无需中心分配 ✓ 标准化, 跨系统兼容 ✗ 128-bit 太大 (浪费内存/带宽) ✗ 比较操作慢于 uint64 适合: 跨服务追踪, 非热路径 Snowflake ID 位布局 (64-bit) 0 1b Timestamp (毫秒, 41-bit) ~69 年不重复 Machine ID (10-bit) 最多 1024 台机器 Sequence (12-bit) 每毫秒 4096 个 ID
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import (
"sync/atomic"
"time"
)

// ─────────────────────────────────────────────
// 方案 1: 原子自增 (单机撮合引擎推荐)
// ─────────────────────────────────────────────
// 最简单, 最快, 单机场景完全够用.
// 撮合引擎本身就是单线程的, 但 API 层可能多 goroutine 提交订单,
// 所以用 atomic 保证并发安全.

type AtomicIDGen struct {
counter uint64
}

func (g *AtomicIDGen) Next() uint64 {
return atomic.AddUint64(&g.counter, 1)
}

// ─────────────────────────────────────────────
// 方案 2: Snowflake (多机分片推荐)
// ─────────────────────────────────────────────
// Twitter 发明, 64-bit 整数, 天然有序, 分布式唯一.
// 适合多个撮合引擎实例 (按交易对分片) 共存的场景.

const (
epoch = 1700000000000 // 自定义纪元 (ms), 减小时间戳位数
machineBits = 10 // 机器 ID 占 10 位 (最多 1024 台)
sequenceBits = 12 // 序列号占 12 位 (每毫秒 4096 个)
machineShift = sequenceBits // 12
timeShift = machineBits + sequenceBits // 22
sequenceMask = (1 << sequenceBits) - 1 // 0xFFF
)

type Snowflake struct {
machineID uint64
sequence uint64
lastTime int64
}

func NewSnowflake(machineID uint64) *Snowflake {
return &Snowflake{machineID: machineID}
}

func (s *Snowflake) Next() uint64 {
now := time.Now().UnixMilli() - epoch

if now == s.lastTime {
// 同一毫秒内, 序列号递增
s.sequence = (s.sequence + 1) & sequenceMask
if s.sequence == 0 {
// 序列号溢出 (4096/ms 用完了), 等下一毫秒
for now <= s.lastTime {
now = time.Now().UnixMilli() - epoch
}
}
} else {
s.sequence = 0
}
s.lastTime = now

// 拼接: 时间戳 (41-bit) | 机器 ID (10-bit) | 序列号 (12-bit)
return uint64(now)<<timeShift | s.machineID<<machineShift | s.sequence
}

本文选择: 单机用原子自增 (最简单), 分片时升级为 Snowflake. 下面的 Sequencer 使用原子自增.

为什么不用 UUID?
Order ID 是撮合引擎的热路径: 每个订单创建, 每次成交, 每次取消都要用. uint64 (8 bytes) 比 UUID (16 bytes) 省一半内存, map 查找也更快. 性能敏感场景下, 64-bit 整数是最优选择.

5.1.1 Snowflake 在容器化环境下的 Machine ID 问题

Snowflake 依赖一个稳定的 Machine ID, 但容器化 (K8s / Docker) 环境中 Pod 随时创建/销毁/漂移, 没有 “固定机器” 的概念:

1
2
传统部署: 机器固定, 手动分配 Machine ID = 1, 2, 3 → 永远不变
容器化: Pod 随时重建, IP/hostname 都会变 → Machine ID 从哪来?

解决方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// ─────────────────────────────────────────────
// 方案 1: K8s StatefulSet 序号 (推荐, 零依赖)
// ─────────────────────────────────────────────
// StatefulSet Pod 名字天然有序: engine-0, engine-1, engine-2 ...
// K8s 保证序号唯一且稳定 (Pod 重建后序号不变)

func MachineIDFromStatefulSet() uint64 {
hostname := os.Getenv("HOSTNAME") // "engine-3"
parts := strings.Split(hostname, "-")
id, _ := strconv.ParseUint(parts[len(parts)-1], 10, 64)
return id % 1024 // Machine ID 10-bit, 最大 1023
}

// ─────────────────────────────────────────────
// 方案 2: 中心化分配 (Redis / etcd)
// ─────────────────────────────────────────────
// Pod 启动时向 Redis INCR 拿唯一编号
// 优点: 适用任何部署模式
// 缺点: 依赖外部服务, Redis 挂了 → 新 Pod 起不来

func MachineIDFromRedis(client *redis.Client) uint64 {
id, _ := client.Incr(context.Background(), "snowflake:machine_id").Result()
return uint64(id) % 1024
}

// ─────────────────────────────────────────────
// 方案 3: 交易对 hash (撮合引擎专用, 推荐)
// ─────────────────────────────────────────────
// 撮合引擎按交易对分片, 每个交易对只有 1 个活跃实例
// 用交易对名称 hash 作为 Machine ID:
// - Pod 重启/漂移, 只要交易对不变, ID 不变
// - 不同交易对天然不同 Machine ID
// - 零依赖, 确定性计算

func MachineIDFromSymbol(symbol string) uint64 {
h := fnv.New32a()
_, _ = h.Write([]byte(symbol))
return uint64(h.Sum32()) % 1024
}
// MachineIDFromSymbol("ETH-USDC") → 固定值, 不随 Pod 变
// MachineIDFromSymbol("BTC-USDC") → 另一个固定值

方案对比:

方案 适用场景 可靠性 外部依赖
原子自增 (不用 Snowflake) 单实例 最高
交易对 hash 按交易对分片的撮合引擎
StatefulSet 序号 K8s 有状态服务 K8s
Redis/etcd 分配 通用分布式服务 Redis/etcd
IP 哈希 / 随机数 非关键路径 低 (有碰撞)

为什么撮合引擎不怕 Machine ID 问题?
撮合引擎按交易对分片, 每个交易对只有一个活跃实例 (单线程撮合).
同一交易对不存在多实例并发生成 ID 的情况.
所以单机场景直接用原子自增就够了; 只有跨交易对合并 Trade 时才需要 Snowflake, 用交易对 hash 做 Machine ID 即可.
这也是为什么 “分片 + Snowflake” 是天然搭配: 分片 key 本身就是稳定的 Machine ID 来源.

5.2 排序器 (Sequencer)

排序器是撮合引擎的 “入口闸门”: 所有订单必须先经过排序器, 分配全局序号, 再送入撮合核心.

排序器 (Sequencer) 工作流程 API 层 (多 goroutine) goroutine 1: Buy ETH goroutine 2: Sell ETH goroutine 3: Cancel #42 Sequencer (排序器) inbound chan (缓冲队列) 1. 从 channel 取出订单 2. 分配 Order ID (原子自增) 3. 记录 Timestamp 4. 参数校验 (价格/数量合法) 5. 写入 Event Log (可选) 6. 送入撮合引擎 单 goroutine 串行处理 (保证顺序) 撮合引擎 (单线程) Match() / Cancel() → 生成 Trade → 更新 Order Book 输出 Trades L2 更新 Events 关键: API 层多 goroutine 并发 → Sequencer 单 goroutine 串行 → 撮合引擎单线程处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// OrderType 区分不同的操作类型.
type OrderType int

const (
OrderTypeLimit OrderType = iota // 限价单 (指定价格)
OrderTypeMarket // 市价单 (价格为 0, 表示不限价)
OrderTypeCancel // 取消订单
)

// InboundOrder 是从 API 层提交到排序器的原始请求.
// 此时还没有 ID 和 Timestamp, 这些由 Sequencer 分配.
type InboundOrder struct {
Type OrderType
Side Side
Price int64 // 限价单: 指定价格; 市价单: 0
Qty int64
CancelID uint64 // Type=Cancel 时, 要取消的订单 ID
}

// Sequencer 是撮合引擎的入口.
// 接收多个 goroutine 的并发提交, 串行化后送入撮合引擎.
//
// 为什么需要 Sequencer?
// 撮合引擎是单线程的, 但外部请求是并发的 (HTTP/gRPC/WebSocket).
// Sequencer 用 channel 将并发请求排成一个队列,
// 保证每个订单有全局唯一的 ID 和确定的处理顺序.
type Sequencer struct {
inbound chan InboundOrder // 并发安全的输入队列
idGen AtomicIDGen // ID 生成器
book *OrderBook // 撮合引擎
trades chan []Trade // 成交结果输出
done chan struct{} // 关闭信号
}

func NewSequencer(book *OrderBook, bufferSize int) *Sequencer {
return &Sequencer{
inbound: make(chan InboundOrder, bufferSize), // 带缓冲, 削峰
book: book,
trades: make(chan []Trade, bufferSize),
done: make(chan struct{}),
}
}

// Submit 由 API 层调用, 提交订单到排序器.
// 多个 goroutine 可以并发调用, channel 保证安全.
func (s *Sequencer) Submit(order InboundOrder) {
s.inbound <- order
}

// Run 启动排序器的主循环 (单 goroutine).
// 这是整个撮合引擎的串行瓶颈点: 所有订单在这里排队.
func (s *Sequencer) Run() {
go func() {
for {
select {
case in := <-s.inbound:
s.process(in)
case <-s.done:
return
}
}
}()
}

func (s *Sequencer) process(in InboundOrder) {
switch in.Type {
case OrderTypeCancel:
// 取消订单: 直接从 Order Book 移除
_ = s.book.CancelOrder(in.CancelID)

case OrderTypeLimit:
// 限价单: 分配 ID → 撮合
order := &Order{
ID: s.idGen.Next(),
Side: in.Side,
Price: in.Price,
Qty: in.Qty,
Timestamp: time.Now(),
}
result := s.book.Match(order)
if len(result.Trades) > 0 {
s.trades <- result.Trades
}

case OrderTypeMarket:
// 市价单: 用极端价格确保吃掉所有可成交档位
// Buy: 用 MaxInt64 表示 "不限价, 多贵都买"
// Sell: 用 1 表示 "不限价, 多便宜都卖"
price := int64(1<<63 - 1) // MaxInt64
if in.Side == Sell {
price = 1
}
order := &Order{
ID: s.idGen.Next(),
Side: in.Side,
Price: price,
Qty: in.Qty,
Timestamp: time.Now(),
}
result := s.book.Match(order)
if len(result.Trades) > 0 {
s.trades <- result.Trades
}
// 市价单如果没完全成交, 剩余部分不挂簿 (丢弃)
// 因为市价单的语义是 "立即成交, 成交不了就算了"
if result.MakerResting {
s.book.CancelOrder(order.ID)
}
}
}

// Stop 优雅关闭排序器.
func (s *Sequencer) Stop() {
close(s.done)
}

5.2.1 Channel 性能与缓冲策略

Channel 本身是瓶颈吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Go buffered channel 性能:
多生产者 → 单消费者: ~100-200ns/op (内部有 mutex)
→ 换算吞吐: 5,000,000 ops/sec

实际需求:
Binance 峰值: ~100,000 orders/sec
Hyperliquid: ~100,000 orders/sec
dYdX v4: ~10,000 orders/sec

channel 的 5M/sec 远超需求 → 不是瓶颈

真正的瓶颈在消费端 (process 方法):
分配 ID: ~1ns (atomic)
记录 Timestamp: ~20ns (time.Now)
调用 Match(): ~1-10μs (BTree 查找 + 撮合) ← 大头在这里
→ channel 的 100ns 只占总延迟的 ~1%

缓冲大小怎么选?

1
2
3
4
5
6
7
8
9
10
11
12
缓冲的作用: 削峰 (吸收突发流量)

太小 (64): 突发瞬间填满 → Submit 阻塞 → API goroutine 卡住
太大 (1M): 队列堆积 → 延迟从 1μs 变成 100ms → 不可接受

推荐公式:
bufferSize = 峰值 QPS × 可接受延迟

: 峰值 50,000/sec, 可接受 10ms 延迟
bufferSize = 50,000 × 0.01 = 500

实际: 1024 ~ 4096 (覆盖短暂突发, 不堆积太多)

Channel 满了怎么办? (背压策略)

三种背压策略 (Back-Pressure) 策略 1: 阻塞等待 channel 满 → Submit() 阻塞 → API goroutine 卡住 → HTTP 请求自然排队 ✓ 不丢订单 ✓ 最简单 (当前实现) ✗ 突发时用户请求变慢 适合: 大部分场景 策略 2: 拒绝 + 返回错误 channel 满 → 立即返回错误 → 用户收到 "系统繁忙" → 客户端可重试 ✓ 不阻塞, 延迟可控 ✓ 保护撮合引擎不过载 ✗ 用户需要处理重试逻辑 适合: 低延迟优先的场景 策略 3: Ring Buffer 覆盖 固定大小环形缓冲 → 满了就覆盖最旧的 → 永远不阻塞 ✓ 无锁, 性能最高 ✓ LMAX Disruptor 模式 ✗ 旧订单可能被丢弃 适合: 做市商 (旧报价本该被覆盖) 核心权衡: 阻塞 = 不丢数据但延迟升高 | 拒绝 = 保延迟但丢请求 | 覆盖 = 全都要但丢旧数据 DEX 场景: channel 的 100ns 延迟远小于出块时间 (200ms ~ 2s), 策略 1 足够
1
2
3
4
5
6
7
8
9
10
// 策略 2 的实现: 非阻塞提交, 满了返回错误
func (s *Sequencer) TrySubmit(order InboundOrder) error {
select {
case s.inbound <- order:
return nil
default:
// channel 满了, 立即返回而不是阻塞
return fmt.Errorf("sequencer overloaded, retry later")
}
}

更激进的优化: 去掉 channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
如果 channel 的 mutex 开销成了问题 (极端低延迟 CEX 场景):

1. LMAX Disruptor 模式
- 预分配固定大小 ring buffer (如 2^20 个 slot)
- 生产者和消费者通过 atomic 指针协调
- 零内存分配, 零锁, CPU cache 友好

2. 批量处理
- 攒一批 (如 100 个) 再一起处理
- 减少 channel 读写次数, 代价是增加固定延迟

DEX 场景下不需要这些优化:
dYdX 出块 ~1-2s, Hyperliquid ~200ms
channel 的 100ns 相比出块时间可以忽略
只有 CEX (追求 < 1μs 延迟) 才需要干掉 channel

5.2.2 排序器可靠性: 重启与冷热分离

排序器重启会丢失 channel 中未处理的订单. 生产环境需要考虑可靠性:

1
2
3
4
5
6
7
8
重启会丢什么?
1. channel 里的订单 (还没被 process 的) → 全丢
2. 已分配但未完成撮合的订单 → 丢了
3. trades channel 里未推送的成交 → 丢了

关键洞察: 不是所有订单都需要同等保护
做市商的限价单: 每秒更新几十次, 丢了自动重发 → 不需要持久化
用户的止损单: 可能挂几天, 丢了用户不知道 → 必须持久化

冷热分离架构:

冷热分离: 排序器可靠性架构 API 层 所有订单入口 路由分类器 按 OrderType 分流 热路径 冷路径 热路径 (Hot Path): 纯内存, 追求速度 市价单, 做市商限价单 (短期订单) → 直接进 channel → Sequencer → Match() 不写 WAL, 不写磁盘, 丢了用户重发 延迟: ~1-10μs | 吞吐: 100k+/sec 冷路径 (Cold Path): WAL 持久化, 保证不丢 止损单, GTC, 条件单 (长期订单) → 先写 WAL (fsync) → 再进 channel → Sequencer → Match() → 撮合/触发完成 → 标记 WAL 条目为 completed → 重启时 → 回放 WAL 中 未完成 的条目 延迟: ~10-100μs (多一次磁盘写) | 但这类订单不追求速度 重启恢复流程 1. 读取 WAL, 过滤出状态 = pending 的条目 (未完成的冷路径订单) 2. 按序号重新注入 Sequencer (恢复止损/GTC 订单到 Order Book) 3. 热路径订单不恢复, 做市商机器人检测到重连后自动重发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// WAL 条目状态.
type WALStatus int

const (
WALPending WALStatus = iota // 已写入, 未处理完
WALCompleted // 撮合/触发完成
)

// WALEntry 是写入 WAL 的一条记录.
type WALEntry struct {
SeqNo uint64 // 全局序号 (WAL 内递增)
Status WALStatus // 处理状态
Order InboundOrder // 原始订单
}

// WAL 是 Write-Ahead Log, 用于冷路径订单的持久化.
// 只有止损/GTC 等长期订单走 WAL, 市价单/做市商报价不走.
type WAL struct {
file *os.File
encoder *json.Encoder
seqNo uint64
mu sync.Mutex // WAL 写入需要串行化
}

func NewWAL(path string) (*WAL, error) {
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return nil, fmt.Errorf("open WAL: %w", err)
}
return &WAL{
file: f,
encoder: json.NewEncoder(f),
}, nil
}

// Append 写入一条 WAL 记录并 fsync.
// fsync 保证数据落盘, 即使进程崩溃, 数据不丢.
func (w *WAL) Append(order InboundOrder) (uint64, error) {
w.mu.Lock()
defer w.mu.Unlock()

w.seqNo++
entry := WALEntry{
SeqNo: w.seqNo,
Status: WALPending,
Order: order,
}

if err := w.encoder.Encode(entry); err != nil {
return 0, fmt.Errorf("encode WAL entry: %w", err)
}
// fsync: 强制刷盘, 这是可靠性的关键
// 没有 fsync, 数据可能还在 OS 缓冲区, 断电会丢
if err := w.file.Sync(); err != nil {
return 0, fmt.Errorf("fsync WAL: %w", err)
}
return w.seqNo, nil
}

// MarkCompleted 标记某条 WAL 记录已处理完成.
// 实际生产中用 checkpoint 文件记录已完成的最大 SeqNo, 而非逐条标记.
func (w *WAL) MarkCompleted(seqNo uint64) {
// 简化实现: 追加一条 completed 记录
// 生产级: 维护 checkpoint, 定期截断已完成的 WAL 段
w.mu.Lock()
defer w.mu.Unlock()
_ = w.encoder.Encode(WALEntry{SeqNo: seqNo, Status: WALCompleted})
_ = w.file.Sync()
}

// ─────────────────────────────────────────────
// WAL 回放 (Recovery & Replay)
// ─────────────────────────────────────────────
//
// 回放流程:
// 1. 读取 WAL 文件, 解析所有条目
// 2. 读取 Checkpoint (记录已确认处理到哪个 SeqNo)
// 3. 过滤: SeqNo > checkpoint 且 Status = Pending 的条目
// 4. 按 SeqNo 排序 (保证和原始顺序一致)
// 5. 逐条重新注入 Sequencer → 重建 Order Book 状态
//
// 为什么要 Checkpoint?
// WAL 会不断增长, 不能每次重启都从头回放.
// Checkpoint 记录 "这个 SeqNo 之前的全部处理完了",
// 回放时只需要处理 checkpoint 之后的条目.

// Checkpoint 记录已确认完成的最大 SeqNo.
// 定期写入独立文件, 重启时读取.
type Checkpoint struct {
LastCompletedSeqNo uint64 `json:"last_completed_seq_no"`
Timestamp int64 `json:"timestamp"`
}

// SaveCheckpoint 将当前 checkpoint 写入文件.
// 使用 write-rename 模式: 先写临时文件, 再 rename.
// rename 是原子操作, 即使写入中途崩溃, 旧 checkpoint 文件不会损坏.
func SaveCheckpoint(path string, cp Checkpoint) error {
data, err := json.Marshal(cp)
if err != nil {
return fmt.Errorf("marshal checkpoint: %w", err)
}

tmpPath := path + ".tmp"
if err := os.WriteFile(tmpPath, data, 0644); err != nil {
return fmt.Errorf("write temp checkpoint: %w", err)
}
// rename 是原子操作: 要么完全替换, 要么不动
return os.Rename(tmpPath, path)
}

// LoadCheckpoint 读取 checkpoint, 不存在则返回零值 (从头回放).
func LoadCheckpoint(path string) (Checkpoint, error) {
data, err := os.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return Checkpoint{}, nil // 首次启动, 从头开始
}
return Checkpoint{}, fmt.Errorf("read checkpoint: %w", err)
}
var cp Checkpoint
if err := json.Unmarshal(data, &cp); err != nil {
return Checkpoint{}, fmt.Errorf("unmarshal checkpoint: %w", err)
}
return cp, nil
}

// RecoverResult 包含回放的结果统计.
type RecoverResult struct {
TotalEntries int // WAL 中总条目数
SkippedEntries int // 已完成或在 checkpoint 之前, 跳过的条目数
ReplayedOrders int // 实际回放的订单数
LastSeqNo uint64 // WAL 中最大的 SeqNo (用于恢复 ID 生成器)
}

// Recover 读取 WAL + Checkpoint, 返回需要回放的订单和统计信息.
func Recover(walPath, checkpointPath string) ([]InboundOrder, *RecoverResult, error) {
// 1. 读取 checkpoint
cp, err := LoadCheckpoint(checkpointPath)
if err != nil {
return nil, nil, fmt.Errorf("load checkpoint: %w", err)
}

// 2. 读取 WAL 文件
f, err := os.Open(walPath)
if err != nil {
if os.IsNotExist(err) {
return nil, &RecoverResult{}, nil // 没有 WAL, 首次启动
}
return nil, nil, fmt.Errorf("open WAL: %w", err)
}
defer func() { _ = f.Close() }()

// 3. 解析所有条目, 构建 SeqNo → 最新状态 的映射
// 同一个 SeqNo 可能出现两次: 先 Pending, 后 Completed
// 用 map 自动保留最新状态
entries := make(map[uint64]WALEntry)
result := &RecoverResult{}
decoder := json.NewDecoder(f)

for decoder.More() {
var entry WALEntry
if err := decoder.Decode(&entry); err != nil {
// WAL 末尾可能有不完整的写入 (崩溃时半条 JSON)
// 这是正常的: 最后一次 fsync 之后, 崩溃前写了半条
// 安全策略: 跳过损坏的条目, 从最后一条完整的恢复
break
}
entries[entry.SeqNo] = entry
result.TotalEntries++
if entry.SeqNo > result.LastSeqNo {
result.LastSeqNo = entry.SeqNo
}
}

// 4. 过滤: 只保留 checkpoint 之后且状态为 Pending 的条目
type seqOrder struct {
seqNo uint64
order InboundOrder
}
var pending []seqOrder

for seqNo, entry := range entries {
if seqNo <= cp.LastCompletedSeqNo {
result.SkippedEntries++ // checkpoint 之前, 已确认完成
continue
}
if entry.Status == WALCompleted {
result.SkippedEntries++ // 显式标记完成
continue
}
pending = append(pending, seqOrder{seqNo: seqNo, order: entry.Order})
}

// 5. 按 SeqNo 排序, 保证回放顺序和原始提交顺序一致
// 这一步至关重要: 乱序回放会导致 Order Book 状态不一致
sort.Slice(pending, func(i, j int) bool {
return pending[i].seqNo < pending[j].seqNo
})

orders := make([]InboundOrder, len(pending))
for i, p := range pending {
orders[i] = p.order
}
result.ReplayedOrders = len(orders)
return orders, result, nil
}

冷热分离的 Sequencer (含完整恢复流程):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// ReliableSequencer 在基础 Sequencer 之上增加了冷热分离和 WAL 回放.
type ReliableSequencer struct {
*Sequencer
wal *WAL
checkpointPath string
completedCount uint64 // 自上次 checkpoint 以来完成的条目数
}

// NewReliableSequencer 创建可靠排序器, 启动时自动回放 WAL.
//
// 启动流程:
// 1. 打开 WAL 文件
// 2. 读取 Checkpoint (上次确认处理到哪里)
// 3. 回放 WAL 中未完成的条目 → 重建 Order Book 状态
// 4. 恢复 ID 生成器的计数器 (避免重复 ID)
// 5. 启动 Sequencer 主循环
func NewReliableSequencer(book *OrderBook, walPath, cpPath string, bufSize int) (*ReliableSequencer, error) {
wal, err := NewWAL(walPath)
if err != nil {
return nil, err
}

rs := &ReliableSequencer{
Sequencer: NewSequencer(book, bufSize),
wal: wal,
checkpointPath: cpPath,
}

// ── 回放 WAL ──
pending, result, err := Recover(walPath, cpPath)
if err != nil {
return nil, fmt.Errorf("WAL recovery: %w", err)
}

if result.ReplayedOrders > 0 {
fmt.Printf("[Recovery] WAL entries: %d, skipped: %d, replaying: %d\n",
result.TotalEntries, result.SkippedEntries, result.ReplayedOrders)

// 逐条回放: 按原始顺序重新注入撮合引擎
for _, order := range pending {
rs.Sequencer.Submit(order)
}
fmt.Printf("[Recovery] Replay complete. Order Book restored.\n")
}

// 恢复 ID 生成器: 从 WAL 最大 SeqNo 之后继续分配
// 避免重启后生成重复 ID
if result.LastSeqNo > 0 {
rs.Sequencer.idGen.counter = result.LastSeqNo
}

return rs, nil
}

// Submit 根据订单类型分流: 热路径直接进 channel, 冷路径先写 WAL.
func (rs *ReliableSequencer) Submit(order InboundOrder) error {
if rs.isColdPath(order) {
// 冷路径: 先写 WAL (fsync 保证落盘), 再进 channel
seqNo, err := rs.wal.Append(order)
if err != nil {
return fmt.Errorf("WAL write failed: %w", err)
}
order.walSeqNo = seqNo
}
rs.Sequencer.Submit(order)
return nil
}

// OnOrderCompleted 在订单撮合/触发完成后调用.
// 标记 WAL 条目为已完成, 并定期写 Checkpoint.
func (rs *ReliableSequencer) OnOrderCompleted(walSeqNo uint64) {
if walSeqNo == 0 {
return // 热路径订单没有 walSeqNo, 跳过
}

// 标记该条目完成
rs.wal.MarkCompleted(walSeqNo)
rs.completedCount++

// 每 1000 条完成后写一次 Checkpoint
// Checkpoint 频率是个权衡:
// 太频繁: 磁盘 IO 增加
// 太稀疏: 重启时需要回放更多条目
if rs.completedCount%1000 == 0 {
_ = SaveCheckpoint(rs.checkpointPath, Checkpoint{
LastCompletedSeqNo: walSeqNo,
Timestamp: time.Now().UnixMilli(),
})
}
}

// isColdPath 判断订单是否走冷路径 (需要持久化).
func (rs *ReliableSequencer) isColdPath(order InboundOrder) bool {
return order.Type == OrderTypeStopLoss ||
order.Type == OrderTypeGTC
}

// GracefulShutdown 优雅关闭: 写最终 Checkpoint + 关闭 WAL.
// 优雅关闭后重启, 回放量为 0 (所有条目都已确认完成).
func (rs *ReliableSequencer) GracefulShutdown() {
// 写最终 checkpoint
_ = SaveCheckpoint(rs.checkpointPath, Checkpoint{
LastCompletedSeqNo: rs.wal.seqNo,
Timestamp: time.Now().UnixMilli(),
})
_ = rs.wal.file.Close()
rs.Sequencer.Stop()
}

回放流程图:

WAL 回放流程 (重启恢复) 1. 读 Checkpoint 上次完成到 SeqNo=4500 2. 扫描 WAL 共 4520 条, 跳过损坏尾部 3. 过滤 + 排序 SeqNo > 4500 且 Pending 4. 逐条回放 15 条 → Sequencer → Match 5. 就绪 Order Book 恢复 WAL 文件内容 (示意) #4498 ✓ #4499 ✓ #4500 ✓ ↑ Checkpoint #4501 ⏳ #4502 ✓ #4503 ⏳ ... #4520 ⏳ 损坏 (跳过) ✓ = Completed (跳过) ⏳ = Pending (需要回放) 损坏 = 崩溃时半写的条目 (安全跳过) 边界情况处理 WAL 不存在 → 首次启动, 跳过回放 | Checkpoint 不存在 → 从 WAL 第一条开始回放 WAL 尾部损坏 → 跳过损坏条目, 从最后一条完整的恢复 | 优雅关闭 → 回放量为 0 (全部已确认)

与 dYdX / Hyperliquid 的对应关系:

概念 本文实现 dYdX v4 Hyperliquid
热路径 channel, 纯内存 Short-Term Order (内存 + gossip) N/A (全部上链)
冷路径 WAL 持久化 Stateful Order (上链) 每个订单都上链
重启恢复 回放 WAL 未完成条目 从链上状态恢复 从链上状态恢复
热路径丢失后果 做市商自动重发 做市商自动重发 不会丢
冷路径丢失后果 不会丢 (WAL 保证) 不会丢 (链上保证) 不会丢

区块链 = 天然 WAL: 链上的每个区块就是一条 WAL 记录, 全球共识 + 不可篡改. Hyperliquid “每个订单上链” 等于所有订单都走冷路径, 可靠性最强, 代价是需要 HyperBFT 的极高吞吐才扛得住. dYdX 的冷热分离 (Short-Term vs Stateful) 和本文的设计思路完全一致.

5.2.3 WAL 存储选型: 文件 vs 数据库

纯文件 WAL 不是性能差, 反而是最快的. 数据库底层也是在写文件 WAL (MySQL redo log, PostgreSQL WAL), 直接写文件跳过了网络/事务/索引等中间层.

各方案对比:

WAL 存储选型: 延迟 vs 能力 延迟 本地文件 WAL ~10-50μs | 零依赖 嵌入式 KV ~20-100μs | 可查询 Redis AOF ~50-500μs | TTL 支持 PostgreSQL ~0.5-5ms | 主从复制 Kafka ~2-5ms | 多消费者 功能少 (纯追加) 功能多 (查询/复制/多消费者) 越往右功能越多, 但延迟也越高, 冷路径订单不追求速度, 选右侧完全合理
方案 写入延迟 跨节点恢复 查询能力 适用场景
本地文件 WAL ~10-50μs 不支持 (本地) 无 (只能顺序读) 单机撮合, 极致速度
嵌入式 KV (BadgerDB / BoltDB) ~20-100μs 不支持 (本地) 有 (索引查询) 单机 + 需要查历史
Redis AOF ~50-500μs 主从复制 有 (KV + TTL) 需要过期机制的订单
PostgreSQL / MySQL ~0.5-5ms 主从复制 强 (SQL) 多机分片, K8s, 需要运维成熟
Kafka ~2-5ms 多副本 无 (只能顺序消费) Event Sourcing, 多下游消费

冷路径选 DB 完全合理:

1
2
3
4
5
6
7
8
9
10
11
12
冷路径订单 (止损 / GTC) 的性质:
- 频率低: 用户一天设几个止损, 不是每秒几万
- 延迟不敏感: 多 1ms 无所谓, 这不是高频交易
- 可靠性要求高: 丢了 = 用户爆仓

→ 多 1ms 延迟换来: 主从复制, SQL 查询, 运维成熟
→ 对止损单来说非常划算

生产级典型组合:
热路径: channel (纯内存) → 撮合引擎 ← 速度优先
冷路径: PostgreSQL → channel → 撮合引擎 ← 可靠优先
审计流: Kafka (成交结果) → 下游系统 ← 回放优先

5.2.4 审计流 (Audit Trail)

审计流记录撮合引擎的所有输出, 是合规, 风控, 对账的 “唯一事实来源” (source of truth).

审计点在输出端, 不在输入端:

1
2
3
4
5
6
7
8
9
为什么不在输入端 (Sequencer)?
- 热路径订单还没撮合, 记录了没意义 (可能被取消/失败)
- 会增加热路径延迟
- 信息不完整 (不知道成交价, 对手方是谁)

为什么在输出端 (Match 之后)?
- 有确定性结果: 谁和谁成交, 什么价格, 什么数量
- 不在撮合关键路径上 (异步推送, 不影响下一笔撮合)
- 信息完整, 可作为所有下游系统的数据源
审计流在整体架构中的位置 Sequencer Match() 审计点 在这里分叉 ↓ Step 1: 同步写本地 Event Log append + fsync ~10-50μs Step 2: 异步推 Kafka 另一个 goroutine 消费 Event Log 风控 合规 对账 分析 为什么分两步? (同步写本地 + 异步推 Kafka) 同步写本地文件: 快 (~10μs), 保证不丢, 进程崩溃也有完整记录 异步推 Kafka: 慢 (~2-5ms), 但可重试, 下游灵活, 直接同步写 Kafka 会让撮合延迟增加 2-5ms

审计流记录的事件类型:

事件 说明 重要性
OrderAccepted 订单被 Sequencer 接受, 分配了 ID
OrderRejected 订单被拒绝 (参数非法, 余额不足)
OrderMatched 撮合成交 (双方 ID, 价格, 数量) 核心, 监管/对账的唯一依据
OrderCancelled 订单被取消
OrderExpired 订单超时过期
LiquidationTriggered 清算触发

各平台的审计方式:

平台 审计流实现 特点
CEX (Binance) 本地 Event Log + Kafka + 数据仓库 完整审计链, 监管可查
dYdX v4 链上区块 = 审计 (只有成交结果上链) 撮合过程不可审计 (内存中)
Hyperliquid 链上区块 = 审计 (每个订单上链) 全链路可审计, 但代码闭源
传统股票交易所 FIX Drop Copy + 监管报告 法律强制, 保存 7 年+

DEX 的天然优势: CEX 要自己建审计系统 (Event Log + Kafka + 数据仓库), DEX 的链本身就是全球共识的审计流: 每个区块 = 一条不可篡改的审计记录. 这是 “去中心化” 在合规层面最实际的好处: 不是 “没人管”, 而是 “人人都能查”.

5.3 数据结构定义 (Order / PriceLevel / OrderBook)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package engine

import (
"container/list"
"fmt"
"time"

"github.com/tidwall/btree"
)

// Side 表示订单方向.
type Side int

const (
Buy Side = iota // 买单 (Bid)
Sell // 卖单 (Ask)
)

// Order 表示一个订单.
// 这是撮合引擎中最基本的数据单元.
type Order struct {
ID uint64 // 唯一订单 ID, 由 Sequencer 分配
Side Side // Buy 或 Sell
Price int64 // 价格, 用整数表示 (如 $3000.50 → 300050, 精度 0.01)
Qty int64 // 剩余数量 (部分成交后会减少)
Timestamp time.Time // 到达时间, 用于同价格下的 FIFO 排序

// element 是订单在 PriceLevel 队列中的位置指针.
// 有了它, 取消订单时可以 O(1) 定位, 不需要遍历队列.
element *list.Element
}

// PriceLevel 表示一个价格档位.
// 同一价格的所有订单按 FIFO 排列在 orders 链表中.
type PriceLevel struct {
Price int64 // 该档位的价格
Orders *list.List // 订单 FIFO 队列 (双向链表, 支持 O(1) 头部取出和任意位置删除)
TotalQty int64 // 该档位的总挂单量 (冗余字段, 避免遍历计算)
}

// Trade 表示一次成交记录.
type Trade struct {
BidOrderID uint64 // 买方订单 ID
AskOrderID uint64 // 卖方订单 ID
Price int64 // 成交价格 (= maker 的挂单价格)
Qty int64 // 成交数量
Timestamp time.Time
}

// OrderBook 维护单个交易对的买卖订单簿.
// 内部使用两棵 BTree 分别管理 bid 和 ask 的价格档位.
// tidwall/btree.Map 是泛型有序映射, key=价格, value=PriceLevel 指针.
type OrderBook struct {
Symbol string // 交易对, 如 "ETH-USDC"
Bids btree.Map[int64, *PriceLevel] // 买单价格树 (需要快速找最大值)
Asks btree.Map[int64, *PriceLevel] // 卖单价格树 (需要快速找最小值)
Orders map[uint64]*Order // 全局订单索引: orderID → Order (用于 O(1) 查找/取消)
levels map[int64]*PriceLevel // 价格 → PriceLevel 的快速映射 (避免重复 BTree 查找)
}

// NewOrderBook 创建一个新的订单簿.
// btree.Map 零值即可使用, 内部按 key (int64 价格) 自然排序.
func NewOrderBook(symbol string) *OrderBook {
return &OrderBook{
Symbol: symbol,
// Bids, Asks: btree.Map 零值可用, 默认 degree 适合大多数场景
Orders: make(map[uint64]*Order),
levels: make(map[int64]*PriceLevel),
}
}

5.4 订单簿操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// addOrder 将订单加入对应的价格档位.
// 如果该价格档位不存在, 创建新档位并插入 BTree.
func (ob *OrderBook) addOrder(order *Order) {
// 查找或创建价格档位
level, exists := ob.levels[order.Price]
if !exists {
level = &PriceLevel{
Price: order.Price,
Orders: list.New(),
}
ob.levels[order.Price] = level

// 插入到对应的 BTree (泛型, 无需类型断言)
if order.Side == Buy {
ob.Bids.Set(level.Price, level)
} else {
ob.Asks.Set(level.Price, level)
}
}

// 订单追加到队列末尾 (FIFO: 后到的排后面)
order.element = level.Orders.PushBack(order)
level.TotalQty += order.Qty

// 加入全局索引
ob.Orders[order.ID] = order
}

// removeOrder 从订单簿中移除一个订单.
// 如果移除后价格档位为空, 从 BTree 中删除该档位.
func (ob *OrderBook) removeOrder(order *Order) {
level, exists := ob.levels[order.Price]
if !exists {
return
}

// 从 FIFO 队列中移除 (O(1), 因为有 element 指针)
level.Orders.Remove(order.element)
level.TotalQty -= order.Qty

// 从全局索引移除
delete(ob.Orders, order.ID)

// 如果档位空了, 清理 BTree
if level.Orders.Len() == 0 {
if order.Side == Buy {
ob.Bids.Delete(order.Price)
} else {
ob.Asks.Delete(order.Price)
}
delete(ob.levels, order.Price)
}
}

// CancelOrder 取消一个订单.
// 通过全局索引 O(1) 找到订单, 然后 O(1) 从队列中移除.
func (ob *OrderBook) CancelOrder(orderID uint64) error {
order, exists := ob.Orders[orderID]
if !exists {
return fmt.Errorf("order %d not found", orderID)
}
ob.removeOrder(order)
return nil
}

// BestBid 返回当前最高买价档位, 没有则返回 nil.
// btree.Map.Max() 返回 (key, value, ok), 泛型无需类型断言.
func (ob *OrderBook) BestBid() *PriceLevel {
_, level, ok := ob.Bids.Max()
if !ok {
return nil
}
return level
}

// BestAsk 返回当前最低卖价档位, 没有则返回 nil.
func (ob *OrderBook) BestAsk() *PriceLevel {
_, level, ok := ob.Asks.Min()
if !ok {
return nil
}
return level
}

5.5 撮合算法

撮合流程 (Match Algorithm) 新订单到达 订单是 Buy? 检查 Asks 最低价 买价 >= 卖价? Yes 成交! 成交价 = maker (卖方) 挂单价 成交量 = min(买方量, 卖方量) 更新双方剩余数量 生成 Trade 记录 买方还有剩余? Yes → 继续匹配下一档 剩余数量挂入订单簿 (Maker) No No 直接挂入订单簿 (成为 Maker, 等待被匹配)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// MatchResult 包含一次撮合的所有产出.
type MatchResult struct {
Trades []Trade // 成交记录 (可能多笔, 跨越多个价格档位)
IncomingOrder *Order // 输入的订单 (可能部分成交, Qty 已更新)
MakerResting bool // true = 订单有剩余, 已挂入订单簿
}

// Match 接收一个新订单, 执行撮合, 返回成交结果.
//
// 核心逻辑 (以 Buy 订单为例):
// 1. 找到 Asks 最低价 (best ask)
// 2. 如果 buy.Price >= best_ask.Price → 可以成交
// 3. 按 FIFO 逐个吃掉该档位的订单
// 4. 该档位吃完 → 移到下一档, 重复
// 5. 吃不动了 (价格不够或 Asks 空了) → 剩余挂入 Bids
func (ob *OrderBook) Match(incoming *Order) *MatchResult {
result := &MatchResult{IncomingOrder: incoming}

for incoming.Qty > 0 {
// 找对手方最优价
var bestLevel *PriceLevel
if incoming.Side == Buy {
bestLevel = ob.BestAsk()
} else {
bestLevel = ob.BestBid()
}

// 没有对手单, 停止撮合
if bestLevel == nil {
break
}

// 价格不匹配, 停止撮合
// Buy: 我出的价 < 对方要的价 → 买不起
// Sell: 我要的价 > 对方出的价 → 对方嫌贵
if incoming.Side == Buy && incoming.Price < bestLevel.Price {
break
}
if incoming.Side == Sell && incoming.Price > bestLevel.Price {
break
}

// 逐个吃掉该档位的订单 (FIFO)
for bestLevel.Orders.Len() > 0 && incoming.Qty > 0 {
// 取队首 (最早到达的订单)
front := bestLevel.Orders.Front()
resting := front.Value.(*Order)

// 计算成交量 = min(incoming 剩余, resting 剩余)
matchQty := min(incoming.Qty, resting.Qty)

// 生成成交记录
trade := Trade{
Price: resting.Price, // 成交价 = maker 的挂单价
Qty: matchQty,
Timestamp: time.Now(),
}
if incoming.Side == Buy {
trade.BidOrderID = incoming.ID
trade.AskOrderID = resting.ID
} else {
trade.BidOrderID = resting.ID
trade.AskOrderID = incoming.ID
}
result.Trades = append(result.Trades, trade)

// 更新数量
incoming.Qty -= matchQty
resting.Qty -= matchQty
bestLevel.TotalQty -= matchQty

// resting 订单完全成交 → 移除
if resting.Qty == 0 {
ob.removeOrder(resting)
}
}
}

// 如果 incoming 还有剩余, 挂入订单簿 (成为 maker)
if incoming.Qty > 0 {
ob.addOrder(incoming)
result.MakerResting = true
}

return result
}

5.6 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func main() {
// ─── 初始化完整链路: OrderBook → Sequencer ───

ob := NewOrderBook("ETH-USDC")
seq := NewSequencer(ob, 1024) // 缓冲 1024 个订单
seq.Run()

// 消费成交结果 (实际生产中推送到 WebSocket / MQ)
go func() {
for trades := range seq.trades {
for _, t := range trades {
fmt.Printf("Trade: %d @ $%.2f\n", t.Qty, float64(t.Price)/100)
}
}
}()

// ─── 模拟做市商挂单 (通过 Sequencer 提交) ───

seq.Submit(InboundOrder{Type: OrderTypeLimit, Side: Sell, Price: 300100, Qty: 50})
seq.Submit(InboundOrder{Type: OrderTypeLimit, Side: Sell, Price: 300200, Qty: 80})
seq.Submit(InboundOrder{Type: OrderTypeLimit, Side: Sell, Price: 300100, Qty: 30})

// ─── 模拟用户下单 ───

// 限价买单: 出价 $3002, 买 100 个
seq.Submit(InboundOrder{Type: OrderTypeLimit, Side: Buy, Price: 300200, Qty: 100})

// 撮合过程 (Sequencer 内部自动完成):
// Sequencer 分配 ID=1,2,3 给三个卖单
// Sequencer 分配 ID=4 给买单
// 调用 ob.Match(buy):
// 1. Best ask = $3001 (ID:1, qty=50) → 成交 50 @ $3001
// 2. 同价 $3001 (ID:3, qty=30) → FIFO → 成交 30 @ $3001
// 3. 下一档 $3002 (ID:2, qty=80) → 成交 20 @ $3002 (部分成交)
// 4. 买单全部成交, 不挂簿

// Output:
// Trade: 50 @ $3001.00
// Trade: 30 @ $3001.00
// Trade: 20 @ $3002.00

// ─── 市价单示例 ───

// 市价卖单: 不限价, 卖 30 个 (吃掉 Bids 最高价)
seq.Submit(InboundOrder{Type: OrderTypeMarket, Side: Sell, Qty: 30})

// ─── 取消订单 ───

seq.Submit(InboundOrder{Type: OrderTypeCancel, CancelID: 2}) // 取消 ID=2 的挂单

time.Sleep(100 * time.Millisecond) // 等待异步处理完成
seq.Stop()
}

完整链路: API 多 goroutine 并发调用 seq.Submit() → channel 排队 → Sequencer 单 goroutine 分配 ID + Timestamp → ob.Match() 单线程撮合 → 成交结果送入 trades channel → 下游消费推送.

5.7 性能分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
单次撮合的时间复杂度:

找最优价 (BestAsk/BestBid): O(log n): BTree Min/Max
逐档吃单: O(k × m): k=吃掉的档数, m=每档订单数
挂入订单簿: O(log n): BTree Insert
取消订单: O(1): map 查找 + 链表删除

其中 n = 价格档位数, 典型值 1,000~10,000
BTree degree=64 时, log₆₄(10000) ≈ 2.3, 实际只需 3 次节点访问

内存占用 (粗估):
Order: ~100 bytes
PriceLevel: ~60 bytes + 链表开销
10 万挂单, 1 万档位: ~12 MB
→ 完全在 L2/L3 cache 内, 性能极佳

六、扩容方案

6.1 扩容挑战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
撮合引擎的特殊性:

1. 严格串行
- 同一个交易对的订单必须按时间顺序处理
- 不能并行处理同一个 Order Book (会导致成交顺序不一致)
- 这是撮合引擎最大的扩容瓶颈

2. 极致延迟要求
- CEX 目标: 单次撮合 < 1μs (微秒)
- DEX 目标: < 1ms (受出块时间限制)
- 加锁, 跨进程通信都会引入延迟

3. 突发峰值
- 正常: 1,000 orders/sec
- 行情剧变: 100,000+ orders/sec
- 必须能扛住峰值, 否则就是 "交易所宕机"

6.2 扩容方案对比

撮合引擎扩容方案 1 单机极致优化 • 单线程处理 (避免锁开销) • CPU 绑核 (避免上下文切换) • 内存预分配 + 对象池 (避免 GC) • Ring Buffer 输入队列 (LMAX Disruptor 模式) ✓ 吞吐: ~1M orders/sec (单核) ✗ 上限: 单核天花板, 无法水平扩展 代表: LMAX Exchange, Binance 撮合核心 2 按交易对分片 (Sharding) • 每个交易对一个独立引擎实例 • ETH-USDC → Engine A, BTC-USDC → Engine B • 交易对之间天然无依赖, 可完美并行 • 路由层按 symbol hash 分发到对应引擎 ✓ 吞吐: N × 单引擎 (线性扩展) ✗ 限制: 单个热门交易对仍受单机瓶颈 代表: 大多数 CEX, dYdX v4 (每个 market 独立撮合) 3 Event Sourcing + 热备 • 所有操作记录为事件流 (OrderPlaced, OrderMatched...) • 主引擎处理, 备引擎实时回放事件流 • 主引擎故障 → 备引擎秒级接管 (状态已同步) • 事件流可用于审计, 回测, 状态重建 ✓ 高可用, 故障恢复快, 完整审计链 ✗ 事件序列化/反序列化增加延迟 代表: LMAX (Journal), Hyperliquid (链本身 = 事件流) 4 异步流水线 • 拆分多个阶段, 用无锁队列连接: 接收 → 验证 → 排序 → 撮合 → 推送 • 每个阶段独立 goroutine, 绑定不同 CPU • 撮合阶段仍是单线程 (保证顺序) ✓ 整体吞吐提升, 撮合核心不受干扰 ✗ 系统复杂度高, 调试困难 代表: 现代 CEX 通用架构, Go channel 天然适合 实际生产: 方案 1+2+3 组合: 单机极致优化 × 按交易对分片 × Event Sourcing 热备

6.3 推荐组合: 分片 + 单线程 + Event Sourcing

生产级架构: 分片 + 单线程 + Event Sourcing Router (路由层) 按 symbol hash 分发到对应引擎 ETH Engine 单线程 | Ring Buffer BTC Engine 单线程 | Ring Buffer SOL Engine 单线程 | Ring Buffer ← 每个交易对 一个引擎 Event Log ETH Standby 回放事件流 BTC Standby 回放事件流 SOL Standby 回放事件流 ← 热备引擎 实时同步 每个引擎内部: 单线程 (无锁) | Ring Buffer 输入队列 | 撮合结果写入 Event Log | 热备引擎消费 Event Log, 保持状态同步 主引擎挂掉 → 热备引擎立即接管 (failover), 无需重建 Order Book

6.3.1 热备 (Hot Standby) 详细实现

热备的核心思想: 主引擎每执行一个操作, 就把该操作的事件发给备引擎; 备引擎回放同样的事件, 维护一份一模一样的 Order Book. 主引擎挂了, 备引擎立即接管.

热备架构 (Hot Standby) 主引擎 (Primary) Sequencer Match() Event Log (本地文件 WAL) OrderAccepted, OrderMatched, OrderCancelled ... Heartbeat: 每 100ms 发送一次 "我还活着" 状态: ACTIVE (处理所有请求) 事件流复制 (TCP / Unix Socket) 备引擎 (Standby) 接收事件流 → 回放到 Order Book 维护一份和主引擎一模一样的状态 不处理外部请求 (只读状态) Sequencer 挂起, 不接受订单 监听 Heartbeat: 超时 → 触发故障切换 状态: STANDBY (只同步, 不处理) 故障切换 (Failover) 流程 1. 备引擎检测到 Heartbeat 超时 (如连续 3 次未收到, 即 300ms) 2. 备引擎回放完所有已收到的事件 (确保状态完整) → 切换状态为 ACTIVE 3. 路由层更新: 新请求发到原备引擎 (通过 DNS / Service / Leader Election) 切换耗时: ~300ms (心跳超时) + ~10ms (回放剩余事件) ≈ 亚秒级

事件定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// EngineEvent 是主引擎产生的事件, 用于热备同步.
// 所有改变 Order Book 状态的操作都必须生成事件.
type EventType int

const (
EventOrderAccepted EventType = iota // 新订单加入 Order Book
EventOrderMatched // 撮合成交
EventOrderCancelled // 订单取消
EventOrderExpired // 订单过期
EventHeartbeat // 心跳 (不改变状态, 只证明活着)
)

type EngineEvent struct {
SeqNo uint64 // 全局递增序号 (备引擎用来检测是否有遗漏)
Type EventType
Timestamp time.Time
// 不同事件类型携带不同数据
Order *Order // Accepted / Cancelled / Expired
Trades []Trade // Matched
}

主引擎: 产生事件流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// PrimaryEngine 在撮合的同时产生事件流, 发送给备引擎.
type PrimaryEngine struct {
book *OrderBook
seq *Sequencer
eventSeq uint64 // 事件序号, 严格递增
replicas []chan EngineEvent // 备引擎的事件通道 (可以有多个备)
heartbeat *time.Ticker
}

func NewPrimaryEngine(symbol string, bufSize int) *PrimaryEngine {
book := NewOrderBook(symbol)
pe := &PrimaryEngine{
book: book,
seq: NewSequencer(book, bufSize),
heartbeat: time.NewTicker(100 * time.Millisecond),
}
return pe
}

// AddReplica 注册一个备引擎的事件通道.
func (pe *PrimaryEngine) AddReplica(ch chan EngineEvent) {
pe.replicas = append(pe.replicas, ch)
}

// emit 将事件广播给所有备引擎.
func (pe *PrimaryEngine) emit(event EngineEvent) {
pe.eventSeq++
event.SeqNo = pe.eventSeq
event.Timestamp = time.Now()

for _, ch := range pe.replicas {
// 非阻塞发送: 如果备引擎消费慢, 跳过而不是阻塞主引擎
select {
case ch <- event:
default:
// 备引擎跟不上 → 记录告警, 备引擎需要从 WAL 全量恢复
// 不能因为备慢了就阻塞主引擎的撮合
}
}
}

// Run 主引擎主循环: 撮合 + 产生事件 + 心跳.
func (pe *PrimaryEngine) Run() {
// 心跳 goroutine
go func() {
for range pe.heartbeat.C {
pe.emit(EngineEvent{Type: EventHeartbeat})
}
}()

// 撮合循环 (在 Sequencer 的 process 中 hook 事件产生)
pe.seq.Run()
}

// OnMatched 撮合完成后调用, 产生事件.
// 在 Sequencer.process() 中, Match() 返回后调用此方法.
func (pe *PrimaryEngine) OnMatched(order *Order, result *MatchResult) {
if result.MakerResting {
// 订单挂入簿, 产生 Accepted 事件
pe.emit(EngineEvent{
Type: EventOrderAccepted,
Order: order,
})
}
if len(result.Trades) > 0 {
// 有成交, 产生 Matched 事件
pe.emit(EngineEvent{
Type: EventOrderMatched,
Trades: result.Trades,
})
}
}

备引擎: 消费事件流 + 故障检测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// StandbyEngine 消费主引擎的事件流, 维护一份镜像 Order Book.
type StandbyEngine struct {
book *OrderBook
events chan EngineEvent // 从主引擎接收事件
lastEventSeq uint64 // 最后收到的事件序号 (检测遗漏)
lastHeartbeat time.Time // 最后收到心跳的时间
state EngineState // STANDBY or ACTIVE
onPromote func() // 切换为主引擎时的回调 (通知路由层)
}

type EngineState int

const (
StateStandby EngineState = iota
StateActive
)

func NewStandbyEngine(symbol string, events chan EngineEvent, onPromote func()) *StandbyEngine {
return &StandbyEngine{
book: NewOrderBook(symbol),
events: events,
lastHeartbeat: time.Now(),
state: StateStandby,
onPromote: onPromote,
}
}

// Run 备引擎主循环: 消费事件 + 监控心跳.
func (se *StandbyEngine) Run() {
// 心跳超时检测 (独立 goroutine)
failoverTimeout := 300 * time.Millisecond // 3 次心跳未收到 → 故障
go func() {
ticker := time.NewTicker(50 * time.Millisecond) // 检测频率
defer ticker.Stop()
for range ticker.C {
if se.state == StateStandby &&
time.Since(se.lastHeartbeat) > failoverTimeout {
se.promote()
return
}
}
}()

// 事件消费循环
for event := range se.events {
se.handleEvent(event)
}
}

// handleEvent 处理单个事件, 回放到本地 Order Book.
func (se *StandbyEngine) handleEvent(event EngineEvent) {
// 检测事件是否有遗漏 (序号不连续)
if event.Type != EventHeartbeat {
if event.SeqNo != se.lastEventSeq+1 && se.lastEventSeq != 0 {
// 有遗漏! 需要从 WAL 全量恢复
// 生产环境: 触发告警, 暂停接受请求, 全量同步
fmt.Printf("[Standby] event gap: expected %d, got %d\n",
se.lastEventSeq+1, event.SeqNo)
}
se.lastEventSeq = event.SeqNo
}

switch event.Type {
case EventHeartbeat:
se.lastHeartbeat = time.Now()

case EventOrderAccepted:
// 订单挂入簿 → 在本地 Order Book 执行同样的操作
se.book.addOrder(event.Order)

case EventOrderMatched:
// 成交 → 更新本地 Order Book 中对应订单的数量
for _, trade := range event.Trades {
se.applyTrade(trade)
}

case EventOrderCancelled:
if event.Order != nil {
_ = se.book.CancelOrder(event.Order.ID)
}
}
}

// applyTrade 将成交结果应用到本地 Order Book.
func (se *StandbyEngine) applyTrade(trade Trade) {
// 更新 bid 方订单
if bidOrder, ok := se.book.Orders[trade.BidOrderID]; ok {
bidOrder.Qty -= trade.Qty
if bidOrder.Qty <= 0 {
se.book.removeOrder(bidOrder)
}
}
// 更新 ask 方订单
if askOrder, ok := se.book.Orders[trade.AskOrderID]; ok {
askOrder.Qty -= trade.Qty
if askOrder.Qty <= 0 {
se.book.removeOrder(askOrder)
}
}
}

// promote 将备引擎提升为主引擎.
func (se *StandbyEngine) promote() {
fmt.Printf("[Standby → Primary] Failover triggered! last heartbeat: %v ago\n",
time.Since(se.lastHeartbeat))

se.state = StateActive

// 通知路由层: 把新请求发到我这里
if se.onPromote != nil {
se.onPromote()
}

// 此时 se.book 的状态和主引擎最后一刻一致
// 可以基于这个 Order Book 启动新的 Sequencer 开始处理请求
}

完整启动流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func main() {
symbol := "ETH-USDC"
eventCh := make(chan EngineEvent, 4096)

// 启动主引擎
primary := NewPrimaryEngine(symbol, 1024)
primary.AddReplica(eventCh)
primary.Run()

// 启动备引擎
standby := NewStandbyEngine(symbol, eventCh, func() {
// 通知路由层切换
// 实际生产: 更新 K8s Service endpoint / DNS / etcd leader key
fmt.Println("[Router] Switching traffic to standby engine")
})
go standby.Run()

// 正常情况:
// 主引擎处理请求 → 产生事件 → eventCh → 备引擎回放
// 备引擎的 Order Book 始终是主引擎的镜像
//
// 故障切换:
// 主引擎崩溃 → 心跳停止 → 备引擎 300ms 后检测到
// → promote() → 备引擎变成新的主引擎 → 路由层切换流量
// → 用户感知: 最多 ~300ms 中断
}

生产级补充 (本文不实现, 列出要点):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1. 路由层切换
K8s: 更新 Service 的 endpoint (备 Pod IP 替换主 Pod IP)
etcd: Leader Election: 主引擎持有 lease, 崩溃后 lease 过期, 备引擎竞选
DNS: 更新 A 记录 (TTL 要极短, 否则切换慢)

2. 脑裂防护 (Split Brain)
主引擎没挂, 只是网络抖动 → 备引擎以为主挂了 → 两个都变成 Active
→ 两个引擎同时撮合 → 订单重复成交 → 灾难

解决:
Fencing Token: 每次 promote 递增一个 token
路由层只接受最新 token 的引擎
旧主引擎恢复后发现自己的 token 过期 → 自动降级为 Standby

3. 全量同步 (Full Sync)
备引擎启动时 Order Book 是空的, 不能只靠事件流 (会遗漏之前的状态)
需要: 主引擎定期做 Snapshot (序列化完整 Order Book)
备引擎启动流程: 加载最新 Snapshot → 从 Snapshot 对应的 SeqNo 开始消费事件流

4. 多备引擎
可以有多个备, 一个挂了还有下一个
类似 MySQL 的一主多从: Primary → Standby A, Standby B

与各平台的对应:

概念 本文实现 dYdX v4 Hyperliquid CEX (Binance)
主引擎 PrimaryEngine Block Proposer (轮换) Leader 验证者 主撮合服务器
备引擎 StandbyEngine 其他验证者 (回放区块) 其他验证者 热备服务器
事件流 TCP channel 区块 (CometBFT 共识) 区块 (HyperBFT) 内部 Event Log
故障切换 心跳超时 + promote 共识自动选新 Proposer 共识自动选新 Leader 手动/自动切换
脑裂防护 Fencing Token BFT 共识天然防脑裂 BFT 共识天然防脑裂 ZooKeeper / etcd

区块链的 BFT 共识 = 天然的热备: 在 dYdX / Hyperliquid 中, 所有验证者都维护一份 Order Book 副本, 共识协议自动处理 Leader 轮换和故障切换. 不需要自己实现心跳/切换/脑裂防护, 这些都被共识层解决了. 这也是为什么 “自建撮合引擎” 比 “在区块链上跑撮合” 工程复杂度更高的原因之一.

6.4 Go 特有的优化

6.4.1 对象池: 避免频繁 GC

1
2
3
4
5
6
7
8
9
10
11
// sync.Pool: 复用 Order 对象, 减少 GC 压力
// 撮合引擎每秒创建/销毁几万个 Order, 不复用 → GC 频繁 → 延迟抖动
var orderPool = sync.Pool{
New: func() any { return new(Order) },
}

// 从池中获取, 而不是 new
order := orderPool.Get().(*Order)
// 用完后归还 (归还前清零, 避免脏数据)
*order = Order{}
orderPool.Put(order)

6.4.2 Channel 异步流水线

1
2
3
4
5
6
7
// Go channel 天然适合撮合引擎的多阶段流水线:
// 每个阶段一个 goroutine, 用 channel 连接
type Pipeline struct {
incoming chan *Order // 接收阶段 → 验证阶段
validated chan *Order // 验证阶段 → 撮合阶段
trades chan []Trade // 撮合阶段 → 推送阶段
}

6.4.3 CPU 绑核 (Linux)

Go 绑核需要两步: 先锁 goroutine 到 OS 线程, 再绑线程到 CPU 核心.

1
2
3
4
5
6
7
8
9
10
为什么需要两步?

Go 的 goroutine 默认在多个 OS 线程间调度:
goroutine A → 可能跑在 thread 1, 下一秒跑在 thread 3

CPU affinity 绑定的是 OS 线程, 不是 goroutine:
1. LockOSThread() → goroutine 不再迁移到其他线程
2. SchedSetaffinity() → 线程不再迁移到其他 CPU 核心

不先锁线程, 绑核无效, goroutine 换了线程, affinity 就失效了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import (
"fmt"
"runtime"

"golang.org/x/sys/unix"
)

// PinToCPU 将当前 goroutine 绑定到指定 CPU 核心.
// 只在 Linux 上有效, macOS/Windows 不支持 SchedSetaffinity.
func PinToCPU(cpuID int) error {
// Step 1: 锁定 goroutine 到当前 OS 线程
runtime.LockOSThread()

// Step 2: 设置 CPU affinity
var cpuSet unix.CPUSet
cpuSet.Zero()
cpuSet.Set(cpuID) // 只允许运行在 cpuID 这个核心上

// pid=0 表示当前线程
if err := unix.SchedSetaffinity(0, &cpuSet); err != nil {
return fmt.Errorf("sched_setaffinity cpu=%d: %w", cpuID, err)
}
return nil
}

// 使用: 撮合引擎主循环绑到 CPU 1
func (s *Sequencer) RunPinned(cpuID int) error {
if err := PinToCPU(cpuID); err != nil {
return err
}
for {
select {
case in := <-s.inbound:
s.process(in)
case <-s.done:
return nil
}
}
}

为什么绑核能提升性能:

1
2
3
4
5
6
7
8
9
10
11
12
不绑核:
OS 把线程从 CPU 0 迁移到 CPU 3
→ L1/L2 cache 全部失效 (cold cache)
→ BTree 节点要从内存重新加载, 每次 cache miss ≈ 100ns

绑核后:
线程永远在 CPU 0 上 → cache 一直是热的
→ 性能提升 2-5x, p99 抖动消失

实测 (典型):
不绑核: p99 = 15μs, 偶发 50μs+ 抖动
绑核后: p99 = 5μs, 抖动消失

生产环境 CPU 分配:

1
2
3
4
5
6
7
8
9
10
11
典型配置:
CPU 0: 系统 / 中断处理 (不要绑业务到 CPU 0)
CPU 1: Sequencer 撮合主循环 (绑核, isolcpus 隔离)
CPU 2: 网络 I/O (API, WebSocket)
CPU 3: Kafka producer / 审计流
CPU 4-7: Go runtime / GC / 其他 goroutine

配合 Linux isolcpus:
# kernel cmdline
isolcpus=1 ← 把 CPU 1 从调度器隔离, 只有显式绑定的线程能用
避免其他进程 "偷" 撮合引擎的 CPU 时间

平台兼容性:

平台 LockOSThread CPU Affinity 说明
Linux 支持 unix.SchedSetaffinity 完整支持, 生产推荐
macOS 支持 不支持 只能锁线程, 不能绑核
K8s/Docker 支持 容器层 --cpuset-cpus K8s CPU Manager static policy: requests=limits=整数 → 自动独占核心

K8s 绑核完整配置:

K8s 通过 CPU Manager 的 static policy 实现独占核心分配. 当 Pod 的 CPU requests = limits 且为整数时, kubelet 自动分配独占核心.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 1. kubelet 配置: 启用 CPU Manager static policy
# /var/lib/kubelet/config.yaml (或 kubelet 启动参数)
# 注: v1beta1 仍是主流; K8s 已推出 kubelet.config.k8s.io/v1, 按集群版本选择
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
cpuManagerPolicy: static # 默认 none, 改为 static
# 可选: 预留 CPU 给系统组件
kubeReserved:
cpu: "1" # CPU 0 留给 kubelet / 系统
systemReserved:
cpu: "1" # CPU 1 留给 OS 内核
# 应用于节点上的所有 Pod, 修改后需要:
# 1. 停止 kubelet
# 2. 删除 /var/lib/kubelet/cpu_manager_state
# 3. 重启 kubelet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 2. Pod spec: 撮合引擎 (Guaranteed QoS, 独占 CPU)
apiVersion: v1
kind: Pod
metadata:
name: matching-engine-eth
spec:
containers:
- name: engine
image: matching-engine:latest
resources:
# requests = limits 且为整数 → 触发 static policy 独占分配
requests:
cpu: "2" # 2 个完整核心
memory: "4Gi"
limits:
cpu: "2" # 必须和 requests 相同
memory: "4Gi"
env:
- name: GOMAXPROCS
value: "2" # 限制 Go 只用分配到的 2 个核心
# 可选: 通过 Downward API 获取分配到的 CPU 列表
# 然后在代码里用 SchedSetaffinity 绑到具体核心
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 3. 完整 StatefulSet 示例: 撮合引擎 + 绑核 + 持久化 WAL
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: matching-engine
spec:
replicas: 3 # ETH, BTC, SOL 各一个实例
selector:
matchLabels:
app: matching-engine
template:
metadata:
labels:
app: matching-engine
spec:
# 调度到有 static CPU Manager 的节点
nodeSelector:
cpu-manager: "static"
containers:
- name: engine
image: matching-engine:latest
resources:
requests:
cpu: "2"
memory: "4Gi"
limits:
cpu: "2"
memory: "4Gi"
env:
- name: GOMAXPROCS
value: "2"
- name: SYMBOL
# 从 StatefulSet 序号推导交易对 (engine-0=ETH, engine-1=BTC, ...)
valueFrom:
fieldRef:
fieldPath: metadata.name
volumeMounts:
- name: wal-storage
mountPath: /data/wal # WAL 持久化目录
# PVC: WAL 文件需要持久化存储, Pod 重建后数据不丢
volumeClaimTemplates:
- metadata:
name: wal-storage
spec:
accessModes: ["ReadWriteOnce"]
storageClassName: ssd # SSD, 因为 WAL fsync 对 IOPS 敏感
resources:
requests:
storage: 10Gi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
K8s 绑核的工作原理:

static CPU Manager 分配流程:
1. Pod 声明 cpu requests=limits=2 (整数, Guaranteed QoS)
2. kubelet 从可用 CPU 池中分配 2 个独占核心 (如 CPU 4, CPU 5)
3. 通过 cgroup cpuset 限制容器只能使用这两个核心
4. 其他 Pod 不会被调度到这两个核心上

容器内代码进一步绑核:
K8s 分配了 CPU 4 5 → 容器只能用这两个
代码里 PinToCPU(0) → 实际绑到容器的第一个核心 (CPU 4)
→ 双重保障: K8s 层独占 + 代码层绑核

GOMAXPROCS 的作用:
Go 默认 GOMAXPROCS = 机器总核心数 (如 64)
但容器只分配了 2 个核心 → Go 创建 64 个线程争抢 2 个核心
→ 大量上下文切换, 性能暴跌
设置 GOMAXPROCS=2 → Go 只创建 2 个 P (处理器), 匹配实际核心数

6.4.4 预分配 slice

1
2
// 预估单次撮合最多 64 笔成交, 避免 append 反复扩容
trades := make([]Trade, 0, 64)

6.5 DEX 特有的扩容考量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
DEX 和 CEX 的扩容面临不同约束:

CEX:
- 单机房, 内网通信, 延迟 < 1ms
- 可以自由选择硬件 (高频交易用 FPGA)
- 不需要共识, 撮合结果即最终结果

dYdX v4:
- 撮合在验证者内存中, 但结果要走 CometBFT 共识
- 出块时间 ~1-2s 是硬瓶颈 (不是撮合速度, 而是共识速度)
- 扩容方向: 优化共识 (减少通信轮次), 而非优化撮合

Hyperliquid:
- 每个订单都上链, 共识层就是撮合层
- HyperBFT ~200ms 出块, 100k+ orders/sec
- 扩容方向: 优化共识吞吐 (HyperBFT 针对订单簿场景优化)

总结:
CEX 瓶颈: 撮合引擎本身 → 优化数据结构和单机性能
DEX 瓶颈: 共识层 → 优化共识协议, 撮合引擎不是瓶颈

七、小结

7.1 核心要点

维度 要点
撮合规则 Price-Time Priority: 价格优先, 同价时间优先
数据结构 BTree (缓存友好, tidwall/btree 泛型) > 红黑树 (理论优雅但缓存差)
订单簿结构 BTree 管理价格档位 + FIFO 链表管理同价订单 + HashMap 全局索引
单机优化 单线程 + CPU 绑核 + 对象池 + Ring Buffer
水平扩展 按交易对分片 (天然无依赖)
高可用 Event Sourcing + 热备引擎
CEX vs DEX CEX 瓶颈在撮合引擎; DEX 瓶颈在共识层

7.2 延伸阅读

1
2
3
4
5
- LMAX Disruptor: 低延迟撮合引擎的经典设计 (Java, 开源)
- tidwall/btree: Go BTree 实现, 支持泛型 (github.com/tidwall/btree)
- dYdX v4 源码: 订单撮合逻辑 (Go, 开源)
github.com/dydxprotocol/v4-chain → protocol/x/clob/
- Hyperliquid: 撮合在共识层, 代码未开源, 但架构文档可参考

相关阅读:


永续合约 09 - 撮合引擎原理与 Go 实现
https://mritd.com/2025/11/08/perp-matching-engine/
作者
Kovacs
发布于
2025年11月8日
许可协议