做市策略

SODOS Exchange的做市策略服务是一个高性能、数据库驱动的做市商系统,支持多币种、多策略的并发运行。系统采用 Go 语言开发,基于 Goroutine 实现高并发处理,通过 Redis 订阅和 HTTP 连接池与撮合引擎通信。

核心特性

  • 数据库驱动配置:策略配置存储在 MySQL 数据库,支持动态加载和热更新
  • 多策略支持:通过工厂模式支持多种做市策略,当前实现外部指数跟随策略
  • 高并发架构:每个币种-市场对独立运行一个 Goroutine,策略内部采用多 Worker 模型
  • 实时价格跟随:订阅外部交易所价格,实时调整本地盘口
  • 深度维护:自动维护 10 档买卖盘口,支持可配置的价差和数量
  • 活跃度管理:定期刷新挂单、注入市价单、触发合成交易,保持市场活跃
  • 对手盘检测:识别真实用户挂单并执行扫单操作
  • 订单管理:三级队列系统(买单、卖单、撤单),异步批量处理
  • 性能优化:原子操作、连接池、批量处理、无锁价格更新

安装部署

环境要求

  • Go 版本:1.20 或更高
  • 数据库:MySQL 5.7+ 或 MySQL 8.0+
  • 缓存:Redis 5.0+
  • 操作系统:Linux / macOS / Windows
  • 依赖服务:撮合引擎(Match Engine)

配置文件

系统从项目根目录(../../../)自动加载 .env 配置文件。创建 .env 文件并配置以下参数:

# 数据库配置
DB_HOST=192.168.1.12
DB_PORT=3306
DB_DATABASE=sodos_exchange
DB_USERNAME=root
DB_PASSWORD=yourpassword

# 系统Redis配置(用于数据持久化)
REDIS_HOST=192.168.1.12
REDIS_AUTH=yourpassword
REDIS_PORT=6380
REDIS_DB=0

# 行情Redis配置(用于实时数据)
MARKET_REDIS_HOST=192.168.1.12
MARKET_REDIS_AUTH=yourpassword
MARKET_REDIS_PORT=6378
MARKET_REDIS_DB=0

# 撮合引擎端点
MATCH_ENGINE_ENDPOINT=http://match_engine:8080

数据库初始化

在 MySQL 中创建以下数据表:

1. strategy_currency 表

