更新使用实盘还是模拟盘的逻辑

This commit is contained in:
zhiyong 2025-04-30 22:16:48 +08:00
parent 414dca6ee4
commit fdee1c3da1
4 changed files with 825 additions and 227 deletions

View File

@ -1,19 +1,19 @@
import os
import datetime
class Config:
# Server settings
PORT = 9527
HOST = '0.0.0.0'
DEBUG = False
PORT = int(os.environ.get("PORT", 9527))
HOST = os.environ.get("HOST", "0.0.0.0")
DEBUG = os.environ.get("DEBUG", "False").lower() == "true"
# Trading settings
TRADE_TIMEOUT = 3 # 交易超时时间(秒)
SIMULATION_ONLY = True # 是否仅使用模拟交易
TRADE_TIMEOUT = int(os.environ.get("TRADE_TIMEOUT", 10)) # 交易超时时间(秒)
SIMULATION_ONLY = os.environ.get("SIMULATION_ONLY", "False").lower() == "true"
# Trading hours
MARKET_OPEN_TIME = "09:20"
MARKET_ACTIVE_TIME = "09:15"
MARKET_CLOSE_TIME = "15:10"
MARKET_OPEN_TIME = os.environ.get("MARKET_OPEN_TIME", "09:15")
MARKET_CLOSE_TIME = os.environ.get("MARKET_CLOSE_TIME", "15:30")
# Logging
LOG_DIR = "logs"
@ -27,5 +27,15 @@ class Config:
RATE_LIMIT_PERIOD = 60 # seconds
# XtQuant 相关配置
XT_ACCOUNT = '80391818'
XT_PATH = r'C:\\江海证券QMT实盘_交易\\userdata_mini'
XT_ACCOUNT = os.environ.get("XT_ACCOUNT", "80391818")
XT_PATH = os.environ.get("XT_PATH", r'C:\\江海证券QMT实盘_交易\\userdata_mini')
# 新增RealTraderManager配置
USE_REAL_TRADER_MANAGER = os.environ.get("USE_REAL_TRADER_MANAGER", "True").lower() == "true"
RTM_ORDER_TIMEOUT = int(os.environ.get("RTM_ORDER_TIMEOUT", 60)) # 订单超时时间(秒)
RTM_MAX_RETRIES = int(os.environ.get("RTM_MAX_RETRIES", 3)) # 最大重试次数
RTM_USE_MARKET_ORDER = os.environ.get("RTM_USE_MARKET_ORDER", "True").lower() == "true" # 是否使用市价单进行补单
# 计划任务运行时间
STRATEGY_SAVE_TIME = os.environ.get("STRATEGY_SAVE_TIME", "15:30") # 每天保存策略数据的时间
CLEAN_ORDERS_TIME = os.environ.get("CLEAN_ORDERS_TIME", "00:01") # 每天清理超时委托的时间

510
src/real_trader_manager.py Normal file
View File

