行情聚合

SODOS Exchange的行情聚合服务负责从外部交易所(主要是 Binance)订阅实时行情数据,聚合处理后通过 WebSocket 推送给客户端。系统采用 Go 语言开发,由三个核心模块组成
  • ws-gateway:WebSocket 服务端,处理客户端连接和数据推送
  • market-data:市场数据服务,订阅外部行情并进行聚合处理
  • kline-sync:K线历史数据回填工具

系统架构

┌─────────────────────────────────────────────────────────────┐
│                  External Exchange (Binance)                 │
│             Ticker / Trade / Depth WebSocket API             │
└────────────────────┬────────────────────────────────────────┘
                     │ WebSocket Subscribe
                     ▼
┌─────────────────────────────────────────────────────────────┐
│                     market-data Service                      │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │ Ticker Worker│  │ Trade Worker │  │ Kline Agg    │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │ Depth Worker │  │ Inner Agg    │  │ Ticker Saver │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
└────────────────────┬────────────────────────────────────────┘
                     │ Redis Pub/Sub + Elasticsearch
                     ▼
┌─────────────────────────────────────────────────────────────┐
│                      ws-gateway Service                      │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │ Ticker Agg   │  │ Depth Agg    │  │ Trade Agg    │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
└────────────────────┬────────────────────────────────────────┘
                     │ WebSocket Push
                     ▼
           ┌─────────────────────┐
           │   Client Browsers   │
           │   & Mobile Apps     │
           └─────────────────────┘

WebSocket 服务端(ws-gateway)

职责

WebSocket 服务端负责:

  • 管理客户端 WebSocket 连接
  • 订阅 Redis 频道接收行情数据
  • 将行情数据推送给订阅的客户端
  • 处理内部推送请求(HTTP 接口)

核心组件

1. WebSocket Server

  • 监听端口:默认 :9602(可通过 WS_ADDR 环境变量配置)
  • 路由
  • /ws - 客户端 WebSocket 连接入口
  • /internal/push/user - 内部用户推送接口
  • /internal/push/msgpack - 内部 Msgpack 格式推送接口

2. 四大聚合器

Ticker 聚合器
  • 订阅频道marketdata:ticker:inner(内部)、marketdata:ticker:out(外部)
  • 功能
  • 合并内外部 ticker 数据
  • 内部数据:每日 8:00 AM 重置的高低价、成交量
  • 外部数据:Binance 实时价格、涨跌幅
  • 每秒批量推送,每批最多 50 条
  • 数据合并逻辑
if 内部数据有效 (未过 8:00 AM 重置点):
  使用内部的 high/low/volume
  使用外部的 last_price/open_price
else:
  全部使用外部数据
Depth 聚合器
  • 订阅频道marketdata:depth:{market_type} (市场类型:1=现货,5=合约)
  • 功能
  • 接收深度数据(买卖盘口)
  • 多 Worker 并发推送(4个主 Worker + 2个重试 Worker)
  • 超时重试机制(最多 3 次,延迟 150ms)
  • 队列缓冲(主队列 2048,重试队列 1024)
Trade 聚合器
  • 订阅频道marketdata:trade:{symbol}:{market_type}
  • 功能
  • 接收实时成交数据
  • 按币种推送给订阅的客户端
  • 支持历史成交记录回放
MarketPrice 聚合器
  • 订阅频道marketdata:market_price
  • 功能
  • 推送最新市场价格
  • 用于价格指示器和快速报价

连接管理

  • 币种映射刷新:默认每 300 秒从数据库重新加载(可通过 CURRENCY_MAP_REFRESH_SECONDS 配置)
  • 订阅管理:客户端可订阅特定币种和市场类型的行情
  • 数据格式:使用 Msgpack 二进制格式压缩,减少带宽消耗

市场数据服务(market-data)

职责

市场数据服务负责:

  • 订阅外部交易所 WebSocket 行情
  • 聚合处理行情数据
  • 生成 K 线数据
  • 将数据发布到 Redis 和 Elasticsearch

外部行情订阅

Ticker Worker(现货/合约)

SpotTickerWorkerMarginTickerWorker 订阅 Binance 的 24 小时行情数据:

  • 连接地址wss://stream.binance.com/ws/{symbol}@ticker
  • 订阅数据
  • 开盘价(o)、最高价(h)、最低价(l)、收盘价(c)
  • 价格变化(p)、价格变化百分比(P)
  • 24小时成交量(v)、最后成交量(Q)
  • 数据映射:通过数据库中的 index_symbol 将外部交易对映射到内部 currency_id
  • 发布频道marketdata:ticker:out
  • 重订阅机制:60 秒未收到消息自动重新订阅
  • 心跳维持:每 15 秒发送 Ping