CREATE TABLE `strategy_currency` (
  `id` int NOT NULL AUTO_INCREMENT,
  `currency_id` int NOT NULL COMMENT '币种ID',
  `index_symbol` varchar(50) NOT NULL COMMENT '跟随外部币种符号',
  `index_source` tinyint NOT NULL DEFAULT '0' COMMENT '0内部 1外部',
  `market_status` tinyint NOT NULL DEFAULT '0' COMMENT '是否启用内部做市',
  `market_type` tinyint NOT NULL COMMENT '市场类型:1现货 2杠杆',
  `default_strategy` int NOT NULL COMMENT '默认应用策略id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

2. strategy_strategy 表

CREATE TABLE `strategy_strategy` (
  `id` int NOT NULL AUTO_INCREMENT,
  `strategy_name` varchar(100) NOT NULL COMMENT '策略名称',
  `config` json NOT NULL COMMENT '策略配置参数',
  `desc` text COMMENT '策略介绍',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

3. 插入默认策略配置

INSERT INTO strategy_strategy (strategy_name, config, `desc`) VALUES (
  'external_index_follow',
  '{
    "price_change_amount": 1.0,
    "tick_size": 0.1,
    "base_quantity": 10,
    "price_follow_percent": 100.0,
    "base_precision": 2,
    "quote_precision": 4,
    "max_active_orders": 100,
    "maintenance_interval": "1s",
    "depth_levels": [
      {"spread_percent": 0.0001, "quantity_ratio": 1.0},
      {"spread_percent": 0.0002, "quantity_ratio": 0.95},
      {"spread_percent": 0.0005, "quantity_ratio": 0.9},
      {"spread_percent": 0.001, "quantity_ratio": 0.85},
      {"spread_percent": 0.015, "quantity_ratio": 0.75},
      {"spread_percent": 0.02, "quantity_ratio": 0.65},
      {"spread_percent": 0.025, "quantity_ratio": 0.55},
      {"spread_percent": 0.03, "quantity_ratio": 0.45},
      {"spread_percent": 0.035, "quantity_ratio": 0.35},
      {"spread_percent": 0.04, "quantity_ratio": 0.25}
    ]
  }',
  '外部指数跟随策略'
);

4. 配置币种策略

INSERT INTO strategy_currency (currency_id, index_symbol, index_source, market_status, market_type, default_strategy) VALUES
(879, 'BTCUSDT', 1, 1, 1, 1),  -- BTC/USDT 现货
(880, 'ETHUSDT', 1, 1, 1, 1);  -- ETH/USDT 现货

编译构建

1. 下载依赖

cd services/market-maker
GOPROXY=https://goproxy.cn,direct go mod tidy

2. 编译可执行文件

# 编译为二进制文件
go build -o bin/market-maker cmd/market-maker/main.go

# 或使用 Makefile(如果有)
make build

3. 交叉编译(可选)

# Linux
GOOS=linux GOARCH=amd64 go build -o bin/market-maker-linux cmd/market-maker/main.go

# macOS
GOOS=darwin GOARCH=amd64 go build -o bin/market-maker-darwin cmd/market-maker/main.go

# Windows
GOOS=windows GOARCH=amd64 go build -o bin/market-maker.exe cmd/market-maker/main.go

运行服务

方式一:使用默认配置(从 .env 文件加载)

./bin/market-maker

方式二:使用命令行参数覆盖配置

./bin/market-maker \
  --http http://custom-engine:8080 \
  --api-port :9090 \
  --log custom.log

命令行参数说明

参数说明环境变量默认值
--http撮合引擎HTTP端点MATCH_ENGINE_ENDPOINThttp://localhost:8080
--api-portAPI服务端口API_PORT:8081
--log日志文件路径LOG_FILEmarket-maker.log

注:命令行参数优先级高于环境变量。

方式三:使用 systemd 管理(Linux)

创建 systemd 服务文件 /etc/systemd/system/market-maker.service

[Unit]
Description=SODOS Market Maker Service
After=network.target mysql.service redis.service

[Service]
Type=simple
User=sodos
WorkingDirectory=/opt/sodos-exchange/services/market-maker
ExecStart=/opt/sodos-exchange/services/market-maker/bin/market-maker
Restart=on-failure
RestartSec=5s
StandardOutput=append:/var/log/sodos/market-maker.log
StandardError=append:/var/log/sodos/market-maker.error.log

[Install]
WantedBy=multi-user.target

启动服务:

sudo systemctl daemon-reload
sudo systemctl enable market-maker
sudo systemctl start market-maker
sudo systemctl status market-maker

Docker 部署

1. 构建 Docker 镜像

创建 Dockerfile

FROM golang:1.20-alpine AS builder

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o market-maker cmd/market-maker/main.go

FROM alpine:latest
RUN apk --no-cache add ca-certificates tzdata
WORKDIR /root/
COPY --from=builder /app/market-maker .

EXPOSE 8081
CMD ["./market-maker"]

构建镜像:

docker build -t sodos-market-maker:latest .

2. 使用 Docker Compose

创建 docker-compose.yml

version: '3.8'

services:
  market-maker:
    image: sodos-market-maker:latest
    container_name: sodos-market-maker
    restart: unless-stopped
    ports:
      - "8081:8081"
    environment:
      - DB_HOST=mysql
      - DB_PORT=3306
      - DB_DATABASE=sodos_exchange
      - DB_USERNAME=root
      - DB_PASSWORD=yourpassword
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - REDIS_AUTH=yourpassword
      - MATCH_ENGINE_ENDPOINT=http://match-engine:8080
    depends_on:
      - mysql
      - redis
      - match-engine
    networks:
      - sodos-network

networks:
  sodos-network:
    external: true

启动服务:

docker-compose up -d

查看日志:

docker-compose logs -f market-maker

验证安装

1. 健康检查

curl http://localhost:8081/health

预期响应

{
  "status": "healthy",
  "service": "market-maker"
}

2. 查看策略列表

curl http://localhost:8081/api/v1/strategies

预期响应

[
  {
    "id": "879_1",
    "currency_id": 879,
    "symbol": "BTCUSDT",
    "market_type": 1,
    "is_running": true,
    "metrics": {
      "total_orders": 0,
      "total_volume": 0
    }
  }
]

3. 检查日志输出

tail -f market-maker.log

预期输出

2024/09/27 10:00:00 Market Maker Service starting...
2024/09/27 10:00:00 Loaded 2 currencies from database
2024/09/27 10:00:00 Strategy [879_1] initialized successfully
2024/09/27 10:00:00 Strategy [880_1] initialized successfully
2024/09/27 10:00:00 HTTP API server listening on :8081

启动流程

系统启动后会按以下流程初始化:

  1. 加载配置:从 .env 文件或命令行参数读取配置
  2. 连接数据库:连接 MySQL 数据库并验证连接
  3. 连接 Redis:连接系统 Redis 和行情 Redis
  4. 加载币种:从 currency 表读取所有启用的币种
  5. 加载策略配置:从 strategy_currencystrategy_strategy 表读取配置
  6. 创建协程:为每个币种-市场对创建独立的 CurrencyManager 协程
  7. 启动策略:如果 market_status=1,自动启动策略;否则等待 HTTP 指令
  8. 启动 HTTP API:监听指定端口,提供管理接口

常见安装问题

问题 1:数据库连接失败

错误信息

Error connecting to database: dial tcp 192.168.1.12:3306: connect: connection refused

解决方案

  • 检查数据库服务是否运行:systemctl status mysql
  • 验证数据库连接参数(主机、端口、用户名、密码)
  • 检查防火墙规则:sudo ufw allow 3306
  • 确认数据库允许远程连接(修改 bind-address 配置)

问题 2:Redis 连接失败

错误信息

Error connecting to Redis: dial tcp 192.168.1.12:6379: i/o timeout

解决方案

  • 检查 Redis 服务状态:systemctl status redis
  • 验证 Redis 密码配置
  • 检查 Redis 配置文件中的 bindprotected-mode 设置
  • 确认防火墙允许 Redis 端口

问题 3:编译错误

错误信息

go: module requires Go 1.20

解决方案

  • 升级 Go 版本:wget https://go.dev/dl/go1.20.linux-amd64.tar.gz
  • 或修改 go.mod 文件降低 Go 版本要求(不推荐)

问题 4:依赖下载失败

错误信息

go: github.com/some/package: unrecognized import path

解决方案

  • 使用国内代理:export GOPROXY=https://goproxy.cn,direct
  • 清理模块缓存:go clean -modcache
  • 重新下载依赖:go mod tidy

系统架构

整体架构

┌─────────────────────────────────────────────────────────────┐
│                      Strategy Manager                        │
│  (管理所有币种的做市策略,动态加载配置)                     │
└───────────────────┬─────────────────────────────────────────┘
                    │
        ┌───────────┴───────────┐
        │                       │
┌───────▼──────┐        ┌──────▼────────┐
│ Currency     │        │  Currency     │
│ Manager 1    │  ...   │  Manager N    │
│ (BTC/USDT)   │        │  (ETH/USDT)   │
└───────┬──────┘        └───────┬───────┘
        │                       │
┌───────▼──────────────────────▼────────┐
│        Strategy Factory                │
│  (根据配置创建具体策略实例)            │
└───────┬────────────────────────────────┘
        │
┌───────▼──────────────────────────────┐
│  External Index Follow Strategy      │
│  (外部指数跟随策略 - 7个Worker)      │
├──────────────────────────────────────┤
│ • Buy Order Worker                   │
│ • Sell Order Worker                  │
│ • Cancel Order Worker                │
│ • Depth Maintenance Worker           │
│ • Depth Monitor Worker               │
│ • Order Cleanup Worker               │
│ • Market Activity Worker             │
└──────┬───────────────────────────────┘
       │
       ├─── Redis Pub/Sub ────┐
       │                       │
       └─── HTTP Pool ─────────┤
                               │
                    ┌──────────▼──────────┐
                    │   Match Engine      │
                    │   (撮合引擎)        │
                    └─────────────────────┘

核心组件

1. StrategyManager(策略管理器)

策略管理器是整个系统的核心调度器,负责:

  • 从数据库加载策略和币种配置
  • 为每个币种-市场对创建独立的 CurrencyManager
  • 管理策略的启动、停止、重载
  • 提供 HTTP 管理接口

关键方法

func (sm *StrategyManager) Initialize(ctx context.Context) error
func (sm *StrategyManager) StartStrategy(symbol string, marketType int8) error
func (sm *StrategyManager) StopStrategy(symbol string, marketType int8) error
func (sm *StrategyManager) ReloadStrategy(symbol string, marketType int8) error
func (sm *StrategyManager) GetStrategyStatus(symbol string, marketType int8) (string, error)

2. CurrencyManager(币种管理器)

每个币种-市场对对应一个 CurrencyManager,运行在独立的 Goroutine 中:

  • 加载该币种的所有启用策略配置
  • 通过 StrategyFactory 创建策略实例
  • 启动策略的 Goroutine
  • 监控策略运行状态

一币种一 Goroutine 模型

func (cm *CurrencyManager) Run(ctx context.Context) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // 周期性检查和更新策略状态
        }
    }
}

