From 95cd287675e13b1e040eb0f3d225ff56bee101dd Mon Sep 17 00:00:00 2001 From: zhiyong Date: Wed, 30 Apr 2025 22:50:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=BE=88=E5=A4=9A=E5=B0=8F?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/real_trader_manager.py | 195 ++++++++++++++-------- src/trade_server.py | 325 ++++++++++++++++++++++++++++--------- src/xt_trader.py | 60 ++++++- 3 files changed, 426 insertions(+), 154 deletions(-) diff --git a/src/real_trader_manager.py b/src/real_trader_manager.py index 6123875..785add9 100644 --- a/src/real_trader_manager.py +++ b/src/real_trader_manager.py @@ -21,8 +21,7 @@ class RealTraderManager: trader: XtTrader实例,如果为None则自动获取 """ # 使用传入的trader实例或获取单例 - from trade_server import get_real_trader - self.trader = trader if trader is not None else get_real_trader() + self.trader = trader if trader is not None else XtTrader() # 确保已登录 if not self.trader.is_logged_in(): @@ -71,8 +70,8 @@ class RealTraderManager: code: 股票代码 direction: 交易方向 'buy'或'sell' amount: 交易数量 - price: 交易价格 - order_type: 订单类型,'limit'表示限价单,'market'表示市价单 + price: 交易价格(市价单时可为0) + order_type: 订单类型,'limit'表示限价单,'market'表示市价单,默认为'limit' Returns: dict: 包含订单ID和状态信息 @@ -85,24 +84,27 @@ class RealTraderManager: if direction not in ['buy', 'sell']: logger.error(f"无效的交易方向: {direction}") return {"success": False, "error": "无效的交易方向"} + + # 检查订单类型 + if order_type not in ['limit', 'market']: + logger.error(f"无效的订单类型: {order_type}") + return {"success": False, "error": "无效的订单类型,必须是'limit'或'market'"} try: - # 检查资金和持仓是否足够 - if not self._check_order_feasibility(code, direction, amount, price): - logger.warning(f"资金或持仓不足,忽略订单: {direction} {code} {amount}股") + # 对于限价单,检查资金和持仓是否足够 + if order_type == 'limit' and not self._check_order_feasibility(code, direction, amount, price): + logger.warning(f"资金或持仓不足,忽略订单: {direction} {code} {amount}股 {price}元") return {"success": False, "error": "资金或持仓不足"} # 更新策略目标持仓 self._update_strategy_target(strategy_name, code, direction, amount) - # 执行实际下单 - price_type = xtconstant.FIX_PRICE if order_type == 'limit' else xtconstant.MARKET_BEST - # 下单 + logger.info(f"准备{direction}订单: 代码={code}, 数量={amount}, 价格={price}, 订单类型={order_type}") if direction == 'buy': - result = self.trader.buy(code, price, amount) + result = self.trader.buy(code, price, amount, order_type) else: - result = self.trader.sell(code, price, amount) + result = self.trader.sell(code, price, amount, order_type) order_id = result.get('order_id') if not order_id or order_id == 'simulation': @@ -134,7 +136,7 @@ class RealTraderManager: direction ) - logger.info(f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}") + logger.info(f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}") # 立即更新一次订单状态 self._update_order_status(order_id) @@ -150,42 +152,64 @@ class RealTraderManager: try: logger.info("开始检查未完成订单...") + # 如果没有未完成订单,直接返回 + if not self.pending_orders: + logger.info("没有未完成订单需要检查") + return + # 更新StrategyPositionManager中的未完成委托状态 - StrategyPositionManager.update_pending_orders(self.trader) + try: + StrategyPositionManager.update_pending_orders(self.trader) + except Exception as e: + logger.error(f"更新StrategyPositionManager未完成委托状态失败: {str(e)}") # 获取最新的委托列表 - entrusts = self.trader.get_today_entrust() - entrust_map = {str(e['order_id']): e for e in entrusts} + try: + entrusts = self.trader.get_today_entrust() + if entrusts is None: + logger.error("获取今日委托失败,跳过本次检查") + return + + entrust_map = {str(e['order_id']): e for e in entrusts} + except Exception as e: + logger.error(f"获取今日委托失败: {str(e)},跳过本次检查") + return # 检查每个未完成订单 for order_id, order_info in list(self.pending_orders.items()): - # 跳过已完成的订单 - if order_info['status'] in ['completed', 'cancelled', 'failed']: - continue - - # 更新订单状态 - self._update_order_status(order_id, entrust_map) - - # 处理超时未成交或部分成交的订单 - current_time = time.time() - order_age = current_time - order_info['create_time'] - - # 如果订单超过配置的超时时间且状态仍为pending或partial - if order_age > Config.RTM_ORDER_TIMEOUT and order_info['status'] in ['pending', 'partial']: - # 记录超时信息 - logger.warning(f"订单已超时({order_age:.0f}秒 > {Config.RTM_ORDER_TIMEOUT}秒): ID={order_id}, 代码={order_info['code']}, 状态={order_info['status']}") + try: + # 跳过已完成的订单 + if order_info['status'] in ['completed', 'cancelled', 'failed']: + continue - # 如果是部分成交,记录详情 - if order_info['status'] == 'partial' and 'traded_volume' in order_info: - original = order_info['target_amount'] - traded = order_info['traded_volume'] - remaining = original - traded - logger.info(f"订单部分成交详情: ID={order_id}, 原始数量={original}, 已成交={traded}, 剩余={remaining}") + # 更新订单状态 + self._update_order_status(order_id, entrust_map) - self._handle_timeout_order(order_id, order_info) + # 处理超时未成交或部分成交的订单 + current_time = time.time() + order_age = current_time - order_info['create_time'] + + # 如果订单超过配置的超时时间且状态仍为pending或partial + if order_age > Config.RTM_ORDER_TIMEOUT and order_info['status'] in ['pending', 'partial']: + # 记录超时信息 + logger.warning(f"订单已超时({order_age:.0f}秒 > {Config.RTM_ORDER_TIMEOUT}秒): ID={order_id}, 代码={order_info['code']}, 状态={order_info['status']}") + + # 如果是部分成交,记录详情 + if order_info['status'] == 'partial' and 'traded_volume' in order_info: + original = order_info['target_amount'] + traded = order_info['traded_volume'] + remaining = original - traded + logger.info(f"订单部分成交详情: ID={order_id}, 原始数量={original}, 已成交={traded}, 剩余={remaining}") + + self._handle_timeout_order(order_id, order_info) + except Exception as e: + logger.error(f"处理订单 {order_id} 时出错: {str(e)}") # 同步策略持仓和实际持仓 - self._sync_strategy_positions() + try: + self._sync_strategy_positions() + except Exception as e: + logger.error(f"同步策略持仓和实际持仓失败: {str(e)}") logger.info("未完成订单检查完毕") @@ -305,24 +329,33 @@ class RealTraderManager: # 递增重试计数 order_info['retry_count'] += 1 - logger.info(f"准备使用市价单补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}") + # 决定是否使用市价单进行补单 + use_market_order = Config.RTM_USE_MARKET_ORDER - # 如果重试次数少于最大重试次数,则使用市价单补单 + logger.info(f"准备补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}, 使用市价单={use_market_order}") + + # 如果重试次数少于最大重试次数,则进行补单 if order_info['retry_count'] <= Config.RTM_MAX_RETRIES: - # 使用市价单 + # 决定使用的订单类型 + new_order_type = 'market' if use_market_order else 'limit' + + # 对于市价单,价格参数可设为0;对于限价单,使用原价格 + new_price = 0 if new_order_type == 'market' else order_info['price'] + + # 下新订单 new_order = self.place_order( order_info['strategy_name'], order_info['code'], order_info['direction'], remaining_amount, - 0, # 市价单价格参数无效 - 'market' # 使用市价单 + new_price, + new_order_type ) if new_order.get('success'): - logger.info(f"市价补单成功: 原订单ID={order_id}, 新订单ID={new_order['order_id']}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}") + logger.info(f"补单成功: 原订单ID={order_id}, 新订单ID={new_order['order_id']}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}, 订单类型={new_order_type}") else: - logger.error(f"市价补单失败: 原订单ID={order_id}, 错误={new_order.get('error')}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}") + logger.error(f"补单失败: 原订单ID={order_id}, 错误={new_order.get('error')}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}, 订单类型={new_order_type}") else: logger.warning(f"订单重试次数过多,不再尝试: ID={order_id}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}, 代码={order_info['code']}, 方向={order_info['direction']}, 未成交数量={remaining_amount}") else: @@ -422,8 +455,17 @@ class RealTraderManager: try: # 获取实际持仓 actual_positions = self.trader.get_positions() + if actual_positions is None: + logger.error("获取实际持仓失败,跳过同步") + return + position_map = {p['stock_code']: p for p in actual_positions} + # 如果没有策略目标持仓,直接返回 + if not self.strategy_targets: + logger.info("没有策略目标持仓需要同步") + return + # 遍历每个策略的目标持仓 for strategy_name, targets in self.strategy_targets.items(): # 该策略的实际持仓映射 @@ -431,28 +473,34 @@ class RealTraderManager: # 遍历该策略的目标持仓 for code, target_amount in targets.items(): - # 获取股票的实际持仓 - actual_position = position_map.get(code, {}) - actual_amount = actual_position.get('volume', 0) - - if actual_amount > 0: - strategy_actual_positions[code] = actual_amount + try: + # 获取股票的实际持仓 + actual_position = position_map.get(code, {}) + actual_amount = actual_position.get('volume', 0) - # 更新策略持仓管理器中的持仓记录 - StrategyPositionManager.update_strategy_position( - self.trader, - strategy_name, - code, - 'sync', # 使用同步模式 - actual_amount - ) - - # 检查是否需要调整持仓 - if actual_amount != target_amount: - diff = target_amount - actual_amount - if diff != 0: - logger.warning(f"持仓不一致: 策略={strategy_name}, 代码={code}, 目标={target_amount}, 实际={actual_amount}") - + if actual_amount > 0: + strategy_actual_positions[code] = actual_amount + + # 更新策略持仓管理器中的持仓记录 + try: + StrategyPositionManager.update_strategy_position( + self.trader, + strategy_name, + code, + 'sync', # 使用同步模式 + actual_amount + ) + except Exception as e: + logger.error(f"更新策略持仓管理器持仓记录失败: {str(e)}") + + # 检查是否需要调整持仓 + if actual_amount != target_amount: + diff = target_amount - actual_amount + if diff != 0: + logger.warning(f"持仓不一致: 策略={strategy_name}, 代码={code}, 目标={target_amount}, 实际={actual_amount}") + except Exception as e: + logger.error(f"同步股票 {code} 持仓时出错: {str(e)}") + # 记录日志 logger.info(f"策略 {strategy_name} 的目标持仓: {targets}") logger.info(f"策略 {strategy_name} 的实际持仓: {strategy_actual_positions}") @@ -494,6 +542,7 @@ class RealTraderManager: # 单例模式实现 _instance = None +_instance_lock = threading.RLock() # 添加线程锁 def get_real_trader_manager(): """获取实盘交易管理器单例实例 @@ -502,9 +551,11 @@ def get_real_trader_manager(): RealTraderManager: 实盘交易管理器实例 """ global _instance - if _instance is None: - # 从trade_server获取实盘交易实例 - from trade_server import get_real_trader - trader = get_real_trader() - _instance = RealTraderManager(trader) + # 使用线程锁确保线程安全 + with _instance_lock: + if _instance is None: + # 避免循环导入,使用参数传递XtTrader实例 + # 注意:这里依赖trade_server.py中的实现提供trader实例 + # 在trade_server.py中会直接传入trader实例 + _instance = RealTraderManager(None) return _instance \ No newline at end of file diff --git a/src/trade_server.py b/src/trade_server.py index bad1c9b..76b421a 100644 --- a/src/trade_server.py +++ b/src/trade_server.py @@ -11,7 +11,6 @@ from simulation_trader import SimulationTrader import datetime from strategy_position_manager import StrategyPositionManager from logger_config import get_logger -from real_trader_manager import get_real_trader_manager # 获取日志记录器 logger = get_logger('server') @@ -19,6 +18,13 @@ logger = get_logger('server') # 全局交易实例(采用单例模式) _sim_trader_instance = None # 模拟交易实例(单例) _real_trader_instance = None # 实盘交易实例(单例) +_real_trader_manager_instance = None # 实盘交易管理器实例(单例) + +# 添加线程锁,保护单例实例的创建 +_instance_lock = threading.RLock() + +# 后台任务执行线程 +_scheduler_thread = None # 获取模拟交易实例的辅助函数 def get_sim_trader(): @@ -28,8 +34,9 @@ def get_sim_trader(): 返回模拟交易单例实例 """ global _sim_trader_instance - if _sim_trader_instance is None: - _sim_trader_instance = SimulationTrader() + with _instance_lock: + if _sim_trader_instance is None: + _sim_trader_instance = SimulationTrader() return _sim_trader_instance # 获取实盘交易实例的辅助函数 @@ -40,14 +47,33 @@ def get_real_trader(): 返回实盘交易单例实例 """ global _real_trader_instance - if _real_trader_instance is None: - _real_trader_instance = XtTrader() - # 检查交易实例是否已登录,如果未登录则进行登录 - if not _real_trader_instance.is_logged_in(): - logger.info("创建新的XtTrader实例并登录") - _real_trader_instance.login() + with _instance_lock: + if _real_trader_instance is None: + _real_trader_instance = XtTrader() + # 检查交易实例是否已登录,如果未登录则进行登录 + if not _real_trader_instance.is_logged_in(): + logger.info("创建新的XtTrader实例并登录") + login_success = _real_trader_instance.login() + if not login_success: + logger.error("XtTrader登录失败") return _real_trader_instance +# 获取实盘交易管理器实例的辅助函数 +def get_real_trader_manager(): + """获取实盘交易管理器实例 - 保证单例模式 + + Returns: + 返回实盘交易管理器单例实例 + """ + global _real_trader_manager_instance + with _instance_lock: + if _real_trader_manager_instance is None: + # 延迟导入避免循环依赖 + from real_trader_manager import RealTraderManager + _real_trader_manager_instance = RealTraderManager(get_real_trader()) + logger.info("创建新的RealTraderManager实例") + return _real_trader_manager_instance + # 判断当前是否应该使用模拟交易 def should_use_simulation(): """判断是否应该使用模拟交易 @@ -68,9 +94,46 @@ def should_use_simulation(): try: from chinese_calendar import is_workday, is_holiday is_trading_day = is_workday(now) and not is_holiday(now) + logger.debug(f"使用chinese_calendar判断交易日: {now.date()}, 是交易日: {is_trading_day}") except ImportError: - # 如果无法导入chinese_calendar,则简单地用工作日判断 + # 如果无法导入chinese_calendar,使用简单的工作日判断,并记录警告 is_trading_day = now.weekday() < 5 # 0-4 为周一至周五 + logger.warning(f"无法导入chinese_calendar,使用简单工作日判断: {now.date()}, 是工作日: {is_trading_day}") + + # 额外检查是否为已知的中国法定节假日(如果必要,可以扩展此列表) + holidays_2023 = [ + datetime.date(2023, 1, 1), # 元旦 + datetime.date(2023, 1, 2), # 元旦调休 + datetime.date(2023, 1, 21), # 春节 + datetime.date(2023, 1, 22), # 春节 + datetime.date(2023, 1, 23), # 春节 + datetime.date(2023, 1, 24), # 春节 + datetime.date(2023, 1, 25), # 春节 + datetime.date(2023, 1, 26), # 春节 + datetime.date(2023, 1, 27), # 春节 + # ... 可以添加更多已知节假日 + ] + + holidays_2024 = [ + datetime.date(2024, 1, 1), # 元旦 + datetime.date(2024, 2, 10), # 春节 + datetime.date(2024, 2, 11), # 春节 + datetime.date(2024, 2, 12), # 春节 + datetime.date(2024, 2, 13), # 春节 + datetime.date(2024, 2, 14), # 春节 + datetime.date(2024, 2, 15), # 春节 + datetime.date(2024, 2, 16), # 春节 + datetime.date(2024, 2, 17), # 春节 + # ... 可以添加更多已知节假日 + ] + + # 合并所有年份的节假日 + all_holidays = holidays_2023 + holidays_2024 + + # 检查当前日期是否在已知节假日列表中 + if now.date() in all_holidays: + is_trading_day = False + logger.info(f"当前日期 {now.date()} 在已知节假日列表中,判定为非交易日") # 如果不是交易日,返回True(使用模拟交易) if not is_trading_day: @@ -147,16 +210,61 @@ def run_daily(time_str, job_func): def run_pending_tasks(): - while True: + """定时任务执行线程""" + global _scheduler_thread_running + logger.info("定时任务调度线程已启动") + while _scheduler_thread_running: try: schedule.run_pending() time.sleep(1) except Exception as e: logger.error(f"Error in scheduler: {str(e)}") + logger.info("定时任务调度线程已停止") -# Run the task scheduler in a new thread -threading.Thread(target=run_pending_tasks).start() +# 程序启动时初始化线程 +_scheduler_thread_running = True +_scheduler_thread = threading.Thread(target=run_pending_tasks, daemon=True) +_scheduler_thread.start() + +# 程序退出清理函数 +def cleanup(): + """程序退出时执行的清理操作""" + logger.info("开始执行程序退出清理...") + + # 停止调度线程 + global _scheduler_thread_running + _scheduler_thread_running = False + + # 等待调度线程结束(最多等待5秒) + if _scheduler_thread and _scheduler_thread.is_alive(): + _scheduler_thread.join(timeout=5) + + # 保存策略数据 + try: + StrategyPositionManager.save_strategy_data() + logger.info("策略数据已保存") + except Exception as e: + logger.error(f"保存策略数据失败: {str(e)}") + + # 登出交易实例 + try: + # 登出模拟交易实例 + if _sim_trader_instance is not None: + _sim_trader_instance.logout() + logger.info("模拟交易实例已登出") + + # 登出实盘交易实例 + if _real_trader_instance is not None: + _real_trader_instance.logout() + logger.info("实盘交易实例已登出") + except Exception as e: + logger.error(f"登出交易实例失败: {str(e)}") + + logger.info("程序退出清理完成") + +# 注册程序退出处理函数 +atexit.register(cleanup) # 初始化交易环境 get_trader().login() @@ -165,8 +273,8 @@ get_trader().login() app = Flask(__name__) # 添加策略数据相关的定期任务 -schedule.every().day.at("00:01").do(StrategyPositionManager.clean_timeout_orders) # 每天清理超时委托 -schedule.every().day.at("15:30").do(StrategyPositionManager.save_strategy_data) # 每天收盘后保存策略数据 +schedule.every().day.at(Config.CLEAN_ORDERS_TIME).do(StrategyPositionManager.clean_timeout_orders) # 每天清理超时委托 +schedule.every().day.at(Config.STRATEGY_SAVE_TIME).do(StrategyPositionManager.save_strategy_data) # 每天收盘后保存策略数据 # 程序启动时加载策略数据 StrategyPositionManager.load_strategy_data() @@ -335,38 +443,34 @@ def cancel(entrust_no): StrategyPositionManager.update_pending_orders(sim_trader) return jsonify({"success": True, "data": result, "simulation": True}), 200 else: - # 尝试使用RealTraderManager撤单 - try: + # 实盘交易,优先使用RealTraderManager + if Config.USE_REAL_TRADER_MANAGER: rtm = get_real_trader_manager() + # 在RealTraderManager的待处理订单中查找 + found_in_rtm = False for order in rtm.get_pending_orders(): if str(order['order_id']) == str(entrust_no): - # 找到对应订单,使用RealTraderManager处理 - real_trader = get_real_trader() - result = real_trader.cancel(entrust_no) - logger.info(f"实盘交易撤单结果: {result}") + found_in_rtm = True + # 使用RealTraderManager中的trader进行撤单 + result = rtm.trader.cancel(entrust_no) + logger.info(f"通过RealTraderManager撤单结果: {result}") # 更新订单状态 rtm.check_pending_orders() return jsonify({"success": True, "data": result, "simulation": False}), 200 - # 如果RealTraderManager中找不到,则使用普通实盘 - real_trader = get_real_trader() - result = real_trader.cancel(entrust_no) - logger.info(f"实盘交易撤单结果: {result}") - - # 更新未完成委托状态 - StrategyPositionManager.update_pending_orders(real_trader) - return jsonify({"success": True, "data": result, "simulation": False}), 200 - except Exception as e: - logger.error(f"使用RealTraderManager撤单失败: {str(e)}") - # 回退到普通方式 - real_trader = get_real_trader() - result = real_trader.cancel(entrust_no) - logger.info(f"实盘交易撤单结果: {result}") - - # 更新未完成委托状态 - StrategyPositionManager.update_pending_orders(real_trader) - return jsonify({"success": True, "data": result, "simulation": False}), 200 + # 如果RealTraderManager中未找到,执行下面的普通实盘撤单 + if not found_in_rtm: + logger.info(f"在RealTraderManager中未找到订单{entrust_no},使用普通实盘撤单") + + # 普通实盘撤单方式 + real_trader = get_real_trader() + result = real_trader.cancel(entrust_no) + logger.info(f"普通实盘撤单结果: {result}") + + # 更新未完成委托状态 + StrategyPositionManager.update_pending_orders(real_trader) + return jsonify({"success": True, "data": result, "simulation": False}), 200 except Exception as e: logger.error(f"Error processing cancel request: {str(e)}") @@ -388,9 +492,14 @@ def get_balance(): logger.info(f"模拟交易余额: {balance}") return jsonify({"success": True, "data": balance, "simulation": True}), 200 else: - # 实盘交易 + # 实盘交易,添加超时处理 trader = get_real_trader() - balance = trader.get_balance() + balance = execute_with_timeout(trader.get_balance, Config.TRADE_TIMEOUT) + + if balance is None: + logger.error("获取实盘余额超时") + return jsonify({"success": False, "error": "获取余额超时,请稍后重试", "simulation": False}), 500 + logger.info(f"实盘交易余额: {balance}") return jsonify({"success": True, "data": balance, "simulation": False}), 200 except Exception as e: @@ -483,25 +592,41 @@ def clear_strategy(strategy_name): # 判断当前交易模式 should_simulate, _ = should_use_simulation() - # 如果是实盘模式 - if not should_simulate: + # 如果是实盘模式且启用了RealTraderManager + if not should_simulate and Config.USE_REAL_TRADER_MANAGER: # 先尝试清除RealTraderManager中的策略目标 rtm = get_real_trader_manager() if strategy_name in rtm.strategy_targets: - del rtm.strategy_targets[strategy_name] - logger.info(f"已清除RealTraderManager中的策略目标: {strategy_name}") + with _instance_lock: # 使用锁保护操作 + if strategy_name in rtm.strategy_targets: + del rtm.strategy_targets[strategy_name] + logger.info(f"已清除RealTraderManager中的策略目标: {strategy_name}") + + # 清除RealTraderManager中相关的待处理订单 + pending_orders_to_remove = [] + for order_id, order_info in rtm.pending_orders.items(): + if order_info.get('strategy_name') == strategy_name: + pending_orders_to_remove.append(order_id) + + # 删除相关订单 + for order_id in pending_orders_to_remove: + with _instance_lock: # 使用锁保护操作 + if order_id in rtm.pending_orders: + del rtm.pending_orders[order_id] + logger.info(f"已清除RealTraderManager中的订单: {order_id}") # 获取相应的交易实例 trader = get_sim_trader() if should_simulate else get_real_trader() # 如果是模拟交易实例,则重置模拟交易实例 if should_simulate and isinstance(trader, SimulationTrader): - global _sim_trader_instance - if _sim_trader_instance is not None: - logger.info("重置模拟交易实例") - # 创建一个新的模拟交易实例,替换原有实例 - _sim_trader_instance = SimulationTrader() - trader = _sim_trader_instance + with _instance_lock: # 使用锁保护操作 + global _sim_trader_instance + if _sim_trader_instance is not None: + logger.info("重置模拟交易实例") + # 创建一个新的模拟交易实例,替换原有实例 + _sim_trader_instance = SimulationTrader() + trader = _sim_trader_instance # 使用StrategyPositionManager清除策略 success, message = StrategyPositionManager.clear_strategy(trader, strategy_name) @@ -518,11 +643,25 @@ def clear_strategy(strategy_name): # 超时处理函数 def execute_with_timeout(func, timeout, *args, **kwargs): + """执行函数并设置超时时间,如果超时则返回None + + Args: + func: 要执行的函数 + timeout: 超时时间(秒) + args, kwargs: 传递给func的参数 + + Returns: + func的返回值,如果超时则返回None + """ with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(func, *args, **kwargs) try: return future.result(timeout=timeout) except TimeoutError: + logger.warning(f"函数 {func.__name__} 执行超时 (>{timeout}秒)") + return None + except Exception as e: + logger.error(f"函数 {func.__name__} 执行出错: {str(e)}") return None @@ -532,19 +671,38 @@ def get_order_status(): """获取订单状态""" logger.info("Received order status request") - # 判断当前交易模式 - should_simulate, _ = should_use_simulation() - - if not should_simulate and Config.USE_REAL_TRADER_MANAGER: - # 实盘 + RealTraderManager模式 - rtm = get_real_trader_manager() - pending_orders = rtm.get_pending_orders() - return jsonify({"success": True, "data": pending_orders, "simulation": False}), 200 - else: + try: + # 判断当前交易模式 + should_simulate, _ = should_use_simulation() + + if not should_simulate and Config.USE_REAL_TRADER_MANAGER: + # 实盘 + RealTraderManager模式 + try: + rtm = get_real_trader_manager() + pending_orders = rtm.get_pending_orders() + if pending_orders is None: + logger.error("从RealTraderManager获取订单状态失败") + return jsonify({"success": False, "error": "获取订单状态失败", "simulation": False}), 500 + return jsonify({"success": True, "data": pending_orders, "simulation": False}), 200 + except Exception as e: + logger.error(f"从RealTraderManager获取订单状态时出错: {str(e)}") + # 发生错误时,回退到使用普通交易实例 + logger.info("回退到使用普通交易实例获取订单状态") + # 模拟交易或实盘但未使用RealTraderManager trader = get_sim_trader() if should_simulate else get_real_trader() - entrusts = trader.get_today_entrust() - return jsonify({"success": True, "data": entrusts, "simulation": should_simulate}), 200 + try: + entrusts = execute_with_timeout(trader.get_today_entrust, Config.TRADE_TIMEOUT) + if entrusts is None: + logger.error("获取今日委托超时") + return jsonify({"success": False, "error": "获取今日委托超时", "simulation": should_simulate}), 500 + return jsonify({"success": True, "data": entrusts, "simulation": should_simulate}), 200 + except Exception as e: + logger.error(f"获取今日委托时出错: {str(e)}") + return jsonify({"success": False, "error": f"获取今日委托时出错: {str(e)}", "simulation": should_simulate}), 500 + except Exception as e: + logger.error(f"处理订单状态请求时出错: {str(e)}") + abort(500, description="Internal server error") # 添加新的API端点查询策略目标持仓 @@ -553,24 +711,35 @@ def get_strategy_targets(): """获取策略目标持仓""" logger.info("Received strategy targets request") - # 获取查询参数 - strategy_name = request.args.get("strategy_name") - - # 检查是否是实盘模式且使用RealTraderManager - should_simulate, _ = should_use_simulation() - - if not should_simulate and Config.USE_REAL_TRADER_MANAGER: - rtm = get_real_trader_manager() - targets = rtm.get_strategy_targets() + try: + # 获取查询参数 + strategy_name = request.args.get("strategy_name") - # 如果指定了策略名称,则只返回该策略的目标持仓 - if strategy_name: - strategy_target = targets.get(strategy_name, {}) - return jsonify({"success": True, "data": {strategy_name: strategy_target}, "simulation": False}), 200 + # 检查是否是实盘模式且使用RealTraderManager + should_simulate, _ = should_use_simulation() - return jsonify({"success": True, "data": targets, "simulation": False}), 200 - else: - return jsonify({"success": False, "error": "无法获取目标持仓:非实盘模式或RealTraderManager未启用"}), 400 + if should_simulate: + return jsonify({"success": False, "error": "模拟交易模式下不支持目标持仓", "simulation": True}), 400 + + if not Config.USE_REAL_TRADER_MANAGER: + return jsonify({"success": False, "error": "RealTraderManager未启用,无法获取目标持仓", "simulation": False}), 400 + + try: + rtm = get_real_trader_manager() + targets = rtm.get_strategy_targets() + + # 如果指定了策略名称,则只返回该策略的目标持仓 + if strategy_name: + strategy_target = targets.get(strategy_name, {}) + return jsonify({"success": True, "data": {strategy_name: strategy_target}, "simulation": False}), 200 + + return jsonify({"success": True, "data": targets, "simulation": False}), 200 + except Exception as e: + logger.error(f"获取策略目标持仓时出错: {str(e)}") + return jsonify({"success": False, "error": f"获取策略目标持仓时出错: {str(e)}", "simulation": False}), 500 + except Exception as e: + logger.error(f"处理策略目标持仓请求时出错: {str(e)}") + abort(500, description="Internal server error") if __name__ == "__main__": diff --git a/src/xt_trader.py b/src/xt_trader.py index 42e6324..c035feb 100644 --- a/src/xt_trader.py +++ b/src/xt_trader.py @@ -165,15 +165,67 @@ class XtTrader: } return price_type_map.get(price_type, f"unknown_{price_type}") - def buy(self, code, price, amount): + def buy(self, code, price, amount, order_type='limit'): + """买入股票 + + Args: + code: 股票代码 + price: 买入价格(市价单时可为0) + amount: 买入数量 + order_type: 订单类型,'limit'=限价单,'market'=市价单,默认为'limit' + + Returns: + dict: 包含订单ID的字典 + """ + # 确定价格类型 + price_type = xtconstant.FIX_PRICE # 默认为限价单 + + if order_type == 'market': + # 市价单,根据不同市场选择合适的市价单类型 + if code.startswith('1') or code.startswith('5'): + # 基金等可能需要不同的市价单类型 + price_type = xtconstant.MARKET_BEST + else: + price_type = xtconstant.MARKET_BEST # 市价最优价 + + # 如果是市价单,价格可以设为0 + if price_type != xtconstant.FIX_PRICE: + price = 0 + order_id = self.xt_trader.order_stock( - self.account, code, xtconstant.STOCK_BUY, amount, xtconstant.FIX_PRICE, price, self._strategy_name, self._remark + self.account, code, xtconstant.STOCK_BUY, amount, price_type, price, self._strategy_name, self._remark ) return {"order_id": order_id} - def sell(self, code, price, amount): + def sell(self, code, price, amount, order_type='limit'): + """卖出股票 + + Args: + code: 股票代码 + price: 卖出价格(市价单时可为0) + amount: 卖出数量 + order_type: 订单类型,'limit'=限价单,'market'=市价单,默认为'limit' + + Returns: + dict: 包含订单ID的字典 + """ + # 确定价格类型 + price_type = xtconstant.FIX_PRICE # 默认为限价单 + + if order_type == 'market': + # 市价单,根据不同市场选择合适的市价单类型 + if code.startswith('1') or code.startswith('5'): + # 基金等可能需要不同的市价单类型 + price_type = xtconstant.MARKET_BEST + else: + price_type = xtconstant.MARKET_BEST # 市价最优价 + + # 如果是市价单,价格可以设为0 + if price_type != xtconstant.FIX_PRICE: + price = 0 + order_id = self.xt_trader.order_stock( - self.account, code, xtconstant.STOCK_SELL, amount, xtconstant.FIX_PRICE, price, self._strategy_name, self._remark + self.account, code, xtconstant.STOCK_SELL, amount, price_type, price, self._strategy_name, self._remark ) return {"order_id": order_id}