@ -0,0 +1,510 @@
import time
import threading
import schedule
from xt_trader import XtTrader
from xtquant import xtconstant
from logger_config import get_logger
from config import Config
from strategy_position_manager import StrategyPositionManager
import json
# 获取日志记录器
logger = get_logger('real_trader_manager')
class RealTraderManager:
"""实盘交易管理器,处理实盘下单失败、部分成交等问题,尽量保证仓位与策略信号一致"""
def __init__(self, trader=None):
"""初始化实盘交易管理器
Args:
trader: XtTrader实例如果为None则自动获取
"""
# 使用传入的trader实例或获取单例
from trade_server import get_real_trader
self.trader = trader if trader is not None else get_real_trader()
# 确保已登录
if not self.trader.is_logged_in():
self.trader.login()
# 存储待处理的交易请求及其状态
# 格式: {order_id: {strategy_name, code, direction, target_amount, price, status, create_time, last_check_time, retry_count}}
self.pending_orders = {}
# 启动调度器
self._start_scheduler()
# 记录策略期望持仓状态
# 格式: {strategy_name: {code: target_amount}}
self.strategy_targets = {}
logger.info("实盘交易管理器初始化完成")
def _start_scheduler(self):
"""启动定时任务调度器"""
# 每分钟检查一次未完成订单状态并处理
schedule.every(1).minutes.do(self.check_pending_orders)
# 每天收盘后清理过期未完成订单
schedule.every().day.at(Config.STRATEGY_SAVE_TIME).do(self.clean_expired_orders)
# 启动调度线程
def run_scheduler():
while True:
try:
schedule.run_pending()
time.sleep(10)
except Exception as e:
logger.error(f"调度器运行错误: {str(e)}")
scheduler_thread = threading.Thread(target=run_scheduler)
scheduler_thread.daemon = True
scheduler_thread.start()
logger.info("交易管理器调度器已启动")
def place_order(self, strategy_name, code, direction, amount, price, order_type='limit'):
"""下单接口,处理买入/卖出请求
Args:
strategy_name: 策略名称
code: 股票代码
direction: 交易方向 'buy''sell'
amount: 交易数量
price: 交易价格
order_type: 订单类型'limit'表示限价单'market'表示市价单
Returns:
dict: 包含订单ID和状态信息
"""
if not strategy_name or not code or not direction:
logger.error("下单参数不完整")
return {"success": False, "error": "参数不完整"}
# 检查交易方向
if direction not in ['buy', 'sell']:
logger.error(f"无效的交易方向: {direction}")
return {"success": False, "error": "无效的交易方向"}
try:
# 检查资金和持仓是否足够
if not self._check_order_feasibility(code, direction, amount, price):
logger.warning(f"资金或持仓不足,忽略订单: {direction} {code} {amount}")
return {"success": False, "error": "资金或持仓不足"}
# 更新策略目标持仓
self._update_strategy_target(strategy_name, code, direction, amount)
# 执行实际下单
price_type = xtconstant.FIX_PRICE if order_type == 'limit' else xtconstant.MARKET_BEST
# 下单
if direction == 'buy':
result = self.trader.buy(code, price, amount)
else:
result = self.trader.sell(code, price, amount)
order_id = result.get('order_id')
if not order_id or order_id == 'simulation':
logger.error(f"下单失败: {result}")
return {"success": False, "error": "下单失败"}
# 记录到待处理订单
self.pending_orders[order_id] = {
'strategy_name': strategy_name,
'code': code,
'direction': direction,
'target_amount': amount,
'price': price,
'status': 'pending',
'create_time': time.time(),
'last_check_time': time.time(),
'retry_count': 0,
'order_type': order_type
}
# 添加到策略持仓管理器中的未完成委托
StrategyPositionManager.add_pending_order(
self.trader,
order_id,
strategy_name,
code,
price,
amount,
direction
)
logger.info(f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}")
# 立即更新一次订单状态
self._update_order_status(order_id)
return {"success": True, "order_id": order_id}
except Exception as e:
logger.error(f"下单过程发生异常: {str(e)}")
return {"success": False, "error": f"下单异常: {str(e)}"}
def check_pending_orders(self):
"""检查所有未完成订单状态并处理,定时任务调用"""
try:
logger.info("开始检查未完成订单...")
# 更新StrategyPositionManager中的未完成委托状态
StrategyPositionManager.update_pending_orders(self.trader)
# 获取最新的委托列表
entrusts = self.trader.get_today_entrust()
entrust_map = {str(e['order_id']): e for e in entrusts}
# 检查每个未完成订单
for order_id, order_info in list(self.pending_orders.items()):
# 跳过已完成的订单
if order_info['status'] in ['completed', 'cancelled', 'failed']:
continue
# 更新订单状态
self._update_order_status(order_id, entrust_map)
# 处理超时未成交或部分成交的订单
current_time = time.time()
order_age = current_time - order_info['create_time']
# 如果订单超过配置的超时时间且状态仍为pending或partial
if order_age > Config.RTM_ORDER_TIMEOUT and order_info['status'] in ['pending', 'partial']:
# 记录超时信息
logger.warning(f"订单已超时({order_age:.0f}秒 > {Config.RTM_ORDER_TIMEOUT}秒): ID={order_id}, 代码={order_info['code']}, 状态={order_info['status']}")
# 如果是部分成交,记录详情
if order_info['status'] == 'partial' and 'traded_volume' in order_info:
original = order_info['target_amount']
traded = order_info['traded_volume']
remaining = original - traded
logger.info(f"订单部分成交详情: ID={order_id}, 原始数量={original}, 已成交={traded}, 剩余={remaining}")
self._handle_timeout_order(order_id, order_info)
# 同步策略持仓和实际持仓
self._sync_strategy_positions()
logger.info("未完成订单检查完毕")
except Exception as e:
logger.error(f"检查未完成订单时发生异常: {str(e)}")
def _update_order_status(self, order_id, entrust_map=None):
"""更新单个订单状态
Args:
order_id: 订单ID
entrust_map: 可选的委托字典如果为None则重新获取
"""
if order_id not in self.pending_orders:
return
try:
# 如果没有提供委托字典,则获取当前委托
if entrust_map is None:
entrusts = self.trader.get_today_entrust()
entrust_map = {str(e['order_id']): e for e in entrusts}
# 查找对应的委托记录
entrust = entrust_map.get(str(order_id))
if entrust:
# 获取订单之前的状态,用于判断是否发生变化
previous_status = self.pending_orders[order_id].get('status')
previous_volume = self.pending_orders[order_id].get('traded_volume', 0)
# 更新最后检查时间
self.pending_orders[order_id]['last_check_time'] = time.time()
# 根据委托状态更新订单状态
if entrust['order_status'] == xtconstant.ORDER_SUCCEEDED:
# 全部成交
self.pending_orders[order_id]['status'] = 'completed'
# 记录状态变化
if previous_status != 'completed':
logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=completed, 成交量={entrust.get('traded_volume', 0)}")
elif entrust['order_status'] == xtconstant.ORDER_PART_SUCC:
# 部分成交
self.pending_orders[order_id]['status'] = 'partial'
current_volume = entrust.get('traded_volume', 0)
self.pending_orders[order_id]['traded_volume'] = current_volume
# 如果成交量有变化,记录日志
if current_volume != previous_volume:
target_amount = self.pending_orders[order_id]['target_amount']
logger.info(f"订单部分成交更新: ID={order_id}, 代码={entrust['stock_code']}, 目标数量={target_amount}, 已成交数量={current_volume}, 剩余数量={target_amount - current_volume}")
elif entrust['order_status'] in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_JUNK]:
# 已撤单或废单
self.pending_orders[order_id]['status'] = 'cancelled'
# 记录状态变化
if previous_status != 'cancelled':
logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=cancelled, 原因={entrust.get('err_msg', '未知原因')}")
elif entrust['order_status'] == xtconstant.ORDER_UNREPORTED:
# 未报
self.pending_orders[order_id]['status'] = 'pending'
if previous_status != 'pending':
logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=pending(未报)")
elif entrust['order_status'] == xtconstant.ORDER_WAIT_REPORTING:
# 待报
self.pending_orders[order_id]['status'] = 'pending'
if previous_status != 'pending':
logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=pending(待报)")
elif entrust['order_status'] == xtconstant.ORDER_REPORTED:
# 已报
self.pending_orders[order_id]['status'] = 'pending'
if previous_status != 'pending':
logger.info(f"订单状态变化: ID={order_id}, 代码={entrust['stock_code']}, 旧状态={previous_status}, 新状态=pending(已报)")
else:
# 委托列表中找不到该订单,可能已经太久
current_time = time.time()
if current_time - self.pending_orders[order_id]['create_time'] > 24 * 60 * 60:
previous_status = self.pending_orders[order_id].get('status')
self.pending_orders[order_id]['status'] = 'failed'
logger.warning(f"订单状态未知且过期: ID={order_id}, 旧状态={previous_status}, 新状态=failed, 创建时长={(current_time - self.pending_orders[order_id]['create_time'])/3600:.1f}小时")
except Exception as e:
logger.error(f"更新订单状态时发生异常: order_id={order_id}, error={str(e)}")
def _handle_timeout_order(self, order_id, order_info):
"""处理超时或部分成交的订单
Args:
order_id: 订单ID
order_info: 订单信息字典
"""
try:
# 首先尝试撤单
logger.info(f"尝试撤销超时订单: ID={order_id}, 代码={order_info['code']}, 超时时间={(time.time() - order_info['create_time']):.0f}")
cancel_result = self.trader.cancel(order_id)
# 记录撤单结果
if isinstance(cancel_result, dict):
result_str = json.dumps(cancel_result)
else:
result_str = str(cancel_result)
logger.info(f"撤单结果: ID={order_id}, 结果={result_str}")
# 计算未成交数量
original_amount = order_info['target_amount']
traded_amount = order_info.get('traded_volume', 0)
remaining_amount = original_amount - traded_amount
# 记录详细的成交情况
logger.info(f"订单成交情况: ID={order_id}, 代码={order_info['code']}, 原始数量={original_amount}, 已成交={traded_amount}, 剩余={remaining_amount}")
# 如果有未成交的部分,使用市价单补充交易
if remaining_amount > 0:
# 递增重试计数
order_info['retry_count'] += 1
logger.info(f"准备使用市价单补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}")
# 如果重试次数少于最大重试次数,则使用市价单补单
if order_info['retry_count'] <= Config.RTM_MAX_RETRIES:
# 使用市价单
new_order = self.place_order(
order_info['strategy_name'],
order_info['code'],
order_info['direction'],
remaining_amount,
0, # 市价单价格参数无效
'market' # 使用市价单
)
if new_order.get('success'):
logger.info(f"市价补单成功: 原订单ID={order_id}, 新订单ID={new_order['order_id']}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}")
else:
logger.error(f"市价补单失败: 原订单ID={order_id}, 错误={new_order.get('error')}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}")
else:
logger.warning(f"订单重试次数过多,不再尝试: ID={order_id}, 重试次数={order_info['retry_count']}/{Config.RTM_MAX_RETRIES}, 代码={order_info['code']}, 方向={order_info['direction']}, 未成交数量={remaining_amount}")
else:
logger.info(f"订单已全部成交,无需补单: ID={order_id}, 代码={order_info['code']}, 成交数量={traded_amount}")
# 更新原订单状态
previous_status = order_info['status']
order_info['status'] = 'cancelled'
logger.info(f"更新原订单状态: ID={order_id}, 旧状态={previous_status}, 新状态=cancelled")
except Exception as e:
logger.error(f"处理超时订单时发生异常: order_id={order_id}, error={str(e)}")
def _check_order_feasibility(self, code, direction, amount, price):
"""检查订单是否可行(资金或持仓是否足够)
Args:
code: 股票代码
direction: 交易方向
amount: 交易数量
price: 交易价格
Returns:
bool: 订单是否可行
"""
try:
if direction == 'buy':
# 检查资金是否足够
balance = self.trader.get_balance()
if not balance:
logger.error("获取账户余额失败")
return False
# 计算所需资金加上3%的手续费作为缓冲)
required_cash = price * amount * 1.03
available_cash = balance.get('cash', 0)
if required_cash > available_cash:
logger.warning(f"资金不足: 需要 {required_cash:.2f}, 可用 {available_cash:.2f}")
return False
return True
elif direction == 'sell':
# 检查持仓是否足够
positions = self.trader.get_positions()
position = next((p for p in positions if p.get('stock_code') == code), None)
if not position:
logger.warning(f"没有持仓: {code}")
return False
available_volume = position.get('can_use_volume', 0)
if amount > available_volume:
logger.warning(f"可用持仓不足: 需要 {amount}, 可用 {available_volume}")
return False
return True
return False
except Exception as e:
logger.error(f"检查订单可行性时发生异常: {str(e)}")
return False
def _update_strategy_target(self, strategy_name, code, direction, amount):
"""更新策略目标持仓
Args:
strategy_name: 策略名称
code: 股票代码
direction: 交易方向
amount: 交易数量
"""
# 确保策略存在于目标字典中
if strategy_name not in self.strategy_targets:
self.strategy_targets[strategy_name] = {}
# 确保股票代码存在于策略目标中
if code not in self.strategy_targets[strategy_name]:
self.strategy_targets[strategy_name][code] = 0
# 根据交易方向更新目标持仓
if direction == 'buy':
self.strategy_targets[strategy_name][code] += amount
else: # sell
self.strategy_targets[strategy_name][code] -= amount
# 避免负数持仓
if self.strategy_targets[strategy_name][code] < 0:
self.strategy_targets[strategy_name][code] = 0
logger.info(f"更新策略目标持仓: 策略={strategy_name}, 代码={code}, 目标持仓={self.strategy_targets[strategy_name][code]}")
def _sync_strategy_positions(self):
"""同步策略持仓和实际持仓"""
try:
# 获取实际持仓
actual_positions = self.trader.get_positions()
position_map = {p['stock_code']: p for p in actual_positions}
# 遍历每个策略的目标持仓
for strategy_name, targets in self.strategy_targets.items():
# 该策略的实际持仓映射
strategy_actual_positions = {}
# 遍历该策略的目标持仓
for code, target_amount in targets.items():
# 获取股票的实际持仓
actual_position = position_map.get(code, {})
actual_amount = actual_position.get('volume', 0)
if actual_amount > 0:
strategy_actual_positions[code] = actual_amount
# 更新策略持仓管理器中的持仓记录
StrategyPositionManager.update_strategy_position(
self.trader,
strategy_name,
code,
'sync', # 使用同步模式
actual_amount
)
# 检查是否需要调整持仓
if actual_amount != target_amount:
diff = target_amount - actual_amount
if diff != 0:
logger.warning(f"持仓不一致: 策略={strategy_name}, 代码={code}, 目标={target_amount}, 实际={actual_amount}")
# 记录日志
logger.info(f"策略 {strategy_name} 的目标持仓: {targets}")
logger.info(f"策略 {strategy_name} 的实际持仓: {strategy_actual_positions}")
except Exception as e:
logger.error(f"同步策略持仓时发生异常: {str(e)}")
def clean_expired_orders(self):
"""清理过期的未完成订单"""
try:
current_time = time.time()
for order_id, order_info in list(self.pending_orders.items()):
# 如果订单创建时间超过24小时
if current_time - order_info['create_time'] > 24 * 60 * 60:
if order_info['status'] not in ['completed', 'cancelled', 'failed']:
logger.warning(f"清理过期订单: ID={order_id}, 状态={order_info['status']}")
order_info['status'] = 'expired'
except Exception as e:
logger.error(f"清理过期订单时发生异常: {str(e)}")
def get_pending_orders(self):
"""获取所有未完成订单
Returns:
list: 未完成订单列表
"""
return [{
'order_id': order_id,
**order_info
} for order_id, order_info in self.pending_orders.items()]
def get_strategy_targets(self):
"""获取策略目标持仓
Returns:
dict: 策略目标持仓
"""
return self.strategy_targets
# 单例模式实现
_instance = None
def get_real_trader_manager():
"""获取实盘交易管理器单例实例
Returns:
RealTraderManager: 实盘交易管理器实例
"""
global _instance
if _instance is None:
# 从trade_server获取实盘交易实例
from trade_server import get_real_trader
trader = get_real_trader()
_instance = RealTraderManager(trader)
return _instance

