Python SDK

Zanbara Python SDK 为 Python 应用提供完整的 REST API 和 WebSocket 客户端封装,支持异步和同步两种模式。

安装

pip install zanbara-sdk

快速开始

同步模式

from zanbara import ZanbaraClient

# 创建客户端
client = ZanbaraClient('https://api.zanbarax.com/v1')

# 获取订单簿
orderbook = client.get_orderbook('BTC-PERP', depth=20)
print(f"最优买价: {orderbook['bids'][0][0]}")
print(f"最优卖价: {orderbook['asks'][0][0]}")

# 获取市场统计
stats = client.get_market_stats('BTC-PERP')
print(f"最新价: {stats['lastPrice']}")
print(f"24h涨跌: {stats['priceChangePercent24h']}%")

异步模式

import asyncio
from zanbara import AsyncZanbaraClient

async def main():
    # 创建异步客户端
    client = AsyncZanbaraClient('https://api.zanbarax.com/v1')

    # 获取订单簿
    orderbook = await client.get_orderbook('BTC-PERP', depth=20)
    print(f"订单簿: {orderbook}")

    # 获取K线数据
    klines = await client.get_klines('BTC-PERP', '1h', limit=100)
    print(f"获取了 {len(klines)} 根K线")

asyncio.run(main())

REST API 客户端

初始化

from zanbara import ZanbaraClient

# 基础配置
client = ZanbaraClient(
    base_url='https://api.zanbarax.com/v1',
    timeout=10,
    api_key='YOUR_API_KEY'  # 可选,用于私有接口
)

市场数据

# 获取所有市场
markets = client.get_markets()
for market in markets:
    print(f"{market['symbol']}: {market['baseCurrency']}/{market['quoteCurrency']}")

# 获取订单簿
orderbook = client.get_orderbook('BTC-PERP', depth=20)

# 获取最新成交
trades = client.get_trades('BTC-PERP', limit=50)

# 获取K线数据
klines = client.get_klines(
    market='BTC-PERP',
    interval='1h',
    start_time=int((datetime.now() - timedelta(days=1)).timestamp() * 1000),
    end_time=int(datetime.now().timestamp() * 1000),
    limit=24
)

# 获取24h统计
stats = client.get_market_stats('BTC-PERP')

账户操作

# 设置 API Key
client.set_api_key('YOUR_API_KEY')

# 获取持仓
positions = client.get_positions('YOUR_WALLET_ADDRESS')
for pos in positions:
    print(f"{pos['market']}: {pos['side']} {pos['size']}")

# 获取订单
orders = client.get_orders('YOUR_WALLET_ADDRESS', status='open')

# 获取成交历史
fills = client.get_fills(
    'YOUR_WALLET_ADDRESS',
    market='BTC-PERP',
    limit=50
)

高级订单

# 创建 Iceberg 订单
iceberg_order = client.create_iceberg_order(
    symbol='BTC-PERP',
    side='buy',
    total_quantity='5.0',
    visible_quantity='0.5',
    price='43000.0'
)
print(f"Iceberg 订单 ID: {iceberg_order['icebergOrderId']}")

# 创建 TWAP 订单
twap_order = client.create_twap_order(
    symbol='ETH-PERP',
    side='sell',
    total_quantity='10.0',
    duration_seconds=3600,
    num_slices=10,
    randomize_intervals=True
)
print(f"TWAP 订单 ID: {twap_order['twapOrderId']}")

# 获取执行统计
iceberg_stats = client.get_iceberg_stats('iceberg_abc123')
print(f"执行进度: {iceberg_stats['executedQuantity']}/{iceberg_stats['totalQuantity']}")

twap_stats = client.get_twap_stats('twap_xyz789')
print(f"平均成交价: {twap_stats['averageExecutionPrice']}")
print(f"总滑点: {twap_stats['totalSlippage']}")

# 暂停/恢复/取消
client.pause_iceberg_order('iceberg_abc123')
client.resume_iceberg_order('iceberg_abc123')
client.cancel_iceberg_order('iceberg_abc123')

WebSocket 客户端

基础使用

import asyncio
from zanbara import ZanbaraWebSocket

async def on_ticker(data):
    print(f"Ticker: {data['market']} ${data['lastPrice']}")

async def main():
    ws = ZanbaraWebSocket('wss://ws.zanbarax.com')

    # 连接
    await ws.connect()

    # 订阅 Ticker
    await ws.subscribe_ticker('BTC-PERP', on_ticker)

    # 保持运行
    await ws.run_forever()

asyncio.run(main())

订阅多个频道

import asyncio
from zanbara import ZanbaraWebSocket

class MarketMonitor:
    def __init__(self):
        self.ws = ZanbaraWebSocket('wss://ws.zanbarax.com')

    async def on_orderbook(self, data):
        print(f"订单簿更新: {data['market']}")

    async def on_trades(self, data):
        print(f"新成交: {data['market']} {data['side']} {data['quantity']}@{data['price']}")

    async def on_ticker(self, data):
        print(f"Ticker: {data['market']} {data['lastPrice']} ({data['priceChangePercent24h']}%)")

    async def start(self, markets):
        await self.ws.connect()

        for market in markets:
            await self.ws.subscribe_orderbook(market, self.on_orderbook)
            await self.ws.subscribe_trades(market, self.on_trades)
            await self.ws.subscribe_ticker(market, self.on_ticker)

        await self.ws.run_forever()

