优化check_pending_orders

This commit is contained in:
zhiyong 2025-05-16 20:00:33 +08:00
parent 90ae3f286b
commit e7463158d2
5 changed files with 86 additions and 33 deletions

View File

@ -56,6 +56,9 @@ class Config:
RTM_ORDER_TIMEOUT = 10 # 订单超时时间(秒) RTM_ORDER_TIMEOUT = 10 # 订单超时时间(秒)
RTM_USE_MARKET_ORDER = True # 是否使用市价单进行补单 RTM_USE_MARKET_ORDER = True # 是否使用市价单进行补单
# 撤销订单超时时间(秒)
RTM_CANCEL_TIMEOUT = 10
# 计划任务运行时间 # 计划任务运行时间
CLEAN_ORDERS_TIME = "15:05" # 每天清理超时委托的时间 CLEAN_ORDERS_TIME = "15:05" # 每天清理超时委托的时间
STRATEGY_SAVE_TIME = "15:08" # 每天保存策略数据的时间 STRATEGY_SAVE_TIME = "15:08" # 每天保存策略数据的时间

View File

@ -10,6 +10,7 @@ from trade_constants import (
ORDER_TYPE_MARKET, ORDER_TYPE_MARKET,
ORDER_STATUS_COMPLETED, ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED, ORDER_STATUS_CANCELLED,
ORDER_STATUS_FAILED,
) )
from local_position import LocalPosition from local_position import LocalPosition
from local_order import LocalOrder from local_order import LocalOrder
@ -119,6 +120,7 @@ class PositionManager:
if new_status in [ if new_status in [
ORDER_STATUS_COMPLETED, ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED, ORDER_STATUS_CANCELLED,
ORDER_STATUS_FAILED,
]: ]:
# 保留订单信息以供参考,但标记为已完成 # 保留订单信息以供参考,但标记为已完成
del self.pending_orders[order_id] del self.pending_orders[order_id]

View File

@ -176,40 +176,85 @@ class RealTraderManager:
return {"success": False, "error": f"下单异常: {str(e)}"} return {"success": False, "error": f"下单异常: {str(e)}"}
def _handle_timed_out_limit_order(self, order_id: int, order_info: dict, strategy_name: str, duration: float) -> None:
"""处理超时的限价单,尝试撤销并进行市价补单。
Args:
order_id: 订单ID
order_info: 订单信息
strategy_name: 策略名称
duration: 订单持续时间
"""
self.trader.cancel(order_id)
# 使用轮询等待撤销完成,并设置超时
start_time = datetime.now()
cancel_success = False
while (datetime.now() - start_time).total_seconds() < Config.RTM_CANCEL_TIMEOUT:
order = self.trader.get_order(order_id)
if order and order.get('order_status') == xtconstant.ORDER_CANCELED:
logger.info(f"限价单已撤销: ID={order_id}, 策略={strategy_name}")
cancel_success = True
break
time.sleep(0.5) # 每0.5秒查询一次
if cancel_success:
self.trader.handle_order_update(order_id, strategy_name)
logger.info(f"检测到限价单被撤销,准备进行市价单补单: ID={order_id}")
self.trader.place_market_order_for_remainder(order_info, strategy_name)
else:
logger.warning(f"限价单撤销超时或失败: ID={order_id}, 策略={strategy_name}")
self.trader.handle_order_update(order_id, strategy_name) # 即使撤销失败,也更新一下状态
def _process_pending_order(self, order_id: int, order_info: dict, strategy_name: str) -> None:
"""处理单个未完成订单的逻辑,包括更新状态、检查超时、撤销和补单。"""
try:
# 处理订单更新, 更新订单状态, 更新持仓, 使position manager中的订单为最新状态
self.trader.handle_order_update(order_id, strategy_name)
# 重新获取更新后的订单信息
position_manager = self.trader.get_position_manager(strategy_name)
order_info = position_manager.get_pending_order(order_id)
if not order_info:
logger.warning(f"订单信息不存在, 可能已完成或撤单, 或下单失败: {order_id}")
return
# 如果订单类型为限价单,则检查是否超时
if order_info.order_type == ORDER_TYPE_LIMIT:
now = datetime.now()
duration = (now - order_info.created_time).total_seconds()
if duration > Config.RTM_ORDER_TIMEOUT:
# 将处理超时限价单的逻辑委托给新的私有方法
logger.info(f'订单创建时间: {order_info.created_time} 当前时间: {now}')
logger.info(f"限价单超时: ID={order_id}, 策略={strategy_name}, 持续时间={duration}")
self._handle_timed_out_limit_order(order_id, order_info, strategy_name, duration)
else:
# 市价单未完成,更新状态
logger.info(f"市价单未完成, 更新市价单: ID={order_id}, 策略={strategy_name}, 订单类型={order_info.order_type}")
self.trader.handle_order_update(order_id, strategy_name)
except Exception as e:
# 更细粒度的异常处理,捕获处理单个订单时的异常
logger.error(f"处理订单 {order_id} 时发生异常: {str(e)}", exc_info=True)
def check_pending_orders(self): def check_pending_orders(self):
"""检查限价单是否超时""" """检查限价单是否超时"""
try: try:
# 获取所有未完成订单 # 获取所有未完成订单
position_managers = self.trader.get_all_position_managers() position_managers = self.trader.get_all_position_managers()
for strategy_name, position_manager in position_managers.items(): for strategy_name, position_manager in position_managers.items():
pending_orders = position_manager.get_pending_orders() # 转换为列表避免在迭代过程中修改
for order_id, order_info in pending_orders.items(): pending_orders_list = list(position_manager.get_pending_orders().items())
# 如果订单类型为限价单,则检查是否超时 for order_id, order_info in pending_orders_list:
if order_info.order_type == ORDER_TYPE_LIMIT: # 将单个订单的处理逻辑委托给新的私有方法
duration = (datetime.now() - order_info.created_time).total_seconds() self._process_pending_order(order_id, order_info, strategy_name)
if duration > Config.RTM_ORDER_TIMEOUT:
logger.info(f'订单创建时间: {order_info.created_time} 当前时间: {datetime.now()}')
logger.info(f"限价单超时: ID={order_id}, 策略={strategy_name}, 持续时间={duration}")
self.trader.cancel(order_id)
time.sleep(3)
order = self.trader.get_order(order_id)
if order['order_status'] == xtconstant.ORDER_CANCELED:
logger.info(f"限价单已撤销: ID={order_id}, 策略={strategy_name}")
self.trader.handle_order_update(order_id, strategy_name)
logger.info(f"检测到限价单被撤销,准备进行市价单补单: ID={order_id}")
self.trader.place_market_order_for_remainder(order_info, strategy_name)
elif order['order_status'] == xtconstant.ORDER_SUCCEEDED:
logger.info(f"尝试撤单未成功, 限价单已成交: ID={order_id}, 策略={strategy_name}, 状态={order['order_status']}")
self.trader.handle_order_update(order_id, strategy_name)
else:
logger.warning(f"限价单撤销失败: ID={order_id}, 策略={strategy_name}, 状态={order['order_status']}")
self.trader.handle_order_update(order_id, strategy_name)
else:
logger.info(f"更新市价单: ID={order_id}, 策略={strategy_name}, 订单类型={order_info.order_type}")
self.trader.handle_order_update(order_id, strategy_name)
except Exception as e: except Exception as e:
logger.error(f"检查限价单是否超时时发生异常: {str(e)}") # 顶层异常处理捕获获取position managers或遍历时的异常
logger.error(f"检查限价单是否超时时发生异常: {str(e)}", exc_info=True)
def _check_order_feasibility(self, code, direction, amount, price): def _check_order_feasibility(self, code, direction, amount, price):

