feat: 自动重连, 并在重连失败时发送邮件警报

This commit is contained in:
zhiyong 2025-05-11 02:05:39 +08:00
parent a9e074a116
commit 1c1b19383c
5 changed files with 359 additions and 52 deletions

View File

@ -3,17 +3,17 @@ import datetime
class Config:
# Server settings
PORT = int(os.environ.get("PORT", 9527))
HOST = os.environ.get("HOST", "0.0.0.0")
DEBUG = os.environ.get("DEBUG", "False").lower() == "true"
PORT = 9527
HOST = "0.0.0.0"
DEBUG = False
# Trading settings
TRADE_TIMEOUT = int(os.environ.get("TRADE_TIMEOUT", 5)) # 交易超时时间(秒)
TRADE_TIMEOUT = 5 # 交易超时时间(秒)
SIMULATION_MODE = True
# Trading hours
MARKET_OPEN_TIME = os.environ.get("MARKET_OPEN_TIME", "09:20")
MARKET_CLOSE_TIME = os.environ.get("MARKET_CLOSE_TIME", "15:10")
MARKET_OPEN_TIME = "09:20"
MARKET_CLOSE_TIME = "15:10"
# Logging
LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "logs")
@ -30,13 +30,28 @@ class Config:
RATE_LIMIT_PERIOD = 60 # seconds
# XtQuant 相关配置
XT_ACCOUNT = os.environ.get("XT_ACCOUNT", "80391818")
XT_PATH = os.environ.get("XT_PATH", r'C:\\江海证券QMT实盘_交易\\userdata_mini')
XT_ACCOUNT = "80391818"
XT_PATH = r'C:\\江海证券QMT实盘_交易\\userdata_mini'
# 重连相关配置
XT_RECONNECT_INTERVAL = 3600 # 重连尝试间隔(秒)
XT_MAX_SESSION_ID = 999999 # 最大会话ID
XT_MIN_SESSION_ID = 100000 # 最小会话ID
XT_SESSION_ID_RANGE = 20 # 一次尝试的会话ID数量
# 邮件通知配置
MAIL_ENABLED = True
MAIL_SERVER = "mail.yushaoyou.com"
MAIL_PORT = 465
MAIL_USERNAME = "jq@yushaoyou.com"
MAIL_PASSWORD = "zhiyong214"
MAIL_FROM = "自动交易服务器"
MAIL_TO = ["jq@yushaoyou.com"] # 可以是多个邮箱
# RealTraderManager配置
RTM_ORDER_TIMEOUT = int(os.environ.get("RTM_ORDER_TIMEOUT", 60)) # 订单超时时间(秒)
RTM_MAX_RETRIES = int(os.environ.get("RTM_MAX_RETRIES", 3)) # 最大重试次数
RTM_USE_MARKET_ORDER = os.environ.get("RTM_USE_MARKET_ORDER", "True").lower() == "true" # 是否使用市价单进行补单
RTM_ORDER_TIMEOUT = 60 # 订单超时时间(秒)
RTM_MAX_RETRIES = 3 # 最大重试次数
RTM_USE_MARKET_ORDER = True # 是否使用市价单进行补单
# 计划任务运行时间
STRATEGY_SAVE_TIME = "15:10" # 每天保存策略数据的时间

View File

