diff --git a/src/real/real_trader_manager.py b/src/real/real_trader_manager.py index 1a81798..8a7b6ab 100644 --- a/src/real/real_trader_manager.py +++ b/src/real/real_trader_manager.py @@ -1,12 +1,10 @@ import time import threading import schedule +import weakref from xtquant import xtconstant from logger_config import get_logger from config import Config -import json -from typing import Dict -from position_manager import PositionManager from functools import wraps from trade_constants import ( ORDER_STATUS_COMPLETED, @@ -44,8 +42,8 @@ class RealTraderManager: trader: XtTrader实例 position_manager: StrategyPositionManager实例 """ - # 使用传入的trader和position_manager实例 - self.trader = trader + # 使用传入的trader实例,使用弱引用避免循环引用 + self.trader = weakref.proxy(trader) # 启动调度器 self._start_scheduler() @@ -149,10 +147,11 @@ class RealTraderManager: f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}" ) - # 立即更新一次订单状态 - threading.Thread( - target=self.check_and_retry, - args=(order_id, strategy_name, code, direction, amount, 1), + # 设置60秒后检查订单状态 + threading.Timer( + 60, + self.check_and_retry, + args=(order_id, strategy_name), name=f"CheckOrder-{order_id}" ).start() @@ -170,7 +169,6 @@ class RealTraderManager: code: 股票代码 direction: 交易方向 left_amount: 剩余数量 - available_retry_count: 重试次数 Returns: bool: 补单是否成功 @@ -183,142 +181,71 @@ class RealTraderManager: new_order = self.place_order(strategy_name, code, direction, left_amount, 0, ORDER_TYPE_MARKET) new_order_id = new_order.get("order_id") if new_order.get("success") and new_order_id: - # 立即检查新市价单 - threading.Thread( - target=self.check_and_retry, - args=(new_order_id, strategy_name, code, direction, left_amount), - name=f"CheckMarketOrder-{new_order_id}" - ).start() + # 订单已在place_order中设置了60秒后检查 return True else: logger.error(f"补单失败: {new_order}") return False - def check_and_retry(self, order_id, strategy_name, code, direction, amount, available_retry_count=1): + def check_and_retry(self, order_id, strategy_name): + """检查订单状态并处理未完成订单 + + Args: + order_id: 订单ID + strategy_name: 策略名称 + """ position_manager = self.trader.get_position_manager(strategy_name) order_info = position_manager.get_pending_order(order_id) if not order_info: - logger.warning(f"订单信息不存在: ID={order_id}") + logger.warning(f"订单信息不存在, 可能全部完成或者撤单: ID={order_id}") return - filled = order_info.filled - target_amount = order_info.amount - order_type = order_info.order_type - status = self._update_order_status(order_id, strategy_name) + + # 使用trader的handle_order_update方法更新订单状态 + status = self.trader.handle_order_update(order_id, strategy_name) + + # 如果状态为None,说明处理失败,则直接返回 + if status is None: + logger.warning(f"订单状态更新失败: ID={order_id}") + return + + # 重新获取订单信息,因为可能已经被更新 + order_info = position_manager.get_pending_order(order_id) + if not order_info: + logger.info(f"订单已完成: ID={order_id}") + return if order_type == ORDER_TYPE_MARKET: - # 市价单,只递归检查 + # 市价单,如果未完成则继续检查 if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]: - logger.info(f"市价单未完成,1分钟后继续检查: ID={order_id}, 状态={status}") - threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name, code, direction, amount)).start() + logger.info(f"市价单未完成,60秒后继续检查: ID={order_id}, 状态={status}") + threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name)).start() else: logger.info(f"市价单已完成: ID={order_id}, 状态={status}") elif order_type == ORDER_TYPE_LIMIT: - # 限价单,未完成则撤单补市价单 - if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL] or status is None: - if available_retry_count > 0: - logger.info(f"限价单未完成,1分钟后继续检查: ID={order_id}, 状态={status}") - threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name, code, direction, amount, 0)).start() - else: - # 尝试撤单 - try: - logger.info(f"限价单未完成,尝试撤单: ID={order_id}, 状态={status}") - self.trader.cancel(order_id) - position_manager.update_order_status(order_id, 0, ORDER_STATUS_CANCELLED) - except Exception as e: - logger.error(f"撤单失败: order_id={order_id}, error={str(e)}") - - # 计算剩余数量, 如果剩余数量大于0, 则补单 - left_amount = target_amount - filled - self._place_market_order_for_remainder(strategy_name, code, direction, left_amount) + filled = order_info.filled + target_amount = order_info.amount + left_amount = target_amount - filled + + # 限价单,这是60秒后的检查,如果未完成则撤单补市价单 + if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]: + # 尝试撤单 + try: + logger.info(f"限价单60秒后仍未完成,尝试撤单: ID={order_id}, 状态={status}") + self.trader.cancel(order_id) + position_manager.update_order_status(order_id, filled, ORDER_STATUS_CANCELLED) + except Exception as e: + logger.error(f"撤单失败: order_id={order_id}, error={str(e)}") + + # 计算剩余数量, 如果剩余数量大于0, 则补单 + self._place_market_order_for_remainder(strategy_name, order_info.code, order_info.direction, left_amount) else: logger.info(f"限价单已完成: ID={order_id}, 状态={status}") else: logger.warning(f"未知订单类型: ID={order_id}, type={order_type}") - def _update_order_status(self, order_id, strategy_name): - """更新单个订单状态 - - Args: - order_id: 订单ID - """ - # 检查订单是否存在 - position_manager = self.trader.get_position_manager(strategy_name) - order_info = position_manager.get_pending_order(order_id) - if not order_info: - return None - - try: - # 获取订单之前的状态,用于判断是否发生变化 - previous_status = order_info.status - previous_volume = order_info.filled - - time.sleep(3) - updated_order = self.trader.get_order(order_id) - if not updated_order: - logger.error(f"获取订单失败, 订单可能正在报单: {order_id}") - return None - - # 根据委托状态更新订单状态 - if updated_order["order_status"] == xtconstant.ORDER_SUCCEEDED: - # 全部成交 - filled = updated_order["traded_volume"] - position_manager.update_order_status(order_id, filled, ORDER_STATUS_COMPLETED) - # 更新持仓 - position_manager.update_position( - order_info.code, - order_info.direction, - filled, - ) - return ORDER_STATUS_COMPLETED - elif updated_order["order_status"] == xtconstant.ORDER_PART_SUCC: - # 部分成交 - filled = updated_order.get("traded_volume", 0) - position_manager.update_order_status( - order_id, filled, ORDER_STATUS_PARTIAL - ) - - # 如果成交量有变化,记录日志并更新持仓 - if filled != previous_volume: - target_amount = order_info.amount - logger.info( - f"订单部分成交更新: ID={order_id}, 代码={order_info.code}, 目标数量={target_amount}, 已成交数量={filled}, 剩余数量={target_amount - filled}" - ) - - # 更新持仓(仅更新已成交部分) - if filled > 0: - position_manager.update_position( - order_info.code, - order_info.direction, - filled, - ) - return ORDER_STATUS_PARTIAL - elif updated_order["order_status"] in [ - xtconstant.ORDER_CANCELED, - xtconstant.ORDER_JUNK, - ]: - # 已撤单或废单 - position_manager.update_order_status( - order_id, - 0, - ORDER_STATUS_CANCELLED - ) - return ORDER_STATUS_CANCELLED - - elif updated_order["order_status"] in [ - xtconstant.ORDER_UNREPORTED, - xtconstant.ORDER_WAIT_REPORTING, - xtconstant.ORDER_REPORTED, - ]: - # 未报、待报、已报 - if previous_status != ORDER_STATUS_PENDING: - position_manager.update_order_status(order_id, 0, ORDER_STATUS_PENDING) - return ORDER_STATUS_PENDING - - except Exception as e: - logger.error(f"更新订单状态时发生异常: order_id={order_id}, error={str(e)}") - return None + def _check_order_feasibility(self, code, direction, amount, price): """检查订单是否可行(资金或持仓是否足够) diff --git a/src/real/xt_trader.py b/src/real/xt_trader.py index ff8911a..a686a0d 100644 --- a/src/real/xt_trader.py +++ b/src/real/xt_trader.py @@ -10,6 +10,13 @@ from xtquant import xtconstant from xtquant.xtdata import get_instrument_detail from logger_config import get_logger from utils.mail_util import MailUtil +from trade_constants import ( + ORDER_STATUS_COMPLETED, + ORDER_STATUS_CANCELLED, + ORDER_STATUS_PENDING, + ORDER_STATUS_FAILED, + ORDER_STATUS_PARTIAL, +) # 获取日志记录器 logger = get_logger('real_trader') @@ -46,6 +53,27 @@ class MyXtQuantTraderCallback: logger.info(f"委托回报: {order.stock_code} {order.order_status} {order.order_sysid}") def on_stock_trade(self, trade): logger.info(f"成交变动: {trade.account_id} {trade.stock_code} {trade.order_id}") + + # 当有成交回报时,立即更新订单状态 + try: + if self.trader_instance: + order_id = trade.order_id + # 查找该订单所属的策略 + strategy_name = None + + # 避免循环引用,不直接调用real_trader_manager的方法 + # 而是通过trader的方法间接调用 + for strategy, position_manager in self.trader_instance.get_all_position_managers().items(): + if position_manager.get_pending_order(order_id): + strategy_name = strategy + break + + if strategy_name: + # 使用trader的方法来处理订单状态更新,避免直接调用real_trader_manager + self.trader_instance.handle_order_update(order_id, strategy_name) + logger.info(f"成交回报触发订单状态更新: ID={order_id}, 策略={strategy_name}") + except Exception as e: + logger.error(f"成交回报处理异常: {str(e)}", exc_info=True) def on_stock_position(self, position): logger.info(f"持仓变动: {position.stock_code} {position.volume}") def on_order_error(self, order_error): @@ -490,4 +518,120 @@ class XtTrader(BaseTrader): self.last_reconnect_time = None else: logger.warning("定期重连失败") - self.last_reconnect_time = time.time() \ No newline at end of file + self.last_reconnect_time = time.time() + + def handle_order_update(self, order_id, strategy_name): + """处理订单状态更新,作为中间层避免循环引用 + + Args: + order_id: 订单ID + strategy_name: 策略名称 + + Returns: + str: 订单状态,如ORDER_STATUS_COMPLETED、ORDER_STATUS_PARTIAL等,如果处理失败则返回None + """ + try: + # 获取订单信息 + order = self.get_order(order_id) + if not order: + logger.warning(f"获取订单失败,无法更新状态: {order_id}") + return None + + # 获取position_manager + position_manager = self.get_position_manager(strategy_name) + if not position_manager: + logger.warning(f"获取position_manager失败,无法更新状态: {strategy_name}") + return None + + # 获取订单信息 + order_info = position_manager.get_pending_order(order_id) + if not order_info: + logger.warning(f"订单信息不存在,可能已完成: {order_id}") + return None + + # 获取之前的状态和成交量 + previous_status = order_info.status + previous_volume = order_info.filled + + # 根据委托状态更新订单状态 + if order["order_status"] == xtconstant.ORDER_SUCCEEDED: + # 全部成交 + filled = order["traded_volume"] + + # 如果订单已经标记为完成,则不重复更新 + if previous_status == ORDER_STATUS_COMPLETED: + return ORDER_STATUS_COMPLETED + + position_manager.update_order_status(order_id, filled, ORDER_STATUS_COMPLETED) + + # 更新持仓(只更新新增的成交部分) + if filled > previous_volume: + new_filled = filled - previous_volume + position_manager.update_position( + order_info.code, + order_info.direction, + new_filled, + ) + logger.info(f"订单全部成交: ID={order_id}, 代码={order_info.code}, 总成交量={filled}, 新增成交量={new_filled}") + + return ORDER_STATUS_COMPLETED + + elif order["order_status"] == xtconstant.ORDER_PART_SUCC: + # 部分成交 + filled = order.get("traded_volume", 0) + position_manager.update_order_status( + order_id, filled, ORDER_STATUS_PARTIAL + ) + + # 如果成交量有变化,记录日志并更新持仓 + if filled != previous_volume: + target_amount = order_info.amount + new_filled = filled - previous_volume + logger.info( + f"订单部分成交更新: ID={order_id}, 代码={order_info.code}, 目标数量={target_amount}, " + f"已成交数量={filled}, 新增成交量={new_filled}, 剩余数量={target_amount - filled}" + ) + + # 更新持仓(仅更新新增成交部分) + if new_filled > 0: + position_manager.update_position( + order_info.code, + order_info.direction, + new_filled, + ) + + return ORDER_STATUS_PARTIAL + + elif order["order_status"] in [ + xtconstant.ORDER_CANCELED, + xtconstant.ORDER_JUNK, + ]: + # 已撤单或废单 + # 如果已经标记为取消,则不重复更新 + if previous_status == ORDER_STATUS_CANCELLED: + return ORDER_STATUS_CANCELLED + + position_manager.update_order_status( + order_id, + previous_volume, # 保留已成交部分 + ORDER_STATUS_CANCELLED + ) + + return ORDER_STATUS_CANCELLED + + elif order["order_status"] in [ + xtconstant.ORDER_UNREPORTED, + xtconstant.ORDER_WAIT_REPORTING, + xtconstant.ORDER_REPORTED, + ]: + # 未报、待报、已报 + if previous_status != ORDER_STATUS_PENDING: + position_manager.update_order_status(order_id, previous_volume, ORDER_STATUS_PENDING) + + return ORDER_STATUS_PENDING + + return None + + except Exception as e: + logger.error(f"处理订单状态更新异常: order_id={order_id}, error={str(e)}", exc_info=True) + return None \ No newline at end of file