From fdee1c3da1193494a42cf9be03effde0c8b65d08 Mon Sep 17 00:00:00 2001 From: zhiyong Date: Wed, 30 Apr 2025 22:16:48 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=BD=BF=E7=94=A8=E5=AE=9E?= =?UTF-8?q?=E7=9B=98=E8=BF=98=E6=98=AF=E6=A8=A1=E6=8B=9F=E7=9B=98=E7=9A=84?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config.py | 30 +- src/real_trader_manager.py | 510 +++++++++++++++++++++++++++++++ src/strategy_position_manager.py | 45 +-- src/trade_server.py | 467 ++++++++++++++++------------ 4 files changed, 825 insertions(+), 227 deletions(-) create mode 100644 src/real_trader_manager.py diff --git a/src/config.py b/src/config.py index 11db102..35d09bd 100644 --- a/src/config.py +++ b/src/config.py @@ -1,19 +1,19 @@ import os +import datetime class Config: # Server settings - PORT = 9527 - HOST = '0.0.0.0' - DEBUG = False + PORT = int(os.environ.get("PORT", 9527)) + HOST = os.environ.get("HOST", "0.0.0.0") + DEBUG = os.environ.get("DEBUG", "False").lower() == "true" # Trading settings - TRADE_TIMEOUT = 3 # 交易超时时间(秒) - SIMULATION_ONLY = True # 是否仅使用模拟交易 + TRADE_TIMEOUT = int(os.environ.get("TRADE_TIMEOUT", 10)) # 交易超时时间(秒) + SIMULATION_ONLY = os.environ.get("SIMULATION_ONLY", "False").lower() == "true" # Trading hours - MARKET_OPEN_TIME = "09:20" - MARKET_ACTIVE_TIME = "09:15" - MARKET_CLOSE_TIME = "15:10" + MARKET_OPEN_TIME = os.environ.get("MARKET_OPEN_TIME", "09:15") + MARKET_CLOSE_TIME = os.environ.get("MARKET_CLOSE_TIME", "15:30") # Logging LOG_DIR = "logs" @@ -27,5 +27,15 @@ class Config: RATE_LIMIT_PERIOD = 60 # seconds # XtQuant 相关配置 - XT_ACCOUNT = '80391818' - XT_PATH = r'C:\\江海证券QMT实盘_交易\\userdata_mini' + XT_ACCOUNT = os.environ.get("XT_ACCOUNT", "80391818") + XT_PATH = os.environ.get("XT_PATH", r'C:\\江海证券QMT实盘_交易\\userdata_mini') + + # 新增RealTraderManager配置 + USE_REAL_TRADER_MANAGER = os.environ.get("USE_REAL_TRADER_MANAGER", "True").lower() == "true" + RTM_ORDER_TIMEOUT = int(os.environ.get("RTM_ORDER_TIMEOUT", 60)) # 订单超时时间(秒) + RTM_MAX_RETRIES = int(os.environ.get("RTM_MAX_RETRIES", 3)) # 最大重试次数 + RTM_USE_MARKET_ORDER = os.environ.get("RTM_USE_MARKET_ORDER", "True").lower() == "true" # 是否使用市价单进行补单 + + # 计划任务运行时间 + STRATEGY_SAVE_TIME = os.environ.get("STRATEGY_SAVE_TIME", "15:30") # 每天保存策略数据的时间 + CLEAN_ORDERS_TIME = os.environ.get("CLEAN_ORDERS_TIME", "00:01") # 每天清理超时委托的时间 diff --git a/src/real_trader_manager.py b/src/real_trader_manager.py new file mode 100644 index 0000000..6123875 --- /dev/null +++ b/src/real_trader_manager.py @@ -0,0 +1,510 @@ +import time +import threading +import schedule +from xt_trader import XtTrader +from xtquant import xtconstant +from logger_config import get_logger +from config import Config +from strategy_position_manager import StrategyPositionManager +import json + +# 获取日志记录器 +logger = get_logger('real_trader_manager') + +class RealTraderManager: + """实盘交易管理器,处理实盘下单失败、部分成交等问题,尽量保证仓位与策略信号一致""" + + def __init__(self, trader=None): + """初始化实盘交易管理器 + + Args: + trader: XtTrader实例,如果为None则自动获取 + """ + # 使用传入的trader实例或获取单例 + from trade_server import get_real_trader + self.trader = trader if trader is not None else get_real_trader() + + # 确保已登录 + if not self.trader.is_logged_in(): + self.trader.login() + + # 存储待处理的交易请求及其状态 + # 格式: {order_id: {strategy_name, code, direction, target_amount, price, status, create_time, last_check_time, retry_count}} + self.pending_orders = {} + + # 启动调度器 + self._start_scheduler() + + # 记录策略期望持仓状态 + # 格式: {strategy_name: {code: target_amount}} + self.strategy_targets = {} + + logger.info("实盘交易管理器初始化完成") + + def _start_scheduler(self): + """启动定时任务调度器""" + # 每分钟检查一次未完成订单状态并处理 + schedule.every(1).minutes.do(self.check_pending_orders) + + # 每天收盘后清理过期未完成订单 + schedule.every().day.at(Config.STRATEGY_SAVE_TIME).do(self.clean_expired_orders) + + # 启动调度线程 + def run_scheduler(): + while True: + try: + schedule.run_pending() + time.sleep(10) + except Exception as e: + logger.error(f"调度器运行错误: {str(e)}") + + scheduler_thread = threading.Thread(target=run_scheduler) + scheduler_thread.daemon = True + scheduler_thread.start() + logger.info("交易管理器调度器已启动") + + def place_order(self, strategy_name, code, direction, amount, price, order_type='limit'): + """下单接口,处理买入/卖出请求 + + Args: + strategy_name: 策略名称 + code: 股票代码 + direction: 交易方向 'buy'或'sell' + amount: 交易数量 + price: 交易价格 + order_type: 订单类型,'limit'表示限价单,'market'表示市价单 + + Returns: + dict: 包含订单ID和状态信息 + """ + if not strategy_name or not code or not direction: + logger.error("下单参数不完整") + return {"success": False, "error": "参数不完整"} + + # 检查交易方向 + if direction not in ['buy', 'sell']: + logger.error(f"无效的交易方向: {direction}") + return {"success": False, "error": "无效的交易方向"} + + try: + # 检查资金和持仓是否足够 + if not self._check_order_feasibility(code, direction, amount, price): + logger.warning(f"资金或持仓不足,忽略订单: {direction} {code} {amount}股") + return {"success": False, "error": "资金或持仓不足"} + + # 更新策略目标持仓 + self._update_strategy_target(strategy_name, code, direction, amount) + + # 执行实际下单 + price_type = xtconstant.FIX_PRICE if order_type == 'limit' else xtconstant.MARKET_BEST + + # 下单 + if direction == 'buy': + result = self.trader.buy(code, price, amount) + else: + result = self.trader.sell(code, price, amount) + + order_id = result.get('order_id') + if not order_id or order_id == 'simulation': + logger.error(f"下单失败: {result}") + return {"success": False, "error": "下单失败"} + + # 记录到待处理订单 + self.pending_orders[order_id] = { + 'strategy_name': strategy_name, + 'code': code, + 'direction': direction, + 'target_amount': amount, + 'price': price, + 'status': 'pending', + 'create_time': time.time(), + 'last_check_time': time.time(), + 'retry_count': 0, + 'order_type': order_type + } + + # 添加到策略持仓管理器中的未完成委托 + StrategyPositionManager.add_pending_order( + self.trader, + order_id, + strategy_name, + code, + price, + amount, + direction + ) + + logger.info(f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}") + + # 立即更新一次订单状态 + self._update_order_status(order_id) + + return {"success": True, "order_id": order_id} + + except Exception as e: + logger.error(f"下单过程发生异常: {str(e)}") + return {"success": False, "error": f"下单异常: {str(e)}"} + + def check_pending_orders(self): + """检查所有未完成订单状态并处理,定时任务调用""" + try: + logger.info("开始检查未完成订单...") + + # 更新StrategyPositionManager中的未完成委托状态 + StrategyPositionManager.update_pending_orders(self.trader) + + # 获取最新的委托列表 + entrusts = self.trader.get_today_entrust() + entrust_map = {str(e['order_id']): e for e in entrusts} + + # 检查每个未完成订单 + for order_id, order_info in list(self.pending_orders.items()): + # 跳过已完成的订单 + if order_info['status'] in ['completed', 'cancelled', 'failed']: + continue + + # 更新订单状态 + self._update_order_status(order_id, entrust_map) + + # 处理超时未成交或部分成交的订单 + current_time = time.time() + order_age = current_time - order_info['create_time'] + + # 如果订单超过配置的超时时间且状态仍为pending或partial + if order_age > Config.RTM_ORDER_TIMEOUT and order_info['status'] in ['pending', 'partial']: + # 记录超时信息 + logger.warning(f"订单已超时({order_age:.0f}秒 > {Config.RTM_ORDER_TIMEOUT}秒): ID={order_id}, 代码={order_info['code']}, 状态={order_info['status']}") + + # 如果是部分成交,记录详情 + if order_info['status'] == 'partial' and 'traded_volume' in order_info: + original = order_info['target_amount'] + traded = order_info['traded_volume'] + remaining = original - traded + logger.info(f"订单部分成交详情: ID={order_id}, 原始数量={original}, 已成交={traded}, 剩余={remaining}") + + self._handle_timeout_order(order_id, order_info) + + # 同步策略持仓和实际持仓 + self._sync_strategy_positions() + + logger.info("未完成订单检查完毕") + + except Exception as e: + logger.error(f"检查未完成订单时发生异常: {str(e)}") + + def _update_order_status(self, order_id, entrust_map=None): + """更新单个订单状态 + + Args: + order_id: 订单ID + entrust_map: 可选的委托字典,如果为None则重新获取 + """ + if order_id not in self.pending_orders: + return + + try: + # 如果没有提供委托字典,则获取当前委托 + if entrust_map is None: + entrusts = self.trader.get_today_entrust() + entrust_map = {str(e['order_id']): e for e in entrusts} + + # 查找对应的委托记录 + entrust = entrust_map.get(str(order_id)) + + if entrust: + # 获取订单之前的状态,用于判断是否发生变化 + previous_status = self.pending_orders[order_id].get('status') + previous_volume = self.pending_orders[order_id].get('traded_volume', 0) + + # 更新最后检查时间 + self.pending_orders[order_id]['last_check_time'] = time.time() + + # 根据委托状态更新订单状态 + if entrust['order_status'] == xtconstant.ORDER_SUCCEEDED: + # 全部成交 + self.pending_orders[order_id]['status'] = 'completed' + # 记录状态变化 + if previous_status != 'completed': + logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=completed, 成交量={entrust.get('traded_volume', 0)}") + + elif entrust['order_status'] == xtconstant.ORDER_PART_SUCC: + # 部分成交 + self.pending_orders[order_id]['status'] = 'partial' + current_volume = entrust.get('traded_volume', 0) + self.pending_orders[order_id]['traded_volume'] = current_volume + + # 如果成交量有变化,记录日志 + if current_volume != previous_volume: + target_amount = self.pending_orders[order_id]['target_amount'] + logger.info(f"订单部分成交更新: ID={order_id}, 代码={entrust['stock_code']}, 目标数量={target_amount}, 已成交数量={current_volume}, 剩余数量={target_amount - current_volume}") + + elif entrust['order_status'] in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_JUNK]: + # 已撤单或废单 + self.pending_orders[order_id]['status'] = 'cancelled' + # 记录状态变化 + if previous_status != 'cancelled': + logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=cancelled, 原因={entrust.get('err_msg', '未知原因')}") + + elif entrust['order_status'] == xtconstant.ORDER_UNREPORTED: + # 未报 + self.pending_orders[order_id]['status'] = 'pending' + if previous_status != 'pending': + logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=pending(未报)") + + elif entrust['order_status'] == xtconstant.ORDER_WAIT_REPORTING: + # 待报 + self.pending_orders[order_id]['status'] = 'pending' + if previous_status != 'pending': + logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=pending(待报)") + + elif entrust['order_status'] == xtconstant.ORDER_REPORTED: + # 已报 + self.pending_orders[order_id]['status'] = 'pending' + if previous_status != 'pending': + logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=pending(已报)") + else: + # 委托列表中找不到该订单,可能已经太久 + current_time = time.time() + if current_time - self.pending_orders[order_id]['create_time'] > 24 * 60 * 60: + previous_status = self.pending_orders[order_id].get('status') + self.pending_orders[order_id]['status'] = 'failed' + logger.warning(f"订单状态未知且过期: ID={order_id}, 旧状态={previous_status}, 新状态=failed, 创建时长={(current_time - self.pending_orders[order_id]['create_time'])/3600:.1f}小时") + + except Exception as e: + logger.error(f"更新订单状态时发生异常: order_id={order_id}, error={str(e)}") + + def _handle_timeout_order(self, order_id, order_info): + """处理超时或部分成交的订单 + + Args: + order_id: 订单ID + order_info: 订单信息字典 + """ + try: + # 首先尝试撤单 + logger.info(f"尝试撤销超时订单: ID={order_id}, 代码={order_info['code']}, 超时时间={(time.time() - order_info['create_time']):.0f}秒") + cancel_result = self.trader.cancel(order_id) + + # 记录撤单结果 + if isinstance(cancel_result, dict): + result_str = json.dumps(cancel_result) + else: + result_str = str(cancel_result) + logger.info(f"撤单结果: ID={order_id}, 结果={result_str}") + + # 计算未成交数量 + original_amount = order_info['target_amount'] + traded_amount = order_info.get('traded_volume', 0) + remaining_amount = original_amount - traded_amount + + # 记录详细的成交情况 + logger.info(f"订单成交情况: ID={order_id}, 代码={order_info['code']}, 原始数量={original_amount}, 已成交={traded_amount}, 剩余={remaining_amount}") + + # 如果有未成交的部分,使用市价单补充交易 + if remaining_amount > 0: + # 递增重试计数 + order_info['retry_count'] += 1 + + logger.info(f"准备使用市价单补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}") + + # 如果重试次数少于最大重试次数,则使用市价单补单 + if order_info['retry_count'] <= Config.RTM_MAX_RETRIES: + # 使用市价单 + new_order = self.place_order( + order_info['strategy_name'], + order_info['code'], + order_info['direction'], + remaining_amount, + 0, # 市价单价格参数无效 + 'market' # 使用市价单 + ) + + if new_order.get('success'): + logger.info(f"市价补单成功: 原订单ID={order_id}, 新订单ID={new_order['order_id']}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}") + else: + logger.error(f"市价补单失败: 原订单ID={order_id}, 错误={new_order.get('error')}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}") + else: + logger.warning(f"订单重试次数过多,不再尝试: ID={order_id}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}, 代码={order_info['code']}, 方向={order_info['direction']}, 未成交数量={remaining_amount}") + else: + logger.info(f"订单已全部成交,无需补单: ID={order_id}, 代码={order_info['code']}, 成交数量={traded_amount}") + + # 更新原订单状态 + previous_status = order_info['status'] + order_info['status'] = 'cancelled' + logger.info(f"更新原订单状态: ID={order_id}, 旧状态={previous_status}, 新状态=cancelled") + + except Exception as e: + logger.error(f"处理超时订单时发生异常: order_id={order_id}, error={str(e)}") + + def _check_order_feasibility(self, code, direction, amount, price): + """检查订单是否可行(资金或持仓是否足够) + + Args: + code: 股票代码 + direction: 交易方向 + amount: 交易数量 + price: 交易价格 + + Returns: + bool: 订单是否可行 + """ + try: + if direction == 'buy': + # 检查资金是否足够 + balance = self.trader.get_balance() + if not balance: + logger.error("获取账户余额失败") + return False + + # 计算所需资金(加上3%的手续费作为缓冲) + required_cash = price * amount * 1.03 + available_cash = balance.get('cash', 0) + + if required_cash > available_cash: + logger.warning(f"资金不足: 需要 {required_cash:.2f}, 可用 {available_cash:.2f}") + return False + + return True + + elif direction == 'sell': + # 检查持仓是否足够 + positions = self.trader.get_positions() + position = next((p for p in positions if p.get('stock_code') == code), None) + + if not position: + logger.warning(f"没有持仓: {code}") + return False + + available_volume = position.get('can_use_volume', 0) + + if amount > available_volume: + logger.warning(f"可用持仓不足: 需要 {amount}, 可用 {available_volume}") + return False + + return True + + return False + + except Exception as e: + logger.error(f"检查订单可行性时发生异常: {str(e)}") + return False + + def _update_strategy_target(self, strategy_name, code, direction, amount): + """更新策略目标持仓 + + Args: + strategy_name: 策略名称 + code: 股票代码 + direction: 交易方向 + amount: 交易数量 + """ + # 确保策略存在于目标字典中 + if strategy_name not in self.strategy_targets: + self.strategy_targets[strategy_name] = {} + + # 确保股票代码存在于策略目标中 + if code not in self.strategy_targets[strategy_name]: + self.strategy_targets[strategy_name][code] = 0 + + # 根据交易方向更新目标持仓 + if direction == 'buy': + self.strategy_targets[strategy_name][code] += amount + else: # sell + self.strategy_targets[strategy_name][code] -= amount + # 避免负数持仓 + if self.strategy_targets[strategy_name][code] < 0: + self.strategy_targets[strategy_name][code] = 0 + + logger.info(f"更新策略目标持仓: 策略={strategy_name}, 代码={code}, 目标持仓={self.strategy_targets[strategy_name][code]}") + + def _sync_strategy_positions(self): + """同步策略持仓和实际持仓""" + try: + # 获取实际持仓 + actual_positions = self.trader.get_positions() + position_map = {p['stock_code']: p for p in actual_positions} + + # 遍历每个策略的目标持仓 + for strategy_name, targets in self.strategy_targets.items(): + # 该策略的实际持仓映射 + strategy_actual_positions = {} + + # 遍历该策略的目标持仓 + for code, target_amount in targets.items(): + # 获取股票的实际持仓 + actual_position = position_map.get(code, {}) + actual_amount = actual_position.get('volume', 0) + + if actual_amount > 0: + strategy_actual_positions[code] = actual_amount + + # 更新策略持仓管理器中的持仓记录 + StrategyPositionManager.update_strategy_position( + self.trader, + strategy_name, + code, + 'sync', # 使用同步模式 + actual_amount + ) + + # 检查是否需要调整持仓 + if actual_amount != target_amount: + diff = target_amount - actual_amount + if diff != 0: + logger.warning(f"持仓不一致: 策略={strategy_name}, 代码={code}, 目标={target_amount}, 实际={actual_amount}") + + # 记录日志 + logger.info(f"策略 {strategy_name} 的目标持仓: {targets}") + logger.info(f"策略 {strategy_name} 的实际持仓: {strategy_actual_positions}") + + except Exception as e: + logger.error(f"同步策略持仓时发生异常: {str(e)}") + + def clean_expired_orders(self): + """清理过期的未完成订单""" + try: + current_time = time.time() + for order_id, order_info in list(self.pending_orders.items()): + # 如果订单创建时间超过24小时 + if current_time - order_info['create_time'] > 24 * 60 * 60: + if order_info['status'] not in ['completed', 'cancelled', 'failed']: + logger.warning(f"清理过期订单: ID={order_id}, 状态={order_info['status']}") + order_info['status'] = 'expired' + except Exception as e: + logger.error(f"清理过期订单时发生异常: {str(e)}") + + def get_pending_orders(self): + """获取所有未完成订单 + + Returns: + list: 未完成订单列表 + """ + return [{ + 'order_id': order_id, + **order_info + } for order_id, order_info in self.pending_orders.items()] + + def get_strategy_targets(self): + """获取策略目标持仓 + + Returns: + dict: 策略目标持仓 + """ + return self.strategy_targets + +# 单例模式实现 +_instance = None + +def get_real_trader_manager(): + """获取实盘交易管理器单例实例 + + Returns: + RealTraderManager: 实盘交易管理器实例 + """ + global _instance + if _instance is None: + # 从trade_server获取实盘交易实例 + from trade_server import get_real_trader + trader = get_real_trader() + _instance = RealTraderManager(trader) + return _instance \ No newline at end of file diff --git a/src/strategy_position_manager.py b/src/strategy_position_manager.py index cb1226d..f623b02 100644 --- a/src/strategy_position_manager.py +++ b/src/strategy_position_manager.py @@ -196,28 +196,33 @@ class StrategyPositionManager: if os.path.exists('strategy_data.json'): with open('strategy_data.json', 'r') as f: data = json.load(f) - # 兼容旧版数据格式 - if 'positions' in data and not isinstance(data['positions'].get('real', None), dict): - # 旧版数据,将其迁移到新结构 - strategy_positions = { - 'real': data.get('positions', {}), - 'simulation': {} - } - strategy_trades = { - 'real': data.get('trades', {}), - 'simulation': {} - } - pending_orders = { - 'real': data.get('pending_orders', {}), - 'simulation': {} - } - else: - # 新版数据结构 - strategy_positions = data.get('positions', {'real': {}, 'simulation': {}}) - strategy_trades = data.get('trades', {'real': {}, 'simulation': {}}) - pending_orders = data.get('pending_orders', {'real': {}, 'simulation': {}}) + # 直接使用新版数据结构,不再兼容旧版格式 + strategy_positions = data.get('positions', {'real': {}, 'simulation': {}}) + strategy_trades = data.get('trades', {'real': {}, 'simulation': {}}) + pending_orders = data.get('pending_orders', {'real': {}, 'simulation': {}}) + + # 确保数据结构完整 + if 'real' not in strategy_positions: + strategy_positions['real'] = {} + if 'simulation' not in strategy_positions: + strategy_positions['simulation'] = {} + if 'real' not in strategy_trades: + strategy_trades['real'] = {} + if 'simulation' not in strategy_trades: + strategy_trades['simulation'] = {} + if 'real' not in pending_orders: + pending_orders['real'] = {} + if 'simulation' not in pending_orders: + pending_orders['simulation'] = {} + + logger.info("已加载策略数据") + logger.info(f"实盘策略数: {len(strategy_positions['real'])}, 模拟策略数: {len(strategy_positions['simulation'])}") except Exception as e: logger.error(f"加载策略数据失败: {str(e)}") + # 初始化空数据结构 + strategy_positions = {'real': {}, 'simulation': {}} + strategy_trades = {'real': {}, 'simulation': {}} + pending_orders = {'real': {}, 'simulation': {}} @staticmethod def save_strategy_data(): diff --git a/src/trade_server.py b/src/trade_server.py index d99dde2..bad1c9b 100644 --- a/src/trade_server.py +++ b/src/trade_server.py @@ -11,6 +11,7 @@ from simulation_trader import SimulationTrader import datetime from strategy_position_manager import StrategyPositionManager from logger_config import get_logger +from real_trader_manager import get_real_trader_manager # 获取日志记录器 logger = get_logger('server') @@ -47,25 +48,46 @@ def get_real_trader(): _real_trader_instance.login() return _real_trader_instance -# 获取交易实例 -def get_trader(use_sim_trader=False): - """获取交易实例 - 采用单例模式 +# 判断当前是否应该使用模拟交易 +def should_use_simulation(): + """判断是否应该使用模拟交易 - Args: - use_sim_trader (bool): 是否强制使用模拟交易,True表示必定返回模拟交易实例 - Returns: - 返回交易实例,根据参数和配置决定是模拟交易还是实盘交易 + tuple: (should_simulate: bool, simulation_reason: str) + should_simulate: 是否应该使用模拟交易 + simulation_reason: 使用模拟交易的原因 """ - # 如果强制使用模拟交易,返回模拟交易单例 - if use_sim_trader: - return get_sim_trader() - - # 如果配置为仅模拟交易,返回模拟交易单例 + # 如果配置为仅模拟交易,返回True if Config.SIMULATION_ONLY: - return get_sim_trader() + return True, "配置为仅模拟交易" - # 判断当前是否为交易时间 + # 判断当前是否为交易日(只基于日期,不考虑时间) + now = datetime.datetime.now() + + # 尝试导入chinese_calendar判断是否为交易日 + try: + from chinese_calendar import is_workday, is_holiday + is_trading_day = is_workday(now) and not is_holiday(now) + except ImportError: + # 如果无法导入chinese_calendar,则简单地用工作日判断 + is_trading_day = now.weekday() < 5 # 0-4 为周一至周五 + + # 如果不是交易日,返回True(使用模拟交易) + if not is_trading_day: + return True, f"当前非交易日 - {now.date()}" + + # 如果是交易日,无论是否在交易时间,都返回False(使用实盘) + return False, "" + +# 判断当前是否在交易时间内 +def is_trading_hours(): + """判断当前是否在交易时间内 + + Returns: + tuple: (is_trading: bool, message: str) + is_trading: 是否在交易时间 + message: 相关信息 + """ now = datetime.datetime.now() current_time = now.time() @@ -77,21 +99,40 @@ def get_trader(use_sim_trader=False): is_trading_hour = (morning_start <= current_time <= morning_end) or (afternoon_start <= current_time <= afternoon_end) - # 尝试导入chinese_calendar判断是否为交易日 - try: - from chinese_calendar import is_workday, is_holiday - is_trading_day = is_workday(now) and not is_holiday(now) - except ImportError: - # 如果无法导入chinese_calendar,则简单地用工作日判断 - is_trading_day = now.weekday() < 5 # 0-4 为周一至周五 + if is_trading_hour: + return True, "" + else: + return False, f"当前非交易时段 - 时间: {current_time.strftime('%H:%M:%S')}" + +# 获取交易实例 - 根据情况返回模拟或实盘交易实例 +def get_trader(): + """获取交易实例 - 根据当前状态决定返回模拟还是实盘交易实例 - # 如果不是交易日或不在交易时间内,返回模拟交易单例 - if not is_trading_day or not is_trading_hour: - logger.info(f"当前非交易时段 - 日期: {now.date()}, 时间: {current_time}, 使用模拟交易") + Returns: + 返回交易实例,根据配置和当前时间决定是模拟交易还是实盘交易 + """ + should_simulate, _ = should_use_simulation() + if should_simulate: return get_sim_trader() + else: + return get_real_trader() + +# 获取指定类型的交易实例 - 供内部API查询等使用 +def get_trader_by_type(trader_type='auto'): + """获取指定类型的交易实例 - # 否则返回真实交易单例 - return get_real_trader() + Args: + trader_type: 'simulation'=模拟交易, 'real'=实盘交易, 'auto'=自动判断 + + Returns: + 指定类型的交易实例 + """ + if trader_type == 'simulation': + return get_sim_trader() + elif trader_type == 'real': + return get_real_trader() + else: # 'auto' + return get_trader() def run_daily(time_str, job_func): """设置每天在指定时间运行的任务 @@ -135,7 +176,6 @@ atexit.register(StrategyPositionManager.save_strategy_data) # 使用配置文件中的时间 run_daily(Config.MARKET_OPEN_TIME, lambda: get_trader().login()) -run_daily(Config.MARKET_ACTIVE_TIME, lambda: get_trader().get_balance()) run_daily(Config.MARKET_CLOSE_TIME, lambda: get_trader().logout()) @@ -144,36 +184,10 @@ def health_check(): return "ok", 200 -def should_use_simulation(): - """判断是否应该使用模拟交易 - - Returns: - tuple: (should_simulate: bool, simulation_reason: str) - should_simulate: 是否应该使用模拟交易 - simulation_reason: 使用模拟交易的原因 - """ - # 直接使用get_trader()返回的实例类型判断 - trader = get_trader() - - if isinstance(trader, SimulationTrader): - # 获取原因 - if Config.SIMULATION_ONLY: - return True, "配置为仅模拟交易" - else: - now = datetime.datetime.now() - return True, f"当前非交易时段 - {now.strftime('%Y-%m-%d %H:%M:%S')}" - - # 如果是实盘交易实例 - return False, "" - - @app.route("/yu/buy", methods=["POST"]) def buy(): """Buy an item with given parameters.""" logger.info("Received buy request") - # 每次操作前更新未完成委托状态 - current_trader = get_trader() - StrategyPositionManager.update_pending_orders(current_trader) # Get data from request body data = request.get_json() @@ -195,12 +209,13 @@ def buy(): # 检查是否需要模拟交易 should_simulate, simulation_reason = should_use_simulation() + # 自动判断需要使用模拟交易 if should_simulate: # 使用模拟交易 logger.info(f"使用模拟交易 - {simulation_reason} - 代码: {code}, 价格: {price}, 数量: {amount}") # 获取模拟交易实例并执行买入操作 - sim_trader = get_trader(True) + sim_trader = get_sim_trader() result = sim_trader.buy(code, price, amount) # 如果指定了策略名称,记录到策略持仓 @@ -208,62 +223,26 @@ def buy(): # 模拟交易立即生效,更新策略持仓 StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'buy', amount) - return jsonify({"success": True, "data": result}), 200 + return jsonify({"success": True, "data": result, "simulation": True}), 200 + + # 检查是否在交易时间内 + trading_hours, hours_message = is_trading_hours() + if not trading_hours: + logger.warning(f"实盘交易失败 - {hours_message} - 代码: {code}, 价格: {price}, 数量: {amount}") + return jsonify({"success": False, "error": f"交易失败: {hours_message},非交易时间不能实盘交易"}), 400 - # 尝试实盘交易 - logger.info(f"Executing buy order: code={code}, price={price}, amount={amount}, strategy_name={strategy_name}") - try: - result = execute_with_timeout(current_trader.buy, Config.TRADE_TIMEOUT, code, price, amount) - if result is None: - # 超时时使用模拟交易 - logger.warning(f"Buy order timeout after {Config.TRADE_TIMEOUT} seconds, switching to simulation mode") - - # 创建模拟交易实例并执行买入操作 - sim_trader = get_trader(True) - result = sim_trader.buy(code, price, amount) - - # 如果指定了策略名称,记录到策略持仓 - if strategy_name: - # 超时情况下,使用模拟交易,立即更新策略持仓 - StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'buy', amount) - - return jsonify({"success": True, "data": result}), 200 + # 使用RealTraderManager执行实盘交易 + logger.info(f"使用RealTraderManager执行买入: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}") + rtm = get_real_trader_manager() + result = rtm.place_order(strategy_name, code, 'buy', amount, price) + + if result.get('success'): + logger.info(f"RealTraderManager买入成功: {result}") + return jsonify({"success": True, "data": result, "simulation": False}), 200 + else: + logger.error(f"RealTraderManager买入失败: {result.get('error')}") + return jsonify({"success": False, "error": result.get('error')}), 400 - # 如果指定了策略名称且是真实交易 - if strategy_name and 'order_id' in result and result['order_id'] != 'simulation': - order_id = result['order_id'] - - # 添加到未完成委托 - StrategyPositionManager.add_pending_order( - current_trader, - order_id, - strategy_name, - code, - price, - amount, - 'buy' - ) - - # 注意:不在这里调用update_strategy_position - # 持仓更新将由update_pending_orders函数处理 - # 这避免了持仓更新的冗余操作 - - logger.info(f"Buy order result: {result}") - return jsonify({"success": True, "data": result}), 200 - except Exception as e: - # 发生错误时使用模拟交易 - logger.error(f"Buy order failed: {str(e)}, switching to simulation mode") - - # 创建模拟交易实例并执行买入操作 - sim_trader = get_trader(True) - result = sim_trader.buy(code, price, amount) - - # 如果指定了策略名称,记录到策略持仓 - if strategy_name: - # 错误情况下,使用模拟交易,立即更新策略持仓 - StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'buy', amount) - - return jsonify({"success": True, "data": result}), 200 except ValueError as e: logger.error(f"Invalid request parameters: {str(e)}") abort(400, description=str(e)) @@ -276,9 +255,6 @@ def buy(): def sell(): """Sell an item with given parameters.""" logger.info("Received sell request") - # 每次操作前更新未完成委托状态 - current_trader = get_trader() - StrategyPositionManager.update_pending_orders(current_trader) # Get data from request body data = request.get_json() @@ -300,12 +276,13 @@ def sell(): # 检查是否需要模拟交易 should_simulate, simulation_reason = should_use_simulation() + # 自动判断需要使用模拟交易 if should_simulate: # 使用模拟交易 logger.info(f"使用模拟交易 - {simulation_reason} - 代码: {code}, 价格: {price}, 数量: {amount}") # 获取模拟交易实例并执行卖出操作 - sim_trader = get_trader(True) + sim_trader = get_sim_trader() result = sim_trader.sell(code, price, amount) # 如果指定了策略名称,记录到策略持仓 @@ -313,58 +290,26 @@ def sell(): # 模拟交易下,使用简单更新模式 StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'sell', amount) - return jsonify({"success": True, "data": result}), 200 + return jsonify({"success": True, "data": result, "simulation": True}), 200 - # 尝试实盘交易 - logger.info(f"Executing sell order: code={code}, price={price}, amount={amount}, strategy_name={strategy_name}") - try: - result = execute_with_timeout(current_trader.sell, Config.TRADE_TIMEOUT, code, price, amount) - if result is None: - # 超时时使用模拟交易 - logger.warning(f"Sell order timeout after {Config.TRADE_TIMEOUT} seconds, switching to simulation mode") - - # 创建模拟交易实例并执行卖出操作 - sim_trader = get_trader(True) - result = sim_trader.sell(code, price, amount) - - # 如果指定了策略名称,记录到策略持仓 - if strategy_name: - # 超时情况下,使用简单更新模式 - StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'sell', amount) - - return jsonify({"success": True, "data": result}), 200 + # 检查是否在交易时间内 + trading_hours, hours_message = is_trading_hours() + if not trading_hours: + logger.warning(f"实盘交易失败 - {hours_message} - 代码: {code}, 价格: {price}, 数量: {amount}") + return jsonify({"success": False, "error": f"交易失败: {hours_message},非交易时间不能实盘交易"}), 400 - # 如果指定了策略名称,记录到未完成委托 - if strategy_name and 'order_id' in result and result['order_id'] != 'simulation': - order_id = result['order_id'] - - # 添加到未完成委托 - StrategyPositionManager.add_pending_order( - current_trader, - order_id, - strategy_name, - code, - price, - amount, - 'sell' - ) + # 使用RealTraderManager执行实盘交易 + logger.info(f"使用RealTraderManager执行卖出: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}") + rtm = get_real_trader_manager() + result = rtm.place_order(strategy_name, code, 'sell', amount, price) + + if result.get('success'): + logger.info(f"RealTraderManager卖出成功: {result}") + return jsonify({"success": True, "data": result, "simulation": False}), 200 + else: + logger.error(f"RealTraderManager卖出失败: {result.get('error')}") + return jsonify({"success": False, "error": result.get('error')}), 400 - logger.info(f"Sell order result: {result}") - return jsonify({"success": True, "data": result}), 200 - except Exception as e: - # 发生错误时使用模拟交易 - logger.error(f"Sell order failed: {str(e)}, switching to simulation mode") - - # 创建模拟交易实例并执行卖出操作 - sim_trader = get_trader(True) - result = sim_trader.sell(code, price, amount) - - # 如果指定了策略名称,记录到策略持仓 - if strategy_name: - # 错误情况下,使用简单更新模式 - StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'sell', amount) - - return jsonify({"success": True, "data": result}), 200 except ValueError as e: logger.error(f"Invalid request parameters: {str(e)}") abort(400, description=str(e)) @@ -377,15 +322,52 @@ def sell(): def cancel(entrust_no): logger.info(f"Received cancel request for entrust_no={entrust_no}") try: - current_trader = get_trader() - result = current_trader.cancel(entrust_no) - logger.info(f"Cancel result: {result}") + # 检查是否为模拟交易 + should_simulate, simulation_reason = should_use_simulation() - # 更新未完成委托状态 - StrategyPositionManager.update_pending_orders(current_trader) + if should_simulate: + # 模拟交易 + sim_trader = get_sim_trader() + result = sim_trader.cancel(entrust_no) + logger.info(f"模拟交易撤单结果: {result}") + + # 更新未完成委托状态 + StrategyPositionManager.update_pending_orders(sim_trader) + return jsonify({"success": True, "data": result, "simulation": True}), 200 + else: + # 尝试使用RealTraderManager撤单 + try: + rtm = get_real_trader_manager() + for order in rtm.get_pending_orders(): + if str(order['order_id']) == str(entrust_no): + # 找到对应订单,使用RealTraderManager处理 + real_trader = get_real_trader() + result = real_trader.cancel(entrust_no) + logger.info(f"实盘交易撤单结果: {result}") + + # 更新订单状态 + rtm.check_pending_orders() + return jsonify({"success": True, "data": result, "simulation": False}), 200 + + # 如果RealTraderManager中找不到,则使用普通实盘 + real_trader = get_real_trader() + result = real_trader.cancel(entrust_no) + logger.info(f"实盘交易撤单结果: {result}") + + # 更新未完成委托状态 + StrategyPositionManager.update_pending_orders(real_trader) + return jsonify({"success": True, "data": result, "simulation": False}), 200 + except Exception as e: + logger.error(f"使用RealTraderManager撤单失败: {str(e)}") + # 回退到普通方式 + real_trader = get_real_trader() + result = real_trader.cancel(entrust_no) + logger.info(f"实盘交易撤单结果: {result}") + + # 更新未完成委托状态 + StrategyPositionManager.update_pending_orders(real_trader) + return jsonify({"success": True, "data": result, "simulation": False}), 200 - response = {"success": True, "data": result} - return jsonify(response), 200 except Exception as e: logger.error(f"Error processing cancel request: {str(e)}") abort(500, description="Internal server error") @@ -396,14 +378,22 @@ def get_balance(): """Get the balance of the account.""" logger.info("Received balance request") try: - current_trader = get_trader() - balance = current_trader.get_balance() - logger.info(f"Balance: {balance}") - - response = {"success": True, "data": balance} - return jsonify(response), 200 + # 判断当前交易模式 + should_simulate, _ = should_use_simulation() + + if should_simulate: + # 模拟交易 + trader = get_sim_trader() + balance = trader.get_balance() + logger.info(f"模拟交易余额: {balance}") + return jsonify({"success": True, "data": balance, "simulation": True}), 200 + else: + # 实盘交易 + trader = get_real_trader() + balance = trader.get_balance() + logger.info(f"实盘交易余额: {balance}") + return jsonify({"success": True, "data": balance, "simulation": False}), 200 except Exception as e: - print(e) logger.error(f"Error processing balance request: {str(e)}") abort(500, description="Internal server error") @@ -412,18 +402,36 @@ def get_balance(): def get_positions(): """Get the positions of the account.""" logger.info("Received positions request") - # 每次查询前更新未完成委托状态 - current_trader = get_trader() - StrategyPositionManager.update_pending_orders(current_trader) try: - # 获取查询参数中的策略名称 + # 获取查询参数 strategy_name = request.args.get("strategy_name", "") - # 使用StrategyPositionManager获取持仓信息 - result = StrategyPositionManager.get_strategy_positions(current_trader, strategy_name if strategy_name else None) + # 判断当前交易模式 + should_simulate, _ = should_use_simulation() - return jsonify({"success": True, "data": result}), 200 + # 选择相应的交易实例 + trader = get_sim_trader() if should_simulate else get_real_trader() + + # 更新未完成委托状态 + StrategyPositionManager.update_pending_orders(trader) + + # 如果实盘且指定要查询RealTraderManager中的目标持仓 + if not should_simulate and request.args.get("target", "").lower() == "true": + rtm = get_real_trader_manager() + targets = rtm.get_strategy_targets() + + # 如果指定了策略名称 + if strategy_name: + strategy_target = targets.get(strategy_name, {}) + return jsonify({"success": True, "data": {strategy_name: strategy_target}, "simulation": False}), 200 + + return jsonify({"success": True, "data": targets, "simulation": False}), 200 + + # 使用StrategyPositionManager获取持仓信息 + result = StrategyPositionManager.get_strategy_positions(trader, strategy_name if strategy_name else None) + + return jsonify({"success": True, "data": result, "simulation": should_simulate}), 200 except Exception as e: logger.error(f"Error processing positions request: {str(e)}") abort(500, description="Internal server error") @@ -434,12 +442,15 @@ def get_today_trades(): """Get the today's trades of the account.""" logger.info("Received today trades request") try: - current_trader = get_trader() - trades = current_trader.get_today_trades() - logger.info(f"Today trades: {trades}") + # 判断当前交易模式 + should_simulate, _ = should_use_simulation() + + # 选择相应的交易实例 + trader = get_sim_trader() if should_simulate else get_real_trader() + trades = trader.get_today_trades() + logger.info(f"今日成交: {trades}") - response = {"success": True, "data": trades} - return jsonify(response), 200 + return jsonify({"success": True, "data": trades, "simulation": should_simulate}), 200 except Exception as e: logger.error(f"Error processing today trades request: {str(e)}") abort(500, description="Internal server error") @@ -450,12 +461,15 @@ def get_today_entrust(): """Get the today's entrust of the account.""" logger.info("Received today entrust request") try: - current_trader = get_trader() - entrust = current_trader.get_today_entrust() - logger.info(f"Today entrust: {entrust}") + # 判断当前交易模式 + should_simulate, _ = should_use_simulation() + + # 选择相应的交易实例 + trader = get_sim_trader() if should_simulate else get_real_trader() + entrust = trader.get_today_entrust() + logger.info(f"今日委托: {entrust}") - response = {"success": True, "data": entrust} - return jsonify(response), 200 + return jsonify({"success": True, "data": entrust, "simulation": should_simulate}), 200 except Exception as e: logger.error(f"Error processing today entrust request: {str(e)}") abort(500, description="Internal server error") @@ -466,22 +480,34 @@ def clear_strategy(strategy_name): """清除指定策略的持仓管理数据""" logger.info(f"接收到清除策略持仓请求: {strategy_name}") try: - current_trader = get_trader() + # 判断当前交易模式 + should_simulate, _ = should_use_simulation() + + # 如果是实盘模式 + if not should_simulate: + # 先尝试清除RealTraderManager中的策略目标 + rtm = get_real_trader_manager() + if strategy_name in rtm.strategy_targets: + del rtm.strategy_targets[strategy_name] + logger.info(f"已清除RealTraderManager中的策略目标: {strategy_name}") + + # 获取相应的交易实例 + trader = get_sim_trader() if should_simulate else get_real_trader() # 如果是模拟交易实例,则重置模拟交易实例 - if isinstance(current_trader, SimulationTrader): + if should_simulate and isinstance(trader, SimulationTrader): global _sim_trader_instance if _sim_trader_instance is not None: logger.info("重置模拟交易实例") # 创建一个新的模拟交易实例,替换原有实例 _sim_trader_instance = SimulationTrader() - current_trader = _sim_trader_instance + trader = _sim_trader_instance # 使用StrategyPositionManager清除策略 - success, message = StrategyPositionManager.clear_strategy(current_trader, strategy_name) + success, message = StrategyPositionManager.clear_strategy(trader, strategy_name) if success: - return jsonify({"success": True, "message": message}), 200 + return jsonify({"success": True, "message": message, "simulation": should_simulate}), 200 else: abort(400, description=message) @@ -500,6 +526,53 @@ def execute_with_timeout(func, timeout, *args, **kwargs): return None +# 添加新的API端点查询订单状态 +@app.route("/yu/order_status", methods=["GET"]) +def get_order_status(): + """获取订单状态""" + logger.info("Received order status request") + + # 判断当前交易模式 + should_simulate, _ = should_use_simulation() + + if not should_simulate and Config.USE_REAL_TRADER_MANAGER: + # 实盘 + RealTraderManager模式 + rtm = get_real_trader_manager() + pending_orders = rtm.get_pending_orders() + return jsonify({"success": True, "data": pending_orders, "simulation": False}), 200 + else: + # 模拟交易或实盘但未使用RealTraderManager + trader = get_sim_trader() if should_simulate else get_real_trader() + entrusts = trader.get_today_entrust() + return jsonify({"success": True, "data": entrusts, "simulation": should_simulate}), 200 + + +# 添加新的API端点查询策略目标持仓 +@app.route("/yu/strategy_targets", methods=["GET"]) +def get_strategy_targets(): + """获取策略目标持仓""" + logger.info("Received strategy targets request") + + # 获取查询参数 + strategy_name = request.args.get("strategy_name") + + # 检查是否是实盘模式且使用RealTraderManager + should_simulate, _ = should_use_simulation() + + if not should_simulate and Config.USE_REAL_TRADER_MANAGER: + rtm = get_real_trader_manager() + targets = rtm.get_strategy_targets() + + # 如果指定了策略名称,则只返回该策略的目标持仓 + if strategy_name: + strategy_target = targets.get(strategy_name, {}) + return jsonify({"success": True, "data": {strategy_name: strategy_target}, "simulation": False}), 200 + + return jsonify({"success": True, "data": targets, "simulation": False}), 200 + else: + return jsonify({"success": False, "error": "无法获取目标持仓:非实盘模式或RealTraderManager未启用"}), 400 + + if __name__ == "__main__": logger.info(f"Server starting on {Config.HOST}:{Config.PORT}") app.run(debug=Config.DEBUG, host=Config.HOST, port=Config.PORT)