diff --git a/README.md b/README.md index e8132c7..9b4d12c 100644 --- a/README.md +++ b/README.md @@ -100,3 +100,26 @@ print(response.json()) - 系统默认会根据交易时间自动判断是否使用模拟交易 - 交易日判断基于chinese-calendar库 - 请确保配置正确的交易账号和路径 + + +## design +### strategy position manager +策略仓位管理是用于保存,更新基于策略名的股票仓位, 和未完成订单的 +父类: BasePositionManager +子类: RealPositionManager(放入real 模块), SimulationPositionManager(放入simulation 模块) +position manager 中保存两个字典, positions, pending_orders, key都是策略名 +position manager在trade_server中初始化, 作为参数传入trader +完整的交易流程是: +1. 下单 +用户调用trader下单, trader在发出下单信号的同时添加一个pending_order给position manager +pending_order的结构是{order_id, order_status}, 当order_status是完成状态时, 应该从字典中删除 +下单没有给策略名的, 策略名默认为"default_strategy" +2. 更新pending order状态 +模拟盘立刻全部成交, 在下单后立刻更新仓位, 并删除pending order, 需要打印日志 +实盘由real_trader_manager管理pending order状态, 具体是 + - 下单后立刻尝试更新pending order状态, 比如状态变为部分成交, 全部成交等, 同时更新持仓,并计划一个1分钟后的任务 + - 1分钟后再次更新订单状态, 如果全部成交, 则更新持仓, 否则(部分成交, 无成交), 撤单, 并下一个市价单数量是原先订单数量, 或者补单数量(部分成交) + - 如果下单发生错误, 表示没有成功下单, 则不添加pending order, 也不更新仓位, 即忽略这笔订单, 打印错误日志 +3. 收盘后保存策略持仓(模拟盘, 实盘单独保存) +4. server启动时载入持仓文件 +以上设计基于简洁, 逻辑清晰, 流程简单的思路, 如果有更好的建议, 可以提供 \ No newline at end of file diff --git a/src/position_manager.py b/src/position_manager.py new file mode 100644 index 0000000..288188e --- /dev/null +++ b/src/position_manager.py @@ -0,0 +1,258 @@ +import os +import json +from logger_config import get_logger +from trade_constants import ORDER_DIRECTION_BUY + +# 获取日志记录器 +logger = get_logger('position_manager') + +class PositionManager(): + """实盘策略持仓管理器,负责管理不同策略在实盘环境下的持仓情况""" + + def __init__(self, trade_type): + """初始化实盘持仓管理器""" + super().__init__() + # 策略持仓信息 + self.positions = {} # 策略名 -> {股票代码 -> {total_amount, closeable_amount}} + # 待处理订单信息 + self.pending_orders = {} # order_id -> 订单信息 + self.data_path = trade_type + '_strategy_positions.json' + self.load_data() + + def update_position(self, strategy_name, code, direction, amount): + """更新策略持仓 + + Args: + strategy_name: 策略名称 + code: 股票代码 + direction: 'buy'或'sell' + amount: 交易数量 + """ + if not strategy_name: + return + + # 确保策略在字典中 + if strategy_name not in self.positions: + self.positions[strategy_name] = {} + + # 如果股票代码在持仓字典中不存在,初始化它 + if code not in self.positions[strategy_name]: + self.positions[strategy_name][code] = { + 'total_amount': 0, + 'closeable_amount': 0 + } + + # 根据方向更新持仓 + if direction == ORDER_DIRECTION_BUY: + self.positions[strategy_name][code]['total_amount'] += amount + self.positions[strategy_name][code]['closeable_amount'] += amount + else: # sell + self.positions[strategy_name][code]['total_amount'] -= amount + self.positions[strategy_name][code]['closeable_amount'] -= amount + + logger.info(f"更新策略持仓 - 策略: {strategy_name}, 代码: {code}, 方向: {direction}, 数量: {amount}, " + f"更新后总量: {self.positions[strategy_name][code]['total_amount']}, " + f"可用: {self.positions[strategy_name][code]['closeable_amount']}") + + # 移除total_amount为0的持仓 + if code in self.positions[strategy_name] and self.positions[strategy_name][code]['total_amount'] <= 0: + del self.positions[strategy_name][code] + logger.info(f"移除空持仓 - 策略: {strategy_name}, 代码: {code}") + + def add_pending_order(self, order_id, strategy_name, code, price, amount, direction, order_type='limit'): + """添加未完成委托 + + Args: + order_id: 订单ID + strategy_name: 策略名称 + code: 股票代码 + price: 委托价格 + amount: 委托数量 + direction: 交易方向 + order_type: 订单类型 + """ + # 添加未处理订单 + self.pending_orders[order_id] = { + 'strategy_name': strategy_name, + 'code': code, + 'price': price, + 'target_amount': amount, + 'direction': direction, + 'order_type': order_type, + 'status': 'pending', + 'created_time': self._get_current_time(), + 'retry_count': 0 + } + + logger.info(f"添加未完成委托 - ID: {order_id}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}, " + f"数量: {amount}, 价格: {price}, 类型: {order_type}") + + def update_order_status(self, order_id, new_status): + """更新订单状态 + + Args: + order_id: 订单ID + new_status: 新状态 + + Returns: + bool: 是否成功更新 + """ + if order_id in self.pending_orders: + # 记录之前的状态用于日志 + previous_status = self.pending_orders[order_id].get('status') + + # 更新状态和最后检查时间 + self.pending_orders[order_id]['status'] = new_status + + # 记录状态变化日志 + if previous_status != new_status: + code = self.pending_orders[order_id].get('code') + logger.info(f"订单状态变化: ID={order_id}, 代码={code}, 旧状态={previous_status}, 新状态={new_status}") + + # 如果订单已完成,移除它 + if new_status in ['completed', 'cancelled', 'failed']: + # 保留订单信息以供参考,但标记为已完成 + self.remove_pending_order(order_id) + logger.info(f"订单已删除 - ID: {order_id}, 状态: {new_status}") + + return True + return False + + def remove_pending_order(self, order_id): + """移除未完成委托 + + Args: + order_id: 订单ID + + Returns: + bool: 是否成功移除 + """ + if order_id in self.pending_orders: + del self.pending_orders[order_id] + return True + return False + + def get_pending_order(self, order_id): + """获取未完成委托信息 + + Args: + order_id: 订单ID + + Returns: + dict: 委托信息,如果不存在返回None + """ + return self.pending_orders.get(order_id) + + def get_pending_orders(self): + """获取所有未完成委托 + + Returns: + dict: 订单ID到委托信息的映射 + """ + return self.pending_orders + + def get_positions(self, strategy_name=None): + """获取策略持仓 + + Args: + strategy_name: 策略名称,如果为None,返回所有持仓 + + Returns: + dict: 策略持仓信息 + """ + if strategy_name: + if strategy_name not in self.positions: + return {} + return self.positions[strategy_name] + return self.positions + + def save_data(self): + """保存策略数据""" + try: + with open(self.data_path, 'w') as f: + json.dump({ + 'positions': self.positions, + 'pending_orders': self.pending_orders + }, f) + logger.info("成功保存实盘策略数据") + except Exception as e: + logger.error(f"保存实盘策略数据失败: {str(e)}") + + def load_data(self): + """加载策略数据""" + try: + if os.path.exists(self.data_path): + with open(self.data_path, 'r') as f: + data = json.load(f) + self.positions = data.get('positions', {}) + self.pending_orders = data.get('pending_orders', {}) + + logger.info("已加载实盘策略数据") + logger.info(f"策略数: {len(self.positions)}") + else: + logger.info(f"实盘策略数据文件不存在: {self.data_path}") + self.positions = {} + self.pending_orders = {} + except Exception as e: + logger.error(f"加载实盘策略数据失败: {str(e)}") + # 初始化空数据结构 + self.positions = {} + self.pending_orders = {} + + def _get_current_time(self): + """获取当前时间戳""" + import time + return time.time() + + def clean_timeout_orders(self): + """清理超时未完成订单""" + timeout_limit = 24 * 60 * 60 # 24小时 + current_time = self._get_current_time() + + timeout_orders = [] + for order_id, order_info in list(self.pending_orders.items()): + # 检查是否超时 + if current_time - order_info['created_time'] > timeout_limit: + timeout_orders.append(order_id) + + # 更新状态 + self.update_order_status(order_id, 'failed') + + if timeout_orders: + logger.warn(f"清理超时订单完成,共 {len(timeout_orders)} 个: {', '.join(timeout_orders)}") + + def clear_strategy(self, strategy_name): + """清除指定策略的持仓管理数据 + + Args: + strategy_name: 策略名称 + + Returns: + tuple: (success, message) + success: 是否成功清除 + message: 提示信息 + """ + if not strategy_name: + return False, "缺少策略名称参数" + + # 检查策略是否存在 + if strategy_name in self.positions: + # 从策略持仓字典中删除该策略 + del self.positions[strategy_name] + # 清除该策略的交易记录 + if strategy_name in self.trades: + del self.trades[strategy_name] + + # 清除与该策略相关的未完成委托 + for order_id, order_info in list(self.pending_orders.items()): + if order_info.get('strategy_name') == strategy_name: + del self.pending_orders[order_id] + + # 保存更新后的策略数据 + self.save_data() + + logger.info(f"成功清除策略持仓数据: {strategy_name}") + return True, f"成功清除策略 '{strategy_name}' 的持仓数据" + else: + logger.info(f"策略不存在或没有持仓数据: {strategy_name}") + return True, f"策略 '{strategy_name}' 不存在或没有持仓数据" \ No newline at end of file diff --git a/src/real/__init__.py b/src/real/__init__.py new file mode 100644 index 0000000..e23fc3e --- /dev/null +++ b/src/real/__init__.py @@ -0,0 +1,10 @@ +""" +实盘交易模块 + +此模块提供实盘交易的功能,使用xtquant接口连接到实际交易系统。 +""" + +from .xt_trader import XtTrader +from .real_trader_manager import RealTraderManager + +__all__ = ['XtTrader', 'RealTraderManager'] \ No newline at end of file diff --git a/src/real_trader_manager.py b/src/real/real_trader_manager.py similarity index 66% rename from src/real_trader_manager.py rename to src/real/real_trader_manager.py index 2c7acde..830e604 100644 --- a/src/real_trader_manager.py +++ b/src/real/real_trader_manager.py @@ -4,7 +4,6 @@ import schedule from xtquant import xtconstant from logger_config import get_logger from config import Config -from strategy_position_manager import StrategyPositionManager import json # 获取日志记录器 @@ -13,29 +12,24 @@ logger = get_logger('real_trader_manager') class RealTraderManager: """实盘交易管理器,处理实盘下单失败、部分成交等问题,尽量保证仓位与策略信号一致""" - def __init__(self, trader): + def __init__(self, trader, position_manager): """初始化实盘交易管理器 Args: - trader: XtTrader实例,如果为None则自动获取 + trader: XtTrader实例 + position_manager: StrategyPositionManager实例 """ - # 使用传入的trader实例或获取单例 + # 使用传入的trader和position_manager实例 self.trader = trader + self.position_manager = position_manager # 确保已登录 if not self.trader.is_logged_in(): self.trader.login() - # 不再自己维护pending_orders,改用StrategyPositionManager管理 - # self.pending_orders = {} - # 启动调度器 self._start_scheduler() - # 记录策略期望持仓状态 - # 格式: {strategy_name: {code: target_amount}} - self.strategy_targets = {} - logger.info("实盘交易管理器初始化完成") def _start_scheduler(self): @@ -94,9 +88,6 @@ class RealTraderManager: logger.warning(f"资金或持仓不足,忽略订单: {direction} {code} {amount}股 {price}元") return {"success": False, "error": "资金或持仓不足"} - # 更新策略目标持仓 - self._update_strategy_target(strategy_name, code, direction, amount) - # 下单 logger.info(f"准备{direction}订单: 代码={code}, 数量={amount}, 价格={price}, 订单类型={order_type}") if direction == 'buy': @@ -109,9 +100,8 @@ class RealTraderManager: logger.error(f"下单失败: {result}") return {"success": False, "error": "下单失败"} - # 使用StrategyPositionManager添加未完成委托 - StrategyPositionManager.add_pending_order( - self.trader, + # 添加未完成委托到position_manager + self.position_manager.add_pending_order( order_id, strategy_name, code, @@ -138,18 +128,12 @@ class RealTraderManager: logger.info("开始检查未完成订单...") # 获取所有未完成订单 - pending_orders = StrategyPositionManager.get_pending_orders(self.trader) + pending_orders = self.position_manager.get_pending_orders() # 如果没有未完成订单,直接返回 if not pending_orders: logger.info("没有未完成订单需要检查") return - - # 更新StrategyPositionManager中的未完成委托状态 - try: - StrategyPositionManager.update_pending_orders(self.trader) - except Exception as e: - logger.error(f"更新StrategyPositionManager未完成委托状态失败: {str(e)}") # 获取最新的委托列表 try: @@ -174,11 +158,11 @@ class RealTraderManager: self._update_order_status(order_id, entrust_map) # 获取最新的订单信息 - order_info = StrategyPositionManager.get_pending_order(self.trader, order_id) + order_info = self.position_manager.get_pending_order(order_id) if not order_info: continue - # 处理超时未成交或部分成交的订单 + # 处理未成交或部分成交的订单 current_time = time.time() order_age = current_time - order_info['created_time'] @@ -198,12 +182,6 @@ class RealTraderManager: except Exception as e: logger.error(f"处理订单 {order_id} 时出错: {str(e)}") - # 同步策略持仓和实际持仓 - try: - self._sync_strategy_positions() - except Exception as e: - logger.error(f"同步策略持仓和实际持仓失败: {str(e)}") - logger.info("未完成订单检查完毕") except Exception as e: @@ -217,7 +195,7 @@ class RealTraderManager: entrust_map: 可选的委托字典,如果为None则重新获取 """ # 检查订单是否存在 - order_info = StrategyPositionManager.get_pending_order(self.trader, order_id) + order_info = self.position_manager.get_pending_order(order_id) if not order_info: return @@ -238,53 +216,56 @@ class RealTraderManager: # 根据委托状态更新订单状态 if entrust['order_status'] == xtconstant.ORDER_SUCCEEDED: # 全部成交 - StrategyPositionManager.update_order_status(self.trader, order_id, 'completed') - # 日志记录在update_order_status中处理 + self.position_manager.update_order_status(order_id, 'completed') + # 更新持仓 + self.position_manager.update_position( + order_info['strategy_name'], + order_info['code'], + order_info['direction'], + order_info['target_amount'] + ) elif entrust['order_status'] == xtconstant.ORDER_PART_SUCC: # 部分成交 current_volume = entrust.get('traded_volume', 0) - StrategyPositionManager.update_order_status( - self.trader, + self.position_manager.update_order_status( order_id, 'partial', traded_volume=current_volume ) - # 如果成交量有变化,记录日志 + # 如果成交量有变化,记录日志并更新持仓 if current_volume != previous_volume: target_amount = order_info['target_amount'] logger.info(f"订单部分成交更新: ID={order_id}, 代码={entrust['stock_code']}, 目标数量={target_amount}, 已成交数量={current_volume}, 剩余数量={target_amount - current_volume}") + + # 更新持仓(仅更新已成交部分) + if current_volume > 0: + self.position_manager.update_position( + order_info['strategy_name'], + order_info['code'], + order_info['direction'], + current_volume + ) elif entrust['order_status'] in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_JUNK]: # 已撤单或废单 - StrategyPositionManager.update_order_status( - self.trader, + self.position_manager.update_order_status( order_id, 'cancelled', err_msg=entrust.get('err_msg', '未知原因') ) - elif entrust['order_status'] == xtconstant.ORDER_UNREPORTED: - # 未报 + elif entrust['order_status'] in [xtconstant.ORDER_UNREPORTED, xtconstant.ORDER_WAIT_REPORTING, xtconstant.ORDER_REPORTED]: + # 未报、待报、已报 if previous_status != 'pending': - StrategyPositionManager.update_order_status(self.trader, order_id, 'pending') - - elif entrust['order_status'] == xtconstant.ORDER_WAIT_REPORTING: - # 待报 - if previous_status != 'pending': - StrategyPositionManager.update_order_status(self.trader, order_id, 'pending') - - elif entrust['order_status'] == xtconstant.ORDER_REPORTED: - # 已报 - if previous_status != 'pending': - StrategyPositionManager.update_order_status(self.trader, order_id, 'pending') + self.position_manager.update_order_status(order_id, 'pending') else: # 委托列表中找不到该订单,可能已经太久 current_time = time.time() if current_time - order_info['created_time'] > 24 * 60 * 60: previous_status = order_info.get('status') - StrategyPositionManager.update_order_status(self.trader, order_id, 'failed') + self.position_manager.update_order_status(order_id, 'failed') logger.warning(f"订单状态未知且过期: ID={order_id}, 旧状态={previous_status}, 新状态=failed, 创建时长={(current_time - order_info['created_time'])/3600:.1f}小时") except Exception as e: @@ -320,21 +301,16 @@ class RealTraderManager: # 如果有未成交的部分,使用市价单补充交易 if remaining_amount > 0: # 递增重试计数 - new_retry_count = StrategyPositionManager.increment_retry_count(self.trader, order_id) + new_retry_count = self.position_manager.increment_retry_count(order_id) - # 决定是否使用市价单进行补单 - use_market_order = Config.RTM_USE_MARKET_ORDER + # 使用市价单进行补单 + new_order_type = 'market' + new_price = 0 # 市价单价格设为0 - logger.info(f"准备补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={new_retry_count}/{Config.RTM_MAX_RETRIES}, 使用市价单={use_market_order}") + logger.info(f"准备补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={new_retry_count}/{Config.RTM_MAX_RETRIES}") # 如果重试次数少于最大重试次数,则进行补单 if new_retry_count <= Config.RTM_MAX_RETRIES: - # 决定使用的订单类型 - new_order_type = 'market' if use_market_order else 'limit' - - # 对于市价单,价格参数可设为0;对于限价单,使用原价格 - new_price = 0 if new_order_type == 'market' else order_info['price'] - # 下新订单 new_order = self.place_order( order_info['strategy_name'], @@ -356,7 +332,7 @@ class RealTraderManager: # 更新原订单状态 previous_status = order_info['status'] - StrategyPositionManager.update_order_status(self.trader, order_id, 'cancelled') + self.position_manager.update_order_status(order_id, 'cancelled') logger.info(f"更新原订单状态: ID={order_id}, 旧状态={previous_status}, 新状态=cancelled") except Exception as e: @@ -415,114 +391,39 @@ class RealTraderManager: logger.error(f"检查订单可行性时发生异常: {str(e)}") return False - def _update_strategy_target(self, strategy_name, code, direction, amount): - """更新策略目标持仓 - - Args: - strategy_name: 策略名称 - code: 股票代码 - direction: 交易方向 - amount: 交易数量 - """ - # 确保策略存在于目标字典中 - if strategy_name not in self.strategy_targets: - self.strategy_targets[strategy_name] = {} - - # 确保股票代码存在于策略目标中 - if code not in self.strategy_targets[strategy_name]: - self.strategy_targets[strategy_name][code] = 0 - - # 根据交易方向更新目标持仓 - if direction == 'buy': - self.strategy_targets[strategy_name][code] += amount - else: # sell - self.strategy_targets[strategy_name][code] -= amount - # 避免负数持仓 - if self.strategy_targets[strategy_name][code] < 0: - self.strategy_targets[strategy_name][code] = 0 - - logger.info(f"更新策略目标持仓: 策略={strategy_name}, 代码={code}, 目标持仓={self.strategy_targets[strategy_name][code]}") - - def _sync_strategy_positions(self): - """同步策略持仓和实际持仓""" - try: - # 获取实际持仓 - actual_positions = self.trader.get_positions() - if actual_positions is None: - logger.error("获取实际持仓失败,跳过同步") - return - - position_map = {p['stock_code']: p for p in actual_positions} - - # 如果没有策略目标持仓,直接返回 - if not self.strategy_targets: - logger.info("没有策略目标持仓需要同步") - return - - # 遍历每个策略的目标持仓 - for strategy_name, targets in self.strategy_targets.items(): - # 该策略的实际持仓映射 - strategy_actual_positions = {} - - # 遍历该策略的目标持仓 - for code, target_amount in targets.items(): - try: - # 获取股票的实际持仓 - actual_position = position_map.get(code, {}) - actual_amount = actual_position.get('volume', 0) - - if actual_amount > 0: - strategy_actual_positions[code] = actual_amount - - # 更新策略持仓管理器中的持仓记录 - try: - StrategyPositionManager.update_strategy_position( - self.trader, - strategy_name, - code, - 'sync', # 使用同步模式 - actual_amount - ) - except Exception as e: - logger.error(f"更新策略持仓管理器持仓记录失败: {str(e)}") - - # 检查是否需要调整持仓 - if actual_amount != target_amount: - diff = target_amount - actual_amount - if diff != 0: - logger.warning(f"持仓不一致: 策略={strategy_name}, 代码={code}, 目标={target_amount}, 实际={actual_amount}") - except Exception as e: - logger.error(f"同步股票 {code} 持仓时出错: {str(e)}") - - # 记录日志 - logger.info(f"策略 {strategy_name} 的目标持仓: {targets}") - logger.info(f"策略 {strategy_name} 的实际持仓: {strategy_actual_positions}") - - except Exception as e: - logger.error(f"同步策略持仓时发生异常: {str(e)}") - def clean_expired_orders(self): """清理过期的未完成订单""" - # 直接调用StrategyPositionManager的方法 - StrategyPositionManager.clean_timeout_orders() - - def get_pending_orders(self): - """获取所有未完成订单 - - Returns: - list: 未完成订单列表 - """ - # 从StrategyPositionManager获取未完成订单 - pending_orders = StrategyPositionManager.get_pending_orders(self.trader) - return [{ - 'order_id': order_id, - **order_info - } for order_id, order_info in pending_orders.items()] - - def get_strategy_targets(self): - """获取策略目标持仓 - - Returns: - dict: 策略目标持仓 - """ - return self.strategy_targets \ No newline at end of file + try: + logger.info("开始清理过期未完成订单...") + + # 获取所有未完成订单 + pending_orders = self.position_manager.get_pending_orders() + + if not pending_orders: + logger.info("没有未完成订单需要清理") + return + + # 遍历未完成订单,检查是否有无法成交的订单(如跌停无法卖出) + for order_id, order_info in list(pending_orders.items()): + try: + # 只处理pending和partial状态的订单 + if order_info['status'] not in ['pending', 'partial']: + continue + + # 标记为失败并记录日志 + self.position_manager.update_order_status(order_id, 'failed', err_msg="收盘清理:订单无法成交") + + logger.warning(f"清理无法成交订单: ID={order_id}, 代码={order_info['code']}, 方向={order_info['direction']}, " + f"数量={order_info['target_amount']}, 已成交数量={order_info.get('traded_volume', 0)}") + + # 对于特殊情况(如跌停无法卖出),记录错误日志 + if order_info['direction'] == 'sell': + logger.error(f"可能存在跌停无法卖出情况: ID={order_id}, 代码={order_info['code']}, " + f"目标数量={order_info['target_amount']}, 已成交数量={order_info.get('traded_volume', 0)}") + except Exception as e: + logger.error(f"清理订单 {order_id} 时出错: {str(e)}") + + logger.info("过期未完成订单清理完毕") + + except Exception as e: + logger.error(f"清理过期未完成订单时发生异常: {str(e)}") \ No newline at end of file diff --git a/src/xt_trader.py b/src/real/xt_trader.py similarity index 89% rename from src/xt_trader.py rename to src/real/xt_trader.py index 5e18f0d..2feb7ab 100644 --- a/src/xt_trader.py +++ b/src/real/xt_trader.py @@ -248,14 +248,32 @@ class XtTrader(BaseTrader): def cancel(self, order_id): # 撤单接口需要订单编号 result = self.xt_trader.cancel_order_stock(self.account, int(order_id)) - return {"cancel_result": result} + return {"result": result == 0, "message": f"撤单结果: {result}"} + + def get_quote(self, code): + """获取行情数据 - - -if __name__ == "__main__": - trader = XtTrader() - trader.login() - logger.info(f"账户余额: {trader.get_balance()}") - logger.info(f"持仓: {trader.get_positions()}") - logger.info(f"当日成交: {trader.get_today_trades()}") - logger.info(f"当日委托: {trader.get_today_orders()}") + Args: + code: 股票代码 + + Returns: + dict: 行情数据,如果获取失败则返回None + """ + try: + quote = self.xt_trader.query_quote(code) + if quote: + return { + "code": quote.stock_code, + "last": quote.last, + "open": quote.open, + "high": quote.high, + "low": quote.low, + "ask_price": [quote.ask_price1, quote.ask_price2, quote.ask_price3, quote.ask_price4, quote.ask_price5], + "ask_volume": [quote.ask_volume1, quote.ask_volume2, quote.ask_volume3, quote.ask_volume4, quote.ask_volume5], + "bid_price": [quote.bid_price1, quote.bid_price2, quote.bid_price3, quote.bid_price4, quote.bid_price5], + "bid_volume": [quote.bid_volume1, quote.bid_volume2, quote.bid_volume3, quote.bid_volume4, quote.bid_volume5], + } + return None + except Exception as e: + logger.error(f"获取行情失败: {code}, {str(e)}") + return None \ No newline at end of file diff --git a/src/simulation/__init__.py b/src/simulation/__init__.py new file mode 100644 index 0000000..ac0e0a7 --- /dev/null +++ b/src/simulation/__init__.py @@ -0,0 +1,9 @@ +""" +模拟交易模块 + +此模块提供模拟交易的功能,用于在不涉及真实资金的情况下测试交易策略。 +""" + +from .simulation_trader import SimulationTrader + +__all__ = ['SimulationTrader'] \ No newline at end of file diff --git a/src/simulation_trader.py b/src/simulation/simulation_trader.py similarity index 99% rename from src/simulation_trader.py rename to src/simulation/simulation_trader.py index 4c7c3ce..803ef22 100644 --- a/src/simulation_trader.py +++ b/src/simulation/simulation_trader.py @@ -1,4 +1,3 @@ - from logger_config import get_logger class SimulationTrader: diff --git a/src/strategy_position_manager.py b/src/strategy_position_manager.py deleted file mode 100644 index f605f07..0000000 --- a/src/strategy_position_manager.py +++ /dev/null @@ -1,438 +0,0 @@ -import time -import os -import json -from simulation_trader import SimulationTrader -from xtquant import xtconstant -from logger_config import get_logger - -# 获取日志记录器 -logger = get_logger('strategy') - -# 策略仓位管理 -strategy_positions = { - 'real': {}, # 存储实盘策略持仓 - 'simulation': {} # 存储模拟交易策略持仓 -} -strategy_trades = { - 'real': {}, # 存储实盘策略交易记录 - 'simulation': {} # 存储模拟交易策略交易记录 -} -pending_orders = { - 'real': {}, # 存储实盘未完成委托 - 'simulation': {} # 存储模拟交易未完成委托 -} - -class StrategyPositionManager: - """策略持仓管理器,负责管理不同策略的持仓情况""" - - @staticmethod - def get_trader_type(trader): - """根据交易实例确定交易类型 - - Args: - trader: 交易实例 - - Returns: - str: 'simulation'或'real' - """ - return 'simulation' if isinstance(trader, SimulationTrader) else 'real' - - @staticmethod - def update_strategy_position(trader, strategy_name, code, direction, amount): - """更新策略持仓 - - Args: - trader: 交易实例 - strategy_name: 策略名称 - code: 股票代码 - direction: 'buy'或'sell' - amount: 交易数量 - """ - if not strategy_name: - return - - # 判断交易类型 - trader_type = StrategyPositionManager.get_trader_type(trader) - - # 确保策略在字典中 - if strategy_name not in strategy_positions[trader_type]: - strategy_positions[trader_type][strategy_name] = {} - - try: - # 获取交易实例持仓情况 - actual_positions = trader.get_positions() - code_position = next((pos for pos in actual_positions if pos.get('stock_code') == code), None) - - # 记录实际持仓总量 - actual_total = code_position.get('volume', 0) if code_position else 0 - actual_can_use = code_position.get('can_use_volume', 0) if code_position else 0 - - logger.info(f"实际持仓 - 代码: {code}, 总量: {actual_total}, 可用: {actual_can_use}") - - # 如果股票代码在持仓字典中不存在,初始化它 - if code not in strategy_positions[trader_type][strategy_name]: - strategy_positions[trader_type][strategy_name][code] = { - 'total_amount': 0, - 'closeable_amount': 0 - } - - # 直接使用实际持仓数据更新策略持仓 - strategy_positions[trader_type][strategy_name][code]['total_amount'] = actual_total - strategy_positions[trader_type][strategy_name][code]['closeable_amount'] = actual_can_use - - logger.info(f"更新策略持仓 - 交易类型: {trader_type}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}, 数量: {amount}, 总量: {strategy_positions[trader_type][strategy_name][code]['total_amount']}, 可用: {strategy_positions[trader_type][strategy_name][code]['closeable_amount']}") - - except Exception as e: - logger.error(f"获取实际持仓失败: {str(e)}") - # 异常情况下只记录错误,不尝试更新持仓 - - # 移除total_amount为0的持仓 - if code in strategy_positions[trader_type][strategy_name] and strategy_positions[trader_type][strategy_name][code]['total_amount'] <= 0: - del strategy_positions[trader_type][strategy_name][code] - - @staticmethod - def update_pending_orders(trader): - """更新未完成委托状态 - - Args: - trader: 交易实例 - """ - try: - # 判断当前交易类型 - trader_type = StrategyPositionManager.get_trader_type(trader) - - # 获取今日委托 - today_entrusts = trader.get_today_orders() - - # 更新委托状态 - for order_id, order_info in list(pending_orders[trader_type].items()): - entrust = next((e for e in today_entrusts if e.get('order_id') == order_id), None) - if entrust: - if entrust.get('order_status') in [xtconstant.ORDER_SUCCEEDED, xtconstant.ORDER_PART_SUCC]: - # 成交量计算 - traded_amount = int(entrust.get('traded_volume', 0)) - - # 更新策略持仓 - StrategyPositionManager.update_strategy_position( - trader, - order_info['strategy_name'], - order_info['code'], - order_info['direction'], - traded_amount - ) - - # 如果完全成交,从待处理列表中移除 - if entrust.get('order_status') == xtconstant.ORDER_SUCCEEDED: - del pending_orders[trader_type][order_id] - - # 如果已撤单、废单等终态,也从待处理列表中移除 - elif entrust.get('order_status') in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_JUNK]: - del pending_orders[trader_type][order_id] - except Exception as e: - logger.error(f"更新未完成委托状态失败: {str(e)}") - - @staticmethod - def add_pending_order(trader, order_id, strategy_name, code, price, amount, direction, order_type='limit'): - """添加未完成委托 - - Args: - trader: 交易实例 - order_id: 委托编号 - strategy_name: 策略名称 - code: 股票代码 - price: 委托价格 - amount: 委托数量 - direction: 交易方向,'buy'或'sell' - order_type: 订单类型,'limit'或'market',默认为'limit' - """ - if not order_id or order_id == 'simulation': - return - - # 判断当前交易类型 - trader_type = StrategyPositionManager.get_trader_type(trader) - - # 添加到未完成委托列表 - pending_orders[trader_type][order_id] = { - 'strategy_name': strategy_name, - 'code': code, - 'price': price, - 'amount': amount, - 'direction': direction, - 'created_time': time.time(), - 'target_amount': amount, - 'status': 'pending', - 'last_check_time': time.time(), - 'retry_count': 0, - 'order_type': order_type - } - - # 同时记录到交易历史 - if strategy_name: - if strategy_name not in strategy_trades[trader_type]: - strategy_trades[trader_type][strategy_name] = [] - - strategy_trades[trader_type][strategy_name].append({ - 'time': time.strftime('%Y-%m-%d %H:%M:%S'), - 'type': direction, - 'code': code, - 'price': price, - 'amount': amount, - 'order_id': order_id, - 'status': 'pending' - }) - - logger.info(f"添加未完成委托: {order_id}, 交易类型: {trader_type}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}") - - @staticmethod - def get_pending_orders(trader): - """获取指定交易类型的所有未完成委托 - - Args: - trader: 交易实例 - - Returns: - dict: 未完成委托字典,以order_id为键 - """ - trader_type = StrategyPositionManager.get_trader_type(trader) - return pending_orders[trader_type] - - @staticmethod - def get_pending_order(trader, order_id): - """获取指定订单信息 - - Args: - trader: 交易实例 - order_id: 订单ID - - Returns: - dict: 订单信息字典,如果不存在则返回None - """ - trader_type = StrategyPositionManager.get_trader_type(trader) - return pending_orders[trader_type].get(order_id) - - @staticmethod - def update_order_status(trader, order_id, new_status, **additional_data): - """更新订单状态 - - Args: - trader: 交易实例 - order_id: 订单ID - new_status: 新状态 - additional_data: 附加数据(如成交量、重试次数等) - - Returns: - bool: 是否成功更新 - """ - trader_type = StrategyPositionManager.get_trader_type(trader) - if order_id in pending_orders[trader_type]: - # 记录之前的状态用于日志 - previous_status = pending_orders[trader_type][order_id].get('status') - - # 更新状态和最后检查时间 - pending_orders[trader_type][order_id]['status'] = new_status - pending_orders[trader_type][order_id]['last_check_time'] = time.time() - - # 更新附加数据 - for key, value in additional_data.items(): - pending_orders[trader_type][order_id][key] = value - - # 记录状态变化日志 - if previous_status != new_status: - code = pending_orders[trader_type][order_id].get('code') - logger.info(f"订单状态变化: ID={order_id}, 代码={code}, 旧状态={previous_status}, 新状态={new_status}") - - return True - return False - - @staticmethod - def increment_retry_count(trader, order_id): - """增加订单重试次数 - - Args: - trader: 交易实例 - order_id: 订单ID - - Returns: - int: 新的重试次数,如果订单不存在则返回-1 - """ - trader_type = StrategyPositionManager.get_trader_type(trader) - if order_id in pending_orders[trader_type]: - current = pending_orders[trader_type][order_id].get('retry_count', 0) - pending_orders[trader_type][order_id]['retry_count'] = current + 1 - return current + 1 - return -1 - - @staticmethod - def remove_pending_order(trader, order_id): - """移除未完成委托 - - Args: - trader: 交易实例 - order_id: 订单ID - - Returns: - bool: 是否成功移除 - """ - trader_type = StrategyPositionManager.get_trader_type(trader) - if order_id in pending_orders[trader_type]: - del pending_orders[trader_type][order_id] - return True - return False - - @staticmethod - def clean_timeout_orders(): - """清理超时委托""" - current_time = time.time() - # 遍历实盘和模拟两种类型的委托 - for trader_type in ['real', 'simulation']: - for order_id, order_info in list(pending_orders[trader_type].items()): - # 超过24小时的委托视为超时 - if current_time - order_info['created_time'] > 24 * 60 * 60: - del pending_orders[trader_type][order_id] - logger.warning(f"清理超时委托: ID={order_id}, 状态={order_info.get('status')}") - - @staticmethod - def load_strategy_data(): - """加载策略数据""" - global strategy_positions, strategy_trades, pending_orders - try: - if os.path.exists('strategy_data.json'): - with open('strategy_data.json', 'r') as f: - data = json.load(f) - # 直接使用新版数据结构,不再兼容旧版格式 - strategy_positions = data.get('positions', {'real': {}, 'simulation': {}}) - strategy_trades = data.get('trades', {'real': {}, 'simulation': {}}) - pending_orders = data.get('pending_orders', {'real': {}, 'simulation': {}}) - - # 确保数据结构完整 - if 'real' not in strategy_positions: - strategy_positions['real'] = {} - if 'simulation' not in strategy_positions: - strategy_positions['simulation'] = {} - if 'real' not in strategy_trades: - strategy_trades['real'] = {} - if 'simulation' not in strategy_trades: - strategy_trades['simulation'] = {} - if 'real' not in pending_orders: - pending_orders['real'] = {} - if 'simulation' not in pending_orders: - pending_orders['simulation'] = {} - - logger.info("已加载策略数据") - logger.info(f"实盘策略数: {len(strategy_positions['real'])}, 模拟策略数: {len(strategy_positions['simulation'])}") - except Exception as e: - logger.error(f"加载策略数据失败: {str(e)}") - # 初始化空数据结构 - strategy_positions = {'real': {}, 'simulation': {}} - strategy_trades = {'real': {}, 'simulation': {}} - pending_orders = {'real': {}, 'simulation': {}} - - @staticmethod - def save_strategy_data(): - """保存策略数据""" - try: - with open('strategy_data.json', 'w') as f: - json.dump({ - 'positions': strategy_positions, - 'trades': strategy_trades, - 'pending_orders': pending_orders - }, f) - except Exception as e: - logger.error(f"保存策略数据失败: {str(e)}") - - @staticmethod - def get_strategy_positions(trader, strategy_name=None): - """获取策略持仓 - - Args: - trader: 交易实例 - strategy_name: 策略名称,如果为None,返回所有持仓 - - Returns: - 如果strategy_name为None,返回交易实例的所有持仓 - 否则返回指定策略的持仓 - """ - # 判断当前交易类型 - trader_type = StrategyPositionManager.get_trader_type(trader) - - # 如果指定了策略名称,返回该策略的持仓 - if strategy_name: - # 获取真实账户持仓,用于计算可交易量 - real_positions = trader.get_positions() - real_positions_map = {} - for pos in real_positions: - # 使用xt_trader返回的字段名 - if 'stock_code' in pos and 'can_use_volume' in pos: - real_positions_map[pos['stock_code']] = pos - - # 如果该策略没有记录,返回空列表 - if strategy_name not in strategy_positions[trader_type]: - logger.info(f"Strategy {strategy_name} has no positions in {trader_type} mode") - return [] - - # 合并策略持仓和真实持仓的可交易量 - result = [] - for code, pos_info in strategy_positions[trader_type][strategy_name].items(): - # 忽略total_amount为0的持仓 - if pos_info['total_amount'] <= 0: - continue - - # 使用真实账户的可交易量作为策略的可交易量上限 - real_pos = real_positions_map.get(code, {}) - closeable = min(pos_info['total_amount'], real_pos.get('can_use_volume', 0)) - - result.append({ - code: { - 'total_amount': pos_info['total_amount'], - 'closeable_amount': closeable - } - }) - - logger.info(f"Strategy {strategy_name} positions in {trader_type} mode: {result}") - return result - - # 否则返回原始持仓 - positions = trader.get_positions() - logger.info(f"Positions in {trader_type} mode: {positions}") - return positions - - @staticmethod - def clear_strategy(trader, strategy_name): - """清除指定策略的持仓管理数据 - - Args: - trader: 交易实例 - strategy_name: 策略名称 - - Returns: - tuple: (success, message) - success: 是否成功清除 - message: 提示信息 - """ - if not strategy_name: - return False, "缺少策略名称参数" - - # 判断当前交易类型 - trader_type = StrategyPositionManager.get_trader_type(trader) - - # 检查策略是否存在于当前交易类型中 - if strategy_name in strategy_positions[trader_type]: - # 从策略持仓字典中删除该策略 - del strategy_positions[trader_type][strategy_name] - # 清除该策略的交易记录 - if strategy_name in strategy_trades[trader_type]: - del strategy_trades[trader_type][strategy_name] - - # 清除与该策略相关的未完成委托 - for order_id, order_info in list(pending_orders[trader_type].items()): - if order_info.get('strategy_name') == strategy_name: - del pending_orders[trader_type][order_id] - - # 保存更新后的策略数据 - StrategyPositionManager.save_strategy_data() - - logger.info(f"成功清除策略持仓数据: {strategy_name} (交易类型: {trader_type})") - return True, f"成功清除策略 '{strategy_name}' 的持仓数据 (交易类型: {trader_type})" - else: - logger.info(f"策略不存在或没有持仓数据: {strategy_name} (交易类型: {trader_type})") - return True, f"策略 '{strategy_name}' 不存在或没有持仓数据 (交易类型: {trader_type})" \ No newline at end of file diff --git a/src/trade_constants.py b/src/trade_constants.py new file mode 100644 index 0000000..cda9257 --- /dev/null +++ b/src/trade_constants.py @@ -0,0 +1,18 @@ +# 交易常量 +TRADE_TYPE_REAL = 'real' +TRADE_TYPE_SIMULATION = 'simulation' + +# 订单状态 +ORDER_STATUS_PENDING = 'pending' +ORDER_STATUS_COMPLETED = 'completed' +ORDER_STATUS_CANCELLED = 'cancelled' +ORDER_STATUS_FAILED = 'failed' + +# 订单类型 +ORDER_TYPE_LIMIT = 'limit' +ORDER_TYPE_MARKET = 'market' + +# 订单方向 +ORDER_DIRECTION_BUY = 'buy' +ORDER_DIRECTION_SELL = 'sell' + diff --git a/src/trade_server.py b/src/trade_server.py index 59e0aa0..79b72bc 100644 --- a/src/trade_server.py +++ b/src/trade_server.py @@ -1,13 +1,13 @@ import schedule import threading import time -from xt_trader import XtTrader +from real.xt_trader import XtTrader from flask import Flask, request, abort, jsonify from config import Config from concurrent.futures import TimeoutError import concurrent.futures import atexit -from simulation_trader import SimulationTrader +from simulation.simulation_trader import SimulationTrader import datetime from strategy_position_manager import StrategyPositionManager from logger_config import get_logger @@ -69,7 +69,7 @@ def get_real_trader_manager(): with _instance_lock: if _real_trader_manager_instance is None: # 延迟导入避免循环依赖 - from real_trader_manager import RealTraderManager + from real.real_trader_manager import RealTraderManager _real_trader_manager_instance = RealTraderManager(get_real_trader()) logger.info("创建新的RealTraderManager实例") return _real_trader_manager_instance diff --git a/test_imports.py b/test_imports.py new file mode 100644 index 0000000..780fcfe --- /dev/null +++ b/test_imports.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""测试导入新模块结构""" + +import sys +import os + +# 添加src目录到Python导入路径 +sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src')) + +try: + from simulation.simulation_trader import SimulationTrader + print("导入 SimulationTrader 成功!") +except Exception as e: + print(f"导入 SimulationTrader 失败: {e}") + +try: + from real.xt_trader import XtTrader + print("导入 XtTrader 成功!") +except Exception as e: + print(f"导入 XtTrader 失败: {e}") + +print("测试完成") \ No newline at end of file