From a9e074a11686baddd90388b397b80259778f6767 Mon Sep 17 00:00:00 2001 From: zhiyong Date: Sun, 11 May 2025 01:18:04 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=96=AD=E7=BA=BF=E9=87=8D?= =?UTF-8?q?=E8=BF=9E=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/real/xt_trader.py | 42 +++++++++++++++++++++-- src/trade_server.py | 79 +++++++++++++++++++++++-------------------- 2 files changed, 83 insertions(+), 38 deletions(-) diff --git a/src/real/xt_trader.py b/src/real/xt_trader.py index 7d60c0e..e416d34 100644 --- a/src/real/xt_trader.py +++ b/src/real/xt_trader.py @@ -12,10 +12,16 @@ from logger_config import get_logger logger = get_logger('real_trader') class MyXtQuantTraderCallback: + def __init__(self, trader_instance): + self.trader_instance: XtTrader = trader_instance def on_connected(self): logger.info("连接成功") def on_disconnected(self): logger.warning("连接断开") + if self.trader_instance: + if not self.trader_instance.reconnect(): + logger.error("重连失败") + raise Exception("断线重连失败") def on_account_status(self, status): logger.info(f"账号状态: {status.account_id} {status.status}") def on_stock_asset(self, asset): @@ -49,7 +55,7 @@ class XtTrader(BaseTrader): self._account_type = os.environ.get("XT_ACCOUNT_TYPE", "STOCK") self._strategy_name = os.environ.get("XT_STRATEGY_NAME", "xt_strategy") self._remark = os.environ.get("XT_REMARK", "remark") - self._callback = MyXtQuantTraderCallback() + self._callback = MyXtQuantTraderCallback(self) self.xt_trader = XtQuantTrader(self._PATH, self._SESSION_ID) self.account = StockAccount(self._ACCOUNT, self._account_type) self.xt_trader.register_callback(self._callback) @@ -276,4 +282,36 @@ class XtTrader(BaseTrader): return None except Exception as e: logger.error(f"获取行情失败: {code}, {str(e)}") - return None \ No newline at end of file + return None + + def reconnect(self): + # 关闭旧连接 + if self.started: + self.xt_trader.stop() + self.started = False + self.connected = False + self.subscribed = False + + # 尝试范围内的新session_id + session_id_range = range(100000, 100020) + for session_id in random.sample(list(session_id_range), len(session_id_range)): + self._SESSION_ID = session_id + self.xt_trader = XtQuantTrader(self._PATH, self._SESSION_ID) + self._callback = MyXtQuantTraderCallback(self) # 传入self引用 + self.xt_trader.register_callback(self._callback) + + # 重新连接 + self.xt_trader.start() + self.started = True + + result = self.xt_trader.connect() + if result == 0: + self.connected = True + result = self.xt_trader.subscribe(self.account) + if result == 0: + self.subscribed = True + logger.info(f"重连成功,使用session_id: {self._SESSION_ID}") + return True + + logger.error("所有尝试都失败,无法重连") + return False \ No newline at end of file diff --git a/src/trade_server.py b/src/trade_server.py index 2e54e91..ca6655a 100644 --- a/src/trade_server.py +++ b/src/trade_server.py @@ -16,8 +16,9 @@ logger = get_logger("server") _trader_instance = None # 交易实例(单例) _real_trader_manager_instance = None # 实盘交易管理器实例(单例) -# 后台任务执行线程 -_scheduler_thread = None + +def is_real_mode(): + return not Config.SIMULATION_MODE # 获取实盘交易管理器实例的辅助函数 @@ -43,9 +44,20 @@ def get_trader(): def login(): try: - get_trader().login() - result = get_trader().get_balance() - if result and result["account_id"] == "simulation": + global _trader_instance + + # 如果已经有实例,先销毁 + if _trader_instance is not None and is_real_mode(): + _trader_instance.logout() + _trader_instance = None + + # 创建新的XtTrader实例 + _trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader() + + logger.info("开始登录") + _trader_instance.login() + result = _trader_instance.get_balance() + if result and result["account_id"] is not None: logger.info(f"查询余额成功: {result}") else: logger.error(f"登录失败: {result}") @@ -55,43 +67,43 @@ def login(): raise Exception(f"登录失败: {e}") def logout(): - get_trader().logout() - logger.info("登出成功") + global _trader_instance + + if _trader_instance is not None: + _trader_instance.logout() + logger.info("登出成功") + + # 销毁实例 + if is_real_mode(): + _trader_instance = None + logger.info("XtTrader实例已销毁") -def is_real_mode(): - return not Config.SIMULATION_MODE +def setup_scheduler(): + # 设置每日任务 + schedule.every().day.at(Config.MARKET_OPEN_TIME).do(login) + schedule.every().day.at(Config.MARKET_CLOSE_TIME).do(logout) + + # 启动调度线程 + scheduler_thread = threading.Thread(target=_run_scheduler, daemon=True) + scheduler_thread.start() + logger.info("定时任务调度器已启动") + + return scheduler_thread -def run_daily(time_str, job_func): - """设置每天在指定时间运行的任务 - - Args: - time_str: 运行时间,格式为"HH:MM" - job_func: 要运行的函数 - """ - # 不再区分周一到周五,而是每天执行 - # 交易日判断逻辑已移到get_trader函数中 - schedule.every().day.at(time_str).do(job_func) - - -def run_pending_tasks(): +def _run_scheduler(): """定时任务执行线程""" - global _scheduler_thread_running - logger.info("定时任务调度线程已启动") - while _scheduler_thread_running: + while True: try: schedule.run_pending() time.sleep(1) except Exception as e: - logger.error(f"Error in scheduler: {str(e)}") - logger.info("定时任务调度线程已停止") + logger.error(f"调度器执行错误: {str(e)}") -# 程序启动时初始化线程 -_scheduler_thread_running = True -_scheduler_thread = threading.Thread(target=run_pending_tasks, daemon=True) -_scheduler_thread.start() +# 设置并启动调度器 +setup_scheduler() # 程序启动时初始化交易实例 login() @@ -99,11 +111,6 @@ login() # 添加请求频率限制 app = Flask(__name__) -# 使用配置文件中的时间 -run_daily(Config.MARKET_OPEN_TIME, lambda: login()) -run_daily(Config.MARKET_CLOSE_TIME, lambda: logout()) - - @app.route("/yu/healthcheck", methods=["GET"]) def health_check(): return "ok", 200