3. StrategyFactory(策略工厂)

采用工厂模式创建不同类型的策略实例:

func CreateStrategy(config *models.StrategyCurrency) (Strategy, error) {
    switch config.StrategyName {
    case "external_index_follow":
        return external_index.NewStrategy(config)
    // 未来可扩展其他策略类型
    default:
        return nil, fmt.Errorf("unknown strategy: %s", config.StrategyName)
    }
}

4. Strategy Interface(策略接口)

所有策略必须实现统一接口:

type Strategy interface {
    ID() string
    Name() string
    Symbol() string
    MarketType() int8
    Initialize(ctx context.Context) error
    Start() error
    Stop() error
    IsRunning() bool
    OnTick(ctx context.Context) error
    OnPriceUpdate(price float64) error
    OnDepthUpdate(depth interface{}) error
    OnOrderUpdate(order interface{}) error
    OnTradeUpdate(trade interface{}) error
    GetConfig() interface{}
    UpdateConfig(config interface{}) error
    GetMetrics() *Metrics
}

外部指数跟随策略(External Index Follow Strategy)

这是当前实现的主力做市策略,通过订阅外部交易所(如 Binance)的价格,在本地交易所维护对应的买卖盘口。

策略架构

7 个 Worker Goroutine

  1. Buy Order Worker(买单处理器)
  • 从买单队列消费订单请求
  • 批量提交买单到撮合引擎
  • 记录订单 ID 用于追踪
  1. Sell Order Worker(卖单处理器)
  • 从卖单队列消费订单请求
  • 批量提交卖单到撮合引擎
  • 记录订单 ID 用于追踪
  1. Cancel Order Worker(撤单处理器)
  • 从撤单队列消费撤单请求
  • 批量提交撤单到撮合引擎
  • 清理已撤订单记录
  1. Depth Maintenance Worker(深度维护器)
  • 每 500ms 执行一次
  • 根据当前价格计算 10 档买卖盘口
  • 撤销旧订单,挂出新订单
  • 维持盘口稳定性
  1. Depth Monitor Worker(深度监控器)
  • 每 300ms 检查盘口状态
  • 检测对手盘(真实用户挂单)
  • 触发扫单操作,模拟真实交易
  1. Order Cleanup Worker(订单清理器)
  • 每 10 秒执行一次
  • 清理超时订单(超过 5 分钟未成交)
  • 截断订单列表(保留最新 10000 个)
  1. Market Activity Worker(市场活跃维护器)
  • 每 10 秒刷新一批订单(重新挂单)
  • 每 300ms 注入一个市价单
  • 每 2 秒触发一次合成交易(Synthetic Trade)

