refactor: 检查订单更新

This commit is contained in:
zhiyong 2025-05-12 16:11:51 +08:00
parent 959a83d011
commit 9a292f1609
2 changed files with 196 additions and 125 deletions

View File

@ -1,12 +1,10 @@
import time import time
import threading import threading
import schedule import schedule
import weakref
from xtquant import xtconstant from xtquant import xtconstant
from logger_config import get_logger from logger_config import get_logger
from config import Config from config import Config
import json
from typing import Dict
from position_manager import PositionManager
from functools import wraps from functools import wraps
from trade_constants import ( from trade_constants import (
ORDER_STATUS_COMPLETED, ORDER_STATUS_COMPLETED,
@ -44,8 +42,8 @@ class RealTraderManager:
trader: XtTrader实例 trader: XtTrader实例
position_manager: StrategyPositionManager实例 position_manager: StrategyPositionManager实例
""" """
# 使用传入的trader和position_manager实例 # 使用传入的trader实例,使用弱引用避免循环引用
self.trader = trader self.trader = weakref.proxy(trader)
# 启动调度器 # 启动调度器
self._start_scheduler() self._start_scheduler()
@ -149,10 +147,11 @@ class RealTraderManager:
f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}" f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}"
) )
# 立即更新一次订单状态 # 设置60秒后检查订单状态
threading.Thread( threading.Timer(
target=self.check_and_retry, 60,
args=(order_id, strategy_name, code, direction, amount, 1), self.check_and_retry,
args=(order_id, strategy_name),
name=f"CheckOrder-{order_id}" name=f"CheckOrder-{order_id}"
).start() ).start()
@ -170,7 +169,6 @@ class RealTraderManager:
code: 股票代码 code: 股票代码
direction: 交易方向 direction: 交易方向
left_amount: 剩余数量 left_amount: 剩余数量
available_retry_count: 重试次数
Returns: Returns:
bool: 补单是否成功 bool: 补单是否成功
@ -183,142 +181,71 @@ class RealTraderManager:
new_order = self.place_order(strategy_name, code, direction, left_amount, 0, ORDER_TYPE_MARKET) new_order = self.place_order(strategy_name, code, direction, left_amount, 0, ORDER_TYPE_MARKET)
new_order_id = new_order.get("order_id") new_order_id = new_order.get("order_id")
if new_order.get("success") and new_order_id: if new_order.get("success") and new_order_id:
# 立即检查新市价单 # 订单已在place_order中设置了60秒后检查
threading.Thread(
target=self.check_and_retry,
args=(new_order_id, strategy_name, code, direction, left_amount),
name=f"CheckMarketOrder-{new_order_id}"
).start()
return True return True
else: else:
logger.error(f"补单失败: {new_order}") logger.error(f"补单失败: {new_order}")
return False return False
def check_and_retry(self, order_id, strategy_name, code, direction, amount, available_retry_count=1): def check_and_retry(self, order_id, strategy_name):
"""检查订单状态并处理未完成订单
Args:
order_id: 订单ID
strategy_name: 策略名称
"""
position_manager = self.trader.get_position_manager(strategy_name) position_manager = self.trader.get_position_manager(strategy_name)
order_info = position_manager.get_pending_order(order_id) order_info = position_manager.get_pending_order(order_id)
if not order_info: if not order_info:
logger.warning(f"订单信息不存在: ID={order_id}") logger.warning(f"订单信息不存在, 可能全部完成或者撤单: ID={order_id}")
return return
filled = order_info.filled
target_amount = order_info.amount
order_type = order_info.order_type order_type = order_info.order_type
status = self._update_order_status(order_id, strategy_name)
# 使用trader的handle_order_update方法更新订单状态
status = self.trader.handle_order_update(order_id, strategy_name)
# 如果状态为None说明处理失败则直接返回
if status is None:
logger.warning(f"订单状态更新失败: ID={order_id}")
return
# 重新获取订单信息,因为可能已经被更新
order_info = position_manager.get_pending_order(order_id)
if not order_info:
logger.info(f"订单已完成: ID={order_id}")
return
if order_type == ORDER_TYPE_MARKET: if order_type == ORDER_TYPE_MARKET:
# 市价单,只递归检查 # 市价单,如果未完成则继续检查
if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]: if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]:
logger.info(f"市价单未完成1分钟后继续检查: ID={order_id}, 状态={status}") logger.info(f"市价单未完成,60秒后继续检查: ID={order_id}, 状态={status}")
threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name, code, direction, amount)).start() threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name)).start()
else: else:
logger.info(f"市价单已完成: ID={order_id}, 状态={status}") logger.info(f"市价单已完成: ID={order_id}, 状态={status}")
elif order_type == ORDER_TYPE_LIMIT: elif order_type == ORDER_TYPE_LIMIT:
# 限价单,未完成则撤单补市价单 filled = order_info.filled
if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL] or status is None: target_amount = order_info.amount
if available_retry_count > 0: left_amount = target_amount - filled
logger.info(f"限价单未完成1分钟后继续检查: ID={order_id}, 状态={status}")
threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name, code, direction, amount, 0)).start()
else:
# 尝试撤单
try:
logger.info(f"限价单未完成,尝试撤单: ID={order_id}, 状态={status}")
self.trader.cancel(order_id)
position_manager.update_order_status(order_id, 0, ORDER_STATUS_CANCELLED)
except Exception as e:
logger.error(f"撤单失败: order_id={order_id}, error={str(e)}")
# 计算剩余数量, 如果剩余数量大于0, 则补单 # 限价单这是60秒后的检查如果未完成则撤单补市价单
left_amount = target_amount - filled if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]:
self._place_market_order_for_remainder(strategy_name, code, direction, left_amount) # 尝试撤单
try:
logger.info(f"限价单60秒后仍未完成尝试撤单: ID={order_id}, 状态={status}")
self.trader.cancel(order_id)
position_manager.update_order_status(order_id, filled, ORDER_STATUS_CANCELLED)
except Exception as e:
logger.error(f"撤单失败: order_id={order_id}, error={str(e)}")
# 计算剩余数量, 如果剩余数量大于0, 则补单
self._place_market_order_for_remainder(strategy_name, order_info.code, order_info.direction, left_amount)
else: else:
logger.info(f"限价单已完成: ID={order_id}, 状态={status}") logger.info(f"限价单已完成: ID={order_id}, 状态={status}")
else: else:
logger.warning(f"未知订单类型: ID={order_id}, type={order_type}") logger.warning(f"未知订单类型: ID={order_id}, type={order_type}")
def _update_order_status(self, order_id, strategy_name):
"""更新单个订单状态
Args:
order_id: 订单ID
"""
# 检查订单是否存在
position_manager = self.trader.get_position_manager(strategy_name)
order_info = position_manager.get_pending_order(order_id)
if not order_info:
return None
try:
# 获取订单之前的状态,用于判断是否发生变化
previous_status = order_info.status
previous_volume = order_info.filled
time.sleep(3)
updated_order = self.trader.get_order(order_id)
if not updated_order:
logger.error(f"获取订单失败, 订单可能正在报单: {order_id}")
return None
# 根据委托状态更新订单状态
if updated_order["order_status"] == xtconstant.ORDER_SUCCEEDED:
# 全部成交
filled = updated_order["traded_volume"]
position_manager.update_order_status(order_id, filled, ORDER_STATUS_COMPLETED)
# 更新持仓
position_manager.update_position(
order_info.code,
order_info.direction,
filled,
)
return ORDER_STATUS_COMPLETED
elif updated_order["order_status"] == xtconstant.ORDER_PART_SUCC:
# 部分成交
filled = updated_order.get("traded_volume", 0)
position_manager.update_order_status(
order_id, filled, ORDER_STATUS_PARTIAL
)
# 如果成交量有变化,记录日志并更新持仓
if filled != previous_volume:
target_amount = order_info.amount
logger.info(
f"订单部分成交更新: ID={order_id}, 代码={order_info.code}, 目标数量={target_amount}, 已成交数量={filled}, 剩余数量={target_amount - filled}"
)
# 更新持仓(仅更新已成交部分)
if filled > 0:
position_manager.update_position(
order_info.code,
order_info.direction,
filled,
)
return ORDER_STATUS_PARTIAL
elif updated_order["order_status"] in [
xtconstant.ORDER_CANCELED,
xtconstant.ORDER_JUNK,
]:
# 已撤单或废单
position_manager.update_order_status(
order_id,
0,
ORDER_STATUS_CANCELLED
)
return ORDER_STATUS_CANCELLED
elif updated_order["order_status"] in [
xtconstant.ORDER_UNREPORTED,
xtconstant.ORDER_WAIT_REPORTING,
xtconstant.ORDER_REPORTED,
]:
# 未报、待报、已报
if previous_status != ORDER_STATUS_PENDING:
position_manager.update_order_status(order_id, 0, ORDER_STATUS_PENDING)
return ORDER_STATUS_PENDING
except Exception as e:
logger.error(f"更新订单状态时发生异常: order_id={order_id}, error={str(e)}")
return None
def _check_order_feasibility(self, code, direction, amount, price): def _check_order_feasibility(self, code, direction, amount, price):
"""检查订单是否可行(资金或持仓是否足够) """检查订单是否可行(资金或持仓是否足够)

