"""Thin wrapper around the XtQuant (miniQMT) trading API.

The module configures a rotating file logger, defines a callback class that
logs every push notification coming from the trading terminal, and exposes
``XtTrader``, a small facade for logging in, querying account data and
placing / cancelling limit orders.
"""

import datetime as dt
import logging
import os
import random
from logging.handlers import TimedRotatingFileHandler

from chinese_calendar import is_workday
from xtquant import xtconstant
from xtquant.xtdata import get_instrument_detail, get_trading_time
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount

from config import Config

# Logging configuration
LOG_DIR = "log"
# exist_ok avoids the check-then-create race of `if not exists: makedirs`.
os.makedirs(LOG_DIR, exist_ok=True)
# Not used inside this module; kept because other code may import it.
log_path = os.path.join(LOG_DIR, "%Y-%m-%d.log")

logger = logging.getLogger("xt_trader")
logger.setLevel(logging.INFO)
handler = TimedRotatingFileHandler(
    os.path.join(LOG_DIR, "xt_trader.log"),
    when="midnight",
    interval=1,
    backupCount=7,
    encoding="utf-8",
)
handler.suffix = "%Y-%m-%d"
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
# Guard against attaching a second handler when the logger is already set up.
if not logger.handlers:
    logger.addHandler(handler)


class MyXtQuantTraderCallback:
    """Callback object registered with ``XtQuantTrader``; each hook only logs."""

    def on_connected(self):
        """Log that the connection was established."""
        logger.info("连接成功")

    def on_disconnected(self):
        """Log that the connection was lost."""
        logger.warning("连接断开")

    def on_account_status(self, status):
        """Log an account status push (account id and status)."""
        logger.info(f"账号状态: {status.account_id} {status.status}")

    def on_stock_asset(self, asset):
        """Log an asset push (account id, cash, total asset)."""
        logger.info(f"资金变动: {asset.account_id} {asset.cash} {asset.total_asset}")

    def on_stock_order(self, order):
        """Log an order push (stock code, order status, system order id)."""
        logger.info(f"委托回报: {order.stock_code} {order.order_status} {order.order_sysid}")

    def on_stock_trade(self, trade):
        """Log a trade push (account id, stock code, order id)."""
        logger.info(f"成交变动: {trade.account_id} {trade.stock_code} {trade.order_id}")

    def on_stock_position(self, position):
        """Log a position push (stock code and volume)."""
        logger.info(f"持仓变动: {position.stock_code} {position.volume}")

    def on_order_error(self, order_error):
        """Log a failed order (order id, error id, error message)."""
        logger.error(f"委托失败: {order_error.order_id} {order_error.error_id} {order_error.error_msg}")

    def on_cancel_error(self, cancel_error):
        """Log a failed cancel request (order id, error id, error message)."""
        logger.error(f"撤单失败: {cancel_error.order_id} {cancel_error.error_id} {cancel_error.error_msg}")

    def on_order_stock_async_response(self, response):
        """Log the response to an asynchronous order request."""
        logger.info(f"异步下单反馈: {response.order_id}")

    def on_cancel_order_stock_async_response(self, response):
        """Log the response to an asynchronous cancel request."""
        logger.info(f"异步撤单反馈: {response.order_id}")

    def on_smt_appointment_async_response(self, response):
        """Log the response to an asynchronous securities-lending appointment."""
        logger.info(f"约券异步反馈: {response.seq}")