价格更新机制

无锁原子价格更新

使用 unsafe.Pointeratomic.StoreUint64 实现无锁价格更新:

type Strategy struct {
    currentPrice unsafe.Pointer // *float64
}

func (s *Strategy) OnPriceUpdate(price float64) error {
    atomic.StorePointer(&s.currentPrice, unsafe.Pointer(&price))
    return nil
}

func (s *Strategy) getCurrentPrice() float64 {
    ptr := atomic.LoadPointer(&s.currentPrice)
    if ptr == nil {
        return 0
    }
    return *(*float64)(ptr)
}

优势

  • 避免锁竞争,提高并发性能
  • 7 个 Worker 可以无阻塞读取最新价格
  • 价格更新频率可达 10000+ QPS

深度维护算法

10 档盘口生成

以当前价格为基准,生成 10 档买卖盘口:

// 卖单:价格递增
for i := 0; i < 10; i++ {
    priceOffset := float64(i+1) * config.SpreadPercent * currentPrice / 100
    sellPrice := currentPrice + priceOffset
    quantity := config.BaseQuantity * (1 + rand.Float64()*0.2) // ±20% 随机波动

    // 提交卖单
}

// 买单:价格递减
for i := 0; i < 10; i++ {
    priceOffset := float64(i+1) * config.SpreadPercent * currentPrice / 100
    buyPrice := currentPrice - priceOffset
    quantity := config.BaseQuantity * (1 + rand.Float64()*0.2)

    // 提交买单
}

配置参数

  • SpreadPercent:每档价差百分比(如 0.1% = 0.001)
  • BaseQuantity:基础挂单数量
  • 数量随机波动 ±20%,模拟真实市场

盘口刷新策略

func (s *Strategy) maintainDepth() {
    // 1. 撤销所有旧订单
    for _, orderID := range s.activeOrders {
        s.cancelQueue <- orderID
    }
    s.activeOrders = []string{}

    // 2. 等待撤单完成
    time.Sleep(100 * time.Millisecond)

    // 3. 挂出新订单
    currentPrice := s.getCurrentPrice()
    for i := 0; i < 10; i++ {
        // 生成买卖单
    }
}

活跃度维护

订单刷新(Order Refresh)

每 10 秒选择 10% 的订单进行刷新(撤单重挂):

func (s *Strategy) refreshOrders() {
    refreshCount := len(s.activeOrders) / 10
    for i := 0; i < refreshCount; i++ {
        orderID := s.activeOrders[rand.Intn(len(s.activeOrders))]
        s.cancelQueue <- orderID
        // 重新挂单
    }
}

市价单注入(Market Order Injection)

每 300ms 注入一个小额市价单,触发真实成交:

func (s *Strategy) injectMarketOrder() {
    side := rand.Intn(2) // 0=buy, 1=sell
    quantity := config.MarketOrderSize // 小额,如 0.001 BTC

    if side == 0 {
        // 提交市价买单
    } else {
        // 提交市价卖单
    }
}

合成交易(Synthetic Trade)

每 2 秒触发一次合成交易,模拟真实交易流水:

func (s *Strategy) triggerSyntheticTrade() {
    currentPrice := s.getCurrentPrice()
    quantity := config.SyntheticTradeSize

    // 1. 挂限价买单
    buyOrderID := s.submitOrder(ORDER_BUY, currentPrice, quantity)

    // 2. 立即挂限价卖单(相同价格),触发撮合
    sellOrderID := s.submitOrder(ORDER_SELL, currentPrice, quantity)

    // 3. 生成成交记录
}

对手盘检测与扫单

对手盘识别

通过深度监控器检测非策略挂单:

func (s *Strategy) detectOpponentOrders(depth *Depth) []OpponentOrder {
    opponents := []OpponentOrder{}

    for _, order := range depth.Asks {
        if !s.isMyOrder(order.OrderID) {
            opponents = append(opponents, OpponentOrder{
                Side: ORDER_SELL,
                Price: order.Price,
                Quantity: order.Quantity,
                OrderID: order.OrderID,
            })
        }
    }

    // 同样检查买单
    return opponents
}

扫单逻辑

当检测到对手盘时,提交市价单吃掉对手挂单:

func (s *Strategy) sweepOpponentOrders(opponents []OpponentOrder) {
    for _, opp := range opponents {
        // 只扫单价格在合理范围内的订单
        priceDeviation := math.Abs(opp.Price - s.getCurrentPrice()) / s.getCurrentPrice()
        if priceDeviation > config.MaxSweepPriceDeviation {
            continue
        }

        // 提交市价单
        if opp.Side == ORDER_SELL {
            s.submitMarketOrder(ORDER_BUY, opp.Quantity)
        } else {
            s.submitMarketOrder(ORDER_SELL, opp.Quantity)
        }
    }
}

风控参数

  • MaxSweepPriceDeviation:最大扫单价格偏离度(如 2%)
  • MaxSweepQuantity:单次扫单最大数量
  • SweepCooldown:扫单冷却时间

订单队列系统

三级队列架构

type Strategy struct {
    buyQueue    chan OrderRequest    // 买单队列,容量 10000
    sellQueue   chan OrderRequest    // 卖单队列,容量 10000
    cancelQueue chan string          // 撤单队列,容量 10000
}

批量处理

Worker 从队列批量消费,减少网络开销:

func (s *Strategy) processBuyOrders() {
    batch := []OrderRequest{}
    batchSize := 100

    for {
        select {
        case order := <-s.buyQueue:
            batch = append(batch, order)

            if len(batch) >= batchSize {
                // 批量提交
                s.submitBatchOrders(batch)
                batch = []OrderRequest{}
            }

        case <-time.After(100 * time.Millisecond):
            // 超时也提交
            if len(batch) > 0 {
                s.submitBatchOrders(batch)
                batch = []OrderRequest{}
            }
        }
    }
}

性能优化

1. HTTP 连接池

type MatchEngineClient struct {
    httpClient *http.Client
}

func NewMatchEngineClient() *MatchEngineClient {
    return &MatchEngineClient{
        httpClient: &http.Client{
            Timeout: 5 * time.Second,
            Transport: &http.Transport{
                MaxIdleConns:        100,
                MaxIdleConnsPerHost: 100,
                IdleConnTimeout:     90 * time.Second,
            },
        },
    }
}

2. 原子操作避免锁

// 使用 atomic 而非 mutex
atomic.StoreUint64(&s.totalOrders, newValue)
atomic.AddUint64(&s.successOrders, 1)

3. 订单截断

定期清理旧订单,防止内存泄漏:

func (s *Strategy) truncateOrders() {
    if len(s.activeOrders) > 10000 {
        s.activeOrders = s.activeOrders[len(s.activeOrders)-10000:]
    }
}

4. 批量数据库操作

// 批量加载配置
configs, err := db.LoadAllStrategyCurrencies()

// 批量更新指标
db.BatchUpdateMetrics(metrics)

配置参数

策略配置(strategy_strategy 表)

字段类型说明
idint策略 ID
namestring策略名称(如 external_index_follow)
descriptionstring策略描述
enabledbool是否启用
config_schemajson配置 Schema(JSON Schema 格式)
created_attimestamp创建时间
updated_attimestamp更新时间

币种配置(strategy_currency 表)

字段类型说明
idint配置 ID
strategy_idint关联的策略 ID
symbolstring交易对符号(如 BTC/USDT)
market_typeint8市场类型(1=币币,2=合约)
enabledbool是否启用
configjson策略配置参数
created_attimestamp创建时间
updated_attimestamp更新时间

