Compare commits

..

10 Commits

Author SHA1 Message Date
zhiyong
7c8db6adeb 添加锁, 避免定时任务check_pending_orders堆积, 从而导致重复执行撤单, 补单 2025-05-16 20:07:18 +08:00
zhiyong
e7463158d2 优化check_pending_orders 2025-05-16 20:00:33 +08:00
zhiyong
90ae3f286b add get_trade 2025-05-16 15:29:38 +08:00
zhiyong
c57c96fe64 add traded price in log 2025-05-16 14:49:29 +08:00
zhiyong
004f3aa370 update create order time 2025-05-16 14:29:12 +08:00
zhiyong
b8025ec98f update config 2025-05-16 14:22:26 +08:00
zhiyong
1a4d8cd11e add more log for pending order checking 2025-05-16 14:15:06 +08:00
zhiyong
49bd92296e fix: module path 2025-05-16 14:12:20 +08:00
zhiyong
d17b3de56d optimize trade_server 2025-05-16 01:13:45 +08:00
zhiyong
f2f7b0cd29 optimize constants 2025-05-16 00:56:03 +08:00
22 changed files with 841 additions and 596 deletions

View File

@ -0,0 +1,7 @@
---
description:
globs:
alwaysApply: true
---
do not edit files in folder "xtquant" and its sub folders

View File

@ -0,0 +1,45 @@
---
description:
globs:
alwaysApply: true
---
You are an AI assistant specialized in Python development. Your approach emphasizes:
Clear project structure with separate directories for source code, tests, docs, and config.
Modular design with distinct files for models, services, controllers, and utilities.
Configuration management using environment variables.
Robust error handling and logging, including context capture.
Comprehensive testing with pytest.
Detailed documentation using docstrings and README files.
Dependency management via https://github.com/astral-sh/uv and virtual environments.
Code style consistency using Ruff.
CI/CD implementation with GitHub Actions or GitLab CI.
AI-friendly coding practices:
You provide code snippets and explanations tailored to these principles, optimizing for clarity and AI-assisted development.
Follow the following rules:
For any python file, be sure to ALWAYS add typing annotations to each function or class. Be sure to include return types when necessary. Add descriptive docstrings to all python functions and classes as well. Please use pep257 convention. Update existing docstrings if need be.
Make sure you keep any comments that exist in a file.
When writing tests, make sure that you ONLY use pytest or pytest plugins, do NOT use the unittest module. All tests should have typing annotations as well. All tests should be in ./tests. Be sure to create all necessary files and folders. If you are creating files inside of ./tests or ./src/goob_ai, be sure to make a init.py file if one does not exist.
All tests should be fully annotated and should contain docstrings. Be sure to import the following if TYPE_CHECKING:
from _pytest.capture import CaptureFixture
from _pytest.fixtures import FixtureRequest
from _pytest.logging import LogCaptureFixture
from _pytest.monkeypatch import MonkeyPatch
from pytest_mock.plugin import MockerFixture

3
docs/README.md Normal file
View File

@ -0,0 +1,3 @@
# Project Documentation
This directory contains the detailed documentation for the RealTrader project.

View File

@ -53,9 +53,12 @@ class Config:
MAIL_TO = ["jq@yushaoyou.com"] # 可以是多个邮箱 MAIL_TO = ["jq@yushaoyou.com"] # 可以是多个邮箱
# RealTraderManager配置 # RealTraderManager配置
RTM_ORDER_TIMEOUT = 30 # 订单超时时间(秒) RTM_ORDER_TIMEOUT = 10 # 订单超时时间(秒)
RTM_USE_MARKET_ORDER = True # 是否使用市价单进行补单 RTM_USE_MARKET_ORDER = True # 是否使用市价单进行补单
# 撤销订单超时时间(秒)
RTM_CANCEL_TIMEOUT = 10
# 计划任务运行时间 # 计划任务运行时间
CLEAN_ORDERS_TIME = "15:05" # 每天清理超时委托的时间 CLEAN_ORDERS_TIME = "15:05" # 每天清理超时委托的时间
STRATEGY_SAVE_TIME = "15:08" # 每天保存策略数据的时间 STRATEGY_SAVE_TIME = "15:08" # 每天保存策略数据的时间

1
src/core/__init__.py Normal file
View File

@ -0,0 +1 @@
# Core module for application state and schedulers

148
src/core/app_state.py Normal file
View File

@ -0,0 +1,148 @@
"""
应用核心状态管理和基础交易逻辑
包含全局交易实例日志记录器以及登录/登出等核心功能
"""
import time
from typing import Optional
from config import Config # 使用相对导入
from logger_config import get_logger
from real.xt_trader import XtTrader
from simulation.simulation_trader import SimulationTrader
from base_trader import BaseTrader
from real.real_trader_manager import RealTraderManager # 确保导入
# 获取日志记录器
logger = get_logger("app_state") # 可以考虑是否需要区分 logger 名称
# 全局交易实例(采用单例模式)
_trader_instance: Optional[BaseTrader] = None
_real_trader_manager_instance: Optional[RealTraderManager] = None
def is_real_mode() -> bool:
"""检查当前是否为实盘交易模式。"""
return not Config.SIMULATION_MODE
def on_connect_failed() -> None:
"""交易连接失败的回调函数。"""
logger.critical("交易系统连接失败API将返回错误状态")
def get_trader() -> Optional[BaseTrader]:
"""
获取交易实例模拟或实盘
根据配置和当前时间是否为交易日/交易时间返回合适的交易实例
在非交易日或非交易时间实盘模式下会返回一个临时的标记为连接失败的实例
Returns:
Optional[BaseTrader]: 交易实例或 None
"""
global _trader_instance
if is_real_mode() and _trader_instance is None:
if not BaseTrader.is_trading_date():
temp_trader = XtTrader(connect_failed_callback=on_connect_failed)
temp_trader.connection_failed = True
temp_trader.connection_error_message = "当前为非交易日,交易系统未连接"
return temp_trader
if not BaseTrader.is_trading_time():
temp_trader = XtTrader(connect_failed_callback=on_connect_failed)
temp_trader.connection_failed = True
temp_trader.connection_error_message = "当前为非交易时间,交易系统未连接"
return temp_trader
return _trader_instance
def get_real_trader_manager() -> Optional[RealTraderManager]:
"""
获取实盘交易管理器单例
如果当前为模拟模式则返回 None
Returns:
Optional[RealTraderManager]: 实盘交易管理器实例或 None
"""
global _real_trader_manager_instance
if _real_trader_manager_instance is None:
current_trader = get_trader()
if Config.SIMULATION_MODE:
_real_trader_manager_instance = None
elif current_trader is not None:
_real_trader_manager_instance = RealTraderManager(current_trader)
else:
_real_trader_manager_instance = None
return _real_trader_manager_instance
def login() -> bool:
"""
初始化并登录交易实例
会根据配置实盘/模拟创建相应的交易实例并尝试登录
Returns:
bool: 登录是否成功
Raises:
Exception: 如果在实盘模式下创建实例失败
"""
global _trader_instance
try:
if _trader_instance is not None and is_real_mode():
# _trader_instance is BaseTrader or its child, which should have logout
_trader_instance.logout()
_trader_instance = None
_trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader(connect_failed_callback=on_connect_failed)
logger.info("开始登录")
if Config.SIMULATION_MODE:
return True
# _trader_instance is XtTrader here, which has login
login_success = _trader_instance.login()
if login_success:
# _trader_instance is XtTrader here
result = _trader_instance.get_balance()
if result and result.get("account_id") is not None:
logger.info(f"查询余额成功: {result}")
return True
else:
logger.error(f"登录成功但查询余额失败: {result}")
if _trader_instance:
_trader_instance.connection_failed = True # BaseTrader property
_trader_instance.last_reconnect_time = time.time() # BaseTrader property
_trader_instance.notify_connection_failure("登录成功但查询余额失败") # BaseTrader method
return False
else:
logger.error("登录失败")
return False
except Exception as e:
logger.error(f"登录初始化异常: {str(e)}")
if is_real_mode() and _trader_instance is not None:
_trader_instance.connection_failed = True
_trader_instance.last_reconnect_time = time.time()
_trader_instance.notify_connection_failure(f"登录初始化异常: {str(e)}")
return False
raise Exception(f"登录失败,无法创建交易实例: {e}")
def logout() -> None:
"""登出并销毁交易实例。"""
global _trader_instance
if _trader_instance is not None:
_trader_instance.logout()
logger.info("登出成功")
if is_real_mode():
_trader_instance = None
logger.info("交易实例已销毁")