class XtTrader:
    """Facade over ``XtQuantTrader`` for one account taken from ``Config``.

    The account type, strategy name and order remark are read from the
    ``XT_ACCOUNT_TYPE``, ``XT_STRATEGY_NAME`` and ``XT_REMARK`` environment
    variables, falling back to ``"STOCK"``, ``"xt_strategy"`` and ``"remark"``.
    """

    def __init__(self):
        self._ACCOUNT = Config.XT_ACCOUNT
        self._PATH = Config.XT_PATH
        # A random session id is drawn for every instance.
        self._SESSION_ID = random.randint(100000, 99999999)
        self._account_type = os.environ.get("XT_ACCOUNT_TYPE", "STOCK")
        self._strategy_name = os.environ.get("XT_STRATEGY_NAME", "xt_strategy")
        self._remark = os.environ.get("XT_REMARK", "remark")
        self._callback = MyXtQuantTraderCallback()
        self.xt_trader = XtQuantTrader(self._PATH, self._SESSION_ID)
        self.account = StockAccount(self._ACCOUNT, self._account_type)
        self.xt_trader.register_callback(self._callback)
        self.started = False
        self.connected = False
        self.subscribed = False

    def login(self) -> bool:
        """Start the trader, connect, and subscribe to the account.

        Steps that already succeeded on a previous call are skipped, so the
        method can be called again to retry.

        Returns:
            True when the trader is both connected and subscribed.
        """
        if not self.started:
            self.xt_trader.start()
            self.started = True

        if not self.connected:
            result = self.xt_trader.connect()
            self.connected = result == 0
            if not self.connected:
                # Subscribing without a connection cannot succeed; stop here.
                logger.error(f"连接失败: {result}")
                return False

        if not self.subscribed:
            result = self.xt_trader.subscribe(self.account)
            self.subscribed = result == 0
            if not self.subscribed:
                logger.error(f"订阅失败: {result}")

        return self.connected and self.subscribed

    def logout(self):
        """Stop the trader (if it was started) and reset the state flags."""
        if self.started:
            self.xt_trader.stop()
            self.started = False
            self.connected = False
            self.subscribed = False

    def get_balance(self):
        """Return the account asset summary as a dict, or None if unavailable."""
        asset = self.xt_trader.query_stock_asset(self.account)
        if not asset:
            return None
        return {
            "account_id": asset.account_id,
            "cash": asset.cash,
            "frozen_cash": asset.frozen_cash,
            "market_value": asset.market_value,
            "total_asset": asset.total_asset,
        }

    def get_positions(self) -> list:
        """Return current positions as a list of dicts (empty when none)."""
        positions = self.xt_trader.query_stock_positions(self.account)
        if not positions:
            return []
        return [
            {
                "account_id": p.account_id,
                "stock_code": p.stock_code,
                "volume": p.volume,
                "can_use_volume": p.can_use_volume,
                "open_price": p.open_price,
                "market_value": p.market_value,
                "frozen_volume": p.frozen_volume,
                "on_road_volume": p.on_road_volume,
                "yesterday_volume": p.yesterday_volume,
                "avg_price": p.avg_price,
            }
            for p in positions
        ]

    def get_today_trades(self) -> list:
        """Return today's trades as a list of dicts (empty when none).

        Each entry carries a ``stock_name`` resolved via
        ``get_instrument_detail``; it is ``""`` when no detail is returned.
        """
        trades = self.xt_trader.query_stock_trades(self.account)
        if not trades:
            return []

        # Resolve each distinct stock code once instead of twice per trade.
        names = {}

        def stock_name(code):
            if code not in names:
                detail = get_instrument_detail(code)
                names[code] = detail.get("InstrumentName", "") if detail else ""
            return names[code]

        return [
            {
                "account_id": t.account_id,
                "stock_code": t.stock_code,
                "stock_name": stock_name(t.stock_code),
                "order_id": t.order_id,
                "traded_id": t.traded_id,
                "traded_time": t.traded_time,
                "traded_price": t.traded_price,
                "traded_volume": t.traded_volume,
                "traded_amount": t.traded_amount,
                "trade_type": "buy" if t.order_type == xtconstant.STOCK_BUY else "sell",
            }
            for t in trades
        ]

    def get_today_entrust(self) -> list:
        """Return today's orders as a list of dicts (empty when none)."""
        orders = self.xt_trader.query_stock_orders(self.account)
        if not orders:
            return []
        return [
            {
                "account_id": o.account_id,
                "stock_code": o.stock_code,
                "order_id": o.order_id,
                "order_time": o.order_time,
                "order_type": "buy" if o.order_type == xtconstant.STOCK_BUY else "sell",
                "order_volume": o.order_volume,
                "price_type": self._convert_price_type(o.price_type),
                "price": o.price,
                "traded_volume": o.traded_volume,
                "traded_price": o.traded_price,
                "order_status": o.order_status,
                "status_msg": o.status_msg,
            }
            for o in orders
        ]

    def _convert_price_type(self, price_type):
        """Convert a numeric price type to a readable string.

        Unknown values are rendered as ``"unknown_<price_type>"``.
        """
        price_type_map = {
            xtconstant.LATEST_PRICE: "latest_price",  # latest price
            xtconstant.FIX_PRICE: "limit_price",  # fixed / limit price
            xtconstant.MARKET_BEST: "market_best",  # market order, best price
            xtconstant.MARKET_CANCEL: "market_cancel",  # market order, fill then cancel the rest
            xtconstant.MARKET_CANCEL_ALL: "market_cancel_all",  # market order, fill entirely or cancel
            xtconstant.MARKET_PEER_PRICE_FIRST: "market_peer_best",  # counterparty best price
            xtconstant.MARKET_MINE_PRICE_FIRST: "market_mine_best",  # own-side best price
        }
        return price_type_map.get(price_type, f"unknown_{price_type}")

    def _place_order(self, code, side, price, amount) -> dict:
        """Submit a limit order for *side* and return ``{"order_id": ...}``."""
        order_id = self.xt_trader.order_stock(
            self.account,
            code,
            side,
            amount,
            xtconstant.FIX_PRICE,
            price,
            self._strategy_name,
            self._remark,
        )
        return {"order_id": order_id}

    def buy(self, code, price, amount) -> dict:
        """Place a limit buy order.

        Args:
            code: Stock code passed to ``order_stock``.
            price: Limit price.
            amount: Order volume.

        Returns:
            ``{"order_id": <value returned by order_stock>}``.
        """
        return self._place_order(code, xtconstant.STOCK_BUY, price, amount)

    def sell(self, code, price, amount) -> dict:
        """Place a limit sell order.

        Args:
            code: Stock code passed to ``order_stock``.
            price: Limit price.
            amount: Order volume.

        Returns:
            ``{"order_id": <value returned by order_stock>}``.
        """
        return self._place_order(code, xtconstant.STOCK_SELL, price, amount)

    def cancel(self, entrust_no) -> dict:
        """Cancel an order by its order id.

        Args:
            entrust_no: Order id; converted with ``int()`` before the call.

        Returns:
            ``{"cancel_result": <value returned by cancel_order_stock>}``.

        Raises:
            ValueError / TypeError: if ``entrust_no`` cannot be converted to int.
        """
        result = self.xt_trader.cancel_order_stock(self.account, int(entrust_no))
        return {"cancel_result": result}

    def is_trading_time(self, now=None) -> bool:
        """Tell whether *now* falls inside a regular trading session.

        Sessions are 09:30-11:30 and 13:00-15:00 (bounds inclusive) on
        non-weekend working days.

        Args:
            now: ``datetime`` to evaluate; defaults to ``datetime.now()``.

        Returns:
            True during a trading session, False otherwise (including when
            the check itself raises).
        """
        try:
            if now is None:
                now = dt.datetime.now()

            # is_workday() counts make-up working Saturdays/Sundays as
            # workdays, but the exchanges stay closed on every weekend.
            if now.weekday() >= 5:
                return False

            # Excludes public holidays (chinese_calendar knows the schedule).
            if not is_workday(now):
                return False

            current_time = now.time()
            morning_start = dt.time(9, 30)
            morning_end = dt.time(11, 30)
            afternoon_start = dt.time(13, 0)
            afternoon_end = dt.time(15, 0)

            in_morning = morning_start <= current_time <= morning_end
            in_afternoon = afternoon_start <= current_time <= afternoon_end
            return in_morning or in_afternoon
        except Exception as e:
            # Best effort: any failure (e.g. a year chinese_calendar does not
            # cover) is reported as "not trading".
            logger.error(f"判断交易时间发生错误: {str(e)}")
            return False


if __name__ == "__main__":
    trader = XtTrader()
    try:
        if trader.login():
            logger.info(f"账户余额: {trader.get_balance()}")
            logger.info(f"持仓: {trader.get_positions()}")
            logger.info(f"当日成交: {trader.get_today_trades()}")
            logger.info(f"当日委托: {trader.get_today_entrust()}")
        else:
            logger.error("登录失败")
    finally:
        # Always stop the trader so the session is released.
        trader.logout()