# 使用
async def main():
    monitor = MarketMonitor()
    await monitor.start(['BTC-PERP', 'ETH-PERP', 'SOL-PERP'])

asyncio.run(main())

认证连接 (私有频道)

import asyncio
from zanbara import ZanbaraClient, ZanbaraWebSocket

async def main():
    # 获取 WebSocket token
    client = ZanbaraClient('https://api.zanbarax.com/v1', api_key='YOUR_API_KEY')
    token = client.get_websocket_token()

    # 使用 token 连接
    ws = ZanbaraWebSocket(f'wss://ws.zanbarax.com?token={token}')
    await ws.connect()

    # 订阅私有频道
    await ws.subscribe_orders('YOUR_WALLET_ADDRESS', on_order_update)
    await ws.subscribe_positions('YOUR_WALLET_ADDRESS', on_position_update)
    await ws.subscribe_account(on_account_update)

    await ws.run_forever()

async def on_order_update(data):
    print(f"订单更新: {data['orderId']} {data['status']}")

async def on_position_update(data):
    print(f"持仓更新: {data['market']} {data['unrealizedPnl']}")

async def on_account_update(data):
    print(f"账户更新: {data['totalEquity']}")

asyncio.run(main())

Pandas 集成

SDK 提供 Pandas DataFrame 集成,方便数据分析:

import pandas as pd
from zanbara import ZanbaraClient

client = ZanbaraClient('https://api.zanbarax.com/v1')

# 获取K线数据并转换为 DataFrame
klines = client.get_klines('BTC-PERP', '1h', limit=100)
df = pd.DataFrame(klines)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)

# 转换数据类型
for col in ['open', 'high', 'low', 'close', 'volume']:
    df[col] = df[col].astype(float)

# 计算技术指标
df['sma_20'] = df['close'].rolling(window=20).mean()
df['ema_12'] = df['close'].ewm(span=12).mean()
df['ema_26'] = df['close'].ewm(span=26).mean()
df['macd'] = df['ema_12'] - df['ema_26']

# 计算 RSI
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
df['rsi'] = 100 - (100 / (1 + rs))

# 打印统计
print(df.tail())
print(f"\n统计信息:")
print(f"最高价: {df['high'].max()}")
print(f"最低价: {df['low'].min()}")
print(f"平均成交量: {df['volume'].mean():.2f}")

异步批量操作

import asyncio
from zanbara import AsyncZanbaraClient

async def fetch_multiple_markets(client, markets):
    """并发获取多个市场数据"""
    tasks = [client.get_market_stats(market) for market in markets]
    results = await asyncio.gather(*tasks)
    return results

async def main():
    client = AsyncZanbaraClient('https://api.zanbarax.com/v1')

    markets = ['BTC-PERP', 'ETH-PERP', 'SOL-PERP', 'ARB-PERP']

    # 并发获取
    stats_list = await fetch_multiple_markets(client, markets)

    for stats in stats_list:
        print(f"{stats['market']}: ${stats['lastPrice']} ({stats['priceChangePercent24h']}%)")

asyncio.run(main())

错误处理

from zanbara import ZanbaraClient, ZanbaraError, ApiError, NetworkError

client = ZanbaraClient('https://api.zanbarax.com/v1')

try:
    orderbook = client.get_orderbook('BTC-PERP')
except ApiError as e:
    print(f"API 错误: {e.message}")
    print(f"错误码: {e.code}")
    print(f"状态码: {e.status_code}")
except NetworkError as e:
    print(f"网络错误: {e}")
except ZanbaraError as e:
    print(f"SDK 错误: {e}")

完整示例

import asyncio
import pandas as pd
from datetime import datetime, timedelta
from zanbara import AsyncZanbaraClient, ZanbaraWebSocket

class TradingBot:
    def __init__(self, api_url, ws_url):
        self.api_client = AsyncZanbaraClient(api_url)
        self.ws_client = ZanbaraWebSocket(ws_url)
        self.current_price = {}

    async def initialize(self):
        """初始化"""
        # 连接 WebSocket
        await self.ws_client.connect()
        print("✓ WebSocket 已连接")

        # 加载历史数据
        await self.load_historical_data()

    async def load_historical_data(self):
        """加载历史数据"""
        end_time = int(datetime.now().timestamp() * 1000)
        start_time = int((datetime.now() - timedelta(days=1)).timestamp() * 1000)

        klines = await self.api_client.get_klines(
            'BTC-PERP',
            '1h',
            start_time=start_time,
            end_time=end_time
        )

        df = pd.DataFrame(klines)
        print(f"✓ 加载了 {len(df)} 根K线")

    async def on_ticker(self, data):
        """处理 Ticker 更新"""
        market = data['market']
        price = float(data['lastPrice'])

        self.current_price[market] = price
        print(f"{market}: ${price}")

    async def run(self, markets):
        """运行机器人"""
        await self.initialize()

        # 订阅市场数据
        for market in markets:
            await self.ws_client.subscribe_ticker(market, self.on_ticker)

        print("✓ 交易机器人已启动")
        await self.ws_client.run_forever()

# 运行
async def main():
    bot = TradingBot(
        'https://api.zanbarax.com/v1',
        'wss://ws.zanbarax.com'
    )
    await bot.run(['BTC-PERP', 'ETH-PERP'])

asyncio.run(main())

下一步

Last updated