Trade Worker(现货/合约)

SpotTradeWorkerMarginTradeWorker 订阅 Binance 的实时成交数据:

  • 连接地址wss://stream.binance.com/ws/{symbol}@trade
  • 订阅数据
  • 成交价格(p)、成交数量(q)
  • 成交时间(T,毫秒时间戳)
  • 买卖方向(m,true=卖方吃单)
  • 秒级聚合
  • 按秒聚合同方向的成交数据
  • 累计数量,保留最后价格
  • 跨秒边界时发送聚合结果
  • 发布频道
  • 全局:marketdata:trade:{market_type}
  • 单币种:marketdata:outer_trade:{symbol}:{market_type}

聚合示例

原始成交流(1秒内):
  1234567890100ms: 买 0.01 BTC @ 50000
  1234567890300ms: 买 0.02 BTC @ 50001
  1234567890800ms: 买 0.03 BTC @ 50002

聚合后:
  买方向成交: 0.06 BTC @ 50002 (最后价格)
  成交时间: 1234567890800ms

K线聚合(KlineAggregator)

K线聚合器从成交数据实时生成 K 线,无需等待周期结束。

支持周期

13 个周期:1m3m5m15m30m1h2h4h6h12h1d1w1M

聚合逻辑

  1. 订阅成交频道marketdata:trade:1(现货)、marketdata:trade:5(合约)
  2. 成交数据应用
  • 根据成交时间计算所属周期窗口
  • 更新 OHLCV:
* Open(开盘价):首笔成交价格
* High(最高价):周期内最高价
* Low(最低价):周期内最低价
* Close(收盘价):最新成交价
* Volume(成交量):累计成交量
* Quote Volume(成交额):累计成交额
* Count(成交笔数):成交次数

