optimize trade_server

This commit is contained in:
zhiyong 2025-05-16 01:13:45 +08:00
parent f2f7b0cd29
commit d17b3de56d
9 changed files with 647 additions and 548 deletions

1
src/core/__init__.py Normal file
View File

@ -0,0 +1 @@
# Core module for application state and schedulers

148
src/core/app_state.py Normal file
View File

@ -0,0 +1,148 @@
"""
应用核心状态管理和基础交易逻辑
包含全局交易实例日志记录器以及登录/登出等核心功能
"""
import time
from typing import Optional
from ..config import Config # 使用相对导入
from ..logger_config import get_logger
from ..real.xt_trader import XtTrader
from ..simulation.simulation_trader import SimulationTrader
from ..base_trader import BaseTrader
from ..real.real_trader_manager import RealTraderManager # 确保导入
# 获取日志记录器
logger = get_logger("app_state") # 可以考虑是否需要区分 logger 名称
# 全局交易实例(采用单例模式)
_trader_instance: Optional[BaseTrader] = None
_real_trader_manager_instance: Optional[RealTraderManager] = None
def is_real_mode() -> bool:
"""检查当前是否为实盘交易模式。"""
return not Config.SIMULATION_MODE
def on_connect_failed() -> None:
"""交易连接失败的回调函数。"""
logger.critical("交易系统连接失败API将返回错误状态")
def get_trader() -> Optional[BaseTrader]:
"""
获取交易实例模拟或实盘
根据配置和当前时间是否为交易日/交易时间返回合适的交易实例
在非交易日或非交易时间实盘模式下会返回一个临时的标记为连接失败的实例
Returns:
Optional[BaseTrader]: 交易实例或 None
"""
global _trader_instance
if is_real_mode() and _trader_instance is None:
if not BaseTrader.is_trading_date():
temp_trader = XtTrader(connect_failed_callback=on_connect_failed)
temp_trader.connection_failed = True
temp_trader.connection_error_message = "当前为非交易日,交易系统未连接"
return temp_trader
if not BaseTrader.is_trading_time():
temp_trader = XtTrader(connect_failed_callback=on_connect_failed)
temp_trader.connection_failed = True
temp_trader.connection_error_message = "当前为非交易时间,交易系统未连接"
return temp_trader
return _trader_instance
def get_real_trader_manager() -> Optional[RealTraderManager]:
"""
获取实盘交易管理器单例
如果当前为模拟模式则返回 None
Returns:
Optional[RealTraderManager]: 实盘交易管理器实例或 None
"""
global _real_trader_manager_instance
if _real_trader_manager_instance is None:
current_trader = get_trader()
if Config.SIMULATION_MODE:
_real_trader_manager_instance = None
elif current_trader is not None:
_real_trader_manager_instance = RealTraderManager(current_trader)
else:
_real_trader_manager_instance = None
return _real_trader_manager_instance
def login() -> bool:
"""
初始化并登录交易实例
会根据配置实盘/模拟创建相应的交易实例并尝试登录
Returns:
bool: 登录是否成功
Raises:
Exception: 如果在实盘模式下创建实例失败
"""
global _trader_instance
try:
if _trader_instance is not None and is_real_mode():
# _trader_instance is BaseTrader or its child, which should have logout
_trader_instance.logout()
_trader_instance = None
_trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader(connect_failed_callback=on_connect_failed)
logger.info("开始登录")
if Config.SIMULATION_MODE:
return True
# _trader_instance is XtTrader here, which has login
login_success = _trader_instance.login()
if login_success:
# _trader_instance is XtTrader here
result = _trader_instance.get_balance()
if result and result.get("account_id") is not None:
logger.info(f"查询余额成功: {result}")
return True
else:
logger.error(f"登录成功但查询余额失败: {result}")
if _trader_instance:
_trader_instance.connection_failed = True # BaseTrader property
_trader_instance.last_reconnect_time = time.time() # BaseTrader property
_trader_instance.notify_connection_failure("登录成功但查询余额失败") # BaseTrader method
return False
else:
logger.error("登录失败")
return False
except Exception as e:
logger.error(f"登录初始化异常: {str(e)}")
if is_real_mode() and _trader_instance is not None:
_trader_instance.connection_failed = True
_trader_instance.last_reconnect_time = time.time()
_trader_instance.notify_connection_failure(f"登录初始化异常: {str(e)}")
return False
raise Exception(f"登录失败,无法创建交易实例: {e}")
def logout() -> None:
"""登出并销毁交易实例。"""
global _trader_instance
if _trader_instance is not None:
_trader_instance.logout()
logger.info("登出成功")
if is_real_mode():
_trader_instance = None
logger.info("交易实例已销毁")

111
src/core/scheduler_tasks.py Normal file
View File

