添加断线重连功能

This commit is contained in:
zhiyong 2025-05-11 01:18:04 +08:00
parent 363cb17ea4
commit a9e074a116
2 changed files with 83 additions and 38 deletions

View File

@ -12,10 +12,16 @@ from logger_config import get_logger
logger = get_logger('real_trader')
class MyXtQuantTraderCallback:
def __init__(self, trader_instance):
self.trader_instance: XtTrader = trader_instance
def on_connected(self):
logger.info("连接成功")
def on_disconnected(self):
logger.warning("连接断开")
if self.trader_instance:
if not self.trader_instance.reconnect():
logger.error("重连失败")
raise Exception("断线重连失败")
def on_account_status(self, status):
logger.info(f"账号状态: {status.account_id} {status.status}")
def on_stock_asset(self, asset):
@ -49,7 +55,7 @@ class XtTrader(BaseTrader):
self._account_type = os.environ.get("XT_ACCOUNT_TYPE", "STOCK")
self._strategy_name = os.environ.get("XT_STRATEGY_NAME", "xt_strategy")
self._remark = os.environ.get("XT_REMARK", "remark")
self._callback = MyXtQuantTraderCallback()
self._callback = MyXtQuantTraderCallback(self)
self.xt_trader = XtQuantTrader(self._PATH, self._SESSION_ID)
self.account = StockAccount(self._ACCOUNT, self._account_type)
self.xt_trader.register_callback(self._callback)
@ -276,4 +282,36 @@ class XtTrader(BaseTrader):
return None
except Exception as e:
logger.error(f"获取行情失败: {code}, {str(e)}")
return None
return None
def reconnect(self):
# 关闭旧连接
if self.started:
self.xt_trader.stop()
self.started = False
self.connected = False
self.subscribed = False
# 尝试范围内的新session_id
session_id_range = range(100000, 100020)
for session_id in random.sample(list(session_id_range), len(session_id_range)):
self._SESSION_ID = session_id
self.xt_trader = XtQuantTrader(self._PATH, self._SESSION_ID)
self._callback = MyXtQuantTraderCallback(self) # 传入self引用
self.xt_trader.register_callback(self._callback)
# 重新连接
self.xt_trader.start()
self.started = True
result = self.xt_trader.connect()
if result == 0:
self.connected = True
result = self.xt_trader.subscribe(self.account)
if result == 0:
self.subscribed = True
logger.info(f"重连成功使用session_id: {self._SESSION_ID}")
return True
logger.error("所有尝试都失败,无法重连")
return False

View File

@ -16,8 +16,9 @@ logger = get_logger("server")
_trader_instance = None # 交易实例(单例)
_real_trader_manager_instance = None # 实盘交易管理器实例(单例)
# 后台任务执行线程
_scheduler_thread = None
def is_real_mode():
return not Config.SIMULATION_MODE
# 获取实盘交易管理器实例的辅助函数
@ -43,9 +44,20 @@ def get_trader():
def login():
try:
get_trader().login()
result = get_trader().get_balance()
if result and result["account_id"] == "simulation":
global _trader_instance
# 如果已经有实例,先销毁
if _trader_instance is not None and is_real_mode():
_trader_instance.logout()
_trader_instance = None
# 创建新的XtTrader实例
_trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader()
logger.info("开始登录")
_trader_instance.login()
result = _trader_instance.get_balance()
if result and result["account_id"] is not None:
logger.info(f"查询余额成功: {result}")
else:
logger.error(f"登录失败: {result}")
@ -55,43 +67,43 @@ def login():
raise Exception(f"登录失败: {e}")
def logout():
get_trader().logout()
logger.info("登出成功")
global _trader_instance
if _trader_instance is not None:
_trader_instance.logout()
logger.info("登出成功")
# 销毁实例
if is_real_mode():
_trader_instance = None
logger.info("XtTrader实例已销毁")
def is_real_mode():
return not Config.SIMULATION_MODE
def setup_scheduler():
# 设置每日任务
schedule.every().day.at(Config.MARKET_OPEN_TIME).do(login)
schedule.every().day.at(Config.MARKET_CLOSE_TIME).do(logout)
# 启动调度线程
scheduler_thread = threading.Thread(target=_run_scheduler, daemon=True)
scheduler_thread.start()
logger.info("定时任务调度器已启动")
return scheduler_thread
def run_daily(time_str, job_func):
"""设置每天在指定时间运行的任务
Args:
time_str: 运行时间格式为"HH:MM"
job_func: 要运行的函数
"""
# 不再区分周一到周五,而是每天执行
# 交易日判断逻辑已移到get_trader函数中
schedule.every().day.at(time_str).do(job_func)
def run_pending_tasks():
def _run_scheduler():
"""定时任务执行线程"""
global _scheduler_thread_running
logger.info("定时任务调度线程已启动")
while _scheduler_thread_running:
while True:
try:
schedule.run_pending()
time.sleep(1)
except Exception as e:
logger.error(f"Error in scheduler: {str(e)}")
logger.info("定时任务调度线程已停止")
logger.error(f"调度器执行错误: {str(e)}")
# 程序启动时初始化线程
_scheduler_thread_running = True
_scheduler_thread = threading.Thread(target=run_pending_tasks, daemon=True)
_scheduler_thread.start()
# 设置并启动调度器
setup_scheduler()
# 程序启动时初始化交易实例
login()
@ -99,11 +111,6 @@ login()
# 添加请求频率限制
app = Flask(__name__)
# 使用配置文件中的时间
run_daily(Config.MARKET_OPEN_TIME, lambda: login())
run_daily(Config.MARKET_CLOSE_TIME, lambda: logout())
@app.route("/yu/healthcheck", methods=["GET"])
def health_check():
return "ok", 200