View File

@ -18,7 +18,8 @@ from trade_constants import (
ORDER_TYPE_LIMIT, ORDER_TYPE_LIMIT,
ORDER_TYPE_MARKET, ORDER_TYPE_MARKET,
ORDER_DIRECTION_BUY, ORDER_DIRECTION_BUY,
ORDER_DIRECTION_SELL ORDER_DIRECTION_SELL,
ORDER_STATUS_FAILED
) )
from local_order import LocalOrder from local_order import LocalOrder
@ -531,18 +532,19 @@ class XtTrader(BaseTrader):
str: 订单状态如ORDER_STATUS_COMPLETEDORDER_STATUS_PARTIAL等如果处理失败则返回None str: 订单状态如ORDER_STATUS_COMPLETEDORDER_STATUS_PARTIAL等如果处理失败则返回None
""" """
try: try:
# 获取订单信息
order = self.get_order(order_id)
if not order:
logger.warning(f"获取订单失败,无法更新状态: {order_id}")
return None
# 获取position_manager # 获取position_manager
position_manager = self.get_position_manager(strategy_name) position_manager = self.get_position_manager(strategy_name)
if not position_manager: if not position_manager:
logger.warning(f"获取position_manager失败无法更新状态: {strategy_name}") logger.warning(f"获取position_manager失败无法更新状态: {strategy_name}")
return None return None
# 获取订单
order = self.get_order(order_id)
if not order:
logger.warning(f"获取订单失败, 可能下单失败,将订单状态设置为失败: {order_id}")
position_manager.update_order_status(order_id, 0, ORDER_STATUS_FAILED)
return None
# 获取订单信息 # 获取订单信息
order_info = position_manager.get_pending_order(order_id) order_info = position_manager.get_pending_order(order_id)
if not order_info: if not order_info:

View File

@ -15,6 +15,7 @@ ORDER_STATUS_PENDING: Final[str] = 'pending'
ORDER_STATUS_PARTIAL: Final[str] = 'partial' ORDER_STATUS_PARTIAL: Final[str] = 'partial'
ORDER_STATUS_COMPLETED: Final[str] = 'completed' ORDER_STATUS_COMPLETED: Final[str] = 'completed'
ORDER_STATUS_CANCELLED: Final[str] = 'cancelled' ORDER_STATUS_CANCELLED: Final[str] = 'cancelled'
ORDER_STATUS_FAILED: Final[str] = 'failed'
# 订单类型 # 订单类型
ORDER_TYPE_LIMIT: Final[str] = 'limit' ORDER_TYPE_LIMIT: Final[str] = 'limit'