@ -0,0 +1,111 @@
"""
定时任务管理和执行
本模块负责定义调度和执行所有后台定时任务
例如每日自动登录/登出QMT重启以及连接状态检查等
"""
import schedule
import threading
import time
import subprocess
import os
from ..config import Config
from ..logger_config import get_logger
from ..base_trader import BaseTrader
from .app_state import login, logout, is_real_mode, _trader_instance, logger as app_logger # 使用 app_state中的logger
# 如果需要独立的 logger可以取消下面这行的注释
# logger = get_logger("scheduler")
logger = app_logger # 复用 app_state 的 logger
def _run_scheduler() -> None:
"""定时任务执行线程的主循环。"""
while True:
try:
schedule.run_pending()
time.sleep(1)
except Exception as e:
logger.error(f"调度器执行错误: {str(e)}")
def _check_reconnect() -> None:
"""定期检查交易实例是否需要重连(仅实盘模式)。"""
while True:
try:
# 直接使用 app_state 中的 _trader_instance
if is_real_mode() and _trader_instance is not None and hasattr(_trader_instance, 'check_reconnect'):
_trader_instance.check_reconnect()
time.sleep(60) # 每分钟检查一次
except Exception as e:
logger.error(f"重连检查错误: {str(e)}")
def login_on_trading_day() -> None:
"""仅在交易日执行登录操作。"""
if BaseTrader.is_trading_date():
logger.info("今天是交易日,执行计划的登录操作")
login() # 调用 app_state.login
else:
logger.info("今天不是交易日,跳过计划的登录操作")
def logout_on_trading_day() -> None:
"""仅在交易日执行登出操作。"""
if BaseTrader.is_trading_date():
logger.info("今天是交易日,执行计划的登出操作")
logout() # 调用 app_state.logout
else:
logger.info("今天不是交易日,跳过计划的登出操作")
def restart_qmt() -> None:
"""重启QMT客户端软件。"""
try:
if not os.path.exists(Config.XT_LAUNCHER):
logger.error(f"QMT启动路径不存在: {Config.XT_LAUNCHER}")
return
for proc_name in [Config.XT_PROCESS1, Config.XT_PROCESS2]:
try:
kill_cmd = f'taskkill /F /IM {proc_name}'
result = subprocess.run(kill_cmd, shell=True, capture_output=True, text=True, check=False)
if result.returncode == 0:
logger.info(f"已关闭进程: {proc_name}")
else:
# 进程不存在通常返回码 128 或 1
if result.returncode not in [1, 128]:
logger.warning(f"关闭进程 {proc_name} 时返回码: {result.returncode}, 输出: {result.stdout}, 错误: {result.stderr}")
except Exception as e:
logger.error(f"关闭进程 {proc_name} 时发生异常: {str(e)}")
subprocess.Popen(Config.XT_LAUNCHER)
logger.info(f"已尝试启动QMT软件: {Config.XT_LAUNCHER}")
except Exception as e:
logger.error(f"重启QMT软件时发生错误: {str(e)}")
def restart_qmt_on_trading_day() -> None:
"""仅在交易日执行QMT软件重启操作。"""
if BaseTrader.is_trading_date():
logger.info("今天是交易日执行计划的QMT软件重启操作")
restart_qmt()
else:
logger.info("今天不是交易日跳过计划的QMT软件重启")
def setup_scheduler() -> threading.Thread:
"""
设置所有定时任务并启动调度线程
Returns:
threading.Thread: 启动的调度器线程实例
"""
schedule.every().day.at(Config.MARKET_OPEN_TIME).do(login_on_trading_day)
schedule.every().day.at(Config.MARKET_CLOSE_TIME).do(logout_on_trading_day)
schedule.every().day.at(Config.XT_RESTART_TIME).do(restart_qmt_on_trading_day)
scheduler_thread = threading.Thread(target=_run_scheduler, daemon=True)
scheduler_thread.start()
logger.info("定时任务调度器已启动")
if is_real_mode():
reconnect_thread = threading.Thread(target=_check_reconnect, daemon=True)
reconnect_thread.start()
logger.info("重连检查线程已启动")
return scheduler_thread

1
src/routes/__init__.py Normal file
View File

@ -0,0 +1 @@
# Routes module for Flask Blueprints

View File

