优化很多小问题

This commit is contained in:
zhiyong 2025-04-30 22:50:00 +08:00
parent fdee1c3da1
commit 95cd287675
3 changed files with 426 additions and 154 deletions

View File

@ -21,8 +21,7 @@ class RealTraderManager:
trader: XtTrader实例如果为None则自动获取 trader: XtTrader实例如果为None则自动获取
""" """
# 使用传入的trader实例或获取单例 # 使用传入的trader实例或获取单例
from trade_server import get_real_trader self.trader = trader if trader is not None else XtTrader()
self.trader = trader if trader is not None else get_real_trader()
# 确保已登录 # 确保已登录
if not self.trader.is_logged_in(): if not self.trader.is_logged_in():
@ -71,8 +70,8 @@ class RealTraderManager:
code: 股票代码 code: 股票代码
direction: 交易方向 'buy''sell' direction: 交易方向 'buy''sell'
amount: 交易数量 amount: 交易数量
price: 交易价格 price: 交易价格市价单时可为0
order_type: 订单类型'limit'表示限价单'market'表示市价单 order_type: 订单类型'limit'表示限价单'market'表示市价单默认为'limit'
Returns: Returns:
dict: 包含订单ID和状态信息 dict: 包含订单ID和状态信息
@ -86,23 +85,26 @@ class RealTraderManager:
logger.error(f"无效的交易方向: {direction}") logger.error(f"无效的交易方向: {direction}")
return {"success": False, "error": "无效的交易方向"} return {"success": False, "error": "无效的交易方向"}
# 检查订单类型
if order_type not in ['limit', 'market']:
logger.error(f"无效的订单类型: {order_type}")
return {"success": False, "error": "无效的订单类型,必须是'limit''market'"}
try: try:
# 检查资金和持仓是否足够 # 对于限价单,检查资金和持仓是否足够
if not self._check_order_feasibility(code, direction, amount, price): if order_type == 'limit' and not self._check_order_feasibility(code, direction, amount, price):
logger.warning(f"资金或持仓不足,忽略订单: {direction} {code} {amount}") logger.warning(f"资金或持仓不足,忽略订单: {direction} {code} {amount} {price}")
return {"success": False, "error": "资金或持仓不足"} return {"success": False, "error": "资金或持仓不足"}
# 更新策略目标持仓 # 更新策略目标持仓
self._update_strategy_target(strategy_name, code, direction, amount) self._update_strategy_target(strategy_name, code, direction, amount)
# 执行实际下单
price_type = xtconstant.FIX_PRICE if order_type == 'limit' else xtconstant.MARKET_BEST
# 下单 # 下单
logger.info(f"准备{direction}订单: 代码={code}, 数量={amount}, 价格={price}, 订单类型={order_type}")
if direction == 'buy': if direction == 'buy':
result = self.trader.buy(code, price, amount) result = self.trader.buy(code, price, amount, order_type)
else: else:
result = self.trader.sell(code, price, amount) result = self.trader.sell(code, price, amount, order_type)
order_id = result.get('order_id') order_id = result.get('order_id')
if not order_id or order_id == 'simulation': if not order_id or order_id == 'simulation':
@ -134,7 +136,7 @@ class RealTraderManager:
direction direction
) )
logger.info(f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}") logger.info(f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}")
# 立即更新一次订单状态 # 立即更新一次订单状态
self._update_order_status(order_id) self._update_order_status(order_id)
@ -150,15 +152,32 @@ class RealTraderManager:
try: try:
logger.info("开始检查未完成订单...") logger.info("开始检查未完成订单...")
# 如果没有未完成订单,直接返回
if not self.pending_orders:
logger.info("没有未完成订单需要检查")
return
# 更新StrategyPositionManager中的未完成委托状态 # 更新StrategyPositionManager中的未完成委托状态
try:
StrategyPositionManager.update_pending_orders(self.trader) StrategyPositionManager.update_pending_orders(self.trader)
except Exception as e:
logger.error(f"更新StrategyPositionManager未完成委托状态失败: {str(e)}")
# 获取最新的委托列表 # 获取最新的委托列表
try:
entrusts = self.trader.get_today_entrust() entrusts = self.trader.get_today_entrust()
if entrusts is None:
logger.error("获取今日委托失败,跳过本次检查")
return
entrust_map = {str(e['order_id']): e for e in entrusts} entrust_map = {str(e['order_id']): e for e in entrusts}
except Exception as e:
logger.error(f"获取今日委托失败: {str(e)},跳过本次检查")
return
# 检查每个未完成订单 # 检查每个未完成订单
for order_id, order_info in list(self.pending_orders.items()): for order_id, order_info in list(self.pending_orders.items()):
try:
# 跳过已完成的订单 # 跳过已完成的订单
if order_info['status'] in ['completed', 'cancelled', 'failed']: if order_info['status'] in ['completed', 'cancelled', 'failed']:
continue continue
@ -183,9 +202,14 @@ class RealTraderManager:
logger.info(f"订单部分成交详情: ID={order_id}, 原始数量={original}, 已成交={traded}, 剩余={remaining}") logger.info(f"订单部分成交详情: ID={order_id}, 原始数量={original}, 已成交={traded}, 剩余={remaining}")
self._handle_timeout_order(order_id, order_info) self._handle_timeout_order(order_id, order_info)
except Exception as e:
logger.error(f"处理订单 {order_id} 时出错: {str(e)}")
# 同步策略持仓和实际持仓 # 同步策略持仓和实际持仓
try:
self._sync_strategy_positions() self._sync_strategy_positions()
except Exception as e:
logger.error(f"同步策略持仓和实际持仓失败: {str(e)}")
logger.info("未完成订单检查完毕") logger.info("未完成订单检查完毕")
@ -305,24 +329,33 @@ class RealTraderManager:
# 递增重试计数 # 递增重试计数
order_info['retry_count'] += 1 order_info['retry_count'] += 1
logger.info(f"准备使用市价单补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}") # 决定是否使用市价单进行补单
use_market_order = Config.RTM_USE_MARKET_ORDER
# 如果重试次数少于最大重试次数,则使用市价单补单 logger.info(f"准备补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}, 使用市价单={use_market_order}")
# 如果重试次数少于最大重试次数,则进行补单
if order_info['retry_count'] <= Config.RTM_MAX_RETRIES: if order_info['retry_count'] <= Config.RTM_MAX_RETRIES:
# 使用市价单 # 决定使用的订单类型
new_order_type = 'market' if use_market_order else 'limit'
# 对于市价单价格参数可设为0对于限价单使用原价格
new_price = 0 if new_order_type == 'market' else order_info['price']
# 下新订单
new_order = self.place_order( new_order = self.place_order(
order_info['strategy_name'], order_info['strategy_name'],
order_info['code'], order_info['code'],
order_info['direction'], order_info['direction'],
remaining_amount, remaining_amount,
0, # 市价单价格参数无效 new_price,
'market' # 使用市价单 new_order_type
) )
if new_order.get('success'): if new_order.get('success'):
logger.info(f"市价补单成功: 原订单ID={order_id}, 新订单ID={new_order['order_id']}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}") logger.info(f"补单成功: 原订单ID={order_id}, 新订单ID={new_order['order_id']}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}, 订单类型={new_order_type}")
else: else:
logger.error(f"市价补单失败: 原订单ID={order_id}, 错误={new_order.get('error')}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}") logger.error(f"补单失败: 原订单ID={order_id}, 错误={new_order.get('error')}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}, 订单类型={new_order_type}")
else: else:
logger.warning(f"订单重试次数过多,不再尝试: ID={order_id}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}, 代码={order_info['code']}, 方向={order_info['direction']}, 未成交数量={remaining_amount}") logger.warning(f"订单重试次数过多,不再尝试: ID={order_id}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}, 代码={order_info['code']}, 方向={order_info['direction']}, 未成交数量={remaining_amount}")
else: else:
@ -422,8 +455,17 @@ class RealTraderManager:
try: try:
# 获取实际持仓 # 获取实际持仓
actual_positions = self.trader.get_positions() actual_positions = self.trader.get_positions()
if actual_positions is None:
logger.error("获取实际持仓失败,跳过同步")
return
position_map = {p['stock_code']: p for p in actual_positions} position_map = {p['stock_code']: p for p in actual_positions}
# 如果没有策略目标持仓,直接返回
if not self.strategy_targets:
logger.info("没有策略目标持仓需要同步")
return
# 遍历每个策略的目标持仓 # 遍历每个策略的目标持仓
for strategy_name, targets in self.strategy_targets.items(): for strategy_name, targets in self.strategy_targets.items():
# 该策略的实际持仓映射 # 该策略的实际持仓映射
@ -431,6 +473,7 @@ class RealTraderManager:
# 遍历该策略的目标持仓 # 遍历该策略的目标持仓
for code, target_amount in targets.items(): for code, target_amount in targets.items():
try:
# 获取股票的实际持仓 # 获取股票的实际持仓
actual_position = position_map.get(code, {}) actual_position = position_map.get(code, {})
actual_amount = actual_position.get('volume', 0) actual_amount = actual_position.get('volume', 0)
@ -439,6 +482,7 @@ class RealTraderManager:
strategy_actual_positions[code] = actual_amount strategy_actual_positions[code] = actual_amount
# 更新策略持仓管理器中的持仓记录 # 更新策略持仓管理器中的持仓记录
try:
StrategyPositionManager.update_strategy_position( StrategyPositionManager.update_strategy_position(
self.trader, self.trader,
strategy_name, strategy_name,
@ -446,12 +490,16 @@ class RealTraderManager:
'sync', # 使用同步模式 'sync', # 使用同步模式
actual_amount actual_amount
) )
except Exception as e:
logger.error(f"更新策略持仓管理器持仓记录失败: {str(e)}")
# 检查是否需要调整持仓 # 检查是否需要调整持仓
if actual_amount != target_amount: if actual_amount != target_amount:
diff = target_amount - actual_amount diff = target_amount - actual_amount
if diff != 0: if diff != 0:
logger.warning(f"持仓不一致: 策略={strategy_name}, 代码={code}, 目标={target_amount}, 实际={actual_amount}") logger.warning(f"持仓不一致: 策略={strategy_name}, 代码={code}, 目标={target_amount}, 实际={actual_amount}")
except Exception as e:
logger.error(f"同步股票 {code} 持仓时出错: {str(e)}")
# 记录日志 # 记录日志
logger.info(f"策略 {strategy_name} 的目标持仓: {targets}") logger.info(f"策略 {strategy_name} 的目标持仓: {targets}")
@ -494,6 +542,7 @@ class RealTraderManager:
# 单例模式实现 # 单例模式实现
_instance = None _instance = None
_instance_lock = threading.RLock() # 添加线程锁
def get_real_trader_manager(): def get_real_trader_manager():
"""获取实盘交易管理器单例实例 """获取实盘交易管理器单例实例
@ -502,9 +551,11 @@ def get_real_trader_manager():
RealTraderManager: 实盘交易管理器实例 RealTraderManager: 实盘交易管理器实例
""" """
global _instance global _instance
# 使用线程锁确保线程安全
with _instance_lock:
if _instance is None: if _instance is None:
# 从trade_server获取实盘交易实例 # 避免循环导入使用参数传递XtTrader实例
from trade_server import get_real_trader # 注意这里依赖trade_server.py中的实现提供trader实例
trader = get_real_trader() # 在trade_server.py中会直接传入trader实例
_instance = RealTraderManager(trader) _instance = RealTraderManager(None)
return _instance return _instance

