real_trader/src/real/real_trader_manager.py
2025-05-10 21:33:57 +08:00

414 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import threading
import schedule
from xtquant import xtconstant
from logger_config import get_logger
from config import Config
import json
from typing import Dict
from position_manager import PositionManager
from functools import wraps
from trade_constants import (
ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED,
ORDER_STATUS_PENDING,
ORDER_STATUS_FAILED,
ORDER_STATUS_PARTIAL,
ORDER_DIRECTION_BUY,
ORDER_DIRECTION_SELL,
ORDER_TYPE_LIMIT,
ORDER_TYPE_MARKET,
)
from real.xt_trader import XtTrader
# 获取日志记录器
logger = get_logger("real_trader_manager")
def run_threaded(func):
@wraps(func)
def wrapper(*args, **kwargs):
thread = threading.Thread(target=func, args=args, kwargs=kwargs)
thread.start()
return wrapper
class RealTraderManager:
"""实盘交易管理器,处理实盘下单失败、部分成交等问题,尽量保证仓位与策略信号一致"""
def __init__(self, trader: XtTrader):
"""初始化实盘交易管理器
Args:
trader: XtTrader实例
position_manager: StrategyPositionManager实例
"""
# 使用传入的trader和position_manager实例
self.trader = trader
# 启动调度器
self._start_scheduler()
logger.info("实盘交易管理器初始化完成")
def _start_scheduler(self):
# 每日定时清理(增加配置校验)
if hasattr(Config, "STRATEGY_SAVE_TIME"):
try:
schedule.every().day.at(Config.STRATEGY_SAVE_TIME).do(
run_threaded(self.clean_expired_orders)
)
schedule.every().day.at(Config.STRATEGY_SAVE_TIME).do(
run_threaded(self.update_closeable_amount)
)
except Exception as e:
logger.error(f"清理任务配置错误: {e}")
else:
logger.error("STRATEGY_SAVE_TIME 未配置")
# 启动高精度调度线程
def run_scheduler():
while True:
try:
schedule.run_pending()
time.sleep(1) # 将休眠时间缩短至1秒提高精度
except Exception as e:
logger.error(f"调度器异常: {e}", exc_info=True)
time.sleep(10) # 发生错误时延长休眠避免日志风暴
scheduler_thread = threading.Thread(
target=run_scheduler, name="SchedulerThread"
)
scheduler_thread.daemon = True # 设为守护线程随主进程退出
scheduler_thread.start()
logger.info("交易管理器调度器已启动")
def place_order(
self, strategy_name, code, direction, amount, price, order_type=ORDER_TYPE_LIMIT
):
"""下单接口,处理买入/卖出请求
Args:
strategy_name: 策略名称
code: 股票代码
direction: 交易方向 'buy''sell'
amount: 交易数量
price: 交易价格市价单时可为0
order_type: 订单类型,'limit'表示限价单,'market'表示市价单,默认为'limit'
Returns:
dict: 包含订单ID和状态信息
"""
if not strategy_name or not code or not direction:
logger.error("下单参数不完整")
return {"success": False, "error": "参数不完整"}
# 检查交易方向
if direction not in [ORDER_DIRECTION_BUY, ORDER_DIRECTION_SELL]:
logger.error(f"无效的交易方向: {direction}")
return {"success": False, "error": "无效的交易方向"}
# 检查订单类型
if order_type not in [ORDER_TYPE_LIMIT, ORDER_TYPE_MARKET]:
logger.error(f"无效的订单类型: {order_type}")
return {
"success": False,
"error": "无效的订单类型,必须是'limit''market'",
}
try:
# 对于限价单,检查资金和持仓是否足够
if order_type == ORDER_TYPE_LIMIT and not self._check_order_feasibility(
code, direction, amount, price
):
logger.warning(
f"资金或持仓不足,忽略订单: {direction} {code} {amount}{price}"
)
return {"success": False, "error": "资金或持仓不足"}
# 下单
logger.info(
f"准备{direction}订单: 代码={code}, 数量={amount}, 价格={price}, 订单类型={order_type}"
)
if direction == ORDER_DIRECTION_BUY:
result = self.trader.buy(code, price, amount, order_type)
else:
result = self.trader.sell(code, price, amount, order_type)
order_id = result.get("order_id")
if not order_id:
logger.error(f"下单失败: {result}")
return {"success": False, "error": "下单失败"}
# 添加未完成委托到position_manager
position_manager = self.trader.get_position_manager(strategy_name)
position_manager.add_pending_order(order_id, code, price, amount, direction, order_type)
logger.info(
f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}"
)
# 立即更新一次订单状态
self.check_and_retry(order_id, strategy_name, code, direction, amount, 1)
return {"success": True, "order_id": order_id}
except Exception as e:
logger.error(f"下单过程发生异常: {str(e)}")
return {"success": False, "error": f"下单异常: {str(e)}"}
def _place_market_order_for_remainder(self, strategy_name, code, direction, left_amount):
"""对未完成的订单进行补单,下市价单
Args:
strategy_name: 策略名称
code: 股票代码
direction: 交易方向
left_amount: 剩余数量
available_retry_count: 重试次数
Returns:
bool: 补单是否成功
"""
if left_amount <= 0:
logger.info(f"无需补单,剩余数量为零或负数: {left_amount}")
return True
logger.info(f"限价单补单: 市价单, 剩余数量={left_amount}")
new_order = self.place_order(strategy_name, code, direction, left_amount, 0, ORDER_TYPE_MARKET)
new_order_id = new_order.get("order_id")
if new_order.get("success") and new_order_id:
# 立即检查新市价单
self.check_and_retry(new_order_id, strategy_name, code, direction, left_amount)
return True
else:
logger.error(f"补单失败: {new_order}")
return False
def check_and_retry(self, order_id, strategy_name, code, direction, amount, available_retry_count=1):
position_manager = self.trader.get_position_manager(strategy_name)
order_info = position_manager.get_pending_order(order_id)
filled = order_info.filled
target_amount = order_info.amount
if not order_info:
logger.warning(f"订单信息不存在: ID={order_id}")
return
order_type = order_info.order_type
status = self._update_order_status(order_id, strategy_name)
if order_type == ORDER_TYPE_MARKET:
# 市价单,只递归检查
if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]:
logger.info(f"市价单未完成1分钟后继续检查: ID={order_id}, 状态={status}")
threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name, code, direction, amount)).start()
else:
logger.info(f"市价单已完成: ID={order_id}, 状态={status}")
elif order_type == ORDER_TYPE_LIMIT:
# 限价单,未完成则撤单补市价单
if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]:
if available_retry_count > 0:
logger.info(f"限价单未完成1分钟后继续检查: ID={order_id}, 状态={status}")
threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name, code, direction, amount, 0)).start()
else:
# 尝试撤单
try:
logger.info(f"限价单未完成,尝试撤单: ID={order_id}, 状态={status}")
self.trader.cancel(order_id)
position_manager.update_order_status(order_id, 0, ORDER_STATUS_CANCELLED)
except Exception as e:
logger.error(f"撤单失败: order_id={order_id}, error={str(e)}")
# 计算剩余数量, 如果剩余数量大于0, 则补单
left_amount = target_amount - filled
self._place_market_order_for_remainder(strategy_name, code, direction, left_amount)
else:
logger.info(f"限价单已完成: ID={order_id}, 状态={status}")
else:
logger.warning(f"未知订单类型: ID={order_id}, type={order_type}")
def _update_order_status(self, order_id, strategy_name):
"""更新单个订单状态
Args:
order_id: 订单ID
"""
# 检查订单是否存在
position_manager = self.trader.get_position_manager(strategy_name)
order_info = position_manager.get_pending_order(order_id)
if not order_info:
return None
try:
# 获取订单之前的状态,用于判断是否发生变化
previous_status = order_info.status
previous_volume = order_info.filled
updated_order = self.trader.get_order(order_id)
# 根据委托状态更新订单状态
if updated_order["order_status"] == xtconstant.ORDER_SUCCEEDED:
# 全部成交
filled = updated_order["traded_volume"]
position_manager.update_order_status(order_id, filled, ORDER_STATUS_COMPLETED)
# 更新持仓
position_manager.update_position(
order_info.code,
order_info.direction,
filled,
)
return ORDER_STATUS_COMPLETED
elif updated_order["order_status"] == xtconstant.ORDER_PART_SUCC:
# 部分成交
filled = updated_order.get("traded_volume", 0)
position_manager.update_order_status(
order_id, filled, ORDER_STATUS_PARTIAL
)
# 如果成交量有变化,记录日志并更新持仓
if filled != previous_volume:
target_amount = order_info.amount
logger.info(
f"订单部分成交更新: ID={order_id}, 代码={order_info.code}, 目标数量={target_amount}, 已成交数量={filled}, 剩余数量={target_amount - filled}"
)
# 更新持仓(仅更新已成交部分)
if filled > 0:
position_manager.update_position(
order_info.code,
order_info.direction,
filled,
)
return ORDER_STATUS_PARTIAL
elif updated_order["order_status"] in [
xtconstant.ORDER_CANCELED,
xtconstant.ORDER_JUNK,
]:
# 已撤单或废单
position_manager.update_order_status(
order_id,
0,
ORDER_STATUS_CANCELLED
)
return ORDER_STATUS_CANCELLED
elif updated_order["order_status"] in [
xtconstant.ORDER_UNREPORTED,
xtconstant.ORDER_WAIT_REPORTING,
xtconstant.ORDER_REPORTED,
]:
# 未报、待报、已报
if previous_status != ORDER_STATUS_PENDING:
position_manager.update_order_status(order_id, 0, ORDER_STATUS_PENDING)
return ORDER_STATUS_PENDING
except Exception as e:
logger.error(f"更新订单状态时发生异常: order_id={order_id}, error={str(e)}")
return None
def _check_order_feasibility(self, code, direction, amount, price):
"""检查订单是否可行(资金或持仓是否足够)
Args:
code: 股票代码
direction: 交易方向
amount: 交易数量
price: 交易价格
Returns:
bool: 订单是否可行
"""
try:
if direction == ORDER_DIRECTION_BUY:
# 检查资金是否足够
balance = self.trader.get_balance()
if not balance:
logger.error("获取账户余额失败")
return False
# 计算所需资金加上3%的手续费作为缓冲)
required_cash = price * amount * 1.03
available_cash = balance.get("cash", 0) - balance.get("frozen_cash", 0)
if required_cash > available_cash:
logger.warning(
f"资金不足: 需要 {required_cash:.2f}, 可用 {available_cash:.2f}"
)
return False
return True
elif direction == "sell":
# 检查持仓是否足够
position = self.trader.get_position(code)
if not position:
logger.warning(f"没有持仓: {code}")
return False
available_volume = position.get("can_use_volume", 0)
if amount > available_volume:
logger.warning(
f"可用持仓不足: 需要 {amount}, 可用 {available_volume}"
)
return False
return True
return False
except Exception as e:
logger.error(f"检查订单可行性时发生异常: {str(e)}")
return False
def clean_expired_orders(self):
"""清理过期的未完成订单"""
try:
logger.info("开始清理过期未完成订单...")
# 获取所有未完成订单
position_managers = self.trader.get_all_position_managers()
# 遍历所有持仓管理器
for position_manager in position_managers.values():
# 获取所有未完成订单
pending_orders = position_manager.get_pending_orders()
# 遍历未完成订单,检查是否有无法成交的订单(如跌停无法卖出)
for order_id, order_info in pending_orders.items():
try:
logger.warning(
f"清理无法成交订单: ID={order_id}, 代码={order_info.code}, 方向={order_info.direction}, "
f"数量={order_info.amount}, 已成交数量={order_info.filled}"
)
except Exception as e:
logger.error(f"清理订单 {order_id} 时出错: {str(e)}")
position_manager.clear_pending_orders()
logger.info("过期未完成订单清理完毕")
except Exception as e:
logger.error(f"清理过期未完成订单时发生异常: {str(e)}")
def update_closeable_amount(self):
"""更新可卖持仓"""
try:
logger.info("开始更新可卖持仓...")
# 获取所有持仓
position_managers = self.trader.get_all_position_managers()
# 遍历持仓,更新可卖持仓
for position_manager in position_managers.values():
position_manager.update_closeable_amount()
logger.info("可卖持仓更新完毕")
except Exception as e:
logger.error(f"更新可卖持仓时发生异常: {str(e)}")