外部指数跟随策略配置示例

{
  "external_exchange": "binance",
  "external_symbol": "BTCUSDT",
  "spread_percent": 0.001,
  "base_quantity": 0.1,
  "max_order_quantity": 1.0,
  "depth_levels": 10,
  "refresh_interval_ms": 500,
  "market_order_interval_ms": 300,
  "market_order_size": 0.001,
  "synthetic_trade_interval_ms": 2000,
  "synthetic_trade_size": 0.01,
  "order_refresh_interval_sec": 10,
  "order_refresh_percent": 0.1,
  "opponent_detection_enabled": true,
  "max_sweep_price_deviation": 0.02,
  "max_sweep_quantity": 0.5,
  "sweep_cooldown_ms": 1000,
  "order_timeout_sec": 300,
  "max_active_orders": 10000
}

参数说明

  • external_exchange:外部交易所名称
  • external_symbol:外部交易对符号
  • spread_percent:每档价差百分比(0.001 = 0.1%)
  • base_quantity:基础挂单数量
  • max_order_quantity:单笔订单最大数量
  • depth_levels:维护的盘口档数(10 档)
  • refresh_interval_ms:深度刷新间隔(毫秒)
  • market_order_interval_ms:市价单注入间隔(毫秒)
  • market_order_size:市价单大小
  • synthetic_trade_interval_ms:合成交易间隔(毫秒)
  • synthetic_trade_size:合成交易大小
  • order_refresh_interval_sec:订单刷新间隔(秒)
  • order_refresh_percent:每次刷新订单百分比(0.1 = 10%)
  • opponent_detection_enabled:是否启用对手盘检测
  • max_sweep_price_deviation:最大扫单价格偏离度
  • max_sweep_quantity:单次扫单最大数量
  • sweep_cooldown_ms:扫单冷却时间(毫秒)
  • order_timeout_sec:订单超时时间(秒)
  • max_active_orders:最大活跃订单数

HTTP 管理接口

启动策略

POST /api/v1/strategy/start
Content-Type: application/json

{
  "symbol": "BTC/USDT",
  "market_type": 1
}

响应

{
  "code": 0,
  "message": "Strategy started successfully",
  "data": {
    "strategy_id": "external_index_follow_BTC/USDT_1",
    "status": "running"
  }
}

停止策略

POST /api/v1/strategy/stop
Content-Type: application/json

{
  "symbol": "BTC/USDT",
  "market_type": 1
}

重载策略

POST /api/v1/strategy/reload
Content-Type: application/json

{
  "symbol": "BTC/USDT",
  "market_type": 1
}

说明:重载会先停止策略,重新加载配置,然后启动。

查询策略状态

GET /api/v1/strategy/status?symbol=BTC/USDT&market_type=1

响应

{
  "code": 0,
  "message": "Success",
  "data": {
    "strategy_id": "external_index_follow_BTC/USDT_1",
    "strategy_name": "external_index_follow",
    "symbol": "BTC/USDT",
    "market_type": 1,
    "status": "running",
    "current_price": 45000.50,
    "metrics": {
      "total_orders": 15234,
      "success_orders": 15180,
      "failed_orders": 54,
      "total_trades": 1523,
      "total_volume": 152.34,
      "active_orders": 20,
      "uptime_seconds": 86400
    },
    "config": {
      "spread_percent": 0.001,
      "base_quantity": 0.1
    }
  }
}

列出所有策略

GET /api/v1/strategy/list

响应

{
  "code": 0,
  "message": "Success",
  "data": [
    {
      "symbol": "BTC/USDT",
      "market_type": 1,
      "strategy_name": "external_index_follow",
      "status": "running"
    },
    {
      "symbol": "ETH/USDT",
      "market_type": 1,
      "strategy_name": "external_index_follow",
      "status": "running"
    }
  ]
}

更新策略配置

POST /api/v1/strategy/config/update
Content-Type: application/json

{
  "symbol": "BTC/USDT",
  "market_type": 1,
  "config": {
    "spread_percent": 0.002,
    "base_quantity": 0.2
  }
}

说明:配置更新后需要执行 /reload 才能生效。

Redis 通信协议

订阅价格更新

频道price:update:{exchange}:{symbol}

示例price:update:binance:BTCUSDT

消息格式

{
  "exchange": "binance",
  "symbol": "BTCUSDT",
  "price": 45000.50,
  "timestamp": 1699999999999
}

策略处理