111
src/core/scheduler_tasks.py Normal file
View File

@ -0,0 +1,111 @@
"""
定时任务管理和执行
本模块负责定义调度和执行所有后台定时任务
例如每日自动登录/登出QMT重启以及连接状态检查等
"""
import schedule
import threading
import time
import subprocess
import os
from config import Config
from logger_config import get_logger
from base_trader import BaseTrader
from .app_state import login, logout, is_real_mode, _trader_instance, logger as app_logger # 使用 app_state中的logger
# 如果需要独立的 logger可以取消下面这行的注释
# logger = get_logger("scheduler")
logger = app_logger # 复用 app_state 的 logger
def _run_scheduler() -> None:
"""定时任务执行线程的主循环。"""
while True:
try:
schedule.run_pending()
time.sleep(1)
except Exception as e:
logger.error(f"调度器执行错误: {str(e)}")
def _check_reconnect() -> None:
"""定期检查交易实例是否需要重连(仅实盘模式)。"""
while True:
try:
# 直接使用 app_state 中的 _trader_instance
if is_real_mode() and _trader_instance is not None and hasattr(_trader_instance, 'check_reconnect'):
_trader_instance.check_reconnect()
time.sleep(60) # 每分钟检查一次
except Exception as e:
logger.error(f"重连检查错误: {str(e)}")
def login_on_trading_day() -> None:
"""仅在交易日执行登录操作。"""
if BaseTrader.is_trading_date():
logger.info("今天是交易日,执行计划的登录操作")
login() # 调用 app_state.login
else:
logger.info("今天不是交易日,跳过计划的登录操作")
def logout_on_trading_day() -> None:
"""仅在交易日执行登出操作。"""
if BaseTrader.is_trading_date():
logger.info("今天是交易日,执行计划的登出操作")
logout() # 调用 app_state.logout
else:
logger.info("今天不是交易日,跳过计划的登出操作")
def restart_qmt() -> None:
"""重启QMT客户端软件。"""
try:
if not os.path.exists(Config.XT_LAUNCHER):
logger.error(f"QMT启动路径不存在: {Config.XT_LAUNCHER}")
return
for proc_name in [Config.XT_PROCESS1, Config.XT_PROCESS2]:
try:
kill_cmd = f'taskkill /F /IM {proc_name}'
result = subprocess.run(kill_cmd, shell=True, capture_output=True, text=True, check=False)
if result.returncode == 0:
logger.info(f"已关闭进程: {proc_name}")
else:
# 进程不存在通常返回码 128 或 1
if result.returncode not in [1, 128]:
logger.warning(f"关闭进程 {proc_name} 时返回码: {result.returncode}, 输出: {result.stdout}, 错误: {result.stderr}")
except Exception as e:
logger.error(f"关闭进程 {proc_name} 时发生异常: {str(e)}")
subprocess.Popen(Config.XT_LAUNCHER)
logger.info(f"已尝试启动QMT软件: {Config.XT_LAUNCHER}")
except Exception as e:
logger.error(f"重启QMT软件时发生错误: {str(e)}")
def restart_qmt_on_trading_day() -> None:
"""仅在交易日执行QMT软件重启操作。"""
if BaseTrader.is_trading_date():
logger.info("今天是交易日执行计划的QMT软件重启操作")
restart_qmt()
else:
logger.info("今天不是交易日跳过计划的QMT软件重启")
def setup_scheduler() -> threading.Thread:
"""
设置所有定时任务并启动调度线程
Returns:
threading.Thread: 启动的调度器线程实例
"""
schedule.every().day.at(Config.MARKET_OPEN_TIME).do(login_on_trading_day)
schedule.every().day.at(Config.MARKET_CLOSE_TIME).do(logout_on_trading_day)
schedule.every().day.at(Config.XT_RESTART_TIME).do(restart_qmt_on_trading_day)
scheduler_thread = threading.Thread(target=_run_scheduler, daemon=True)
scheduler_thread.start()
logger.info("定时任务调度器已启动")
if is_real_mode():
reconnect_thread = threading.Thread(target=_check_reconnect, daemon=True)
reconnect_thread.start()
logger.info("重连检查线程已启动")
return scheduler_thread

View File

