refactor: manage pending order in strategy position manager only

This commit is contained in:
zhiyong 2025-04-30 23:47:32 +08:00
parent 1b462a3044
commit aec79fb718
3 changed files with 176 additions and 70 deletions

View File

@ -36,5 +36,20 @@ class Config:
RTM_USE_MARKET_ORDER = os.environ.get("RTM_USE_MARKET_ORDER", "True").lower() == "true" # 是否使用市价单进行补单
# 计划任务运行时间
STRATEGY_SAVE_TIME = os.environ.get("STRATEGY_SAVE_TIME", "15:30") # 每天保存策略数据的时间
CLEAN_ORDERS_TIME = os.environ.get("CLEAN_ORDERS_TIME", "00:01") # 每天清理超时委托的时间
STRATEGY_SAVE_TIME = "15:10" # 每天保存策略数据的时间
CLEAN_ORDERS_TIME = "15:05" # 每天清理超时委托的时间
@staticmethod
def is_market_open():
"""判断当前是否在交易时间内
Returns:
bool: 是否在交易时间内
"""
now = datetime.datetime.now().time()
morning_start = datetime.time(9, 30)
morning_end = datetime.time(11, 30)
afternoon_start = datetime.time(13, 0)
afternoon_end = datetime.time(15, 0)
return (morning_start <= now <= morning_end) or (afternoon_start <= now <= afternoon_end)

View File

