diff --git a/src/base_trader.py b/src/base_trader.py index a82f37b..be506c7 100644 --- a/src/base_trader.py +++ b/src/base_trader.py @@ -4,6 +4,8 @@ from abc import ABC, abstractmethod from logger_config import get_logger from position_manager import PositionManager from typing import Dict +from config import Config +import os # 获取日志记录器 @@ -14,6 +16,7 @@ class BaseTrader(ABC): """初始化交易基类""" self.position_managers: Dict[str, PositionManager] = {} self.logger = logger + self._load_all_position_managers_from_data_dir() # 新增:自动加载所有持仓管理器 pass @abstractmethod @@ -230,3 +233,18 @@ class BaseTrader(ABC): self.position_managers[strategy_name].clear() return True return False + + def _load_all_position_managers_from_data_dir(self): + """从Config.DATA_DIR目录下的持仓文件自动加载所有PositionManager""" + data_dir = Config.DATA_DIR + if not os.path.exists(data_dir): + self.logger.info(f"持仓数据目录不存在: {data_dir}") + return + for fname in os.listdir(data_dir): + if fname.endswith('_positions.json'): + strategy_name = fname[:-len('_positions.json')] + try: + self.position_managers[strategy_name] = PositionManager(strategy_name) + self.logger.info(f"已自动加载策略持仓: {strategy_name}") + except Exception as e: + self.logger.error(f"加载策略持仓失败: {strategy_name}, 错误: {e}") diff --git a/src/real/real_trader_manager.py b/src/real/real_trader_manager.py index 938c3b5..6e65972 100644 --- a/src/real/real_trader_manager.py +++ b/src/real/real_trader_manager.py @@ -1,7 +1,6 @@ import time import threading import schedule -import weakref from xtquant import xtconstant from logger_config import get_logger from config import Config @@ -43,7 +42,7 @@ class RealTraderManager: position_manager: StrategyPositionManager实例 """ # 使用传入的trader实例,使用弱引用避免循环引用 - self.trader = weakref.proxy(trader) + self.trader = trader # 启动调度器 self._start_scheduler() @@ -159,12 +158,7 @@ class RealTraderManager: f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}" ) - # 设置60秒后检查订单状态 - threading.Timer( - 60, - self.check_and_retry, - args=(order_id, strategy_name) - ).start() + threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name)).start() return {"success": True, "order_id": order_id} @@ -205,56 +199,62 @@ class RealTraderManager: order_id: 订单ID strategy_name: 策略名称 """ - position_manager = self.trader.get_position_manager(strategy_name) - order_info = position_manager.get_pending_order(order_id) - if not order_info: - logger.warning(f"订单信息不存在, 可能全部完成或者撤单: ID={order_id}") - return - - order_type = order_info.order_type - - # 使用trader的handle_order_update方法更新订单状态 - status = self.trader.handle_order_update(order_id, strategy_name) - - # 如果状态为None,说明处理失败,则直接返回 - if status is None: - logger.warning(f"订单状态更新失败: ID={order_id}") - return - - # 重新获取订单信息,因为可能已经被更新 - order_info = position_manager.get_pending_order(order_id) - if not order_info: - logger.info(f"订单已完成: ID={order_id}") - return - - if order_type == ORDER_TYPE_MARKET: - # 市价单,如果未完成则继续检查 - if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]: - logger.info(f"市价单未完成,60秒后继续检查: ID={order_id}, 状态={status}") - threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name)).start() - else: - logger.info(f"市价单已完成: ID={order_id}, 状态={status}") - elif order_type == ORDER_TYPE_LIMIT: - filled = order_info.filled - target_amount = order_info.amount - left_amount = target_amount - filled + try: + logger.info(f"开始检查订单状态: ID={order_id}, 策略={strategy_name}") + position_manager = self.trader.get_position_manager(strategy_name) + order_info = position_manager.get_pending_order(order_id) + if not order_info: + logger.warning(f"订单信息不存在, 可能全部完成或者撤单: ID={order_id}") + return + + order_type = order_info.order_type + + # 使用trader的handle_order_update方法更新订单状态 + status = self.trader.handle_order_update(order_id, strategy_name) + + # 如果状态为None,说明处理失败,则直接返回 + if status is None: + logger.warning(f"订单状态更新失败: ID={order_id}") + return + + # 重新获取订单信息,因为可能已经被更新 + order_info = position_manager.get_pending_order(order_id) + if not order_info: + logger.info(f"订单已完成: ID={order_id}") + return + + if order_type == ORDER_TYPE_MARKET: + # 市价单,如果未完成则继续检查 + if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]: + logger.info(f"市价单未完成,60秒后继续检查: ID={order_id}, 状态={status}") + schedule.every(60).seconds.do( + run_threaded(self.check_and_retry), order_id, strategy_name + ).tag(f"order_{order_id}") + else: + logger.info(f"市价单已完成: ID={order_id}, 状态={status}") + elif order_type == ORDER_TYPE_LIMIT: + filled = order_info.filled + target_amount = order_info.amount + left_amount = target_amount - filled - # 限价单,这是60秒后的检查,如果未完成则撤单补市价单 - if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]: - # 尝试撤单 - try: - logger.info(f"限价单60秒后仍未完成,尝试撤单: ID={order_id}, 状态={status}") - self.trader.cancel(order_id) - position_manager.update_order_status(order_id, filled, ORDER_STATUS_CANCELLED) - except Exception as e: - logger.error(f"撤单失败: order_id={order_id}, error={str(e)}") - - # 计算剩余数量, 如果剩余数量大于0, 则补单 - self._place_market_order_for_remainder(strategy_name, order_info.code, order_info.direction, left_amount) + # 限价单,这是60秒后的检查,如果未完成则撤单补市价单 + if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]: + # 尝试撤单 + try: + logger.info(f"限价单60秒后仍未完成,尝试撤单: ID={order_id}, 状态={status}") + self.trader.cancel(order_id) + position_manager.update_order_status(order_id, filled, ORDER_STATUS_CANCELLED) + except Exception as e: + logger.error(f"撤单失败: order_id={order_id}, error={str(e)}") + + # 计算剩余数量, 如果剩余数量大于0, 则补单 + self._place_market_order_for_remainder(strategy_name, order_info.code, order_info.direction, left_amount) + else: + logger.info(f"限价单已完成: ID={order_id}, 状态={status}") else: - logger.info(f"限价单已完成: ID={order_id}, 状态={status}") - else: - logger.warning(f"未知订单类型: ID={order_id}, type={order_type}") + logger.warning(f"未知订单类型: ID={order_id}, type={order_type}") + except Exception as e: + logger.error(f"检查订单状态时发生异常: ID={order_id}, error={str(e)}", exc_info=True) diff --git a/src/real/xt_trader.py b/src/real/xt_trader.py index a686a0d..7c51b76 100644 --- a/src/real/xt_trader.py +++ b/src/real/xt_trader.py @@ -72,6 +72,8 @@ class MyXtQuantTraderCallback: # 使用trader的方法来处理订单状态更新,避免直接调用real_trader_manager self.trader_instance.handle_order_update(order_id, strategy_name) logger.info(f"成交回报触发订单状态更新: ID={order_id}, 策略={strategy_name}") + else: + logger.warning(f"成交回报触发订单状态更新失败: 根据order id未找到策略名 ID={order_id}, 策略={strategy_name}") except Exception as e: logger.error(f"成交回报处理异常: {str(e)}", exc_info=True) def on_stock_position(self, position):