Liquidation Keeper

概述

Liquidation Keeper是Zanbara的自动清算机器人,负责监控所有用户持仓,在保证金不足时及时执行强制平仓,保护平台和用户利益。

Keeper工作原理

核心职责

  1. 实时监控: 每10秒扫描所有持仓

  2. 风险计算: 实时计算每个持仓的保证金率

  3. 清算触发: 保证金率<110%时立即清算

  4. 订单执行: 以市价快速平仓

  5. 资金结算: 处理清算后的资金分配

架构设计

┌─────────────────────────────────────────────┐
│         Liquidation Keeper Service          │
├─────────────────────────────────────────────┤
│                                             │
│  ┌─────────────┐    ┌──────────────┐       │
│  │  Monitor    │───→│  Risk        │       │
│  │  Thread     │    │  Calculator  │       │
│  └─────────────┘    └──────────────┘       │
│         │                    │              │
│         ↓                    ↓              │
│  ┌─────────────────────────────────┐       │
│  │    Liquidation Queue            │       │
│  │    (Priority Queue)             │       │
│  └─────────────────────────────────┘       │
│         │                                   │
│         ↓                                   │
│  ┌─────────────┐    ┌──────────────┐       │
│  │  Executor   │───→│  Order       │       │
│  │  Pool (10)  │    │  Submitter   │       │
│  └─────────────┘    └──────────────┘       │
│         │                                   │
│         ↓                                   │
│  ┌─────────────────────────────────┐       │
│  │    Settlement Engine            │       │
│  └─────────────────────────────────┘       │
│                                             │
└─────────────────────────────────────────────┘

监控频率

扫描策略

  • 正常模式: 每10秒全量扫描

  • 高风险模式: 每5秒扫描(市场波动>5%)

  • 紧急模式: 每2秒扫描(市场波动>10%)

优化技术

class PositionMonitor:
    def scan_positions(self):
        """增量扫描优化"""
        # 1. 只扫描有持仓的用户
        users_with_positions = get_users_with_open_positions()
        
        # 2. 优先扫描高风险用户(保证金率<200%)
        high_risk_users = [u for u in users_with_positions 
                          if u.margin_ratio < 2.0]
        
        # 3. 批量获取标记价格(减少重复查询)
        mark_prices = batch_get_mark_prices()
        
        # 4. 并行计算(10个worker)
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(self.check_position, u, mark_prices) 
                      for u in high_risk_users]
            
            for future in as_completed(futures):
                position = future.result()
                if position and position.margin_ratio < 1.1:
                    self.liquidation_queue.put(position)

清算执行机制

并发控制

  • 全局最多10个并发清算

  • 每个用户同时只能清算1个仓位

  • 使用Redis分布式锁防止重复

  • 优先级队列(保证金率最低优先)

执行流程

class LiquidationExecutor:
    def execute(self, position):
        """执行清算"""
        try:
            # 1. 获取分布式锁
            lock = self.acquire_lock(position.id, timeout=30)
            if not lock:
                logger.warning(f"Failed to acquire lock for {position.id}")
                return
            
            # 2. 二次确认(防止并发)
            if not self.verify_liquidation_needed(position):
                return
            
            # 3. 标记状态
            position.status = "LIQUIDATING"
            db.commit()
            
            # 4. 构造清算订单
            order = {
                "user_id": position.user_id,
                "symbol": position.symbol,
                "side": "SELL" if position.side == "LONG" else "BUY",
                "quantity": position.quantity,
                "type": "MARKET",
                "reduce_only": True,
                "liquidation": True
            }
            
            # 5. 提交订单
            order_id = self.order_engine.submit(order)
            
            # 6. 等待成交(超时10秒)
            filled = self.wait_for_fill(order_id, timeout=10)
            
            if not filled:
                # 重试机制
                self.retry_liquidation(position)
            else:
                # 7. 结算资金
                self.settle(position, filled)
                
            # 8. 通知用户
            self.notify_user(position)
            
        except Exception as e:
            logger.error(f"Liquidation failed: {e}")
            self.handle_error(position, e)
        finally:
            # 9. 释放锁
            self.release_lock(lock)