View File

@ -196,28 +196,33 @@ class StrategyPositionManager:
if os.path.exists('strategy_data.json'):
with open('strategy_data.json', 'r') as f:
data = json.load(f)
# 兼容旧版数据格式
if 'positions' in data and not isinstance(data['positions'].get('real', None), dict):
# 旧版数据,将其迁移到新结构
strategy_positions = {
'real': data.get('positions', {}),
'simulation': {}
}
strategy_trades = {
'real': data.get('trades', {}),
'simulation': {}
}
pending_orders = {
'real': data.get('pending_orders', {}),
'simulation': {}
}
else:
# 新版数据结构
strategy_positions = data.get('positions', {'real': {}, 'simulation': {}})
strategy_trades = data.get('trades', {'real': {}, 'simulation': {}})
pending_orders = data.get('pending_orders', {'real': {}, 'simulation': {}})
# 直接使用新版数据结构,不再兼容旧版格式
strategy_positions = data.get('positions', {'real': {}, 'simulation': {}})
strategy_trades = data.get('trades', {'real': {}, 'simulation': {}})
pending_orders = data.get('pending_orders', {'real': {}, 'simulation': {}})
# 确保数据结构完整
if 'real' not in strategy_positions:
strategy_positions['real'] = {}
if 'simulation' not in strategy_positions:
strategy_positions['simulation'] = {}
if 'real' not in strategy_trades:
strategy_trades['real'] = {}
if 'simulation' not in strategy_trades:
strategy_trades['simulation'] = {}
if 'real' not in pending_orders:
pending_orders['real'] = {}
if 'simulation' not in pending_orders:
pending_orders['simulation'] = {}
logger.info("已加载策略数据")
logger.info(f"实盘策略数: {len(strategy_positions['real'])}, 模拟策略数: {len(strategy_positions['simulation'])}")
except Exception as e:
logger.error(f"加载策略数据失败: {str(e)}")
# 初始化空数据结构
strategy_positions = {'real': {}, 'simulation': {}}
strategy_trades = {'real': {}, 'simulation': {}}
pending_orders = {'real': {}, 'simulation': {}}
@staticmethod
def save_strategy_data():

