refactor position manager

This commit is contained in:
zhiyong 2025-05-01 15:19:27 +08:00
parent a407ce1f2f
commit 978834772b
11 changed files with 448 additions and 626 deletions

View File

@ -100,3 +100,26 @@ print(response.json())
- 系统默认会根据交易时间自动判断是否使用模拟交易
- 交易日判断基于chinese-calendar库
- 请确保配置正确的交易账号和路径
## design
### strategy position manager
策略仓位管理是用于保存,更新基于策略名的股票仓位, 和未完成订单的
父类: BasePositionManager
子类: RealPositionManager(放入real 模块), SimulationPositionManager(放入simulation 模块)
position manager 中保存两个字典, positions, pending_orders, key都是策略名
position manager在trade_server中初始化, 作为参数传入trader
完整的交易流程是:
1. 下单
用户调用trader下单, trader在发出下单信号的同时添加一个pending_order给position manager
pending_order的结构是{order_id, order_status}, 当order_status是完成状态时, 应该从字典中删除
下单没有给策略名的, 策略名默认为"default_strategy"
2. 更新pending order状态
模拟盘立刻全部成交, 在下单后立刻更新仓位, 并删除pending order, 需要打印日志
实盘由real_trader_manager管理pending order状态, 具体是
- 下单后立刻尝试更新pending order状态, 比如状态变为部分成交, 全部成交等, 同时更新持仓,并计划一个1分钟后的任务
- 1分钟后再次更新订单状态, 如果全部成交, 则更新持仓, 否则(部分成交, 无成交), 撤单, 并下一个市价单数量是原先订单数量, 或者补单数量(部分成交)
- 如果下单发生错误, 表示没有成功下单, 则不添加pending order, 也不更新仓位, 即忽略这笔订单, 打印错误日志
3. 收盘后保存策略持仓(模拟盘, 实盘单独保存)
4. server启动时载入持仓文件
以上设计基于简洁, 逻辑清晰, 流程简单的思路, 如果有更好的建议, 可以提供

258
src/position_manager.py Normal file
View File