@ -0,0 +1,85 @@
"""
账户信息查询相关的API端点
包括余额持仓当日成交当日委托和单个订单查询
"""
from flask import Blueprint, jsonify, abort
from ..core.app_state import get_trader, is_real_mode, logger
account_bp = Blueprint('account_routes', __name__, url_prefix='/yu')
def _check_trader_availability():
"""辅助函数,检查交易实例是否可用,如果不可用则 abort(503)。"""
trader = get_trader()
if not trader:
logger.error("Trader instance not available for account request.")
abort(503, description="Trader not available. System may be initializing or in a failed state.")
if is_real_mode() and not trader.is_available():
logger.warning(f"Account info request when trader is not available: {trader.connection_error_message}")
abort(503, description=trader.connection_error_message or "交易系统连接失败,请稍后再试")
return trader
@account_bp.route("/balance", methods=["GET"])
def get_balance_route(): # Renamed to avoid conflict with get_balance from app_state if imported directly
"""获取账户余额信息。"""
logger.info("Received balance request")
try:
trader = _check_trader_availability()
balance = trader.get_balance()
return jsonify(balance), 200
except Exception as e:
logger.error(f"Error processing balance request: {str(e)}", exc_info=True)
abort(500, description="Internal server error during balance request")
@account_bp.route("/positions", methods=["GET"])
def get_positions_route(): # Renamed
"""获取账户持仓信息。"""
logger.info("Received positions request")
try:
trader = _check_trader_availability()
positions = trader.get_positions()
return jsonify(positions), 200
except Exception as e:
logger.error(f"Error processing positions request: {str(e)}", exc_info=True)
abort(500, description="Internal server error during positions request")
@account_bp.route("/todaytrades", methods=["GET"])
def get_today_trades_route(): # Renamed
"""获取当日成交信息。"""
logger.info("Received today trades request")
try:
trader = _check_trader_availability()
trades = trader.get_today_trades()
return jsonify(trades), 200
except Exception as e:
logger.error(f"Error processing today trades request: {str(e)}", exc_info=True)
abort(500, description="Internal server error during today trades request")
@account_bp.route("/todayorders", methods=["GET"])
def get_today_orders_route(): # Renamed
"""获取当日委托信息。"""
logger.info("Received today orders request")
try:
trader = _check_trader_availability()
orders = trader.get_today_orders()
return jsonify(orders), 200
except Exception as e:
logger.error(f"Error processing today orders request: {str(e)}", exc_info=True)
abort(500, description="Internal server error during today orders request")
@account_bp.route("/order/<order_id>", methods=["GET"])
def get_order_route(order_id: str): # Renamed
"""根据订单ID获取订单信息。"""
logger.info(f"Received order request for order {order_id}")
try:
trader = _check_trader_availability()
order_info = trader.get_order(order_id)
if order_info is None or (isinstance(order_info, dict) and not order_info): # Handle case where order not found
logger.warning(f"Order not found for ID: {order_id}")
abort(404, description=f"Order with ID {order_id} not found.")
return jsonify(order_info), 200
except Exception as e:
logger.error(f"Error processing order request for {order_id}: {str(e)}", exc_info=True)
abort(500, description=f"Internal server error during order request for {order_id}")

View File

@ -0,0 +1,26 @@
"""
健康检查和测试相关的API端点
"""
from flask import Blueprint, jsonify
from ..core.app_state import is_real_mode, get_trader, logger
health_bp = Blueprint('health_routes', __name__, url_prefix='/yu')
@health_bp.route("/healthcheck", methods=["GET"])
def health_check():
"""健康检查端点,检查交易系统连接状态。"""
if is_real_mode():
trader = get_trader()
if trader and not trader.is_available():
return jsonify({
"status": "error",
"message": trader.connection_error_message or "交易系统连接失败"
}), 503
return jsonify({"status": "ok"}), 200
@health_bp.route("/test", methods=["GET"])
def test_route():
"""测试路由用于验证API路由前缀和基本连通性。"""
logger.info("Test route accessed.")
return jsonify({"success": True, "message": "API路由前缀验证成功"}), 200

View File

@ -0,0 +1,35 @@
"""
策略管理相关的API端点
"""
from flask import Blueprint, jsonify, abort
from ..core.app_state import get_trader, logger
strategy_bp = Blueprint('strategy_routes', __name__, url_prefix='/yu')
@strategy_bp.route("/clear/<strategy_name>", methods=["DELETE"])
def clear_strategy_route(strategy_name: str): # Renamed
"""清除指定策略的持仓管理数据。"""
logger.info(f"接收到清除策略持仓请求: {strategy_name}")
try:
trader = get_trader()
if not trader:
logger.error("Trader instance not available for clear_strategy request.")
return jsonify({"success": False, "message": "Trader not available"}), 503
# 假设 clear_position_manager 是 BaseTrader 或其子类的方法
if hasattr(trader, 'clear_position_manager'):
cleared = trader.clear_position_manager(strategy_name)
if cleared:
logger.info(f"策略 {strategy_name} 持仓数据已清除。")
return jsonify({"success": True, "message": f"Strategy {strategy_name} cleared successfully."}), 200
else:
logger.warning(f"尝试清除一个不存在的策略或清除失败: {strategy_name}")
return jsonify({"success": False, "message": f"Strategy {strategy_name} not found or could not be cleared."}), 404
else:
logger.error(f"Trader instance does not have 'clear_position_manager' method for strategy {strategy_name}")
return jsonify({"success": False, "message": "Clear position manager feature not available."}), 501 # Not Implemented
except Exception as e:
logger.error(f"清除策略 {strategy_name} 持仓时出错: {str(e)}", exc_info=True)
abort(500, description=f"服务器内部错误,清除策略 {strategy_name} 持仓失败。")

