real_trader/src/real/real_trader_manager.py
2025-05-29 19:57:09 +08:00

454 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import threading
import schedule
from datetime import datetime
from xtquant import xtconstant
from logger_config import get_logger
from config import Config
from functools import wraps
from core.trade_constants import (
ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED,
ORDER_STATUS_PENDING,
ORDER_STATUS_PARTIAL,
ORDER_DIRECTION_BUY,
ORDER_DIRECTION_SELL,
ORDER_TYPE_LIMIT,
ORDER_TYPE_MARKET,
)
from real.xt_trader import XtTrader
# 获取日志记录器
logger = get_logger("real_trader_manager")
def run_threaded(func):
@wraps(func)
def wrapper(*args, **kwargs):
thread = threading.Thread(target=func, args=args, kwargs=kwargs)
thread.start()
return wrapper
class RealTraderManager:
"""实盘交易管理器,处理实盘下单失败、部分成交等问题,尽量保证仓位与策略信号一致"""
def __init__(self, trader: XtTrader):
"""初始化实盘交易管理器
Args:
trader: XtTrader实例
position_manager: StrategyPositionManager实例
"""
# 使用传入的trader实例使用弱引用避免循环引用
self.trader = trader
# 初始化锁
self._lock = threading.Lock()
# 添加停止事件
self._stop_event = threading.Event()
self.scheduler_thread = None
# 启动调度器
self._start_scheduler()
logger.info("实盘交易管理器初始化完成")
def stop_scheduler(self) -> None:
"""停止调度器线程
正确的停止流程:
1. 设置停止事件通知线程退出
2. 等待线程自然结束
3. 清理线程引用
"""
if self.scheduler_thread and self.scheduler_thread.is_alive():
logger.info("正在停止调度器...")
# 设置停止事件
self._stop_event.set()
# 等待线程结束,设置超时避免无限等待
try:
self.scheduler_thread.join(timeout=5.0)
if self.scheduler_thread.is_alive():
logger.warning("调度器线程未能在超时时间内停止")
else:
logger.info("调度器已成功停止")
except Exception as e:
logger.error(f"等待调度器线程结束时发生异常: {e}")
self.scheduler_thread = None
else:
logger.info("调度器未运行或已停止")
def start_scheduler(self) -> None:
"""启动调度器(公共方法,支持重启)"""
if self.scheduler_thread and self.scheduler_thread.is_alive():
logger.warning("调度器已在运行,请先停止")
return
# 重置停止事件
self._stop_event.clear()
self._start_scheduler()
def __del__(self) -> None:
"""析构方法,确保资源清理"""
try:
self.stop_scheduler()
except Exception as e:
# 析构方法中的异常不应该向上传播
pass
def _start_scheduler(self) -> None:
"""内部方法:启动调度器线程"""
# 每日定时清理(增加配置校验)
if hasattr(Config, "CLEAN_ORDERS_TIME"):
try:
schedule.every().day.at(Config.CLEAN_ORDERS_TIME).do(
run_threaded(self.clean_expired_orders)
)
schedule.every().day.at(Config.CLEAN_ORDERS_TIME).do(
run_threaded(self.update_closeable_amount)
)
except Exception as e:
logger.error(f"清理任务配置错误: {e}")
else:
logger.error("CLEAN_ORDERS_TIME 未配置")
# 每日定时保存策略数据(增加配置校验)
if hasattr(Config, "STRATEGY_SAVE_TIME"):
try:
schedule.every().day.at(Config.STRATEGY_SAVE_TIME).do(
run_threaded(self.save_strategy_data)
)
except Exception as e:
logger.error(f"保存策略数据任务配置错误: {e}")
else:
logger.error("STRATEGY_SAVE_TIME 未配置")
# 检查限价单是否超时
if hasattr(Config, "RTM_ORDER_TIMEOUT"):
try:
schedule.every(10).seconds.do(
run_threaded(self.check_pending_orders)
)
except Exception as e:
logger.error(f"限价单超时检查任务配置错误: {e}")
else:
logger.error("RTM_ORDER_TIMEOUT 未配置")
# 启动高精度调度线程
def run_scheduler():
"""调度器线程主循环,响应停止事件"""
logger.info("调度器线程开始运行")
while not self._stop_event.is_set():
try:
schedule.run_pending()
# 使用wait代替sleep这样可以立即响应停止事件
self._stop_event.wait(timeout=1.0)
except Exception as e:
logger.error(f"调度器异常: {e}", exc_info=True)
# 发生错误时也要检查停止事件
if not self._stop_event.is_set():
self._stop_event.wait(timeout=10.0)
logger.info("调度器线程收到停止信号,正在退出")
self.scheduler_thread = threading.Thread(
target=run_scheduler, name="SchedulerThread"
)
self.scheduler_thread.daemon = True # 设为守护线程随主进程退出
self.scheduler_thread.start()
logger.info("交易管理器调度器已启动")
def place_order(
self, strategy_name, code, direction, amount, price, order_type=ORDER_TYPE_LIMIT
):
"""下单接口,处理买入/卖出请求
Args:
strategy_name: 策略名称
code: 股票代码
direction: 交易方向 'buy''sell'
amount: 交易数量
price: 交易价格市价单时可为0
order_type: 订单类型,'limit'表示限价单,'market'表示市价单,默认为'limit'
Returns:
dict: 包含订单ID和状态信息
"""
if not strategy_name or not code or not direction:
logger.error("下单参数不完整")
return {"success": False, "error": "参数不完整"}
# 检查交易方向
if direction not in [ORDER_DIRECTION_BUY, ORDER_DIRECTION_SELL]:
logger.error(f"无效的交易方向: {direction}")
return {"success": False, "error": "无效的交易方向"}
# 检查订单类型
if order_type not in [ORDER_TYPE_LIMIT, ORDER_TYPE_MARKET]:
logger.error(f"无效的订单类型: {order_type}")
return {
"success": False,
"error": "无效的订单类型,必须是'limit''market'",
}
try:
# 对于限价单,检查资金和持仓是否足够
if order_type == ORDER_TYPE_LIMIT and not self._check_order_feasibility(
code, direction, amount, price
):
logger.warning(
f"资金或持仓不足,忽略订单: {direction} {code} {amount}{price}"
)
return {"success": False, "error": "资金或持仓不足"}
# 下单
logger.info(
f"准备{direction}订单: 代码={code}, 数量={amount}, 价格={price}, 订单类型={order_type}"
)
if direction == ORDER_DIRECTION_BUY:
result = self.trader.buy(code, price, amount, order_type)
else:
result = self.trader.sell(code, price, amount, order_type)
order_id = result.get("order_id")
if not order_id:
logger.error(f"下单失败: {result}")
return {"success": False, "error": "下单失败"}
# 添加未完成委托到position_manager
position_manager = self.trader.get_position_manager(strategy_name)
position_manager.add_pending_order(order_id, code, price, amount, direction, order_type)
logger.info(
f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}"
)
return {"success": True, "order_id": order_id}
except Exception as e:
logger.error(f"下单过程发生异常: {str(e)}")
return {"success": False, "error": f"下单异常: {str(e)}"}
def _handle_timed_out_limit_order(self, order_id: int, order_info: dict, strategy_name: str, duration: float) -> None:
"""处理超时的限价单,尝试撤销并进行市价补单。
Args:
order_id: 订单ID
order_info: 订单信息
strategy_name: 策略名称
duration: 订单持续时间(秒)
"""
self.trader.cancel(order_id)
# 使用轮询等待撤销完成,并设置超时
start_time = datetime.now()
cancel_success = False
while (datetime.now() - start_time).total_seconds() < Config.RTM_CANCEL_TIMEOUT:
order = self.trader.get_order(order_id)
if order and order.get('order_status') == xtconstant.ORDER_CANCELED:
logger.info(f"限价单已撤销: ID={order_id}, 策略={strategy_name}")
cancel_success = True
break
time.sleep(0.5) # 每0.5秒查询一次
if cancel_success:
self.trader.handle_order_update(order_id, strategy_name)
logger.info(f"检测到限价单被撤销,准备进行市价单补单: ID={order_id}")
self.trader.place_market_order_for_remainder(order_info, strategy_name)
else:
logger.error(f"限价单撤销超时或失败: ID={order_id}, 策略={strategy_name}")
self.trader.handle_order_update(order_id, strategy_name) # 即使撤销失败,也更新一下状态
def _process_pending_order(self, order_id: int, order_info: dict, strategy_name: str) -> None:
"""处理单个未完成订单的逻辑,包括更新状态、检查超时、撤销和补单。"""
try:
# 处理订单更新, 更新订单状态, 更新持仓, 使position manager中的订单为最新状态
self.trader.handle_order_update(order_id, strategy_name)
# 重新获取更新后的订单信息
position_manager = self.trader.get_position_manager(strategy_name)
order_info = position_manager.get_pending_order(order_id)
if not order_info:
logger.warning(f"订单信息不存在, 可能已完成或撤单, 或下单失败: {order_id}")
return
# 如果订单类型为限价单,则检查是否超时
if order_info.order_type == ORDER_TYPE_LIMIT:
# 检查是否配置为自动撤单并挂市价单补单
if not Config.RTM_AUTO_MARKET_ORDER:
return
now = datetime.now()
duration = (now - order_info.created_time).total_seconds()
if duration > Config.RTM_ORDER_TIMEOUT:
# 将处理超时限价单的逻辑委托给新的私有方法
logger.warning(f"限价单超时: ID={order_id}, 策略={strategy_name}, 持续时间={duration}秒, 订单创建时间: {order_info.created_time} 当前时间: {now}")
self._handle_timed_out_limit_order(order_id, order_info, strategy_name, duration)
else:
# 市价单未完成,更新状态
logger.info(f"市价单未完成, 更新市价单: ID={order_id}, 策略={strategy_name}, 订单类型={order_info.order_type}")
self.trader.handle_order_update(order_id, strategy_name)
except Exception as e:
# 更细粒度的异常处理,捕获处理单个订单时的异常
logger.error(f"处理订单 {order_id} 时发生异常: {str(e)}", exc_info=True)
def check_pending_orders(self):
"""检查限价单是否超时,使用锁避免重复执行"""
# 尝试获取锁,如果获取不到则说明上一个任务还在执行,直接返回
if not self._lock.acquire(blocking=False):
logger.info("check_pending_orders: 上一个任务仍在执行,跳过本次执行。")
return
try:
# 获取所有未完成订单
position_managers = self.trader.get_all_position_managers()
for strategy_name, position_manager in position_managers.items():
# 转换为列表避免在迭代过程中修改
pending_orders_list = list(position_manager.get_pending_orders().items())
for order_id, order_info in pending_orders_list:
# 将单个订单的处理逻辑委托给新的私有方法
self._process_pending_order(order_id, order_info, strategy_name)
except Exception as e:
# 顶层异常处理捕获获取position managers或遍历时的异常
logger.error(f"检查限价单是否超时时发生异常: {str(e)}", exc_info=True)
finally:
# 确保在任何情况下都释放锁
if self._lock.locked():
self._lock.release()
def _check_order_feasibility(self, code, direction, amount, price):
"""检查订单是否可行(资金或持仓是否足够)
Args:
code: 股票代码
direction: 交易方向
amount: 交易数量
price: 交易价格
Returns:
bool: 订单是否可行
"""
try:
if direction == ORDER_DIRECTION_BUY:
# 检查资金是否足够
balance = self.trader.get_balance()
if not balance:
logger.error("获取账户余额失败")
return False
# 计算所需资金加上3%的手续费作为缓冲)
required_cash = price * amount * 1.03
available_cash = balance.get("cash", 0)
if required_cash > available_cash:
logger.warning(
f"资金不足: 需要 {required_cash:.2f}, 可用 {available_cash:.2f}"
)
return False
return True
elif direction == "sell":
# 检查持仓是否足够
position = self.trader.get_position(code)
if not position:
logger.warning(f"没有持仓: {code}")
return False
available_volume = position.get("can_use_volume", 0)
if amount > available_volume:
logger.warning(
f"可用持仓不足: 需要 {amount}, 可用 {available_volume}"
)
return False
return True
return False
except Exception as e:
logger.error(f"检查订单可行性时发生异常: {str(e)}")
return False
def clean_expired_orders(self):
"""清理过期的未完成订单"""
try:
logger.info("开始清理过期未完成订单...")
# 获取所有未完成订单
position_managers = self.trader.get_all_position_managers()
# 遍历所有持仓管理器
for position_manager in position_managers.values():
# 获取所有未完成订单
pending_orders = position_manager.get_pending_orders()
# 遍历未完成订单,检查是否有无法成交的订单(如跌停无法卖出)
for order_id, order_info in pending_orders.items():
try:
logger.error(
f"清理无法成交订单(理论不应该有): ID={order_id}, 代码={order_info.code}, 方向={order_info.direction}, "
f"数量={order_info.amount}, 已成交数量={order_info.filled}"
)
except Exception as e:
logger.error(f"清理订单 {order_id} 时出错: {str(e)}")
position_manager.clear_pending_orders()
logger.info("过期未完成订单清理完毕")
except Exception as e:
logger.error(f"清理过期未完成订单时发生异常: {str(e)}")
def update_closeable_amount(self):
"""更新可卖持仓"""
try:
logger.info("开始更新可卖持仓...")
# 获取所有持仓
position_managers = self.trader.get_all_position_managers()
# 遍历持仓,更新可卖持仓
for position_manager in position_managers.values():
position_manager.update_closeable_amount()
logger.info("可卖持仓更新完毕")
except Exception as e:
logger.error(f"更新可卖持仓时发生异常: {str(e)}")
def save_strategy_data(self):
"""保存策略数据"""
try:
logger.info("开始保存策略数据...")
# 获取所有持仓管理器
position_managers = self.trader.get_all_position_managers()
# 遍历所有持仓管理器,保存策略数据
for position_manager in position_managers.values():
position_manager.save_data()
logger.info("策略数据保存完毕")
except Exception as e:
logger.error(f"保存策略数据时发生异常: {str(e)}")