3. 周期窗口计算

  • 短周期(1m~12h):UTC 时区对齐
  • 日线(1d):UTC+8 时区,每天 8:00 AM 开始
  • 周线(1w):UTC+8 时区,每周一 8:00 AM 开始
  • 月线(1M):UTC+8 时区,每月1日 8:00 AM 开始
  1. 多任务定时器
  • 2秒:刷新 Redis 聚合缓存(kline:agg:{symbol}:{market_type}:{period}
  • 1秒:持久化已完成的 K 线到 Elasticsearch
  • 1秒:检查周期边界,生成空蜡烛
  • 2分钟:持久化进行中的长周期 K 线(≥3m)
  1. Elasticsearch 存储
  • 索引命名kline_{symbol}_{market_type}(如 kline_btcusdt_1
  • 文档 ID{symbol}_{market_type}_{period}_{open_time}
  • 批量写入:每批最多 2000 条或 4MB
  • 重试机制:失败 3 次指数退避,记录到 logs/kline_es_error.log

空蜡烛生成

当检测到周期结束但没有新成交时,自动生成"空蜡烛":

Open = High = Low = Close = 上一根K线的收盘价
Volume = Quote Volume = Count = 0

ES 种子数据加载

启动时从 Elasticsearch 加载每个币种、市场、周期的最新 K 线:

  • 用于初始化 prevClose(上一收盘价)
  • 确保开盘价连续性
  • 超时时间:3 秒

内部数据聚合(InnerDataAggregator)

处理交易所内部成交数据(撮合引擎生成):

  • 订阅频道marketdata:inner_trade
  • 功能
  • 计算内部的 ticker 数据(每日 8:00 AM 重置)
  • 维护 high_pricelow_pricevolume
  • 发布到 marketdata:ticker:inner

批量保存器

TickerBatchSaver

  • 订阅频道marketdata:ticker:out
  • 功能:批量写入 ticker 数据到 Redis Hash
  • 键格式ticker:hash:{market_type}
  • 字段格式{currency_id} -> JSON

TradePriceSaver

  • 订阅频道marketdata:trade_price
  • 功能:批量保存最新成交价和时间戳
  • 键格式currency:{currency_id}:last_pricecurrency:{currency_id}:last_trade_time

K线同步工具(kline-sync)

功能

从 Binance API 批量获取历史 K 线数据并回填到 Elasticsearch,用于:

  • 初始化系统时加载历史数据
  • 补充缺失的 K 线记录
  • 定期更新长周期 K 线

使用方式

默认模板回填

go run ./cmd/kline-sync

使用内置模板,回填不同周期的历史数据:

  • 1m: 7天 | 3m: 21天 | 5m: 35天
  • 15m: 90天 | 30m: 180天 | 1h: 360天
  • 2h/4h/6h/12h: 720天 | 1d: 365天
  • 1w: 520天 | 1M: 600天

指定币种和周期

go run ./cmd/kline-sync --symbols=BTCUSDT,ETHUSDT --periods=1m,1h --days=30

参数说明

参数说明示例
--symbols币对列表(逗号分隔),留空则加载所有启用币种--symbols=BTCUSDT,ETHUSDT
--periods周期列表(逗号分隔)--periods=1m,1h,1d
--days对应周期的天数,可只给一个值应用于所有周期--days=30--days=7,30,90
--template使用预设模板(如 default--template=default
--proxyHTTP 代理地址--proxy=http://127.0.0.1:7890
--enable-proxy强制启用代理--enable-proxy

同步逻辑

  1. 加载币种配置
  • 从数据库读取 currency
  • 过滤出 --symbols 指定的币种(若无则全部)
  • 获取 index_symbol(Binance 交易对名称)
  1. 查询已有数据
  • 查询 ES 中该币种/市场/周期的最早 K 线
  • 以此为终点向更旧时间回溯
  1. 批量获取
  • 调用 Binance Klines API:https://api.binance.com/api/v3/klines
  • 参数:symbolintervalstartTimeendTimelimit=1000
  • 每次最多获取 1000 条
  1. 批量写入 ES
  • 每批 2000 条或 4MB 限制
  • 使用 Bulk API,3 次指数退避重试
  • 失败记录到 logs/kline_es_error.log
  1. 错误处理
  • 遇到 429(限流)/ 418(IP 封禁)/ 451(合规限制)自动重试
  • 仍失败则终止该周期同步,继续下一个

数据流向

Ticker 数据流

Binance Ticker WebSocket
    ↓
SpotTickerWorker / MarginTickerWorker
    ↓ (解析、映射 currency_id)
Redis Pub/Sub: marketdata:ticker:out
    ↓
TickerBatchSaver → Redis Hash: ticker:hash:{market_type}
    ↓
ws-gateway Ticker 聚合器
    ↓ (合并内外部数据)
WebSocket 推送给客户端

Trade → Kline 数据流

Binance Trade WebSocket
    ↓
SpotTradeWorker / MarginTradeWorker
    ↓ (秒级聚合)
Redis Pub/Sub: marketdata:trade:{market_type}
    ↓
KlineAggregator (订阅)
    ↓ (实时聚合 13 个周期)
Redis Hash: kline:agg:{symbol}:{market_type}:{period}
Elasticsearch: kline_{symbol}_{market_type}
    ↓
ws-gateway Trade 聚合器
    ↓
WebSocket 推送给客户端

Depth 数据流

Binance Depth WebSocket (由 PHP/其他服务订阅)
    ↓
Redis Pub/Sub: marketdata:depth:{market_type}
    ↓
ws-gateway Depth 聚合器
    ↓ (多 Worker 并发推送)
WebSocket 推送给客户端 (带重试机制)

关键技术点

1. 断线重连机制

所有外部 WebSocket 连接都有指数退避重连:

backoff := 500ms
for {
    err := connectAndRun()
    if err == nil {
        backoff = 500ms // 重置
        continue
    }
    time.Sleep(backoff)
    backoff = min(backoff * 2, 30s) // 最大 30 秒
}

2. 超时重订阅

每 60 秒检查是否有 symbol 超过 60 秒未收到消息,自动重新订阅:

func resubscribeTimeout() {
    timeouts := symbolsWithoutMsgFor60s()
    if len(timeouts) > 0 {
        send SUBSCRIBE message to WebSocket
    }
}

3. 多 Worker 并发推送

Depth 聚合器使用 4 个主 Worker + 2 个重试 Worker:

订阅消息 → 主队列(2048) → 4个 Worker 并发推送
                    ↓ (超时)
              重试队列(1024) → 2个 Worker 延迟重试

4. 原子操作与锁

  • Ticker 聚合器:使用 sync.Mutex 保护状态更新
  • Kline 聚合器:使用 sync.Mutex 保护 K 线数据结构
  • Trade Worker:使用 sync.Mutex 保护秒级聚合缓存

5. Context 超时控制

所有 Redis 和 Elasticsearch 操作都有超时控制:

ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
err := esClient.BulkWrite(ctx, items)

6. 代理支持

通过环境变量配置 HTTP 代理:

REQUEST_PROXY=true
HTTP_PROXY=http://127.0.0.1:7890

Worker 会自动使用代理连接 Binance API(规避网络限制)。

7. Elasticsearch 模板管理

启动时自动确保 Kline 索引模板存在:

esClient.EnsureKlineTemplate()

模板定义字段类型、分片配置、副本数等。

监控与日志

日志输出

各 Worker 会输出关键日志:

[spot-ticker] starting, sources=10
[spot-ticker] dial wss://stream.binance.com/ws/btcusdt@ticker
[spot-ticker] connected
[spot-ticker] subscribe 10 streams
[spot-ticker] resubscribe 2 streams (timeout detected)
[spot-trade] publish global error: context deadline exceeded
[kline-agg] redis flush error: connection refused
[kline-agg] ES bulk write failed (attempt 2/3)

错误日志文件

K线写入 Elasticsearch 失败会记录到:

  • 文件logs/kline_es_error.log
  • 内容:失败的文档 ID、索引名称、错误信息

健康检查

ws-gateway 提供 HTTP 接口用于健康检查:

  • GET /ws - WebSocket 入口(升级协议)
  • GET /internal/push/user - 内部推送接口(用于测试连通性)

性能优化

批量操作

  • Redis 批量写入:使用 Pipeline 批量执行 HSET
  • ES 批量写入:使用 Bulk API,每批 2000 条或 4MB
  • 秒级聚合:减少 Redis Pub/Sub 消息数量

数据压缩

  • Msgpack 格式:WebSocket 推送使用 Msgpack 二进制格式,比 JSON 小 30%-50%
  • 批量推送:Ticker 每批最多 50 条,减少推送次数

连接池

  • Redis 连接池:使用 go-redis 内置连接池
  • HTTP 代理:复用 HTTP Transport 连接

内存控制

  • 有限队列:Depth 队列限制 2048+1024,防止内存溢出
  • 超时清理:旧的聚合数据定期清理

配置参数

WebSocket 服务端

环境变量说明默认值
WS_ADDRWebSocket 监听地址:9602
CURRENCY_MAP_REFRESH_SECONDS币种映射刷新间隔(秒)300

市场数据服务

环境变量说明默认值
REQUEST_PROXY是否启用 HTTP 代理false
HTTP_PROXYHTTP 代理地址-
REDIS_HOSTRedis 主机localhost
REDIS_PORTRedis 端口6379
REDIS_AUTHRedis 密码-
REDIS_DBRedis 数据库编号0

Elasticsearch

环境变量说明默认值
ES_HOSTElasticsearch 地址http://localhost:9200
ES_USERNAMEElasticsearch 用户名-
ES_PASSWORDElasticsearch 密码-

故障排查

WebSocket 连接断开

现象:客户端频繁断线重连

排查

  • 检查 Redis 连接状态:redis-cli PING
  • 查看 ws-gateway 日志是否有订阅错误
  • 检查网络防火墙设置

K线数据缺失

现象:部分周期 K 线数据为空

排查

  • 检查 market-data 服务是否运行
  • 查看 logs/kline_es_error.log 是否有写入失败
  • 手动运行 kline-sync 回填数据

Binance 连接失败

现象[spot-ticker] dial error: connection refused

排查

  • 检查网络连通性:curl https://stream.binance.com
  • 配置 HTTP 代理:REQUEST_PROXY=true HTTP_PROXY=http://proxy:port
  • 检查 Binance API 状态页面

Redis 发布失败

现象publish error: context deadline exceeded

排查

  • 检查 Redis 连接池配置
  • 增加超时时间(修改代码中的 redisPublishTimeout
  • 检查 Redis 内存是否充足

ES 写入失败

现象ES bulk write failed (attempt 3/3)

排查

  • 检查 Elasticsearch 集群状态:curl http://es:9200/_cluster/health
  • 检查索引分片状态:curl http://es:9200/_cat/indices
  • 查看 Elasticsearch 日志是否有 OOM 或磁盘满

加密数字货币交易所系统-WEB3区块链开发者 • © 2026