View File

@ -0,0 +1,168 @@
"""
核心交易操作相关的API端点 (买入卖出撤单)
"""
from flask import Blueprint, request, jsonify, abort
from ..core.app_state import get_trader, get_real_trader_manager, is_real_mode, logger
from ..base_trader import BaseTrader
from ..trade_constants import ORDER_TYPE_LIMIT, ORDER_DIRECTION_BUY, ORDER_DIRECTION_SELL
trading_bp = Blueprint('trading_routes', __name__, url_prefix='/yu')
@trading_bp.route("/buy", methods=["POST"])
def buy():
"""处理买入请求。"""
logger.info("Received buy request")
data = request.get_json()
if not data:
logger.error("Buy request with no JSON data")
abort(400, description="Request body must be JSON.")
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get("strategy_name", "default_strategy")
order_type = data.get("order_type", ORDER_TYPE_LIMIT)
try:
if not all([code, price_str, amount_str]):
logger.warning(f"Buy request missing parameters: code={code}, price={price_str}, amount={amount_str}")
raise ValueError("Missing required parameters: code, price, amount")
price = float(price_str)
amount = int(float(amount_str))
if order_type == ORDER_TYPE_LIMIT and (price <= 0 or amount <= 0):
logger.warning(f"Buy request with invalid price/amount: price={price}, amount={amount}")
raise ValueError("For limit orders, price and amount must be positive")
trader = get_trader()
if not trader:
logger.error("Trader instance not available for buy request.")
return jsonify({"success": False, "error": "Trader not available"}), 503
if is_real_mode():
if not BaseTrader.is_trading_time():
logger.warning(f"Buy attempt outside trading hours: {code}, {price}, {amount}")
return jsonify({"success": False, "error": "交易失败: 非交易时间不能实盘交易"}), 400
if not trader.is_available():
logger.error(f"Trader not available for real mode buy: {trader.connection_error_message}")
return jsonify({"success": False, "error": trader.connection_error_message or "交易系统连接失败"}), 503
rtm = get_real_trader_manager()
if not rtm:
logger.error("RealTraderManager not available for buy request.")
return jsonify({"success": False, "error": "RealTraderManager not available"}), 503
logger.info(f"Using RealTraderManager for buy: {code}, {price}, {amount}, {strategy_name}")
result = rtm.place_order(strategy_name, code, ORDER_DIRECTION_BUY, amount, price, order_type)
else:
result = trader.buy(code, price, amount, strategy_name)
if result.get("success"):
logger.info(f"买入下单成功: {result}")
return jsonify({"success": True, "order_id": result.get("order_id")}), 200
else:
logger.error(f"买入下单失败: {result}")
return jsonify({"success": False, "error": result.get("error", "Unknown error placing buy order")}), 400
except ValueError as e:
logger.error(f"Invalid buy request parameters: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"Error processing buy request: {str(e)}", exc_info=True)
abort(500, description="Internal server error during buy request")
@trading_bp.route("/sell", methods=["POST"])
def sell():
"""处理卖出请求。"""
logger.info("Received sell request")
data = request.get_json()
if not data:
logger.error("Sell request with no JSON data")
abort(400, description="Request body must be JSON.")
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get("strategy_name", "default_strategy")
order_type = data.get("order_type", ORDER_TYPE_LIMIT)
try:
if not all([code, price_str, amount_str]):
logger.warning(f"Sell request missing parameters: code={code}, price={price_str}, amount={amount_str}")
raise ValueError("Missing required parameters: code, price, amount")
price = float(price_str)
amount = int(float(amount_str))
if order_type == ORDER_TYPE_LIMIT and (price <= 0 or amount <= 0):
logger.warning(f"Sell request with invalid price/amount: price={price}, amount={amount}")
raise ValueError("For limit orders, price and amount must be positive")
trader = get_trader()
if not trader:
logger.error("Trader instance not available for sell request.")
return jsonify({"success": False, "error": "Trader not available"}), 503
if is_real_mode():
if not BaseTrader.is_trading_time():
logger.warning(f"Sell attempt outside trading hours: {code}, {price}, {amount}")
return jsonify({"success": False, "error": "交易失败: 非交易时间不能实盘交易"}), 400
if not trader.is_available():
logger.error(f"Trader not available for real mode sell: {trader.connection_error_message}")
return jsonify({"success": False, "error": trader.connection_error_message or "交易系统连接失败"}), 503
rtm = get_real_trader_manager()
if not rtm:
logger.error("RealTraderManager not available for sell request.")
return jsonify({"success": False, "error": "RealTraderManager not available"}), 503
logger.info(f"Using RealTraderManager for sell: {code}, {price}, {amount}, {strategy_name}")
result = rtm.place_order(strategy_name, code, ORDER_DIRECTION_SELL, amount, price, order_type)
else:
result = trader.sell(code, price, amount, strategy_name)
if result.get("success"):
logger.info(f"卖出下单成功: {result}")
return jsonify({"success": True, "order_id": result.get("order_id")}), 200
else:
logger.error(f"卖出下单失败: {result}")
return jsonify({"success": False, "error": result.get("error", "Unknown error placing sell order")}), 400
except ValueError as e:
logger.error(f"Invalid sell request parameters: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"Error processing sell request: {str(e)}", exc_info=True)
abort(500, description="Internal server error during sell request")
@trading_bp.route("/cancel/<order_id>", methods=["DELETE"])
def cancel(order_id: str):
"""根据订单ID处理撤单请求。"""
logger.info(f"Received cancel request for order {order_id}")
try:
trader = get_trader()
if not trader:
logger.error("Trader instance not available for cancel request.")
return jsonify({"success": False, "error": "Trader not available"}), 503
if is_real_mode() and not trader.is_available():
logger.error(f"Trader not available for real mode cancel: {trader.connection_error_message}")
return jsonify({"success": False, "error": trader.connection_error_message or "交易系统连接失败"}), 503
result = trader.cancel(order_id)
logger.info(f"撤单结果: {result}")
# 假设 cancel 操作总是返回一个字典,即使失败也包含状态
if isinstance(result, dict) and result.get("success") is False:
return jsonify(result), 400 # Propagate error from trader if structured
return jsonify(result), 200
except Exception as e:
logger.error(f"Error processing cancel request for order {order_id}: {str(e)}", exc_info=True)
abort(500, description=f"Internal server error during cancel request for order {order_id}")

View File

@ -1,567 +1,91 @@
import schedule
import threading
import time
import subprocess
import os
from real.xt_trader import XtTrader
from flask import Flask, request, abort, jsonify
from config import Config
from simulation.simulation_trader import SimulationTrader
from logger_config import get_logger
from real.real_trader_manager import RealTraderManager
from trade_constants import *
from base_trader import BaseTrader
"""
Flask应用主入口文件
# 获取日志记录器
logger = get_logger("server")
负责初始化Flask应用设置核心服务注册API路由蓝图以及启动服务
"""
from flask import Flask, request, abort
# 全局交易实例(采用单例模式)
_trader_instance = None # 交易实例(单例)
_real_trader_manager_instance = None # 实盘交易管理器实例(单例)
from .config import Config
from .logger_config import get_logger # 主应用的logger
# 核心服务和状态初始化
from .core.app_state import login as initialize_trader_login, logger as app_state_logger, is_real_mode
from .core.scheduler_tasks import setup_scheduler, logger as scheduler_logger
def is_real_mode():
return not Config.SIMULATION_MODE
# 交易接口连接失败回调
def on_connect_failed():
"""交易连接失败的回调函数"""
logger.critical("交易系统连接失败API将返回错误状态")
# 获取实盘交易管理器实例的辅助函数
def get_real_trader_manager():
global _real_trader_manager_instance
if _real_trader_manager_instance is None:
_real_trader_manager_instance = (
None if Config.SIMULATION_MODE else RealTraderManager(get_trader())
)
return _real_trader_manager_instance
# 获取交易实例 - 根据情况返回模拟或实盘交易实例
def get_trader():
global _trader_instance
# 实盘模式下
if is_real_mode() and _trader_instance is None:
# 检查是否为交易日
if not BaseTrader.is_trading_date():
# 非交易日创建临时实例提供错误信息
temp_trader = XtTrader(connect_failed_callback=on_connect_failed)
temp_trader.connection_failed = True
temp_trader.connection_error_message = "当前为非交易日,交易系统未连接"
return temp_trader
# 交易日但非交易时间
if not BaseTrader.is_trading_time():
# 非交易时间创建临时实例提供错误信息
temp_trader = XtTrader(connect_failed_callback=on_connect_failed)
temp_trader.connection_failed = True
temp_trader.connection_error_message = "当前为非交易时间,交易系统未连接"
return temp_trader
# 返回已存在的实例
return _trader_instance
def login():
global _trader_instance
try:
# 如果已经有实例,先销毁
if _trader_instance is not None and is_real_mode():
_trader_instance.logout()
_trader_instance = None
# 创建新的XtTrader实例
_trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader(connect_failed_callback=on_connect_failed)
logger.info("开始登录")
# 模拟交易直接返回成功
if Config.SIMULATION_MODE:
return True
# 如果是实盘交易调用login方法
login_success = _trader_instance.login()
# 验证连接
if login_success:
result = _trader_instance.get_balance()
if result and result.get("account_id") is not None:
logger.info(f"查询余额成功: {result}")
return True
else:
logger.error(f"登录成功但查询余额失败: {result}")
_trader_instance.connection_failed = True
_trader_instance.last_reconnect_time = time.time()
_trader_instance.notify_connection_failure("登录成功但查询余额失败")
return False
else:
logger.error("登录失败")
# 不需要在这里设置失败状态在XtTrader.login方法中已设置
return False
except Exception as e:
logger.error(f"登录初始化异常: {str(e)}")
# 如果是实盘模式并且已经成功创建了实例,设置失败状态
if is_real_mode() and _trader_instance is not None:
_trader_instance.connection_failed = True
_trader_instance.last_reconnect_time = time.time()
_trader_instance.notify_connection_failure(f"登录初始化异常: {str(e)}")
return False
# 如果创建实例失败,则抛出异常
raise Exception(f"登录失败,无法创建交易实例: {e}")
def logout():
global _trader_instance
if _trader_instance is not None:
_trader_instance.logout()
logger.info("登出成功")
# 销毁实例
if is_real_mode():
_trader_instance = None
logger.info("XtTrader实例已销毁")
def _run_scheduler():
"""定时任务执行线程"""
while True:
try:
schedule.run_pending()
time.sleep(1)
except Exception as e:
logger.error(f"调度器执行错误: {str(e)}")
def _check_reconnect():
"""重连检查线程,定期检查是否需要重连"""
while True:
try:
if is_real_mode() and _trader_instance is not None:
_trader_instance.check_reconnect()
time.sleep(60) # 每分钟检查一次
except Exception as e:
logger.error(f"重连检查错误: {str(e)}")
# 定时任务:检查是否为交易日,并在交易日执行登录操作
def login_on_trading_day():
"""仅在交易日执行登录操作"""
# 使用BaseTrader中的静态方法检查是否为交易日
if BaseTrader.is_trading_date():
logger.info("今天是交易日,执行登录操作")
login()
else:
logger.info("今天不是交易日,跳过登录操作")
# 定时任务:检查是否为交易日,并在交易日执行登出操作
def logout_on_trading_day():
"""仅在交易日执行登出操作"""
# 使用BaseTrader中的静态方法检查是否为交易日
if BaseTrader.is_trading_date():
logger.info("今天是交易日,执行登出操作")
logout()
else:
logger.info("今天不是交易日,跳过登出操作")
def restart_qmt():
try:
# 检查QMT路径是否存在
if not os.path.exists(Config.XT_LAUNCHER):
logger.error(f"QMT启动路径不存在: {Config.XT_LAUNCHER}")
return
# 先关闭所有QMT相关进程
for proc_name in [Config.XT_PROCESS1, Config.XT_PROCESS2]:
try:
kill_cmd = f'taskkill /F /IM {proc_name}'
result = subprocess.run(kill_cmd, shell=True, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"已关闭进程: {proc_name}")
else:
logger.warning(f"关闭进程{proc_name}时返回码: {result.returncode}, 输出: {result.stdout}, 错误: {result.stderr}")
except Exception as e:
logger.error(f"关闭进程{proc_name}时发生异常: {str(e)}")
# 尝试启动QMT软件
subprocess.Popen(Config.XT_LAUNCHER)
logger.info(f"已启动QMT软件: {Config.XT_LAUNCHER}")
except Exception as e:
logger.error(f"重启QMT软件时发生错误: {str(e)}")
# 定时任务在交易日重启QMT软件
def restart_qmt_on_trading_day():
"""仅在交易日执行QMT软件重启操作"""
if BaseTrader.is_trading_date():
logger.info("今天是交易日执行QMT软件重启操作")
restart_qmt()
else:
logger.info("今天不是交易日跳过QMT软件重启")
def setup_scheduler():
# 设置每日任务,仅在交易日执行
schedule.every().day.at(Config.MARKET_OPEN_TIME).do(login_on_trading_day)
schedule.every().day.at(Config.MARKET_CLOSE_TIME).do(logout_on_trading_day)
schedule.every().day.at(Config.XT_RESTART_TIME).do(restart_qmt_on_trading_day)
# 启动调度线程
scheduler_thread = threading.Thread(target=_run_scheduler, daemon=True)
scheduler_thread.start()
logger.info("定时任务调度器已启动")
# 启动重连检查线程(仅在实盘模式下)
if is_real_mode():
reconnect_thread = threading.Thread(target=_check_reconnect, daemon=True)
reconnect_thread.start()
logger.info("重连检查线程已启动")
return scheduler_thread
# 初始化交易系统
try:
# 程序启动时初始化交易实例 - 尝试登录,但即使失败也继续启动服务
login_success = login()
if not login_success and is_real_mode():
logger.warning("初始登录失败,系统将在稍后定期尝试重连")
# 设置并启动调度器
setup_scheduler()
logger.info("交易系统初始化完成")
except Exception as e:
logger.error(f"交易系统初始化异常: {e}")
# 即使初始化失败也尝试启动调度器
try:
setup_scheduler()
logger.info("调度器启动成功,将尝试定期重新初始化交易系统")
except Exception as scheduler_e:
logger.error(f"调度器启动失败: {scheduler_e}")
raise
# API 路由蓝图
from .routes.health_routes import health_bp
from .routes.trading_routes import trading_bp
from .routes.account_routes import account_bp
from .routes.strategy_routes import strategy_bp
# 主日志记录器 (可以根据需要选择使用 app_state_logger 或 scheduler_logger, 或者独立的)
logger = get_logger("trade_server_main")
# 创建 Flask 应用实例
app = Flask(__name__)
# 添加中间件,拦截所有不以"/yu"开头的请求并返回404
@app.before_request
def check_path_prefix():
if not request.path.startswith('/yu'):
logger.warning(f"拦截非法请求: {request.path}")
abort(404)
@app.route("/yu/healthcheck", methods=["GET"])
def health_check():
if is_real_mode() and _trader_instance and not _trader_instance.is_available():
return jsonify({
"status": "error",
"message": _trader_instance.connection_error_message or "交易系统连接失败"
}), 503
return jsonify({"status": "ok"}), 200
@app.route("/yu/buy", methods=["POST"])
def buy():
"""Buy an item with given parameters."""
logger.info("Received buy request")
# Get data from request body
data = request.get_json()
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get(
"strategy_name", "default_strategy"
) # 新增策略名称参数,默认为空
order_type = data.get("order_type", ORDER_TYPE_LIMIT)
# --- 应用初始化和设置 ---
def initialize_app_services():
"""初始化应用核心服务,如交易系统登录和定时任务。"""
logger.info("开始初始化应用服务...")
try:
if not all([code, price_str, amount_str]):
raise ValueError("Missing required parameters")
# 1. 尝试初始登录交易系统
# login 函数现在位于 app_state.py
login_success = initialize_trader_login()
if not login_success and is_real_mode():
logger.warning("主程序:初始登录失败,系统将在后台尝试重连。")
else:
logger.info("主程序:交易系统登录步骤完成。")
price = float(price_str)
amount = int(float(amount_str))
if order_type == ORDER_TYPE_LIMIT and (price <= 0 or amount <= 0):
raise ValueError("Price and amount must be positive")
trader = get_trader()
if is_real_mode():
# 检查是否在交易时间内
if not BaseTrader.is_trading_time():
logger.warning(
f"交易失败 - 非交易时间不能交易 - 代码: {code}, 价格: {price}, 数量: {amount}"
)
return (
jsonify(
{"success": False, "error": f"交易失败: 非交易时间不能实盘交易"}
),
400,
)
# 检查交易系统是否可用
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
# 2. 设置并启动调度器
# setup_scheduler 函数现在位于 scheduler_tasks.py
setup_scheduler()
logger.info("主程序:定时任务调度器已设置并启动。")
# 开始买入
logger.info(
f"使用RealTraderManager执行买入: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}"
)
rtm = get_real_trader_manager()
result = rtm.place_order(
strategy_name, code, ORDER_DIRECTION_BUY, amount, price, order_type
)
else:
result = trader.buy(code, price, amount, strategy_name)
if result.get("success"):
logger.info(f"买入下单成功: {result}")
return jsonify({"success": True, "order_id": result.get("order_id")}), 200
else:
logger.error(f"买入下单失败: {result}")
return jsonify({"success": False, "error": result}), 400
except ValueError as e:
logger.error(f"Invalid request parameters: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"Error processing buy request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/sell", methods=["POST"])
def sell():
"""Sell an item with given parameters."""
logger.info("Received sell request")
# Get data from request body
data = request.get_json()
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get("strategy_name", "") # 新增策略名称参数,默认为空
order_type = data.get("order_type", ORDER_TYPE_LIMIT)
try:
if not all([code, price_str, amount_str]):
raise ValueError("Missing required parameters")
price = float(price_str)
amount = int(float(amount_str))
if order_type == ORDER_TYPE_LIMIT and (price <= 0 or amount <= 0):
raise ValueError("Price and amount must be positive")
trader = get_trader()
if is_real_mode():
# 检查是否在交易时间内
if not BaseTrader.is_trading_time():
logger.warning(
f"交易失败 - 非交易时间不能交易 - 代码: {code}, 价格: {price}, 数量: {amount}"
)
return (
jsonify(
{"success": False, "error": f"交易失败: 非交易时间不能实盘交易"}
),
400,
)
# 检查交易系统是否可用
if not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
# 开始卖出
logger.info(
f"使用RealTraderManager执行卖出: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}"
)
rtm = get_real_trader_manager()
result = rtm.place_order(
strategy_name, code, ORDER_DIRECTION_SELL, amount, price, order_type
)
else:
result = trader.sell(code, price, amount, strategy_name)
if result.get("success"):
logger.info(f"卖出下单成功: {result}")
return jsonify({"success": True, "order_id": result.get("order_id")}), 200
elif "error" in result:
logger.error(f"卖出下单失败: {result}")
return jsonify({"success": False, "error": result["error"]}), 400
else:
logger.error(f"卖出下单失败: {result}")
return jsonify({"success": False, "error": result}), 400
except ValueError as e:
logger.error(f"Invalid request parameters: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"Error processing sell request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/cancel/<order_id>", methods=["DELETE"])
def cancel(order_id):
"""Cancel an order by order_id."""
logger.info(f"Received cancel request for order {order_id}")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
result = trader.cancel(order_id)
logger.info(f"撤单结果: {result}")
return jsonify(result), 200
except Exception as e:
logger.error(f"Error processing cancel request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/balance", methods=["GET"])
def get_balance():
"""Get balance information."""
logger.info("Received balance request")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
balance = trader.get_balance()
return jsonify(balance), 200
except Exception as e:
logger.error(f"Error processing balance request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/positions", methods=["GET"])
def get_positions():
"""Get position information."""
logger.info("Received positions request")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
positions = trader.get_positions()
return jsonify(positions), 200
except Exception as e:
logger.error(f"Error processing positions request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/todaytrades", methods=["GET"])
def get_today_trades():
"""Get today's trade information."""
logger.info("Received today trades request")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
trades = trader.get_today_trades()
return jsonify(trades), 200
except Exception as e:
logger.error(f"Error processing today trades request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/todayorders", methods=["GET"])
def get_today_orders():
"""Get today's order information."""
logger.info("Received today orders request")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
orders = trader.get_today_orders()
return jsonify(orders), 200
except Exception as e:
logger.error(f"Error processing today orders request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/order/<order_id>", methods=["GET"])
def get_order(order_id):
"""Get order information by order_id."""
logger.info(f"Received order request for order {order_id}")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
order = trader.get_order(order_id)
return jsonify(order), 200
except Exception as e:
logger.error(f"Error processing order request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/clear/<strategy_name>", methods=["DELETE"])
def clear_strategy(strategy_name):
"""清除指定策略的持仓管理数据"""
logger.info(f"接收到清除策略持仓请求: {strategy_name}")
try:
if get_trader().clear_position_manager(strategy_name):
return jsonify({"success": True, "message": "clear success"}), 200
else:
return jsonify({"success": False, "message": "策略不存在: " + strategy_name}), 400
logger.info("应用服务初始化完成。")
except Exception as e:
logger.error(f"清除策略持仓时出错: {str(e)}")
abort(500, description="服务器内部错误")
logger.error(f"主程序:交易系统或调度器初始化异常: {e}", exc_info=True)
# 即使部分初始化失败也尝试继续让Flask应用能启动可能API部分功能受限
# 如果调度器启动失败是关键错误,可以考虑在这里重新抛出异常或退出
if "setup_scheduler" not in str(e): # 避免重复日志
try:
logger.info("尝试单独启动调度器...")
setup_scheduler()
except Exception as scheduler_e:
logger.error(f"主程序:调度器单独启动也失败: {scheduler_e}", exc_info=True)
# 根据关键性决定是否在此处 raise scheduler_e
# --- Flask 中间件和路由注册 ---
@app.route("/yu/test", methods=["GET"])
def test_route():
"""测试路由,用于验证中间件是否正常工作"""
return jsonify({"success": True, "message": "API路由前缀验证成功"}), 200
@app.before_request
def check_path_prefix() -> None:
"""中间件:拦截所有不以 /yu 开头的请求并返回404。"""
if not request.path.startswith('/yu'):
# 允许访问 Flask 的静态文件,例如 /static/...
if not request.path.startswith(app.static_url_path or '/static'):
logger.warning(f"拦截非法路径请求: {request.path}")
abort(404)
# 注册蓝图
app.register_blueprint(health_bp)
app.register_blueprint(trading_bp)
app.register_blueprint(account_bp)
app.register_blueprint(strategy_bp)
logger.info("所有API路由蓝图已注册。")
# --- 主程序入口 ---
if __name__ == "__main__":
logger.info(f"Server starting on {Config.HOST}:{Config.PORT}")
app.run(debug=Config.DEBUG, host=Config.HOST, port=Config.PORT)
# 在启动Web服务器之前初始化服务
initialize_app_services()
logger.info(f"Flask 服务器准备在 {Config.HOST}:{Config.PORT} 启动...")
app.run(debug=Config.DEBUG, host=Config.HOST, port=Config.PORT, use_reloader=False)
# use_reloader=False 建议在生产或有自定义启动/关闭逻辑时使用,以避免重复初始化。
# 对于开发模式debug=True 通常会启用重载器。
else:
# 如果是通过 Gunicorn 等 WSGI 服务器运行,则也需要初始化服务
# Gunicorn 通常会为每个 worker 进程加载一次应用模块
initialize_app_services()