real_trader/src/core/scheduler_tasks.py
2025-05-16 14:12:20 +08:00

111 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
定时任务管理和执行。
本模块负责定义、调度和执行所有后台定时任务,
例如每日自动登录/登出、QMT重启以及连接状态检查等。
"""
import schedule
import threading
import time
import subprocess
import os
from config import Config
from logger_config import get_logger
from base_trader import BaseTrader
from .app_state import login, logout, is_real_mode, _trader_instance, logger as app_logger # 使用 app_state中的logger
# 如果需要独立的 logger可以取消下面这行的注释
# logger = get_logger("scheduler")
logger = app_logger # 复用 app_state 的 logger
def _run_scheduler() -> None:
"""定时任务执行线程的主循环。"""
while True:
try:
schedule.run_pending()
time.sleep(1)
except Exception as e:
logger.error(f"调度器执行错误: {str(e)}")
def _check_reconnect() -> None:
"""定期检查交易实例是否需要重连(仅实盘模式)。"""
while True:
try:
# 直接使用 app_state 中的 _trader_instance
if is_real_mode() and _trader_instance is not None and hasattr(_trader_instance, 'check_reconnect'):
_trader_instance.check_reconnect()
time.sleep(60) # 每分钟检查一次
except Exception as e:
logger.error(f"重连检查错误: {str(e)}")
def login_on_trading_day() -> None:
"""仅在交易日执行登录操作。"""
if BaseTrader.is_trading_date():
logger.info("今天是交易日,执行计划的登录操作")
login() # 调用 app_state.login
else:
logger.info("今天不是交易日,跳过计划的登录操作")
def logout_on_trading_day() -> None:
"""仅在交易日执行登出操作。"""
if BaseTrader.is_trading_date():
logger.info("今天是交易日,执行计划的登出操作")
logout() # 调用 app_state.logout
else:
logger.info("今天不是交易日,跳过计划的登出操作")
def restart_qmt() -> None:
"""重启QMT客户端软件。"""
try:
if not os.path.exists(Config.XT_LAUNCHER):
logger.error(f"QMT启动路径不存在: {Config.XT_LAUNCHER}")
return
for proc_name in [Config.XT_PROCESS1, Config.XT_PROCESS2]:
try:
kill_cmd = f'taskkill /F /IM {proc_name}'
result = subprocess.run(kill_cmd, shell=True, capture_output=True, text=True, check=False)
if result.returncode == 0:
logger.info(f"已关闭进程: {proc_name}")
else:
# 进程不存在通常返回码 128 或 1
if result.returncode not in [1, 128]:
logger.warning(f"关闭进程 {proc_name} 时返回码: {result.returncode}, 输出: {result.stdout}, 错误: {result.stderr}")
except Exception as e:
logger.error(f"关闭进程 {proc_name} 时发生异常: {str(e)}")
subprocess.Popen(Config.XT_LAUNCHER)
logger.info(f"已尝试启动QMT软件: {Config.XT_LAUNCHER}")
except Exception as e:
logger.error(f"重启QMT软件时发生错误: {str(e)}")
def restart_qmt_on_trading_day() -> None:
"""仅在交易日执行QMT软件重启操作。"""
if BaseTrader.is_trading_date():
logger.info("今天是交易日执行计划的QMT软件重启操作")
restart_qmt()
else:
logger.info("今天不是交易日跳过计划的QMT软件重启")
def setup_scheduler() -> threading.Thread:
"""
设置所有定时任务并启动调度线程。
Returns:
threading.Thread: 启动的调度器线程实例。
"""
schedule.every().day.at(Config.MARKET_OPEN_TIME).do(login_on_trading_day)
schedule.every().day.at(Config.MARKET_CLOSE_TIME).do(logout_on_trading_day)
schedule.every().day.at(Config.XT_RESTART_TIME).do(restart_qmt_on_trading_day)
scheduler_thread = threading.Thread(target=_run_scheduler, daemon=True)
scheduler_thread.start()
logger.info("定时任务调度器已启动")
if is_real_mode():
reconnect_thread = threading.Thread(target=_check_reconnect, daemon=True)
reconnect_thread.start()
logger.info("重连检查线程已启动")
return scheduler_thread