View File

@ -11,7 +11,6 @@ from simulation_trader import SimulationTrader
import datetime import datetime
from strategy_position_manager import StrategyPositionManager from strategy_position_manager import StrategyPositionManager
from logger_config import get_logger from logger_config import get_logger
from real_trader_manager import get_real_trader_manager
# 获取日志记录器 # 获取日志记录器
logger = get_logger('server') logger = get_logger('server')
@ -19,6 +18,13 @@ logger = get_logger('server')
# 全局交易实例(采用单例模式) # 全局交易实例(采用单例模式)
_sim_trader_instance = None # 模拟交易实例(单例) _sim_trader_instance = None # 模拟交易实例(单例)
_real_trader_instance = None # 实盘交易实例(单例) _real_trader_instance = None # 实盘交易实例(单例)
_real_trader_manager_instance = None # 实盘交易管理器实例(单例)
# 添加线程锁,保护单例实例的创建
_instance_lock = threading.RLock()
# 后台任务执行线程
_scheduler_thread = None
# 获取模拟交易实例的辅助函数 # 获取模拟交易实例的辅助函数
def get_sim_trader(): def get_sim_trader():
@ -28,6 +34,7 @@ def get_sim_trader():
返回模拟交易单例实例 返回模拟交易单例实例
""" """
global _sim_trader_instance global _sim_trader_instance
with _instance_lock:
if _sim_trader_instance is None: if _sim_trader_instance is None:
_sim_trader_instance = SimulationTrader() _sim_trader_instance = SimulationTrader()
return _sim_trader_instance return _sim_trader_instance
@ -40,14 +47,33 @@ def get_real_trader():
返回实盘交易单例实例 返回实盘交易单例实例
""" """
global _real_trader_instance global _real_trader_instance
with _instance_lock:
if _real_trader_instance is None: if _real_trader_instance is None:
_real_trader_instance = XtTrader() _real_trader_instance = XtTrader()
# 检查交易实例是否已登录,如果未登录则进行登录 # 检查交易实例是否已登录,如果未登录则进行登录
if not _real_trader_instance.is_logged_in(): if not _real_trader_instance.is_logged_in():
logger.info("创建新的XtTrader实例并登录") logger.info("创建新的XtTrader实例并登录")
_real_trader_instance.login() login_success = _real_trader_instance.login()
if not login_success:
logger.error("XtTrader登录失败")
return _real_trader_instance return _real_trader_instance
# 获取实盘交易管理器实例的辅助函数
def get_real_trader_manager():
"""获取实盘交易管理器实例 - 保证单例模式
Returns:
返回实盘交易管理器单例实例
"""
global _real_trader_manager_instance
with _instance_lock:
if _real_trader_manager_instance is None:
# 延迟导入避免循环依赖
from real_trader_manager import RealTraderManager
_real_trader_manager_instance = RealTraderManager(get_real_trader())
logger.info("创建新的RealTraderManager实例")
return _real_trader_manager_instance
# 判断当前是否应该使用模拟交易 # 判断当前是否应该使用模拟交易
def should_use_simulation(): def should_use_simulation():
"""判断是否应该使用模拟交易 """判断是否应该使用模拟交易
@ -68,9 +94,46 @@ def should_use_simulation():
try: try:
from chinese_calendar import is_workday, is_holiday from chinese_calendar import is_workday, is_holiday
is_trading_day = is_workday(now) and not is_holiday(now) is_trading_day = is_workday(now) and not is_holiday(now)
logger.debug(f"使用chinese_calendar判断交易日: {now.date()}, 是交易日: {is_trading_day}")
except ImportError: except ImportError:
# 如果无法导入chinese_calendar则简单地用工作日判断 # 如果无法导入chinese_calendar使用简单的工作日判断,并记录警告
is_trading_day = now.weekday() < 5 # 0-4 为周一至周五 is_trading_day = now.weekday() < 5 # 0-4 为周一至周五
logger.warning(f"无法导入chinese_calendar使用简单工作日判断: {now.date()}, 是工作日: {is_trading_day}")
# 额外检查是否为已知的中国法定节假日(如果必要,可以扩展此列表)
holidays_2023 = [
datetime.date(2023, 1, 1), # 元旦
datetime.date(2023, 1, 2), # 元旦调休
datetime.date(2023, 1, 21), # 春节
datetime.date(2023, 1, 22), # 春节
datetime.date(2023, 1, 23), # 春节
datetime.date(2023, 1, 24), # 春节
datetime.date(2023, 1, 25), # 春节
datetime.date(2023, 1, 26), # 春节
datetime.date(2023, 1, 27), # 春节
# ... 可以添加更多已知节假日
]
holidays_2024 = [
datetime.date(2024, 1, 1), # 元旦
datetime.date(2024, 2, 10), # 春节
datetime.date(2024, 2, 11), # 春节
datetime.date(2024, 2, 12), # 春节
datetime.date(2024, 2, 13), # 春节
datetime.date(2024, 2, 14), # 春节
datetime.date(2024, 2, 15), # 春节
datetime.date(2024, 2, 16), # 春节
datetime.date(2024, 2, 17), # 春节
# ... 可以添加更多已知节假日
]
# 合并所有年份的节假日
all_holidays = holidays_2023 + holidays_2024
# 检查当前日期是否在已知节假日列表中
if now.date() in all_holidays:
is_trading_day = False
logger.info(f"当前日期 {now.date()} 在已知节假日列表中,判定为非交易日")
# 如果不是交易日返回True使用模拟交易 # 如果不是交易日返回True使用模拟交易
if not is_trading_day: if not is_trading_day:
@ -147,16 +210,61 @@ def run_daily(time_str, job_func):
def run_pending_tasks(): def run_pending_tasks():
while True: """定时任务执行线程"""
global _scheduler_thread_running
logger.info("定时任务调度线程已启动")
while _scheduler_thread_running:
try: try:
schedule.run_pending() schedule.run_pending()
time.sleep(1) time.sleep(1)
except Exception as e: except Exception as e:
logger.error(f"Error in scheduler: {str(e)}") logger.error(f"Error in scheduler: {str(e)}")
logger.info("定时任务调度线程已停止")
# Run the task scheduler in a new thread # 程序启动时初始化线程
threading.Thread(target=run_pending_tasks).start() _scheduler_thread_running = True
_scheduler_thread = threading.Thread(target=run_pending_tasks, daemon=True)
_scheduler_thread.start()
# 程序退出清理函数
def cleanup():
"""程序退出时执行的清理操作"""
logger.info("开始执行程序退出清理...")
# 停止调度线程
global _scheduler_thread_running
_scheduler_thread_running = False
# 等待调度线程结束最多等待5秒
if _scheduler_thread and _scheduler_thread.is_alive():
_scheduler_thread.join(timeout=5)
# 保存策略数据
try:
StrategyPositionManager.save_strategy_data()
logger.info("策略数据已保存")
except Exception as e:
logger.error(f"保存策略数据失败: {str(e)}")
# 登出交易实例
try:
# 登出模拟交易实例
if _sim_trader_instance is not None:
_sim_trader_instance.logout()
logger.info("模拟交易实例已登出")
# 登出实盘交易实例
if _real_trader_instance is not None:
_real_trader_instance.logout()
logger.info("实盘交易实例已登出")
except Exception as e:
logger.error(f"登出交易实例失败: {str(e)}")
logger.info("程序退出清理完成")
# 注册程序退出处理函数
atexit.register(cleanup)
# 初始化交易环境 # 初始化交易环境
get_trader().login() get_trader().login()
@ -165,8 +273,8 @@ get_trader().login()
app = Flask(__name__) app = Flask(__name__)
# 添加策略数据相关的定期任务 # 添加策略数据相关的定期任务
schedule.every().day.at("00:01").do(StrategyPositionManager.clean_timeout_orders) # 每天清理超时委托 schedule.every().day.at(Config.CLEAN_ORDERS_TIME).do(StrategyPositionManager.clean_timeout_orders) # 每天清理超时委托
schedule.every().day.at("15:30").do(StrategyPositionManager.save_strategy_data) # 每天收盘后保存策略数据 schedule.every().day.at(Config.STRATEGY_SAVE_TIME).do(StrategyPositionManager.save_strategy_data) # 每天收盘后保存策略数据
# 程序启动时加载策略数据 # 程序启动时加载策略数据
StrategyPositionManager.load_strategy_data() StrategyPositionManager.load_strategy_data()
@ -335,34 +443,30 @@ def cancel(entrust_no):
StrategyPositionManager.update_pending_orders(sim_trader) StrategyPositionManager.update_pending_orders(sim_trader)
return jsonify({"success": True, "data": result, "simulation": True}), 200 return jsonify({"success": True, "data": result, "simulation": True}), 200
else: else:
# 尝试使用RealTraderManager撤单 # 实盘交易优先使用RealTraderManager
try: if Config.USE_REAL_TRADER_MANAGER:
rtm = get_real_trader_manager() rtm = get_real_trader_manager()
# 在RealTraderManager的待处理订单中查找
found_in_rtm = False
for order in rtm.get_pending_orders(): for order in rtm.get_pending_orders():
if str(order['order_id']) == str(entrust_no): if str(order['order_id']) == str(entrust_no):
# 找到对应订单使用RealTraderManager处理 found_in_rtm = True
real_trader = get_real_trader() # 使用RealTraderManager中的trader进行撤单
result = real_trader.cancel(entrust_no) result = rtm.trader.cancel(entrust_no)
logger.info(f"实盘交易撤单结果: {result}") logger.info(f"通过RealTraderManager撤单结果: {result}")
# 更新订单状态 # 更新订单状态
rtm.check_pending_orders() rtm.check_pending_orders()
return jsonify({"success": True, "data": result, "simulation": False}), 200 return jsonify({"success": True, "data": result, "simulation": False}), 200
# 如果RealTraderManager中找不到则使用普通实盘 # 如果RealTraderManager中未找到执行下面的普通实盘撤单
real_trader = get_real_trader() if not found_in_rtm:
result = real_trader.cancel(entrust_no) logger.info(f"在RealTraderManager中未找到订单{entrust_no},使用普通实盘撤单")
logger.info(f"实盘交易撤单结果: {result}")
# 更新未完成委托状态 # 普通实盘撤单方式
StrategyPositionManager.update_pending_orders(real_trader)
return jsonify({"success": True, "data": result, "simulation": False}), 200
except Exception as e:
logger.error(f"使用RealTraderManager撤单失败: {str(e)}")
# 回退到普通方式
real_trader = get_real_trader() real_trader = get_real_trader()
result = real_trader.cancel(entrust_no) result = real_trader.cancel(entrust_no)
logger.info(f"实盘交易撤单结果: {result}") logger.info(f"普通实盘撤单结果: {result}")
# 更新未完成委托状态 # 更新未完成委托状态
StrategyPositionManager.update_pending_orders(real_trader) StrategyPositionManager.update_pending_orders(real_trader)
@ -388,9 +492,14 @@ def get_balance():
logger.info(f"模拟交易余额: {balance}") logger.info(f"模拟交易余额: {balance}")
return jsonify({"success": True, "data": balance, "simulation": True}), 200 return jsonify({"success": True, "data": balance, "simulation": True}), 200
else: else:
# 实盘交易 # 实盘交易,添加超时处理
trader = get_real_trader() trader = get_real_trader()
balance = trader.get_balance() balance = execute_with_timeout(trader.get_balance, Config.TRADE_TIMEOUT)
if balance is None:
logger.error("获取实盘余额超时")
return jsonify({"success": False, "error": "获取余额超时,请稍后重试", "simulation": False}), 500
logger.info(f"实盘交易余额: {balance}") logger.info(f"实盘交易余额: {balance}")
return jsonify({"success": True, "data": balance, "simulation": False}), 200 return jsonify({"success": True, "data": balance, "simulation": False}), 200
except Exception as e: except Exception as e:
@ -483,19 +592,35 @@ def clear_strategy(strategy_name):
# 判断当前交易模式 # 判断当前交易模式
should_simulate, _ = should_use_simulation() should_simulate, _ = should_use_simulation()
# 如果是实盘模式 # 如果是实盘模式且启用了RealTraderManager
if not should_simulate: if not should_simulate and Config.USE_REAL_TRADER_MANAGER:
# 先尝试清除RealTraderManager中的策略目标 # 先尝试清除RealTraderManager中的策略目标
rtm = get_real_trader_manager() rtm = get_real_trader_manager()
if strategy_name in rtm.strategy_targets:
with _instance_lock: # 使用锁保护操作
if strategy_name in rtm.strategy_targets: if strategy_name in rtm.strategy_targets:
del rtm.strategy_targets[strategy_name] del rtm.strategy_targets[strategy_name]
logger.info(f"已清除RealTraderManager中的策略目标: {strategy_name}") logger.info(f"已清除RealTraderManager中的策略目标: {strategy_name}")
# 清除RealTraderManager中相关的待处理订单
pending_orders_to_remove = []
for order_id, order_info in rtm.pending_orders.items():
if order_info.get('strategy_name') == strategy_name:
pending_orders_to_remove.append(order_id)
# 删除相关订单
for order_id in pending_orders_to_remove:
with _instance_lock: # 使用锁保护操作
if order_id in rtm.pending_orders:
del rtm.pending_orders[order_id]
logger.info(f"已清除RealTraderManager中的订单: {order_id}")
# 获取相应的交易实例 # 获取相应的交易实例
trader = get_sim_trader() if should_simulate else get_real_trader() trader = get_sim_trader() if should_simulate else get_real_trader()
# 如果是模拟交易实例,则重置模拟交易实例 # 如果是模拟交易实例,则重置模拟交易实例
if should_simulate and isinstance(trader, SimulationTrader): if should_simulate and isinstance(trader, SimulationTrader):
with _instance_lock: # 使用锁保护操作
global _sim_trader_instance global _sim_trader_instance
if _sim_trader_instance is not None: if _sim_trader_instance is not None:
logger.info("重置模拟交易实例") logger.info("重置模拟交易实例")
@ -518,11 +643,25 @@ def clear_strategy(strategy_name):
# 超时处理函数 # 超时处理函数
def execute_with_timeout(func, timeout, *args, **kwargs): def execute_with_timeout(func, timeout, *args, **kwargs):
"""执行函数并设置超时时间如果超时则返回None
Args:
func: 要执行的函数
timeout: 超时时间
args, kwargs: 传递给func的参数
Returns:
func的返回值如果超时则返回None
"""
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(func, *args, **kwargs) future = executor.submit(func, *args, **kwargs)
try: try:
return future.result(timeout=timeout) return future.result(timeout=timeout)
except TimeoutError: except TimeoutError:
logger.warning(f"函数 {func.__name__} 执行超时 (>{timeout}秒)")
return None
except Exception as e:
logger.error(f"函数 {func.__name__} 执行出错: {str(e)}")
return None return None
@ -532,19 +671,38 @@ def get_order_status():
"""获取订单状态""" """获取订单状态"""
logger.info("Received order status request") logger.info("Received order status request")
try:
# 判断当前交易模式 # 判断当前交易模式
should_simulate, _ = should_use_simulation() should_simulate, _ = should_use_simulation()
if not should_simulate and Config.USE_REAL_TRADER_MANAGER: if not should_simulate and Config.USE_REAL_TRADER_MANAGER:
# 实盘 + RealTraderManager模式 # 实盘 + RealTraderManager模式
try:
rtm = get_real_trader_manager() rtm = get_real_trader_manager()
pending_orders = rtm.get_pending_orders() pending_orders = rtm.get_pending_orders()
if pending_orders is None:
logger.error("从RealTraderManager获取订单状态失败")
return jsonify({"success": False, "error": "获取订单状态失败", "simulation": False}), 500
return jsonify({"success": True, "data": pending_orders, "simulation": False}), 200 return jsonify({"success": True, "data": pending_orders, "simulation": False}), 200
else: except Exception as e:
logger.error(f"从RealTraderManager获取订单状态时出错: {str(e)}")
# 发生错误时,回退到使用普通交易实例
logger.info("回退到使用普通交易实例获取订单状态")
# 模拟交易或实盘但未使用RealTraderManager # 模拟交易或实盘但未使用RealTraderManager
trader = get_sim_trader() if should_simulate else get_real_trader() trader = get_sim_trader() if should_simulate else get_real_trader()
entrusts = trader.get_today_entrust() try:
entrusts = execute_with_timeout(trader.get_today_entrust, Config.TRADE_TIMEOUT)
if entrusts is None:
logger.error("获取今日委托超时")
return jsonify({"success": False, "error": "获取今日委托超时", "simulation": should_simulate}), 500
return jsonify({"success": True, "data": entrusts, "simulation": should_simulate}), 200 return jsonify({"success": True, "data": entrusts, "simulation": should_simulate}), 200
except Exception as e:
logger.error(f"获取今日委托时出错: {str(e)}")
return jsonify({"success": False, "error": f"获取今日委托时出错: {str(e)}", "simulation": should_simulate}), 500
except Exception as e:
logger.error(f"处理订单状态请求时出错: {str(e)}")
abort(500, description="Internal server error")
# 添加新的API端点查询策略目标持仓 # 添加新的API端点查询策略目标持仓
@ -553,13 +711,20 @@ def get_strategy_targets():
"""获取策略目标持仓""" """获取策略目标持仓"""
logger.info("Received strategy targets request") logger.info("Received strategy targets request")
try:
# 获取查询参数 # 获取查询参数
strategy_name = request.args.get("strategy_name") strategy_name = request.args.get("strategy_name")
# 检查是否是实盘模式且使用RealTraderManager # 检查是否是实盘模式且使用RealTraderManager
should_simulate, _ = should_use_simulation() should_simulate, _ = should_use_simulation()
if not should_simulate and Config.USE_REAL_TRADER_MANAGER: if should_simulate:
return jsonify({"success": False, "error": "模拟交易模式下不支持目标持仓", "simulation": True}), 400
if not Config.USE_REAL_TRADER_MANAGER:
return jsonify({"success": False, "error": "RealTraderManager未启用无法获取目标持仓", "simulation": False}), 400
try:
rtm = get_real_trader_manager() rtm = get_real_trader_manager()
targets = rtm.get_strategy_targets() targets = rtm.get_strategy_targets()
@ -569,8 +734,12 @@ def get_strategy_targets():
return jsonify({"success": True, "data": {strategy_name: strategy_target}, "simulation": False}), 200 return jsonify({"success": True, "data": {strategy_name: strategy_target}, "simulation": False}), 200
return jsonify({"success": True, "data": targets, "simulation": False}), 200 return jsonify({"success": True, "data": targets, "simulation": False}), 200
else: except Exception as e:
return jsonify({"success": False, "error": "无法获取目标持仓非实盘模式或RealTraderManager未启用"}), 400 logger.error(f"获取策略目标持仓时出错: {str(e)}")
return jsonify({"success": False, "error": f"获取策略目标持仓时出错: {str(e)}", "simulation": False}), 500
except Exception as e:
logger.error(f"处理策略目标持仓请求时出错: {str(e)}")
abort(500, description="Internal server error")
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -165,15 +165,67 @@ class XtTrader:
} }
return price_type_map.get(price_type, f"unknown_{price_type}") return price_type_map.get(price_type, f"unknown_{price_type}")
def buy(self, code, price, amount): def buy(self, code, price, amount, order_type='limit'):
"""买入股票
Args:
code: 股票代码
price: 买入价格市价单时可为0
amount: 买入数量
order_type: 订单类型'limit'=限价单'market'=市价单默认为'limit'
Returns:
dict: 包含订单ID的字典
"""
# 确定价格类型
price_type = xtconstant.FIX_PRICE # 默认为限价单
if order_type == 'market':
# 市价单,根据不同市场选择合适的市价单类型
if code.startswith('1') or code.startswith('5'):
# 基金等可能需要不同的市价单类型
price_type = xtconstant.MARKET_BEST
else:
price_type = xtconstant.MARKET_BEST # 市价最优价
# 如果是市价单价格可以设为0
if price_type != xtconstant.FIX_PRICE:
price = 0
order_id = self.xt_trader.order_stock( order_id = self.xt_trader.order_stock(
self.account, code, xtconstant.STOCK_BUY, amount, xtconstant.FIX_PRICE, price, self._strategy_name, self._remark self.account, code, xtconstant.STOCK_BUY, amount, price_type, price, self._strategy_name, self._remark
) )
return {"order_id": order_id} return {"order_id": order_id}
def sell(self, code, price, amount): def sell(self, code, price, amount, order_type='limit'):
"""卖出股票
Args:
code: 股票代码
price: 卖出价格市价单时可为0
amount: 卖出数量
order_type: 订单类型'limit'=限价单'market'=市价单默认为'limit'
Returns:
dict: 包含订单ID的字典
"""
# 确定价格类型
price_type = xtconstant.FIX_PRICE # 默认为限价单
if order_type == 'market':
# 市价单,根据不同市场选择合适的市价单类型
if code.startswith('1') or code.startswith('5'):
# 基金等可能需要不同的市价单类型
price_type = xtconstant.MARKET_BEST
else:
price_type = xtconstant.MARKET_BEST # 市价最优价
# 如果是市价单价格可以设为0
if price_type != xtconstant.FIX_PRICE:
price = 0
order_id = self.xt_trader.order_stock( order_id = self.xt_trader.order_stock(
self.account, code, xtconstant.STOCK_SELL, amount, xtconstant.FIX_PRICE, price, self._strategy_name, self._remark self.account, code, xtconstant.STOCK_SELL, amount, price_type, price, self._strategy_name, self._remark
) )
return {"order_id": order_id} return {"order_id": order_id}