@ -1,6 +1,7 @@
import os import os
import json import json
import threading import threading
from datetime import datetime
from logger_config import get_logger from logger_config import get_logger
from config import Config from config import Config
from trade_constants import ( from trade_constants import (
@ -9,6 +10,7 @@ from trade_constants import (
ORDER_TYPE_MARKET, ORDER_TYPE_MARKET,
ORDER_STATUS_COMPLETED, ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED, ORDER_STATUS_CANCELLED,
ORDER_STATUS_FAILED,
) )
from local_position import LocalPosition from local_position import LocalPosition
from local_order import LocalOrder from local_order import LocalOrder
@ -83,7 +85,7 @@ class PositionManager:
return return
with self._lock: with self._lock:
order = LocalOrder(order_id, code, price, amount, direction, order_type) order = LocalOrder(order_id, code, price, amount, direction, order_type, created_time=datetime.now())
self.pending_orders[order_id] = order self.pending_orders[order_id] = order
if (order_type == ORDER_TYPE_LIMIT): if (order_type == ORDER_TYPE_LIMIT):
self.all_orders.append(order) self.all_orders.append(order)
@ -118,6 +120,7 @@ class PositionManager:
if new_status in [ if new_status in [
ORDER_STATUS_COMPLETED, ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED, ORDER_STATUS_CANCELLED,
ORDER_STATUS_FAILED,
]: ]:
# 保留订单信息以供参考,但标记为已完成 # 保留订单信息以供参考,但标记为已完成
del self.pending_orders[order_id] del self.pending_orders[order_id]

View File

@ -44,6 +44,9 @@ class RealTraderManager:
# 使用传入的trader实例使用弱引用避免循环引用 # 使用传入的trader实例使用弱引用避免循环引用
self.trader = trader self.trader = trader
# 初始化锁
self._lock = threading.Lock()
# 启动调度器 # 启动调度器
self._start_scheduler() self._start_scheduler()
@ -176,40 +179,94 @@ class RealTraderManager:
return {"success": False, "error": f"下单异常: {str(e)}"} return {"success": False, "error": f"下单异常: {str(e)}"}
def _handle_timed_out_limit_order(self, order_id: int, order_info: dict, strategy_name: str, duration: float) -> None:
"""处理超时的限价单,尝试撤销并进行市价补单。
Args:
order_id: 订单ID
order_info: 订单信息
strategy_name: 策略名称
duration: 订单持续时间
"""
self.trader.cancel(order_id)
# 使用轮询等待撤销完成,并设置超时
start_time = datetime.now()
cancel_success = False
while (datetime.now() - start_time).total_seconds() < Config.RTM_CANCEL_TIMEOUT:
order = self.trader.get_order(order_id)
if order and order.get('order_status') == xtconstant.ORDER_CANCELED:
logger.info(f"限价单已撤销: ID={order_id}, 策略={strategy_name}")
cancel_success = True
break
time.sleep(0.5) # 每0.5秒查询一次
if cancel_success:
self.trader.handle_order_update(order_id, strategy_name)
logger.info(f"检测到限价单被撤销,准备进行市价单补单: ID={order_id}")
self.trader.place_market_order_for_remainder(order_info, strategy_name)
else:
logger.warning(f"限价单撤销超时或失败: ID={order_id}, 策略={strategy_name}")
self.trader.handle_order_update(order_id, strategy_name) # 即使撤销失败,也更新一下状态
def _process_pending_order(self, order_id: int, order_info: dict, strategy_name: str) -> None:
"""处理单个未完成订单的逻辑,包括更新状态、检查超时、撤销和补单。"""
try:
# 处理订单更新, 更新订单状态, 更新持仓, 使position manager中的订单为最新状态
self.trader.handle_order_update(order_id, strategy_name)
# 重新获取更新后的订单信息
position_manager = self.trader.get_position_manager(strategy_name)
order_info = position_manager.get_pending_order(order_id)
if not order_info:
logger.warning(f"订单信息不存在, 可能已完成或撤单, 或下单失败: {order_id}")
return
# 如果订单类型为限价单,则检查是否超时
if order_info.order_type == ORDER_TYPE_LIMIT:
now = datetime.now()
duration = (now - order_info.created_time).total_seconds()
if duration > Config.RTM_ORDER_TIMEOUT:
# 将处理超时限价单的逻辑委托给新的私有方法
logger.info(f'订单创建时间: {order_info.created_time} 当前时间: {now}')
logger.info(f"限价单超时: ID={order_id}, 策略={strategy_name}, 持续时间={duration}")
self._handle_timed_out_limit_order(order_id, order_info, strategy_name, duration)
else:
# 市价单未完成,更新状态
logger.info(f"市价单未完成, 更新市价单: ID={order_id}, 策略={strategy_name}, 订单类型={order_info.order_type}")
self.trader.handle_order_update(order_id, strategy_name)
except Exception as e:
# 更细粒度的异常处理,捕获处理单个订单时的异常
logger.error(f"处理订单 {order_id} 时发生异常: {str(e)}", exc_info=True)
def check_pending_orders(self): def check_pending_orders(self):
"""检查限价单是否超时""" """检查限价单是否超时,使用锁避免重复执行"""
# 尝试获取锁,如果获取不到则说明上一个任务还在执行,直接返回
if not self._lock.acquire(blocking=False):
logger.info("check_pending_orders: 上一个任务仍在执行,跳过本次执行。")
return
try: try:
# 获取所有未完成订单 # 获取所有未完成订单
position_managers = self.trader.get_all_position_managers() position_managers = self.trader.get_all_position_managers()
for strategy_name, position_manager in position_managers.items(): for strategy_name, position_manager in position_managers.items():
pending_orders = position_manager.get_pending_orders() # 转换为列表避免在迭代过程中修改
for order_id, order_info in pending_orders.items(): pending_orders_list = list(position_manager.get_pending_orders().items())
# 如果订单类型为限价单,则检查是否超时 for order_id, order_info in pending_orders_list:
if order_info.order_type == ORDER_TYPE_LIMIT: # 将单个订单的处理逻辑委托给新的私有方法
duration = (datetime.now() - order_info.created_time).total_seconds() self._process_pending_order(order_id, order_info, strategy_name)
if duration > Config.RTM_ORDER_TIMEOUT:
logger.info(f"限价单超时: ID={order_id}, 策略={strategy_name}, 持续时间={duration}")
self.trader.cancel(order_id)
time.sleep(3)
order = self.trader.get_order(order_id)
if order['order_status'] == xtconstant.ORDER_CANCELED:
logger.info(f"限价单已撤销: ID={order_id}, 策略={strategy_name}")
self.trader.handle_order_update(order_id, strategy_name)
logger.info(f"检测到限价单被撤销,准备进行市价单补单: ID={order_id}")
self.trader.place_market_order_for_remainder(order_info, strategy_name)
elif order['order_status'] == xtconstant.ORDER_SUCCEEDED:
logger.info(f"尝试撤单未成功, 限价单已成交: ID={order_id}, 策略={strategy_name}, 状态={order['order_status']}")
self.trader.handle_order_update(order_id, strategy_name)
else:
logger.warning(f"限价单撤销失败: ID={order_id}, 策略={strategy_name}, 状态={order['order_status']}")
self.trader.handle_order_update(order_id, strategy_name)
else:
logger.info(f"更新市价单: ID={order_id}, 策略={strategy_name}, 订单类型={order_info.order_type}")
self.trader.handle_order_update(order_id, strategy_name)
except Exception as e: except Exception as e:
logger.error(f"检查限价单是否超时时发生异常: {str(e)}") # 顶层异常处理捕获获取position managers或遍历时的异常
logger.error(f"检查限价单是否超时时发生异常: {str(e)}", exc_info=True)
finally:
# 确保在任何情况下都释放锁
if self._lock.locked():
self._lock.release()
def _check_order_feasibility(self, code, direction, amount, price): def _check_order_feasibility(self, code, direction, amount, price):
"""检查订单是否可行(资金或持仓是否足够) """检查订单是否可行(资金或持仓是否足够)

View File

@ -18,7 +18,8 @@ from trade_constants import (
ORDER_TYPE_LIMIT, ORDER_TYPE_LIMIT,
ORDER_TYPE_MARKET, ORDER_TYPE_MARKET,
ORDER_DIRECTION_BUY, ORDER_DIRECTION_BUY,
ORDER_DIRECTION_SELL ORDER_DIRECTION_SELL,
ORDER_STATUS_FAILED
) )
from local_order import LocalOrder from local_order import LocalOrder
@ -231,6 +232,7 @@ class XtTrader(BaseTrader):
"stock_name": self.get_stock_name(t.stock_code), "stock_name": self.get_stock_name(t.stock_code),
"order_id": t.order_id, "order_id": t.order_id,
"traded_id": t.traded_id, "traded_id": t.traded_id,
"order_sysid": t.order_sysid,
"traded_time": t.traded_time, "traded_time": t.traded_time,
"traded_price": t.traded_price, "traded_price": t.traded_price,
"traded_volume": t.traded_volume, "traded_volume": t.traded_volume,
@ -240,6 +242,15 @@ class XtTrader(BaseTrader):
] ]
return [] return []
def get_trade(self, order_sysid):
trades = self.get_today_trades()
for t in trades:
if t['order_sysid'] == order_sysid:
return t
return None
def get_today_orders(self): def get_today_orders(self):
if not self.is_available(): if not self.is_available():
return [] return []
@ -251,6 +262,7 @@ class XtTrader(BaseTrader):
"account_id": o.account_id, "account_id": o.account_id,
"stock_code": o.stock_code, "stock_code": o.stock_code,
"order_id": o.order_id, "order_id": o.order_id,
"order_sysid": o.order_sysid,
"order_time": o.order_time, "order_time": o.order_time,
"order_type": "buy" if o.order_type == xtconstant.STOCK_BUY else "sell", "order_type": "buy" if o.order_type == xtconstant.STOCK_BUY else "sell",
"order_volume": o.order_volume, "order_volume": o.order_volume,
@ -274,6 +286,7 @@ class XtTrader(BaseTrader):
"account_id": order.account_id, "account_id": order.account_id,
"stock_code": order.stock_code, "stock_code": order.stock_code,
"order_id": order.order_id, "order_id": order.order_id,
"order_sysid": order.order_sysid,
"order_time": order.order_time, "order_time": order.order_time,
"order_type": "buy" if order.order_type == xtconstant.STOCK_BUY else "sell", "order_type": "buy" if order.order_type == xtconstant.STOCK_BUY else "sell",
"order_volume": order.order_volume, "order_volume": order.order_volume,
@ -519,18 +532,19 @@ class XtTrader(BaseTrader):
str: 订单状态如ORDER_STATUS_COMPLETEDORDER_STATUS_PARTIAL等如果处理失败则返回None str: 订单状态如ORDER_STATUS_COMPLETEDORDER_STATUS_PARTIAL等如果处理失败则返回None
""" """
try: try:
# 获取订单信息
order = self.get_order(order_id)
if not order:
logger.warning(f"获取订单失败,无法更新状态: {order_id}")
return None
# 获取position_manager # 获取position_manager
position_manager = self.get_position_manager(strategy_name) position_manager = self.get_position_manager(strategy_name)
if not position_manager: if not position_manager:
logger.warning(f"获取position_manager失败无法更新状态: {strategy_name}") logger.warning(f"获取position_manager失败无法更新状态: {strategy_name}")
return None return None
# 获取订单
order = self.get_order(order_id)
if not order:
logger.warning(f"获取订单失败, 可能下单失败,将订单状态设置为失败: {order_id}")
position_manager.update_order_status(order_id, 0, ORDER_STATUS_FAILED)
return None
# 获取订单信息 # 获取订单信息
order_info = position_manager.get_pending_order(order_id) order_info = position_manager.get_pending_order(order_id)
if not order_info: if not order_info:
@ -560,7 +574,7 @@ class XtTrader(BaseTrader):
order_info.direction, order_info.direction,
new_filled, new_filled,
) )
logger.info(f"订单全部成交: ID={order_id}, 代码={order_info.code}, 总成交量={filled}, 新增成交量={new_filled}") logger.info(f"订单全部成交: ID={order_id}, 代码={order_info.code}, 价格={order['traded_price']}, 总成交量={filled}, 新增成交量={new_filled}")
return ORDER_STATUS_COMPLETED return ORDER_STATUS_COMPLETED