View File

@ -10,6 +10,13 @@ from xtquant import xtconstant
from xtquant.xtdata import get_instrument_detail from xtquant.xtdata import get_instrument_detail
from logger_config import get_logger from logger_config import get_logger
from utils.mail_util import MailUtil from utils.mail_util import MailUtil
from trade_constants import (
ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED,
ORDER_STATUS_PENDING,
ORDER_STATUS_FAILED,
ORDER_STATUS_PARTIAL,
)
# 获取日志记录器 # 获取日志记录器
logger = get_logger('real_trader') logger = get_logger('real_trader')
@ -46,6 +53,27 @@ class MyXtQuantTraderCallback:
logger.info(f"委托回报: {order.stock_code} {order.order_status} {order.order_sysid}") logger.info(f"委托回报: {order.stock_code} {order.order_status} {order.order_sysid}")
def on_stock_trade(self, trade): def on_stock_trade(self, trade):
logger.info(f"成交变动: {trade.account_id} {trade.stock_code} {trade.order_id}") logger.info(f"成交变动: {trade.account_id} {trade.stock_code} {trade.order_id}")
# 当有成交回报时,立即更新订单状态
try:
if self.trader_instance:
order_id = trade.order_id
# 查找该订单所属的策略
strategy_name = None
# 避免循环引用不直接调用real_trader_manager的方法
# 而是通过trader的方法间接调用
for strategy, position_manager in self.trader_instance.get_all_position_managers().items():
if position_manager.get_pending_order(order_id):
strategy_name = strategy
break
if strategy_name:
# 使用trader的方法来处理订单状态更新避免直接调用real_trader_manager
self.trader_instance.handle_order_update(order_id, strategy_name)
logger.info(f"成交回报触发订单状态更新: ID={order_id}, 策略={strategy_name}")
except Exception as e:
logger.error(f"成交回报处理异常: {str(e)}", exc_info=True)
def on_stock_position(self, position): def on_stock_position(self, position):
logger.info(f"持仓变动: {position.stock_code} {position.volume}") logger.info(f"持仓变动: {position.stock_code} {position.volume}")
def on_order_error(self, order_error): def on_order_error(self, order_error):
@ -491,3 +519,119 @@ class XtTrader(BaseTrader):
else: else:
logger.warning("定期重连失败") logger.warning("定期重连失败")
self.last_reconnect_time = time.time() self.last_reconnect_time = time.time()
def handle_order_update(self, order_id, strategy_name):
"""处理订单状态更新,作为中间层避免循环引用
Args:
order_id: 订单ID
strategy_name: 策略名称
Returns:
str: 订单状态如ORDER_STATUS_COMPLETEDORDER_STATUS_PARTIAL等如果处理失败则返回None
"""
try:
# 获取订单信息
order = self.get_order(order_id)
if not order:
logger.warning(f"获取订单失败,无法更新状态: {order_id}")
return None
# 获取position_manager
position_manager = self.get_position_manager(strategy_name)
if not position_manager:
logger.warning(f"获取position_manager失败无法更新状态: {strategy_name}")
return None
# 获取订单信息
order_info = position_manager.get_pending_order(order_id)
if not order_info:
logger.warning(f"订单信息不存在,可能已完成: {order_id}")
return None
# 获取之前的状态和成交量
previous_status = order_info.status
previous_volume = order_info.filled
# 根据委托状态更新订单状态
if order["order_status"] == xtconstant.ORDER_SUCCEEDED:
# 全部成交
filled = order["traded_volume"]
# 如果订单已经标记为完成,则不重复更新
if previous_status == ORDER_STATUS_COMPLETED:
return ORDER_STATUS_COMPLETED
position_manager.update_order_status(order_id, filled, ORDER_STATUS_COMPLETED)
# 更新持仓(只更新新增的成交部分)
if filled > previous_volume:
new_filled = filled - previous_volume
position_manager.update_position(
order_info.code,
order_info.direction,
new_filled,
)
logger.info(f"订单全部成交: ID={order_id}, 代码={order_info.code}, 总成交量={filled}, 新增成交量={new_filled}")
return ORDER_STATUS_COMPLETED
elif order["order_status"] == xtconstant.ORDER_PART_SUCC:
# 部分成交
filled = order.get("traded_volume", 0)
position_manager.update_order_status(
order_id, filled, ORDER_STATUS_PARTIAL
)
# 如果成交量有变化,记录日志并更新持仓
if filled != previous_volume:
target_amount = order_info.amount
new_filled = filled - previous_volume
logger.info(
f"订单部分成交更新: ID={order_id}, 代码={order_info.code}, 目标数量={target_amount}, "
f"已成交数量={filled}, 新增成交量={new_filled}, 剩余数量={target_amount - filled}"
)
# 更新持仓(仅更新新增成交部分)
if new_filled > 0:
position_manager.update_position(
order_info.code,
order_info.direction,
new_filled,
)
return ORDER_STATUS_PARTIAL
elif order["order_status"] in [
xtconstant.ORDER_CANCELED,
xtconstant.ORDER_JUNK,
]:
# 已撤单或废单
# 如果已经标记为取消,则不重复更新
if previous_status == ORDER_STATUS_CANCELLED:
return ORDER_STATUS_CANCELLED
position_manager.update_order_status(
order_id,
previous_volume, # 保留已成交部分
ORDER_STATUS_CANCELLED
)
return ORDER_STATUS_CANCELLED
elif order["order_status"] in [
xtconstant.ORDER_UNREPORTED,
xtconstant.ORDER_WAIT_REPORTING,
xtconstant.ORDER_REPORTED,
]:
# 未报、待报、已报
if previous_status != ORDER_STATUS_PENDING:
position_manager.update_order_status(order_id, previous_volume, ORDER_STATUS_PENDING)
return ORDER_STATUS_PENDING
return None
except Exception as e:
logger.error(f"处理订单状态更新异常: order_id={order_id}, error={str(e)}", exc_info=True)
return None