┌─────────────────────────────────────────────────────────────┐
│ External Exchange (Binance) │
│ Ticker / Trade / Depth WebSocket API │
└────────────────────┬────────────────────────────────────────┘
│ WebSocket Subscribe
▼
┌─────────────────────────────────────────────────────────────┐
│ market-data Service │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Ticker Worker│ │ Trade Worker │ │ Kline Agg │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Depth Worker │ │ Inner Agg │ │ Ticker Saver │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└────────────────────┬────────────────────────────────────────┘
│ Redis Pub/Sub + Elasticsearch
▼
┌─────────────────────────────────────────────────────────────┐
│ ws-gateway Service │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Ticker Agg │ │ Depth Agg │ │ Trade Agg │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└────────────────────┬────────────────────────────────────────┘
│ WebSocket Push
▼
┌─────────────────────┐
│ Client Browsers │
│ & Mobile Apps │
└─────────────────────┘
WebSocket 服务端负责:
:9602(可通过 WS_ADDR 环境变量配置)/ws - 客户端 WebSocket 连接入口/internal/push/user - 内部用户推送接口/internal/push/msgpack - 内部 Msgpack 格式推送接口marketdata:ticker:inner(内部)、marketdata:ticker:out(外部)if 内部数据有效 (未过 8:00 AM 重置点):
使用内部的 high/low/volume
使用外部的 last_price/open_price
else:
全部使用外部数据
marketdata:depth:{market_type} (市场类型:1=现货,5=合约)marketdata:trade:{symbol}:{market_type}marketdata:market_priceCURRENCY_MAP_REFRESH_SECONDS 配置)市场数据服务负责:
SpotTickerWorker 和 MarginTickerWorker 订阅 Binance 的 24 小时行情数据:
wss://stream.binance.com/ws/{symbol}@tickerindex_symbol 将外部交易对映射到内部 currency_idmarketdata:ticker:outSpotTradeWorker 和 MarginTradeWorker 订阅 Binance 的实时成交数据:
wss://stream.binance.com/ws/{symbol}@trademarketdata:trade:{market_type}marketdata:outer_trade:{symbol}:{market_type}聚合示例:
原始成交流(1秒内):
1234567890100ms: 买 0.01 BTC @ 50000
1234567890300ms: 买 0.02 BTC @ 50001
1234567890800ms: 买 0.03 BTC @ 50002
聚合后:
买方向成交: 0.06 BTC @ 50002 (最后价格)
成交时间: 1234567890800ms
K线聚合器从成交数据实时生成 K 线,无需等待周期结束。
13 个周期:1m、3m、5m、15m、30m、1h、2h、4h、6h、12h、1d、1w、1M
marketdata:trade:1(现货)、marketdata:trade:5(合约)* Open(开盘价):首笔成交价格
* High(最高价):周期内最高价
* Low(最低价):周期内最低价
* Close(收盘价):最新成交价
* Volume(成交量):累计成交量
* Quote Volume(成交额):累计成交额
* Count(成交笔数):成交次数
3. 周期窗口计算:
kline:agg:{symbol}:{market_type}:{period})kline_{symbol}_{market_type}(如 kline_btcusdt_1){symbol}_{market_type}_{period}_{open_time}logs/kline_es_error.log当检测到周期结束但没有新成交时,自动生成"空蜡烛":
Open = High = Low = Close = 上一根K线的收盘价
Volume = Quote Volume = Count = 0
启动时从 Elasticsearch 加载每个币种、市场、周期的最新 K 线:
prevClose(上一收盘价)处理交易所内部成交数据(撮合引擎生成):
marketdata:inner_tradehigh_price、low_price、volumemarketdata:ticker:innermarketdata:ticker:outticker:hash:{market_type}{currency_id} -> JSONmarketdata:trade_pricecurrency:{currency_id}:last_price、currency:{currency_id}:last_trade_time从 Binance API 批量获取历史 K 线数据并回填到 Elasticsearch,用于:
go run ./cmd/kline-sync
使用内置模板,回填不同周期的历史数据:
go run ./cmd/kline-sync --symbols=BTCUSDT,ETHUSDT --periods=1m,1h --days=30
| 参数 | 说明 | 示例 |
|---|---|---|
--symbols | 币对列表(逗号分隔),留空则加载所有启用币种 | --symbols=BTCUSDT,ETHUSDT |
--periods | 周期列表(逗号分隔) | --periods=1m,1h,1d |
--days | 对应周期的天数,可只给一个值应用于所有周期 | --days=30 或 --days=7,30,90 |
--template | 使用预设模板(如 default) | --template=default |
--proxy | HTTP 代理地址 | --proxy=http://127.0.0.1:7890 |
--enable-proxy | 强制启用代理 | --enable-proxy |
currency 表--symbols 指定的币种(若无则全部)index_symbol(Binance 交易对名称)https://api.binance.com/api/v3/klinessymbol、interval、startTime、endTime、limit=1000logs/kline_es_error.logBinance Ticker WebSocket
↓
SpotTickerWorker / MarginTickerWorker
↓ (解析、映射 currency_id)
Redis Pub/Sub: marketdata:ticker:out
↓
TickerBatchSaver → Redis Hash: ticker:hash:{market_type}
↓
ws-gateway Ticker 聚合器
↓ (合并内外部数据)
WebSocket 推送给客户端
Binance Trade WebSocket
↓
SpotTradeWorker / MarginTradeWorker
↓ (秒级聚合)
Redis Pub/Sub: marketdata:trade:{market_type}
↓
KlineAggregator (订阅)
↓ (实时聚合 13 个周期)
Redis Hash: kline:agg:{symbol}:{market_type}:{period}
Elasticsearch: kline_{symbol}_{market_type}
↓
ws-gateway Trade 聚合器
↓
WebSocket 推送给客户端
Binance Depth WebSocket (由 PHP/其他服务订阅)
↓
Redis Pub/Sub: marketdata:depth:{market_type}
↓
ws-gateway Depth 聚合器
↓ (多 Worker 并发推送)
WebSocket 推送给客户端 (带重试机制)
所有外部 WebSocket 连接都有指数退避重连:
backoff := 500ms
for {
err := connectAndRun()
if err == nil {
backoff = 500ms // 重置
continue
}
time.Sleep(backoff)
backoff = min(backoff * 2, 30s) // 最大 30 秒
}
每 60 秒检查是否有 symbol 超过 60 秒未收到消息,自动重新订阅:
func resubscribeTimeout() {
timeouts := symbolsWithoutMsgFor60s()
if len(timeouts) > 0 {
send SUBSCRIBE message to WebSocket
}
}
Depth 聚合器使用 4 个主 Worker + 2 个重试 Worker:
订阅消息 → 主队列(2048) → 4个 Worker 并发推送
↓ (超时)
重试队列(1024) → 2个 Worker 延迟重试
sync.Mutex 保护状态更新sync.Mutex 保护 K 线数据结构sync.Mutex 保护秒级聚合缓存所有 Redis 和 Elasticsearch 操作都有超时控制:
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
err := esClient.BulkWrite(ctx, items)
通过环境变量配置 HTTP 代理:
REQUEST_PROXY=true
HTTP_PROXY=http://127.0.0.1:7890
Worker 会自动使用代理连接 Binance API(规避网络限制)。
启动时自动确保 Kline 索引模板存在:
esClient.EnsureKlineTemplate()
模板定义字段类型、分片配置、副本数等。
各 Worker 会输出关键日志:
[spot-ticker] starting, sources=10
[spot-ticker] dial wss://stream.binance.com/ws/btcusdt@ticker
[spot-ticker] connected
[spot-ticker] subscribe 10 streams
[spot-ticker] resubscribe 2 streams (timeout detected)
[spot-trade] publish global error: context deadline exceeded
[kline-agg] redis flush error: connection refused
[kline-agg] ES bulk write failed (attempt 2/3)
K线写入 Elasticsearch 失败会记录到:
logs/kline_es_error.logws-gateway 提供 HTTP 接口用于健康检查:
GET /ws - WebSocket 入口(升级协议)GET /internal/push/user - 内部推送接口(用于测试连通性)go-redis 内置连接池| 环境变量 | 说明 | 默认值 |
|---|---|---|
WS_ADDR | WebSocket 监听地址 | :9602 |
CURRENCY_MAP_REFRESH_SECONDS | 币种映射刷新间隔(秒) | 300 |
| 环境变量 | 说明 | 默认值 |
|---|---|---|
REQUEST_PROXY | 是否启用 HTTP 代理 | false |
HTTP_PROXY | HTTP 代理地址 | - |
REDIS_HOST | Redis 主机 | localhost |
REDIS_PORT | Redis 端口 | 6379 |
REDIS_AUTH | Redis 密码 | - |
REDIS_DB | Redis 数据库编号 | 0 |
| 环境变量 | 说明 | 默认值 |
|---|---|---|
ES_HOST | Elasticsearch 地址 | http://localhost:9200 |
ES_USERNAME | Elasticsearch 用户名 | - |
ES_PASSWORD | Elasticsearch 密码 | - |
现象:客户端频繁断线重连
排查:
redis-cli PING现象:部分周期 K 线数据为空
排查:
logs/kline_es_error.log 是否有写入失败kline-sync 回填数据现象:[spot-ticker] dial error: connection refused
排查:
curl https://stream.binance.comREQUEST_PROXY=true HTTP_PROXY=http://proxy:port现象:publish error: context deadline exceeded
排查:
redisPublishTimeout)现象:ES bulk write failed (attempt 3/3)
排查:
curl http://es:9200/_cluster/healthcurl http://es:9200/_cat/indices