1
src/routes/__init__.py Normal file
View File

@ -0,0 +1 @@
# Routes module for Flask Blueprints

View File

@ -0,0 +1,85 @@
"""
账户信息查询相关的API端点
包括余额持仓当日成交当日委托和单个订单查询
"""
from flask import Blueprint, jsonify, abort
from core.app_state import get_trader, is_real_mode, logger
account_bp = Blueprint('account_routes', __name__, url_prefix='/yu')
def _check_trader_availability():
"""辅助函数,检查交易实例是否可用,如果不可用则 abort(503)。"""
trader = get_trader()
if not trader:
logger.error("Trader instance not available for account request.")
abort(503, description="Trader not available. System may be initializing or in a failed state.")
if is_real_mode() and not trader.is_available():
logger.warning(f"Account info request when trader is not available: {trader.connection_error_message}")
abort(503, description=trader.connection_error_message or "交易系统连接失败,请稍后再试")
return trader
@account_bp.route("/balance", methods=["GET"])
def get_balance_route(): # Renamed to avoid conflict with get_balance from app_state if imported directly
"""获取账户余额信息。"""
logger.info("Received balance request")
try:
trader = _check_trader_availability()
balance = trader.get_balance()
return jsonify(balance), 200
except Exception as e:
logger.error(f"Error processing balance request: {str(e)}", exc_info=True)
abort(500, description="Internal server error during balance request")
@account_bp.route("/positions", methods=["GET"])
def get_positions_route(): # Renamed
"""获取账户持仓信息。"""
logger.info("Received positions request")
try:
trader = _check_trader_availability()
positions = trader.get_positions()
return jsonify(positions), 200
except Exception as e:
logger.error(f"Error processing positions request: {str(e)}", exc_info=True)
abort(500, description="Internal server error during positions request")
@account_bp.route("/todaytrades", methods=["GET"])
def get_today_trades_route(): # Renamed
"""获取当日成交信息。"""
logger.info("Received today trades request")
try:
trader = _check_trader_availability()
trades = trader.get_today_trades()
return jsonify(trades), 200
except Exception as e:
logger.error(f"Error processing today trades request: {str(e)}", exc_info=True)
abort(500, description="Internal server error during today trades request")
@account_bp.route("/todayorders", methods=["GET"])
def get_today_orders_route(): # Renamed
"""获取当日委托信息。"""
logger.info("Received today orders request")
try:
trader = _check_trader_availability()
orders = trader.get_today_orders()
return jsonify(orders), 200
except Exception as e:
logger.error(f"Error processing today orders request: {str(e)}", exc_info=True)
abort(500, description="Internal server error during today orders request")
@account_bp.route("/order/<order_id>", methods=["GET"])
def get_order_route(order_id: str): # Renamed
"""根据订单ID获取订单信息。"""
logger.info(f"Received order request for order {order_id}")
try:
trader = _check_trader_availability()
order_info = trader.get_order(order_id)
if order_info is None or (isinstance(order_info, dict) and not order_info): # Handle case where order not found
logger.warning(f"Order not found for ID: {order_id}")
abort(404, description=f"Order with ID {order_id} not found.")
return jsonify(order_info), 200
except Exception as e:
logger.error(f"Error processing order request for {order_id}: {str(e)}", exc_info=True)
abort(500, description=f"Internal server error during order request for {order_id}")