@ -1,5 +1,7 @@
import os
import random
import time
from datetime import datetime
from config import Config
from base_trader import BaseTrader
from xtquant.xttrader import XtQuantTrader
@ -7,6 +9,7 @@ from xtquant.xttype import StockAccount
from xtquant import xtconstant
from xtquant.xtdata import get_instrument_detail
from logger_config import get_logger
from utils.mail_util import MailUtil
# 获取日志记录器
logger = get_logger('real_trader')
@ -17,11 +20,24 @@ class MyXtQuantTraderCallback:
def on_connected(self):
logger.info("连接成功")
def on_disconnected(self):
"""连接断开回调
当交易连接断开时调用会自动尝试重连
如果重连失败将设置连接状态为失败并通过邮件通知
"""
logger.warning("连接断开")
if self.trader_instance:
# 设置连接状态
self.trader_instance.connected = False
self.trader_instance.subscribed = False
# 尝试重连
if not self.trader_instance.reconnect():
logger.error("重连失败")
raise Exception("断线重连失败")
# 通知重连失败
self.trader_instance.connection_failed = True
self.trader_instance.last_reconnect_time = time.time()
self.trader_instance.notify_connection_failure()
def on_account_status(self, status):
logger.info(f"账号状态: {status.account_id} {status.status}")
def on_stock_asset(self, asset):
@ -44,17 +60,26 @@ class MyXtQuantTraderCallback:
logger.info(f"约券异步反馈: {response.seq}")
class XtTrader(BaseTrader):
def __init__(self):
def __init__(self, connect_failed_callback=None):
super().__init__(logger)
self.started = False
self.connected = False
self.subscribed = False
self._ACCOUNT = Config.XT_ACCOUNT
self._PATH = Config.XT_PATH
self._SESSION_ID = random.randint(100000, 99999999)
self._SESSION_ID = random.randint(Config.XT_MIN_SESSION_ID, Config.XT_MAX_SESSION_ID)
self._account_type = os.environ.get("XT_ACCOUNT_TYPE", "STOCK")
self._strategy_name = os.environ.get("XT_STRATEGY_NAME", "xt_strategy")
self._remark = os.environ.get("XT_REMARK", "remark")
# 重连相关
self.connection_failed = False
self.last_reconnect_time = None
self.reconnect_interval = Config.XT_RECONNECT_INTERVAL
self.connect_failed_callback = connect_failed_callback
self.connection_error_message = None
# 初始化trader
self._callback = MyXtQuantTraderCallback(self)
self.xt_trader = XtQuantTrader(self._PATH, self._SESSION_ID)
self.account = StockAccount(self._ACCOUNT, self._account_type)
@ -67,6 +92,14 @@ class XtTrader(BaseTrader):
bool: True表示已登录False表示未登录
"""
return self.started and self.connected and self.subscribed
def is_available(self):
"""检查交易接口是否可用
Returns:
bool: True表示可用False表示不可用
"""
return self.is_logged_in() and not self.connection_failed
def login(self):
if not self.started:
@ -88,6 +121,9 @@ class XtTrader(BaseTrader):
self.subscribed = False
def get_balance(self):
if not self.is_available():
return None
asset = self.xt_trader.query_stock_asset(self.account)
if asset:
return {
@ -100,6 +136,9 @@ class XtTrader(BaseTrader):
return None
def get_positions(self):
if not self.is_available():
return []
positions = self.xt_trader.query_stock_positions(self.account)
if positions:
return [
@ -119,6 +158,9 @@ class XtTrader(BaseTrader):
return []
def get_position(self, stock_code):
if not self.is_available():
return None
position = self.xt_trader.query_stock_position(self.account, stock_code)
if position:
return {
@ -136,6 +178,9 @@ class XtTrader(BaseTrader):
return None
def get_today_trades(self):
if not self.is_available():
return []
trades = self.xt_trader.query_stock_trades(self.account)
if trades:
return [
@ -155,6 +200,9 @@ class XtTrader(BaseTrader):
return []
def get_today_orders(self):
if not self.is_available():
return []
orders = self.xt_trader.query_stock_orders(self.account)
if orders:
return [
@ -176,6 +224,9 @@ class XtTrader(BaseTrader):
return []
def get_order(self, order_id):
if not self.is_available():
return None
order = self.xt_trader.query_stock_order(self.account, int(order_id))
if order:
return {
@ -226,6 +277,9 @@ class XtTrader(BaseTrader):
return ""
def buy(self, code, price, amount, order_type='limit'):
if not self.is_available():
return {"error": self.connection_error_message or "交易系统连接失败"}
# 确定价格类型
price_type = xtconstant.MARKET_BEST if order_type == 'market' else xtconstant.FIX_PRICE
@ -239,6 +293,9 @@ class XtTrader(BaseTrader):
return {"order_id": order_id}
def sell(self, code, price, amount, order_type='limit'):
if not self.is_available():
return {"error": self.connection_error_message or "交易系统连接失败"}
# 确定价格类型
price_type = xtconstant.MARKET_BEST if order_type == 'market' else xtconstant.FIX_PRICE
@ -252,6 +309,9 @@ class XtTrader(BaseTrader):
return {"order_id": order_id}
def cancel(self, order_id):
if not self.is_available():
return {"success": False, "message": self.connection_error_message or "交易系统连接失败"}
# 撤单接口需要订单编号
result = self.xt_trader.cancel_order_stock(self.account, int(order_id))
return {"success": result == 0, "message": f"撤单结果: {result}"}
@ -265,6 +325,9 @@ class XtTrader(BaseTrader):
Returns:
dict: 行情数据如果获取失败则返回None
"""
if not self.is_available():
return None
try:
quote = self.xt_trader.query_quote(code)
if quote:
@ -283,8 +346,37 @@ class XtTrader(BaseTrader):
except Exception as e:
logger.error(f"获取行情失败: {code}, {str(e)}")
return None
def notify_connection_failure(self):
"""通知交易连接失败"""
self.connection_error_message = f"交易系统连接失败,将在{self.reconnect_interval//60}分钟后自动尝试重连"
# 调用回调通知上层应用
if self.connect_failed_callback:
self.connect_failed_callback()
# 发送邮件通知
trader_info = f"账户:{self._ACCOUNT}会话ID{self._SESSION_ID}"
time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
subject = f"[交易系统] 连接失败通知 - {time_str}"
body = f"""
交易系统连接失败请尽快检查
时间{time_str}
{trader_info}
错误信息交易连接断开且重连失败
系统将在{self.reconnect_interval//60}分钟后自动尝试重连如需立即恢复请手动重启交易系统
"""
MailUtil.send_mail(subject, body)
def reconnect(self):
"""尝试重新连接交易系统
Returns:
bool: 重连是否成功
"""
# 关闭旧连接
if self.started:
self.xt_trader.stop()
@ -293,7 +385,9 @@ class XtTrader(BaseTrader):
self.subscribed = False
# 尝试范围内的新session_id
session_id_range = range(100000, 100020)
start_id = Config.XT_MIN_SESSION_ID
end_id = start_id + Config.XT_SESSION_ID_RANGE
session_id_range = range(start_id, end_id)
for session_id in random.sample(list(session_id_range), len(session_id_range)):
self._SESSION_ID = session_id
self.xt_trader = XtQuantTrader(self._PATH, self._SESSION_ID)
@ -311,7 +405,44 @@ class XtTrader(BaseTrader):
if result == 0:
self.subscribed = True
logger.info(f"重连成功使用session_id: {self._SESSION_ID}")
# 重置连接失败状态
if self.connection_failed:
self.connection_failed = False
self.connection_error_message = None
# 通知连接已恢复
time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
subject = f"[交易系统] 连接恢复通知 - {time_str}"
body = f"""
交易系统连接已恢复
时间{time_str}
账户{self._ACCOUNT}会话ID{self._SESSION_ID}
系统已自动恢复连接交易功能现已正常
"""
MailUtil.send_mail(subject, body)
return True
logger.error("所有尝试都失败,无法重连")
return False
return False
def check_reconnect(self):
"""检查是否需要尝试重连
此方法应在主程序循环中定期调用检查是否需要尝试重连
"""
if (self.connection_failed and
self.last_reconnect_time and
(time.time() - self.last_reconnect_time) > self.reconnect_interval):
logger.info("尝试定期重连...")
if self.reconnect():
logger.info("定期重连成功")
self.connection_failed = False
self.last_reconnect_time = None
else:
logger.warning("定期重连失败")
self.last_reconnect_time = time.time()

