Liquidation Keeper
概述
Liquidation Keeper是Zanbara的自动清算机器人,负责监控所有用户持仓,在保证金不足时及时执行强制平仓,保护平台和用户利益。
Keeper工作原理
核心职责
实时监控: 每10秒扫描所有持仓
风险计算: 实时计算每个持仓的保证金率
清算触发: 保证金率<110%时立即清算
订单执行: 以市价快速平仓
资金结算: 处理清算后的资金分配
架构设计
┌─────────────────────────────────────────────┐
│ Liquidation Keeper Service │
├─────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌──────────────┐ │
│ │ Monitor │───→│ Risk │ │
│ │ Thread │ │ Calculator │ │
│ └─────────────┘ └──────────────┘ │
│ │ │ │
│ ↓ ↓ │
│ ┌─────────────────────────────────┐ │
│ │ Liquidation Queue │ │
│ │ (Priority Queue) │ │
│ └─────────────────────────────────┘ │
│ │ │
│ ↓ │
│ ┌─────────────┐ ┌──────────────┐ │
│ │ Executor │───→│ Order │ │
│ │ Pool (10) │ │ Submitter │ │
│ └─────────────┘ └──────────────┘ │
│ │ │
│ ↓ │
│ ┌─────────────────────────────────┐ │
│ │ Settlement Engine │ │
│ └─────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────┘监控频率
扫描策略
正常模式: 每10秒全量扫描
高风险模式: 每5秒扫描(市场波动>5%)
紧急模式: 每2秒扫描(市场波动>10%)
优化技术
class PositionMonitor:
def scan_positions(self):
"""增量扫描优化"""
# 1. 只扫描有持仓的用户
users_with_positions = get_users_with_open_positions()
# 2. 优先扫描高风险用户(保证金率<200%)
high_risk_users = [u for u in users_with_positions
if u.margin_ratio < 2.0]
# 3. 批量获取标记价格(减少重复查询)
mark_prices = batch_get_mark_prices()
# 4. 并行计算(10个worker)
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(self.check_position, u, mark_prices)
for u in high_risk_users]
for future in as_completed(futures):
position = future.result()
if position and position.margin_ratio < 1.1:
self.liquidation_queue.put(position)清算执行机制
并发控制
全局最多10个并发清算
每个用户同时只能清算1个仓位
使用Redis分布式锁防止重复
优先级队列(保证金率最低优先)
执行流程
class LiquidationExecutor:
def execute(self, position):
"""执行清算"""
try:
# 1. 获取分布式锁
lock = self.acquire_lock(position.id, timeout=30)
if not lock:
logger.warning(f"Failed to acquire lock for {position.id}")
return
# 2. 二次确认(防止并发)
if not self.verify_liquidation_needed(position):
return
# 3. 标记状态
position.status = "LIQUIDATING"
db.commit()
# 4. 构造清算订单
order = {
"user_id": position.user_id,
"symbol": position.symbol,
"side": "SELL" if position.side == "LONG" else "BUY",
"quantity": position.quantity,
"type": "MARKET",
"reduce_only": True,
"liquidation": True
}
# 5. 提交订单
order_id = self.order_engine.submit(order)
# 6. 等待成交(超时10秒)
filled = self.wait_for_fill(order_id, timeout=10)
if not filled:
# 重试机制
self.retry_liquidation(position)
else:
# 7. 结算资金
self.settle(position, filled)
# 8. 通知用户
self.notify_user(position)
except Exception as e:
logger.error(f"Liquidation failed: {e}")
self.handle_error(position, e)
finally:
# 9. 释放锁
self.release_lock(lock)重试机制
def retry_liquidation(self, position, attempt=1):
"""清算重试"""
if attempt > 3:
# 超过3次,标记为异常
position.status = "LIQUIDATION_FAILED"
alert_ops_team(f"Liquidation failed after 3 attempts: {position.id}")
return
# 指数退避
wait_time = 2 ** attempt # 2s, 4s, 8s
time.sleep(wait_time)
# 重新提交
self.execute(position)性能指标
关键指标
指标
目标值
当前值
说明
监控延迟
<100ms
82ms
检测到清算条件的时间
执行延迟
<10秒
6.8秒
从触发到成交的时间
成功率
>99%
99.7%
清算成功执行的比例
并发数
≤10
平均3.2
同时进行的清算数
穿仓率
<0.1%
0.08%
清算后仍有负债的比例
性能优化
批量查询: 一次获取所有标记价格
增量扫描: 只检查有持仓的用户
优先级队列: 高风险优先处理
并行执行: 10个worker并发清算
缓存优化: Redis缓存用户状态
异常处理
常见异常
订单未成交: 重试3次,仍失败则人工介入
价格剧烈波动: 切换到紧急模式,加快扫描
流动性不足: 分批清算,避免冲击市场
系统故障: 自动切换到备用Keeper
应急预案
class EmergencyProtocol:
def handle_keeper_failure(self):
"""Keeper故障处理"""
# 1. 启动备用Keeper
self.start_backup_keeper()
# 2. 暂停新开仓
pause_new_positions()
# 3. 通知运营团队
alert_ops("Keeper failure detected!")
# 4. 提高保证金要求
increase_margin_requirement(1.5)
# 5. 记录所有待清算仓位
pending = get_pending_liquidations()
for p in pending:
emergency_liquidate(p)监控和告警
实时监控
Keeper运行状态
清算队列长度
平均执行时间
成功/失败率
穿仓事件
告警规则
警告级别:
- 清算队列>5: 🟡 注意
- 清算队列>10: 🟠 警告
- 清算队列>20: 🔴 紧急
- 执行时间>15秒: 🟡 注意
- 执行时间>30秒: 🟠 警告
- 执行时间>60秒: 🔴 紧急
- 失败率>1%: 🟡 注意
- 失败率>5%: 🟠 警告
- 失败率>10%: 🔴 紧急
- 穿仓率>0.2%: 🟡 注意
- 穿仓率>0.5%: 🟠 警告
- 穿仓率>1%: 🔴 紧急告警通知
Slack频道实时通知
PagerDuty紧急呼叫(红色级别)
邮件日报
Grafana Dashboard
Keeper高可用
冗余设计
主Keeper: 实时运行
备Keeper: 热备份,主故障时自动接管
监控Keeper: 监控主备状态
故障转移
class KeeperHA:
def __init__(self):
self.role = "STANDBY" # MASTER / STANDBY / MONITOR
self.heartbeat_interval = 5 # seconds
def heartbeat(self):
"""心跳检测"""
while True:
redis.setex(f"keeper:{self.id}:heartbeat", 10, "alive")
time.sleep(self.heartbeat_interval)
def check_master_alive(self):
"""检查主Keeper是否存活"""
master_heartbeat = redis.get("keeper:master:heartbeat")
return master_heartbeat is not None
def takeover(self):
"""接管主Keeper"""
logger.warning("Taking over as MASTER")
self.role = "MASTER"
redis.set("keeper:master", self.id)
self.start_monitoring()相关文档
Last updated