real_trader/src/real/real_trader_manager.py

382 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import threading
import schedule
from xtquant import xtconstant
from logger_config import get_logger
from config import Config
from functools import wraps
from trade_constants import (
ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED,
ORDER_STATUS_PENDING,
ORDER_STATUS_FAILED,
ORDER_STATUS_PARTIAL,
ORDER_DIRECTION_BUY,
ORDER_DIRECTION_SELL,
ORDER_TYPE_LIMIT,
ORDER_TYPE_MARKET,
)
from real.xt_trader import XtTrader
# 获取日志记录器
logger = get_logger("real_trader_manager")
def run_threaded(func):
@wraps(func)
def wrapper(*args, **kwargs):
thread = threading.Thread(target=func, args=args, kwargs=kwargs)
thread.start()
return wrapper
class RealTraderManager:
"""实盘交易管理器,处理实盘下单失败、部分成交等问题,尽量保证仓位与策略信号一致"""
def __init__(self, trader: XtTrader):
"""初始化实盘交易管理器
Args:
trader: XtTrader实例
position_manager: StrategyPositionManager实例
"""
# 使用传入的trader实例使用弱引用避免循环引用
self.trader = trader
# 启动调度器
self._start_scheduler()
logger.info("实盘交易管理器初始化完成")
def _start_scheduler(self):
# 每日定时清理(增加配置校验)
if hasattr(Config, "CLEAN_ORDERS_TIME"):
try:
schedule.every().day.at(Config.CLEAN_ORDERS_TIME).do(
run_threaded(self.clean_expired_orders)
)
schedule.every().day.at(Config.CLEAN_ORDERS_TIME).do(
run_threaded(self.update_closeable_amount)
)
except Exception as e:
logger.error(f"清理任务配置错误: {e}")
else:
logger.error("CLEAN_ORDERS_TIME 未配置")
# 每日定时保存策略数据(增加配置校验)
if hasattr(Config, "STRATEGY_SAVE_TIME"):
try:
schedule.every().day.at(Config.STRATEGY_SAVE_TIME).do(
run_threaded(self.save_strategy_data)
)
except Exception as e:
logger.error(f"保存策略数据任务配置错误: {e}")
else:
logger.error("STRATEGY_SAVE_TIME 未配置")
# 启动高精度调度线程
def run_scheduler():
while True:
try:
schedule.run_pending()
time.sleep(1) # 将休眠时间缩短至1秒提高精度
except Exception as e:
logger.error(f"调度器异常: {e}", exc_info=True)
time.sleep(10) # 发生错误时延长休眠避免日志风暴
scheduler_thread = threading.Thread(
target=run_scheduler, name="SchedulerThread"
)
scheduler_thread.daemon = True # 设为守护线程随主进程退出
scheduler_thread.start()
logger.info("交易管理器调度器已启动")
def place_order(
self, strategy_name, code, direction, amount, price, order_type=ORDER_TYPE_LIMIT
):
"""下单接口,处理买入/卖出请求
Args:
strategy_name: 策略名称
code: 股票代码
direction: 交易方向 'buy''sell'
amount: 交易数量
price: 交易价格市价单时可为0
order_type: 订单类型,'limit'表示限价单,'market'表示市价单,默认为'limit'
Returns:
dict: 包含订单ID和状态信息
"""
if not strategy_name or not code or not direction:
logger.error("下单参数不完整")
return {"success": False, "error": "参数不完整"}
# 检查交易方向
if direction not in [ORDER_DIRECTION_BUY, ORDER_DIRECTION_SELL]:
logger.error(f"无效的交易方向: {direction}")
return {"success": False, "error": "无效的交易方向"}
# 检查订单类型
if order_type not in [ORDER_TYPE_LIMIT, ORDER_TYPE_MARKET]:
logger.error(f"无效的订单类型: {order_type}")
return {
"success": False,
"error": "无效的订单类型,必须是'limit''market'",
}
try:
# 对于限价单,检查资金和持仓是否足够
if order_type == ORDER_TYPE_LIMIT and not self._check_order_feasibility(
code, direction, amount, price
):
logger.warning(
f"资金或持仓不足,忽略订单: {direction} {code} {amount}{price}"
)
return {"success": False, "error": "资金或持仓不足"}
# 下单
logger.info(
f"准备{direction}订单: 代码={code}, 数量={amount}, 价格={price}, 订单类型={order_type}"
)
if direction == ORDER_DIRECTION_BUY:
result = self.trader.buy(code, price, amount, order_type)
else:
result = self.trader.sell(code, price, amount, order_type)
order_id = result.get("order_id")
if not order_id:
logger.error(f"下单失败: {result}")
return {"success": False, "error": "下单失败"}
# 添加未完成委托到position_manager
position_manager = self.trader.get_position_manager(strategy_name)
position_manager.add_pending_order(order_id, code, price, amount, direction, order_type)
logger.info(
f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}"
)
threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name)).start()
return {"success": True, "order_id": order_id}
except Exception as e:
logger.error(f"下单过程发生异常: {str(e)}")
return {"success": False, "error": f"下单异常: {str(e)}"}
def _place_market_order_for_remainder(self, strategy_name, code, direction, left_amount):
"""对未完成的订单进行补单,下市价单
Args:
strategy_name: 策略名称
code: 股票代码
direction: 交易方向
left_amount: 剩余数量
Returns:
bool: 补单是否成功
"""
if left_amount <= 0:
logger.info(f"无需补单,剩余数量为零或负数: {left_amount}")
return True
logger.info(f"限价单补单: 市价单, 剩余数量={left_amount}")
new_order = self.place_order(strategy_name, code, direction, left_amount, 0, ORDER_TYPE_MARKET)
new_order_id = new_order.get("order_id")
if new_order.get("success") and new_order_id:
# 订单已在place_order中设置了60秒后检查
return True
else:
logger.error(f"补单失败: {new_order}")
return False
def check_and_retry(self, order_id, strategy_name):
"""检查订单状态并处理未完成订单
Args:
order_id: 订单ID
strategy_name: 策略名称
"""
try:
logger.info(f"开始检查订单状态: ID={order_id}, 策略={strategy_name}")
position_manager = self.trader.get_position_manager(strategy_name)
order_info = position_manager.get_pending_order(order_id)
if not order_info:
logger.warning(f"订单信息不存在, 可能全部完成或者撤单: ID={order_id}")
return
order_type = order_info.order_type
# 使用trader的handle_order_update方法更新订单状态
status = self.trader.handle_order_update(order_id, strategy_name)
# 如果状态为None说明处理失败则直接返回
if status is None:
logger.warning(f"订单状态更新失败: ID={order_id}")
return
# 重新获取订单信息,因为可能已经被更新
order_info = position_manager.get_pending_order(order_id)
if not order_info:
logger.info(f"订单已完成: ID={order_id}")
return
if order_type == ORDER_TYPE_MARKET:
# 市价单,如果未完成则继续检查
if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]:
logger.info(f"市价单未完成60秒后继续检查: ID={order_id}, 状态={status}")
schedule.every(60).seconds.do(
run_threaded(self.check_and_retry), order_id, strategy_name
).tag(f"order_{order_id}")
else:
logger.info(f"市价单已完成: ID={order_id}, 状态={status}")
elif order_type == ORDER_TYPE_LIMIT:
filled = order_info.filled
target_amount = order_info.amount
left_amount = target_amount - filled
# 限价单这是60秒后的检查如果未完成则撤单补市价单
if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]:
# 尝试撤单
try:
logger.info(f"限价单60秒后仍未完成尝试撤单: ID={order_id}, 状态={status}")
self.trader.cancel(order_id)
position_manager.update_order_status(order_id, filled, ORDER_STATUS_CANCELLED)
except Exception as e:
logger.error(f"撤单失败: order_id={order_id}, error={str(e)}")
# 计算剩余数量, 如果剩余数量大于0, 则补单
self._place_market_order_for_remainder(strategy_name, order_info.code, order_info.direction, left_amount)
else:
logger.info(f"限价单已完成: ID={order_id}, 状态={status}")
else:
logger.warning(f"未知订单类型: ID={order_id}, type={order_type}")
except Exception as e:
logger.error(f"检查订单状态时发生异常: ID={order_id}, error={str(e)}", exc_info=True)
def _check_order_feasibility(self, code, direction, amount, price):
"""检查订单是否可行(资金或持仓是否足够)
Args:
code: 股票代码
direction: 交易方向
amount: 交易数量
price: 交易价格
Returns:
bool: 订单是否可行
"""
try:
if direction == ORDER_DIRECTION_BUY:
# 检查资金是否足够
balance = self.trader.get_balance()
if not balance:
logger.error("获取账户余额失败")
return False
# 计算所需资金加上3%的手续费作为缓冲)
required_cash = price * amount * 1.03
available_cash = balance.get("cash", 0) - balance.get("frozen_cash", 0)
if required_cash > available_cash:
logger.warning(
f"资金不足: 需要 {required_cash:.2f}, 可用 {available_cash:.2f}"
)
return False
return True
elif direction == "sell":
# 检查持仓是否足够
position = self.trader.get_position(code)
if not position:
logger.warning(f"没有持仓: {code}")
return False
available_volume = position.get("can_use_volume", 0)
if amount > available_volume:
logger.warning(
f"可用持仓不足: 需要 {amount}, 可用 {available_volume}"
)
return False
return True
return False
except Exception as e:
logger.error(f"检查订单可行性时发生异常: {str(e)}")
return False
def clean_expired_orders(self):
"""清理过期的未完成订单"""
try:
logger.info("开始清理过期未完成订单...")
# 获取所有未完成订单
position_managers = self.trader.get_all_position_managers()
# 遍历所有持仓管理器
for position_manager in position_managers.values():
# 获取所有未完成订单
pending_orders = position_manager.get_pending_orders()
# 遍历未完成订单,检查是否有无法成交的订单(如跌停无法卖出)
for order_id, order_info in pending_orders.items():
try:
logger.warning(
f"清理无法成交订单: ID={order_id}, 代码={order_info.code}, 方向={order_info.direction}, "
f"数量={order_info.amount}, 已成交数量={order_info.filled}"
)
except Exception as e:
logger.error(f"清理订单 {order_id} 时出错: {str(e)}")
position_manager.clear_pending_orders()
logger.info("过期未完成订单清理完毕")
except Exception as e:
logger.error(f"清理过期未完成订单时发生异常: {str(e)}")
def update_closeable_amount(self):
"""更新可卖持仓"""
try:
logger.info("开始更新可卖持仓...")
# 获取所有持仓
position_managers = self.trader.get_all_position_managers()
# 遍历持仓,更新可卖持仓
for position_manager in position_managers.values():
position_manager.update_closeable_amount()
logger.info("可卖持仓更新完毕")
except Exception as e:
logger.error(f"更新可卖持仓时发生异常: {str(e)}")
def save_strategy_data(self):
"""保存策略数据"""
try:
logger.info("开始保存策略数据...")
# 获取所有持仓管理器
position_managers = self.trader.get_all_position_managers()
# 遍历所有持仓管理器,保存策略数据
for position_manager in position_managers.values():
position_manager.save_data()
logger.info("策略数据保存完毕")
except Exception as e:
logger.error(f"保存策略数据时发生异常: {str(e)}")