@ -0,0 +1,258 @@
import os
import json
from logger_config import get_logger
from trade_constants import ORDER_DIRECTION_BUY
# 获取日志记录器
logger = get_logger('position_manager')
class PositionManager():
"""实盘策略持仓管理器,负责管理不同策略在实盘环境下的持仓情况"""
def __init__(self, trade_type):
"""初始化实盘持仓管理器"""
super().__init__()
# 策略持仓信息
self.positions = {} # 策略名 -> {股票代码 -> {total_amount, closeable_amount}}
# 待处理订单信息
self.pending_orders = {} # order_id -> 订单信息
self.data_path = trade_type + '_strategy_positions.json'
self.load_data()
def update_position(self, strategy_name, code, direction, amount):
"""更新策略持仓
Args:
strategy_name: 策略名称
code: 股票代码
direction: 'buy''sell'
amount: 交易数量
"""
if not strategy_name:
return
# 确保策略在字典中
if strategy_name not in self.positions:
self.positions[strategy_name] = {}
# 如果股票代码在持仓字典中不存在,初始化它
if code not in self.positions[strategy_name]:
self.positions[strategy_name][code] = {
'total_amount': 0,
'closeable_amount': 0
}
# 根据方向更新持仓
if direction == ORDER_DIRECTION_BUY:
self.positions[strategy_name][code]['total_amount'] += amount
self.positions[strategy_name][code]['closeable_amount'] += amount
else: # sell
self.positions[strategy_name][code]['total_amount'] -= amount
self.positions[strategy_name][code]['closeable_amount'] -= amount
logger.info(f"更新策略持仓 - 策略: {strategy_name}, 代码: {code}, 方向: {direction}, 数量: {amount}, "
f"更新后总量: {self.positions[strategy_name][code]['total_amount']}, "
f"可用: {self.positions[strategy_name][code]['closeable_amount']}")
# 移除total_amount为0的持仓
if code in self.positions[strategy_name] and self.positions[strategy_name][code]['total_amount'] <= 0:
del self.positions[strategy_name][code]
logger.info(f"移除空持仓 - 策略: {strategy_name}, 代码: {code}")
def add_pending_order(self, order_id, strategy_name, code, price, amount, direction, order_type='limit'):
"""添加未完成委托
Args:
order_id: 订单ID
strategy_name: 策略名称
code: 股票代码
price: 委托价格
amount: 委托数量
direction: 交易方向
order_type: 订单类型
"""
# 添加未处理订单
self.pending_orders[order_id] = {
'strategy_name': strategy_name,
'code': code,
'price': price,
'target_amount': amount,
'direction': direction,
'order_type': order_type,
'status': 'pending',
'created_time': self._get_current_time(),
'retry_count': 0
}
logger.info(f"添加未完成委托 - ID: {order_id}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}, "
f"数量: {amount}, 价格: {price}, 类型: {order_type}")
def update_order_status(self, order_id, new_status):
"""更新订单状态
Args:
order_id: 订单ID
new_status: 新状态
Returns:
bool: 是否成功更新
"""
if order_id in self.pending_orders:
# 记录之前的状态用于日志
previous_status = self.pending_orders[order_id].get('status')
# 更新状态和最后检查时间
self.pending_orders[order_id]['status'] = new_status
# 记录状态变化日志
if previous_status != new_status:
code = self.pending_orders[order_id].get('code')
logger.info(f"订单状态变化: ID={order_id}, 代码={code}, 旧状态={previous_status}, 新状态={new_status}")
# 如果订单已完成,移除它
if new_status in ['completed', 'cancelled', 'failed']:
# 保留订单信息以供参考,但标记为已完成
self.remove_pending_order(order_id)
logger.info(f"订单已删除 - ID: {order_id}, 状态: {new_status}")
return True
return False
def remove_pending_order(self, order_id):
"""移除未完成委托
Args:
order_id: 订单ID
Returns:
bool: 是否成功移除
"""
if order_id in self.pending_orders:
del self.pending_orders[order_id]
return True
return False
def get_pending_order(self, order_id):
"""获取未完成委托信息
Args:
order_id: 订单ID
Returns:
dict: 委托信息如果不存在返回None
"""
return self.pending_orders.get(order_id)
def get_pending_orders(self):
"""获取所有未完成委托
Returns:
dict: 订单ID到委托信息的映射
"""
return self.pending_orders
def get_positions(self, strategy_name=None):
"""获取策略持仓
Args:
strategy_name: 策略名称如果为None返回所有持仓
Returns:
dict: 策略持仓信息
"""
if strategy_name:
if strategy_name not in self.positions:
return {}
return self.positions[strategy_name]
return self.positions
def save_data(self):
"""保存策略数据"""
try:
with open(self.data_path, 'w') as f:
json.dump({
'positions': self.positions,
'pending_orders': self.pending_orders
}, f)
logger.info("成功保存实盘策略数据")
except Exception as e:
logger.error(f"保存实盘策略数据失败: {str(e)}")
def load_data(self):
"""加载策略数据"""
try:
if os.path.exists(self.data_path):
with open(self.data_path, 'r') as f:
data = json.load(f)
self.positions = data.get('positions', {})
self.pending_orders = data.get('pending_orders', {})
logger.info("已加载实盘策略数据")
logger.info(f"策略数: {len(self.positions)}")
else:
logger.info(f"实盘策略数据文件不存在: {self.data_path}")
self.positions = {}
self.pending_orders = {}
except Exception as e:
logger.error(f"加载实盘策略数据失败: {str(e)}")
# 初始化空数据结构
self.positions = {}
self.pending_orders = {}
def _get_current_time(self):
"""获取当前时间戳"""
import time
return time.time()
def clean_timeout_orders(self):
"""清理超时未完成订单"""
timeout_limit = 24 * 60 * 60 # 24小时
current_time = self._get_current_time()
timeout_orders = []
for order_id, order_info in list(self.pending_orders.items()):
# 检查是否超时
if current_time - order_info['created_time'] > timeout_limit:
timeout_orders.append(order_id)
# 更新状态
self.update_order_status(order_id, 'failed')
if timeout_orders:
logger.warn(f"清理超时订单完成,共 {len(timeout_orders)} 个: {', '.join(timeout_orders)}")
def clear_strategy(self, strategy_name):
"""清除指定策略的持仓管理数据
Args:
strategy_name: 策略名称
Returns:
tuple: (success, message)
success: 是否成功清除
message: 提示信息
"""
if not strategy_name:
return False, "缺少策略名称参数"
# 检查策略是否存在
if strategy_name in self.positions:
# 从策略持仓字典中删除该策略
del self.positions[strategy_name]
# 清除该策略的交易记录
if strategy_name in self.trades:
del self.trades[strategy_name]
# 清除与该策略相关的未完成委托
for order_id, order_info in list(self.pending_orders.items()):
if order_info.get('strategy_name') == strategy_name:
del self.pending_orders[order_id]
# 保存更新后的策略数据
self.save_data()
logger.info(f"成功清除策略持仓数据: {strategy_name}")
return True, f"成功清除策略 '{strategy_name}' 的持仓数据"
else:
logger.info(f"策略不存在或没有持仓数据: {strategy_name}")
return True, f"策略 '{strategy_name}' 不存在或没有持仓数据"

10
src/real/__init__.py Normal file
View File

@ -0,0 +1,10 @@
"""
实盘交易模块
此模块提供实盘交易的功能使用xtquant接口连接到实际交易系统
"""
from .xt_trader import XtTrader
from .real_trader_manager import RealTraderManager
__all__ = ['XtTrader', 'RealTraderManager']

View File