View File

@ -21,6 +21,12 @@ def is_real_mode():
return not Config.SIMULATION_MODE
# 交易接口连接失败回调
def on_connect_failed():
"""交易连接失败的回调函数"""
logger.critical("交易系统连接失败API将返回错误状态")
# 获取实盘交易管理器实例的辅助函数
def get_real_trader_manager():
global _real_trader_manager_instance
@ -38,7 +44,7 @@ def get_trader():
global _trader_instance
if _trader_instance is None:
_trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader()
_trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader(connect_failed_callback=on_connect_failed)
return _trader_instance
@ -52,7 +58,7 @@ def login():
_trader_instance = None
# 创建新的XtTrader实例
_trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader()
_trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader(connect_failed_callback=on_connect_failed)
logger.info("开始登录")
_trader_instance.login()
@ -89,6 +95,12 @@ def setup_scheduler():
scheduler_thread.start()
logger.info("定时任务调度器已启动")
# 启动重连检查线程(仅在实盘模式下)
if is_real_mode():
reconnect_thread = threading.Thread(target=_check_reconnect, daemon=True)
reconnect_thread.start()
logger.info("重连检查线程已启动")
return scheduler_thread
@ -102,18 +114,35 @@ def _run_scheduler():
logger.error(f"调度器执行错误: {str(e)}")
# 设置并启动调度器
setup_scheduler()
def _check_reconnect():
"""重连检查线程,定期检查是否需要重连"""
while True:
try:
if is_real_mode() and _trader_instance is not None:
_trader_instance.check_reconnect()
time.sleep(60) # 每分钟检查一次
except Exception as e:
logger.error(f"重连检查错误: {str(e)}")
# 程序启动时初始化交易实例
login()
# 添加请求频率限制
# 设置并启动调度器
setup_scheduler()
app = Flask(__name__)
@app.route("/yu/healthcheck", methods=["GET"])
def health_check():
return "ok", 200
if is_real_mode() and _trader_instance and not _trader_instance.is_available():
return jsonify({
"status": "error",
"message": _trader_instance.connection_error_message or "交易系统连接失败"
}), 503
return jsonify({"status": "ok"}), 200
@app.route("/yu/buy", methods=["POST"])
@ -140,8 +169,16 @@ def buy():
if price <= 0 or amount <= 0:
raise ValueError("Price and amount must be positive")
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
# 检查是否在交易时间内
if not get_trader().is_trading_time():
if not trader.is_trading_time():
logger.warning(
f"交易失败 - 非交易时间不能交易 - 代码: {code}, 价格: {price}, 数量: {amount}"
)
@ -161,7 +198,7 @@ def buy():
strategy_name, code, ORDER_DIRECTION_BUY, amount, price
)
else:
result = get_trader().buy(code, price, amount, strategy_name)
result = trader.buy(code, price, amount, strategy_name)
if result.get("success"):
logger.info(f"买入成功: {result}")
@ -200,8 +237,16 @@ def sell():
if price <= 0 or amount <= 0:
raise ValueError("Price and amount must be positive")
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
# 检查是否在交易时间内
if not get_trader().is_trading_time():
if not trader.is_trading_time():
logger.warning(
f"交易失败 - 非交易时间不能交易 - 代码: {code}, 价格: {price}, 数量: {amount}"
)
@ -213,16 +258,22 @@ def sell():
)
if is_real_mode():
logger.info(
f"使用RealTraderManager执行卖出: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}"
)
rtm = get_real_trader_manager()
result = rtm.place_order(
strategy_name, code, ORDER_DIRECTION_SELL, amount, price
)
else:
result = get_trader().sell(code, price, amount, strategy_name)
result = trader.sell(code, price, amount)
if result.get("success"):
logger.info(f"卖出成功: {result}")
return jsonify({"success": True, "order_id": result.get("order_id")}), 200
elif "error" in result:
logger.error(f"卖出失败: {result}")
return jsonify({"success": False, "error": result["error"]}), 400
else:
logger.error(f"卖出失败: {result}")
return jsonify({"success": False, "error": result}), 400
@ -237,11 +288,21 @@ def sell():
@app.route("/yu/cancel/<order_id>", methods=["DELETE"])
def cancel(order_id):
logger.info(f"Received cancel request for entrust_no={order_id}")
try:
result = get_trader().cancel(order_id)
return jsonify({"success": True, "data": result}), 200
"""Cancel an order by order_id."""
logger.info(f"Received cancel request for order {order_id}")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
result = trader.cancel(order_id)
logger.info(f"撤单结果: {result}")
return jsonify(result), 200
except Exception as e:
logger.error(f"Error processing cancel request: {str(e)}")
abort(500, description="Internal server error")
@ -249,14 +310,20 @@ def cancel(order_id):
@app.route("/yu/balance", methods=["GET"])
def get_balance():
"""Get the balance of the account."""
"""Get balance information."""
logger.info("Received balance request")
try:
# 直接使用实盘交易实例,不考虑模拟盘
balance = get_trader().get_balance()
logger.info(f"实盘交易余额: {balance}")
return jsonify({"success": True, "data": balance}), 200
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
balance = trader.get_balance()
return jsonify(balance), 200
except Exception as e:
logger.error(f"Error processing balance request: {str(e)}")
abort(500, description="Internal server error")
@ -264,26 +331,41 @@ def get_balance():
@app.route("/yu/positions", methods=["GET"])
def get_positions():
"""Get the positions of the account."""
"""Get position information."""
logger.info("Received positions request")
try:
result = get_trader().get_positions()
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
return jsonify({"success": True, "data": result}), 200
positions = trader.get_positions()
return jsonify(positions), 200
except Exception as e:
logger.error(f"Error processing positions request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/todaytrades", methods=["GET"])
def get_today_trades():
"""Get the today's trades of the account."""
"""Get today's trade information."""
logger.info("Received today trades request")
try:
trades = get_trader().get_today_trades()
logger.info(f"今日成交: {trades}")
return jsonify({"success": True, "data": trades}), 200
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
trades = trader.get_today_trades()
return jsonify(trades), 200
except Exception as e:
logger.error(f"Error processing today trades request: {str(e)}")
abort(500, description="Internal server error")
@ -291,15 +373,22 @@ def get_today_trades():
@app.route("/yu/todayorders", methods=["GET"])
def get_today_orders():
"""Get the today's entrust of the account."""
logger.info("Received today entrust request")
try:
entrust = get_trader().get_today_orders()
logger.info(f"今日委托: {entrust}")
"""Get today's order information."""
logger.info("Received today orders request")
return jsonify({"success": True, "data": entrust}), 200
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
orders = trader.get_today_orders()
return jsonify(orders), 200
except Exception as e:
logger.error(f"Error processing today entrust request: {str(e)}")
logger.error(f"Error processing today orders request: {str(e)}")
abort(500, description="Internal server error")

