系统从项目根目录(../../../)自动加载 .env 配置文件。创建 .env 文件并配置以下参数:
# 数据库配置
DB_HOST=192.168.1.12
DB_PORT=3306
DB_DATABASE=sodos_exchange
DB_USERNAME=root
DB_PASSWORD=yourpassword
# 系统Redis配置(用于数据持久化)
REDIS_HOST=192.168.1.12
REDIS_AUTH=yourpassword
REDIS_PORT=6380
REDIS_DB=0
# 行情Redis配置(用于实时数据)
MARKET_REDIS_HOST=192.168.1.12
MARKET_REDIS_AUTH=yourpassword
MARKET_REDIS_PORT=6378
MARKET_REDIS_DB=0
# 撮合引擎端点
MATCH_ENGINE_ENDPOINT=http://match_engine:8080
在 MySQL 中创建以下数据表:
CREATE TABLE `strategy_currency` (
`id` int NOT NULL AUTO_INCREMENT,
`currency_id` int NOT NULL COMMENT '币种ID',
`index_symbol` varchar(50) NOT NULL COMMENT '跟随外部币种符号',
`index_source` tinyint NOT NULL DEFAULT '0' COMMENT '0内部 1外部',
`market_status` tinyint NOT NULL DEFAULT '0' COMMENT '是否启用内部做市',
`market_type` tinyint NOT NULL COMMENT '市场类型:1现货 2杠杆',
`default_strategy` int NOT NULL COMMENT '默认应用策略id',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `strategy_strategy` (
`id` int NOT NULL AUTO_INCREMENT,
`strategy_name` varchar(100) NOT NULL COMMENT '策略名称',
`config` json NOT NULL COMMENT '策略配置参数',
`desc` text COMMENT '策略介绍',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO strategy_strategy (strategy_name, config, `desc`) VALUES (
'external_index_follow',
'{
"price_change_amount": 1.0,
"tick_size": 0.1,
"base_quantity": 10,
"price_follow_percent": 100.0,
"base_precision": 2,
"quote_precision": 4,
"max_active_orders": 100,
"maintenance_interval": "1s",
"depth_levels": [
{"spread_percent": 0.0001, "quantity_ratio": 1.0},
{"spread_percent": 0.0002, "quantity_ratio": 0.95},
{"spread_percent": 0.0005, "quantity_ratio": 0.9},
{"spread_percent": 0.001, "quantity_ratio": 0.85},
{"spread_percent": 0.015, "quantity_ratio": 0.75},
{"spread_percent": 0.02, "quantity_ratio": 0.65},
{"spread_percent": 0.025, "quantity_ratio": 0.55},
{"spread_percent": 0.03, "quantity_ratio": 0.45},
{"spread_percent": 0.035, "quantity_ratio": 0.35},
{"spread_percent": 0.04, "quantity_ratio": 0.25}
]
}',
'外部指数跟随策略'
);
INSERT INTO strategy_currency (currency_id, index_symbol, index_source, market_status, market_type, default_strategy) VALUES
(879, 'BTCUSDT', 1, 1, 1, 1), -- BTC/USDT 现货
(880, 'ETHUSDT', 1, 1, 1, 1); -- ETH/USDT 现货
cd services/market-maker
GOPROXY=https://goproxy.cn,direct go mod tidy
# 编译为二进制文件
go build -o bin/market-maker cmd/market-maker/main.go
# 或使用 Makefile(如果有)
make build
# Linux
GOOS=linux GOARCH=amd64 go build -o bin/market-maker-linux cmd/market-maker/main.go
# macOS
GOOS=darwin GOARCH=amd64 go build -o bin/market-maker-darwin cmd/market-maker/main.go
# Windows
GOOS=windows GOARCH=amd64 go build -o bin/market-maker.exe cmd/market-maker/main.go
./bin/market-maker
./bin/market-maker \
--http http://custom-engine:8080 \
--api-port :9090 \
--log custom.log
命令行参数说明:
| 参数 | 说明 | 环境变量 | 默认值 |
|---|---|---|---|
| --http | 撮合引擎HTTP端点 | MATCH_ENGINE_ENDPOINT | http://localhost:8080 |
| --api-port | API服务端口 | API_PORT | :8081 |
| --log | 日志文件路径 | LOG_FILE | market-maker.log |
注:命令行参数优先级高于环境变量。
创建 systemd 服务文件 /etc/systemd/system/market-maker.service:
[Unit]
Description=SODOS Market Maker Service
After=network.target mysql.service redis.service
[Service]
Type=simple
User=sodos
WorkingDirectory=/opt/sodos-exchange/services/market-maker
ExecStart=/opt/sodos-exchange/services/market-maker/bin/market-maker
Restart=on-failure
RestartSec=5s
StandardOutput=append:/var/log/sodos/market-maker.log
StandardError=append:/var/log/sodos/market-maker.error.log
[Install]
WantedBy=multi-user.target
启动服务:
sudo systemctl daemon-reload
sudo systemctl enable market-maker
sudo systemctl start market-maker
sudo systemctl status market-maker
创建 Dockerfile:
FROM golang:1.20-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o market-maker cmd/market-maker/main.go
FROM alpine:latest
RUN apk --no-cache add ca-certificates tzdata
WORKDIR /root/
COPY --from=builder /app/market-maker .
EXPOSE 8081
CMD ["./market-maker"]
构建镜像:
docker build -t sodos-market-maker:latest .
创建 docker-compose.yml:
version: '3.8'
services:
market-maker:
image: sodos-market-maker:latest
container_name: sodos-market-maker
restart: unless-stopped
ports:
- "8081:8081"
environment:
- DB_HOST=mysql
- DB_PORT=3306
- DB_DATABASE=sodos_exchange
- DB_USERNAME=root
- DB_PASSWORD=yourpassword
- REDIS_HOST=redis
- REDIS_PORT=6379
- REDIS_AUTH=yourpassword
- MATCH_ENGINE_ENDPOINT=http://match-engine:8080
depends_on:
- mysql
- redis
- match-engine
networks:
- sodos-network
networks:
sodos-network:
external: true
启动服务:
docker-compose up -d
查看日志:
docker-compose logs -f market-maker
curl http://localhost:8081/health
预期响应:
{
"status": "healthy",
"service": "market-maker"
}
curl http://localhost:8081/api/v1/strategies
预期响应:
[
{
"id": "879_1",
"currency_id": 879,
"symbol": "BTCUSDT",
"market_type": 1,
"is_running": true,
"metrics": {
"total_orders": 0,
"total_volume": 0
}
}
]
tail -f market-maker.log
预期输出:
2024/09/27 10:00:00 Market Maker Service starting...
2024/09/27 10:00:00 Loaded 2 currencies from database
2024/09/27 10:00:00 Strategy [879_1] initialized successfully
2024/09/27 10:00:00 Strategy [880_1] initialized successfully
2024/09/27 10:00:00 HTTP API server listening on :8081
系统启动后会按以下流程初始化:
.env 文件或命令行参数读取配置currency 表读取所有启用的币种strategy_currency 和 strategy_strategy 表读取配置market_status=1,自动启动策略;否则等待 HTTP 指令错误信息:
Error connecting to database: dial tcp 192.168.1.12:3306: connect: connection refused
解决方案:
systemctl status mysqlsudo ufw allow 3306bind-address 配置)错误信息:
Error connecting to Redis: dial tcp 192.168.1.12:6379: i/o timeout
解决方案:
systemctl status redisbind 和 protected-mode 设置错误信息:
go: module requires Go 1.20
解决方案:
wget https://go.dev/dl/go1.20.linux-amd64.tar.gzgo.mod 文件降低 Go 版本要求(不推荐)错误信息:
go: github.com/some/package: unrecognized import path
解决方案:
export GOPROXY=https://goproxy.cn,directgo clean -modcachego mod tidy┌─────────────────────────────────────────────────────────────┐
│ Strategy Manager │
│ (管理所有币种的做市策略,动态加载配置) │
└───────────────────┬─────────────────────────────────────────┘
│
┌───────────┴───────────┐
│ │
┌───────▼──────┐ ┌──────▼────────┐
│ Currency │ │ Currency │
│ Manager 1 │ ... │ Manager N │
│ (BTC/USDT) │ │ (ETH/USDT) │
└───────┬──────┘ └───────┬───────┘
│ │
┌───────▼──────────────────────▼────────┐
│ Strategy Factory │
│ (根据配置创建具体策略实例) │
└───────┬────────────────────────────────┘
│
┌───────▼──────────────────────────────┐
│ External Index Follow Strategy │
│ (外部指数跟随策略 - 7个Worker) │
├──────────────────────────────────────┤
│ • Buy Order Worker │
│ • Sell Order Worker │
│ • Cancel Order Worker │
│ • Depth Maintenance Worker │
│ • Depth Monitor Worker │
│ • Order Cleanup Worker │
│ • Market Activity Worker │
└──────┬───────────────────────────────┘
│
├─── Redis Pub/Sub ────┐
│ │
└─── HTTP Pool ─────────┤
│
┌──────────▼──────────┐
│ Match Engine │
│ (撮合引擎) │
└─────────────────────┘
策略管理器是整个系统的核心调度器,负责:
关键方法:
func (sm *StrategyManager) Initialize(ctx context.Context) error
func (sm *StrategyManager) StartStrategy(symbol string, marketType int8) error
func (sm *StrategyManager) StopStrategy(symbol string, marketType int8) error
func (sm *StrategyManager) ReloadStrategy(symbol string, marketType int8) error
func (sm *StrategyManager) GetStrategyStatus(symbol string, marketType int8) (string, error)
每个币种-市场对对应一个 CurrencyManager,运行在独立的 Goroutine 中:
一币种一 Goroutine 模型:
func (cm *CurrencyManager) Run(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// 周期性检查和更新策略状态
}
}
}
采用工厂模式创建不同类型的策略实例:
func CreateStrategy(config *models.StrategyCurrency) (Strategy, error) {
switch config.StrategyName {
case "external_index_follow":
return external_index.NewStrategy(config)
// 未来可扩展其他策略类型
default:
return nil, fmt.Errorf("unknown strategy: %s", config.StrategyName)
}
}
所有策略必须实现统一接口:
type Strategy interface {
ID() string
Name() string
Symbol() string
MarketType() int8
Initialize(ctx context.Context) error
Start() error
Stop() error
IsRunning() bool
OnTick(ctx context.Context) error
OnPriceUpdate(price float64) error
OnDepthUpdate(depth interface{}) error
OnOrderUpdate(order interface{}) error
OnTradeUpdate(trade interface{}) error
GetConfig() interface{}
UpdateConfig(config interface{}) error
GetMetrics() *Metrics
}
这是当前实现的主力做市策略,通过订阅外部交易所(如 Binance)的价格,在本地交易所维护对应的买卖盘口。
使用 unsafe.Pointer 和 atomic.StoreUint64 实现无锁价格更新:
type Strategy struct {
currentPrice unsafe.Pointer // *float64
}
func (s *Strategy) OnPriceUpdate(price float64) error {
atomic.StorePointer(&s.currentPrice, unsafe.Pointer(&price))
return nil
}
func (s *Strategy) getCurrentPrice() float64 {
ptr := atomic.LoadPointer(&s.currentPrice)
if ptr == nil {
return 0
}
return *(*float64)(ptr)
}
优势:
以当前价格为基准,生成 10 档买卖盘口:
// 卖单:价格递增
for i := 0; i < 10; i++ {
priceOffset := float64(i+1) * config.SpreadPercent * currentPrice / 100
sellPrice := currentPrice + priceOffset
quantity := config.BaseQuantity * (1 + rand.Float64()*0.2) // ±20% 随机波动
// 提交卖单
}
// 买单:价格递减
for i := 0; i < 10; i++ {
priceOffset := float64(i+1) * config.SpreadPercent * currentPrice / 100
buyPrice := currentPrice - priceOffset
quantity := config.BaseQuantity * (1 + rand.Float64()*0.2)
// 提交买单
}
配置参数:
SpreadPercent:每档价差百分比(如 0.1% = 0.001)BaseQuantity:基础挂单数量func (s *Strategy) maintainDepth() {
// 1. 撤销所有旧订单
for _, orderID := range s.activeOrders {
s.cancelQueue <- orderID
}
s.activeOrders = []string{}
// 2. 等待撤单完成
time.Sleep(100 * time.Millisecond)
// 3. 挂出新订单
currentPrice := s.getCurrentPrice()
for i := 0; i < 10; i++ {
// 生成买卖单
}
}
每 10 秒选择 10% 的订单进行刷新(撤单重挂):
func (s *Strategy) refreshOrders() {
refreshCount := len(s.activeOrders) / 10
for i := 0; i < refreshCount; i++ {
orderID := s.activeOrders[rand.Intn(len(s.activeOrders))]
s.cancelQueue <- orderID
// 重新挂单
}
}
每 300ms 注入一个小额市价单,触发真实成交:
func (s *Strategy) injectMarketOrder() {
side := rand.Intn(2) // 0=buy, 1=sell
quantity := config.MarketOrderSize // 小额,如 0.001 BTC
if side == 0 {
// 提交市价买单
} else {
// 提交市价卖单
}
}
每 2 秒触发一次合成交易,模拟真实交易流水:
func (s *Strategy) triggerSyntheticTrade() {
currentPrice := s.getCurrentPrice()
quantity := config.SyntheticTradeSize
// 1. 挂限价买单
buyOrderID := s.submitOrder(ORDER_BUY, currentPrice, quantity)
// 2. 立即挂限价卖单(相同价格),触发撮合
sellOrderID := s.submitOrder(ORDER_SELL, currentPrice, quantity)
// 3. 生成成交记录
}
通过深度监控器检测非策略挂单:
func (s *Strategy) detectOpponentOrders(depth *Depth) []OpponentOrder {
opponents := []OpponentOrder{}
for _, order := range depth.Asks {
if !s.isMyOrder(order.OrderID) {
opponents = append(opponents, OpponentOrder{
Side: ORDER_SELL,
Price: order.Price,
Quantity: order.Quantity,
OrderID: order.OrderID,
})
}
}
// 同样检查买单
return opponents
}
当检测到对手盘时,提交市价单吃掉对手挂单:
func (s *Strategy) sweepOpponentOrders(opponents []OpponentOrder) {
for _, opp := range opponents {
// 只扫单价格在合理范围内的订单
priceDeviation := math.Abs(opp.Price - s.getCurrentPrice()) / s.getCurrentPrice()
if priceDeviation > config.MaxSweepPriceDeviation {
continue
}
// 提交市价单
if opp.Side == ORDER_SELL {
s.submitMarketOrder(ORDER_BUY, opp.Quantity)
} else {
s.submitMarketOrder(ORDER_SELL, opp.Quantity)
}
}
}
风控参数:
MaxSweepPriceDeviation:最大扫单价格偏离度(如 2%)MaxSweepQuantity:单次扫单最大数量SweepCooldown:扫单冷却时间type Strategy struct {
buyQueue chan OrderRequest // 买单队列,容量 10000
sellQueue chan OrderRequest // 卖单队列,容量 10000
cancelQueue chan string // 撤单队列,容量 10000
}
Worker 从队列批量消费,减少网络开销:
func (s *Strategy) processBuyOrders() {
batch := []OrderRequest{}
batchSize := 100
for {
select {
case order := <-s.buyQueue:
batch = append(batch, order)
if len(batch) >= batchSize {
// 批量提交
s.submitBatchOrders(batch)
batch = []OrderRequest{}
}
case <-time.After(100 * time.Millisecond):
// 超时也提交
if len(batch) > 0 {
s.submitBatchOrders(batch)
batch = []OrderRequest{}
}
}
}
}
type MatchEngineClient struct {
httpClient *http.Client
}
func NewMatchEngineClient() *MatchEngineClient {
return &MatchEngineClient{
httpClient: &http.Client{
Timeout: 5 * time.Second,
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
},
},
}
}
// 使用 atomic 而非 mutex
atomic.StoreUint64(&s.totalOrders, newValue)
atomic.AddUint64(&s.successOrders, 1)
定期清理旧订单,防止内存泄漏:
func (s *Strategy) truncateOrders() {
if len(s.activeOrders) > 10000 {
s.activeOrders = s.activeOrders[len(s.activeOrders)-10000:]
}
}
// 批量加载配置
configs, err := db.LoadAllStrategyCurrencies()
// 批量更新指标
db.BatchUpdateMetrics(metrics)
| 字段 | 类型 | 说明 |
|---|---|---|
| id | int | 策略 ID |
| name | string | 策略名称(如 external_index_follow) |
| description | string | 策略描述 |
| enabled | bool | 是否启用 |
| config_schema | json | 配置 Schema(JSON Schema 格式) |
| created_at | timestamp | 创建时间 |
| updated_at | timestamp | 更新时间 |
| 字段 | 类型 | 说明 |
|---|---|---|
| id | int | 配置 ID |
| strategy_id | int | 关联的策略 ID |
| symbol | string | 交易对符号(如 BTC/USDT) |
| market_type | int8 | 市场类型(1=币币,2=合约) |
| enabled | bool | 是否启用 |
| config | json | 策略配置参数 |
| created_at | timestamp | 创建时间 |
| updated_at | timestamp | 更新时间 |
{
"external_exchange": "binance",
"external_symbol": "BTCUSDT",
"spread_percent": 0.001,
"base_quantity": 0.1,
"max_order_quantity": 1.0,
"depth_levels": 10,
"refresh_interval_ms": 500,
"market_order_interval_ms": 300,
"market_order_size": 0.001,
"synthetic_trade_interval_ms": 2000,
"synthetic_trade_size": 0.01,
"order_refresh_interval_sec": 10,
"order_refresh_percent": 0.1,
"opponent_detection_enabled": true,
"max_sweep_price_deviation": 0.02,
"max_sweep_quantity": 0.5,
"sweep_cooldown_ms": 1000,
"order_timeout_sec": 300,
"max_active_orders": 10000
}
参数说明:
external_exchange:外部交易所名称external_symbol:外部交易对符号spread_percent:每档价差百分比(0.001 = 0.1%)base_quantity:基础挂单数量max_order_quantity:单笔订单最大数量depth_levels:维护的盘口档数(10 档)refresh_interval_ms:深度刷新间隔(毫秒)market_order_interval_ms:市价单注入间隔(毫秒)market_order_size:市价单大小synthetic_trade_interval_ms:合成交易间隔(毫秒)synthetic_trade_size:合成交易大小order_refresh_interval_sec:订单刷新间隔(秒)order_refresh_percent:每次刷新订单百分比(0.1 = 10%)opponent_detection_enabled:是否启用对手盘检测max_sweep_price_deviation:最大扫单价格偏离度max_sweep_quantity:单次扫单最大数量sweep_cooldown_ms:扫单冷却时间(毫秒)order_timeout_sec:订单超时时间(秒)max_active_orders:最大活跃订单数POST /api/v1/strategy/start
Content-Type: application/json
{
"symbol": "BTC/USDT",
"market_type": 1
}
响应:
{
"code": 0,
"message": "Strategy started successfully",
"data": {
"strategy_id": "external_index_follow_BTC/USDT_1",
"status": "running"
}
}
POST /api/v1/strategy/stop
Content-Type: application/json
{
"symbol": "BTC/USDT",
"market_type": 1
}
POST /api/v1/strategy/reload
Content-Type: application/json
{
"symbol": "BTC/USDT",
"market_type": 1
}
说明:重载会先停止策略,重新加载配置,然后启动。
GET /api/v1/strategy/status?symbol=BTC/USDT&market_type=1
响应:
{
"code": 0,
"message": "Success",
"data": {
"strategy_id": "external_index_follow_BTC/USDT_1",
"strategy_name": "external_index_follow",
"symbol": "BTC/USDT",
"market_type": 1,
"status": "running",
"current_price": 45000.50,
"metrics": {
"total_orders": 15234,
"success_orders": 15180,
"failed_orders": 54,
"total_trades": 1523,
"total_volume": 152.34,
"active_orders": 20,
"uptime_seconds": 86400
},
"config": {
"spread_percent": 0.001,
"base_quantity": 0.1
}
}
}
GET /api/v1/strategy/list
响应:
{
"code": 0,
"message": "Success",
"data": [
{
"symbol": "BTC/USDT",
"market_type": 1,
"strategy_name": "external_index_follow",
"status": "running"
},
{
"symbol": "ETH/USDT",
"market_type": 1,
"strategy_name": "external_index_follow",
"status": "running"
}
]
}
POST /api/v1/strategy/config/update
Content-Type: application/json
{
"symbol": "BTC/USDT",
"market_type": 1,
"config": {
"spread_percent": 0.002,
"base_quantity": 0.2
}
}
说明:配置更新后需要执行 /reload 才能生效。
频道:price:update:{exchange}:{symbol}
示例:price:update:binance:BTCUSDT
消息格式:
{
"exchange": "binance",
"symbol": "BTCUSDT",
"price": 45000.50,
"timestamp": 1699999999999
}
策略处理:
func (s *Strategy) subscribePriceUpdates() {
channel := fmt.Sprintf("price:update:%s:%s",
config.ExternalExchange, config.ExternalSymbol)
pubsub := redisClient.Subscribe(ctx, channel)
defer pubsub.Close()
for msg := range pubsub.Channel() {
var priceUpdate PriceUpdate
json.Unmarshal([]byte(msg.Payload), &priceUpdate)
s.OnPriceUpdate(priceUpdate.Price)
}
}
频道:depth:update:{exchange}:{symbol}
消息格式:
{
"exchange": "binance",
"symbol": "BTCUSDT",
"bids": [
[45000.50, 1.2],
[45000.00, 2.5]
],
"asks": [
[45001.00, 1.1],
[45001.50, 2.3]
],
"timestamp": 1699999999999
}
策略运行时会收集以下指标:
type Metrics struct {
TotalOrders uint64 // 总下单数
SuccessOrders uint64 // 成功订单数
FailedOrders uint64 // 失败订单数
TotalTrades uint64 // 总成交数
TotalVolume float64 // 总成交量
ActiveOrders int // 当前活跃订单数
UptimeSeconds int64 // 运行时长(秒)
LastUpdateTime int64 // 最后更新时间
}
// 示例:暴露 Prometheus 指标
var (
ordersTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "strategy_orders_total",
Help: "Total number of orders",
},
[]string{"symbol", "market_type", "side"},
)
activeOrdersGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "strategy_active_orders",
Help: "Number of active orders",
},
[]string{"symbol", "market_type"},
)
)
使用结构化日志(推荐 zap 或 logrus):
logger.Info("Strategy started",
zap.String("strategy_id", s.ID()),
zap.String("symbol", s.Symbol()),
zap.Int8("market_type", s.MarketType()),
)
logger.Error("Failed to submit order",
zap.String("strategy_id", s.ID()),
zap.Error(err),
zap.Float64("price", order.Price),
zap.Float64("quantity", order.Quantity),
)
GET /api/v1/health
响应:
{
"status": "healthy",
"strategies": {
"total": 10,
"running": 9,
"stopped": 1
},
"uptime_seconds": 86400
}
// 单笔订单数量限制
if quantity > config.MaxOrderQuantity {
return errors.New("order quantity exceeds limit")
}
// 活跃订单数量限制
if len(s.activeOrders) >= config.MaxActiveOrders {
return errors.New("active orders limit reached")
}
// 检查价格偏离
priceDeviation := math.Abs(orderPrice - s.getCurrentPrice()) / s.getCurrentPrice()
if priceDeviation > config.MaxPriceDeviation {
return errors.New("price deviation too large")
}
// 扫单冷却
if time.Since(s.lastSweepTime) < config.SweepCooldown {
return // 跳过本次扫单
}
// 单次扫单数量限制
if sweepQuantity > config.MaxSweepQuantity {
sweepQuantity = config.MaxSweepQuantity
}
func (s *Strategy) submitOrderWithRetry(order OrderRequest, maxRetries int) error {
var err error
for i := 0; i < maxRetries; i++ {
err = s.matchEngineClient.SubmitOrder(order)
if err == nil {
return nil
}
// 指数退避
backoff := time.Duration(math.Pow(2, float64(i))) * 100 * time.Millisecond
time.Sleep(backoff)
}
return fmt.Errorf("failed after %d retries: %w", maxRetries, err)
}
在 strategy_strategy 表中添加新策略记录:
INSERT INTO strategy_strategy (name, description, enabled, config_schema) VALUES (
'grid_trading',
'网格交易策略',
1,
'{
"type": "object",
"properties": {
"grid_levels": {"type": "integer"},
"grid_spacing_percent": {"type": "number"},
"grid_quantity": {"type": "number"}
}
}'
);
创建文件 internal/strategy/grid_trading/strategy.go:
package grid_trading
import (
"context"
"github.com/sodos-exchange/market-maker/internal/strategy"
)
type GridTradingStrategy struct {
// 策略字段
}
func NewStrategy(config *models.StrategyCurrency) (strategy.Strategy, error) {
// 初始化逻辑
}
func (s *GridTradingStrategy) Initialize(ctx context.Context) error {
// 初始化逻辑
}
func (s *GridTradingStrategy) Start() error {
// 启动逻辑
}
// 实现其他接口方法...
在 internal/strategy/factory.go 中注册:
func CreateStrategy(config *models.StrategyCurrency) (Strategy, error) {
switch config.StrategyName {
case "external_index_follow":
return external_index.NewStrategy(config)
case "grid_trading":
return grid_trading.NewStrategy(config)
default:
return nil, fmt.Errorf("unknown strategy: %s", config.StrategyName)
}
}
在 strategy_currency 表中添加配置:
INSERT INTO strategy_currency (strategy_id, symbol, market_type, enabled, config) VALUES (
2, -- grid_trading 的 strategy_id
'BTC/USDT',
1,
1,
'{
"grid_levels": 10,
"grid_spacing_percent": 0.005,
"grid_quantity": 0.1
}'
);
通过 HTTP API 启动:
POST /api/v1/strategy/start
{
"symbol": "BTC/USDT",
"market_type": 1
}
在策略内部添加自定义 Worker:
func (s *MyStrategy) runCustomWorker(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// 执行自定义逻辑
}
}
}
func (s *MyStrategy) Start() error {
// 启动自定义 Worker
go s.runCustomWorker(s.ctx)
// 启动其他 Workers...
return nil
}
创建 internal/exchange/okex/client.go:
type OKExClient struct {
apiKey string
apiSecret string
}
func (c *OKExClient) SubscribePrice(symbol string) (<-chan float64, error) {
// WebSocket 订阅价格
}
func (c *OKExClient) GetDepth(symbol string) (*Depth, error) {
// 获取深度数据
}
func (s *Strategy) Initialize(ctx context.Context) error {
switch config.ExternalExchange {
case "binance":
s.exchangeClient = binance.NewClient()
case "okex":
s.exchangeClient = okex.NewClient()
}
// 订阅价格
priceChan, err := s.exchangeClient.SubscribePrice(config.ExternalSymbol)
go s.handlePriceUpdates(priceChan)
return nil
}
现象:调用 /start 接口返回错误
排查步骤:
strategy_currency 表)现象:策略运行但盘口没有订单
排查步骤:
getCurrentPrice() 是否返回有效价格现象:本地盘口价格与外部交易所价格差距大
排查步骤:
OnPriceUpdate 是否被频繁调用现象:服务内存持续增长
排查步骤:
activeOrders 列表是否过长(应启用截断)pprof 分析内存分配# 获取内存 profile
curl http://localhost:8080/debug/pprof/heap > heap.prof
go tool pprof heap.prof
import _ "net/http/pprof"
func main() {
go func() {
http.ListenAndServe("localhost:6060", nil)
}()
// 启动策略管理器...
}
访问 http://localhost:6060/debug/pprof/ 查看性能数据。
// 开发环境:DEBUG 级别
logger, _ := zap.NewDevelopment()
// 生产环境:INFO 级别
logger, _ := zap.NewProduction()
SODOS Exchange 的做市策略系统是一个高性能、可扩展的做市商解决方案。通过数据库驱动配置、多 Worker 并发模型、无锁价格更新、批量订单处理等技术,实现了高效的流动性提供。系统支持多种做市策略,可以根据不同交易对和市场环境灵活配置,满足交易所的流动性需求。
关键优势:
未来规划: