refactor: use order callback to update order status (#1)

* fix: logger is none

* 由订单回调触发订单状态更新和补单

* 使position manager 线程安全

* remove empty line
This commit is contained in:
biggerfish 2025-05-14 23:07:03 +08:00 committed by GitHub
parent 7d49e19f5d
commit 9e13f3c956
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 351 additions and 295 deletions

View File

@ -53,8 +53,7 @@ class Config:
MAIL_TO = ["jq@yushaoyou.com"] # 可以是多个邮箱 MAIL_TO = ["jq@yushaoyou.com"] # 可以是多个邮箱
# RealTraderManager配置 # RealTraderManager配置
RTM_ORDER_TIMEOUT = 60 # 订单超时时间(秒) RTM_ORDER_TIMEOUT = 30 # 订单超时时间(秒)
RTM_MAX_RETRIES = 3 # 最大重试次数
RTM_USE_MARKET_ORDER = True # 是否使用市价单进行补单 RTM_USE_MARKET_ORDER = True # 是否使用市价单进行补单
# 计划任务运行时间 # 计划任务运行时间

View File

@ -1,8 +1,8 @@
from trade_constants import ORDER_STATUS_PENDING from trade_constants import ORDER_STATUS_PENDING, ORDER_TYPE_LIMIT
from datetime import datetime from datetime import datetime
class LocalOrder: class LocalOrder:
def __init__(self, order_id, code, price, amount, direction, order_type='limit', filled=0, status=ORDER_STATUS_PENDING, created_time=datetime.now()): def __init__(self, order_id, code, price, amount, direction, order_type=ORDER_TYPE_LIMIT, filled=0, status=ORDER_STATUS_PENDING, created_time=datetime.now()):
self.order_id: str = order_id self.order_id: str = order_id
self.code: str = code self.code: str = code
self.price: float = price self.price: float = price

View File

@ -1,5 +1,6 @@
import os import os
import json import json
import threading
from logger_config import get_logger from logger_config import get_logger
from config import Config from config import Config
from trade_constants import ( from trade_constants import (
@ -8,7 +9,6 @@ from trade_constants import (
ORDER_TYPE_MARKET, ORDER_TYPE_MARKET,
ORDER_STATUS_COMPLETED, ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED, ORDER_STATUS_CANCELLED,
ORDER_STATUS_FAILED,
) )
from local_position import LocalPosition from local_position import LocalPosition
from local_order import LocalOrder from local_order import LocalOrder
@ -26,6 +26,8 @@ class PositionManager:
"""初始化实盘持仓管理器""" """初始化实盘持仓管理器"""
super().__init__() super().__init__()
self.strategy_name = strategy_name self.strategy_name = strategy_name
# 创建线程锁
self._lock = threading.Lock()
# 策略持仓信息 # 策略持仓信息
self.positions: Dict[str, LocalPosition] = {} # {股票代码 -> LocalPosition} self.positions: Dict[str, LocalPosition] = {} # {股票代码 -> LocalPosition}
# 待处理订单信息 # 待处理订单信息
@ -44,6 +46,7 @@ class PositionManager:
self.load_data() self.load_data()
def update_position(self, code, direction, amount): def update_position(self, code, direction, amount):
with self._lock:
# 如果股票代码在持仓字典中不存在,初始化它 # 如果股票代码在持仓字典中不存在,初始化它
if code not in self.positions: if code not in self.positions:
self.positions[code] = LocalPosition(code, 0, 0) self.positions[code] = LocalPosition(code, 0, 0)
@ -78,6 +81,7 @@ class PositionManager:
if not self.strategy_name: if not self.strategy_name:
return return
with self._lock:
order = LocalOrder(order_id, code, price, amount, direction, order_type) order = LocalOrder(order_id, code, price, amount, direction, order_type)
self.pending_orders[order_id] = order self.pending_orders[order_id] = order
if (order_type == ORDER_TYPE_LIMIT): if (order_type == ORDER_TYPE_LIMIT):
@ -91,6 +95,7 @@ class PositionManager:
self.save_data() self.save_data()
def update_order_status(self, order_id, filled, new_status): def update_order_status(self, order_id, filled, new_status):
with self._lock:
if order_id in self.pending_orders: if order_id in self.pending_orders:
_order = self.pending_orders[order_id] _order = self.pending_orders[order_id]
# 记录之前的状态用于日志 # 记录之前的状态用于日志
@ -111,7 +116,6 @@ class PositionManager:
if new_status in [ if new_status in [
ORDER_STATUS_COMPLETED, ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED, ORDER_STATUS_CANCELLED,
ORDER_STATUS_FAILED,
]: ]:
# 保留订单信息以供参考,但标记为已完成 # 保留订单信息以供参考,但标记为已完成
del self.pending_orders[order_id] del self.pending_orders[order_id]
@ -130,6 +134,7 @@ class PositionManager:
Returns: Returns:
dict: 委托信息如果不存在返回None dict: 委托信息如果不存在返回None
""" """
with self._lock:
return self.pending_orders.get(order_id) return self.pending_orders.get(order_id)
def get_pending_orders(self): def get_pending_orders(self):
@ -138,7 +143,9 @@ class PositionManager:
Returns: Returns:
dict: 订单ID到委托信息的映射 dict: 订单ID到委托信息的映射
""" """
return self.pending_orders with self._lock:
# 返回副本以避免外部修改
return self.pending_orders.copy()
def get_positions(self) -> Dict[str, LocalPosition]: def get_positions(self) -> Dict[str, LocalPosition]:
"""获取策略持仓 """获取策略持仓
@ -147,10 +154,13 @@ class PositionManager:
Dict[str, LocalPosition]: Dict[str, LocalPosition]:
key为股票代码strvalue为LocalPosition对象若无持仓则返回空字典 key为股票代码strvalue为LocalPosition对象若无持仓则返回空字典
""" """
return self.positions with self._lock:
# 返回副本以避免外部修改
return self.positions.copy()
def save_data(self): def save_data(self):
"""保存策略数据""" """保存策略数据"""
with self._lock:
try: try:
# 将对象转换为可序列化的字典 # 将对象转换为可序列化的字典
positions_dict = {} positions_dict = {}
@ -210,6 +220,7 @@ class PositionManager:
def load_data(self): def load_data(self):
"""加载策略数据""" """加载策略数据"""
with self._lock:
try: try:
if os.path.exists(self.data_path): if os.path.exists(self.data_path):
from datetime import datetime from datetime import datetime
@ -284,6 +295,7 @@ class PositionManager:
def clear(self): def clear(self):
"""清除所有持仓管理数据""" """清除所有持仓管理数据"""
with self._lock:
self.positions = {} self.positions = {}
self.pending_orders = {} self.pending_orders = {}
self.all_orders = [] self.all_orders = []
@ -291,14 +303,18 @@ class PositionManager:
def update_closeable_amount(self): def update_closeable_amount(self):
"""更新可卖持仓""" """更新可卖持仓"""
with self._lock:
for _, position in self.positions.items(): for _, position in self.positions.items():
if position.closeable_amount != position.total_amount: if position.closeable_amount != position.total_amount:
position.closeable_amount = position.total_amount position.closeable_amount = position.total_amount
def clear_pending_orders(self): def clear_pending_orders(self):
"""清除所有未完成订单""" """清除所有未完成订单"""
with self._lock:
self.pending_orders = {} self.pending_orders = {}
def get_all_orders(self): def get_all_orders(self):
"""获取所有订单""" """获取所有订单"""
return self.all_orders with self._lock:
# 返回副本以避免外部修改
return self.all_orders.copy()

View File

@ -9,7 +9,6 @@ from trade_constants import (
ORDER_STATUS_COMPLETED, ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED, ORDER_STATUS_CANCELLED,
ORDER_STATUS_PENDING, ORDER_STATUS_PENDING,
ORDER_STATUS_FAILED,
ORDER_STATUS_PARTIAL, ORDER_STATUS_PARTIAL,
ORDER_DIRECTION_BUY, ORDER_DIRECTION_BUY,
ORDER_DIRECTION_SELL, ORDER_DIRECTION_SELL,
@ -76,6 +75,17 @@ class RealTraderManager:
else: else:
logger.error("STRATEGY_SAVE_TIME 未配置") logger.error("STRATEGY_SAVE_TIME 未配置")
# 检查限价单是否超时
if hasattr(Config, "RTM_ORDER_TIMEOUT"):
try:
schedule.every(10).seconds.do(
run_threaded(self.check_limit_orders)
)
except Exception as e:
logger.error(f"限价单超时检查任务配置错误: {e}")
else:
logger.error("RTM_ORDER_TIMEOUT 未配置")
# 启动高精度调度线程 # 启动高精度调度线程
def run_scheduler(): def run_scheduler():
while True: while True:
@ -158,105 +168,32 @@ class RealTraderManager:
f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}" f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}"
) )
threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name)).start()
return {"success": True, "order_id": order_id} return {"success": True, "order_id": order_id}
except Exception as e: except Exception as e:
logger.error(f"下单过程发生异常: {str(e)}") logger.error(f"下单过程发生异常: {str(e)}")
return {"success": False, "error": f"下单异常: {str(e)}"} return {"success": False, "error": f"下单异常: {str(e)}"}
def _place_market_order_for_remainder(self, strategy_name, code, direction, left_amount):
"""对未完成的订单进行补单,下市价单
Args: def check_limit_orders(self):
strategy_name: 策略名称 """检查限价单是否超时"""
code: 股票代码
direction: 交易方向
left_amount: 剩余数量
Returns:
bool: 补单是否成功
"""
if left_amount <= 0:
logger.info(f"无需补单,剩余数量为零或负数: {left_amount}")
return True
logger.info(f"限价单补单: 市价单, 剩余数量={left_amount}")
new_order = self.place_order(strategy_name, code, direction, left_amount, 0, ORDER_TYPE_MARKET)
new_order_id = new_order.get("order_id")
if new_order.get("success") and new_order_id:
# 订单已在place_order中设置了60秒后检查
return True
else:
logger.error(f"补单失败: {new_order}")
return False
def check_and_retry(self, order_id, strategy_name):
"""检查订单状态并处理未完成订单
Args:
order_id: 订单ID
strategy_name: 策略名称
"""
try: try:
logger.info(f"开始检查订单状态: ID={order_id}, 策略={strategy_name}") logger.info("开始检查限价单是否超时...")
position_manager = self.trader.get_position_manager(strategy_name) # 获取所有未完成订单
order_info = position_manager.get_pending_order(order_id) position_managers = self.trader.get_all_position_managers()
if not order_info: for strategy_name, position_manager in position_managers.items():
logger.warning(f"订单信息不存在, 可能全部完成或者撤单: ID={order_id}") pending_orders = position_manager.get_pending_orders()
return for order_id, order_info in pending_orders.items():
# 如果订单类型为限价单,则检查是否超时
order_type = order_info.order_type if order_info.order_type == ORDER_TYPE_LIMIT:
duration = (time.time() - order_info.created_time).total_seconds()
# 使用trader的handle_order_update方法更新订单状态 if duration > Config.RTM_ORDER_TIMEOUT:
status = self.trader.handle_order_update(order_id, strategy_name) logger.info(f"限价单超时: ID={order_id}, 策略={strategy_name}, 持续时间={duration}")
# 如果状态为None说明处理失败则直接返回
if status is None:
logger.warning(f"订单状态更新失败: ID={order_id}")
return
# 重新获取订单信息,因为可能已经被更新
order_info = position_manager.get_pending_order(order_id)
if not order_info:
logger.info(f"订单已完成: ID={order_id}")
return
if order_type == ORDER_TYPE_MARKET:
# 市价单,如果未完成则继续检查
if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]:
logger.info(f"市价单未完成60秒后继续检查: ID={order_id}, 状态={status}")
schedule.every(60).seconds.do(
run_threaded(self.check_and_retry), order_id, strategy_name
).tag(f"order_{order_id}")
else:
logger.info(f"市价单已完成: ID={order_id}, 状态={status}")
elif order_type == ORDER_TYPE_LIMIT:
filled = order_info.filled
target_amount = order_info.amount
left_amount = target_amount - filled
# 限价单这是60秒后的检查如果未完成则撤单补市价单
if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]:
# 尝试撤单
try:
logger.info(f"限价单60秒后仍未完成尝试撤单: ID={order_id}, 状态={status}")
self.trader.cancel(order_id) self.trader.cancel(order_id)
time.sleep(1)
position_manager.update_order_status(order_id, filled, ORDER_STATUS_CANCELLED)
except Exception as e:
logger.error(f"撤单失败: order_id={order_id}, error={str(e)}")
# 计算剩余数量, 如果剩余数量大于0, 则补单 logger.info("限价单检查完毕")
self._place_market_order_for_remainder(strategy_name, order_info.code, order_info.direction, left_amount)
else:
logger.info(f"限价单已完成: ID={order_id}, 状态={status}")
else:
logger.warning(f"未知订单类型: ID={order_id}, type={order_type}")
except Exception as e: except Exception as e:
logger.error(f"检查订单状态时发生异常: ID={order_id}, error={str(e)}", exc_info=True) logger.error(f"检查限价单是否超时时发生异常: {str(e)}")
def _check_order_feasibility(self, code, direction, amount, price): def _check_order_feasibility(self, code, direction, amount, price):
@ -331,8 +268,8 @@ class RealTraderManager:
# 遍历未完成订单,检查是否有无法成交的订单(如跌停无法卖出) # 遍历未完成订单,检查是否有无法成交的订单(如跌停无法卖出)
for order_id, order_info in pending_orders.items(): for order_id, order_info in pending_orders.items():
try: try:
logger.warning( logger.error(
f"清理无法成交订单: ID={order_id}, 代码={order_info.code}, 方向={order_info.direction}, " f"清理无法成交订单(理论不应该有): ID={order_id}, 代码={order_info.code}, 方向={order_info.direction}, "
f"数量={order_info.amount}, 已成交数量={order_info.filled}" f"数量={order_info.amount}, 已成交数量={order_info.filled}"
) )

