From edc072230e238c2b0ef394d006a1fa0217487d4a Mon Sep 17 00:00:00 2001 From: zhiyong Date: Wed, 30 Apr 2025 16:01:55 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E5=8F=96=E7=AD=96=E7=95=A5=E4=BB=93?= =?UTF-8?q?=E4=BD=8D=E7=AE=A1=E7=90=86=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/strategy_position_manager.py | 330 +++++++++++++++++++++++++++++++ src/trade_server.py | 314 +++++------------------------ 2 files changed, 374 insertions(+), 270 deletions(-) create mode 100644 src/strategy_position_manager.py diff --git a/src/strategy_position_manager.py b/src/strategy_position_manager.py new file mode 100644 index 0000000..5de0482 --- /dev/null +++ b/src/strategy_position_manager.py @@ -0,0 +1,330 @@ +import time +import os +import json +import logging +from simulation_trader import SimulationTrader +from xtquant import xtconstant + +# 策略仓位管理 +strategy_positions = { + 'real': {}, # 存储实盘策略持仓 + 'simulation': {} # 存储模拟交易策略持仓 +} +strategy_trades = { + 'real': {}, # 存储实盘策略交易记录 + 'simulation': {} # 存储模拟交易策略交易记录 +} +pending_orders = { + 'real': {}, # 存储实盘未完成委托 + 'simulation': {} # 存储模拟交易未完成委托 +} + +# 获取日志记录器 +logger = logging.getLogger('trade_server') + +class StrategyPositionManager: + """策略持仓管理器,负责管理不同策略的持仓情况""" + + @staticmethod + def get_trader_type(trader): + """根据交易实例确定交易类型 + + Args: + trader: 交易实例 + + Returns: + str: 'simulation'或'real' + """ + return 'simulation' if isinstance(trader, SimulationTrader) else 'real' + + @staticmethod + def update_strategy_position(trader, strategy_name, code, direction, amount): + """更新策略持仓 + + Args: + trader: 交易实例 + strategy_name: 策略名称 + code: 股票代码 + direction: 'buy'或'sell' + amount: 交易数量 + """ + if not strategy_name: + return + + # 判断交易类型 + trader_type = StrategyPositionManager.get_trader_type(trader) + + # 确保策略在字典中 + if strategy_name not in strategy_positions[trader_type]: + strategy_positions[trader_type][strategy_name] = {} + + try: + # 获取交易实例持仓情况 + actual_positions = trader.get_positions() + code_position = next((pos for pos in actual_positions if pos.get('stock_code') == code), None) + + # 记录实际持仓总量 + actual_total = code_position.get('volume', 0) if code_position else 0 + actual_can_use = code_position.get('can_use_volume', 0) if code_position else 0 + + logger.info(f"实际持仓 - 代码: {code}, 总量: {actual_total}, 可用: {actual_can_use}") + + # 如果股票代码在持仓字典中不存在,初始化它 + if code not in strategy_positions[trader_type][strategy_name]: + strategy_positions[trader_type][strategy_name][code] = { + 'total_amount': 0, + 'closeable_amount': 0 + } + + # 直接使用实际持仓数据更新策略持仓 + strategy_positions[trader_type][strategy_name][code]['total_amount'] = actual_total + strategy_positions[trader_type][strategy_name][code]['closeable_amount'] = actual_can_use + + logger.info(f"更新策略持仓 - 交易类型: {trader_type}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}, 数量: {amount}, 总量: {strategy_positions[trader_type][strategy_name][code]['total_amount']}, 可用: {strategy_positions[trader_type][strategy_name][code]['closeable_amount']}") + + except Exception as e: + logger.error(f"获取实际持仓失败: {str(e)}") + # 异常情况下只记录错误,不尝试更新持仓 + + # 移除total_amount为0的持仓 + if code in strategy_positions[trader_type][strategy_name] and strategy_positions[trader_type][strategy_name][code]['total_amount'] <= 0: + del strategy_positions[trader_type][strategy_name][code] + + @staticmethod + def update_pending_orders(trader): + """更新未完成委托状态 + + Args: + trader: 交易实例 + """ + try: + # 判断当前交易类型 + trader_type = StrategyPositionManager.get_trader_type(trader) + + # 获取今日委托 + today_entrusts = trader.get_today_entrust() + + # 更新委托状态 + for order_id, order_info in list(pending_orders[trader_type].items()): + entrust = next((e for e in today_entrusts if e.get('order_id') == order_id), None) + if entrust: + if entrust.get('order_status') in [xtconstant.ORDER_SUCCEEDED, xtconstant.ORDER_PART_SUCC]: + # 成交量计算 + traded_amount = int(entrust.get('traded_volume', 0)) + + # 更新策略持仓 + StrategyPositionManager.update_strategy_position( + trader, + order_info['strategy_name'], + order_info['code'], + order_info['direction'], + traded_amount + ) + + # 如果完全成交,从待处理列表中移除 + if entrust.get('order_status') == xtconstant.ORDER_SUCCEEDED: + del pending_orders[trader_type][order_id] + + # 如果已撤单、废单等终态,也从待处理列表中移除 + elif entrust.get('order_status') in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_JUNK]: + del pending_orders[trader_type][order_id] + except Exception as e: + logger.error(f"更新未完成委托状态失败: {str(e)}") + + @staticmethod + def add_pending_order(trader, order_id, strategy_name, code, price, amount, direction): + """添加未完成委托 + + Args: + trader: 交易实例 + order_id: 委托编号 + strategy_name: 策略名称 + code: 股票代码 + price: 委托价格 + amount: 委托数量 + direction: 交易方向,'buy'或'sell' + """ + if not order_id or order_id == 'simulation': + return + + # 判断当前交易类型 + trader_type = StrategyPositionManager.get_trader_type(trader) + + # 添加到未完成委托列表 + pending_orders[trader_type][order_id] = { + 'strategy_name': strategy_name, + 'code': code, + 'price': price, + 'amount': amount, + 'direction': direction, + 'created_time': time.time() + } + + # 同时记录到交易历史 + if strategy_name: + if strategy_name not in strategy_trades[trader_type]: + strategy_trades[trader_type][strategy_name] = [] + + strategy_trades[trader_type][strategy_name].append({ + 'time': time.strftime('%Y-%m-%d %H:%M:%S'), + 'type': direction, + 'code': code, + 'price': price, + 'amount': amount, + 'order_id': order_id, + 'status': 'pending' + }) + + logger.info(f"添加未完成委托: {order_id}, 交易类型: {trader_type}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}") + + @staticmethod + def clean_timeout_orders(): + """清理超时委托""" + current_time = time.time() + # 遍历实盘和模拟两种类型的委托 + for trader_type in ['real', 'simulation']: + for order_id, order_info in list(pending_orders[trader_type].items()): + # 超过24小时的委托视为超时 + if current_time - order_info['created_time'] > 24 * 60 * 60: + del pending_orders[trader_type][order_id] + + @staticmethod + def load_strategy_data(): + """加载策略数据""" + global strategy_positions, strategy_trades, pending_orders + try: + if os.path.exists('strategy_data.json'): + with open('strategy_data.json', 'r') as f: + data = json.load(f) + # 兼容旧版数据格式 + if 'positions' in data and not isinstance(data['positions'].get('real', None), dict): + # 旧版数据,将其迁移到新结构 + strategy_positions = { + 'real': data.get('positions', {}), + 'simulation': {} + } + strategy_trades = { + 'real': data.get('trades', {}), + 'simulation': {} + } + pending_orders = { + 'real': data.get('pending_orders', {}), + 'simulation': {} + } + else: + # 新版数据结构 + strategy_positions = data.get('positions', {'real': {}, 'simulation': {}}) + strategy_trades = data.get('trades', {'real': {}, 'simulation': {}}) + pending_orders = data.get('pending_orders', {'real': {}, 'simulation': {}}) + except Exception as e: + logger.error(f"加载策略数据失败: {str(e)}") + + @staticmethod + def save_strategy_data(): + """保存策略数据""" + try: + with open('strategy_data.json', 'w') as f: + json.dump({ + 'positions': strategy_positions, + 'trades': strategy_trades, + 'pending_orders': pending_orders + }, f) + except Exception as e: + logger.error(f"保存策略数据失败: {str(e)}") + + @staticmethod + def get_strategy_positions(trader, strategy_name=None): + """获取策略持仓 + + Args: + trader: 交易实例 + strategy_name: 策略名称,如果为None,返回所有持仓 + + Returns: + 如果strategy_name为None,返回交易实例的所有持仓 + 否则返回指定策略的持仓 + """ + # 判断当前交易类型 + trader_type = StrategyPositionManager.get_trader_type(trader) + + # 如果指定了策略名称,返回该策略的持仓 + if strategy_name: + # 获取真实账户持仓,用于计算可交易量 + real_positions = trader.get_positions() + real_positions_map = {} + for pos in real_positions: + # 使用xt_trader返回的字段名 + if 'stock_code' in pos and 'can_use_volume' in pos: + real_positions_map[pos['stock_code']] = pos + + # 如果该策略没有记录,返回空列表 + if strategy_name not in strategy_positions[trader_type]: + logger.info(f"Strategy {strategy_name} has no positions in {trader_type} mode") + return [] + + # 合并策略持仓和真实持仓的可交易量 + result = [] + for code, pos_info in strategy_positions[trader_type][strategy_name].items(): + # 忽略total_amount为0的持仓 + if pos_info['total_amount'] <= 0: + continue + + # 使用真实账户的可交易量作为策略的可交易量上限 + real_pos = real_positions_map.get(code, {}) + closeable = min(pos_info['total_amount'], real_pos.get('can_use_volume', 0)) + + result.append({ + code: { + 'total_amount': pos_info['total_amount'], + 'closeable_amount': closeable + } + }) + + logger.info(f"Strategy {strategy_name} positions in {trader_type} mode: {result}") + return result + + # 否则返回原始持仓 + positions = trader.get_positions() + logger.info(f"Positions in {trader_type} mode: {positions}") + return positions + + @staticmethod + def clear_strategy(trader, strategy_name): + """清除指定策略的持仓管理数据 + + Args: + trader: 交易实例 + strategy_name: 策略名称 + + Returns: + tuple: (success, message) + success: 是否成功清除 + message: 提示信息 + """ + if not strategy_name: + return False, "缺少策略名称参数" + + # 判断当前交易类型 + trader_type = StrategyPositionManager.get_trader_type(trader) + + # 检查策略是否存在于当前交易类型中 + if strategy_name in strategy_positions[trader_type]: + # 从策略持仓字典中删除该策略 + del strategy_positions[trader_type][strategy_name] + # 清除该策略的交易记录 + if strategy_name in strategy_trades[trader_type]: + del strategy_trades[trader_type][strategy_name] + + # 清除与该策略相关的未完成委托 + for order_id, order_info in list(pending_orders[trader_type].items()): + if order_info.get('strategy_name') == strategy_name: + del pending_orders[trader_type][order_id] + + # 保存更新后的策略数据 + StrategyPositionManager.save_strategy_data() + + logger.info(f"成功清除策略持仓数据: {strategy_name} (交易类型: {trader_type})") + return True, f"成功清除策略 '{strategy_name}' 的持仓数据 (交易类型: {trader_type})" + else: + logger.info(f"策略不存在或没有持仓数据: {strategy_name} (交易类型: {trader_type})") + return True, f"策略 '{strategy_name}' 不存在或没有持仓数据 (交易类型: {trader_type})" \ No newline at end of file diff --git a/src/trade_server.py b/src/trade_server.py index 66e66c0..1c6616f 100644 --- a/src/trade_server.py +++ b/src/trade_server.py @@ -16,11 +16,8 @@ import atexit from simulation_trader import SimulationTrader import datetime from xtquant import xtconstant +from strategy_position_manager import StrategyPositionManager -# 策略仓位管理 -strategy_positions = {} # 存储各策略持仓 -strategy_trades = {} # 存储各策略交易记录 -pending_orders = {} # 存储未完成委托 # 全局交易实例(采用单例模式) _sim_trader_instance = None # 模拟交易实例(单例) @@ -141,165 +138,6 @@ def run_daily(time_str, job_func): schedule.every().day.at(time_str).do(job_func) -# 策略持仓管理辅助函数 -def update_strategy_position(strategy_name, code, direction, amount): - """更新策略持仓 - - Args: - strategy_name: 策略名称 - code: 股票代码 - direction: 'buy'或'sell' - amount: 交易数量 - """ - if not strategy_name: - return - - # 确保策略在字典中 - if strategy_name not in strategy_positions: - strategy_positions[strategy_name] = {} - - try: - # 获取交易实例持仓情况(不论真实还是模拟) - current_trader = get_trader() - # get_trader 已经确保交易实例是登录状态的,无需再次检查 - actual_positions = current_trader.get_positions() - code_position = next((pos for pos in actual_positions if pos.get('stock_code') == code), None) - - # 记录实际持仓总量 - actual_total = code_position.get('volume', 0) if code_position else 0 - actual_can_use = code_position.get('can_use_volume', 0) if code_position else 0 - - logger.info(f"实际持仓 - 代码: {code}, 总量: {actual_total}, 可用: {actual_can_use}") - - # 如果股票代码在持仓字典中不存在,初始化它 - if code not in strategy_positions[strategy_name]: - strategy_positions[strategy_name][code] = { - 'total_amount': 0, - 'closeable_amount': 0 - } - - # 直接使用实际持仓数据更新策略持仓 - strategy_positions[strategy_name][code]['total_amount'] = actual_total - strategy_positions[strategy_name][code]['closeable_amount'] = actual_can_use - - logger.info(f"更新策略持仓 - 策略: {strategy_name}, 代码: {code}, 方向: {direction}, 数量: {amount}, 总量: {strategy_positions[strategy_name][code]['total_amount']}, 可用: {strategy_positions[strategy_name][code]['closeable_amount']}") - - except Exception as e: - logger.error(f"获取实际持仓失败: {str(e)}") - # 异常情况下只记录错误,不尝试更新持仓 - - # 移除total_amount为0的持仓 - if code in strategy_positions[strategy_name] and strategy_positions[strategy_name][code]['total_amount'] <= 0: - del strategy_positions[strategy_name][code] - -def update_pending_orders(): - """更新未完成委托状态""" - try: - # 获取今日委托 - current_trader = get_trader() - # get_trader 已经确保交易实例是登录状态的,无需再次检查 - today_entrusts = current_trader.get_today_entrust() - - # 更新委托状态 - for order_id, order_info in list(pending_orders.items()): - entrust = next((e for e in today_entrusts if e.get('order_id') == order_id), None) - if entrust: - if entrust.get('order_status') in [xtconstant.ORDER_SUCCEEDED, xtconstant.ORDER_PART_SUCC]: - # 成交量计算 - traded_amount = int(entrust.get('traded_volume', 0)) - - # 更新策略持仓 - update_strategy_position( - order_info['strategy_name'], - order_info['code'], - order_info['direction'], - traded_amount - ) - - # 如果完全成交,从待处理列表中移除 - if entrust.get('order_status') == xtconstant.ORDER_SUCCEEDED: - del pending_orders[order_id] - - # 如果已撤单、废单等终态,也从待处理列表中移除 - elif entrust.get('order_status') in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_JUNK]: - del pending_orders[order_id] - except Exception as e: - logger.error(f"更新未完成委托状态失败: {str(e)}") - -def add_pending_order(order_id, strategy_name, code, price, amount, direction): - """添加未完成委托 - - Args: - order_id: 委托编号 - strategy_name: 策略名称 - code: 股票代码 - price: 委托价格 - amount: 委托数量 - direction: 交易方向,'buy'或'sell' - """ - if not order_id or order_id == 'simulation': - return - - # 添加到未完成委托列表 - pending_orders[order_id] = { - 'strategy_name': strategy_name, - 'code': code, - 'price': price, - 'amount': amount, - 'direction': direction, - 'created_time': time.time() - } - - # 同时记录到交易历史 - if strategy_name: - if strategy_name not in strategy_trades: - strategy_trades[strategy_name] = [] - - strategy_trades[strategy_name].append({ - 'time': time.strftime('%Y-%m-%d %H:%M:%S'), - 'type': direction, - 'code': code, - 'price': price, - 'amount': amount, - 'order_id': order_id, - 'status': 'pending' - }) - - logger.info(f"添加未完成委托: {order_id}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}") - -def clean_timeout_orders(): - """清理超时委托""" - current_time = time.time() - for order_id, order_info in list(pending_orders.items()): - # 超过24小时的委托视为超时 - if current_time - order_info['created_time'] > 24 * 60 * 60: - del pending_orders[order_id] - -def load_strategy_data(): - """加载策略数据""" - global strategy_positions, strategy_trades, pending_orders - try: - if os.path.exists('strategy_data.json'): - with open('strategy_data.json', 'r') as f: - data = json.load(f) - strategy_positions = data.get('positions', {}) - strategy_trades = data.get('trades', {}) - pending_orders = data.get('pending_orders', {}) - except Exception as e: - logger.error(f"加载策略数据失败: {str(e)}") - -def save_strategy_data(): - """保存策略数据""" - try: - with open('strategy_data.json', 'w') as f: - json.dump({ - 'positions': strategy_positions, - 'trades': strategy_trades, - 'pending_orders': pending_orders - }, f) - except Exception as e: - logger.error(f"保存策略数据失败: {str(e)}") - def run_pending_tasks(): while True: try: @@ -319,14 +157,14 @@ get_trader().login() app = Flask(__name__) # 添加策略数据相关的定期任务 -schedule.every().day.at("00:01").do(clean_timeout_orders) # 每天清理超时委托 -schedule.every().day.at("15:30").do(save_strategy_data) # 每天收盘后保存策略数据 +schedule.every().day.at("00:01").do(StrategyPositionManager.clean_timeout_orders) # 每天清理超时委托 +schedule.every().day.at("15:30").do(StrategyPositionManager.save_strategy_data) # 每天收盘后保存策略数据 # 程序启动时加载策略数据 -load_strategy_data() +StrategyPositionManager.load_strategy_data() # 程序退出时保存策略数据 -atexit.register(save_strategy_data) +atexit.register(StrategyPositionManager.save_strategy_data) # 使用配置文件中的时间 run_daily(Config.MARKET_OPEN_TIME, lambda: get_trader().login()) @@ -367,7 +205,8 @@ def buy(): """Buy an item with given parameters.""" logger.info("Received buy request") # 每次操作前更新未完成委托状态 - update_pending_orders() + current_trader = get_trader() + StrategyPositionManager.update_pending_orders(current_trader) # Get data from request body data = request.get_json() @@ -400,15 +239,13 @@ def buy(): # 如果指定了策略名称,记录到策略持仓 if strategy_name: # 模拟交易立即生效,更新策略持仓 - update_strategy_position(strategy_name, code, 'buy', amount) + StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'buy', amount) return jsonify({"success": True, "data": result}), 200 # 尝试实盘交易 logger.info(f"Executing buy order: code={code}, price={price}, amount={amount}, strategy_name={strategy_name}") try: - current_trader = get_trader() - # get_trader 已经确保交易实例是登录状态的,无需再次检查 result = execute_with_timeout(current_trader.buy, Config.TRADE_TIMEOUT, code, price, amount) if result is None: # 超时时使用模拟交易 @@ -421,7 +258,7 @@ def buy(): # 如果指定了策略名称,记录到策略持仓 if strategy_name: # 超时情况下,使用模拟交易,立即更新策略持仓 - update_strategy_position(strategy_name, code, 'buy', amount) + StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'buy', amount) return jsonify({"success": True, "data": result}), 200 @@ -430,7 +267,8 @@ def buy(): order_id = result['order_id'] # 添加到未完成委托 - add_pending_order( + StrategyPositionManager.add_pending_order( + current_trader, order_id, strategy_name, code, @@ -456,7 +294,7 @@ def buy(): # 如果指定了策略名称,记录到策略持仓 if strategy_name: # 错误情况下,使用模拟交易,立即更新策略持仓 - update_strategy_position(strategy_name, code, 'buy', amount) + StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'buy', amount) return jsonify({"success": True, "data": result}), 200 except ValueError as e: @@ -472,7 +310,8 @@ def sell(): """Sell an item with given parameters.""" logger.info("Received sell request") # 每次操作前更新未完成委托状态 - update_pending_orders() + current_trader = get_trader() + StrategyPositionManager.update_pending_orders(current_trader) # Get data from request body data = request.get_json() @@ -505,15 +344,13 @@ def sell(): # 如果指定了策略名称,记录到策略持仓 if strategy_name: # 模拟交易下,使用简单更新模式 - update_strategy_position(strategy_name, code, 'sell', amount) + StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'sell', amount) return jsonify({"success": True, "data": result}), 200 # 尝试实盘交易 logger.info(f"Executing sell order: code={code}, price={price}, amount={amount}, strategy_name={strategy_name}") try: - current_trader = get_trader() - # get_trader 已经确保交易实例是登录状态的,无需再次检查 result = execute_with_timeout(current_trader.sell, Config.TRADE_TIMEOUT, code, price, amount) if result is None: # 超时时使用模拟交易 @@ -526,7 +363,7 @@ def sell(): # 如果指定了策略名称,记录到策略持仓 if strategy_name: # 超时情况下,使用简单更新模式 - update_strategy_position(strategy_name, code, 'sell', amount) + StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'sell', amount) return jsonify({"success": True, "data": result}), 200 @@ -535,7 +372,8 @@ def sell(): order_id = result['order_id'] # 添加到未完成委托 - add_pending_order( + StrategyPositionManager.add_pending_order( + current_trader, order_id, strategy_name, code, @@ -557,7 +395,7 @@ def sell(): # 如果指定了策略名称,记录到策略持仓 if strategy_name: # 错误情况下,使用简单更新模式 - update_strategy_position(strategy_name, code, 'sell', amount) + StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'sell', amount) return jsonify({"success": True, "data": result}), 200 except ValueError as e: @@ -573,15 +411,11 @@ def cancel(entrust_no): logger.info(f"Received cancel request for entrust_no={entrust_no}") try: current_trader = get_trader() - # get_trader 已经确保交易实例是登录状态的,无需再次检查 result = current_trader.cancel(entrust_no) logger.info(f"Cancel result: {result}") - - # 如果取消成功,从pending_orders中移除该订单 - if entrust_no in pending_orders: - order_info = pending_orders[entrust_no] - logger.info(f"从待处理委托中移除已取消订单: {entrust_no}, 代码: {order_info.get('code', 'unknown')}") - del pending_orders[entrust_no] + + # 更新未完成委托状态 + StrategyPositionManager.update_pending_orders(current_trader) response = {"success": True, "data": result} return jsonify(response), 200 @@ -596,7 +430,6 @@ def get_balance(): logger.info("Received balance request") try: current_trader = get_trader() - # get_trader 已经确保交易实例是登录状态的,无需再次检查 balance = current_trader.get_balance() logger.info(f"Balance: {balance}") @@ -613,58 +446,17 @@ def get_positions(): """Get the positions of the account.""" logger.info("Received positions request") # 每次查询前更新未完成委托状态 - update_pending_orders() + current_trader = get_trader() + StrategyPositionManager.update_pending_orders(current_trader) try: # 获取查询参数中的策略名称 strategy_name = request.args.get("strategy_name", "") - # 如果指定了策略名称,返回该策略的持仓 - if strategy_name: - # 获取真实账户持仓,用于计算可交易量 - current_trader = get_trader() - # get_trader 已经确保交易实例是登录状态的,无需再次检查 - real_positions = current_trader.get_positions() - real_positions_map = {} - for pos in real_positions: - # 使用xt_trader返回的字段名 - if 'stock_code' in pos and 'can_use_volume' in pos: - real_positions_map[pos['stock_code']] = pos - - # 如果该策略没有记录,返回空列表 - if strategy_name not in strategy_positions: - logger.info(f"Strategy {strategy_name} has no positions") - return jsonify({"success": True, "data": []}), 200 - - # 合并策略持仓和真实持仓的可交易量 - result = [] - for code, pos_info in strategy_positions[strategy_name].items(): - # 忽略total_amount为0的持仓 - if pos_info['total_amount'] <= 0: - continue - - # 使用真实账户的可交易量作为策略的可交易量上限 - real_pos = real_positions_map.get(code, {}) - closeable = min(pos_info['total_amount'], real_pos.get('can_use_volume', 0)) - - result.append({ - code: { - 'total_amount': pos_info['total_amount'], - 'closeable_amount': closeable - } - }) - - logger.info(f"Strategy {strategy_name} positions: {result}") - return jsonify({"success": True, "data": result}), 200 + # 使用StrategyPositionManager获取持仓信息 + result = StrategyPositionManager.get_strategy_positions(current_trader, strategy_name if strategy_name else None) - # 否则返回原始持仓 - current_trader = get_trader() - # get_trader 已经确保交易实例是登录状态的,无需再次检查 - positions = current_trader.get_positions() - logger.info(f"Positions: {positions}") - - response = {"success": True, "data": positions} - return jsonify(response), 200 + return jsonify({"success": True, "data": result}), 200 except Exception as e: logger.error(f"Error processing positions request: {str(e)}") abort(500, description="Internal server error") @@ -676,7 +468,6 @@ def get_today_trades(): logger.info("Received today trades request") try: current_trader = get_trader() - # get_trader 已经确保交易实例是登录状态的,无需再次检查 trades = current_trader.get_today_trades() logger.info(f"Today trades: {trades}") @@ -693,7 +484,6 @@ def get_today_entrust(): logger.info("Received today entrust request") try: current_trader = get_trader() - # get_trader 已经确保交易实例是登录状态的,无需再次检查 entrust = current_trader.get_today_entrust() logger.info(f"Today entrust: {entrust}") @@ -709,41 +499,25 @@ def clear_strategy(strategy_name): """清除指定策略的持仓管理数据""" logger.info(f"接收到清除策略持仓请求: {strategy_name}") try: - if not strategy_name: - raise ValueError("缺少策略名称参数") - - # 重置模拟交易实例(通过重新创建实例的方式) - global _sim_trader_instance - if _sim_trader_instance is not None: - logger.info("重置模拟交易实例") - # 创建一个新的模拟交易实例,替换原有实例 - _sim_trader_instance = SimulationTrader() - - # 检查策略是否存在 - if strategy_name in strategy_positions: - # 从策略持仓字典中删除该策略 - del strategy_positions[strategy_name] - # 清除该策略的交易记录 - if strategy_name in strategy_trades: - del strategy_trades[strategy_name] - - # 清除与该策略相关的未完成委托 - for order_id, order_info in list(pending_orders.items()): - if order_info.get('strategy_name') == strategy_name: - del pending_orders[order_id] - - # 保存更新后的策略数据 - save_strategy_data() - - logger.info(f"成功清除策略持仓数据: {strategy_name}") - return jsonify({"success": True, "message": f"成功清除策略 '{strategy_name}' 的持仓数据"}), 200 + current_trader = get_trader() + + # 如果是模拟交易实例,则重置模拟交易实例 + if isinstance(current_trader, SimulationTrader): + global _sim_trader_instance + if _sim_trader_instance is not None: + logger.info("重置模拟交易实例") + # 创建一个新的模拟交易实例,替换原有实例 + _sim_trader_instance = SimulationTrader() + current_trader = _sim_trader_instance + + # 使用StrategyPositionManager清除策略 + success, message = StrategyPositionManager.clear_strategy(current_trader, strategy_name) + + if success: + return jsonify({"success": True, "message": message}), 200 else: - logger.info(f"策略不存在或没有持仓数据: {strategy_name}") - return jsonify({"success": True, "message": f"策略 '{strategy_name}' 不存在或没有持仓数据"}), 200 + abort(400, description=message) - except ValueError as e: - logger.error(f"无效的请求参数: {str(e)}") - abort(400, description=str(e)) except Exception as e: logger.error(f"清除策略持仓时出错: {str(e)}") abort(500, description="服务器内部错误")