diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..9655e1c --- /dev/null +++ b/src/core/__init__.py @@ -0,0 +1 @@ +# Core module for application state and schedulers \ No newline at end of file diff --git a/src/core/app_state.py b/src/core/app_state.py new file mode 100644 index 0000000..b450394 --- /dev/null +++ b/src/core/app_state.py @@ -0,0 +1,148 @@ +""" +应用核心状态管理和基础交易逻辑。 + +包含全局交易实例、日志记录器以及登录/登出等核心功能。 +""" +import time +from typing import Optional + +from ..config import Config # 使用相对导入 +from ..logger_config import get_logger +from ..real.xt_trader import XtTrader +from ..simulation.simulation_trader import SimulationTrader +from ..base_trader import BaseTrader +from ..real.real_trader_manager import RealTraderManager # 确保导入 + +# 获取日志记录器 +logger = get_logger("app_state") # 可以考虑是否需要区分 logger 名称 + +# 全局交易实例(采用单例模式) +_trader_instance: Optional[BaseTrader] = None +_real_trader_manager_instance: Optional[RealTraderManager] = None + + +def is_real_mode() -> bool: + """检查当前是否为实盘交易模式。""" + return not Config.SIMULATION_MODE + + +def on_connect_failed() -> None: + """交易连接失败的回调函数。""" + logger.critical("交易系统连接失败,API将返回错误状态") + + +def get_trader() -> Optional[BaseTrader]: + """ + 获取交易实例(模拟或实盘)。 + + 根据配置和当前时间(是否为交易日/交易时间),返回合适的交易实例。 + 在非交易日或非交易时间(实盘模式下),会返回一个临时的、标记为连接失败的实例。 + + Returns: + Optional[BaseTrader]: 交易实例或 None。 + """ + global _trader_instance + + if is_real_mode() and _trader_instance is None: + if not BaseTrader.is_trading_date(): + temp_trader = XtTrader(connect_failed_callback=on_connect_failed) + temp_trader.connection_failed = True + temp_trader.connection_error_message = "当前为非交易日,交易系统未连接" + return temp_trader + + if not BaseTrader.is_trading_time(): + temp_trader = XtTrader(connect_failed_callback=on_connect_failed) + temp_trader.connection_failed = True + temp_trader.connection_error_message = "当前为非交易时间,交易系统未连接" + return temp_trader + + return _trader_instance + + +def get_real_trader_manager() -> Optional[RealTraderManager]: + """ + 获取实盘交易管理器单例。 + + 如果当前为模拟模式,则返回 None。 + Returns: + Optional[RealTraderManager]: 实盘交易管理器实例或 None。 + """ + global _real_trader_manager_instance + + if _real_trader_manager_instance is None: + current_trader = get_trader() + if Config.SIMULATION_MODE: + _real_trader_manager_instance = None + elif current_trader is not None: + _real_trader_manager_instance = RealTraderManager(current_trader) + else: + _real_trader_manager_instance = None + + return _real_trader_manager_instance + + +def login() -> bool: + """ + 初始化并登录交易实例。 + + 会根据配置(实盘/模拟)创建相应的交易实例并尝试登录。 + Returns: + bool: 登录是否成功。 + Raises: + Exception: 如果在实盘模式下创建实例失败。 + """ + global _trader_instance + + try: + if _trader_instance is not None and is_real_mode(): + # _trader_instance is BaseTrader or its child, which should have logout + _trader_instance.logout() + _trader_instance = None + + _trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader(connect_failed_callback=on_connect_failed) + + logger.info("开始登录") + + if Config.SIMULATION_MODE: + return True + + # _trader_instance is XtTrader here, which has login + login_success = _trader_instance.login() + + if login_success: + # _trader_instance is XtTrader here + result = _trader_instance.get_balance() + if result and result.get("account_id") is not None: + logger.info(f"查询余额成功: {result}") + return True + else: + logger.error(f"登录成功但查询余额失败: {result}") + if _trader_instance: + _trader_instance.connection_failed = True # BaseTrader property + _trader_instance.last_reconnect_time = time.time() # BaseTrader property + _trader_instance.notify_connection_failure("登录成功但查询余额失败") # BaseTrader method + return False + else: + logger.error("登录失败") + return False + except Exception as e: + logger.error(f"登录初始化异常: {str(e)}") + if is_real_mode() and _trader_instance is not None: + _trader_instance.connection_failed = True + _trader_instance.last_reconnect_time = time.time() + _trader_instance.notify_connection_failure(f"登录初始化异常: {str(e)}") + return False + raise Exception(f"登录失败,无法创建交易实例: {e}") + + +def logout() -> None: + """登出并销毁交易实例。""" + global _trader_instance + + if _trader_instance is not None: + _trader_instance.logout() + logger.info("登出成功") + + if is_real_mode(): + _trader_instance = None + logger.info("交易实例已销毁") \ No newline at end of file diff --git a/src/core/scheduler_tasks.py b/src/core/scheduler_tasks.py new file mode 100644 index 0000000..bd8c69e --- /dev/null +++ b/src/core/scheduler_tasks.py @@ -0,0 +1,111 @@ +""" +定时任务管理和执行。 + +本模块负责定义、调度和执行所有后台定时任务, +例如每日自动登录/登出、QMT重启以及连接状态检查等。 +""" +import schedule +import threading +import time +import subprocess +import os + +from ..config import Config +from ..logger_config import get_logger +from ..base_trader import BaseTrader +from .app_state import login, logout, is_real_mode, _trader_instance, logger as app_logger # 使用 app_state中的logger + +# 如果需要独立的 logger,可以取消下面这行的注释 +# logger = get_logger("scheduler") +logger = app_logger # 复用 app_state 的 logger + +def _run_scheduler() -> None: + """定时任务执行线程的主循环。""" + while True: + try: + schedule.run_pending() + time.sleep(1) + except Exception as e: + logger.error(f"调度器执行错误: {str(e)}") + +def _check_reconnect() -> None: + """定期检查交易实例是否需要重连(仅实盘模式)。""" + while True: + try: + # 直接使用 app_state 中的 _trader_instance + if is_real_mode() and _trader_instance is not None and hasattr(_trader_instance, 'check_reconnect'): + _trader_instance.check_reconnect() + time.sleep(60) # 每分钟检查一次 + except Exception as e: + logger.error(f"重连检查错误: {str(e)}") + +def login_on_trading_day() -> None: + """仅在交易日执行登录操作。""" + if BaseTrader.is_trading_date(): + logger.info("今天是交易日,执行计划的登录操作") + login() # 调用 app_state.login + else: + logger.info("今天不是交易日,跳过计划的登录操作") + +def logout_on_trading_day() -> None: + """仅在交易日执行登出操作。""" + if BaseTrader.is_trading_date(): + logger.info("今天是交易日,执行计划的登出操作") + logout() # 调用 app_state.logout + else: + logger.info("今天不是交易日,跳过计划的登出操作") + +def restart_qmt() -> None: + """重启QMT客户端软件。""" + try: + if not os.path.exists(Config.XT_LAUNCHER): + logger.error(f"QMT启动路径不存在: {Config.XT_LAUNCHER}") + return + + for proc_name in [Config.XT_PROCESS1, Config.XT_PROCESS2]: + try: + kill_cmd = f'taskkill /F /IM {proc_name}' + result = subprocess.run(kill_cmd, shell=True, capture_output=True, text=True, check=False) + if result.returncode == 0: + logger.info(f"已关闭进程: {proc_name}") + else: + # 进程不存在通常返回码 128 或 1 + if result.returncode not in [1, 128]: + logger.warning(f"关闭进程 {proc_name} 时返回码: {result.returncode}, 输出: {result.stdout}, 错误: {result.stderr}") + except Exception as e: + logger.error(f"关闭进程 {proc_name} 时发生异常: {str(e)}") + + subprocess.Popen(Config.XT_LAUNCHER) + logger.info(f"已尝试启动QMT软件: {Config.XT_LAUNCHER}") + except Exception as e: + logger.error(f"重启QMT软件时发生错误: {str(e)}") + +def restart_qmt_on_trading_day() -> None: + """仅在交易日执行QMT软件重启操作。""" + if BaseTrader.is_trading_date(): + logger.info("今天是交易日,执行计划的QMT软件重启操作") + restart_qmt() + else: + logger.info("今天不是交易日,跳过计划的QMT软件重启") + +def setup_scheduler() -> threading.Thread: + """ + 设置所有定时任务并启动调度线程。 + + Returns: + threading.Thread: 启动的调度器线程实例。 + """ + schedule.every().day.at(Config.MARKET_OPEN_TIME).do(login_on_trading_day) + schedule.every().day.at(Config.MARKET_CLOSE_TIME).do(logout_on_trading_day) + schedule.every().day.at(Config.XT_RESTART_TIME).do(restart_qmt_on_trading_day) + + scheduler_thread = threading.Thread(target=_run_scheduler, daemon=True) + scheduler_thread.start() + logger.info("定时任务调度器已启动") + + if is_real_mode(): + reconnect_thread = threading.Thread(target=_check_reconnect, daemon=True) + reconnect_thread.start() + logger.info("重连检查线程已启动") + + return scheduler_thread \ No newline at end of file diff --git a/src/routes/__init__.py b/src/routes/__init__.py new file mode 100644 index 0000000..8f3617c --- /dev/null +++ b/src/routes/__init__.py @@ -0,0 +1 @@ +# Routes module for Flask Blueprints \ No newline at end of file diff --git a/src/routes/account_routes.py b/src/routes/account_routes.py new file mode 100644 index 0000000..509b8ef --- /dev/null +++ b/src/routes/account_routes.py @@ -0,0 +1,85 @@ +""" +账户信息查询相关的API端点。 + +包括余额、持仓、当日成交、当日委托和单个订单查询。 +""" +from flask import Blueprint, jsonify, abort + +from ..core.app_state import get_trader, is_real_mode, logger + +account_bp = Blueprint('account_routes', __name__, url_prefix='/yu') + +def _check_trader_availability(): + """辅助函数,检查交易实例是否可用,如果不可用则 abort(503)。""" + trader = get_trader() + if not trader: + logger.error("Trader instance not available for account request.") + abort(503, description="Trader not available. System may be initializing or in a failed state.") + + if is_real_mode() and not trader.is_available(): + logger.warning(f"Account info request when trader is not available: {trader.connection_error_message}") + abort(503, description=trader.connection_error_message or "交易系统连接失败,请稍后再试") + return trader + +@account_bp.route("/balance", methods=["GET"]) +def get_balance_route(): # Renamed to avoid conflict with get_balance from app_state if imported directly + """获取账户余额信息。""" + logger.info("Received balance request") + try: + trader = _check_trader_availability() + balance = trader.get_balance() + return jsonify(balance), 200 + except Exception as e: + logger.error(f"Error processing balance request: {str(e)}", exc_info=True) + abort(500, description="Internal server error during balance request") + +@account_bp.route("/positions", methods=["GET"]) +def get_positions_route(): # Renamed + """获取账户持仓信息。""" + logger.info("Received positions request") + try: + trader = _check_trader_availability() + positions = trader.get_positions() + return jsonify(positions), 200 + except Exception as e: + logger.error(f"Error processing positions request: {str(e)}", exc_info=True) + abort(500, description="Internal server error during positions request") + +@account_bp.route("/todaytrades", methods=["GET"]) +def get_today_trades_route(): # Renamed + """获取当日成交信息。""" + logger.info("Received today trades request") + try: + trader = _check_trader_availability() + trades = trader.get_today_trades() + return jsonify(trades), 200 + except Exception as e: + logger.error(f"Error processing today trades request: {str(e)}", exc_info=True) + abort(500, description="Internal server error during today trades request") + +@account_bp.route("/todayorders", methods=["GET"]) +def get_today_orders_route(): # Renamed + """获取当日委托信息。""" + logger.info("Received today orders request") + try: + trader = _check_trader_availability() + orders = trader.get_today_orders() + return jsonify(orders), 200 + except Exception as e: + logger.error(f"Error processing today orders request: {str(e)}", exc_info=True) + abort(500, description="Internal server error during today orders request") + +@account_bp.route("/order/", methods=["GET"]) +def get_order_route(order_id: str): # Renamed + """根据订单ID获取订单信息。""" + logger.info(f"Received order request for order {order_id}") + try: + trader = _check_trader_availability() + order_info = trader.get_order(order_id) + if order_info is None or (isinstance(order_info, dict) and not order_info): # Handle case where order not found + logger.warning(f"Order not found for ID: {order_id}") + abort(404, description=f"Order with ID {order_id} not found.") + return jsonify(order_info), 200 + except Exception as e: + logger.error(f"Error processing order request for {order_id}: {str(e)}", exc_info=True) + abort(500, description=f"Internal server error during order request for {order_id}") \ No newline at end of file diff --git a/src/routes/health_routes.py b/src/routes/health_routes.py new file mode 100644 index 0000000..aac11c1 --- /dev/null +++ b/src/routes/health_routes.py @@ -0,0 +1,26 @@ +""" +健康检查和测试相关的API端点。 +""" +from flask import Blueprint, jsonify + +from ..core.app_state import is_real_mode, get_trader, logger + +health_bp = Blueprint('health_routes', __name__, url_prefix='/yu') + +@health_bp.route("/healthcheck", methods=["GET"]) +def health_check(): + """健康检查端点,检查交易系统连接状态。""" + if is_real_mode(): + trader = get_trader() + if trader and not trader.is_available(): + return jsonify({ + "status": "error", + "message": trader.connection_error_message or "交易系统连接失败" + }), 503 + return jsonify({"status": "ok"}), 200 + +@health_bp.route("/test", methods=["GET"]) +def test_route(): + """测试路由,用于验证API路由前缀和基本连通性。""" + logger.info("Test route accessed.") + return jsonify({"success": True, "message": "API路由前缀验证成功"}), 200 \ No newline at end of file diff --git a/src/routes/strategy_routes.py b/src/routes/strategy_routes.py new file mode 100644 index 0000000..b1f54f3 --- /dev/null +++ b/src/routes/strategy_routes.py @@ -0,0 +1,35 @@ +""" +策略管理相关的API端点。 +""" +from flask import Blueprint, jsonify, abort + +from ..core.app_state import get_trader, logger + +strategy_bp = Blueprint('strategy_routes', __name__, url_prefix='/yu') + +@strategy_bp.route("/clear/", methods=["DELETE"]) +def clear_strategy_route(strategy_name: str): # Renamed + """清除指定策略的持仓管理数据。""" + logger.info(f"接收到清除策略持仓请求: {strategy_name}") + try: + trader = get_trader() + if not trader: + logger.error("Trader instance not available for clear_strategy request.") + return jsonify({"success": False, "message": "Trader not available"}), 503 + + # 假设 clear_position_manager 是 BaseTrader 或其子类的方法 + if hasattr(trader, 'clear_position_manager'): + cleared = trader.clear_position_manager(strategy_name) + if cleared: + logger.info(f"策略 {strategy_name} 持仓数据已清除。") + return jsonify({"success": True, "message": f"Strategy {strategy_name} cleared successfully."}), 200 + else: + logger.warning(f"尝试清除一个不存在的策略或清除失败: {strategy_name}") + return jsonify({"success": False, "message": f"Strategy {strategy_name} not found or could not be cleared."}), 404 + else: + logger.error(f"Trader instance does not have 'clear_position_manager' method for strategy {strategy_name}") + return jsonify({"success": False, "message": "Clear position manager feature not available."}), 501 # Not Implemented + + except Exception as e: + logger.error(f"清除策略 {strategy_name} 持仓时出错: {str(e)}", exc_info=True) + abort(500, description=f"服务器内部错误,清除策略 {strategy_name} 持仓失败。") \ No newline at end of file diff --git a/src/routes/trading_routes.py b/src/routes/trading_routes.py new file mode 100644 index 0000000..f1bcbce --- /dev/null +++ b/src/routes/trading_routes.py @@ -0,0 +1,168 @@ +""" +核心交易操作相关的API端点 (买入、卖出、撤单)。 +""" +from flask import Blueprint, request, jsonify, abort + +from ..core.app_state import get_trader, get_real_trader_manager, is_real_mode, logger +from ..base_trader import BaseTrader +from ..trade_constants import ORDER_TYPE_LIMIT, ORDER_DIRECTION_BUY, ORDER_DIRECTION_SELL + +trading_bp = Blueprint('trading_routes', __name__, url_prefix='/yu') + +@trading_bp.route("/buy", methods=["POST"]) +def buy(): + """处理买入请求。""" + logger.info("Received buy request") + data = request.get_json() + if not data: + logger.error("Buy request with no JSON data") + abort(400, description="Request body must be JSON.") + + code = data.get("code") + price_str = data.get("price") + amount_str = data.get("amount") + strategy_name = data.get("strategy_name", "default_strategy") + order_type = data.get("order_type", ORDER_TYPE_LIMIT) + + try: + if not all([code, price_str, amount_str]): + logger.warning(f"Buy request missing parameters: code={code}, price={price_str}, amount={amount_str}") + raise ValueError("Missing required parameters: code, price, amount") + + price = float(price_str) + amount = int(float(amount_str)) + + if order_type == ORDER_TYPE_LIMIT and (price <= 0 or amount <= 0): + logger.warning(f"Buy request with invalid price/amount: price={price}, amount={amount}") + raise ValueError("For limit orders, price and amount must be positive") + + trader = get_trader() + if not trader: + logger.error("Trader instance not available for buy request.") + return jsonify({"success": False, "error": "Trader not available"}), 503 + + if is_real_mode(): + if not BaseTrader.is_trading_time(): + logger.warning(f"Buy attempt outside trading hours: {code}, {price}, {amount}") + return jsonify({"success": False, "error": "交易失败: 非交易时间不能实盘交易"}), 400 + + if not trader.is_available(): + logger.error(f"Trader not available for real mode buy: {trader.connection_error_message}") + return jsonify({"success": False, "error": trader.connection_error_message or "交易系统连接失败"}), 503 + + rtm = get_real_trader_manager() + if not rtm: + logger.error("RealTraderManager not available for buy request.") + return jsonify({"success": False, "error": "RealTraderManager not available"}), 503 + + logger.info(f"Using RealTraderManager for buy: {code}, {price}, {amount}, {strategy_name}") + result = rtm.place_order(strategy_name, code, ORDER_DIRECTION_BUY, amount, price, order_type) + else: + result = trader.buy(code, price, amount, strategy_name) + + if result.get("success"): + logger.info(f"买入下单成功: {result}") + return jsonify({"success": True, "order_id": result.get("order_id")}), 200 + else: + logger.error(f"买入下单失败: {result}") + return jsonify({"success": False, "error": result.get("error", "Unknown error placing buy order")}), 400 + + except ValueError as e: + logger.error(f"Invalid buy request parameters: {str(e)}") + abort(400, description=str(e)) + except Exception as e: + logger.error(f"Error processing buy request: {str(e)}", exc_info=True) + abort(500, description="Internal server error during buy request") + +@trading_bp.route("/sell", methods=["POST"]) +def sell(): + """处理卖出请求。""" + logger.info("Received sell request") + data = request.get_json() + if not data: + logger.error("Sell request with no JSON data") + abort(400, description="Request body must be JSON.") + + code = data.get("code") + price_str = data.get("price") + amount_str = data.get("amount") + strategy_name = data.get("strategy_name", "default_strategy") + order_type = data.get("order_type", ORDER_TYPE_LIMIT) + + try: + if not all([code, price_str, amount_str]): + logger.warning(f"Sell request missing parameters: code={code}, price={price_str}, amount={amount_str}") + raise ValueError("Missing required parameters: code, price, amount") + + price = float(price_str) + amount = int(float(amount_str)) + + if order_type == ORDER_TYPE_LIMIT and (price <= 0 or amount <= 0): + logger.warning(f"Sell request with invalid price/amount: price={price}, amount={amount}") + raise ValueError("For limit orders, price and amount must be positive") + + trader = get_trader() + if not trader: + logger.error("Trader instance not available for sell request.") + return jsonify({"success": False, "error": "Trader not available"}), 503 + + if is_real_mode(): + if not BaseTrader.is_trading_time(): + logger.warning(f"Sell attempt outside trading hours: {code}, {price}, {amount}") + return jsonify({"success": False, "error": "交易失败: 非交易时间不能实盘交易"}), 400 + + if not trader.is_available(): + logger.error(f"Trader not available for real mode sell: {trader.connection_error_message}") + return jsonify({"success": False, "error": trader.connection_error_message or "交易系统连接失败"}), 503 + + rtm = get_real_trader_manager() + if not rtm: + logger.error("RealTraderManager not available for sell request.") + return jsonify({"success": False, "error": "RealTraderManager not available"}), 503 + + logger.info(f"Using RealTraderManager for sell: {code}, {price}, {amount}, {strategy_name}") + result = rtm.place_order(strategy_name, code, ORDER_DIRECTION_SELL, amount, price, order_type) + else: + result = trader.sell(code, price, amount, strategy_name) + + if result.get("success"): + logger.info(f"卖出下单成功: {result}") + return jsonify({"success": True, "order_id": result.get("order_id")}), 200 + else: + logger.error(f"卖出下单失败: {result}") + return jsonify({"success": False, "error": result.get("error", "Unknown error placing sell order")}), 400 + + except ValueError as e: + logger.error(f"Invalid sell request parameters: {str(e)}") + abort(400, description=str(e)) + except Exception as e: + logger.error(f"Error processing sell request: {str(e)}", exc_info=True) + abort(500, description="Internal server error during sell request") + +@trading_bp.route("/cancel/", methods=["DELETE"]) +def cancel(order_id: str): + """根据订单ID处理撤单请求。""" + logger.info(f"Received cancel request for order {order_id}") + + try: + trader = get_trader() + if not trader: + logger.error("Trader instance not available for cancel request.") + return jsonify({"success": False, "error": "Trader not available"}), 503 + + if is_real_mode() and not trader.is_available(): + logger.error(f"Trader not available for real mode cancel: {trader.connection_error_message}") + return jsonify({"success": False, "error": trader.connection_error_message or "交易系统连接失败"}), 503 + + result = trader.cancel(order_id) + logger.info(f"撤单结果: {result}") + + # 假设 cancel 操作总是返回一个字典,即使失败也包含状态 + if isinstance(result, dict) and result.get("success") is False: + return jsonify(result), 400 # Propagate error from trader if structured + + return jsonify(result), 200 + + except Exception as e: + logger.error(f"Error processing cancel request for order {order_id}: {str(e)}", exc_info=True) + abort(500, description=f"Internal server error during cancel request for order {order_id}") \ No newline at end of file diff --git a/src/trade_server.py b/src/trade_server.py index 4bed74c..d5d6cbe 100644 --- a/src/trade_server.py +++ b/src/trade_server.py @@ -1,567 +1,91 @@ -import schedule -import threading -import time -import subprocess -import os -from real.xt_trader import XtTrader -from flask import Flask, request, abort, jsonify -from config import Config -from simulation.simulation_trader import SimulationTrader -from logger_config import get_logger -from real.real_trader_manager import RealTraderManager -from trade_constants import * -from base_trader import BaseTrader +""" +Flask应用主入口文件。 -# 获取日志记录器 -logger = get_logger("server") +负责初始化Flask应用、设置核心服务、注册API路由蓝图以及启动服务。 +""" +from flask import Flask, request, abort -# 全局交易实例(采用单例模式) -_trader_instance = None # 交易实例(单例) -_real_trader_manager_instance = None # 实盘交易管理器实例(单例) +from .config import Config +from .logger_config import get_logger # 主应用的logger +# 核心服务和状态初始化 +from .core.app_state import login as initialize_trader_login, logger as app_state_logger, is_real_mode +from .core.scheduler_tasks import setup_scheduler, logger as scheduler_logger -def is_real_mode(): - return not Config.SIMULATION_MODE - - -# 交易接口连接失败回调 -def on_connect_failed(): - """交易连接失败的回调函数""" - logger.critical("交易系统连接失败,API将返回错误状态") - - -# 获取实盘交易管理器实例的辅助函数 -def get_real_trader_manager(): - global _real_trader_manager_instance - - if _real_trader_manager_instance is None: - _real_trader_manager_instance = ( - None if Config.SIMULATION_MODE else RealTraderManager(get_trader()) - ) - - return _real_trader_manager_instance - - -# 获取交易实例 - 根据情况返回模拟或实盘交易实例 -def get_trader(): - global _trader_instance - - # 实盘模式下 - if is_real_mode() and _trader_instance is None: - # 检查是否为交易日 - if not BaseTrader.is_trading_date(): - # 非交易日创建临时实例提供错误信息 - temp_trader = XtTrader(connect_failed_callback=on_connect_failed) - temp_trader.connection_failed = True - temp_trader.connection_error_message = "当前为非交易日,交易系统未连接" - return temp_trader - - # 交易日但非交易时间 - if not BaseTrader.is_trading_time(): - # 非交易时间创建临时实例提供错误信息 - temp_trader = XtTrader(connect_failed_callback=on_connect_failed) - temp_trader.connection_failed = True - temp_trader.connection_error_message = "当前为非交易时间,交易系统未连接" - return temp_trader - - # 返回已存在的实例 - return _trader_instance - -def login(): - global _trader_instance - - try: - # 如果已经有实例,先销毁 - if _trader_instance is not None and is_real_mode(): - _trader_instance.logout() - _trader_instance = None - - # 创建新的XtTrader实例 - _trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader(connect_failed_callback=on_connect_failed) - - logger.info("开始登录") - - # 模拟交易直接返回成功 - if Config.SIMULATION_MODE: - return True - - # 如果是实盘交易,调用login方法 - login_success = _trader_instance.login() - - # 验证连接 - if login_success: - result = _trader_instance.get_balance() - if result and result.get("account_id") is not None: - logger.info(f"查询余额成功: {result}") - return True - else: - logger.error(f"登录成功但查询余额失败: {result}") - _trader_instance.connection_failed = True - _trader_instance.last_reconnect_time = time.time() - _trader_instance.notify_connection_failure("登录成功但查询余额失败") - return False - else: - logger.error("登录失败") - # 不需要在这里设置失败状态,在XtTrader.login方法中已设置 - return False - except Exception as e: - logger.error(f"登录初始化异常: {str(e)}") - # 如果是实盘模式并且已经成功创建了实例,设置失败状态 - if is_real_mode() and _trader_instance is not None: - _trader_instance.connection_failed = True - _trader_instance.last_reconnect_time = time.time() - _trader_instance.notify_connection_failure(f"登录初始化异常: {str(e)}") - return False - # 如果创建实例失败,则抛出异常 - raise Exception(f"登录失败,无法创建交易实例: {e}") - -def logout(): - global _trader_instance - - if _trader_instance is not None: - _trader_instance.logout() - logger.info("登出成功") - - # 销毁实例 - if is_real_mode(): - _trader_instance = None - logger.info("XtTrader实例已销毁") - - -def _run_scheduler(): - """定时任务执行线程""" - while True: - try: - schedule.run_pending() - time.sleep(1) - except Exception as e: - logger.error(f"调度器执行错误: {str(e)}") - - -def _check_reconnect(): - """重连检查线程,定期检查是否需要重连""" - while True: - try: - if is_real_mode() and _trader_instance is not None: - _trader_instance.check_reconnect() - time.sleep(60) # 每分钟检查一次 - except Exception as e: - logger.error(f"重连检查错误: {str(e)}") - - -# 定时任务:检查是否为交易日,并在交易日执行登录操作 -def login_on_trading_day(): - """仅在交易日执行登录操作""" - # 使用BaseTrader中的静态方法检查是否为交易日 - if BaseTrader.is_trading_date(): - logger.info("今天是交易日,执行登录操作") - login() - else: - logger.info("今天不是交易日,跳过登录操作") - - -# 定时任务:检查是否为交易日,并在交易日执行登出操作 -def logout_on_trading_day(): - """仅在交易日执行登出操作""" - # 使用BaseTrader中的静态方法检查是否为交易日 - if BaseTrader.is_trading_date(): - logger.info("今天是交易日,执行登出操作") - logout() - else: - logger.info("今天不是交易日,跳过登出操作") - - -def restart_qmt(): - try: - # 检查QMT路径是否存在 - if not os.path.exists(Config.XT_LAUNCHER): - logger.error(f"QMT启动路径不存在: {Config.XT_LAUNCHER}") - return - - # 先关闭所有QMT相关进程 - for proc_name in [Config.XT_PROCESS1, Config.XT_PROCESS2]: - try: - kill_cmd = f'taskkill /F /IM {proc_name}' - result = subprocess.run(kill_cmd, shell=True, capture_output=True, text=True) - if result.returncode == 0: - logger.info(f"已关闭进程: {proc_name}") - else: - logger.warning(f"关闭进程{proc_name}时返回码: {result.returncode}, 输出: {result.stdout}, 错误: {result.stderr}") - except Exception as e: - logger.error(f"关闭进程{proc_name}时发生异常: {str(e)}") - - # 尝试启动QMT软件 - subprocess.Popen(Config.XT_LAUNCHER) - logger.info(f"已启动QMT软件: {Config.XT_LAUNCHER}") - except Exception as e: - logger.error(f"重启QMT软件时发生错误: {str(e)}") - -# 定时任务:在交易日重启QMT软件 -def restart_qmt_on_trading_day(): - """仅在交易日执行QMT软件重启操作""" - if BaseTrader.is_trading_date(): - logger.info("今天是交易日,执行QMT软件重启操作") - restart_qmt() - else: - logger.info("今天不是交易日,跳过QMT软件重启") - - -def setup_scheduler(): - # 设置每日任务,仅在交易日执行 - schedule.every().day.at(Config.MARKET_OPEN_TIME).do(login_on_trading_day) - schedule.every().day.at(Config.MARKET_CLOSE_TIME).do(logout_on_trading_day) - schedule.every().day.at(Config.XT_RESTART_TIME).do(restart_qmt_on_trading_day) - - # 启动调度线程 - scheduler_thread = threading.Thread(target=_run_scheduler, daemon=True) - scheduler_thread.start() - logger.info("定时任务调度器已启动") - - # 启动重连检查线程(仅在实盘模式下) - if is_real_mode(): - reconnect_thread = threading.Thread(target=_check_reconnect, daemon=True) - reconnect_thread.start() - logger.info("重连检查线程已启动") - - return scheduler_thread - - -# 初始化交易系统 -try: - # 程序启动时初始化交易实例 - 尝试登录,但即使失败也继续启动服务 - login_success = login() - if not login_success and is_real_mode(): - logger.warning("初始登录失败,系统将在稍后定期尝试重连") - - # 设置并启动调度器 - setup_scheduler() - - logger.info("交易系统初始化完成") -except Exception as e: - logger.error(f"交易系统初始化异常: {e}") - # 即使初始化失败也尝试启动调度器 - try: - setup_scheduler() - logger.info("调度器启动成功,将尝试定期重新初始化交易系统") - except Exception as scheduler_e: - logger.error(f"调度器启动失败: {scheduler_e}") - raise +# API 路由蓝图 +from .routes.health_routes import health_bp +from .routes.trading_routes import trading_bp +from .routes.account_routes import account_bp +from .routes.strategy_routes import strategy_bp +# 主日志记录器 (可以根据需要选择使用 app_state_logger 或 scheduler_logger, 或者独立的) +logger = get_logger("trade_server_main") +# 创建 Flask 应用实例 app = Flask(__name__) -# 添加中间件,拦截所有不以"/yu"开头的请求并返回404 -@app.before_request -def check_path_prefix(): - if not request.path.startswith('/yu'): - logger.warning(f"拦截非法请求: {request.path}") - abort(404) - -@app.route("/yu/healthcheck", methods=["GET"]) -def health_check(): - if is_real_mode() and _trader_instance and not _trader_instance.is_available(): - return jsonify({ - "status": "error", - "message": _trader_instance.connection_error_message or "交易系统连接失败" - }), 503 - return jsonify({"status": "ok"}), 200 - - -@app.route("/yu/buy", methods=["POST"]) -def buy(): - """Buy an item with given parameters.""" - logger.info("Received buy request") - - # Get data from request body - data = request.get_json() - code = data.get("code") - price_str = data.get("price") - amount_str = data.get("amount") - strategy_name = data.get( - "strategy_name", "default_strategy" - ) # 新增策略名称参数,默认为空 - order_type = data.get("order_type", ORDER_TYPE_LIMIT) +# --- 应用初始化和设置 --- +def initialize_app_services(): + """初始化应用核心服务,如交易系统登录和定时任务。""" + logger.info("开始初始化应用服务...") try: - if not all([code, price_str, amount_str]): - raise ValueError("Missing required parameters") + # 1. 尝试初始登录交易系统 + # login 函数现在位于 app_state.py + login_success = initialize_trader_login() + if not login_success and is_real_mode(): + logger.warning("主程序:初始登录失败,系统将在后台尝试重连。") + else: + logger.info("主程序:交易系统登录步骤完成。") - price = float(price_str) - amount = int(float(amount_str)) - - if order_type == ORDER_TYPE_LIMIT and (price <= 0 or amount <= 0): - raise ValueError("Price and amount must be positive") - - trader = get_trader() - - if is_real_mode(): - # 检查是否在交易时间内 - if not BaseTrader.is_trading_time(): - logger.warning( - f"交易失败 - 非交易时间不能交易 - 代码: {code}, 价格: {price}, 数量: {amount}" - ) - return ( - jsonify( - {"success": False, "error": f"交易失败: 非交易时间不能实盘交易"} - ), - 400, - ) - - # 检查交易系统是否可用 - if is_real_mode() and not trader.is_available(): - return jsonify({ - "success": False, - "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" - }), 503 + # 2. 设置并启动调度器 + # setup_scheduler 函数现在位于 scheduler_tasks.py + setup_scheduler() + logger.info("主程序:定时任务调度器已设置并启动。") - # 开始买入 - logger.info( - f"使用RealTraderManager执行买入: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}" - ) - rtm = get_real_trader_manager() - result = rtm.place_order( - strategy_name, code, ORDER_DIRECTION_BUY, amount, price, order_type - ) - else: - result = trader.buy(code, price, amount, strategy_name) - - if result.get("success"): - logger.info(f"买入下单成功: {result}") - return jsonify({"success": True, "order_id": result.get("order_id")}), 200 - else: - logger.error(f"买入下单失败: {result}") - return jsonify({"success": False, "error": result}), 400 - - except ValueError as e: - logger.error(f"Invalid request parameters: {str(e)}") - abort(400, description=str(e)) - except Exception as e: - logger.error(f"Error processing buy request: {str(e)}") - abort(500, description="Internal server error") - - -@app.route("/yu/sell", methods=["POST"]) -def sell(): - """Sell an item with given parameters.""" - logger.info("Received sell request") - - # Get data from request body - data = request.get_json() - code = data.get("code") - price_str = data.get("price") - amount_str = data.get("amount") - strategy_name = data.get("strategy_name", "") # 新增策略名称参数,默认为空 - order_type = data.get("order_type", ORDER_TYPE_LIMIT) - - try: - if not all([code, price_str, amount_str]): - raise ValueError("Missing required parameters") - - price = float(price_str) - amount = int(float(amount_str)) - - if order_type == ORDER_TYPE_LIMIT and (price <= 0 or amount <= 0): - raise ValueError("Price and amount must be positive") - - trader = get_trader() - - if is_real_mode(): - # 检查是否在交易时间内 - if not BaseTrader.is_trading_time(): - logger.warning( - f"交易失败 - 非交易时间不能交易 - 代码: {code}, 价格: {price}, 数量: {amount}" - ) - return ( - jsonify( - {"success": False, "error": f"交易失败: 非交易时间不能实盘交易"} - ), - 400, - ) - - # 检查交易系统是否可用 - if not trader.is_available(): - return jsonify({ - "success": False, - "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" - }), 503 - - # 开始卖出 - logger.info( - f"使用RealTraderManager执行卖出: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}" - ) - rtm = get_real_trader_manager() - result = rtm.place_order( - strategy_name, code, ORDER_DIRECTION_SELL, amount, price, order_type - ) - else: - result = trader.sell(code, price, amount, strategy_name) - - if result.get("success"): - logger.info(f"卖出下单成功: {result}") - return jsonify({"success": True, "order_id": result.get("order_id")}), 200 - elif "error" in result: - logger.error(f"卖出下单失败: {result}") - return jsonify({"success": False, "error": result["error"]}), 400 - else: - logger.error(f"卖出下单失败: {result}") - return jsonify({"success": False, "error": result}), 400 - - except ValueError as e: - logger.error(f"Invalid request parameters: {str(e)}") - abort(400, description=str(e)) - except Exception as e: - logger.error(f"Error processing sell request: {str(e)}") - abort(500, description="Internal server error") - - -@app.route("/yu/cancel/", methods=["DELETE"]) -def cancel(order_id): - """Cancel an order by order_id.""" - logger.info(f"Received cancel request for order {order_id}") - - try: - # 检查交易系统是否可用 - trader = get_trader() - if is_real_mode() and not trader.is_available(): - return jsonify({ - "success": False, - "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" - }), 503 - - result = trader.cancel(order_id) - logger.info(f"撤单结果: {result}") - return jsonify(result), 200 - except Exception as e: - logger.error(f"Error processing cancel request: {str(e)}") - abort(500, description="Internal server error") - - -@app.route("/yu/balance", methods=["GET"]) -def get_balance(): - """Get balance information.""" - logger.info("Received balance request") - - try: - # 检查交易系统是否可用 - trader = get_trader() - if is_real_mode() and not trader.is_available(): - return jsonify({ - "success": False, - "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" - }), 503 - - balance = trader.get_balance() - return jsonify(balance), 200 - except Exception as e: - logger.error(f"Error processing balance request: {str(e)}") - abort(500, description="Internal server error") - - -@app.route("/yu/positions", methods=["GET"]) -def get_positions(): - """Get position information.""" - logger.info("Received positions request") - - try: - # 检查交易系统是否可用 - trader = get_trader() - if is_real_mode() and not trader.is_available(): - return jsonify({ - "success": False, - "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" - }), 503 - - positions = trader.get_positions() - return jsonify(positions), 200 - except Exception as e: - logger.error(f"Error processing positions request: {str(e)}") - abort(500, description="Internal server error") - - -@app.route("/yu/todaytrades", methods=["GET"]) -def get_today_trades(): - """Get today's trade information.""" - logger.info("Received today trades request") - - try: - # 检查交易系统是否可用 - trader = get_trader() - if is_real_mode() and not trader.is_available(): - return jsonify({ - "success": False, - "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" - }), 503 - - trades = trader.get_today_trades() - return jsonify(trades), 200 - except Exception as e: - logger.error(f"Error processing today trades request: {str(e)}") - abort(500, description="Internal server error") - - -@app.route("/yu/todayorders", methods=["GET"]) -def get_today_orders(): - """Get today's order information.""" - logger.info("Received today orders request") - - try: - # 检查交易系统是否可用 - trader = get_trader() - if is_real_mode() and not trader.is_available(): - return jsonify({ - "success": False, - "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" - }), 503 - - orders = trader.get_today_orders() - return jsonify(orders), 200 - except Exception as e: - logger.error(f"Error processing today orders request: {str(e)}") - abort(500, description="Internal server error") - - -@app.route("/yu/order/", methods=["GET"]) -def get_order(order_id): - """Get order information by order_id.""" - logger.info(f"Received order request for order {order_id}") - - try: - # 检查交易系统是否可用 - trader = get_trader() - if is_real_mode() and not trader.is_available(): - return jsonify({ - "success": False, - "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" - }), 503 - - order = trader.get_order(order_id) - return jsonify(order), 200 - except Exception as e: - logger.error(f"Error processing order request: {str(e)}") - abort(500, description="Internal server error") - - -@app.route("/yu/clear/", methods=["DELETE"]) -def clear_strategy(strategy_name): - """清除指定策略的持仓管理数据""" - logger.info(f"接收到清除策略持仓请求: {strategy_name}") - try: - if get_trader().clear_position_manager(strategy_name): - return jsonify({"success": True, "message": "clear success"}), 200 - else: - return jsonify({"success": False, "message": "策略不存在: " + strategy_name}), 400 + logger.info("应用服务初始化完成。") except Exception as e: - logger.error(f"清除策略持仓时出错: {str(e)}") - abort(500, description="服务器内部错误") + logger.error(f"主程序:交易系统或调度器初始化异常: {e}", exc_info=True) + # 即使部分初始化失败,也尝试继续,让Flask应用能启动(可能API部分功能受限) + # 如果调度器启动失败是关键错误,可以考虑在这里重新抛出异常或退出 + if "setup_scheduler" not in str(e): # 避免重复日志 + try: + logger.info("尝试单独启动调度器...") + setup_scheduler() + except Exception as scheduler_e: + logger.error(f"主程序:调度器单独启动也失败: {scheduler_e}", exc_info=True) + # 根据关键性决定是否在此处 raise scheduler_e +# --- Flask 中间件和路由注册 --- -@app.route("/yu/test", methods=["GET"]) -def test_route(): - """测试路由,用于验证中间件是否正常工作""" - return jsonify({"success": True, "message": "API路由前缀验证成功"}), 200 +@app.before_request +def check_path_prefix() -> None: + """中间件:拦截所有不以 /yu 开头的请求并返回404。""" + if not request.path.startswith('/yu'): + # 允许访问 Flask 的静态文件,例如 /static/... + if not request.path.startswith(app.static_url_path or '/static'): + logger.warning(f"拦截非法路径请求: {request.path}") + abort(404) +# 注册蓝图 +app.register_blueprint(health_bp) +app.register_blueprint(trading_bp) +app.register_blueprint(account_bp) +app.register_blueprint(strategy_bp) +logger.info("所有API路由蓝图已注册。") + +# --- 主程序入口 --- if __name__ == "__main__": - logger.info(f"Server starting on {Config.HOST}:{Config.PORT}") - app.run(debug=Config.DEBUG, host=Config.HOST, port=Config.PORT) + # 在启动Web服务器之前初始化服务 + initialize_app_services() + + logger.info(f"Flask 服务器准备在 {Config.HOST}:{Config.PORT} 启动...") + app.run(debug=Config.DEBUG, host=Config.HOST, port=Config.PORT, use_reloader=False) + # use_reloader=False 建议在生产或有自定义启动/关闭逻辑时使用,以避免重复初始化。 + # 对于开发模式,debug=True 通常会启用重载器。 +else: + # 如果是通过 Gunicorn 等 WSGI 服务器运行,则也需要初始化服务 + # Gunicorn 通常会为每个 worker 进程加载一次应用模块 + initialize_app_services()