@ -26,9 +26,8 @@ class RealTraderManager:
if not self.trader.is_logged_in():
self.trader.login()
# 存储待处理的交易请求及其状态
# 格式: {order_id: {strategy_name, code, direction, target_amount, price, status, create_time, last_check_time, retry_count}}
self.pending_orders = {}
# 不再自己维护pending_orders改用StrategyPositionManager管理
# self.pending_orders = {}
# 启动调度器
self._start_scheduler()
@ -110,21 +109,7 @@ class RealTraderManager:
logger.error(f"下单失败: {result}")
return {"success": False, "error": "下单失败"}
# 记录到待处理订单
self.pending_orders[order_id] = {
'strategy_name': strategy_name,
'code': code,
'direction': direction,
'target_amount': amount,
'price': price,
'status': 'pending',
'create_time': time.time(),
'last_check_time': time.time(),
'retry_count': 0,
'order_type': order_type
}
# 添加到策略持仓管理器中的未完成委托
# 使用StrategyPositionManager添加未完成委托
StrategyPositionManager.add_pending_order(
self.trader,
order_id,
@ -132,7 +117,8 @@ class RealTraderManager:
code,
price,
amount,
direction
direction,
order_type
)
logger.info(f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}")
@ -151,8 +137,11 @@ class RealTraderManager:
try:
logger.info("开始检查未完成订单...")
# 获取所有未完成订单
pending_orders = StrategyPositionManager.get_pending_orders(self.trader)
# 如果没有未完成订单,直接返回
if not self.pending_orders:
if not pending_orders:
logger.info("没有未完成订单需要检查")
return
@ -175,7 +164,7 @@ class RealTraderManager:
return
# 检查每个未完成订单
for order_id, order_info in list(self.pending_orders.items()):
for order_id, order_info in list(pending_orders.items()):
try:
# 跳过已完成的订单
if order_info['status'] in ['completed', 'cancelled', 'failed']:
@ -184,9 +173,14 @@ class RealTraderManager:
# 更新订单状态
self._update_order_status(order_id, entrust_map)
# 获取最新的订单信息
order_info = StrategyPositionManager.get_pending_order(self.trader, order_id)
if not order_info:
continue
# 处理超时未成交或部分成交的订单
current_time = time.time()
order_age = current_time - order_info['create_time']
order_age = current_time - order_info['created_time']
# 如果订单超过配置的超时时间且状态仍为pending或partial
if order_age > Config.RTM_ORDER_TIMEOUT and order_info['status'] in ['pending', 'partial']:
@ -222,7 +216,9 @@ class RealTraderManager:
order_id: 订单ID
entrust_map: 可选的委托字典如果为None则重新获取
"""
if order_id not in self.pending_orders:
# 检查订单是否存在
order_info = StrategyPositionManager.get_pending_order(self.trader, order_id)
if not order_info:
return
try:
@ -236,62 +232,60 @@ class RealTraderManager:
if entrust:
# 获取订单之前的状态,用于判断是否发生变化
previous_status = self.pending_orders[order_id].get('status')
previous_volume = self.pending_orders[order_id].get('traded_volume', 0)
# 更新最后检查时间
self.pending_orders[order_id]['last_check_time'] = time.time()
previous_status = order_info.get('status')
previous_volume = order_info.get('traded_volume', 0)
# 根据委托状态更新订单状态
if entrust['order_status'] == xtconstant.ORDER_SUCCEEDED:
# 全部成交
self.pending_orders[order_id]['status'] = 'completed'
# 记录状态变化
if previous_status != 'completed':
logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=completed, 成交量={entrust.get('traded_volume', 0)}")
StrategyPositionManager.update_order_status(self.trader, order_id, 'completed')
# 日志记录在update_order_status中处理
elif entrust['order_status'] == xtconstant.ORDER_PART_SUCC:
# 部分成交
self.pending_orders[order_id]['status'] = 'partial'
current_volume = entrust.get('traded_volume', 0)
self.pending_orders[order_id]['traded_volume'] = current_volume
StrategyPositionManager.update_order_status(
self.trader,
order_id,
'partial',
traded_volume=current_volume
)
# 如果成交量有变化,记录日志
if current_volume != previous_volume:
target_amount = self.pending_orders[order_id]['target_amount']
target_amount = order_info['target_amount']
logger.info(f"订单部分成交更新: ID={order_id}, 代码={entrust['stock_code']}, 目标数量={target_amount}, 已成交数量={current_volume}, 剩余数量={target_amount - current_volume}")
elif entrust['order_status'] in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_JUNK]:
# 已撤单或废单
self.pending_orders[order_id]['status'] = 'cancelled'
# 记录状态变化
if previous_status != 'cancelled':
logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=cancelled, 原因={entrust.get('err_msg', '未知原因')}")
StrategyPositionManager.update_order_status(
self.trader,
order_id,
'cancelled',
err_msg=entrust.get('err_msg', '未知原因')
)
elif entrust['order_status'] == xtconstant.ORDER_UNREPORTED:
# 未报
self.pending_orders[order_id]['status'] = 'pending'
if previous_status != 'pending':
logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=pending(未报)")
StrategyPositionManager.update_order_status(self.trader, order_id, 'pending')
elif entrust['order_status'] == xtconstant.ORDER_WAIT_REPORTING:
# 待报
self.pending_orders[order_id]['status'] = 'pending'
if previous_status != 'pending':
logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=pending(待报)")
StrategyPositionManager.update_order_status(self.trader, order_id, 'pending')
elif entrust['order_status'] == xtconstant.ORDER_REPORTED:
# 已报
self.pending_orders[order_id]['status'] = 'pending'
if previous_status != 'pending':
logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=pending(已报)")
StrategyPositionManager.update_order_status(self.trader, order_id, 'pending')
else:
# 委托列表中找不到该订单,可能已经太久
current_time = time.time()
if current_time - self.pending_orders[order_id]['create_time'] > 24 * 60 * 60:
previous_status = self.pending_orders[order_id].get('status')
self.pending_orders[order_id]['status'] = 'failed'
logger.warning(f"订单状态未知且过期: ID={order_id}, 旧状态={previous_status}, 新状态=failed, 创建时长={(current_time - self.pending_orders[order_id]['create_time'])/3600:.1f}小时")
if current_time - order_info['created_time'] > 24 * 60 * 60:
previous_status = order_info.get('status')
StrategyPositionManager.update_order_status(self.trader, order_id, 'failed')
logger.warning(f"订单状态未知且过期: ID={order_id}, 旧状态={previous_status}, 新状态=failed, 创建时长={(current_time - order_info['created_time'])/3600:.1f}小时")
except Exception as e:
logger.error(f"更新订单状态时发生异常: order_id={order_id}, error={str(e)}")
@ -305,7 +299,7 @@ class RealTraderManager:
"""
try:
# 首先尝试撤单
logger.info(f"尝试撤销超时订单: ID={order_id}, 代码={order_info['code']}, 超时时间={(time.time() - order_info['create_time']):.0f}")
logger.info(f"尝试撤销超时订单: ID={order_id}, 代码={order_info['code']}, 超时时间={(time.time() - order_info['created_time']):.0f}")
cancel_result = self.trader.cancel(order_id)
# 记录撤单结果
@ -326,15 +320,15 @@ class RealTraderManager:
# 如果有未成交的部分,使用市价单补充交易
if remaining_amount > 0:
# 递增重试计数
order_info['retry_count'] += 1
new_retry_count = StrategyPositionManager.increment_retry_count(self.trader, order_id)
# 决定是否使用市价单进行补单
use_market_order = Config.RTM_USE_MARKET_ORDER
logger.info(f"准备补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}, 使用市价单={use_market_order}")
logger.info(f"准备补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={new_retry_count}/{Config.RTM_MAX_RETRIES}, 使用市价单={use_market_order}")
# 如果重试次数少于最大重试次数,则进行补单
if order_info['retry_count'] <= Config.RTM_MAX_RETRIES:
if new_retry_count <= Config.RTM_MAX_RETRIES:
# 决定使用的订单类型
new_order_type = 'market' if use_market_order else 'limit'
@ -356,13 +350,13 @@ class RealTraderManager:
else:
logger.error(f"补单失败: 原订单ID={order_id}, 错误={new_order.get('error')}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}, 订单类型={new_order_type}")
else:
logger.warning(f"订单重试次数过多,不再尝试: ID={order_id}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}, 代码={order_info['code']}, 方向={order_info['direction']}, 未成交数量={remaining_amount}")
logger.warning(f"订单重试次数过多,不再尝试: ID={order_id}, 重试次数={new_retry_count}/{Config.RTM_MAX_RETRIES}, 代码={order_info['code']}, 方向={order_info['direction']}, 未成交数量={remaining_amount}")
else:
logger.info(f"订单已全部成交,无需补单: ID={order_id}, 代码={order_info['code']}, 成交数量={traded_amount}")
# 更新原订单状态
previous_status = order_info['status']
order_info['status'] = 'cancelled'
StrategyPositionManager.update_order_status(self.trader, order_id, 'cancelled')
logger.info(f"更新原订单状态: ID={order_id}, 旧状态={previous_status}, 新状态=cancelled")
except Exception as e:
@ -509,16 +503,8 @@ class RealTraderManager:
def clean_expired_orders(self):
"""清理过期的未完成订单"""
try:
current_time = time.time()
for order_id, order_info in list(self.pending_orders.items()):
# 如果订单创建时间超过24小时
if current_time - order_info['create_time'] > 24 * 60 * 60:
if order_info['status'] not in ['completed', 'cancelled', 'failed']:
logger.warning(f"清理过期订单: ID={order_id}, 状态={order_info['status']}")
order_info['status'] = 'expired'
except Exception as e:
logger.error(f"清理过期订单时发生异常: {str(e)}")
# 直接调用StrategyPositionManager的方法
StrategyPositionManager.clean_timeout_orders()
def get_pending_orders(self):
"""获取所有未完成订单
@ -526,10 +512,12 @@ class RealTraderManager:
Returns:
list: 未完成订单列表
"""
# 从StrategyPositionManager获取未完成订单
pending_orders = StrategyPositionManager.get_pending_orders(self.trader)
return [{
'order_id': order_id,
**order_info
} for order_id, order_info in self.pending_orders.items()]
} for order_id, order_info in pending_orders.items()]
def get_strategy_targets(self):
"""获取策略目标持仓

View File

@ -132,7 +132,7 @@ class StrategyPositionManager:
logger.error(f"更新未完成委托状态失败: {str(e)}")
@staticmethod
def add_pending_order(trader, order_id, strategy_name, code, price, amount, direction):
def add_pending_order(trader, order_id, strategy_name, code, price, amount, direction, order_type='limit'):
"""添加未完成委托
Args:
@ -143,6 +143,7 @@ class StrategyPositionManager:
price: 委托价格
amount: 委托数量
direction: 交易方向'buy''sell'
order_type: 订单类型'limit''market'默认为'limit'
"""
if not order_id or order_id == 'simulation':
return
@ -157,7 +158,12 @@ class StrategyPositionManager:
'price': price,
'amount': amount,
'direction': direction,
'created_time': time.time()
'created_time': time.time(),
'target_amount': amount,
'status': 'pending',
'last_check_time': time.time(),
'retry_count': 0,
'order_type': order_type
}
# 同时记录到交易历史
@ -177,6 +183,102 @@ class StrategyPositionManager:
logger.info(f"添加未完成委托: {order_id}, 交易类型: {trader_type}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}")
@staticmethod
def get_pending_orders(trader):
"""获取指定交易类型的所有未完成委托
Args:
trader: 交易实例
Returns:
dict: 未完成委托字典以order_id为键
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
return pending_orders[trader_type]
@staticmethod
def get_pending_order(trader, order_id):
"""获取指定订单信息
Args:
trader: 交易实例
order_id: 订单ID
Returns:
dict: 订单信息字典如果不存在则返回None
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
return pending_orders[trader_type].get(order_id)
@staticmethod
def update_order_status(trader, order_id, new_status, **additional_data):
"""更新订单状态
Args:
trader: 交易实例
order_id: 订单ID
new_status: 新状态
additional_data: 附加数据如成交量重试次数等
Returns:
bool: 是否成功更新
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
if order_id in pending_orders[trader_type]:
# 记录之前的状态用于日志
previous_status = pending_orders[trader_type][order_id].get('status')
# 更新状态和最后检查时间
pending_orders[trader_type][order_id]['status'] = new_status
pending_orders[trader_type][order_id]['last_check_time'] = time.time()
# 更新附加数据
for key, value in additional_data.items():
pending_orders[trader_type][order_id][key] = value
# 记录状态变化日志
if previous_status != new_status:
code = pending_orders[trader_type][order_id].get('code')
logger.info(f"订单状态变化: ID={order_id}, 代码={code}, 旧状态={previous_status}, 新状态={new_status}")
return True
return False
@staticmethod
def increment_retry_count(trader, order_id):
"""增加订单重试次数
Args:
trader: 交易实例
order_id: 订单ID
Returns:
int: 新的重试次数如果订单不存在则返回-1
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
if order_id in pending_orders[trader_type]:
current = pending_orders[trader_type][order_id].get('retry_count', 0)
pending_orders[trader_type][order_id]['retry_count'] = current + 1
return current + 1
return -1
@staticmethod
def remove_pending_order(trader, order_id):
"""移除未完成委托
Args:
trader: 交易实例
order_id: 订单ID
Returns:
bool: 是否成功移除
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
if order_id in pending_orders[trader_type]:
del pending_orders[trader_type][order_id]
return True
return False
@staticmethod
def clean_timeout_orders():
"""清理超时委托"""
@ -187,6 +289,7 @@ class StrategyPositionManager:
# 超过24小时的委托视为超时
if current_time - order_info['created_time'] > 24 * 60 * 60:
del pending_orders[trader_type][order_id]
logger.warning(f"清理超时委托: ID={order_id}, 状态={order_info.get('status')}")
@staticmethod
def load_strategy_data():