func (s *Strategy) subscribePriceUpdates() {
    channel := fmt.Sprintf("price:update:%s:%s",
        config.ExternalExchange, config.ExternalSymbol)

    pubsub := redisClient.Subscribe(ctx, channel)
    defer pubsub.Close()

    for msg := range pubsub.Channel() {
        var priceUpdate PriceUpdate
        json.Unmarshal([]byte(msg.Payload), &priceUpdate)
        s.OnPriceUpdate(priceUpdate.Price)
    }
}

订阅深度更新

频道depth:update:{exchange}:{symbol}

消息格式

{
  "exchange": "binance",
  "symbol": "BTCUSDT",
  "bids": [
    [45000.50, 1.2],
    [45000.00, 2.5]
  ],
  "asks": [
    [45001.00, 1.1],
    [45001.50, 2.3]
  ],
  "timestamp": 1699999999999
}

监控与运维

关键指标

策略运行时会收集以下指标:

type Metrics struct {
    TotalOrders    uint64  // 总下单数
    SuccessOrders  uint64  // 成功订单数
    FailedOrders   uint64  // 失败订单数
    TotalTrades    uint64  // 总成交数
    TotalVolume    float64 // 总成交量
    ActiveOrders   int     // 当前活跃订单数
    UptimeSeconds  int64   // 运行时长(秒)
    LastUpdateTime int64   // 最后更新时间
}

Prometheus 监控(规划中)

// 示例:暴露 Prometheus 指标
var (
    ordersTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "strategy_orders_total",
            Help: "Total number of orders",
        },
        []string{"symbol", "market_type", "side"},
    )

    activeOrdersGauge = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "strategy_active_orders",
            Help: "Number of active orders",
        },
        []string{"symbol", "market_type"},
    )
)

日志记录

使用结构化日志(推荐 zap 或 logrus):

logger.Info("Strategy started",
    zap.String("strategy_id", s.ID()),
    zap.String("symbol", s.Symbol()),
    zap.Int8("market_type", s.MarketType()),
)

logger.Error("Failed to submit order",
    zap.String("strategy_id", s.ID()),
    zap.Error(err),
    zap.Float64("price", order.Price),
    zap.Float64("quantity", order.Quantity),
)

健康检查

GET /api/v1/health

响应

{
  "status": "healthy",
  "strategies": {
    "total": 10,
    "running": 9,
    "stopped": 1
  },
  "uptime_seconds": 86400
}

安全与风控

订单限制

// 单笔订单数量限制
if quantity > config.MaxOrderQuantity {
    return errors.New("order quantity exceeds limit")
}

// 活跃订单数量限制
if len(s.activeOrders) >= config.MaxActiveOrders {
    return errors.New("active orders limit reached")
}

价格偏离保护

// 检查价格偏离
priceDeviation := math.Abs(orderPrice - s.getCurrentPrice()) / s.getCurrentPrice()
if priceDeviation > config.MaxPriceDeviation {
    return errors.New("price deviation too large")
}

扫单风控

// 扫单冷却
if time.Since(s.lastSweepTime) < config.SweepCooldown {
    return // 跳过本次扫单
}

// 单次扫单数量限制
if sweepQuantity > config.MaxSweepQuantity {
    sweepQuantity = config.MaxSweepQuantity
}

错误重试

func (s *Strategy) submitOrderWithRetry(order OrderRequest, maxRetries int) error {
    var err error
    for i := 0; i < maxRetries; i++ {
        err = s.matchEngineClient.SubmitOrder(order)
        if err == nil {
            return nil
        }

        // 指数退避
        backoff := time.Duration(math.Pow(2, float64(i))) * 100 * time.Millisecond
        time.Sleep(backoff)
    }
    return fmt.Errorf("failed after %d retries: %w", maxRetries, err)
}

扩展开发指南

添加新策略

  1. 定义策略配置 Schema

strategy_strategy 表中添加新策略记录:

INSERT INTO strategy_strategy (name, description, enabled, config_schema) VALUES (
    'grid_trading',
    '网格交易策略',
    1,
    '{
        "type": "object",
        "properties": {
            "grid_levels": {"type": "integer"},
            "grid_spacing_percent": {"type": "number"},
            "grid_quantity": {"type": "number"}
        }
    }'
);
  1. 实现 Strategy 接口

创建文件 internal/strategy/grid_trading/strategy.go

package grid_trading

import (
    "context"
    "github.com/sodos-exchange/market-maker/internal/strategy"
)

type GridTradingStrategy struct {
    // 策略字段
}

func NewStrategy(config *models.StrategyCurrency) (strategy.Strategy, error) {
    // 初始化逻辑
}

func (s *GridTradingStrategy) Initialize(ctx context.Context) error {
    // 初始化逻辑
}

func (s *GridTradingStrategy) Start() error {
    // 启动逻辑
}

// 实现其他接口方法...
  1. 注册到 Factory

internal/strategy/factory.go 中注册:

