real_trader/src/real/real_trader_manager.py
2025-05-15 10:52:51 +08:00

337 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import threading
import schedule
import datetime
from xtquant import xtconstant
from logger_config import get_logger
from config import Config
from functools import wraps
from trade_constants import (
ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED,
ORDER_STATUS_PENDING,
ORDER_STATUS_PARTIAL,
ORDER_DIRECTION_BUY,
ORDER_DIRECTION_SELL,
ORDER_TYPE_LIMIT,
ORDER_TYPE_MARKET,
)
from real.xt_trader import XtTrader
# 获取日志记录器
logger = get_logger("real_trader_manager")
def run_threaded(func):
@wraps(func)
def wrapper(*args, **kwargs):
thread = threading.Thread(target=func, args=args, kwargs=kwargs)
thread.start()
return wrapper
class RealTraderManager:
"""实盘交易管理器,处理实盘下单失败、部分成交等问题,尽量保证仓位与策略信号一致"""
def __init__(self, trader: XtTrader):
"""初始化实盘交易管理器
Args:
trader: XtTrader实例
position_manager: StrategyPositionManager实例
"""
# 使用传入的trader实例使用弱引用避免循环引用
self.trader = trader
# 启动调度器
self._start_scheduler()
logger.info("实盘交易管理器初始化完成")
def _start_scheduler(self):
# 每日定时清理(增加配置校验)
if hasattr(Config, "CLEAN_ORDERS_TIME"):
try:
schedule.every().day.at(Config.CLEAN_ORDERS_TIME).do(
run_threaded(self.clean_expired_orders)
)
schedule.every().day.at(Config.CLEAN_ORDERS_TIME).do(
run_threaded(self.update_closeable_amount)
)
except Exception as e:
logger.error(f"清理任务配置错误: {e}")
else:
logger.error("CLEAN_ORDERS_TIME 未配置")
# 每日定时保存策略数据(增加配置校验)
if hasattr(Config, "STRATEGY_SAVE_TIME"):
try:
schedule.every().day.at(Config.STRATEGY_SAVE_TIME).do(
run_threaded(self.save_strategy_data)
)
except Exception as e:
logger.error(f"保存策略数据任务配置错误: {e}")
else:
logger.error("STRATEGY_SAVE_TIME 未配置")
# 检查限价单是否超时
if hasattr(Config, "RTM_ORDER_TIMEOUT"):
try:
schedule.every(10).seconds.do(
run_threaded(self.check_limit_orders)
)
except Exception as e:
logger.error(f"限价单超时检查任务配置错误: {e}")
else:
logger.error("RTM_ORDER_TIMEOUT 未配置")
# 启动高精度调度线程
def run_scheduler():
while True:
try:
schedule.run_pending()
time.sleep(1) # 将休眠时间缩短至1秒提高精度
except Exception as e:
logger.error(f"调度器异常: {e}", exc_info=True)
time.sleep(10) # 发生错误时延长休眠避免日志风暴
scheduler_thread = threading.Thread(
target=run_scheduler, name="SchedulerThread"
)
scheduler_thread.daemon = True # 设为守护线程随主进程退出
scheduler_thread.start()
logger.info("交易管理器调度器已启动")
def place_order(
self, strategy_name, code, direction, amount, price, order_type=ORDER_TYPE_LIMIT
):
"""下单接口,处理买入/卖出请求
Args:
strategy_name: 策略名称
code: 股票代码
direction: 交易方向 'buy''sell'
amount: 交易数量
price: 交易价格市价单时可为0
order_type: 订单类型,'limit'表示限价单,'market'表示市价单,默认为'limit'
Returns:
dict: 包含订单ID和状态信息
"""
if not strategy_name or not code or not direction:
logger.error("下单参数不完整")
return {"success": False, "error": "参数不完整"}
# 检查交易方向
if direction not in [ORDER_DIRECTION_BUY, ORDER_DIRECTION_SELL]:
logger.error(f"无效的交易方向: {direction}")
return {"success": False, "error": "无效的交易方向"}
# 检查订单类型
if order_type not in [ORDER_TYPE_LIMIT, ORDER_TYPE_MARKET]:
logger.error(f"无效的订单类型: {order_type}")
return {
"success": False,
"error": "无效的订单类型,必须是'limit''market'",
}
try:
# 对于限价单,检查资金和持仓是否足够
if order_type == ORDER_TYPE_LIMIT and not self._check_order_feasibility(
code, direction, amount, price
):
logger.warning(
f"资金或持仓不足,忽略订单: {direction} {code} {amount}{price}"
)
return {"success": False, "error": "资金或持仓不足"}
# 下单
logger.info(
f"准备{direction}订单: 代码={code}, 数量={amount}, 价格={price}, 订单类型={order_type}"
)
if direction == ORDER_DIRECTION_BUY:
result = self.trader.buy(code, price, amount, order_type)
else:
result = self.trader.sell(code, price, amount, order_type)
order_id = result.get("order_id")
if not order_id:
logger.error(f"下单失败: {result}")
return {"success": False, "error": "下单失败"}
# 添加未完成委托到position_manager
position_manager = self.trader.get_position_manager(strategy_name)
position_manager.add_pending_order(order_id, code, price, amount, direction, order_type)
logger.info(
f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}"
)
return {"success": True, "order_id": order_id}
except Exception as e:
logger.error(f"下单过程发生异常: {str(e)}")
return {"success": False, "error": f"下单异常: {str(e)}"}
def check_limit_orders(self):
"""检查限价单是否超时"""
try:
# 获取所有未完成订单
position_managers = self.trader.get_all_position_managers()
for strategy_name, position_manager in position_managers.items():
pending_orders = position_manager.get_pending_orders()
for order_id, order_info in pending_orders.items():
# 如果订单类型为限价单,则检查是否超时
if order_info.order_type == ORDER_TYPE_LIMIT:
# 将datetime对象转换为时间戳进行比较
if isinstance(order_info.created_time, datetime.datetime):
created_timestamp = order_info.created_time.timestamp()
duration = time.time() - created_timestamp
else:
duration = time.time() - order_info.created_time
if duration > Config.RTM_ORDER_TIMEOUT:
logger.info(f"限价单超时: ID={order_id}, 策略={strategy_name}, 持续时间={duration}")
self.trader.cancel(order_id)
time.sleep(1)
order = self.trader.get_order(order_id)
if order['order_status'] == xtconstant.ORDER_CANCELED:
logger.info(f"限价单已撤销: ID={order_id}, 策略={strategy_name}")
self.trader.handle_order_update(order_id, strategy_name)
logger.info(f"检测到限价单被撤销,准备进行市价单补单: ID={order_id}")
self.trader.place_market_order_for_remainder(order_info, strategy_name)
else:
logger.warning(f"限价单撤销失败: ID={order_id}, 策略={strategy_name}")
else:
logger.info(f"限价单未超时: ID={order_id}, 策略={strategy_name}, 持续时间={duration}")
logger.info("限价单检查完毕")
except Exception as e:
logger.error(f"检查限价单是否超时时发生异常: {str(e)}")
def _check_order_feasibility(self, code, direction, amount, price):
"""检查订单是否可行(资金或持仓是否足够)
Args:
code: 股票代码
direction: 交易方向
amount: 交易数量
price: 交易价格
Returns:
bool: 订单是否可行
"""
try:
if direction == ORDER_DIRECTION_BUY:
# 检查资金是否足够
balance = self.trader.get_balance()
if not balance:
logger.error("获取账户余额失败")
return False
# 计算所需资金加上3%的手续费作为缓冲)
required_cash = price * amount * 1.03
available_cash = balance.get("cash", 0) - balance.get("frozen_cash", 0)
if required_cash > available_cash:
logger.warning(
f"资金不足: 需要 {required_cash:.2f}, 可用 {available_cash:.2f}"
)
return False
return True
elif direction == "sell":
# 检查持仓是否足够
position = self.trader.get_position(code)
if not position:
logger.warning(f"没有持仓: {code}")
return False
available_volume = position.get("can_use_volume", 0)
if amount > available_volume:
logger.warning(
f"可用持仓不足: 需要 {amount}, 可用 {available_volume}"
)
return False
return True
return False
except Exception as e:
logger.error(f"检查订单可行性时发生异常: {str(e)}")
return False
def clean_expired_orders(self):
"""清理过期的未完成订单"""
try:
logger.info("开始清理过期未完成订单...")
# 获取所有未完成订单
position_managers = self.trader.get_all_position_managers()
# 遍历所有持仓管理器
for position_manager in position_managers.values():
# 获取所有未完成订单
pending_orders = position_manager.get_pending_orders()
# 遍历未完成订单,检查是否有无法成交的订单(如跌停无法卖出)
for order_id, order_info in pending_orders.items():
try:
logger.error(
f"清理无法成交订单(理论不应该有): ID={order_id}, 代码={order_info.code}, 方向={order_info.direction}, "
f"数量={order_info.amount}, 已成交数量={order_info.filled}"
)
except Exception as e:
logger.error(f"清理订单 {order_id} 时出错: {str(e)}")
position_manager.clear_pending_orders()
logger.info("过期未完成订单清理完毕")
except Exception as e:
logger.error(f"清理过期未完成订单时发生异常: {str(e)}")
def update_closeable_amount(self):
"""更新可卖持仓"""
try:
logger.info("开始更新可卖持仓...")
# 获取所有持仓
position_managers = self.trader.get_all_position_managers()
# 遍历持仓,更新可卖持仓
for position_manager in position_managers.values():
position_manager.update_closeable_amount()
logger.info("可卖持仓更新完毕")
except Exception as e:
logger.error(f"更新可卖持仓时发生异常: {str(e)}")
def save_strategy_data(self):
"""保存策略数据"""
try:
logger.info("开始保存策略数据...")
# 获取所有持仓管理器
position_managers = self.trader.get_all_position_managers()
# 遍历所有持仓管理器,保存策略数据
for position_manager in position_managers.values():
position_manager.save_data()
logger.info("策略数据保存完毕")
except Exception as e:
logger.error(f"保存策略数据时发生异常: {str(e)}")