重试机制

def retry_liquidation(self, position, attempt=1):
    """清算重试"""
    if attempt > 3:
        # 超过3次,标记为异常
        position.status = "LIQUIDATION_FAILED"
        alert_ops_team(f"Liquidation failed after 3 attempts: {position.id}")
        return
    
    # 指数退避
    wait_time = 2 ** attempt  # 2s, 4s, 8s
    time.sleep(wait_time)
    
    # 重新提交
    self.execute(position)

性能指标

关键指标

指标
目标值
当前值
说明

监控延迟

<100ms

82ms

检测到清算条件的时间

执行延迟

<10秒

6.8秒

从触发到成交的时间

成功率

>99%

99.7%

清算成功执行的比例

并发数

≤10

平均3.2

同时进行的清算数

穿仓率

<0.1%

0.08%

清算后仍有负债的比例

性能优化

  1. 批量查询: 一次获取所有标记价格

  2. 增量扫描: 只检查有持仓的用户

  3. 优先级队列: 高风险优先处理

  4. 并行执行: 10个worker并发清算

  5. 缓存优化: Redis缓存用户状态

异常处理

常见异常

  1. 订单未成交: 重试3次,仍失败则人工介入

  2. 价格剧烈波动: 切换到紧急模式,加快扫描

  3. 流动性不足: 分批清算,避免冲击市场

  4. 系统故障: 自动切换到备用Keeper

应急预案

class EmergencyProtocol:
    def handle_keeper_failure(self):
        """Keeper故障处理"""
        # 1. 启动备用Keeper
        self.start_backup_keeper()
        
        # 2. 暂停新开仓
        pause_new_positions()
        
        # 3. 通知运营团队
        alert_ops("Keeper failure detected!")
        
        # 4. 提高保证金要求
        increase_margin_requirement(1.5)
        
        # 5. 记录所有待清算仓位
        pending = get_pending_liquidations()
        for p in pending:
            emergency_liquidate(p)

监控和告警

实时监控

  • Keeper运行状态

  • 清算队列长度

  • 平均执行时间

  • 成功/失败率

  • 穿仓事件

告警规则

警告级别:
- 清算队列>5: 🟡 注意
- 清算队列>10: 🟠 警告
- 清算队列>20: 🔴 紧急

- 执行时间>15秒: 🟡 注意
- 执行时间>30秒: 🟠 警告
- 执行时间>60秒: 🔴 紧急

- 失败率>1%: 🟡 注意
- 失败率>5%: 🟠 警告
- 失败率>10%: 🔴 紧急

- 穿仓率>0.2%: 🟡 注意
- 穿仓率>0.5%: 🟠 警告
- 穿仓率>1%: 🔴 紧急

告警通知

  • Slack频道实时通知

  • PagerDuty紧急呼叫(红色级别)

  • 邮件日报

  • Grafana Dashboard

Keeper高可用

冗余设计

  • 主Keeper: 实时运行

  • 备Keeper: 热备份,主故障时自动接管

  • 监控Keeper: 监控主备状态

故障转移

class KeeperHA:
    def __init__(self):
        self.role = "STANDBY"  # MASTER / STANDBY / MONITOR
        self.heartbeat_interval = 5  # seconds
        
    def heartbeat(self):
        """心跳检测"""
        while True:
            redis.setex(f"keeper:{self.id}:heartbeat", 10, "alive")
            time.sleep(self.heartbeat_interval)
    
    def check_master_alive(self):
        """检查主Keeper是否存活"""
        master_heartbeat = redis.get("keeper:master:heartbeat")
        return master_heartbeat is not None
    
    def takeover(self):
        """接管主Keeper"""
        logger.warning("Taking over as MASTER")
        self.role = "MASTER"
        redis.set("keeper:master", self.id)
        self.start_monitoring()

相关文档

Last updated