Redis 架构
版本: v1.0 最后更新: 2025-10-07
Redis 在 Zanbara 中的作用
Redis 是 Zanbara 的核心缓存和高频数据存储系统,承载以下关键功能:
订单簿缓存(实时价格档位)
用户会话管理(JWT token)
速率限制(请求计数)
行情数据缓存(ticker、trades)
实时消息发布订阅
分布式锁
核心数据结构
1. 订单簿 (Sorted Set)
订单簿使用 Redis Sorted Set 实现价格排序。
# 买单(按价格降序)
ZADD orderbook:SOL-PERP:bids 204.50 "order_id_1|size_10.5"
ZADD orderbook:SOL-PERP:bids 204.30 "order_id_2|size_5.2"
ZADD orderbook:SOL-PERP:bids 204.00 "order_id_3|size_20.0"
# 卖单(按价格升序)
ZADD orderbook:SOL-PERP:asks 205.00 "order_id_4|size_8.0"
ZADD orderbook:SOL-PERP:asks 205.20 "order_id_5|size_12.5"
ZADD orderbook:SOL-PERP:asks 205.50 "order_id_6|size_6.0"
# 获取最佳买单(Top 20档)
ZREVRANGE orderbook:SOL-PERP:bids 0 19 WITHSCORES
# 获取最佳卖单(Top 20档)
ZRANGE orderbook:SOL-PERP:asks 0 19 WITHSCORES
# 删除已成交订单
ZREM orderbook:SOL-PERP:bids "order_id_1|size_10.5"Rust 代码示例:
use redis::{Commands, RedisResult};
pub async fn get_orderbook_bids(
conn: &mut redis::Connection,
symbol: &str,
depth: isize,
) -> RedisResult<Vec<(String, f64)>> {
let key = format!("orderbook:{}:bids", symbol);
conn.zrevrange_withscores(key, 0, depth)
}
pub async fn add_order_to_book(
conn: &mut redis::Connection,
symbol: &str,
side: &str,
price: f64,
order_id: &str,
size: f64,
) -> RedisResult<()> {
let key = format!("orderbook:{}:{}", symbol, side);
let value = format!("{}|{}", order_id, size);
conn.zadd(key, value, price)
}2. 用户会话 (String + TTL)
# 设置会话(24小时过期)
SETEX session:GHj8k2...abc 86400 "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
# 获取会话
GET session:GHj8k2...abc
# 检查会话是否存在
EXISTS session:GHj8k2...abc
# 延长会话(重置过期时间)
EXPIRE session:GHj8k2...abc 86400
# 登出(删除会话)
DEL session:GHj8k2...abc3. 速率限制 (String + 原子递增)
# 订单速率限制(60秒内最多10笔)
INCR ratelimit:user:abc123:orders
EXPIRE ratelimit:user:abc123:orders 60
GET ratelimit:user:abc123:orders
# API速率限制(滑动窗口)
ZADD ratelimit:api:user:abc123 1696723200 "req_1"
ZADD ratelimit:api:user:abc123 1696723201 "req_2"
ZREMRANGEBYSCORE ratelimit:api:user:abc123 0 1696723140 # 删除60秒前的请求
ZCARD ratelimit:api:user:abc123 # 统计60秒内请求数Rust 速率限制实现:
pub async fn check_rate_limit(
conn: &mut redis::Connection,
user_id: &str,
limit: u32,
window_secs: usize,
) -> RedisResult<bool> {
let key = format!("ratelimit:user:{}:orders", user_id);
let count: u32 = conn.get(&key).unwrap_or(0);
if count >= limit {
return Ok(false); // 超过限制
}
redis::pipe()
.atomic()
.incr(&key, 1)
.expire(&key, window_secs)
.query(conn)?;
Ok(true)
}4. 行情快照 (Hash)
# 设置市场数据
HSET market:SOL-PERP last_price "205.00"
HSET market:SOL-PERP volume_24h "15000000"
HSET market:SOL-PERP high_24h "210.50"
HSET market:SOL-PERP low_24h "198.30"
HSET market:SOL-PERP change_24h "3.5"
# 批量设置
HMSET market:SOL-PERP \
last_price "205.00" \
volume_24h "15000000" \
high_24h "210.50" \
low_24h "198.30"
# 获取所有字段
HGETALL market:SOL-PERP
# 获取单个字段
HGET market:SOL-PERP last_price
# 原子递增交易量
HINCRBYFLOAT market:SOL-PERP volume_24h 1250.505. 最新成交 (List - FIFO)
# 添加最新成交(保留最近100笔)
LPUSH trades:SOL-PERP '{"price":205.0,"size":10,"side":"BUY","time":1696723200}'
LTRIM trades:SOL-PERP 0 99
# 获取最近10笔成交
LRANGE trades:SOL-PERP 0 9
# 获取最近成交数量
LLEN trades:SOL-PERP6. 实时推送 (Pub/Sub)
# 发布成交消息
PUBLISH trades:SOL-PERP '{"price":205,"size":10,"side":"BUY","time":1696723200}'
# 发布订单簿更新
PUBLISH orderbook:SOL-PERP '{"bids":[[204.5,10],[204.3,5]],"asks":[[205.0,8]]}'
# 发布持仓变动
PUBLISH positions:user:abc123 '{"symbol":"SOL-PERP","size":10,"pnl":250}'Rust Pub/Sub 示例:
use redis::aio::ConnectionManager;
pub async fn publish_trade(
conn: &mut ConnectionManager,
symbol: &str,
trade: &Trade,
) -> RedisResult<()> {
let channel = format!("trades:{}", symbol);
let payload = serde_json::to_string(trade).unwrap();
redis::cmd("PUBLISH")
.arg(channel)
.arg(payload)
.query_async(conn)
.await
}
pub async fn subscribe_trades(
conn: &mut ConnectionManager,
symbol: &str,
) -> RedisResult<()> {
let mut pubsub = conn.as_pubsub();
pubsub.subscribe(format!("trades:{}", symbol)).await?;
while let Some(msg) = pubsub.on_message().next().await {
let payload: String = msg.get_payload()?;
let trade: Trade = serde_json::from_str(&payload)?;
println!("Received trade: {:?}", trade);
}
Ok(())
}7. 分布式锁 (String - NX EX)
# 获取锁(30秒过期)
SET lock:settlement:batch_1 "worker_id_xyz" NX EX 30
# 检查锁是否存在
GET lock:settlement:batch_1
# 释放锁(Lua脚本保证原子性)
EVAL "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" \
1 lock:settlement:batch_1 "worker_id_xyz"Rust 分布式锁实现:
pub struct RedisLock {
key: String,
token: String,
ttl_secs: usize,
}
impl RedisLock {
pub async fn acquire(
conn: &mut redis::Connection,
key: String,
ttl_secs: usize,
) -> RedisResult<Option<Self>> {
let token = uuid::Uuid::new_v4().to_string();
let result: Option<String> = redis::cmd("SET")
.arg(&key)
.arg(&token)
.arg("NX")
.arg("EX")
.arg(ttl_secs)
.query(conn)?;
if result.is_some() {
Ok(Some(Self { key, token, ttl_secs }))
} else {
Ok(None)
}
}
pub async fn release(&self, conn: &mut redis::Connection) -> RedisResult<()> {
let script = r#"
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"#;
redis::Script::new(script)
.key(&self.key)
.arg(&self.token)
.invoke(conn)?;
Ok(())
}
}Redis Streams (消息队列)
成交事件流
# 生产消息
XADD trades:stream * \
trade_id "abc-123" \
user_id "user-456" \
symbol "SOL-PERP" \
price "205.0" \
size "10.0"
# 创建消费者组
XGROUP CREATE trades:stream settlement_group 0 MKSTREAM
# 消费消息(批量读取)
XREADGROUP GROUP settlement_group consumer1 \
COUNT 10 \
BLOCK 5000 \
STREAMS trades:stream >
# 确认消息
XACK trades:stream settlement_group 1696723200000-0
# 查看待处理消息
XPENDING trades:stream settlement_groupRust Streams 示例:
use redis::streams::{StreamReadOptions, StreamReadReply};
pub async fn produce_trade_event(
conn: &mut redis::Connection,
trade: &Trade,
) -> RedisResult<String> {
let items = &[
("trade_id", trade.id.as_str()),
("user_id", trade.user_id.as_str()),
("symbol", trade.symbol.as_str()),
("price", &trade.price.to_string()),
("size", &trade.size.to_string()),
];
redis::cmd("XADD")
.arg("trades:stream")
.arg("*")
.arg(items)
.query(conn)
}
pub async fn consume_trade_events(
conn: &mut redis::Connection,
group: &str,
consumer: &str,
) -> RedisResult<Vec<Trade>> {
let opts = StreamReadOptions::default()
.group(group, consumer)
.count(10)
.block(5000);
let results: StreamReadReply = conn.xread_options(&["trades:stream"], &[">"], &opts)?;
// 处理结果...
Ok(vec![])
}持久化配置
RDB + AOF 混合持久化
# redis.conf
# RDB快照配置
save 900 1 # 15分钟有1次写入则保存
save 300 10 # 5分钟有10次写入则保存
save 60 10000 # 1分钟有10000次写入则保存
dbfilename dump.rdb
dir /var/lib/redis
# AOF日志配置
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec # 每秒同步一次(性能与安全平衡)
# AOF重写配置
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
# 混合持久化(Redis 4.0+)
aof-use-rdb-preamble yes持久化策略选择
场景
推荐配置
原因
订单簿
AOF everysec
数据重要,允许最多丢失1秒
会话
RDB only
可接受丢失,重新登录即可
限流计数
No persistence
临时数据,可丢失
消息队列
AOF everysec
消息不能丢失
Redis Cluster 配置(扩展阶段)
集群架构
Redis Cluster (3主3从)
├─ Master 1 (Slots 0-5460)
│ └─ Replica 1
├─ Master 2 (Slots 5461-10922)
│ └─ Replica 2
└─ Master 3 (Slots 10923-16383)
└─ Replica 3集群配置文件
# redis-cluster.conf
# 启用集群模式
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
# 主从配置
cluster-replica-validity-factor 10
cluster-migration-barrier 1
# 故障转移
cluster-require-full-coverage no创建集群
# 启动6个Redis实例(3主3从)
redis-server --port 7000 --cluster-enabled yes --cluster-config-file nodes-7000.conf &
redis-server --port 7001 --cluster-enabled yes --cluster-config-file nodes-7001.conf &
# ... 启动其他实例
# 创建集群
redis-cli --cluster create \
127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \
--cluster-replicas 1性能优化
内存优化
# 最大内存限制
maxmemory 8gb
# 淘汰策略
maxmemory-policy allkeys-lru
# 压缩配置
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
list-max-ziplist-size -2
set-max-intset-entries 512
zset-max-ziplist-entries 128
zset-max-ziplist-value 64网络优化
# TCP配置
tcp-backlog 511
tcp-keepalive 300
# 客户端超时
timeout 300
# 慢查询日志
slowlog-log-slower-than 10000 # 10ms
slowlog-max-len 128监控指标
关键指标查询
# 内存使用
INFO memory
# QPS统计
INFO stats
# 客户端连接
INFO clients
# 持久化状态
INFO persistence
# 复制状态
INFO replication
# 慢查询日志
SLOWLOG GET 10Prometheus 监控
# prometheus.yml
scrape_configs:
- job_name: 'redis'
static_configs:
- targets: ['redis-exporter:9121']关键指标:
redis_memory_used_bytes: 内存使用redis_commands_processed_total: 命令执行总数redis_connected_clients: 连接客户端数redis_keyspace_hits_total: 缓存命中次数redis_keyspace_misses_total: 缓存未命中次数
故障排查
常见问题
1. 内存溢出
# 检查内存使用
redis-cli INFO memory | grep used_memory_human
# 检查大Key
redis-cli --bigkeys
# 清理过期Key
redis-cli --scan --pattern "session:*" | xargs redis-cli DEL2. 慢查询
# 查看慢查询日志
redis-cli SLOWLOG GET 10
# 常见慢查询原因
# - KEYS命令(使用SCAN替代)
# - SMEMBERS大集合(使用SSCAN)
# - ZRANGE大范围(限制返回数量)3. 连接耗尽
# 查看连接数
redis-cli INFO clients | grep connected_clients
# 查看客户端列表
redis-cli CLIENT LIST
# 杀掉空闲连接
redis-cli CLIENT KILL TYPE normal相关文档
维护: Zanbara 技术团队
Last updated