func CreateStrategy(config *models.StrategyCurrency) (Strategy, error) {
    switch config.StrategyName {
    case "external_index_follow":
        return external_index.NewStrategy(config)
    case "grid_trading":
        return grid_trading.NewStrategy(config)
    default:
        return nil, fmt.Errorf("unknown strategy: %s", config.StrategyName)
    }
}
  1. 配置并启动

strategy_currency 表中添加配置:

INSERT INTO strategy_currency (strategy_id, symbol, market_type, enabled, config) VALUES (
    2, -- grid_trading 的 strategy_id
    'BTC/USDT',
    1,
    1,
    '{
        "grid_levels": 10,
        "grid_spacing_percent": 0.005,
        "grid_quantity": 0.1
    }'
);

通过 HTTP API 启动:

POST /api/v1/strategy/start
{
  "symbol": "BTC/USDT",
  "market_type": 1
}

自定义 Worker

在策略内部添加自定义 Worker:

func (s *MyStrategy) runCustomWorker(ctx context.Context) {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // 执行自定义逻辑
        }
    }
}

func (s *MyStrategy) Start() error {
    // 启动自定义 Worker
    go s.runCustomWorker(s.ctx)

    // 启动其他 Workers...
    return nil
}

接入新的外部交易所

  1. 实现 ExchangeClient 接口

创建 internal/exchange/okex/client.go

type OKExClient struct {
    apiKey    string
    apiSecret string
}

func (c *OKExClient) SubscribePrice(symbol string) (<-chan float64, error) {
    // WebSocket 订阅价格
}

func (c *OKExClient) GetDepth(symbol string) (*Depth, error) {
    // 获取深度数据
}
  1. 在策略中使用
func (s *Strategy) Initialize(ctx context.Context) error {
    switch config.ExternalExchange {
    case "binance":
        s.exchangeClient = binance.NewClient()
    case "okex":
        s.exchangeClient = okex.NewClient()
    }

    // 订阅价格
    priceChan, err := s.exchangeClient.SubscribePrice(config.ExternalSymbol)
    go s.handlePriceUpdates(priceChan)

    return nil
}

故障排查

常见问题

1. 策略无法启动

现象:调用 /start 接口返回错误

排查步骤

  • 检查数据库配置是否正确(strategy_currency 表)
  • 检查 Redis 连接是否正常
  • 检查撮合引擎 API 是否可达
  • 查看日志中的详细错误信息

2. 订单未挂出

现象:策略运行但盘口没有订单

排查步骤

  • 检查是否收到外部价格更新(Redis 订阅)
  • 检查 getCurrentPrice() 是否返回有效价格
  • 检查订单队列是否阻塞(队列满)
  • 查看 Worker Goroutine 是否正常运行

3. 价格更新延迟

现象:本地盘口价格与外部交易所价格差距大

排查步骤

  • 检查 Redis Pub/Sub 订阅是否正常
  • 检查网络延迟
  • 检查价格推送频率是否过低
  • 检查 OnPriceUpdate 是否被频繁调用

4. 内存占用过高

现象:服务内存持续增长

排查步骤

  • 检查 activeOrders 列表是否过长(应启用截断)
  • 检查订单队列是否积压
  • 使用 pprof 分析内存分配
  • 检查是否有 Goroutine 泄漏
# 获取内存 profile
curl http://localhost:8080/debug/pprof/heap > heap.prof
go tool pprof heap.prof

调试工具

启用 pprof

import _ "net/http/pprof"

func main() {
    go func() {
        http.ListenAndServe("localhost:6060", nil)
    }()

    // 启动策略管理器...
}

访问 http://localhost:6060/debug/pprof/ 查看性能数据。

日志级别调整

// 开发环境:DEBUG 级别
logger, _ := zap.NewDevelopment()

// 生产环境:INFO 级别
logger, _ := zap.NewProduction()

总结

SODOS Exchange 的做市策略系统是一个高性能、可扩展的做市商解决方案。通过数据库驱动配置、多 Worker 并发模型、无锁价格更新、批量订单处理等技术,实现了高效的流动性提供。系统支持多种做市策略,可以根据不同交易对和市场环境灵活配置,满足交易所的流动性需求。

关键优势

  • 高并发:Goroutine-per-currency 模型,每个币种独立运行
  • 低延迟:无锁价格更新,批量订单处理
  • 高可用:错误重试、健康检查、优雅关闭
  • 易扩展:策略工厂模式,插件化设计
  • 易运维:数据库配置,HTTP 管理接口,结构化日志

未来规划

  • 支持更多做市策略(网格交易、三角套利等)
  • Prometheus 监控集成
  • WebSocket 实时推送策略状态
  • 多撮合引擎支持(主备切换)
  • 更精细的风控规则

加密数字货币交易所系统-WEB3区块链开发者 • © 2026