Redis Architecture

Version: v1.0  Last updated: 2025-10-07


The Role of Redis in Zanbara

Redis is Zanbara's core cache and high-frequency data store, carrying the following key functions:

  • Order book cache (real-time price levels)

  • User session management (JWT tokens)

  • Rate limiting (request counting)

  • Market data cache (tickers, trades)

  • Real-time message publish/subscribe

  • Distributed locks


Core Data Structures

1. Order Book (Sorted Set)

The order book uses a Redis Sorted Set to keep price levels ordered by price (the score).

# Bids (sorted by price, descending)
ZADD orderbook:SOL-PERP:bids 204.50 "order_id_1|size_10.5"
ZADD orderbook:SOL-PERP:bids 204.30 "order_id_2|size_5.2"
ZADD orderbook:SOL-PERP:bids 204.00 "order_id_3|size_20.0"

# Asks (sorted by price, ascending)
ZADD orderbook:SOL-PERP:asks 205.00 "order_id_4|size_8.0"
ZADD orderbook:SOL-PERP:asks 205.20 "order_id_5|size_12.5"
ZADD orderbook:SOL-PERP:asks 205.50 "order_id_6|size_6.0"

# Get the best bids (top 20 levels)
ZREVRANGE orderbook:SOL-PERP:bids 0 19 WITHSCORES

# Get the best asks (top 20 levels)
ZRANGE orderbook:SOL-PERP:asks 0 19 WITHSCORES

# Remove a filled order
ZREM orderbook:SOL-PERP:bids "order_id_1|size_10.5"

Rust code example:

use redis::{Commands, RedisResult};

pub fn get_orderbook_bids(
    conn: &mut redis::Connection,
    symbol: &str,
    depth: isize,
) -> RedisResult<Vec<(String, f64)>> {
    let key = format!("orderbook:{}:bids", symbol);
    // ZREVRANGE is inclusive on both ends, so `depth` levels means indices 0..=depth-1.
    conn.zrevrange_withscores(key, 0, depth - 1)
}

pub fn add_order_to_book(
    conn: &mut redis::Connection,
    symbol: &str,
    side: &str,
    price: f64,
    order_id: &str,
    size: f64,
) -> RedisResult<()> {
    let key = format!("orderbook:{}:{}", symbol, side);
    let value = format!("{}|{}", order_id, size);
    // ZADD key member score: the price is the score, "order_id|size" is the member.
    conn.zadd(key, value, price)
}

2. User Sessions (String + TTL)

# Set a session (expires in 24 hours)
SETEX session:GHj8k2...abc 86400 "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

# Get a session
GET session:GHj8k2...abc

# Check whether a session exists
EXISTS session:GHj8k2...abc

# Extend a session (reset the TTL)
EXPIRE session:GHj8k2...abc 86400

# Log out (delete the session)
DEL session:GHj8k2...abc
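
A minimal Rust sketch of this session flow, using the synchronous redis API (the helper names here are illustrative, not part of the Zanbara codebase):

use redis::{Commands, RedisResult};

// Store a JWT under the session key with a 24-hour TTL (SETEX semantics).
pub fn store_session(
    conn: &mut redis::Connection,
    session_id: &str,
    jwt: &str,
) -> RedisResult<()> {
    conn.set_ex(format!("session:{}", session_id), jwt, 86_400)
}

// Fetch a session; None means the key is missing or has expired.
pub fn load_session(
    conn: &mut redis::Connection,
    session_id: &str,
) -> RedisResult<Option<String>> {
    conn.get(format!("session:{}", session_id))
}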

3. Rate Limiting (String + atomic increment)

# Order rate limit (at most 10 orders per 60 seconds)
INCR ratelimit:user:abc123:orders
EXPIRE ratelimit:user:abc123:orders 60
GET ratelimit:user:abc123:orders

# API rate limit (sliding window)
ZADD ratelimit:api:user:abc123 1696723200 "req_1"
ZADD ratelimit:api:user:abc123 1696723201 "req_2"
ZREMRANGEBYSCORE ratelimit:api:user:abc123 0 1696723140  # Remove requests older than 60 seconds
ZCARD ratelimit:api:user:abc123  # Count requests within the window

Rust rate-limit implementation (fixed window):

pub fn check_rate_limit(
    conn: &mut redis::Connection,
    user_id: &str,
    limit: u32,
    window_secs: usize,
) -> RedisResult<bool> {
    let key = format!("ratelimit:user:{}:orders", user_id);

    // INCR creates the key at 1 if it does not exist and returns the new count.
    let count: u32 = conn.incr(&key, 1)?;

    // Set the TTL only when the key has just been created; resetting it on
    // every request would keep pushing the window forward.
    if count == 1 {
        let _: () = conn.expire(&key, window_secs)?;
    }

    Ok(count <= limit)
}
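
The sliding-window variant from the CLI example above can be sketched with a sorted set. This is a rough illustration: the per-request member naming and second-level timestamps are simplifications (a production version would use a unique request ID and millisecond precision):

use redis::RedisResult;
use std::time::{SystemTime, UNIX_EPOCH};

pub fn check_api_rate_limit(
    conn: &mut redis::Connection,
    user_id: &str,
    limit: usize,
    window_secs: u64,
) -> RedisResult<bool> {
    let key = format!("ratelimit:api:user:{}", user_id);
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    let window_start = now.saturating_sub(window_secs);

    let (_, _, count): (i64, i64, usize) = redis::pipe()
        .atomic()
        // Drop requests that fell out of the window.
        .zrembyscore(&key, 0, window_start)
        // Record the current request at the current timestamp.
        .zadd(&key, format!("req_{}", now), now)
        // Count what is left inside the window.
        .zcard(&key)
        .query(conn)?;

    Ok(count <= limit)
}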

4. Market Snapshot (Hash)

# Set market data fields
HSET market:SOL-PERP last_price "205.00"
HSET market:SOL-PERP volume_24h "15000000"
HSET market:SOL-PERP high_24h "210.50"
HSET market:SOL-PERP low_24h "198.30"
HSET market:SOL-PERP change_24h "3.5"

# Set multiple fields at once (HSET accepts multiple pairs since Redis 4.0; HMSET is deprecated)
HSET market:SOL-PERP \
    last_price "205.00" \
    volume_24h "15000000" \
    high_24h "210.50" \
    low_24h "198.30"

# Get all fields
HGETALL market:SOL-PERP

# Get a single field
HGET market:SOL-PERP last_price

# Atomically increment the traded volume
HINCRBYFLOAT market:SOL-PERP volume_24h 1250.50
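
A Rust sketch for reading and updating this snapshot hash (helper names are illustrative; synchronous API):

use std::collections::HashMap;
use redis::{Commands, RedisResult};

// HGETALL maps naturally onto a HashMap of field -> value.
pub fn get_market_snapshot(
    conn: &mut redis::Connection,
    symbol: &str,
) -> RedisResult<HashMap<String, String>> {
    conn.hgetall(format!("market:{}", symbol))
}

// HINCRBYFLOAT returns the new field value, parsed here as f64.
pub fn add_traded_volume(
    conn: &mut redis::Connection,
    symbol: &str,
    size: f64,
) -> RedisResult<f64> {
    redis::cmd("HINCRBYFLOAT")
        .arg(format!("market:{}", symbol))
        .arg("volume_24h")
        .arg(size)
        .query(conn)
}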

5. Recent Trades (List, FIFO)

# Add the latest trade (keep the most recent 100)
LPUSH trades:SOL-PERP '{"price":205.0,"size":10,"side":"BUY","time":1696723200}'
LTRIM trades:SOL-PERP 0 99

# Get the 10 most recent trades
LRANGE trades:SOL-PERP 0 9

# Get the number of stored trades
LLEN trades:SOL-PERP
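
A Rust sketch of the push-and-trim pattern shown above (helper names are illustrative):

use redis::{Commands, RedisResult};

// Push the newest trade to the head and cap the list at 100 entries,
// executed atomically inside a MULTI/EXEC pipeline.
pub fn record_trade(
    conn: &mut redis::Connection,
    symbol: &str,
    trade_json: &str,
) -> RedisResult<()> {
    let key = format!("trades:{}", symbol);
    redis::pipe()
        .atomic()
        .lpush(&key, trade_json)
        .ltrim(&key, 0, 99)
        .query(conn)
}

// LPUSH keeps the newest entry first, so LRANGE 0 n-1 returns the latest n trades.
pub fn recent_trades(
    conn: &mut redis::Connection,
    symbol: &str,
    n: isize,
) -> RedisResult<Vec<String>> {
    conn.lrange(format!("trades:{}", symbol), 0, n - 1)
}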

6. Real-Time Push (Pub/Sub)

# Publish a trade message
PUBLISH trades:SOL-PERP '{"price":205,"size":10,"side":"BUY","time":1696723200}'

# Publish an order book update
PUBLISH orderbook:SOL-PERP '{"bids":[[204.5,10],[204.3,5]],"asks":[[205.0,8]]}'

# Publish a position change
PUBLISH positions:user:abc123 '{"symbol":"SOL-PERP","size":10,"pnl":250}'

Rust Pub/Sub example:

use redis::aio::ConnectionManager;
use futures_util::StreamExt;

pub async fn publish_trade(
    conn: &mut ConnectionManager,
    symbol: &str,
    trade: &Trade,
) -> RedisResult<()> {
    let channel = format!("trades:{}", symbol);
    let payload = serde_json::to_string(trade).unwrap();
    redis::cmd("PUBLISH")
        .arg(channel)
        .arg(payload)
        .query_async(conn)
        .await
}

pub async fn subscribe_trades(
    client: &redis::Client,
    symbol: &str,
) -> RedisResult<()> {
    // Pub/Sub needs a dedicated connection; a ConnectionManager cannot switch
    // into subscriber mode, so a separate connection is taken from the client.
    let mut pubsub = client.get_async_pubsub().await?;
    pubsub.subscribe(format!("trades:{}", symbol)).await?;

    let mut stream = pubsub.on_message();
    while let Some(msg) = stream.next().await {
        let payload: String = msg.get_payload()?;
        if let Ok(trade) = serde_json::from_str::<Trade>(&payload) {
            println!("Received trade: {:?}", trade);
        }
    }

    Ok(())
}

7. Distributed Lock (String, NX EX)

# Acquire the lock (expires in 30 seconds)
SET lock:settlement:batch_1 "worker_id_xyz" NX EX 30

# Check whether the lock exists
GET lock:settlement:batch_1

# Release the lock (a Lua script guarantees atomicity)
EVAL "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" \
    1 lock:settlement:batch_1 "worker_id_xyz"

Rust distributed-lock implementation:

pub struct RedisLock {
    key: String,
    token: String,
    ttl_secs: usize,
}

impl RedisLock {
    pub fn acquire(
        conn: &mut redis::Connection,
        key: String,
        ttl_secs: usize,
    ) -> RedisResult<Option<Self>> {
        let token = uuid::Uuid::new_v4().to_string();

        let result: Option<String> = redis::cmd("SET")
            .arg(&key)
            .arg(&token)
            .arg("NX")
            .arg("EX")
            .arg(ttl_secs)
            .query(conn)?;

        if result.is_some() {
            Ok(Some(Self { key, token, ttl_secs }))
        } else {
            Ok(None)
        }
    }

    pub fn release(&self, conn: &mut redis::Connection) -> RedisResult<()> {
        let script = r#"
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
        "#;

        // The script returns 1 when our token still held the lock, 0 otherwise.
        let _deleted: i32 = redis::Script::new(script)
            .key(&self.key)
            .arg(&self.token)
            .invoke(conn)?;

        Ok(())
    }
}

Redis Streams (Message Queue)

Trade Event Stream

# Produce a message
XADD trades:stream * \
    trade_id "abc-123" \
    user_id "user-456" \
    symbol "SOL-PERP" \
    price "205.0" \
    size "10.0"

# Create a consumer group
XGROUP CREATE trades:stream settlement_group 0 MKSTREAM

# Consume messages (batch read)
XREADGROUP GROUP settlement_group consumer1 \
    COUNT 10 \
    BLOCK 5000 \
    STREAMS trades:stream >

# Acknowledge a message
XACK trades:stream settlement_group 1696723200000-0

# Inspect pending messages
XPENDING trades:stream settlement_group

Rust Streams example:

use redis::streams::{StreamReadOptions, StreamReadReply};

pub fn produce_trade_event(
    conn: &mut redis::Connection,
    trade: &Trade,
) -> RedisResult<String> {
    // Numeric fields must outlive the borrows passed to XADD.
    let price = trade.price.to_string();
    let size = trade.size.to_string();

    let items: &[(&str, &str)] = &[
        ("trade_id", trade.id.as_str()),
        ("user_id", trade.user_id.as_str()),
        ("symbol", trade.symbol.as_str()),
        ("price", price.as_str()),
        ("size", size.as_str()),
    ];

    // XADD with "*" lets Redis assign the entry ID, which is returned.
    redis::cmd("XADD")
        .arg("trades:stream")
        .arg("*")
        .arg(items)
        .query(conn)
}

pub fn consume_trade_events(
    conn: &mut redis::Connection,
    group: &str,
    consumer: &str,
) -> RedisResult<Vec<Trade>> {
    let opts = StreamReadOptions::default()
        .group(group, consumer)
        .count(10)
        .block(5000);

    // ">" asks for entries never delivered to this consumer group before.
    let results: StreamReadReply = conn.xread_options(&["trades:stream"], &[">"], &opts)?;

    // Deserialize and process the entries here...
    Ok(vec![])
}

Persistence Configuration

Hybrid RDB + AOF Persistence

# redis.conf

# RDB snapshot configuration
save 900 1        # Snapshot if at least 1 write in 15 minutes
save 300 10       # Snapshot if at least 10 writes in 5 minutes
save 60 10000     # Snapshot if at least 10000 writes in 1 minute

dbfilename dump.rdb
dir /var/lib/redis

# AOF log configuration
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec  # fsync once per second (balances performance and durability)

# AOF rewrite configuration
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

# Hybrid persistence (Redis 4.0+)
aof-use-rdb-preamble yes

Choosing a Persistence Strategy

Scenario              Recommended setting   Reason
Order book            AOF everysec          Important data; at most 1 second of loss is acceptable
Sessions              RDB only              Loss is acceptable; users simply log in again
Rate-limit counters   No persistence        Transient data; safe to lose
Message queue         AOF everysec          Messages must not be lost

Redis Cluster Configuration (Scale-Out Phase)

Cluster Architecture

Redis Cluster (3主3从)
├─ Master 1 (Slots 0-5460)
│  └─ Replica 1
├─ Master 2 (Slots 5461-10922)
│  └─ Replica 2
└─ Master 3 (Slots 10923-16383)
   └─ Replica 3

Cluster Configuration File

# redis-cluster.conf

# Enable cluster mode
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000

# Replication settings
cluster-replica-validity-factor 10
cluster-migration-barrier 1

# Failover
cluster-require-full-coverage no

Creating the Cluster

# Start 6 Redis instances (3 masters, 3 replicas)
redis-server --port 7000 --cluster-enabled yes --cluster-config-file nodes-7000.conf &
redis-server --port 7001 --cluster-enabled yes --cluster-config-file nodes-7001.conf &
# ... start the remaining instances

# Create the cluster
redis-cli --cluster create \
    127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
    127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \
    --cluster-replicas 1
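
On the client side, connecting to such a cluster can be sketched with the redis crate's cluster support. This assumes the crate's `cluster` feature is enabled; node addresses and the key used below are just illustrative:

use redis::cluster::ClusterClient;
use redis::{Commands, RedisResult};

// The cluster client discovers the slot map from the seed nodes and routes
// each command to the master that owns the key's hash slot.
pub fn cluster_smoke_test() -> RedisResult<()> {
    let nodes = vec![
        "redis://127.0.0.1:7000/",
        "redis://127.0.0.1:7001/",
        "redis://127.0.0.1:7002/",
    ];
    let client = ClusterClient::new(nodes)?;
    let mut conn = client.get_connection()?;

    let _: () = conn.set("market:SOL-PERP:last_price", "205.00")?;
    let price: String = conn.get("market:SOL-PERP:last_price")?;
    println!("last_price = {}", price);
    Ok(())
}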

Performance Tuning

Memory Optimization

# Maximum memory limit
maxmemory 8gb

# Eviction policy
maxmemory-policy allkeys-lru

# Compact-encoding (ziplist) thresholds
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
list-max-ziplist-size -2
set-max-intset-entries 512
zset-max-ziplist-entries 128
zset-max-ziplist-value 64

Network Optimization

# TCP settings
tcp-backlog 511
tcp-keepalive 300

# Client timeout
timeout 300

# Slow query log
slowlog-log-slower-than 10000  # 10ms
slowlog-max-len 128

Monitoring Metrics

Key Metric Queries

# Memory usage
INFO memory

# QPS statistics
INFO stats

# Client connections
INFO clients

# Persistence status
INFO persistence

# Replication status
INFO replication

# Slow query log
SLOWLOG GET 10

Prometheus Monitoring

# prometheus.yml
scrape_configs:
  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']

Key metrics:

  • redis_memory_used_bytes: memory usage

  • redis_commands_processed_total: total commands processed

  • redis_connected_clients: number of connected clients

  • redis_keyspace_hits_total: cache hits

  • redis_keyspace_misses_total: cache misses (together with hits, these give the cache hit rate; see the sketch below)
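
As a rough illustration, the hit rate is hits / (hits + misses). The sketch below derives it directly from INFO stats using the synchronous redis API (the helper name is illustrative):

use redis::RedisResult;

// Parse keyspace_hits / keyspace_misses out of INFO stats and derive the hit rate.
pub fn cache_hit_rate(conn: &mut redis::Connection) -> RedisResult<f64> {
    let info: String = redis::cmd("INFO").arg("stats").query(conn)?;

    let counter = |name: &str| -> f64 {
        info.lines()
            .find_map(|line| line.strip_prefix(&format!("{}:", name)))
            .and_then(|v| v.trim().parse().ok())
            .unwrap_or(0.0)
    };

    let hits = counter("keyspace_hits");
    let misses = counter("keyspace_misses");
    if hits + misses == 0.0 {
        Ok(0.0)
    } else {
        Ok(hits / (hits + misses))
    }
}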


Troubleshooting

Common Issues

1. Out of Memory

# Check memory usage
redis-cli INFO memory | grep used_memory_human

# Find big keys
redis-cli --bigkeys

# Bulk-delete keys matching a pattern (here: all session keys)
redis-cli --scan --pattern "session:*" | xargs redis-cli DEL

2. Slow Queries

# View the slow query log
redis-cli SLOWLOG GET 10

# Common causes of slow queries
# - KEYS (use SCAN instead, as sketched below)
# - SMEMBERS on large sets (use SSCAN)
# - ZRANGE over large ranges (limit the number of returned elements)
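
For reference, the redis crate exposes cursor-based iteration so application code can avoid KEYS as well (illustrative helper, synchronous API):

use redis::{Commands, RedisResult};

// scan_match drives SCAN ... MATCH under the hood, fetching keys in
// cursor-sized batches instead of blocking the server like KEYS would.
pub fn count_session_keys(conn: &mut redis::Connection) -> RedisResult<usize> {
    let keys = conn.scan_match::<_, String>("session:*")?;
    Ok(keys.count())
}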

3. Connection Exhaustion

# Check the number of connections
redis-cli INFO clients | grep connected_clients

# List connected clients
redis-cli CLIENT LIST

# Kill client connections by type (terminates all normal clients)
redis-cli CLIENT KILL TYPE normal

Related Documents


Maintained by: Zanbara Engineering Team