@ -4,7 +4,6 @@ import schedule
from xtquant import xtconstant
from logger_config import get_logger
from config import Config
from strategy_position_manager import StrategyPositionManager
import json
# 获取日志记录器
@ -13,29 +12,24 @@ logger = get_logger('real_trader_manager')
class RealTraderManager:
"""实盘交易管理器,处理实盘下单失败、部分成交等问题,尽量保证仓位与策略信号一致"""
def __init__(self, trader):
def __init__(self, trader, position_manager):
"""初始化实盘交易管理器
Args:
trader: XtTrader实例如果为None则自动获取
trader: XtTrader实例
position_manager: StrategyPositionManager实例
"""
# 使用传入的trader例或获取单
# 使用传入的trader和position_manager实例
self.trader = trader
self.position_manager = position_manager
# 确保已登录
if not self.trader.is_logged_in():
self.trader.login()
# 不再自己维护pending_orders改用StrategyPositionManager管理
# self.pending_orders = {}
# 启动调度器
self._start_scheduler()
# 记录策略期望持仓状态
# 格式: {strategy_name: {code: target_amount}}
self.strategy_targets = {}
logger.info("实盘交易管理器初始化完成")
def _start_scheduler(self):
@ -94,9 +88,6 @@ class RealTraderManager:
logger.warning(f"资金或持仓不足,忽略订单: {direction} {code} {amount}{price}")
return {"success": False, "error": "资金或持仓不足"}
# 更新策略目标持仓
self._update_strategy_target(strategy_name, code, direction, amount)
# 下单
logger.info(f"准备{direction}订单: 代码={code}, 数量={amount}, 价格={price}, 订单类型={order_type}")
if direction == 'buy':
@ -109,9 +100,8 @@ class RealTraderManager:
logger.error(f"下单失败: {result}")
return {"success": False, "error": "下单失败"}
# 使用StrategyPositionManager添加未完成委托
StrategyPositionManager.add_pending_order(
self.trader,
# 添加未完成委托到position_manager
self.position_manager.add_pending_order(
order_id,
strategy_name,
code,
@ -138,19 +128,13 @@ class RealTraderManager:
logger.info("开始检查未完成订单...")
# 获取所有未完成订单
pending_orders = StrategyPositionManager.get_pending_orders(self.trader)
pending_orders = self.position_manager.get_pending_orders()
# 如果没有未完成订单,直接返回
if not pending_orders:
logger.info("没有未完成订单需要检查")
return
# 更新StrategyPositionManager中的未完成委托状态
try:
StrategyPositionManager.update_pending_orders(self.trader)
except Exception as e:
logger.error(f"更新StrategyPositionManager未完成委托状态失败: {str(e)}")
# 获取最新的委托列表
try:
entrusts = self.trader.get_today_orders()
@ -174,11 +158,11 @@ class RealTraderManager:
self._update_order_status(order_id, entrust_map)
# 获取最新的订单信息
order_info = StrategyPositionManager.get_pending_order(self.trader, order_id)
order_info = self.position_manager.get_pending_order(order_id)
if not order_info:
continue
# 处理超时未成交或部分成交的订单
# 处理未成交或部分成交的订单
current_time = time.time()
order_age = current_time - order_info['created_time']
@ -198,12 +182,6 @@ class RealTraderManager:
except Exception as e:
logger.error(f"处理订单 {order_id} 时出错: {str(e)}")
# 同步策略持仓和实际持仓
try:
self._sync_strategy_positions()
except Exception as e:
logger.error(f"同步策略持仓和实际持仓失败: {str(e)}")
logger.info("未完成订单检查完毕")
except Exception as e:
@ -217,7 +195,7 @@ class RealTraderManager:
entrust_map: 可选的委托字典如果为None则重新获取
"""
# 检查订单是否存在
order_info = StrategyPositionManager.get_pending_order(self.trader, order_id)
order_info = self.position_manager.get_pending_order(order_id)
if not order_info:
return
@ -238,53 +216,56 @@ class RealTraderManager:
# 根据委托状态更新订单状态
if entrust['order_status'] == xtconstant.ORDER_SUCCEEDED:
# 全部成交
StrategyPositionManager.update_order_status(self.trader, order_id, 'completed')
# 日志记录在update_order_status中处理
self.position_manager.update_order_status(order_id, 'completed')
# 更新持仓
self.position_manager.update_position(
order_info['strategy_name'],
order_info['code'],
order_info['direction'],
order_info['target_amount']
)
elif entrust['order_status'] == xtconstant.ORDER_PART_SUCC:
# 部分成交
current_volume = entrust.get('traded_volume', 0)
StrategyPositionManager.update_order_status(
self.trader,
self.position_manager.update_order_status(
order_id,
'partial',
traded_volume=current_volume
)
# 如果成交量有变化,记录日志
# 如果成交量有变化,记录日志并更新持仓
if current_volume != previous_volume:
target_amount = order_info['target_amount']
logger.info(f"订单部分成交更新: ID={order_id}, 代码={entrust['stock_code']}, 目标数量={target_amount}, 已成交数量={current_volume}, 剩余数量={target_amount - current_volume}")
# 更新持仓(仅更新已成交部分)
if current_volume > 0:
self.position_manager.update_position(
order_info['strategy_name'],
order_info['code'],
order_info['direction'],
current_volume
)
elif entrust['order_status'] in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_JUNK]:
# 已撤单或废单
StrategyPositionManager.update_order_status(
self.trader,
self.position_manager.update_order_status(
order_id,
'cancelled',
err_msg=entrust.get('err_msg', '未知原因')
)
elif entrust['order_status'] == xtconstant.ORDER_UNREPORTED:
# 未报
elif entrust['order_status'] in [xtconstant.ORDER_UNREPORTED, xtconstant.ORDER_WAIT_REPORTING, xtconstant.ORDER_REPORTED]:
# 未报、待报、已报
if previous_status != 'pending':
StrategyPositionManager.update_order_status(self.trader, order_id, 'pending')
elif entrust['order_status'] == xtconstant.ORDER_WAIT_REPORTING:
# 待报
if previous_status != 'pending':
StrategyPositionManager.update_order_status(self.trader, order_id, 'pending')
elif entrust['order_status'] == xtconstant.ORDER_REPORTED:
# 已报
if previous_status != 'pending':
StrategyPositionManager.update_order_status(self.trader, order_id, 'pending')
self.position_manager.update_order_status(order_id, 'pending')
else:
# 委托列表中找不到该订单,可能已经太久
current_time = time.time()
if current_time - order_info['created_time'] > 24 * 60 * 60:
previous_status = order_info.get('status')
StrategyPositionManager.update_order_status(self.trader, order_id, 'failed')
self.position_manager.update_order_status(order_id, 'failed')
logger.warning(f"订单状态未知且过期: ID={order_id}, 旧状态={previous_status}, 新状态=failed, 创建时长={(current_time - order_info['created_time'])/3600:.1f}小时")
except Exception as e:
@ -320,21 +301,16 @@ class RealTraderManager:
# 如果有未成交的部分,使用市价单补充交易
if remaining_amount > 0:
# 递增重试计数
new_retry_count = StrategyPositionManager.increment_retry_count(self.trader, order_id)
new_retry_count = self.position_manager.increment_retry_count(order_id)
# 决定是否使用市价单进行补单
use_market_order = Config.RTM_USE_MARKET_ORDER
# 使用市价单进行补单
new_order_type = 'market'
new_price = 0 # 市价单价格设为0
logger.info(f"准备补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={new_retry_count}/{Config.RTM_MAX_RETRIES}, 使用市价单={use_market_order}")
logger.info(f"准备补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={new_retry_count}/{Config.RTM_MAX_RETRIES}")
# 如果重试次数少于最大重试次数,则进行补单
if new_retry_count <= Config.RTM_MAX_RETRIES:
# 决定使用的订单类型
new_order_type = 'market' if use_market_order else 'limit'
# 对于市价单价格参数可设为0对于限价单使用原价格
new_price = 0 if new_order_type == 'market' else order_info['price']
# 下新订单
new_order = self.place_order(
order_info['strategy_name'],
@ -356,7 +332,7 @@ class RealTraderManager:
# 更新原订单状态
previous_status = order_info['status']
StrategyPositionManager.update_order_status(self.trader, order_id, 'cancelled')
self.position_manager.update_order_status(order_id, 'cancelled')
logger.info(f"更新原订单状态: ID={order_id}, 旧状态={previous_status}, 新状态=cancelled")
except Exception as e:
@ -415,114 +391,39 @@ class RealTraderManager:
logger.error(f"检查订单可行性时发生异常: {str(e)}")
return False
def _update_strategy_target(self, strategy_name, code, direction, amount):
"""更新策略目标持仓
Args:
strategy_name: 策略名称
code: 股票代码
direction: 交易方向
amount: 交易数量
"""
# 确保策略存在于目标字典中
if strategy_name not in self.strategy_targets:
self.strategy_targets[strategy_name] = {}
# 确保股票代码存在于策略目标中
if code not in self.strategy_targets[strategy_name]:
self.strategy_targets[strategy_name][code] = 0
# 根据交易方向更新目标持仓
if direction == 'buy':
self.strategy_targets[strategy_name][code] += amount
else: # sell
self.strategy_targets[strategy_name][code] -= amount
# 避免负数持仓
if self.strategy_targets[strategy_name][code] < 0:
self.strategy_targets[strategy_name][code] = 0
logger.info(f"更新策略目标持仓: 策略={strategy_name}, 代码={code}, 目标持仓={self.strategy_targets[strategy_name][code]}")
def _sync_strategy_positions(self):
"""同步策略持仓和实际持仓"""
try:
# 获取实际持仓
actual_positions = self.trader.get_positions()
if actual_positions is None:
logger.error("获取实际持仓失败,跳过同步")
return
position_map = {p['stock_code']: p for p in actual_positions}
# 如果没有策略目标持仓,直接返回
if not self.strategy_targets:
logger.info("没有策略目标持仓需要同步")
return
# 遍历每个策略的目标持仓
for strategy_name, targets in self.strategy_targets.items():
# 该策略的实际持仓映射
strategy_actual_positions = {}
# 遍历该策略的目标持仓
for code, target_amount in targets.items():
try:
# 获取股票的实际持仓
actual_position = position_map.get(code, {})
actual_amount = actual_position.get('volume', 0)
if actual_amount > 0:
strategy_actual_positions[code] = actual_amount
# 更新策略持仓管理器中的持仓记录
try:
StrategyPositionManager.update_strategy_position(
self.trader,
strategy_name,
code,
'sync', # 使用同步模式
actual_amount
)
except Exception as e:
logger.error(f"更新策略持仓管理器持仓记录失败: {str(e)}")
# 检查是否需要调整持仓
if actual_amount != target_amount:
diff = target_amount - actual_amount
if diff != 0:
logger.warning(f"持仓不一致: 策略={strategy_name}, 代码={code}, 目标={target_amount}, 实际={actual_amount}")
except Exception as e:
logger.error(f"同步股票 {code} 持仓时出错: {str(e)}")
# 记录日志
logger.info(f"策略 {strategy_name} 的目标持仓: {targets}")
logger.info(f"策略 {strategy_name} 的实际持仓: {strategy_actual_positions}")
except Exception as e:
logger.error(f"同步策略持仓时发生异常: {str(e)}")
def clean_expired_orders(self):
"""清理过期的未完成订单"""
# 直接调用StrategyPositionManager的方法
StrategyPositionManager.clean_timeout_orders()
try:
logger.info("开始清理过期未完成订单...")
def get_pending_orders(self):
"""获取所有未完成订单
# 获取所有未完成订单
pending_orders = self.position_manager.get_pending_orders()
Returns:
list: 未完成订单列表
"""
# 从StrategyPositionManager获取未完成订单
pending_orders = StrategyPositionManager.get_pending_orders(self.trader)
return [{
'order_id': order_id,
**order_info
} for order_id, order_info in pending_orders.items()]
if not pending_orders:
logger.info("没有未完成订单需要清理")
return
def get_strategy_targets(self):
"""获取策略目标持仓
# 遍历未完成订单,检查是否有无法成交的订单(如跌停无法卖出)
for order_id, order_info in list(pending_orders.items()):
try:
# 只处理pending和partial状态的订单
if order_info['status'] not in ['pending', 'partial']:
continue
Returns:
dict: 策略目标持仓
"""
return self.strategy_targets
# 标记为失败并记录日志
self.position_manager.update_order_status(order_id, 'failed', err_msg="收盘清理:订单无法成交")
logger.warning(f"清理无法成交订单: ID={order_id}, 代码={order_info['code']}, 方向={order_info['direction']}, "
f"数量={order_info['target_amount']}, 已成交数量={order_info.get('traded_volume', 0)}")
# 对于特殊情况(如跌停无法卖出),记录错误日志
if order_info['direction'] == 'sell':
logger.error(f"可能存在跌停无法卖出情况: ID={order_id}, 代码={order_info['code']}, "
f"目标数量={order_info['target_amount']}, 已成交数量={order_info.get('traded_volume', 0)}")
except Exception as e:
logger.error(f"清理订单 {order_id} 时出错: {str(e)}")
logger.info("过期未完成订单清理完毕")
except Exception as e:
logger.error(f"清理过期未完成订单时发生异常: {str(e)}")

View File

@ -248,14 +248,32 @@ class XtTrader(BaseTrader):
def cancel(self, order_id):
# 撤单接口需要订单编号
result = self.xt_trader.cancel_order_stock(self.account, int(order_id))
return {"cancel_result": result}
return {"result": result == 0, "message": f"撤单结果: {result}"}
def get_quote(self, code):
"""获取行情数据
Args:
code: 股票代码
if __name__ == "__main__":
trader = XtTrader()
trader.login()
logger.info(f"账户余额: {trader.get_balance()}")
logger.info(f"持仓: {trader.get_positions()}")
logger.info(f"当日成交: {trader.get_today_trades()}")
logger.info(f"当日委托: {trader.get_today_orders()}")
Returns:
dict: 行情数据如果获取失败则返回None
"""
try:
quote = self.xt_trader.query_quote(code)
if quote:
return {
"code": quote.stock_code,
"last": quote.last,
"open": quote.open,
"high": quote.high,
"low": quote.low,
"ask_price": [quote.ask_price1, quote.ask_price2, quote.ask_price3, quote.ask_price4, quote.ask_price5],
"ask_volume": [quote.ask_volume1, quote.ask_volume2, quote.ask_volume3, quote.ask_volume4, quote.ask_volume5],
"bid_price": [quote.bid_price1, quote.bid_price2, quote.bid_price3, quote.bid_price4, quote.bid_price5],
"bid_volume": [quote.bid_volume1, quote.bid_volume2, quote.bid_volume3, quote.bid_volume4, quote.bid_volume5],
}
return None
except Exception as e:
logger.error(f"获取行情失败: {code}, {str(e)}")
return None

View File

@ -0,0 +1,9 @@
"""
模拟交易模块
此模块提供模拟交易的功能用于在不涉及真实资金的情况下测试交易策略
"""
from .simulation_trader import SimulationTrader
__all__ = ['SimulationTrader']

View File

@ -1,4 +1,3 @@
from logger_config import get_logger
class SimulationTrader:

View File

@ -1,438 +0,0 @@
import time
import os
import json
from simulation_trader import SimulationTrader
from xtquant import xtconstant
from logger_config import get_logger
# 获取日志记录器
logger = get_logger('strategy')
# 策略仓位管理
strategy_positions = {
'real': {}, # 存储实盘策略持仓
'simulation': {} # 存储模拟交易策略持仓
}
strategy_trades = {
'real': {}, # 存储实盘策略交易记录
'simulation': {} # 存储模拟交易策略交易记录
}
pending_orders = {
'real': {}, # 存储实盘未完成委托
'simulation': {} # 存储模拟交易未完成委托
}
class StrategyPositionManager:
"""策略持仓管理器,负责管理不同策略的持仓情况"""
@staticmethod
def get_trader_type(trader):
"""根据交易实例确定交易类型
Args:
trader: 交易实例
Returns:
str: 'simulation''real'
"""
return 'simulation' if isinstance(trader, SimulationTrader) else 'real'
@staticmethod
def update_strategy_position(trader, strategy_name, code, direction, amount):
"""更新策略持仓
Args:
trader: 交易实例
strategy_name: 策略名称
code: 股票代码
direction: 'buy''sell'
amount: 交易数量
"""
if not strategy_name:
return
# 判断交易类型
trader_type = StrategyPositionManager.get_trader_type(trader)
# 确保策略在字典中
if strategy_name not in strategy_positions[trader_type]:
strategy_positions[trader_type][strategy_name] = {}
try:
# 获取交易实例持仓情况
actual_positions = trader.get_positions()
code_position = next((pos for pos in actual_positions if pos.get('stock_code') == code), None)
# 记录实际持仓总量
actual_total = code_position.get('volume', 0) if code_position else 0
actual_can_use = code_position.get('can_use_volume', 0) if code_position else 0
logger.info(f"实际持仓 - 代码: {code}, 总量: {actual_total}, 可用: {actual_can_use}")
# 如果股票代码在持仓字典中不存在,初始化它
if code not in strategy_positions[trader_type][strategy_name]:
strategy_positions[trader_type][strategy_name][code] = {
'total_amount': 0,
'closeable_amount': 0
}
# 直接使用实际持仓数据更新策略持仓
strategy_positions[trader_type][strategy_name][code]['total_amount'] = actual_total
strategy_positions[trader_type][strategy_name][code]['closeable_amount'] = actual_can_use
logger.info(f"更新策略持仓 - 交易类型: {trader_type}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}, 数量: {amount}, 总量: {strategy_positions[trader_type][strategy_name][code]['total_amount']}, 可用: {strategy_positions[trader_type][strategy_name][code]['closeable_amount']}")
except Exception as e:
logger.error(f"获取实际持仓失败: {str(e)}")
# 异常情况下只记录错误,不尝试更新持仓
# 移除total_amount为0的持仓
if code in strategy_positions[trader_type][strategy_name] and strategy_positions[trader_type][strategy_name][code]['total_amount'] <= 0:
del strategy_positions[trader_type][strategy_name][code]
@staticmethod
def update_pending_orders(trader):
"""更新未完成委托状态
Args:
trader: 交易实例
"""
try:
# 判断当前交易类型
trader_type = StrategyPositionManager.get_trader_type(trader)
# 获取今日委托
today_entrusts = trader.get_today_orders()
# 更新委托状态
for order_id, order_info in list(pending_orders[trader_type].items()):
entrust = next((e for e in today_entrusts if e.get('order_id') == order_id), None)
if entrust:
if entrust.get('order_status') in [xtconstant.ORDER_SUCCEEDED, xtconstant.ORDER_PART_SUCC]:
# 成交量计算
traded_amount = int(entrust.get('traded_volume', 0))
# 更新策略持仓
StrategyPositionManager.update_strategy_position(
trader,
order_info['strategy_name'],
order_info['code'],
order_info['direction'],
traded_amount
)
# 如果完全成交,从待处理列表中移除
if entrust.get('order_status') == xtconstant.ORDER_SUCCEEDED:
del pending_orders[trader_type][order_id]
# 如果已撤单、废单等终态,也从待处理列表中移除
elif entrust.get('order_status') in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_JUNK]:
del pending_orders[trader_type][order_id]
except Exception as e:
logger.error(f"更新未完成委托状态失败: {str(e)}")
@staticmethod
def add_pending_order(trader, order_id, strategy_name, code, price, amount, direction, order_type='limit'):
"""添加未完成委托
Args:
trader: 交易实例
order_id: 委托编号
strategy_name: 策略名称
code: 股票代码
price: 委托价格
amount: 委托数量
direction: 交易方向'buy''sell'
order_type: 订单类型'limit''market'默认为'limit'
"""
if not order_id or order_id == 'simulation':
return
# 判断当前交易类型
trader_type = StrategyPositionManager.get_trader_type(trader)
# 添加到未完成委托列表
pending_orders[trader_type][order_id] = {
'strategy_name': strategy_name,
'code': code,
'price': price,
'amount': amount,
'direction': direction,
'created_time': time.time(),
'target_amount': amount,
'status': 'pending',
'last_check_time': time.time(),
'retry_count': 0,
'order_type': order_type
}
# 同时记录到交易历史
if strategy_name:
if strategy_name not in strategy_trades[trader_type]:
strategy_trades[trader_type][strategy_name] = []
strategy_trades[trader_type][strategy_name].append({
'time': time.strftime('%Y-%m-%d %H:%M:%S'),
'type': direction,
'code': code,
'price': price,
'amount': amount,
'order_id': order_id,
'status': 'pending'
})
logger.info(f"添加未完成委托: {order_id}, 交易类型: {trader_type}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}")
@staticmethod
def get_pending_orders(trader):
"""获取指定交易类型的所有未完成委托
Args:
trader: 交易实例
Returns:
dict: 未完成委托字典以order_id为键
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
return pending_orders[trader_type]
@staticmethod
def get_pending_order(trader, order_id):
"""获取指定订单信息
Args:
trader: 交易实例
order_id: 订单ID
Returns:
dict: 订单信息字典如果不存在则返回None
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
return pending_orders[trader_type].get(order_id)
@staticmethod
def update_order_status(trader, order_id, new_status, **additional_data):
"""更新订单状态
Args:
trader: 交易实例
order_id: 订单ID
new_status: 新状态
additional_data: 附加数据如成交量重试次数等
Returns:
bool: 是否成功更新
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
if order_id in pending_orders[trader_type]:
# 记录之前的状态用于日志
previous_status = pending_orders[trader_type][order_id].get('status')
# 更新状态和最后检查时间
pending_orders[trader_type][order_id]['status'] = new_status
pending_orders[trader_type][order_id]['last_check_time'] = time.time()
# 更新附加数据
for key, value in additional_data.items():
pending_orders[trader_type][order_id][key] = value
# 记录状态变化日志
if previous_status != new_status:
code = pending_orders[trader_type][order_id].get('code')
logger.info(f"订单状态变化: ID={order_id}, 代码={code}, 旧状态={previous_status}, 新状态={new_status}")
return True
return False
@staticmethod
def increment_retry_count(trader, order_id):
"""增加订单重试次数
Args:
trader: 交易实例
order_id: 订单ID
Returns:
int: 新的重试次数如果订单不存在则返回-1
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
if order_id in pending_orders[trader_type]:
current = pending_orders[trader_type][order_id].get('retry_count', 0)
pending_orders[trader_type][order_id]['retry_count'] = current + 1
return current + 1
return -1
@staticmethod
def remove_pending_order(trader, order_id):
"""移除未完成委托
Args:
trader: 交易实例
order_id: 订单ID
Returns:
bool: 是否成功移除
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
if order_id in pending_orders[trader_type]:
del pending_orders[trader_type][order_id]
return True
return False
@staticmethod
def clean_timeout_orders():
"""清理超时委托"""
current_time = time.time()
# 遍历实盘和模拟两种类型的委托
for trader_type in ['real', 'simulation']:
for order_id, order_info in list(pending_orders[trader_type].items()):
# 超过24小时的委托视为超时
if current_time - order_info['created_time'] > 24 * 60 * 60:
del pending_orders[trader_type][order_id]
logger.warning(f"清理超时委托: ID={order_id}, 状态={order_info.get('status')}")
@staticmethod
def load_strategy_data():
"""加载策略数据"""
global strategy_positions, strategy_trades, pending_orders
try:
if os.path.exists('strategy_data.json'):
with open('strategy_data.json', 'r') as f:
data = json.load(f)
# 直接使用新版数据结构,不再兼容旧版格式
strategy_positions = data.get('positions', {'real': {}, 'simulation': {}})
strategy_trades = data.get('trades', {'real': {}, 'simulation': {}})
pending_orders = data.get('pending_orders', {'real': {}, 'simulation': {}})
# 确保数据结构完整
if 'real' not in strategy_positions:
strategy_positions['real'] = {}
if 'simulation' not in strategy_positions:
strategy_positions['simulation'] = {}
if 'real' not in strategy_trades:
strategy_trades['real'] = {}
if 'simulation' not in strategy_trades:
strategy_trades['simulation'] = {}
if 'real' not in pending_orders:
pending_orders['real'] = {}
if 'simulation' not in pending_orders:
pending_orders['simulation'] = {}
logger.info("已加载策略数据")
logger.info(f"实盘策略数: {len(strategy_positions['real'])}, 模拟策略数: {len(strategy_positions['simulation'])}")
except Exception as e:
logger.error(f"加载策略数据失败: {str(e)}")
# 初始化空数据结构
strategy_positions = {'real': {}, 'simulation': {}}
strategy_trades = {'real': {}, 'simulation': {}}
pending_orders = {'real': {}, 'simulation': {}}
@staticmethod
def save_strategy_data():
"""保存策略数据"""
try:
with open('strategy_data.json', 'w') as f:
json.dump({
'positions': strategy_positions,
'trades': strategy_trades,
'pending_orders': pending_orders
}, f)
except Exception as e:
logger.error(f"保存策略数据失败: {str(e)}")
@staticmethod
def get_strategy_positions(trader, strategy_name=None):
"""获取策略持仓
Args:
trader: 交易实例
strategy_name: 策略名称如果为None返回所有持仓
Returns:
如果strategy_name为None返回交易实例的所有持仓
否则返回指定策略的持仓
"""
# 判断当前交易类型
trader_type = StrategyPositionManager.get_trader_type(trader)
# 如果指定了策略名称,返回该策略的持仓
if strategy_name:
# 获取真实账户持仓,用于计算可交易量
real_positions = trader.get_positions()
real_positions_map = {}
for pos in real_positions:
# 使用xt_trader返回的字段名
if 'stock_code' in pos and 'can_use_volume' in pos:
real_positions_map[pos['stock_code']] = pos
# 如果该策略没有记录,返回空列表
if strategy_name not in strategy_positions[trader_type]:
logger.info(f"Strategy {strategy_name} has no positions in {trader_type} mode")
return []
# 合并策略持仓和真实持仓的可交易量
result = []
for code, pos_info in strategy_positions[trader_type][strategy_name].items():
# 忽略total_amount为0的持仓
if pos_info['total_amount'] <= 0:
continue
# 使用真实账户的可交易量作为策略的可交易量上限
real_pos = real_positions_map.get(code, {})
closeable = min(pos_info['total_amount'], real_pos.get('can_use_volume', 0))
result.append({
code: {
'total_amount': pos_info['total_amount'],
'closeable_amount': closeable
}
})
logger.info(f"Strategy {strategy_name} positions in {trader_type} mode: {result}")
return result
# 否则返回原始持仓
positions = trader.get_positions()
logger.info(f"Positions in {trader_type} mode: {positions}")
return positions
@staticmethod
def clear_strategy(trader, strategy_name):
"""清除指定策略的持仓管理数据
Args:
trader: 交易实例
strategy_name: 策略名称
Returns:
tuple: (success, message)
success: 是否成功清除
message: 提示信息
"""
if not strategy_name:
return False, "缺少策略名称参数"
# 判断当前交易类型
trader_type = StrategyPositionManager.get_trader_type(trader)
# 检查策略是否存在于当前交易类型中
if strategy_name in strategy_positions[trader_type]:
# 从策略持仓字典中删除该策略
del strategy_positions[trader_type][strategy_name]
# 清除该策略的交易记录
if strategy_name in strategy_trades[trader_type]:
del strategy_trades[trader_type][strategy_name]
# 清除与该策略相关的未完成委托
for order_id, order_info in list(pending_orders[trader_type].items()):
if order_info.get('strategy_name') == strategy_name:
del pending_orders[trader_type][order_id]
# 保存更新后的策略数据
StrategyPositionManager.save_strategy_data()
logger.info(f"成功清除策略持仓数据: {strategy_name} (交易类型: {trader_type})")
return True, f"成功清除策略 '{strategy_name}' 的持仓数据 (交易类型: {trader_type})"
else:
logger.info(f"策略不存在或没有持仓数据: {strategy_name} (交易类型: {trader_type})")
return True, f"策略 '{strategy_name}' 不存在或没有持仓数据 (交易类型: {trader_type})"

18
src/trade_constants.py Normal file
View File

@ -0,0 +1,18 @@
# 交易常量
TRADE_TYPE_REAL = 'real'
TRADE_TYPE_SIMULATION = 'simulation'
# 订单状态
ORDER_STATUS_PENDING = 'pending'
ORDER_STATUS_COMPLETED = 'completed'
ORDER_STATUS_CANCELLED = 'cancelled'
ORDER_STATUS_FAILED = 'failed'
# 订单类型
ORDER_TYPE_LIMIT = 'limit'
ORDER_TYPE_MARKET = 'market'
# 订单方向
ORDER_DIRECTION_BUY = 'buy'
ORDER_DIRECTION_SELL = 'sell'

View File

@ -1,13 +1,13 @@
import schedule
import threading
import time
from xt_trader import XtTrader
from real.xt_trader import XtTrader
from flask import Flask, request, abort, jsonify
from config import Config
from concurrent.futures import TimeoutError
import concurrent.futures
import atexit
from simulation_trader import SimulationTrader
from simulation.simulation_trader import SimulationTrader
import datetime
from strategy_position_manager import StrategyPositionManager
from logger_config import get_logger
@ -69,7 +69,7 @@ def get_real_trader_manager():
with _instance_lock:
if _real_trader_manager_instance is None:
# 延迟导入避免循环依赖
from real_trader_manager import RealTraderManager
from real.real_trader_manager import RealTraderManager
_real_trader_manager_instance = RealTraderManager(get_real_trader())
logger.info("创建新的RealTraderManager实例")
return _real_trader_manager_instance

24
test_imports.py Normal file
View File

@ -0,0 +1,24 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""测试导入新模块结构"""
import sys
import os
# 添加src目录到Python导入路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
try:
from simulation.simulation_trader import SimulationTrader
print("导入 SimulationTrader 成功!")
except Exception as e:
print(f"导入 SimulationTrader 失败: {e}")
try:
from real.xt_trader import XtTrader
print("导入 XtTrader 成功!")
except Exception as e:
print(f"导入 XtTrader 失败: {e}")
print("测试完成")