View File

@ -0,0 +1,26 @@
"""
健康检查和测试相关的API端点
"""
from flask import Blueprint, jsonify
from core.app_state import is_real_mode, get_trader, logger
health_bp = Blueprint('health_routes', __name__, url_prefix='/yu')
@health_bp.route("/healthcheck", methods=["GET"])
def health_check():
"""健康检查端点,检查交易系统连接状态。"""
if is_real_mode():
trader = get_trader()
if trader and not trader.is_available():
return jsonify({
"status": "error",
"message": trader.connection_error_message or "交易系统连接失败"
}), 503
return jsonify({"status": "ok"}), 200
@health_bp.route("/test", methods=["GET"])
def test_route():
"""测试路由用于验证API路由前缀和基本连通性。"""
logger.info("Test route accessed.")
return jsonify({"success": True, "message": "API路由前缀验证成功"}), 200

View File

@ -0,0 +1,35 @@
"""
策略管理相关的API端点
"""
from flask import Blueprint, jsonify, abort
from core.app_state import get_trader, logger
strategy_bp = Blueprint('strategy_routes', __name__, url_prefix='/yu')
@strategy_bp.route("/clear/<strategy_name>", methods=["DELETE"])
def clear_strategy_route(strategy_name: str): # Renamed
"""清除指定策略的持仓管理数据。"""
logger.info(f"接收到清除策略持仓请求: {strategy_name}")
try:
trader = get_trader()
if not trader:
logger.error("Trader instance not available for clear_strategy request.")
return jsonify({"success": False, "message": "Trader not available"}), 503
# 假设 clear_position_manager 是 BaseTrader 或其子类的方法
if hasattr(trader, 'clear_position_manager'):
cleared = trader.clear_position_manager(strategy_name)
if cleared:
logger.info(f"策略 {strategy_name} 持仓数据已清除。")
return jsonify({"success": True, "message": f"Strategy {strategy_name} cleared successfully."}), 200
else:
logger.warning(f"尝试清除一个不存在的策略或清除失败: {strategy_name}")
return jsonify({"success": False, "message": f"Strategy {strategy_name} not found or could not be cleared."}), 404
else:
logger.error(f"Trader instance does not have 'clear_position_manager' method for strategy {strategy_name}")
return jsonify({"success": False, "message": "Clear position manager feature not available."}), 501 # Not Implemented
except Exception as e:
logger.error(f"清除策略 {strategy_name} 持仓时出错: {str(e)}", exc_info=True)
abort(500, description=f"服务器内部错误,清除策略 {strategy_name} 持仓失败。")

View File

