From 9e13f3c9564ad9f57eb4146e4bbdc505781e5ed4 Mon Sep 17 00:00:00 2001 From: biggerfish Date: Wed, 14 May 2025 23:07:03 +0800 Subject: [PATCH] refactor: use order callback to update order status (#1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: logger is none * 由订单回调触发订单状态更新和补单 * 使position manager 线程安全 * remove empty line --- src/config.py | 3 +- src/local_order.py | 4 +- src/position_manager.py | 390 +++++++++++++++------------- src/real/real_trader_manager.py | 121 +++------ src/real/xt_trader.py | 122 ++++++++- src/simulation/simulation_trader.py | 5 +- src/trade_constants.py | 1 - 7 files changed, 351 insertions(+), 295 deletions(-) diff --git a/src/config.py b/src/config.py index 55e261c..5c070c0 100644 --- a/src/config.py +++ b/src/config.py @@ -53,8 +53,7 @@ class Config: MAIL_TO = ["jq@yushaoyou.com"] # 可以是多个邮箱 # RealTraderManager配置 - RTM_ORDER_TIMEOUT = 60 # 订单超时时间(秒) - RTM_MAX_RETRIES = 3 # 最大重试次数 + RTM_ORDER_TIMEOUT = 30 # 订单超时时间(秒) RTM_USE_MARKET_ORDER = True # 是否使用市价单进行补单 # 计划任务运行时间 diff --git a/src/local_order.py b/src/local_order.py index 4f2f804..7a36702 100644 --- a/src/local_order.py +++ b/src/local_order.py @@ -1,8 +1,8 @@ -from trade_constants import ORDER_STATUS_PENDING +from trade_constants import ORDER_STATUS_PENDING, ORDER_TYPE_LIMIT from datetime import datetime class LocalOrder: - def __init__(self, order_id, code, price, amount, direction, order_type='limit', filled=0, status=ORDER_STATUS_PENDING, created_time=datetime.now()): + def __init__(self, order_id, code, price, amount, direction, order_type=ORDER_TYPE_LIMIT, filled=0, status=ORDER_STATUS_PENDING, created_time=datetime.now()): self.order_id: str = order_id self.code: str = code self.price: float = price diff --git a/src/position_manager.py b/src/position_manager.py index fa038b5..1cbeec8 100644 --- a/src/position_manager.py +++ b/src/position_manager.py @@ -1,5 +1,6 @@ import os import json +import threading from logger_config import get_logger from config import Config from trade_constants import ( @@ -8,7 +9,6 @@ from trade_constants import ( ORDER_TYPE_MARKET, ORDER_STATUS_COMPLETED, ORDER_STATUS_CANCELLED, - ORDER_STATUS_FAILED, ) from local_position import LocalPosition from local_order import LocalOrder @@ -26,6 +26,8 @@ class PositionManager: """初始化实盘持仓管理器""" super().__init__() self.strategy_name = strategy_name + # 创建线程锁 + self._lock = threading.Lock() # 策略持仓信息 self.positions: Dict[str, LocalPosition] = {} # {股票代码 -> LocalPosition} # 待处理订单信息 @@ -44,33 +46,34 @@ class PositionManager: self.load_data() def update_position(self, code, direction, amount): - # 如果股票代码在持仓字典中不存在,初始化它 - if code not in self.positions: - self.positions[code] = LocalPosition(code, 0, 0) + with self._lock: + # 如果股票代码在持仓字典中不存在,初始化它 + if code not in self.positions: + self.positions[code] = LocalPosition(code, 0, 0) - # 根据方向更新持仓 - position = self.positions[code] - is_t0_stock = is_t0(code) - if direction == ORDER_DIRECTION_BUY: - position.total_amount += amount - if is_t0_stock: - position.closeable_amount += amount - else: # sell - position.total_amount -= amount - position.closeable_amount -= amount + # 根据方向更新持仓 + position = self.positions[code] + is_t0_stock = is_t0(code) + if direction == ORDER_DIRECTION_BUY: + position.total_amount += amount + if is_t0_stock: + position.closeable_amount += amount + else: # sell + position.total_amount -= amount + position.closeable_amount -= amount - logger.info( - f"更新策略持仓 - 策略: {self.strategy_name}, 代码: {code}, 方向: {direction}, 数量: {amount}, " - f"更新后总量: {position.total_amount}, " - f"可用: {position.closeable_amount}" - ) + logger.info( + f"更新策略持仓 - 策略: {self.strategy_name}, 代码: {code}, 方向: {direction}, 数量: {amount}, " + f"更新后总量: {position.total_amount}, " + f"可用: {position.closeable_amount}" + ) - # 移除total_amount为0的持仓 - if code in self.positions and self.positions[code].total_amount <= 0: - del self.positions[code] - logger.info(f"移除空持仓 - 策略: {self.strategy_name}, 代码: {code}") - - self.save_data() + # 移除total_amount为0的持仓 + if code in self.positions and self.positions[code].total_amount <= 0: + del self.positions[code] + logger.info(f"移除空持仓 - 策略: {self.strategy_name}, 代码: {code}") + + self.save_data() def add_pending_order( self, order_id, code, price, amount, direction, order_type=ORDER_TYPE_LIMIT @@ -78,48 +81,49 @@ class PositionManager: if not self.strategy_name: return - order = LocalOrder(order_id, code, price, amount, direction, order_type) - self.pending_orders[order_id] = order - if (order_type == ORDER_TYPE_LIMIT): - self.all_orders.append(order) + with self._lock: + order = LocalOrder(order_id, code, price, amount, direction, order_type) + self.pending_orders[order_id] = order + if (order_type == ORDER_TYPE_LIMIT): + self.all_orders.append(order) - logger.info( - f"添加订单 - ID: {order_id}, 策略: {self.strategy_name}, 代码: {code}, 方向: {direction}, " - f"数量: {amount}, 价格: {price}, 类型: {order_type}" - ) + logger.info( + f"添加订单 - ID: {order_id}, 策略: {self.strategy_name}, 代码: {code}, 方向: {direction}, " + f"数量: {amount}, 价格: {price}, 类型: {order_type}" + ) - self.save_data() + self.save_data() def update_order_status(self, order_id, filled, new_status): - if order_id in self.pending_orders: - _order = self.pending_orders[order_id] - # 记录之前的状态用于日志 - previous_status = _order.status + with self._lock: + if order_id in self.pending_orders: + _order = self.pending_orders[order_id] + # 记录之前的状态用于日志 + previous_status = _order.status - # 更新状态 - _order.status = new_status - _order.filled = filled + # 更新状态 + _order.status = new_status + _order.filled = filled - # 记录状态变化日志 - if previous_status != new_status: - code = self.pending_orders[order_id].code - logger.info( - f"订单状态变化: ID={order_id}, 代码={code}, 旧状态={previous_status}, 新状态={new_status}" - ) + # 记录状态变化日志 + if previous_status != new_status: + code = self.pending_orders[order_id].code + logger.info( + f"订单状态变化: ID={order_id}, 代码={code}, 旧状态={previous_status}, 新状态={new_status}" + ) - # 如果订单已完成,移除它 - if new_status in [ - ORDER_STATUS_COMPLETED, - ORDER_STATUS_CANCELLED, - ORDER_STATUS_FAILED, - ]: - # 保留订单信息以供参考,但标记为已完成 - del self.pending_orders[order_id] - logger.info(f"订单已删除 - ID: {order_id}, 状态: {new_status}") - self.save_data() - return True - - return False + # 如果订单已完成,移除它 + if new_status in [ + ORDER_STATUS_COMPLETED, + ORDER_STATUS_CANCELLED, + ]: + # 保留订单信息以供参考,但标记为已完成 + del self.pending_orders[order_id] + logger.info(f"订单已删除 - ID: {order_id}, 状态: {new_status}") + self.save_data() + return True + + return False def get_pending_order(self, order_id) -> LocalOrder: """获取未完成委托信息 @@ -130,7 +134,8 @@ class PositionManager: Returns: dict: 委托信息,如果不存在返回None """ - return self.pending_orders.get(order_id) + with self._lock: + return self.pending_orders.get(order_id) def get_pending_orders(self): """获取所有未完成委托 @@ -138,7 +143,9 @@ class PositionManager: Returns: dict: 订单ID到委托信息的映射 """ - return self.pending_orders + with self._lock: + # 返回副本以避免外部修改 + return self.pending_orders.copy() def get_positions(self) -> Dict[str, LocalPosition]: """获取策略持仓 @@ -147,158 +154,167 @@ class PositionManager: Dict[str, LocalPosition]: key为股票代码(str),value为LocalPosition对象,若无持仓则返回空字典。 """ - return self.positions + with self._lock: + # 返回副本以避免外部修改 + return self.positions.copy() def save_data(self): """保存策略数据""" - try: - # 将对象转换为可序列化的字典 - positions_dict = {} - for code, pos in self.positions.items(): - positions_dict[code] = { - "code": pos.code, - "total_amount": pos.total_amount, - "closeable_amount": pos.closeable_amount, - } + with self._lock: + try: + # 将对象转换为可序列化的字典 + positions_dict = {} + for code, pos in self.positions.items(): + positions_dict[code] = { + "code": pos.code, + "total_amount": pos.total_amount, + "closeable_amount": pos.closeable_amount, + } - pending_orders_dict = {} - for order_id, order in self.pending_orders.items(): - pending_orders_dict[order_id] = { - "order_id": order.order_id, - "code": order.code, - "price": order.price, - "amount": order.amount, - "filled": order.filled, - "direction": order.direction, - "order_type": order.order_type, - "status": order.status, - "created_time": ( - order.created_time.isoformat() - if hasattr(order, "created_time") - else None - ), - } + pending_orders_dict = {} + for order_id, order in self.pending_orders.items(): + pending_orders_dict[order_id] = { + "order_id": order.order_id, + "code": order.code, + "price": order.price, + "amount": order.amount, + "filled": order.filled, + "direction": order.direction, + "order_type": order.order_type, + "status": order.status, + "created_time": ( + order.created_time.isoformat() + if hasattr(order, "created_time") + else None + ), + } - all_orders_array = [] - for order in self.all_orders: - all_orders_array.append({ - "order_id": order.order_id, - "code": order.code, - "price": order.price, - "amount": order.amount, - "direction": order.direction, - "order_type": order.order_type, - "created_time": ( - order.created_time.isoformat() - if hasattr(order, "created_time") - else None - ), - }) + all_orders_array = [] + for order in self.all_orders: + all_orders_array.append({ + "order_id": order.order_id, + "code": order.code, + "price": order.price, + "amount": order.amount, + "direction": order.direction, + "order_type": order.order_type, + "created_time": ( + order.created_time.isoformat() + if hasattr(order, "created_time") + else None + ), + }) - with open(self.data_path, "w") as f: - json.dump( - { - "positions": positions_dict, - "pending_orders": pending_orders_dict, - "all_orders": all_orders_array, - }, - f, - ) - logger.info("成功保存实盘策略数据") - except Exception as e: - logger.error(f"保存实盘策略数据失败: {str(e)}") + with open(self.data_path, "w") as f: + json.dump( + { + "positions": positions_dict, + "pending_orders": pending_orders_dict, + "all_orders": all_orders_array, + }, + f, + ) + logger.info("成功保存实盘策略数据") + except Exception as e: + logger.error(f"保存实盘策略数据失败: {str(e)}") def load_data(self): """加载策略数据""" - try: - if os.path.exists(self.data_path): - from datetime import datetime + with self._lock: + try: + if os.path.exists(self.data_path): + from datetime import datetime - with open(self.data_path, "r") as f: - data = json.load(f) + with open(self.data_path, "r") as f: + data = json.load(f) - # 还原positions对象 - self.positions = {} - positions_dict = data.get("positions", {}) - for code, pos_data in positions_dict.items(): - self.positions[code] = LocalPosition( - pos_data["code"], - int(pos_data["total_amount"]), - int(pos_data["closeable_amount"]), - ) + # 还原positions对象 + self.positions = {} + positions_dict = data.get("positions", {}) + for code, pos_data in positions_dict.items(): + self.positions[code] = LocalPosition( + pos_data["code"], + int(pos_data["total_amount"]), + int(pos_data["closeable_amount"]), + ) - # 还原pending_orders对象 - self.pending_orders = {} - pending_orders_dict = data.get("pending_orders", {}) - for order_id, order_data in pending_orders_dict.items(): - order = LocalOrder( - order_data["order_id"], - order_data["code"], - float(order_data["price"]), - int(order_data["amount"]), - order_data["direction"], - order_data["order_type"], - int(order_data["filled"]), - order_data["status"], - ) - if order_data.get("created_time"): - try: - order.created_time = datetime.fromisoformat( + # 还原pending_orders对象 + self.pending_orders = {} + pending_orders_dict = data.get("pending_orders", {}) + for order_id, order_data in pending_orders_dict.items(): + order = LocalOrder( + order_data["order_id"], + order_data["code"], + float(order_data["price"]), + int(order_data["amount"]), + order_data["direction"], + order_data["order_type"], + int(order_data["filled"]), + order_data["status"], + ) + if order_data.get("created_time"): + try: + order.created_time = datetime.fromisoformat( + order_data["created_time"] + ) + except (ValueError, TypeError): + order.created_time = datetime.now() + + self.pending_orders[order_id] = order + + # 还原all_orders对象 + self.all_orders = [] + all_orders_array = data.get("all_orders", []) + for order_data in all_orders_array: + order = LocalOrder( + order_data["order_id"], + order_data["code"], + float(order_data["price"]), + int(order_data["amount"]), + order_data["direction"], + order_data["order_type"], + created_time=datetime.fromisoformat( order_data["created_time"] - ) - except (ValueError, TypeError): - order.created_time = datetime.now() + ) if order_data.get("created_time") else datetime.now() + ) + self.all_orders.append(order) - self.pending_orders[order_id] = order - - # 还原all_orders对象 + logger.info("已加载实盘策略数据") + logger.info(f"本策略: {self.strategy_name} 持仓股票个数: {len(self.positions)} 未完成委托数: {len(self.pending_orders)} 历史订单数: {len(self.all_orders)}") + else: + logger.info(f"实盘策略数据文件不存在: {self.data_path}") + self.positions = {} + self.pending_orders = {} self.all_orders = [] - all_orders_array = data.get("all_orders", []) - for order_data in all_orders_array: - order = LocalOrder( - order_data["order_id"], - order_data["code"], - float(order_data["price"]), - int(order_data["amount"]), - order_data["direction"], - order_data["order_type"], - created_time=datetime.fromisoformat( - order_data["created_time"] - ) if order_data.get("created_time") else datetime.now() - ) - self.all_orders.append(order) - - logger.info("已加载实盘策略数据") - logger.info(f"本策略: {self.strategy_name} 持仓股票个数: {len(self.positions)} 未完成委托数: {len(self.pending_orders)} 历史订单数: {len(self.all_orders)}") - else: - logger.info(f"实盘策略数据文件不存在: {self.data_path}") + except Exception as e: + logger.error(f"加载实盘策略数据失败: {str(e)}") + # 初始化空数据结构 self.positions = {} self.pending_orders = {} self.all_orders = [] - except Exception as e: - logger.error(f"加载实盘策略数据失败: {str(e)}") - # 初始化空数据结构 - self.positions = {} - self.pending_orders = {} - self.all_orders = [] def clear(self): """清除所有持仓管理数据""" - self.positions = {} - self.pending_orders = {} - self.all_orders = [] - self.save_data() + with self._lock: + self.positions = {} + self.pending_orders = {} + self.all_orders = [] + self.save_data() def update_closeable_amount(self): """更新可卖持仓""" - for _, position in self.positions.items(): - if position.closeable_amount != position.total_amount: - position.closeable_amount = position.total_amount + with self._lock: + for _, position in self.positions.items(): + if position.closeable_amount != position.total_amount: + position.closeable_amount = position.total_amount def clear_pending_orders(self): """清除所有未完成订单""" - self.pending_orders = {} + with self._lock: + self.pending_orders = {} def get_all_orders(self): """获取所有订单""" - return self.all_orders + with self._lock: + # 返回副本以避免外部修改 + return self.all_orders.copy() diff --git a/src/real/real_trader_manager.py b/src/real/real_trader_manager.py index 4a3fb2e..476d2df 100644 --- a/src/real/real_trader_manager.py +++ b/src/real/real_trader_manager.py @@ -9,7 +9,6 @@ from trade_constants import ( ORDER_STATUS_COMPLETED, ORDER_STATUS_CANCELLED, ORDER_STATUS_PENDING, - ORDER_STATUS_FAILED, ORDER_STATUS_PARTIAL, ORDER_DIRECTION_BUY, ORDER_DIRECTION_SELL, @@ -76,6 +75,17 @@ class RealTraderManager: else: logger.error("STRATEGY_SAVE_TIME 未配置") + # 检查限价单是否超时 + if hasattr(Config, "RTM_ORDER_TIMEOUT"): + try: + schedule.every(10).seconds.do( + run_threaded(self.check_limit_orders) + ) + except Exception as e: + logger.error(f"限价单超时检查任务配置错误: {e}") + else: + logger.error("RTM_ORDER_TIMEOUT 未配置") + # 启动高精度调度线程 def run_scheduler(): while True: @@ -158,105 +168,32 @@ class RealTraderManager: f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}" ) - threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name)).start() - return {"success": True, "order_id": order_id} except Exception as e: logger.error(f"下单过程发生异常: {str(e)}") return {"success": False, "error": f"下单异常: {str(e)}"} - def _place_market_order_for_remainder(self, strategy_name, code, direction, left_amount): - """对未完成的订单进行补单,下市价单 - - Args: - strategy_name: 策略名称 - code: 股票代码 - direction: 交易方向 - left_amount: 剩余数量 - Returns: - bool: 补单是否成功 - """ - if left_amount <= 0: - logger.info(f"无需补单,剩余数量为零或负数: {left_amount}") - return True - - logger.info(f"限价单补单: 市价单, 剩余数量={left_amount}") - new_order = self.place_order(strategy_name, code, direction, left_amount, 0, ORDER_TYPE_MARKET) - new_order_id = new_order.get("order_id") - if new_order.get("success") and new_order_id: - # 订单已在place_order中设置了60秒后检查 - return True - else: - logger.error(f"补单失败: {new_order}") - return False - - def check_and_retry(self, order_id, strategy_name): - """检查订单状态并处理未完成订单 - - Args: - order_id: 订单ID - strategy_name: 策略名称 - """ + def check_limit_orders(self): + """检查限价单是否超时""" try: - logger.info(f"开始检查订单状态: ID={order_id}, 策略={strategy_name}") - position_manager = self.trader.get_position_manager(strategy_name) - order_info = position_manager.get_pending_order(order_id) - if not order_info: - logger.warning(f"订单信息不存在, 可能全部完成或者撤单: ID={order_id}") - return - - order_type = order_info.order_type - - # 使用trader的handle_order_update方法更新订单状态 - status = self.trader.handle_order_update(order_id, strategy_name) - - # 如果状态为None,说明处理失败,则直接返回 - if status is None: - logger.warning(f"订单状态更新失败: ID={order_id}") - return - - # 重新获取订单信息,因为可能已经被更新 - order_info = position_manager.get_pending_order(order_id) - if not order_info: - logger.info(f"订单已完成: ID={order_id}") - return - - if order_type == ORDER_TYPE_MARKET: - # 市价单,如果未完成则继续检查 - if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]: - logger.info(f"市价单未完成,60秒后继续检查: ID={order_id}, 状态={status}") - schedule.every(60).seconds.do( - run_threaded(self.check_and_retry), order_id, strategy_name - ).tag(f"order_{order_id}") - else: - logger.info(f"市价单已完成: ID={order_id}, 状态={status}") - elif order_type == ORDER_TYPE_LIMIT: - filled = order_info.filled - target_amount = order_info.amount - left_amount = target_amount - filled + logger.info("开始检查限价单是否超时...") + # 获取所有未完成订单 + position_managers = self.trader.get_all_position_managers() + for strategy_name, position_manager in position_managers.items(): + pending_orders = position_manager.get_pending_orders() + for order_id, order_info in pending_orders.items(): + # 如果订单类型为限价单,则检查是否超时 + if order_info.order_type == ORDER_TYPE_LIMIT: + duration = (time.time() - order_info.created_time).total_seconds() + if duration > Config.RTM_ORDER_TIMEOUT: + logger.info(f"限价单超时: ID={order_id}, 策略={strategy_name}, 持续时间={duration}秒") + self.trader.cancel(order_id) - # 限价单,这是60秒后的检查,如果未完成则撤单补市价单 - if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]: - # 尝试撤单 - try: - logger.info(f"限价单60秒后仍未完成,尝试撤单: ID={order_id}, 状态={status}") - self.trader.cancel(order_id) - time.sleep(1) - position_manager.update_order_status(order_id, filled, ORDER_STATUS_CANCELLED) - except Exception as e: - logger.error(f"撤单失败: order_id={order_id}, error={str(e)}") - - # 计算剩余数量, 如果剩余数量大于0, 则补单 - self._place_market_order_for_remainder(strategy_name, order_info.code, order_info.direction, left_amount) - else: - logger.info(f"限价单已完成: ID={order_id}, 状态={status}") - else: - logger.warning(f"未知订单类型: ID={order_id}, type={order_type}") + logger.info("限价单检查完毕") except Exception as e: - logger.error(f"检查订单状态时发生异常: ID={order_id}, error={str(e)}", exc_info=True) - + logger.error(f"检查限价单是否超时时发生异常: {str(e)}") def _check_order_feasibility(self, code, direction, amount, price): @@ -331,8 +268,8 @@ class RealTraderManager: # 遍历未完成订单,检查是否有无法成交的订单(如跌停无法卖出) for order_id, order_info in pending_orders.items(): try: - logger.warning( - f"清理无法成交订单: ID={order_id}, 代码={order_info.code}, 方向={order_info.direction}, " + logger.error( + f"清理无法成交订单(理论不应该有): ID={order_id}, 代码={order_info.code}, 方向={order_info.direction}, " f"数量={order_info.amount}, 已成交数量={order_info.filled}" ) diff --git a/src/real/xt_trader.py b/src/real/xt_trader.py index 5b8d4a3..c3fb4d1 100644 --- a/src/real/xt_trader.py +++ b/src/real/xt_trader.py @@ -14,16 +14,20 @@ from trade_constants import ( ORDER_STATUS_COMPLETED, ORDER_STATUS_CANCELLED, ORDER_STATUS_PENDING, - ORDER_STATUS_FAILED, ORDER_STATUS_PARTIAL, + ORDER_TYPE_LIMIT, + ORDER_TYPE_MARKET, + ORDER_DIRECTION_BUY, + ORDER_DIRECTION_SELL ) +from local_order import LocalOrder # 获取日志记录器 logger = get_logger('real_trader') class MyXtQuantTraderCallback: def __init__(self, trader_instance): - self.trader_instance = trader_instance + self.trader_instance: XtTrader = trader_instance def on_connected(self): logger.info("连接成功") def on_disconnected(self): @@ -52,7 +56,38 @@ class MyXtQuantTraderCallback: def on_stock_asset(self, asset): logger.info(f"资金变动: {asset.account_id} {asset.cash} {asset.total_asset}") def on_stock_order(self, order): - logger.info(f"委托回报: {order.stock_code} {order.order_status} {order.order_sysid}") + if order.order_status == xtconstant.ORDER_PART_SUCC: + strategy_name = self.trader_instance.get_strategy_name(order.order_id) + logger.info(f"委托部分成交: code={order.stock_code} id={order.order_id} strategy={strategy_name}") + self.trader_instance.handle_order_update(order.order_id, strategy_name) + + elif order.order_status == xtconstant.ORDER_SUCCEEDED: + strategy_name = self.trader_instance.get_strategy_name(order.order_id) + logger.info(f"委托全部成交: code={order.stock_code} id={order.order_id} strategy={strategy_name}") + self.trader_instance.handle_order_update(order.order_id, strategy_name) + + elif order.order_status in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_PART_CANCEL]: + strategy_name = self.trader_instance.get_strategy_name(order.order_id) + logger.info(f"委托撤单成功: code={order.stock_code} id={order.order_id} strategy={strategy_name}") + # 撤单后, 如果position manager中订单为pedding, 且为限价单, 则需要补单 + position_manager = self.trader_instance.get_position_manager(strategy_name) + if position_manager and strategy_name: + # 先获取已经撤单的order + order_info = position_manager.get_pending_order(order.order_id) + if order_info: + # 更新订单状态 + self.trader_instance.handle_order_update(order.order_id, strategy_name) + + # 如果order_info为限价单, 则进行市价单补单 + if order_info.order_type == ORDER_TYPE_LIMIT: + logger.info(f"检测到限价单被撤销,准备进行市价单补单: ID={order.order_id}") + self.trader_instance.place_market_order_for_remainder(order_info, strategy_name) + else: + logger.warning(f"撤单成功但未找到订单信息: ID={order.order_id}") + else: + logger.warning(f"撤单成功但未找到策略或持仓管理器: ID={order.order_id}, strategy={strategy_name}") + else: + logger.warning(f"委托回报变化: ID={order.order_id} 状态={order.order_status}") def on_stock_trade(self, trade): logger.info(f"成交变动: {trade.account_id} {trade.stock_code} {trade.order_id}") @@ -339,7 +374,7 @@ class XtTrader(BaseTrader): logger.error(f"获取股票名称失败: {stock_code}, {str(e)}") return "" - def buy(self, code, price, amount, order_type='limit'): + def buy(self, code, price, amount, order_type=ORDER_TYPE_LIMIT): if not self.is_available(): return {"error": self.connection_error_message or "交易系统连接失败"} @@ -361,12 +396,12 @@ class XtTrader(BaseTrader): ) return {"order_id": order_id} - def sell(self, code, price, amount, order_type='limit'): + def sell(self, code, price, amount, order_type=ORDER_TYPE_LIMIT): if not self.is_available(): return {"error": self.connection_error_message or "交易系统连接失败"} # 确定价格类型 - if order_type == 'limit': + if order_type == ORDER_TYPE_LIMIT: price_type = xtconstant.FIX_PRICE else: if code.endswith('.SH'): @@ -555,7 +590,7 @@ class XtTrader(BaseTrader): # 获取订单信息 order_info = position_manager.get_pending_order(order_id) if not order_info: - logger.warning(f"订单信息不存在,可能已完成: {order_id}") + logger.warning(f"订单信息不存在,可能已完成或撤单: {order_id}") return None # 获取之前的状态和成交量 @@ -643,4 +678,75 @@ class XtTrader(BaseTrader): except Exception as e: logger.error(f"处理订单状态更新异常: order_id={order_id}, error={str(e)}", exc_info=True) - return None \ No newline at end of file + return None + + def get_strategy_name(self, order_id): + """获取订单对应的策略名称 + + Args: + order_id: 订单ID + + Returns: + str: 策略名称 + """ + for strategy_name, position_manager in self.position_managers.items(): + if position_manager.get_pending_order(order_id): + return strategy_name + return None + + def place_market_order_for_remainder(self, order_info:LocalOrder, strategy_name): + """对未完成的限价单进行市价单补单 + + 当限价单被撤销后,使用此方法下市价单补单,确保交易意图得到执行 + + Args: + order_id: 被撤销的限价单ID + strategy_name: 策略名称 + + Returns: + dict: 包含新订单ID和状态信息,如果补单失败则返回错误信息 + """ + try: + logger.info(f"准备对撤销的限价单进行市价单补单: ID={order_info.order_id}, 策略={strategy_name}") + + # 获取position_manager + position_manager = self.get_position_manager(strategy_name) + if not position_manager: + logger.error(f"获取position_manager失败,无法补单: {strategy_name}") + return {"success": False, "error": "获取position_manager失败"} + + # 计算未成交数量 + filled = order_info.filled + target_amount = order_info.amount + left_amount = target_amount - filled + + # 如果已全部成交,则无需补单 + if left_amount <= 0: + logger.info(f"无需补单,订单已全部成交: ID={order_info.order_id}") + return {"success": True, "message": "无需补单,订单已全部成交"} + + # 下市价单补单 + code = order_info.code + direction = order_info.direction + + logger.info(f"开始补单: 代码={code}, 方向={direction}, 数量={left_amount}, 类型=market") + + if direction == ORDER_DIRECTION_BUY: + result = self.buy(code, 0, left_amount, ORDER_TYPE_MARKET) + else: + result = self.sell(code, 0, left_amount, ORDER_TYPE_MARKET) + + new_order_id = result.get("order_id") + if not new_order_id: + logger.error(f"市价单补单失败: {result}") + + # 添加未完成委托到position_manager + position_manager.add_pending_order(new_order_id, code, 0, left_amount, direction, ORDER_TYPE_MARKET) + + logger.info(f"市价单补单成功: 新订单ID={new_order_id}, 原订单ID={order_info.order_id}, 代码={code}, 方向={direction}, 数量={left_amount}") + + return {"success": True, "order_id": new_order_id} + + except Exception as e: + logger.error(f"市价单补单过程发生异常: {str(e)}", exc_info=True) + return {"success": False, "error": f"市价单补单异常: {str(e)}"} diff --git a/src/simulation/simulation_trader.py b/src/simulation/simulation_trader.py index 5e0cfce..acb7825 100644 --- a/src/simulation/simulation_trader.py +++ b/src/simulation/simulation_trader.py @@ -14,9 +14,8 @@ from local_position import LocalPosition class SimulationTrader(BaseTrader): - def __init__(self, logger=None): - super().__init__(logger) - self.logger = logger or get_logger("simulation_trader") + def __init__(self): + super().__init__(get_logger("simulation_trader")) # 模拟资金账户信息 self.sim_balance = {"account_id": "simulation", "cash": 1000000.00, "frozen": 0.00, "total": 1000000.00} diff --git a/src/trade_constants.py b/src/trade_constants.py index a4da054..0f16c9b 100644 --- a/src/trade_constants.py +++ b/src/trade_constants.py @@ -7,7 +7,6 @@ ORDER_STATUS_PENDING = 'pending' ORDER_STATUS_PARTIAL = 'partial' ORDER_STATUS_COMPLETED = 'completed' ORDER_STATUS_CANCELLED = 'cancelled' -ORDER_STATUS_FAILED = 'failed' # 订单类型 ORDER_TYPE_LIMIT = 'limit'