4
src/utils/__init__.py Normal file
View File

@ -0,0 +1,4 @@
"""
工具模块
包含各种辅助功能
"""

68
src/utils/mail_util.py Normal file
View File

@ -0,0 +1,68 @@
import smtplib
import logging
import sys
import os
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from email.utils import formataddr
# 添加项目根目录到sys.path
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
from config import Config
from logger_config import get_logger
# 获取正确配置的日志记录器
logger = get_logger('mail_util')
class MailUtil:
@staticmethod
def send_mail(subject, body, recipients=None):
"""发送邮件
Args:
subject: 邮件主题
body: 邮件内容
recipients: 收件人列表如果为None则使用配置中的默认收件人
Returns:
bool: 发送是否成功
"""
if not Config.MAIL_ENABLED:
logger.info("邮件通知未启用")
return False
recipients = recipients or Config.MAIL_TO
if not recipients:
logger.warning("未配置收件人")
return False
try:
msg = MIMEMultipart()
msg['From'] = formataddr((Config.MAIL_FROM, Config.MAIL_USERNAME))
msg['To'] = ', '.join(recipients)
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
server = smtplib.SMTP_SSL(Config.MAIL_SERVER, Config.MAIL_PORT)
if Config.MAIL_USERNAME and Config.MAIL_PASSWORD:
server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
server.send_message(msg)
server.quit()
logger.info(f"邮件发送成功: {subject}")
return True
except Exception as e:
logger.error(f"邮件发送失败: {str(e)}")
return False
if __name__ == "__main__":
# 测试邮件发送
result = MailUtil.send_mail("测试邮件", "这是一封测试邮件")