diff --git a/src/config.py b/src/config.py index 77c2fb4..c99dc22 100644 --- a/src/config.py +++ b/src/config.py @@ -36,5 +36,20 @@ class Config: RTM_USE_MARKET_ORDER = os.environ.get("RTM_USE_MARKET_ORDER", "True").lower() == "true" # 是否使用市价单进行补单 # 计划任务运行时间 - STRATEGY_SAVE_TIME = os.environ.get("STRATEGY_SAVE_TIME", "15:30") # 每天保存策略数据的时间 - CLEAN_ORDERS_TIME = os.environ.get("CLEAN_ORDERS_TIME", "00:01") # 每天清理超时委托的时间 + STRATEGY_SAVE_TIME = "15:10" # 每天保存策略数据的时间 + CLEAN_ORDERS_TIME = "15:05" # 每天清理超时委托的时间 + + @staticmethod + def is_market_open(): + """判断当前是否在交易时间内 + + Returns: + bool: 是否在交易时间内 + """ + now = datetime.datetime.now().time() + morning_start = datetime.time(9, 30) + morning_end = datetime.time(11, 30) + afternoon_start = datetime.time(13, 0) + afternoon_end = datetime.time(15, 0) + + return (morning_start <= now <= morning_end) or (afternoon_start <= now <= afternoon_end) diff --git a/src/real_trader_manager.py b/src/real_trader_manager.py index 9772279..8f45ca9 100644 --- a/src/real_trader_manager.py +++ b/src/real_trader_manager.py @@ -26,9 +26,8 @@ class RealTraderManager: if not self.trader.is_logged_in(): self.trader.login() - # 存储待处理的交易请求及其状态 - # 格式: {order_id: {strategy_name, code, direction, target_amount, price, status, create_time, last_check_time, retry_count}} - self.pending_orders = {} + # 不再自己维护pending_orders,改用StrategyPositionManager管理 + # self.pending_orders = {} # 启动调度器 self._start_scheduler() @@ -110,21 +109,7 @@ class RealTraderManager: logger.error(f"下单失败: {result}") return {"success": False, "error": "下单失败"} - # 记录到待处理订单 - self.pending_orders[order_id] = { - 'strategy_name': strategy_name, - 'code': code, - 'direction': direction, - 'target_amount': amount, - 'price': price, - 'status': 'pending', - 'create_time': time.time(), - 'last_check_time': time.time(), - 'retry_count': 0, - 'order_type': order_type - } - - # 添加到策略持仓管理器中的未完成委托 + # 使用StrategyPositionManager添加未完成委托 StrategyPositionManager.add_pending_order( self.trader, order_id, @@ -132,7 +117,8 @@ class RealTraderManager: code, price, amount, - direction + direction, + order_type ) logger.info(f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}") @@ -151,8 +137,11 @@ class RealTraderManager: try: logger.info("开始检查未完成订单...") + # 获取所有未完成订单 + pending_orders = StrategyPositionManager.get_pending_orders(self.trader) + # 如果没有未完成订单,直接返回 - if not self.pending_orders: + if not pending_orders: logger.info("没有未完成订单需要检查") return @@ -175,7 +164,7 @@ class RealTraderManager: return # 检查每个未完成订单 - for order_id, order_info in list(self.pending_orders.items()): + for order_id, order_info in list(pending_orders.items()): try: # 跳过已完成的订单 if order_info['status'] in ['completed', 'cancelled', 'failed']: @@ -184,9 +173,14 @@ class RealTraderManager: # 更新订单状态 self._update_order_status(order_id, entrust_map) + # 获取最新的订单信息 + order_info = StrategyPositionManager.get_pending_order(self.trader, order_id) + if not order_info: + continue + # 处理超时未成交或部分成交的订单 current_time = time.time() - order_age = current_time - order_info['create_time'] + order_age = current_time - order_info['created_time'] # 如果订单超过配置的超时时间且状态仍为pending或partial if order_age > Config.RTM_ORDER_TIMEOUT and order_info['status'] in ['pending', 'partial']: @@ -222,7 +216,9 @@ class RealTraderManager: order_id: 订单ID entrust_map: 可选的委托字典,如果为None则重新获取 """ - if order_id not in self.pending_orders: + # 检查订单是否存在 + order_info = StrategyPositionManager.get_pending_order(self.trader, order_id) + if not order_info: return try: @@ -236,62 +232,60 @@ class RealTraderManager: if entrust: # 获取订单之前的状态,用于判断是否发生变化 - previous_status = self.pending_orders[order_id].get('status') - previous_volume = self.pending_orders[order_id].get('traded_volume', 0) - - # 更新最后检查时间 - self.pending_orders[order_id]['last_check_time'] = time.time() + previous_status = order_info.get('status') + previous_volume = order_info.get('traded_volume', 0) # 根据委托状态更新订单状态 if entrust['order_status'] == xtconstant.ORDER_SUCCEEDED: # 全部成交 - self.pending_orders[order_id]['status'] = 'completed' - # 记录状态变化 - if previous_status != 'completed': - logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=completed, 成交量={entrust.get('traded_volume', 0)}") + StrategyPositionManager.update_order_status(self.trader, order_id, 'completed') + # 日志记录在update_order_status中处理 elif entrust['order_status'] == xtconstant.ORDER_PART_SUCC: # 部分成交 - self.pending_orders[order_id]['status'] = 'partial' current_volume = entrust.get('traded_volume', 0) - self.pending_orders[order_id]['traded_volume'] = current_volume + StrategyPositionManager.update_order_status( + self.trader, + order_id, + 'partial', + traded_volume=current_volume + ) # 如果成交量有变化,记录日志 if current_volume != previous_volume: - target_amount = self.pending_orders[order_id]['target_amount'] + target_amount = order_info['target_amount'] logger.info(f"订单部分成交更新: ID={order_id}, 代码={entrust['stock_code']}, 目标数量={target_amount}, 已成交数量={current_volume}, 剩余数量={target_amount - current_volume}") elif entrust['order_status'] in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_JUNK]: # 已撤单或废单 - self.pending_orders[order_id]['status'] = 'cancelled' - # 记录状态变化 - if previous_status != 'cancelled': - logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=cancelled, 原因={entrust.get('err_msg', '未知原因')}") + StrategyPositionManager.update_order_status( + self.trader, + order_id, + 'cancelled', + err_msg=entrust.get('err_msg', '未知原因') + ) elif entrust['order_status'] == xtconstant.ORDER_UNREPORTED: # 未报 - self.pending_orders[order_id]['status'] = 'pending' if previous_status != 'pending': - logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=pending(未报)") + StrategyPositionManager.update_order_status(self.trader, order_id, 'pending') elif entrust['order_status'] == xtconstant.ORDER_WAIT_REPORTING: # 待报 - self.pending_orders[order_id]['status'] = 'pending' if previous_status != 'pending': - logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=pending(待报)") + StrategyPositionManager.update_order_status(self.trader, order_id, 'pending') elif entrust['order_status'] == xtconstant.ORDER_REPORTED: # 已报 - self.pending_orders[order_id]['status'] = 'pending' if previous_status != 'pending': - logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=pending(已报)") + StrategyPositionManager.update_order_status(self.trader, order_id, 'pending') else: # 委托列表中找不到该订单,可能已经太久 current_time = time.time() - if current_time - self.pending_orders[order_id]['create_time'] > 24 * 60 * 60: - previous_status = self.pending_orders[order_id].get('status') - self.pending_orders[order_id]['status'] = 'failed' - logger.warning(f"订单状态未知且过期: ID={order_id}, 旧状态={previous_status}, 新状态=failed, 创建时长={(current_time - self.pending_orders[order_id]['create_time'])/3600:.1f}小时") + if current_time - order_info['created_time'] > 24 * 60 * 60: + previous_status = order_info.get('status') + StrategyPositionManager.update_order_status(self.trader, order_id, 'failed') + logger.warning(f"订单状态未知且过期: ID={order_id}, 旧状态={previous_status}, 新状态=failed, 创建时长={(current_time - order_info['created_time'])/3600:.1f}小时") except Exception as e: logger.error(f"更新订单状态时发生异常: order_id={order_id}, error={str(e)}") @@ -305,7 +299,7 @@ class RealTraderManager: """ try: # 首先尝试撤单 - logger.info(f"尝试撤销超时订单: ID={order_id}, 代码={order_info['code']}, 超时时间={(time.time() - order_info['create_time']):.0f}秒") + logger.info(f"尝试撤销超时订单: ID={order_id}, 代码={order_info['code']}, 超时时间={(time.time() - order_info['created_time']):.0f}秒") cancel_result = self.trader.cancel(order_id) # 记录撤单结果 @@ -326,15 +320,15 @@ class RealTraderManager: # 如果有未成交的部分,使用市价单补充交易 if remaining_amount > 0: # 递增重试计数 - order_info['retry_count'] += 1 + new_retry_count = StrategyPositionManager.increment_retry_count(self.trader, order_id) # 决定是否使用市价单进行补单 use_market_order = Config.RTM_USE_MARKET_ORDER - logger.info(f"准备补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}, 使用市价单={use_market_order}") + logger.info(f"准备补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={new_retry_count}/{Config.RTM_MAX_RETRIES}, 使用市价单={use_market_order}") # 如果重试次数少于最大重试次数,则进行补单 - if order_info['retry_count'] <= Config.RTM_MAX_RETRIES: + if new_retry_count <= Config.RTM_MAX_RETRIES: # 决定使用的订单类型 new_order_type = 'market' if use_market_order else 'limit' @@ -356,13 +350,13 @@ class RealTraderManager: else: logger.error(f"补单失败: 原订单ID={order_id}, 错误={new_order.get('error')}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}, 订单类型={new_order_type}") else: - logger.warning(f"订单重试次数过多,不再尝试: ID={order_id}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}, 代码={order_info['code']}, 方向={order_info['direction']}, 未成交数量={remaining_amount}") + logger.warning(f"订单重试次数过多,不再尝试: ID={order_id}, 重试次数={new_retry_count}/{Config.RTM_MAX_RETRIES}, 代码={order_info['code']}, 方向={order_info['direction']}, 未成交数量={remaining_amount}") else: logger.info(f"订单已全部成交,无需补单: ID={order_id}, 代码={order_info['code']}, 成交数量={traded_amount}") # 更新原订单状态 previous_status = order_info['status'] - order_info['status'] = 'cancelled' + StrategyPositionManager.update_order_status(self.trader, order_id, 'cancelled') logger.info(f"更新原订单状态: ID={order_id}, 旧状态={previous_status}, 新状态=cancelled") except Exception as e: @@ -509,16 +503,8 @@ class RealTraderManager: def clean_expired_orders(self): """清理过期的未完成订单""" - try: - current_time = time.time() - for order_id, order_info in list(self.pending_orders.items()): - # 如果订单创建时间超过24小时 - if current_time - order_info['create_time'] > 24 * 60 * 60: - if order_info['status'] not in ['completed', 'cancelled', 'failed']: - logger.warning(f"清理过期订单: ID={order_id}, 状态={order_info['status']}") - order_info['status'] = 'expired' - except Exception as e: - logger.error(f"清理过期订单时发生异常: {str(e)}") + # 直接调用StrategyPositionManager的方法 + StrategyPositionManager.clean_timeout_orders() def get_pending_orders(self): """获取所有未完成订单 @@ -526,10 +512,12 @@ class RealTraderManager: Returns: list: 未完成订单列表 """ + # 从StrategyPositionManager获取未完成订单 + pending_orders = StrategyPositionManager.get_pending_orders(self.trader) return [{ 'order_id': order_id, **order_info - } for order_id, order_info in self.pending_orders.items()] + } for order_id, order_info in pending_orders.items()] def get_strategy_targets(self): """获取策略目标持仓 diff --git a/src/strategy_position_manager.py b/src/strategy_position_manager.py index f623b02..6b69319 100644 --- a/src/strategy_position_manager.py +++ b/src/strategy_position_manager.py @@ -132,7 +132,7 @@ class StrategyPositionManager: logger.error(f"更新未完成委托状态失败: {str(e)}") @staticmethod - def add_pending_order(trader, order_id, strategy_name, code, price, amount, direction): + def add_pending_order(trader, order_id, strategy_name, code, price, amount, direction, order_type='limit'): """添加未完成委托 Args: @@ -143,6 +143,7 @@ class StrategyPositionManager: price: 委托价格 amount: 委托数量 direction: 交易方向,'buy'或'sell' + order_type: 订单类型,'limit'或'market',默认为'limit' """ if not order_id or order_id == 'simulation': return @@ -157,7 +158,12 @@ class StrategyPositionManager: 'price': price, 'amount': amount, 'direction': direction, - 'created_time': time.time() + 'created_time': time.time(), + 'target_amount': amount, + 'status': 'pending', + 'last_check_time': time.time(), + 'retry_count': 0, + 'order_type': order_type } # 同时记录到交易历史 @@ -177,6 +183,102 @@ class StrategyPositionManager: logger.info(f"添加未完成委托: {order_id}, 交易类型: {trader_type}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}") + @staticmethod + def get_pending_orders(trader): + """获取指定交易类型的所有未完成委托 + + Args: + trader: 交易实例 + + Returns: + dict: 未完成委托字典,以order_id为键 + """ + trader_type = StrategyPositionManager.get_trader_type(trader) + return pending_orders[trader_type] + + @staticmethod + def get_pending_order(trader, order_id): + """获取指定订单信息 + + Args: + trader: 交易实例 + order_id: 订单ID + + Returns: + dict: 订单信息字典,如果不存在则返回None + """ + trader_type = StrategyPositionManager.get_trader_type(trader) + return pending_orders[trader_type].get(order_id) + + @staticmethod + def update_order_status(trader, order_id, new_status, **additional_data): + """更新订单状态 + + Args: + trader: 交易实例 + order_id: 订单ID + new_status: 新状态 + additional_data: 附加数据(如成交量、重试次数等) + + Returns: + bool: 是否成功更新 + """ + trader_type = StrategyPositionManager.get_trader_type(trader) + if order_id in pending_orders[trader_type]: + # 记录之前的状态用于日志 + previous_status = pending_orders[trader_type][order_id].get('status') + + # 更新状态和最后检查时间 + pending_orders[trader_type][order_id]['status'] = new_status + pending_orders[trader_type][order_id]['last_check_time'] = time.time() + + # 更新附加数据 + for key, value in additional_data.items(): + pending_orders[trader_type][order_id][key] = value + + # 记录状态变化日志 + if previous_status != new_status: + code = pending_orders[trader_type][order_id].get('code') + logger.info(f"订单状态变化: ID={order_id}, 代码={code}, 旧状态={previous_status}, 新状态={new_status}") + + return True + return False + + @staticmethod + def increment_retry_count(trader, order_id): + """增加订单重试次数 + + Args: + trader: 交易实例 + order_id: 订单ID + + Returns: + int: 新的重试次数,如果订单不存在则返回-1 + """ + trader_type = StrategyPositionManager.get_trader_type(trader) + if order_id in pending_orders[trader_type]: + current = pending_orders[trader_type][order_id].get('retry_count', 0) + pending_orders[trader_type][order_id]['retry_count'] = current + 1 + return current + 1 + return -1 + + @staticmethod + def remove_pending_order(trader, order_id): + """移除未完成委托 + + Args: + trader: 交易实例 + order_id: 订单ID + + Returns: + bool: 是否成功移除 + """ + trader_type = StrategyPositionManager.get_trader_type(trader) + if order_id in pending_orders[trader_type]: + del pending_orders[trader_type][order_id] + return True + return False + @staticmethod def clean_timeout_orders(): """清理超时委托""" @@ -187,6 +289,7 @@ class StrategyPositionManager: # 超过24小时的委托视为超时 if current_time - order_info['created_time'] > 24 * 60 * 60: del pending_orders[trader_type][order_id] + logger.warning(f"清理超时委托: ID={order_id}, 状态={order_info.get('status')}") @staticmethod def load_strategy_data():