
BGUSER-NXB07P6L
2025/08/10 07:09
1
# -*- coding: utf-8 -*-
import hmac, base64, hashlib, json, time, asyncio, datetime as dt
import requests, websockets
########################
# ==== 配置:硬编码密钥(在本地替换占位符) ==== #
########################
OKX_API_KEY        = "REPLACE_WITH_YOUR_OKX_API_KEY"
OKX_API_SECRET     = "REPLACE_WITH_YOUR_OKX_API_SECRET"
OKX_API_PASSPHRASE = "REPLACE_WITH_YOUR_OKX_API_PASSPHRASE"
BG_API_KEY         = "REPLACE_WITH_YOUR_BITGET_API_KEY"
BG_API_SECRET      = "REPLACE_WITH_YOUR_BITGET_API_SECRET"
BG_PASSPHRASE      = "REPLACE_WITH_YOUR_BITGET_PASSPHRASE"
########################
# ==== 交易/策略参数 ==== #
########################
# 标的
OKX_INST_ID   = "BTC-USDT-SWAP"       # OKX 永续
BG_SYMBOL     = "BTCUSDT"             # Bitget USDT本位永续
BG_PRODUCT    = "USDT-FUTURES"        # Bitget 产品线
# 阈值
OPEN_EQUALITY_USD = 2.0               # |价差| <= 2 开仓锁价
CLOSE_SPREAD_USD  = 18.0              # |价差| >= 18 平仓
USE_MARK_PRICE    = True              # True=标记价;False=最新成交价
# 手数/张数(先小量测试)
OKX_SZ   = "10"
BG_SIZE  = "10"
# 开仓方向(默认 OKX 多 / Bitget 空;可改)
OPEN_OKX_SIDE  = "buy"                # buy/sell
OPEN_BG_SIDE   = "sell"               # buy/sell
OKX_TD_MODE    = "cross"              # cross/isolated
OKX_POS_SIDE   = "long" if OPEN_OKX_SIDE=="buy" else "short"
BG_MARGIN_COIN = "USDT"
# REST & WS
OKX_BASE = "https://www.okx.com"
BG_BASE  = "https://api.bitget.com"
OKX_WS_PUBLIC = "wss://ws.okx.com:8443/ws/v5/public"
BG_WS_PUBLIC  = "wss://ws.bitget.com/v2/ws/public"
# 运行控制
DRY_RUN          = False              # False=真下单(先小仓验证)
REQUEST_TIMEOUT  = 10
WS_RETRY_DELAY   = 2
########################
# ==== 工具函数 ==== #
########################
def iso_ts_ms_str():
    return str(int(time.time() * 1000))
def okx_headers(method, path, body):
    ts = dt.datetime.utcnow().isoformat(timespec="milliseconds") + "Z"
    prehash = f"{ts}{method.upper()}{path}{body}"
    sign = base64.b64encode(hmac.new(OKX_API_SECRET.encode(), prehash.encode(), hashlib.sha256).digest()).decode()
    return {
        "OK-ACCESS-KEY": OKX_API_KEY,
        "OK-ACCESS-SIGN": sign,
        "OK-ACCESS-TIMESTAMP": ts,
        "OK-ACCESS-PASSPHRASE": OKX_API_PASSPHRASE,
        "Content-Type": "application/json"
    }
def bg_sign(ts_ms:str, method:str, path:str, body:str, secret:str)->str:
    msg = f"{ts_ms}{method.upper()}{path}{body}".encode()
    sign = hmac.new(secret.encode(), msg, hashlib.sha256).digest()
    return base64.b64encode(sign).decode()
def bg_headers(method, path, body):
    ts = iso_ts_ms_str()
    sign = bg_sign(ts, method, path, body, BG_API_SECRET)
    return {
        "ACCESS-KEY": BG_API_KEY,
        "ACCESS-SIGN": sign,
        "ACCESS-PASSPHRASE": BG_PASSPHRASE,
        "ACCESS-TIMESTAMP": ts,
        "Content-Type": "application/json"
    }
########################
# ==== 下单/平仓(OKX) ==== #
########################
def okx_place_market(instId, tdMode, side, posSide, sz):
    path = "/api/v5/trade/order"
    url  = OKX_BASE + path
    body = {
        "instId": instId,
        "tdMode": tdMode,
        "side": side,           # buy/sell
        "posSide": posSide,     # long/short(对冲)
        "ordType": "market",
        "sz": str(sz)
    }
    data = json.dumps(body, separators=(",",":"))
    if DRY_RUN:
        print("[DRYRUN][OKX] place", data)
        return {"dryrun":True, "req":body}
    r = requests.post(url, headers=okx_headers("POST", path, data), data=data, timeout=REQUEST_TIMEOUT)
    r.raise_for_status()
    return r.json()
def okx_close_market(instId, tdMode, posSide, sz):
    side = "sell" if posSide=="long" else "buy"
    return okx_place_market(instId, tdMode, side, posSide, sz)
########################
# ==== 下单/平仓(Bitget v2) ==== #
########################
def bg_place_market(symbol, productType, marginCoin, side, tradeSide, size):
    path = "/api/v2/mix/order/place-order"
    url  = BG_BASE + path
    body = {
        "symbol": symbol,
        "productType": productType,   # USDT-FUTURES
        "marginCoin": marginCoin,     # USDT
        "side": side,                 # buy/sell
        "tradeSide": tradeSide,       # open/close
        "orderType": "market",
        "size": str(size),
        "clientOid": f"arb_{int(time.time()*1000)}"
    }
    data = json.dumps(body, separators=(",",":"))
    if DRY_RUN:
        print("[DRYRUN][Bitget] place", data)
        return {"dryrun":True, "req":body}
    r = requests.post(url, headers=bg_headers("POST", path, data), data=data, timeout=REQUEST_TIMEOUT)
    r.raise_for_status()
    return r.json()