@ -0,0 +1,168 @@
"""
核心交易操作相关的API端点 (买入卖出撤单)
"""
from flask import Blueprint, request, jsonify, abort
from core.app_state import get_trader, get_real_trader_manager, is_real_mode, logger
from base_trader import BaseTrader
from trade_constants import ORDER_TYPE_LIMIT, ORDER_DIRECTION_BUY, ORDER_DIRECTION_SELL
trading_bp = Blueprint('trading_routes', __name__, url_prefix='/yu')
@trading_bp.route("/buy", methods=["POST"])
def buy():
"""处理买入请求。"""
logger.info("Received buy request")
data = request.get_json()
if not data:
logger.error("Buy request with no JSON data")
abort(400, description="Request body must be JSON.")
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get("strategy_name", "default_strategy")
order_type = data.get("order_type", ORDER_TYPE_LIMIT)
try:
if not all([code, price_str, amount_str]):
logger.warning(f"Buy request missing parameters: code={code}, price={price_str}, amount={amount_str}")
raise ValueError("Missing required parameters: code, price, amount")
price = float(price_str)
amount = int(float(amount_str))
if order_type == ORDER_TYPE_LIMIT and (price <= 0 or amount <= 0):
logger.warning(f"Buy request with invalid price/amount: price={price}, amount={amount}")
raise ValueError("For limit orders, price and amount must be positive")
trader = get_trader()
if not trader:
logger.error("Trader instance not available for buy request.")
return jsonify({"success": False, "error": "Trader not available"}), 503
if is_real_mode():
if not BaseTrader.is_trading_time():
logger.warning(f"Buy attempt outside trading hours: {code}, {price}, {amount}")
return jsonify({"success": False, "error": "交易失败: 非交易时间不能实盘交易"}), 400
if not trader.is_available():
logger.error(f"Trader not available for real mode buy: {trader.connection_error_message}")
return jsonify({"success": False, "error": trader.connection_error_message or "交易系统连接失败"}), 503
rtm = get_real_trader_manager()
if not rtm:
logger.error("RealTraderManager not available for buy request.")
return jsonify({"success": False, "error": "RealTraderManager not available"}), 503
logger.info(f"Using RealTraderManager for buy: {code}, {price}, {amount}, {strategy_name}")
result = rtm.place_order(strategy_name, code, ORDER_DIRECTION_BUY, amount, price, order_type)
else:
result = trader.buy(code, price, amount, strategy_name)
if result.get("success"):
logger.info(f"买入下单成功: {result}")
return jsonify({"success": True, "order_id": result.get("order_id")}), 200
else:
logger.error(f"买入下单失败: {result}")
return jsonify({"success": False, "error": result.get("error", "Unknown error placing buy order")}), 400
except ValueError as e:
logger.error(f"Invalid buy request parameters: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"Error processing buy request: {str(e)}", exc_info=True)
abort(500, description="Internal server error during buy request")
@trading_bp.route("/sell", methods=["POST"])
def sell():
"""处理卖出请求。"""
logger.info("Received sell request")
data = request.get_json()
if not data:
logger.error("Sell request with no JSON data")
abort(400, description="Request body must be JSON.")
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get("strategy_name", "default_strategy")
order_type = data.get("order_type", ORDER_TYPE_LIMIT)
try:
if not all([code, price_str, amount_str]):
logger.warning(f"Sell request missing parameters: code={code}, price={price_str}, amount={amount_str}")
raise ValueError("Missing required parameters: code, price, amount")
price = float(price_str)
amount = int(float(amount_str))
if order_type == ORDER_TYPE_LIMIT and (price <= 0 or amount <= 0):
logger.warning(f"Sell request with invalid price/amount: price={price}, amount={amount}")
raise ValueError("For limit orders, price and amount must be positive")
trader = get_trader()
if not trader:
logger.error("Trader instance not available for sell request.")
return jsonify({"success": False, "error": "Trader not available"}), 503
if is_real_mode():
if not BaseTrader.is_trading_time():
logger.warning(f"Sell attempt outside trading hours: {code}, {price}, {amount}")
return jsonify({"success": False, "error": "交易失败: 非交易时间不能实盘交易"}), 400
if not trader.is_available():
logger.error(f"Trader not available for real mode sell: {trader.connection_error_message}")
return jsonify({"success": False, "error": trader.connection_error_message or "交易系统连接失败"}), 503
rtm = get_real_trader_manager()
if not rtm:
logger.error("RealTraderManager not available for sell request.")
return jsonify({"success": False, "error": "RealTraderManager not available"}), 503
logger.info(f"Using RealTraderManager for sell: {code}, {price}, {amount}, {strategy_name}")
result = rtm.place_order(strategy_name, code, ORDER_DIRECTION_SELL, amount, price, order_type)
else:
result = trader.sell(code, price, amount, strategy_name)
if result.get("success"):
logger.info(f"卖出下单成功: {result}")
return jsonify({"success": True, "order_id": result.get("order_id")}), 200
else:
logger.error(f"卖出下单失败: {result}")
return jsonify({"success": False, "error": result.get("error", "Unknown error placing sell order")}), 400
except ValueError as e:
logger.error(f"Invalid sell request parameters: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"Error processing sell request: {str(e)}", exc_info=True)
abort(500, description="Internal server error during sell request")
@trading_bp.route("/cancel/<order_id>", methods=["DELETE"])
def cancel(order_id: str):
"""根据订单ID处理撤单请求。"""
logger.info(f"Received cancel request for order {order_id}")
try:
trader = get_trader()
if not trader:
logger.error("Trader instance not available for cancel request.")
return jsonify({"success": False, "error": "Trader not available"}), 503
if is_real_mode() and not trader.is_available():
logger.error(f"Trader not available for real mode cancel: {trader.connection_error_message}")
return jsonify({"success": False, "error": trader.connection_error_message or "交易系统连接失败"}), 503
result = trader.cancel(order_id)
logger.info(f"撤单结果: {result}")
# 假设 cancel 操作总是返回一个字典,即使失败也包含状态
if isinstance(result, dict) and result.get("success") is False:
return jsonify(result), 400 # Propagate error from trader if structured
return jsonify(result), 200
except Exception as e:
logger.error(f"Error processing cancel request for order {order_id}: {str(e)}", exc_info=True)
abort(500, description=f"Internal server error during cancel request for order {order_id}")

View File

@ -1,18 +1,27 @@
"""
交易相关的常量定义
本模块定义了在交易系统中常用的常量例如交易类型订单状态订单类型和订单方向
"""
from typing import Final
# 交易常量 # 交易常量
TRADE_TYPE_REAL = 'real' TRADE_TYPE_REAL: Final[str] = 'real'
TRADE_TYPE_SIMULATION = 'simulation' TRADE_TYPE_SIMULATION: Final[str] = 'simulation'
# 订单状态 # 订单状态
ORDER_STATUS_PENDING = 'pending' ORDER_STATUS_PENDING: Final[str] = 'pending'
ORDER_STATUS_PARTIAL = 'partial' ORDER_STATUS_PARTIAL: Final[str] = 'partial'
ORDER_STATUS_COMPLETED = 'completed' ORDER_STATUS_COMPLETED: Final[str] = 'completed'
ORDER_STATUS_CANCELLED = 'cancelled' ORDER_STATUS_CANCELLED: Final[str] = 'cancelled'
ORDER_STATUS_FAILED: Final[str] = 'failed'
# 订单类型 # 订单类型
ORDER_TYPE_LIMIT = 'limit' ORDER_TYPE_LIMIT: Final[str] = 'limit'
ORDER_TYPE_MARKET = 'market' ORDER_TYPE_MARKET: Final[str] = 'market'
# 订单方向 # 订单方向
ORDER_DIRECTION_BUY = 'buy' ORDER_DIRECTION_BUY: Final[str] = 'buy'
ORDER_DIRECTION_SELL = 'sell' ORDER_DIRECTION_SELL: Final[str] = 'sell'

View File

@ -1,567 +1,91 @@
import schedule """
import threading Flask应用主入口文件
import time
import subprocess 负责初始化Flask应用设置核心服务注册API路由蓝图以及启动服务
import os """
from real.xt_trader import XtTrader from flask import Flask, request, abort
from flask import Flask, request, abort, jsonify
from config import Config from config import Config
from simulation.simulation_trader import SimulationTrader from logger_config import get_logger # 主应用的logger
from logger_config import get_logger
from real.real_trader_manager import RealTraderManager
from trade_constants import *
from base_trader import BaseTrader
# 获取日志记录器 # 核心服务和状态初始化
logger = get_logger("server") from core.app_state import login as initialize_trader_login, logger as app_state_logger, is_real_mode
from core.scheduler_tasks import setup_scheduler, logger as scheduler_logger
# 全局交易实例(采用单例模式) # API 路由蓝图
_trader_instance = None # 交易实例(单例) from routes.health_routes import health_bp
_real_trader_manager_instance = None # 实盘交易管理器实例(单例) from routes.trading_routes import trading_bp
from routes.account_routes import account_bp
from routes.strategy_routes import strategy_bp
def is_real_mode():
return not Config.SIMULATION_MODE
# 交易接口连接失败回调
def on_connect_failed():
"""交易连接失败的回调函数"""
logger.critical("交易系统连接失败API将返回错误状态")
# 获取实盘交易管理器实例的辅助函数
def get_real_trader_manager():
global _real_trader_manager_instance
if _real_trader_manager_instance is None:
_real_trader_manager_instance = (
None if Config.SIMULATION_MODE else RealTraderManager(get_trader())
)
return _real_trader_manager_instance
# 获取交易实例 - 根据情况返回模拟或实盘交易实例
def get_trader():
global _trader_instance
# 实盘模式下
if is_real_mode() and _trader_instance is None:
# 检查是否为交易日
if not BaseTrader.is_trading_date():
# 非交易日创建临时实例提供错误信息
temp_trader = XtTrader(connect_failed_callback=on_connect_failed)
temp_trader.connection_failed = True
temp_trader.connection_error_message = "当前为非交易日,交易系统未连接"
return temp_trader
# 交易日但非交易时间
if not BaseTrader.is_trading_time():
# 非交易时间创建临时实例提供错误信息
temp_trader = XtTrader(connect_failed_callback=on_connect_failed)
temp_trader.connection_failed = True
temp_trader.connection_error_message = "当前为非交易时间,交易系统未连接"
return temp_trader
# 返回已存在的实例
return _trader_instance
def login():
global _trader_instance
try:
# 如果已经有实例,先销毁
if _trader_instance is not None and is_real_mode():
_trader_instance.logout()
_trader_instance = None
# 创建新的XtTrader实例
_trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader(connect_failed_callback=on_connect_failed)
logger.info("开始登录")
# 模拟交易直接返回成功
if Config.SIMULATION_MODE:
return True
# 如果是实盘交易调用login方法
login_success = _trader_instance.login()
# 验证连接
if login_success:
result = _trader_instance.get_balance()
if result and result.get("account_id") is not None:
logger.info(f"查询余额成功: {result}")
return True
else:
logger.error(f"登录成功但查询余额失败: {result}")
_trader_instance.connection_failed = True
_trader_instance.last_reconnect_time = time.time()
_trader_instance.notify_connection_failure("登录成功但查询余额失败")
return False
else:
logger.error("登录失败")
# 不需要在这里设置失败状态在XtTrader.login方法中已设置
return False
except Exception as e:
logger.error(f"登录初始化异常: {str(e)}")
# 如果是实盘模式并且已经成功创建了实例,设置失败状态
if is_real_mode() and _trader_instance is not None:
_trader_instance.connection_failed = True
_trader_instance.last_reconnect_time = time.time()
_trader_instance.notify_connection_failure(f"登录初始化异常: {str(e)}")
return False
# 如果创建实例失败,则抛出异常
raise Exception(f"登录失败,无法创建交易实例: {e}")
def logout():
global _trader_instance
if _trader_instance is not None:
_trader_instance.logout()
logger.info("登出成功")
# 销毁实例
if is_real_mode():
_trader_instance = None
logger.info("XtTrader实例已销毁")
def _run_scheduler():
"""定时任务执行线程"""
while True:
try:
schedule.run_pending()
time.sleep(1)
except Exception as e:
logger.error(f"调度器执行错误: {str(e)}")
def _check_reconnect():
"""重连检查线程,定期检查是否需要重连"""
while True:
try:
if is_real_mode() and _trader_instance is not None:
_trader_instance.check_reconnect()
time.sleep(60) # 每分钟检查一次
except Exception as e:
logger.error(f"重连检查错误: {str(e)}")
# 定时任务:检查是否为交易日,并在交易日执行登录操作
def login_on_trading_day():
"""仅在交易日执行登录操作"""
# 使用BaseTrader中的静态方法检查是否为交易日
if BaseTrader.is_trading_date():
logger.info("今天是交易日,执行登录操作")
login()
else:
logger.info("今天不是交易日,跳过登录操作")
# 定时任务:检查是否为交易日,并在交易日执行登出操作
def logout_on_trading_day():
"""仅在交易日执行登出操作"""
# 使用BaseTrader中的静态方法检查是否为交易日
if BaseTrader.is_trading_date():
logger.info("今天是交易日,执行登出操作")
logout()
else:
logger.info("今天不是交易日,跳过登出操作")
def restart_qmt():
try:
# 检查QMT路径是否存在
if not os.path.exists(Config.XT_LAUNCHER):
logger.error(f"QMT启动路径不存在: {Config.XT_LAUNCHER}")
return
# 先关闭所有QMT相关进程
for proc_name in [Config.XT_PROCESS1, Config.XT_PROCESS2]:
try:
kill_cmd = f'taskkill /F /IM {proc_name}'
result = subprocess.run(kill_cmd, shell=True, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"已关闭进程: {proc_name}")
else:
logger.warning(f"关闭进程{proc_name}时返回码: {result.returncode}, 输出: {result.stdout}, 错误: {result.stderr}")
except Exception as e:
logger.error(f"关闭进程{proc_name}时发生异常: {str(e)}")
# 尝试启动QMT软件
subprocess.Popen(Config.XT_LAUNCHER)
logger.info(f"已启动QMT软件: {Config.XT_LAUNCHER}")
except Exception as e:
logger.error(f"重启QMT软件时发生错误: {str(e)}")
# 定时任务在交易日重启QMT软件
def restart_qmt_on_trading_day():
"""仅在交易日执行QMT软件重启操作"""
if BaseTrader.is_trading_date():
logger.info("今天是交易日执行QMT软件重启操作")
restart_qmt()
else:
logger.info("今天不是交易日跳过QMT软件重启")
def setup_scheduler():
# 设置每日任务,仅在交易日执行
schedule.every().day.at(Config.MARKET_OPEN_TIME).do(login_on_trading_day)
schedule.every().day.at(Config.MARKET_CLOSE_TIME).do(logout_on_trading_day)
schedule.every().day.at(Config.XT_RESTART_TIME).do(restart_qmt_on_trading_day)
# 启动调度线程
scheduler_thread = threading.Thread(target=_run_scheduler, daemon=True)
scheduler_thread.start()
logger.info("定时任务调度器已启动")
# 启动重连检查线程(仅在实盘模式下)
if is_real_mode():
reconnect_thread = threading.Thread(target=_check_reconnect, daemon=True)
reconnect_thread.start()
logger.info("重连检查线程已启动")
return scheduler_thread
# 初始化交易系统
try:
# 程序启动时初始化交易实例 - 尝试登录,但即使失败也继续启动服务
login_success = login()
if not login_success and is_real_mode():
logger.warning("初始登录失败,系统将在稍后定期尝试重连")
# 设置并启动调度器
setup_scheduler()
logger.info("交易系统初始化完成")
except Exception as e:
logger.error(f"交易系统初始化异常: {e}")
# 即使初始化失败也尝试启动调度器
try:
setup_scheduler()
logger.info("调度器启动成功,将尝试定期重新初始化交易系统")
except Exception as scheduler_e:
logger.error(f"调度器启动失败: {scheduler_e}")
raise
# 主日志记录器 (可以根据需要选择使用 app_state_logger 或 scheduler_logger, 或者独立的)
logger = get_logger("trade_server_main")
# 创建 Flask 应用实例
app = Flask(__name__) app = Flask(__name__)
# 添加中间件,拦截所有不以"/yu"开头的请求并返回404 # --- 应用初始化和设置 ---
def initialize_app_services():
"""初始化应用核心服务,如交易系统登录和定时任务。"""
logger.info("开始初始化应用服务...")
try:
# 1. 尝试初始登录交易系统
# login 函数现在位于 app_state.py
login_success = initialize_trader_login()
if not login_success and is_real_mode():
logger.warning("主程序:初始登录失败,系统将在后台尝试重连。")
else:
logger.info("主程序:交易系统登录步骤完成。")
# 2. 设置并启动调度器
# setup_scheduler 函数现在位于 scheduler_tasks.py
setup_scheduler()
logger.info("主程序:定时任务调度器已设置并启动。")
logger.info("应用服务初始化完成。")
except Exception as e:
logger.error(f"主程序:交易系统或调度器初始化异常: {e}", exc_info=True)
# 即使部分初始化失败也尝试继续让Flask应用能启动可能API部分功能受限
# 如果调度器启动失败是关键错误,可以考虑在这里重新抛出异常或退出
if "setup_scheduler" not in str(e): # 避免重复日志
try:
logger.info("尝试单独启动调度器...")
setup_scheduler()
except Exception as scheduler_e:
logger.error(f"主程序:调度器单独启动也失败: {scheduler_e}", exc_info=True)
# 根据关键性决定是否在此处 raise scheduler_e
# --- Flask 中间件和路由注册 ---
@app.before_request @app.before_request
def check_path_prefix(): def check_path_prefix() -> None:
"""中间件:拦截所有不以 /yu 开头的请求并返回404。"""
if not request.path.startswith('/yu'): if not request.path.startswith('/yu'):
logger.warning(f"拦截非法请求: {request.path}") # 允许访问 Flask 的静态文件,例如 /static/...
if not request.path.startswith(app.static_url_path or '/static'):
logger.warning(f"拦截非法路径请求: {request.path}")
abort(404) abort(404)
@app.route("/yu/healthcheck", methods=["GET"]) # 注册蓝图
def health_check(): app.register_blueprint(health_bp)
if is_real_mode() and _trader_instance and not _trader_instance.is_available(): app.register_blueprint(trading_bp)
return jsonify({ app.register_blueprint(account_bp)
"status": "error", app.register_blueprint(strategy_bp)
"message": _trader_instance.connection_error_message or "交易系统连接失败"
}), 503
return jsonify({"status": "ok"}), 200
@app.route("/yu/buy", methods=["POST"])
def buy():
"""Buy an item with given parameters."""
logger.info("Received buy request")
# Get data from request body
data = request.get_json()
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get(
"strategy_name", "default_strategy"
) # 新增策略名称参数,默认为空
order_type = data.get("order_type", ORDER_TYPE_LIMIT)
try:
if not all([code, price_str, amount_str]):
raise ValueError("Missing required parameters")
price = float(price_str)
amount = int(float(amount_str))
if order_type == ORDER_TYPE_LIMIT and (price <= 0 or amount <= 0):
raise ValueError("Price and amount must be positive")
trader = get_trader()
if is_real_mode():
# 检查是否在交易时间内
if not BaseTrader.is_trading_time():
logger.warning(
f"交易失败 - 非交易时间不能交易 - 代码: {code}, 价格: {price}, 数量: {amount}"
)
return (
jsonify(
{"success": False, "error": f"交易失败: 非交易时间不能实盘交易"}
),
400,
)
# 检查交易系统是否可用
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
# 开始买入
logger.info(
f"使用RealTraderManager执行买入: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}"
)
rtm = get_real_trader_manager()
result = rtm.place_order(
strategy_name, code, ORDER_DIRECTION_BUY, amount, price, order_type
)
else:
result = trader.buy(code, price, amount, strategy_name)
if result.get("success"):
logger.info(f"买入下单成功: {result}")
return jsonify({"success": True, "order_id": result.get("order_id")}), 200
else:
logger.error(f"买入下单失败: {result}")
return jsonify({"success": False, "error": result}), 400
except ValueError as e:
logger.error(f"Invalid request parameters: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"Error processing buy request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/sell", methods=["POST"])
def sell():
"""Sell an item with given parameters."""
logger.info("Received sell request")
# Get data from request body
data = request.get_json()
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get("strategy_name", "") # 新增策略名称参数,默认为空
order_type = data.get("order_type", ORDER_TYPE_LIMIT)
try:
if not all([code, price_str, amount_str]):
raise ValueError("Missing required parameters")
price = float(price_str)
amount = int(float(amount_str))
if order_type == ORDER_TYPE_LIMIT and (price <= 0 or amount <= 0):
raise ValueError("Price and amount must be positive")
trader = get_trader()
if is_real_mode():
# 检查是否在交易时间内
if not BaseTrader.is_trading_time():
logger.warning(
f"交易失败 - 非交易时间不能交易 - 代码: {code}, 价格: {price}, 数量: {amount}"
)
return (
jsonify(
{"success": False, "error": f"交易失败: 非交易时间不能实盘交易"}
),
400,
)
# 检查交易系统是否可用
if not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
# 开始卖出
logger.info(
f"使用RealTraderManager执行卖出: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}"
)
rtm = get_real_trader_manager()
result = rtm.place_order(
strategy_name, code, ORDER_DIRECTION_SELL, amount, price, order_type
)
else:
result = trader.sell(code, price, amount, strategy_name)
if result.get("success"):
logger.info(f"卖出下单成功: {result}")
return jsonify({"success": True, "order_id": result.get("order_id")}), 200
elif "error" in result:
logger.error(f"卖出下单失败: {result}")
return jsonify({"success": False, "error": result["error"]}), 400
else:
logger.error(f"卖出下单失败: {result}")
return jsonify({"success": False, "error": result}), 400
except ValueError as e:
logger.error(f"Invalid request parameters: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"Error processing sell request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/cancel/<order_id>", methods=["DELETE"])
def cancel(order_id):
"""Cancel an order by order_id."""
logger.info(f"Received cancel request for order {order_id}")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
result = trader.cancel(order_id)
logger.info(f"撤单结果: {result}")
return jsonify(result), 200
except Exception as e:
logger.error(f"Error processing cancel request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/balance", methods=["GET"])
def get_balance():
"""Get balance information."""
logger.info("Received balance request")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
balance = trader.get_balance()
return jsonify(balance), 200
except Exception as e:
logger.error(f"Error processing balance request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/positions", methods=["GET"])
def get_positions():
"""Get position information."""
logger.info("Received positions request")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
positions = trader.get_positions()
return jsonify(positions), 200
except Exception as e:
logger.error(f"Error processing positions request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/todaytrades", methods=["GET"])
def get_today_trades():
"""Get today's trade information."""
logger.info("Received today trades request")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
trades = trader.get_today_trades()
return jsonify(trades), 200
except Exception as e:
logger.error(f"Error processing today trades request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/todayorders", methods=["GET"])
def get_today_orders():
"""Get today's order information."""
logger.info("Received today orders request")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
orders = trader.get_today_orders()
return jsonify(orders), 200
except Exception as e:
logger.error(f"Error processing today orders request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/order/<order_id>", methods=["GET"])
def get_order(order_id):
"""Get order information by order_id."""
logger.info(f"Received order request for order {order_id}")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
order = trader.get_order(order_id)
return jsonify(order), 200
except Exception as e:
logger.error(f"Error processing order request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/clear/<strategy_name>", methods=["DELETE"])
def clear_strategy(strategy_name):
"""清除指定策略的持仓管理数据"""
logger.info(f"接收到清除策略持仓请求: {strategy_name}")
try:
if get_trader().clear_position_manager(strategy_name):
return jsonify({"success": True, "message": "clear success"}), 200
else:
return jsonify({"success": False, "message": "策略不存在: " + strategy_name}), 400
except Exception as e:
logger.error(f"清除策略持仓时出错: {str(e)}")
abort(500, description="服务器内部错误")
@app.route("/yu/test", methods=["GET"])
def test_route():
"""测试路由,用于验证中间件是否正常工作"""
return jsonify({"success": True, "message": "API路由前缀验证成功"}), 200
logger.info("所有API路由蓝图已注册。")
# --- 主程序入口 ---
if __name__ == "__main__": if __name__ == "__main__":
logger.info(f"Server starting on {Config.HOST}:{Config.PORT}") # 在启动Web服务器之前初始化服务
app.run(debug=Config.DEBUG, host=Config.HOST, port=Config.PORT) initialize_app_services()
logger.info(f"Flask 服务器准备在 {Config.HOST}:{Config.PORT} 启动...")
app.run(debug=Config.DEBUG, host=Config.HOST, port=Config.PORT, use_reloader=False)
# use_reloader=False 建议在生产或有自定义启动/关闭逻辑时使用,以避免重复初始化。
# 对于开发模式debug=True 通常会启用重载器。
else:
# 如果是通过 Gunicorn 等 WSGI 服务器运行,则也需要初始化服务
# Gunicorn 通常会为每个 worker 进程加载一次应用模块
initialize_app_services()

1
tests/__init__.py Normal file
View File

@ -0,0 +1 @@

1
tests/real/__init__.py Normal file
View File

@ -0,0 +1 @@

View File

@ -0,0 +1 @@

1
tests/utils/__init__.py Normal file
View File

@ -0,0 +1 @@

View File

@ -0,0 +1 @@