View File

@ -14,16 +14,20 @@ from trade_constants import (
ORDER_STATUS_COMPLETED, ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED, ORDER_STATUS_CANCELLED,
ORDER_STATUS_PENDING, ORDER_STATUS_PENDING,
ORDER_STATUS_FAILED,
ORDER_STATUS_PARTIAL, ORDER_STATUS_PARTIAL,
ORDER_TYPE_LIMIT,
ORDER_TYPE_MARKET,
ORDER_DIRECTION_BUY,
ORDER_DIRECTION_SELL
) )
from local_order import LocalOrder
# 获取日志记录器 # 获取日志记录器
logger = get_logger('real_trader') logger = get_logger('real_trader')
class MyXtQuantTraderCallback: class MyXtQuantTraderCallback:
def __init__(self, trader_instance): def __init__(self, trader_instance):
self.trader_instance = trader_instance self.trader_instance: XtTrader = trader_instance
def on_connected(self): def on_connected(self):
logger.info("连接成功") logger.info("连接成功")
def on_disconnected(self): def on_disconnected(self):
@ -52,7 +56,38 @@ class MyXtQuantTraderCallback:
def on_stock_asset(self, asset): def on_stock_asset(self, asset):
logger.info(f"资金变动: {asset.account_id} {asset.cash} {asset.total_asset}") logger.info(f"资金变动: {asset.account_id} {asset.cash} {asset.total_asset}")
def on_stock_order(self, order): def on_stock_order(self, order):
logger.info(f"委托回报: {order.stock_code} {order.order_status} {order.order_sysid}") if order.order_status == xtconstant.ORDER_PART_SUCC:
strategy_name = self.trader_instance.get_strategy_name(order.order_id)
logger.info(f"委托部分成交: code={order.stock_code} id={order.order_id} strategy={strategy_name}")
self.trader_instance.handle_order_update(order.order_id, strategy_name)
elif order.order_status == xtconstant.ORDER_SUCCEEDED:
strategy_name = self.trader_instance.get_strategy_name(order.order_id)
logger.info(f"委托全部成交: code={order.stock_code} id={order.order_id} strategy={strategy_name}")
self.trader_instance.handle_order_update(order.order_id, strategy_name)
elif order.order_status in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_PART_CANCEL]:
strategy_name = self.trader_instance.get_strategy_name(order.order_id)
logger.info(f"委托撤单成功: code={order.stock_code} id={order.order_id} strategy={strategy_name}")
# 撤单后, 如果position manager中订单为pedding, 且为限价单, 则需要补单
position_manager = self.trader_instance.get_position_manager(strategy_name)
if position_manager and strategy_name:
# 先获取已经撤单的order
order_info = position_manager.get_pending_order(order.order_id)
if order_info:
# 更新订单状态
self.trader_instance.handle_order_update(order.order_id, strategy_name)
# 如果order_info为限价单, 则进行市价单补单
if order_info.order_type == ORDER_TYPE_LIMIT:
logger.info(f"检测到限价单被撤销,准备进行市价单补单: ID={order.order_id}")
self.trader_instance.place_market_order_for_remainder(order_info, strategy_name)
else:
logger.warning(f"撤单成功但未找到订单信息: ID={order.order_id}")
else:
logger.warning(f"撤单成功但未找到策略或持仓管理器: ID={order.order_id}, strategy={strategy_name}")
else:
logger.warning(f"委托回报变化: ID={order.order_id} 状态={order.order_status}")
def on_stock_trade(self, trade): def on_stock_trade(self, trade):
logger.info(f"成交变动: {trade.account_id} {trade.stock_code} {trade.order_id}") logger.info(f"成交变动: {trade.account_id} {trade.stock_code} {trade.order_id}")
@ -339,7 +374,7 @@ class XtTrader(BaseTrader):
logger.error(f"获取股票名称失败: {stock_code}, {str(e)}") logger.error(f"获取股票名称失败: {stock_code}, {str(e)}")
return "" return ""
def buy(self, code, price, amount, order_type='limit'): def buy(self, code, price, amount, order_type=ORDER_TYPE_LIMIT):
if not self.is_available(): if not self.is_available():
return {"error": self.connection_error_message or "交易系统连接失败"} return {"error": self.connection_error_message or "交易系统连接失败"}
@ -361,12 +396,12 @@ class XtTrader(BaseTrader):
) )
return {"order_id": order_id} return {"order_id": order_id}
def sell(self, code, price, amount, order_type='limit'): def sell(self, code, price, amount, order_type=ORDER_TYPE_LIMIT):
if not self.is_available(): if not self.is_available():
return {"error": self.connection_error_message or "交易系统连接失败"} return {"error": self.connection_error_message or "交易系统连接失败"}
# 确定价格类型 # 确定价格类型
if order_type == 'limit': if order_type == ORDER_TYPE_LIMIT:
price_type = xtconstant.FIX_PRICE price_type = xtconstant.FIX_PRICE
else: else:
if code.endswith('.SH'): if code.endswith('.SH'):
@ -555,7 +590,7 @@ class XtTrader(BaseTrader):
# 获取订单信息 # 获取订单信息
order_info = position_manager.get_pending_order(order_id) order_info = position_manager.get_pending_order(order_id)
if not order_info: if not order_info:
logger.warning(f"订单信息不存在,可能已完成: {order_id}") logger.warning(f"订单信息不存在,可能已完成或撤单: {order_id}")
return None return None
# 获取之前的状态和成交量 # 获取之前的状态和成交量
@ -644,3 +679,74 @@ class XtTrader(BaseTrader):
except Exception as e: except Exception as e:
logger.error(f"处理订单状态更新异常: order_id={order_id}, error={str(e)}", exc_info=True) logger.error(f"处理订单状态更新异常: order_id={order_id}, error={str(e)}", exc_info=True)
return None return None
def get_strategy_name(self, order_id):
"""获取订单对应的策略名称
Args:
order_id: 订单ID
Returns:
str: 策略名称
"""
for strategy_name, position_manager in self.position_managers.items():
if position_manager.get_pending_order(order_id):
return strategy_name
return None
def place_market_order_for_remainder(self, order_info:LocalOrder, strategy_name):
"""对未完成的限价单进行市价单补单
当限价单被撤销后使用此方法下市价单补单确保交易意图得到执行
Args:
order_id: 被撤销的限价单ID
strategy_name: 策略名称
Returns:
dict: 包含新订单ID和状态信息如果补单失败则返回错误信息
"""
try:
logger.info(f"准备对撤销的限价单进行市价单补单: ID={order_info.order_id}, 策略={strategy_name}")
# 获取position_manager
position_manager = self.get_position_manager(strategy_name)
if not position_manager:
logger.error(f"获取position_manager失败无法补单: {strategy_name}")
return {"success": False, "error": "获取position_manager失败"}
# 计算未成交数量
filled = order_info.filled
target_amount = order_info.amount
left_amount = target_amount - filled
# 如果已全部成交,则无需补单
if left_amount <= 0:
logger.info(f"无需补单,订单已全部成交: ID={order_info.order_id}")
return {"success": True, "message": "无需补单,订单已全部成交"}
# 下市价单补单
code = order_info.code
direction = order_info.direction
logger.info(f"开始补单: 代码={code}, 方向={direction}, 数量={left_amount}, 类型=market")
if direction == ORDER_DIRECTION_BUY:
result = self.buy(code, 0, left_amount, ORDER_TYPE_MARKET)
else:
result = self.sell(code, 0, left_amount, ORDER_TYPE_MARKET)
new_order_id = result.get("order_id")
if not new_order_id:
logger.error(f"市价单补单失败: {result}")
# 添加未完成委托到position_manager
position_manager.add_pending_order(new_order_id, code, 0, left_amount, direction, ORDER_TYPE_MARKET)
logger.info(f"市价单补单成功: 新订单ID={new_order_id}, 原订单ID={order_info.order_id}, 代码={code}, 方向={direction}, 数量={left_amount}")
return {"success": True, "order_id": new_order_id}
except Exception as e:
logger.error(f"市价单补单过程发生异常: {str(e)}", exc_info=True)
return {"success": False, "error": f"市价单补单异常: {str(e)}"}