def bg_open(symbol, productType, marginCoin, side, size):
    return bg_place_market(symbol, productType, marginCoin, side, "open", size)
def bg_close(symbol, productType, marginCoin, side, size):
    opp_side = "buy" if side=="sell" else "sell"
    return bg_place_market(symbol, productType, marginCoin, opp_side, "close", size)
########################
# ==== 价格源(WS) ==== #
########################
class PriceFeed:
    def __init__(self):
        self.okx_price = None
        self.bg_price  = None
    def okx_channel(self):
        if USE_MARK_PRICE:
            return {"op":"subscribe","args":[{"channel":"mark-price","instId":OKX_INST_ID}]}
        else:
            return {"op":"subscribe","args":[{"channel":"tickers","instId":OKX_INST_ID}]}
    def bg_channel(self):
        channel = "markPrice" if USE_MARK_PRICE else "ticker"
        return {"op":"subscribe","args":[{"instType":"USDT-FUTURES","channel":channel,"instId":BG_SYMBOL}]}
    async def okx_loop(self):
        while True:
            try:
                async with websockets.connect(OKX_WS_PUBLIC, ping_interval=20) as ws:
                    await ws.send(json.dumps(self.okx_channel()))
                    async for msg in ws:
                        data = json.loads(msg)
                        if data.get("event")=="subscribe":
                            print("[OKX] subscribed")
                        elif "data" in data:
                            d = data["data"][0]
                            self.okx_price = float(d["markPx"] if USE_MARK_PRICE else d["last"])
            except Exception as e:
                print("[OKX WS] error:", e)
                await asyncio.sleep(WS_RETRY_DELAY)
    async def bg_loop(self):
        while True:
            try:
                async with websockets.connect(BG_WS_PUBLIC, ping_interval=20) as ws:
                    await ws.send(json.dumps(self.bg_channel()))
                    async for msg in ws:
                        data = json.loads(msg)
                        if data.get("event")=="subscribe":
                            print("[Bitget] subscribed")
                        elif "arg" in data and "data" in data:
                            d = data["data"][0]
                            if USE_MARK_PRICE:
                                self.bg_price = float(d.get("markPrice") or d.get("price"))
                            else:
                                self.bg_price = float(d.get("last") or d.get("price"))
            except Exception as e:
                print("[Bitget WS] error:", e)
                await asyncio.sleep(WS_RETRY_DELAY)
########################
# ==== 策略执行 ==== #
########################
class ArbState:
    def __init__(self):
        self.has_position   = False
        self.entry_spread   = None
        self.okx_side_open  = OPEN_OKX_SIDE
        self.bg_side_open   = OPEN_BG_SIDE
async def main():
    feed = PriceFeed()
    state = ArbState()
    tasks = [asyncio.create_task(feed.okx_loop()),
             asyncio.create_task(feed.bg_loop())]
    try:
        while True:
            await asyncio.sleep(0.2)
            if feed.okx_price is None or feed.bg_price is None:
                continue
            spread = feed.okx_price - feed.bg_price  # OKX - Bitget
            now = dt.datetime.now().strftime("%H:%M:%S")
            print(f"{now} P_okx={feed.okx_price:.2f}  P_bg={feed.bg_price:.2f}  spread={spread:.2f}", end="\r")
            # 无持仓 → 锁价开仓
            if not state.has_position and abs(spread) <= OPEN_EQUALITY_USD:
                print("\n[OPEN] |spread|<=OPEN_EQUALITY_USD,尝试锁价开仓")
                try:
                    okx_res = okx_place_market(OKX_INST_ID, OKX_TD_MODE, state.okx_side_open, OKX_POS_SIDE, OKX_SZ)
                    bg_res  = bg_open(BG_SYMBOL, BG_PRODUCT, BG_MARGIN_COIN, state.bg_side_open, BG_SIZE)
                    print("[OKX OPEN RES]", okx_res)
                    print("[BG  OPEN RES]", bg_res)
                    state.has_position = True
                    state.entry_spread = spread
                except Exception as e:
                    print("[OPEN ERROR]", e)
            # 有持仓 → 触发平仓
            if state.has_position and abs(spread) >= CLOSE_SPREAD_USD:
                print(f"\n[CLOSE] |spread|>=CLOSE_SPREAD_USD,开始同时平仓 | spread={spread:.2f}")
                try:
                    okx_res = okx_close_market(OKX_INST_ID, OKX_TD_MODE, OKX_POS_SIDE, OKX_SZ)
                    bg_res  = bg_close(BG_SYMBOL, BG_PRODUCT, BG_MARGIN_COIN, state.bg_side_open, BG_SIZE)
                    print("[OKX CLOSE RES]", okx_res)
                    print("[BG  CLOSE RES]", bg_res)
                    pnl = (spread - (state.entry_spread or 0.0))
                    print(f"[PNL est] Δspread = {pnl:.2f} USD(未含费率/资金费/滑点)")
                    state.has_position = False
                    state.entry_spread = None
                except Exception as e:
                    print("[CLOSE ERROR]", e)
    finally:
        for t in tasks:
            t.cancel()
if __name__ == "__main__":
    asyncio.run(main())