View File

@ -11,6 +11,7 @@ from simulation_trader import SimulationTrader
import datetime
from strategy_position_manager import StrategyPositionManager
from logger_config import get_logger
from real_trader_manager import get_real_trader_manager
# 获取日志记录器
logger = get_logger('server')
@ -47,25 +48,46 @@ def get_real_trader():
_real_trader_instance.login()
return _real_trader_instance
# 获取交易实例
def get_trader(use_sim_trader=False):
"""获取交易实例 - 采用单例模式
# 判断当前是否应该使用模拟交易
def should_use_simulation():
"""判断是否应该使用模拟交易
Args:
use_sim_trader (bool): 是否强制使用模拟交易True表示必定返回模拟交易实例
Returns:
返回交易实例根据参数和配置决定是模拟交易还是实盘交易
tuple: (should_simulate: bool, simulation_reason: str)
should_simulate: 是否应该使用模拟交易
simulation_reason: 使用模拟交易的原因
"""
# 如果强制使用模拟交易,返回模拟交易单例
if use_sim_trader:
return get_sim_trader()
# 如果配置为仅模拟交易,返回模拟交易单例
# 如果配置为仅模拟交易返回True
if Config.SIMULATION_ONLY:
return get_sim_trader()
return True, "配置为仅模拟交易"
# 判断当前是否为交易时间
# 判断当前是否为交易日(只基于日期,不考虑时间)
now = datetime.datetime.now()
# 尝试导入chinese_calendar判断是否为交易日
try:
from chinese_calendar import is_workday, is_holiday
is_trading_day = is_workday(now) and not is_holiday(now)
except ImportError:
# 如果无法导入chinese_calendar则简单地用工作日判断
is_trading_day = now.weekday() < 5 # 0-4 为周一至周五
# 如果不是交易日返回True使用模拟交易
if not is_trading_day:
return True, f"当前非交易日 - {now.date()}"
# 如果是交易日无论是否在交易时间都返回False使用实盘
return False, ""
# 判断当前是否在交易时间内
def is_trading_hours():
"""判断当前是否在交易时间内
Returns:
tuple: (is_trading: bool, message: str)
is_trading: 是否在交易时间
message: 相关信息
"""
now = datetime.datetime.now()
current_time = now.time()
@ -77,21 +99,40 @@ def get_trader(use_sim_trader=False):
is_trading_hour = (morning_start <= current_time <= morning_end) or (afternoon_start <= current_time <= afternoon_end)
# 尝试导入chinese_calendar判断是否为交易日
try:
from chinese_calendar import is_workday, is_holiday
is_trading_day = is_workday(now) and not is_holiday(now)
except ImportError:
# 如果无法导入chinese_calendar则简单地用工作日判断
is_trading_day = now.weekday() < 5 # 0-4 为周一至周五
if is_trading_hour:
return True, ""
else:
return False, f"当前非交易时段 - 时间: {current_time.strftime('%H:%M:%S')}"
# 获取交易实例 - 根据情况返回模拟或实盘交易实例
def get_trader():
"""获取交易实例 - 根据当前状态决定返回模拟还是实盘交易实例
# 如果不是交易日或不在交易时间内,返回模拟交易单例
if not is_trading_day or not is_trading_hour:
logger.info(f"当前非交易时段 - 日期: {now.date()}, 时间: {current_time}, 使用模拟交易")
Returns:
返回交易实例根据配置和当前时间决定是模拟交易还是实盘交易
"""
should_simulate, _ = should_use_simulation()
if should_simulate:
return get_sim_trader()
else:
return get_real_trader()
# 获取指定类型的交易实例 - 供内部API查询等使用
def get_trader_by_type(trader_type='auto'):
"""获取指定类型的交易实例
# 否则返回真实交易单例
return get_real_trader()
Args:
trader_type: 'simulation'=模拟交易, 'real'=实盘交易, 'auto'=自动判断
Returns:
指定类型的交易实例
"""
if trader_type == 'simulation':
return get_sim_trader()
elif trader_type == 'real':
return get_real_trader()
else: # 'auto'
return get_trader()
def run_daily(time_str, job_func):
"""设置每天在指定时间运行的任务
@ -135,7 +176,6 @@ atexit.register(StrategyPositionManager.save_strategy_data)
# 使用配置文件中的时间
run_daily(Config.MARKET_OPEN_TIME, lambda: get_trader().login())
run_daily(Config.MARKET_ACTIVE_TIME, lambda: get_trader().get_balance())
run_daily(Config.MARKET_CLOSE_TIME, lambda: get_trader().logout())
@ -144,36 +184,10 @@ def health_check():
return "ok", 200
def should_use_simulation():
"""判断是否应该使用模拟交易
Returns:
tuple: (should_simulate: bool, simulation_reason: str)
should_simulate: 是否应该使用模拟交易
simulation_reason: 使用模拟交易的原因
"""
# 直接使用get_trader()返回的实例类型判断
trader = get_trader()
if isinstance(trader, SimulationTrader):
# 获取原因
if Config.SIMULATION_ONLY:
return True, "配置为仅模拟交易"
else:
now = datetime.datetime.now()
return True, f"当前非交易时段 - {now.strftime('%Y-%m-%d %H:%M:%S')}"
# 如果是实盘交易实例
return False, ""
@app.route("/yu/buy", methods=["POST"])
def buy():
"""Buy an item with given parameters."""
logger.info("Received buy request")
# 每次操作前更新未完成委托状态
current_trader = get_trader()
StrategyPositionManager.update_pending_orders(current_trader)
# Get data from request body
data = request.get_json()
@ -195,12 +209,13 @@ def buy():
# 检查是否需要模拟交易
should_simulate, simulation_reason = should_use_simulation()
# 自动判断需要使用模拟交易
if should_simulate:
# 使用模拟交易
logger.info(f"使用模拟交易 - {simulation_reason} - 代码: {code}, 价格: {price}, 数量: {amount}")
# 获取模拟交易实例并执行买入操作
sim_trader = get_trader(True)
sim_trader = get_sim_trader()
result = sim_trader.buy(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
@ -208,62 +223,26 @@ def buy():
# 模拟交易立即生效,更新策略持仓
StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'buy', amount)
return jsonify({"success": True, "data": result}), 200
return jsonify({"success": True, "data": result, "simulation": True}), 200
# 检查是否在交易时间内
trading_hours, hours_message = is_trading_hours()
if not trading_hours:
logger.warning(f"实盘交易失败 - {hours_message} - 代码: {code}, 价格: {price}, 数量: {amount}")
return jsonify({"success": False, "error": f"交易失败: {hours_message},非交易时间不能实盘交易"}), 400
# 尝试实盘交易
logger.info(f"Executing buy order: code={code}, price={price}, amount={amount}, strategy_name={strategy_name}")
try:
result = execute_with_timeout(current_trader.buy, Config.TRADE_TIMEOUT, code, price, amount)
if result is None:
# 超时时使用模拟交易
logger.warning(f"Buy order timeout after {Config.TRADE_TIMEOUT} seconds, switching to simulation mode")
# 创建模拟交易实例并执行买入操作
sim_trader = get_trader(True)
result = sim_trader.buy(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 超时情况下,使用模拟交易,立即更新策略持仓
StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'buy', amount)
return jsonify({"success": True, "data": result}), 200
# 使用RealTraderManager执行实盘交易
logger.info(f"使用RealTraderManager执行买入: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}")
rtm = get_real_trader_manager()
result = rtm.place_order(strategy_name, code, 'buy', amount, price)
if result.get('success'):
logger.info(f"RealTraderManager买入成功: {result}")
return jsonify({"success": True, "data": result, "simulation": False}), 200
else:
logger.error(f"RealTraderManager买入失败: {result.get('error')}")
return jsonify({"success": False, "error": result.get('error')}), 400
# 如果指定了策略名称且是真实交易
if strategy_name and 'order_id' in result and result['order_id'] != 'simulation':
order_id = result['order_id']
# 添加到未完成委托
StrategyPositionManager.add_pending_order(
current_trader,
order_id,
strategy_name,
code,
price,
amount,
'buy'
)
# 注意不在这里调用update_strategy_position
# 持仓更新将由update_pending_orders函数处理
# 这避免了持仓更新的冗余操作
logger.info(f"Buy order result: {result}")
return jsonify({"success": True, "data": result}), 200
except Exception as e:
# 发生错误时使用模拟交易
logger.error(f"Buy order failed: {str(e)}, switching to simulation mode")
# 创建模拟交易实例并执行买入操作
sim_trader = get_trader(True)
result = sim_trader.buy(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 错误情况下,使用模拟交易,立即更新策略持仓
StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'buy', amount)
return jsonify({"success": True, "data": result}), 200
except ValueError as e:
logger.error(f"Invalid request parameters: {str(e)}")
abort(400, description=str(e))
@ -276,9 +255,6 @@ def buy():
def sell():
"""Sell an item with given parameters."""
logger.info("Received sell request")
# 每次操作前更新未完成委托状态
current_trader = get_trader()
StrategyPositionManager.update_pending_orders(current_trader)
# Get data from request body
data = request.get_json()
@ -300,12 +276,13 @@ def sell():
# 检查是否需要模拟交易
should_simulate, simulation_reason = should_use_simulation()
# 自动判断需要使用模拟交易
if should_simulate:
# 使用模拟交易
logger.info(f"使用模拟交易 - {simulation_reason} - 代码: {code}, 价格: {price}, 数量: {amount}")
# 获取模拟交易实例并执行卖出操作
sim_trader = get_trader(True)
sim_trader = get_sim_trader()
result = sim_trader.sell(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
@ -313,58 +290,26 @@ def sell():
# 模拟交易下,使用简单更新模式
StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'sell', amount)
return jsonify({"success": True, "data": result}), 200
return jsonify({"success": True, "data": result, "simulation": True}), 200
# 尝试实盘交易
logger.info(f"Executing sell order: code={code}, price={price}, amount={amount}, strategy_name={strategy_name}")
try:
result = execute_with_timeout(current_trader.sell, Config.TRADE_TIMEOUT, code, price, amount)
if result is None:
# 超时时使用模拟交易
logger.warning(f"Sell order timeout after {Config.TRADE_TIMEOUT} seconds, switching to simulation mode")
# 创建模拟交易实例并执行卖出操作
sim_trader = get_trader(True)
result = sim_trader.sell(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 超时情况下,使用简单更新模式
StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'sell', amount)
return jsonify({"success": True, "data": result}), 200
# 检查是否在交易时间内
trading_hours, hours_message = is_trading_hours()
if not trading_hours:
logger.warning(f"实盘交易失败 - {hours_message} - 代码: {code}, 价格: {price}, 数量: {amount}")
return jsonify({"success": False, "error": f"交易失败: {hours_message},非交易时间不能实盘交易"}), 400
# 如果指定了策略名称,记录到未完成委托
if strategy_name and 'order_id' in result and result['order_id'] != 'simulation':
order_id = result['order_id']
# 添加到未完成委托
StrategyPositionManager.add_pending_order(
current_trader,
order_id,
strategy_name,
code,
price,
amount,
'sell'
)
# 使用RealTraderManager执行实盘交易
logger.info(f"使用RealTraderManager执行卖出: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}")
rtm = get_real_trader_manager()
result = rtm.place_order(strategy_name, code, 'sell', amount, price)
if result.get('success'):
logger.info(f"RealTraderManager卖出成功: {result}")
return jsonify({"success": True, "data": result, "simulation": False}), 200
else:
logger.error(f"RealTraderManager卖出失败: {result.get('error')}")
return jsonify({"success": False, "error": result.get('error')}), 400
logger.info(f"Sell order result: {result}")
return jsonify({"success": True, "data": result}), 200
except Exception as e:
# 发生错误时使用模拟交易
logger.error(f"Sell order failed: {str(e)}, switching to simulation mode")
# 创建模拟交易实例并执行卖出操作
sim_trader = get_trader(True)
result = sim_trader.sell(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 错误情况下,使用简单更新模式
StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'sell', amount)
return jsonify({"success": True, "data": result}), 200
except ValueError as e:
logger.error(f"Invalid request parameters: {str(e)}")
abort(400, description=str(e))
@ -377,15 +322,52 @@ def sell():
def cancel(entrust_no):
logger.info(f"Received cancel request for entrust_no={entrust_no}")
try:
current_trader = get_trader()
result = current_trader.cancel(entrust_no)
logger.info(f"Cancel result: {result}")
# 检查是否为模拟交易
should_simulate, simulation_reason = should_use_simulation()
# 更新未完成委托状态
StrategyPositionManager.update_pending_orders(current_trader)
if should_simulate:
# 模拟交易
sim_trader = get_sim_trader()
result = sim_trader.cancel(entrust_no)
logger.info(f"模拟交易撤单结果: {result}")
# 更新未完成委托状态
StrategyPositionManager.update_pending_orders(sim_trader)
return jsonify({"success": True, "data": result, "simulation": True}), 200
else:
# 尝试使用RealTraderManager撤单
try:
rtm = get_real_trader_manager()
for order in rtm.get_pending_orders():
if str(order['order_id']) == str(entrust_no):
# 找到对应订单使用RealTraderManager处理
real_trader = get_real_trader()
result = real_trader.cancel(entrust_no)
logger.info(f"实盘交易撤单结果: {result}")
# 更新订单状态
rtm.check_pending_orders()
return jsonify({"success": True, "data": result, "simulation": False}), 200
# 如果RealTraderManager中找不到则使用普通实盘
real_trader = get_real_trader()
result = real_trader.cancel(entrust_no)
logger.info(f"实盘交易撤单结果: {result}")
# 更新未完成委托状态
StrategyPositionManager.update_pending_orders(real_trader)
return jsonify({"success": True, "data": result, "simulation": False}), 200
except Exception as e:
logger.error(f"使用RealTraderManager撤单失败: {str(e)}")
# 回退到普通方式
real_trader = get_real_trader()
result = real_trader.cancel(entrust_no)
logger.info(f"实盘交易撤单结果: {result}")
# 更新未完成委托状态
StrategyPositionManager.update_pending_orders(real_trader)
return jsonify({"success": True, "data": result, "simulation": False}), 200
response = {"success": True, "data": result}
return jsonify(response), 200
except Exception as e:
logger.error(f"Error processing cancel request: {str(e)}")
abort(500, description="Internal server error")
@ -396,14 +378,22 @@ def get_balance():
"""Get the balance of the account."""
logger.info("Received balance request")
try:
current_trader = get_trader()
balance = current_trader.get_balance()
logger.info(f"Balance: {balance}")
response = {"success": True, "data": balance}
return jsonify(response), 200
# 判断当前交易模式
should_simulate, _ = should_use_simulation()
if should_simulate:
# 模拟交易
trader = get_sim_trader()
balance = trader.get_balance()
logger.info(f"模拟交易余额: {balance}")
return jsonify({"success": True, "data": balance, "simulation": True}), 200
else:
# 实盘交易
trader = get_real_trader()
balance = trader.get_balance()
logger.info(f"实盘交易余额: {balance}")
return jsonify({"success": True, "data": balance, "simulation": False}), 200
except Exception as e:
print(e)
logger.error(f"Error processing balance request: {str(e)}")
abort(500, description="Internal server error")
@ -412,18 +402,36 @@ def get_balance():
def get_positions():
"""Get the positions of the account."""
logger.info("Received positions request")
# 每次查询前更新未完成委托状态
current_trader = get_trader()
StrategyPositionManager.update_pending_orders(current_trader)
try:
# 获取查询参数中的策略名称
# 获取查询参数
strategy_name = request.args.get("strategy_name", "")
# 使用StrategyPositionManager获取持仓信息
result = StrategyPositionManager.get_strategy_positions(current_trader, strategy_name if strategy_name else None)
# 判断当前交易模式
should_simulate, _ = should_use_simulation()
return jsonify({"success": True, "data": result}), 200
# 选择相应的交易实例
trader = get_sim_trader() if should_simulate else get_real_trader()
# 更新未完成委托状态
StrategyPositionManager.update_pending_orders(trader)
# 如果实盘且指定要查询RealTraderManager中的目标持仓
if not should_simulate and request.args.get("target", "").lower() == "true":
rtm = get_real_trader_manager()
targets = rtm.get_strategy_targets()
# 如果指定了策略名称
if strategy_name:
strategy_target = targets.get(strategy_name, {})
return jsonify({"success": True, "data": {strategy_name: strategy_target}, "simulation": False}), 200
return jsonify({"success": True, "data": targets, "simulation": False}), 200
# 使用StrategyPositionManager获取持仓信息
result = StrategyPositionManager.get_strategy_positions(trader, strategy_name if strategy_name else None)
return jsonify({"success": True, "data": result, "simulation": should_simulate}), 200
except Exception as e:
logger.error(f"Error processing positions request: {str(e)}")
abort(500, description="Internal server error")
@ -434,12 +442,15 @@ def get_today_trades():
"""Get the today's trades of the account."""
logger.info("Received today trades request")
try:
current_trader = get_trader()
trades = current_trader.get_today_trades()
logger.info(f"Today trades: {trades}")
# 判断当前交易模式
should_simulate, _ = should_use_simulation()
# 选择相应的交易实例
trader = get_sim_trader() if should_simulate else get_real_trader()
trades = trader.get_today_trades()
logger.info(f"今日成交: {trades}")
response = {"success": True, "data": trades}
return jsonify(response), 200
return jsonify({"success": True, "data": trades, "simulation": should_simulate}), 200
except Exception as e:
logger.error(f"Error processing today trades request: {str(e)}")
abort(500, description="Internal server error")
@ -450,12 +461,15 @@ def get_today_entrust():
"""Get the today's entrust of the account."""
logger.info("Received today entrust request")
try:
current_trader = get_trader()
entrust = current_trader.get_today_entrust()
logger.info(f"Today entrust: {entrust}")
# 判断当前交易模式
should_simulate, _ = should_use_simulation()
# 选择相应的交易实例
trader = get_sim_trader() if should_simulate else get_real_trader()
entrust = trader.get_today_entrust()
logger.info(f"今日委托: {entrust}")
response = {"success": True, "data": entrust}
return jsonify(response), 200
return jsonify({"success": True, "data": entrust, "simulation": should_simulate}), 200
except Exception as e:
logger.error(f"Error processing today entrust request: {str(e)}")
abort(500, description="Internal server error")
@ -466,22 +480,34 @@ def clear_strategy(strategy_name):
"""清除指定策略的持仓管理数据"""
logger.info(f"接收到清除策略持仓请求: {strategy_name}")
try:
current_trader = get_trader()
# 判断当前交易模式
should_simulate, _ = should_use_simulation()
# 如果是实盘模式
if not should_simulate:
# 先尝试清除RealTraderManager中的策略目标
rtm = get_real_trader_manager()
if strategy_name in rtm.strategy_targets:
del rtm.strategy_targets[strategy_name]
logger.info(f"已清除RealTraderManager中的策略目标: {strategy_name}")
# 获取相应的交易实例
trader = get_sim_trader() if should_simulate else get_real_trader()
# 如果是模拟交易实例,则重置模拟交易实例
if isinstance(current_trader, SimulationTrader):
if should_simulate and isinstance(trader, SimulationTrader):
global _sim_trader_instance
if _sim_trader_instance is not None:
logger.info("重置模拟交易实例")
# 创建一个新的模拟交易实例,替换原有实例
_sim_trader_instance = SimulationTrader()
current_trader = _sim_trader_instance
trader = _sim_trader_instance
# 使用StrategyPositionManager清除策略
success, message = StrategyPositionManager.clear_strategy(current_trader, strategy_name)
success, message = StrategyPositionManager.clear_strategy(trader, strategy_name)
if success:
return jsonify({"success": True, "message": message}), 200
return jsonify({"success": True, "message": message, "simulation": should_simulate}), 200
else:
abort(400, description=message)
@ -500,6 +526,53 @@ def execute_with_timeout(func, timeout, *args, **kwargs):
return None
# 添加新的API端点查询订单状态
@app.route("/yu/order_status", methods=["GET"])
def get_order_status():
"""获取订单状态"""
logger.info("Received order status request")
# 判断当前交易模式
should_simulate, _ = should_use_simulation()
if not should_simulate and Config.USE_REAL_TRADER_MANAGER:
# 实盘 + RealTraderManager模式
rtm = get_real_trader_manager()
pending_orders = rtm.get_pending_orders()
return jsonify({"success": True, "data": pending_orders, "simulation": False}), 200
else:
# 模拟交易或实盘但未使用RealTraderManager
trader = get_sim_trader() if should_simulate else get_real_trader()
entrusts = trader.get_today_entrust()
return jsonify({"success": True, "data": entrusts, "simulation": should_simulate}), 200
# 添加新的API端点查询策略目标持仓
@app.route("/yu/strategy_targets", methods=["GET"])
def get_strategy_targets():
"""获取策略目标持仓"""
logger.info("Received strategy targets request")
# 获取查询参数
strategy_name = request.args.get("strategy_name")
# 检查是否是实盘模式且使用RealTraderManager
should_simulate, _ = should_use_simulation()
if not should_simulate and Config.USE_REAL_TRADER_MANAGER:
rtm = get_real_trader_manager()
targets = rtm.get_strategy_targets()
# 如果指定了策略名称,则只返回该策略的目标持仓
if strategy_name:
strategy_target = targets.get(strategy_name, {})
return jsonify({"success": True, "data": {strategy_name: strategy_target}, "simulation": False}), 200
return jsonify({"success": True, "data": targets, "simulation": False}), 200
else:
return jsonify({"success": False, "error": "无法获取目标持仓非实盘模式或RealTraderManager未启用"}), 400
if __name__ == "__main__":
logger.info(f"Server starting on {Config.HOST}:{Config.PORT}")
app.run(debug=Config.DEBUG, host=Config.HOST, port=Config.PORT)