View File

@ -14,9 +14,8 @@ from local_position import LocalPosition
class SimulationTrader(BaseTrader): class SimulationTrader(BaseTrader):
def __init__(self, logger=None): def __init__(self):
super().__init__(logger) super().__init__(get_logger("simulation_trader"))
self.logger = logger or get_logger("simulation_trader")
# 模拟资金账户信息 # 模拟资金账户信息
self.sim_balance = {"account_id": "simulation", "cash": 1000000.00, "frozen": 0.00, "total": 1000000.00} self.sim_balance = {"account_id": "simulation", "cash": 1000000.00, "frozen": 0.00, "total": 1000000.00}

View File

@ -7,7 +7,6 @@ ORDER_STATUS_PENDING = 'pending'
ORDER_STATUS_PARTIAL = 'partial' ORDER_STATUS_PARTIAL = 'partial'
ORDER_STATUS_COMPLETED = 'completed' ORDER_STATUS_COMPLETED = 'completed'
ORDER_STATUS_CANCELLED = 'cancelled' ORDER_STATUS_CANCELLED = 'cancelled'
ORDER_STATUS_FAILED = 'failed'
# 订单类型 # 订单类型
ORDER_TYPE_LIMIT = 'limit' ORDER_TYPE_LIMIT = 'limit'