From 1c1b19383cc1b095f88936012d35c96f90a3c1d3 Mon Sep 17 00:00:00 2001 From: zhiyong Date: Sun, 11 May 2025 02:05:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=87=AA=E5=8A=A8=E9=87=8D=E8=BF=9E,?= =?UTF-8?q?=20=E5=B9=B6=E5=9C=A8=E9=87=8D=E8=BF=9E=E5=A4=B1=E8=B4=A5?= =?UTF-8?q?=E6=97=B6=E5=8F=91=E9=80=81=E9=82=AE=E4=BB=B6=E8=AD=A6=E6=8A=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config.py | 39 ++++++---- src/real/xt_trader.py | 141 ++++++++++++++++++++++++++++++++++-- src/trade_server.py | 159 ++++++++++++++++++++++++++++++++--------- src/utils/__init__.py | 4 ++ src/utils/mail_util.py | 68 ++++++++++++++++++ 5 files changed, 359 insertions(+), 52 deletions(-) create mode 100644 src/utils/__init__.py create mode 100644 src/utils/mail_util.py diff --git a/src/config.py b/src/config.py index e874180..0570656 100644 --- a/src/config.py +++ b/src/config.py @@ -3,17 +3,17 @@ import datetime class Config: # Server settings - PORT = int(os.environ.get("PORT", 9527)) - HOST = os.environ.get("HOST", "0.0.0.0") - DEBUG = os.environ.get("DEBUG", "False").lower() == "true" + PORT = 9527 + HOST = "0.0.0.0" + DEBUG = False # Trading settings - TRADE_TIMEOUT = int(os.environ.get("TRADE_TIMEOUT", 5)) # 交易超时时间(秒) + TRADE_TIMEOUT = 5 # 交易超时时间(秒) SIMULATION_MODE = True # Trading hours - MARKET_OPEN_TIME = os.environ.get("MARKET_OPEN_TIME", "09:20") - MARKET_CLOSE_TIME = os.environ.get("MARKET_CLOSE_TIME", "15:10") + MARKET_OPEN_TIME = "09:20" + MARKET_CLOSE_TIME = "15:10" # Logging LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "logs") @@ -30,13 +30,28 @@ class Config: RATE_LIMIT_PERIOD = 60 # seconds # XtQuant 相关配置 - XT_ACCOUNT = os.environ.get("XT_ACCOUNT", "80391818") - XT_PATH = os.environ.get("XT_PATH", r'C:\\江海证券QMT实盘_交易\\userdata_mini') - + XT_ACCOUNT = "80391818" + XT_PATH = r'C:\\江海证券QMT实盘_交易\\userdata_mini' + + # 重连相关配置 + XT_RECONNECT_INTERVAL = 3600 # 重连尝试间隔(秒) + XT_MAX_SESSION_ID = 999999 # 最大会话ID + XT_MIN_SESSION_ID = 100000 # 最小会话ID + XT_SESSION_ID_RANGE = 20 # 一次尝试的会话ID数量 + + # 邮件通知配置 + MAIL_ENABLED = True + MAIL_SERVER = "mail.yushaoyou.com" + MAIL_PORT = 465 + MAIL_USERNAME = "jq@yushaoyou.com" + MAIL_PASSWORD = "zhiyong214" + MAIL_FROM = "自动交易服务器" + MAIL_TO = ["jq@yushaoyou.com"] # 可以是多个邮箱 + # RealTraderManager配置 - RTM_ORDER_TIMEOUT = int(os.environ.get("RTM_ORDER_TIMEOUT", 60)) # 订单超时时间(秒) - RTM_MAX_RETRIES = int(os.environ.get("RTM_MAX_RETRIES", 3)) # 最大重试次数 - RTM_USE_MARKET_ORDER = os.environ.get("RTM_USE_MARKET_ORDER", "True").lower() == "true" # 是否使用市价单进行补单 + RTM_ORDER_TIMEOUT = 60 # 订单超时时间(秒) + RTM_MAX_RETRIES = 3 # 最大重试次数 + RTM_USE_MARKET_ORDER = True # 是否使用市价单进行补单 # 计划任务运行时间 STRATEGY_SAVE_TIME = "15:10" # 每天保存策略数据的时间 diff --git a/src/real/xt_trader.py b/src/real/xt_trader.py index e416d34..6e7d1b3 100644 --- a/src/real/xt_trader.py +++ b/src/real/xt_trader.py @@ -1,5 +1,7 @@ import os import random +import time +from datetime import datetime from config import Config from base_trader import BaseTrader from xtquant.xttrader import XtQuantTrader @@ -7,6 +9,7 @@ from xtquant.xttype import StockAccount from xtquant import xtconstant from xtquant.xtdata import get_instrument_detail from logger_config import get_logger +from utils.mail_util import MailUtil # 获取日志记录器 logger = get_logger('real_trader') @@ -17,11 +20,24 @@ class MyXtQuantTraderCallback: def on_connected(self): logger.info("连接成功") def on_disconnected(self): + """连接断开回调 + + 当交易连接断开时调用,会自动尝试重连 + 如果重连失败,将设置连接状态为失败,并通过邮件通知 + """ logger.warning("连接断开") if self.trader_instance: + # 设置连接状态 + self.trader_instance.connected = False + self.trader_instance.subscribed = False + + # 尝试重连 if not self.trader_instance.reconnect(): logger.error("重连失败") - raise Exception("断线重连失败") + # 通知重连失败 + self.trader_instance.connection_failed = True + self.trader_instance.last_reconnect_time = time.time() + self.trader_instance.notify_connection_failure() def on_account_status(self, status): logger.info(f"账号状态: {status.account_id} {status.status}") def on_stock_asset(self, asset): @@ -44,17 +60,26 @@ class MyXtQuantTraderCallback: logger.info(f"约券异步反馈: {response.seq}") class XtTrader(BaseTrader): - def __init__(self): + def __init__(self, connect_failed_callback=None): super().__init__(logger) self.started = False self.connected = False self.subscribed = False self._ACCOUNT = Config.XT_ACCOUNT self._PATH = Config.XT_PATH - self._SESSION_ID = random.randint(100000, 99999999) + self._SESSION_ID = random.randint(Config.XT_MIN_SESSION_ID, Config.XT_MAX_SESSION_ID) self._account_type = os.environ.get("XT_ACCOUNT_TYPE", "STOCK") self._strategy_name = os.environ.get("XT_STRATEGY_NAME", "xt_strategy") self._remark = os.environ.get("XT_REMARK", "remark") + + # 重连相关 + self.connection_failed = False + self.last_reconnect_time = None + self.reconnect_interval = Config.XT_RECONNECT_INTERVAL + self.connect_failed_callback = connect_failed_callback + self.connection_error_message = None + + # 初始化trader self._callback = MyXtQuantTraderCallback(self) self.xt_trader = XtQuantTrader(self._PATH, self._SESSION_ID) self.account = StockAccount(self._ACCOUNT, self._account_type) @@ -67,6 +92,14 @@ class XtTrader(BaseTrader): bool: True表示已登录,False表示未登录 """ return self.started and self.connected and self.subscribed + + def is_available(self): + """检查交易接口是否可用 + + Returns: + bool: True表示可用,False表示不可用 + """ + return self.is_logged_in() and not self.connection_failed def login(self): if not self.started: @@ -88,6 +121,9 @@ class XtTrader(BaseTrader): self.subscribed = False def get_balance(self): + if not self.is_available(): + return None + asset = self.xt_trader.query_stock_asset(self.account) if asset: return { @@ -100,6 +136,9 @@ class XtTrader(BaseTrader): return None def get_positions(self): + if not self.is_available(): + return [] + positions = self.xt_trader.query_stock_positions(self.account) if positions: return [ @@ -119,6 +158,9 @@ class XtTrader(BaseTrader): return [] def get_position(self, stock_code): + if not self.is_available(): + return None + position = self.xt_trader.query_stock_position(self.account, stock_code) if position: return { @@ -136,6 +178,9 @@ class XtTrader(BaseTrader): return None def get_today_trades(self): + if not self.is_available(): + return [] + trades = self.xt_trader.query_stock_trades(self.account) if trades: return [ @@ -155,6 +200,9 @@ class XtTrader(BaseTrader): return [] def get_today_orders(self): + if not self.is_available(): + return [] + orders = self.xt_trader.query_stock_orders(self.account) if orders: return [ @@ -176,6 +224,9 @@ class XtTrader(BaseTrader): return [] def get_order(self, order_id): + if not self.is_available(): + return None + order = self.xt_trader.query_stock_order(self.account, int(order_id)) if order: return { @@ -226,6 +277,9 @@ class XtTrader(BaseTrader): return "" def buy(self, code, price, amount, order_type='limit'): + if not self.is_available(): + return {"error": self.connection_error_message or "交易系统连接失败"} + # 确定价格类型 price_type = xtconstant.MARKET_BEST if order_type == 'market' else xtconstant.FIX_PRICE @@ -239,6 +293,9 @@ class XtTrader(BaseTrader): return {"order_id": order_id} def sell(self, code, price, amount, order_type='limit'): + if not self.is_available(): + return {"error": self.connection_error_message or "交易系统连接失败"} + # 确定价格类型 price_type = xtconstant.MARKET_BEST if order_type == 'market' else xtconstant.FIX_PRICE @@ -252,6 +309,9 @@ class XtTrader(BaseTrader): return {"order_id": order_id} def cancel(self, order_id): + if not self.is_available(): + return {"success": False, "message": self.connection_error_message or "交易系统连接失败"} + # 撤单接口需要订单编号 result = self.xt_trader.cancel_order_stock(self.account, int(order_id)) return {"success": result == 0, "message": f"撤单结果: {result}"} @@ -265,6 +325,9 @@ class XtTrader(BaseTrader): Returns: dict: 行情数据,如果获取失败则返回None """ + if not self.is_available(): + return None + try: quote = self.xt_trader.query_quote(code) if quote: @@ -283,8 +346,37 @@ class XtTrader(BaseTrader): except Exception as e: logger.error(f"获取行情失败: {code}, {str(e)}") return None + + def notify_connection_failure(self): + """通知交易连接失败""" + self.connection_error_message = f"交易系统连接失败,将在{self.reconnect_interval//60}分钟后自动尝试重连" + + # 调用回调通知上层应用 + if self.connect_failed_callback: + self.connect_failed_callback() + + # 发送邮件通知 + trader_info = f"账户:{self._ACCOUNT},会话ID:{self._SESSION_ID}" + time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + subject = f"[交易系统] 连接失败通知 - {time_str}" + body = f""" +交易系统连接失败,请尽快检查! + +时间:{time_str} +{trader_info} +错误信息:交易连接断开且重连失败 + +系统将在{self.reconnect_interval//60}分钟后自动尝试重连。如需立即恢复,请手动重启交易系统。 + """ + + MailUtil.send_mail(subject, body) def reconnect(self): + """尝试重新连接交易系统 + + Returns: + bool: 重连是否成功 + """ # 关闭旧连接 if self.started: self.xt_trader.stop() @@ -293,7 +385,9 @@ class XtTrader(BaseTrader): self.subscribed = False # 尝试范围内的新session_id - session_id_range = range(100000, 100020) + start_id = Config.XT_MIN_SESSION_ID + end_id = start_id + Config.XT_SESSION_ID_RANGE + session_id_range = range(start_id, end_id) for session_id in random.sample(list(session_id_range), len(session_id_range)): self._SESSION_ID = session_id self.xt_trader = XtQuantTrader(self._PATH, self._SESSION_ID) @@ -311,7 +405,44 @@ class XtTrader(BaseTrader): if result == 0: self.subscribed = True logger.info(f"重连成功,使用session_id: {self._SESSION_ID}") + + # 重置连接失败状态 + if self.connection_failed: + self.connection_failed = False + self.connection_error_message = None + + # 通知连接已恢复 + time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + subject = f"[交易系统] 连接恢复通知 - {time_str}" + body = f""" +交易系统连接已恢复! + +时间:{time_str} +账户:{self._ACCOUNT},会话ID:{self._SESSION_ID} + +系统已自动恢复连接,交易功能现已正常。 + """ + MailUtil.send_mail(subject, body) + return True logger.error("所有尝试都失败,无法重连") - return False \ No newline at end of file + return False + + def check_reconnect(self): + """检查是否需要尝试重连 + + 此方法应在主程序循环中定期调用,检查是否需要尝试重连 + """ + if (self.connection_failed and + self.last_reconnect_time and + (time.time() - self.last_reconnect_time) > self.reconnect_interval): + + logger.info("尝试定期重连...") + if self.reconnect(): + logger.info("定期重连成功") + self.connection_failed = False + self.last_reconnect_time = None + else: + logger.warning("定期重连失败") + self.last_reconnect_time = time.time() \ No newline at end of file diff --git a/src/trade_server.py b/src/trade_server.py index ca6655a..7314606 100644 --- a/src/trade_server.py +++ b/src/trade_server.py @@ -21,6 +21,12 @@ def is_real_mode(): return not Config.SIMULATION_MODE +# 交易接口连接失败回调 +def on_connect_failed(): + """交易连接失败的回调函数""" + logger.critical("交易系统连接失败,API将返回错误状态") + + # 获取实盘交易管理器实例的辅助函数 def get_real_trader_manager(): global _real_trader_manager_instance @@ -38,7 +44,7 @@ def get_trader(): global _trader_instance if _trader_instance is None: - _trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader() + _trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader(connect_failed_callback=on_connect_failed) return _trader_instance @@ -52,7 +58,7 @@ def login(): _trader_instance = None # 创建新的XtTrader实例 - _trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader() + _trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader(connect_failed_callback=on_connect_failed) logger.info("开始登录") _trader_instance.login() @@ -89,6 +95,12 @@ def setup_scheduler(): scheduler_thread.start() logger.info("定时任务调度器已启动") + # 启动重连检查线程(仅在实盘模式下) + if is_real_mode(): + reconnect_thread = threading.Thread(target=_check_reconnect, daemon=True) + reconnect_thread.start() + logger.info("重连检查线程已启动") + return scheduler_thread @@ -102,18 +114,35 @@ def _run_scheduler(): logger.error(f"调度器执行错误: {str(e)}") -# 设置并启动调度器 -setup_scheduler() +def _check_reconnect(): + """重连检查线程,定期检查是否需要重连""" + while True: + try: + if is_real_mode() and _trader_instance is not None: + _trader_instance.check_reconnect() + time.sleep(60) # 每分钟检查一次 + except Exception as e: + logger.error(f"重连检查错误: {str(e)}") + # 程序启动时初始化交易实例 login() -# 添加请求频率限制 + +# 设置并启动调度器 +setup_scheduler() + + app = Flask(__name__) @app.route("/yu/healthcheck", methods=["GET"]) def health_check(): - return "ok", 200 + if is_real_mode() and _trader_instance and not _trader_instance.is_available(): + return jsonify({ + "status": "error", + "message": _trader_instance.connection_error_message or "交易系统连接失败" + }), 503 + return jsonify({"status": "ok"}), 200 @app.route("/yu/buy", methods=["POST"]) @@ -140,8 +169,16 @@ def buy(): if price <= 0 or amount <= 0: raise ValueError("Price and amount must be positive") + # 检查交易系统是否可用 + trader = get_trader() + if is_real_mode() and not trader.is_available(): + return jsonify({ + "success": False, + "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" + }), 503 + # 检查是否在交易时间内 - if not get_trader().is_trading_time(): + if not trader.is_trading_time(): logger.warning( f"交易失败 - 非交易时间不能交易 - 代码: {code}, 价格: {price}, 数量: {amount}" ) @@ -161,7 +198,7 @@ def buy(): strategy_name, code, ORDER_DIRECTION_BUY, amount, price ) else: - result = get_trader().buy(code, price, amount, strategy_name) + result = trader.buy(code, price, amount, strategy_name) if result.get("success"): logger.info(f"买入成功: {result}") @@ -200,8 +237,16 @@ def sell(): if price <= 0 or amount <= 0: raise ValueError("Price and amount must be positive") + # 检查交易系统是否可用 + trader = get_trader() + if is_real_mode() and not trader.is_available(): + return jsonify({ + "success": False, + "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" + }), 503 + # 检查是否在交易时间内 - if not get_trader().is_trading_time(): + if not trader.is_trading_time(): logger.warning( f"交易失败 - 非交易时间不能交易 - 代码: {code}, 价格: {price}, 数量: {amount}" ) @@ -213,16 +258,22 @@ def sell(): ) if is_real_mode(): + logger.info( + f"使用RealTraderManager执行卖出: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}" + ) rtm = get_real_trader_manager() result = rtm.place_order( strategy_name, code, ORDER_DIRECTION_SELL, amount, price ) else: - result = get_trader().sell(code, price, amount, strategy_name) + result = trader.sell(code, price, amount) if result.get("success"): logger.info(f"卖出成功: {result}") return jsonify({"success": True, "order_id": result.get("order_id")}), 200 + elif "error" in result: + logger.error(f"卖出失败: {result}") + return jsonify({"success": False, "error": result["error"]}), 400 else: logger.error(f"卖出失败: {result}") return jsonify({"success": False, "error": result}), 400 @@ -237,11 +288,21 @@ def sell(): @app.route("/yu/cancel/", methods=["DELETE"]) def cancel(order_id): - logger.info(f"Received cancel request for entrust_no={order_id}") - try: - result = get_trader().cancel(order_id) - return jsonify({"success": True, "data": result}), 200 + """Cancel an order by order_id.""" + logger.info(f"Received cancel request for order {order_id}") + try: + # 检查交易系统是否可用 + trader = get_trader() + if is_real_mode() and not trader.is_available(): + return jsonify({ + "success": False, + "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" + }), 503 + + result = trader.cancel(order_id) + logger.info(f"撤单结果: {result}") + return jsonify(result), 200 except Exception as e: logger.error(f"Error processing cancel request: {str(e)}") abort(500, description="Internal server error") @@ -249,14 +310,20 @@ def cancel(order_id): @app.route("/yu/balance", methods=["GET"]) def get_balance(): - """Get the balance of the account.""" + """Get balance information.""" logger.info("Received balance request") - try: - # 直接使用实盘交易实例,不考虑模拟盘 - balance = get_trader().get_balance() - logger.info(f"实盘交易余额: {balance}") - return jsonify({"success": True, "data": balance}), 200 + try: + # 检查交易系统是否可用 + trader = get_trader() + if is_real_mode() and not trader.is_available(): + return jsonify({ + "success": False, + "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" + }), 503 + + balance = trader.get_balance() + return jsonify(balance), 200 except Exception as e: logger.error(f"Error processing balance request: {str(e)}") abort(500, description="Internal server error") @@ -264,26 +331,41 @@ def get_balance(): @app.route("/yu/positions", methods=["GET"]) def get_positions(): - """Get the positions of the account.""" + """Get position information.""" logger.info("Received positions request") try: - result = get_trader().get_positions() + # 检查交易系统是否可用 + trader = get_trader() + if is_real_mode() and not trader.is_available(): + return jsonify({ + "success": False, + "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" + }), 503 - return jsonify({"success": True, "data": result}), 200 + positions = trader.get_positions() + return jsonify(positions), 200 except Exception as e: logger.error(f"Error processing positions request: {str(e)}") abort(500, description="Internal server error") + @app.route("/yu/todaytrades", methods=["GET"]) def get_today_trades(): - """Get the today's trades of the account.""" + """Get today's trade information.""" logger.info("Received today trades request") - try: - trades = get_trader().get_today_trades() - logger.info(f"今日成交: {trades}") - return jsonify({"success": True, "data": trades}), 200 + try: + # 检查交易系统是否可用 + trader = get_trader() + if is_real_mode() and not trader.is_available(): + return jsonify({ + "success": False, + "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" + }), 503 + + trades = trader.get_today_trades() + return jsonify(trades), 200 except Exception as e: logger.error(f"Error processing today trades request: {str(e)}") abort(500, description="Internal server error") @@ -291,15 +373,22 @@ def get_today_trades(): @app.route("/yu/todayorders", methods=["GET"]) def get_today_orders(): - """Get the today's entrust of the account.""" - logger.info("Received today entrust request") - try: - entrust = get_trader().get_today_orders() - logger.info(f"今日委托: {entrust}") + """Get today's order information.""" + logger.info("Received today orders request") - return jsonify({"success": True, "data": entrust}), 200 + try: + # 检查交易系统是否可用 + trader = get_trader() + if is_real_mode() and not trader.is_available(): + return jsonify({ + "success": False, + "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" + }), 503 + + orders = trader.get_today_orders() + return jsonify(orders), 200 except Exception as e: - logger.error(f"Error processing today entrust request: {str(e)}") + logger.error(f"Error processing today orders request: {str(e)}") abort(500, description="Internal server error") diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..455cd8d --- /dev/null +++ b/src/utils/__init__.py @@ -0,0 +1,4 @@ +""" +工具模块 +包含各种辅助功能 +""" \ No newline at end of file diff --git a/src/utils/mail_util.py b/src/utils/mail_util.py new file mode 100644 index 0000000..7196999 --- /dev/null +++ b/src/utils/mail_util.py @@ -0,0 +1,68 @@ +import smtplib +import logging +import sys +import os +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from email.header import Header +from email.utils import formataddr + +# 添加项目根目录到sys.path +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(current_dir) +if parent_dir not in sys.path: + sys.path.insert(0, parent_dir) + +from config import Config +from logger_config import get_logger + +# 获取正确配置的日志记录器 +logger = get_logger('mail_util') + +class MailUtil: + @staticmethod + def send_mail(subject, body, recipients=None): + """发送邮件 + + Args: + subject: 邮件主题 + body: 邮件内容 + recipients: 收件人列表,如果为None则使用配置中的默认收件人 + + Returns: + bool: 发送是否成功 + """ + if not Config.MAIL_ENABLED: + logger.info("邮件通知未启用") + return False + + recipients = recipients or Config.MAIL_TO + if not recipients: + logger.warning("未配置收件人") + return False + + try: + msg = MIMEMultipart() + msg['From'] = formataddr((Config.MAIL_FROM, Config.MAIL_USERNAME)) + msg['To'] = ', '.join(recipients) + msg['Subject'] = subject + + msg.attach(MIMEText(body, 'plain')) + + server = smtplib.SMTP_SSL(Config.MAIL_SERVER, Config.MAIL_PORT) + + if Config.MAIL_USERNAME and Config.MAIL_PASSWORD: + server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD) + + server.send_message(msg) + server.quit() + + logger.info(f"邮件发送成功: {subject}") + return True + except Exception as e: + logger.error(f"邮件发送失败: {str(e)}") + return False + +if __name__ == "__main__": + # 测试邮件发送 + result = MailUtil.send_mail("测试邮件", "这是一封测试邮件")