This commit is contained in:
zhiyong 2025-04-30 12:34:24 +08:00
parent bfe509a22b
commit a6772bea75
2 changed files with 553 additions and 128 deletions

151
src/simulation_trader.py Normal file
View File

@ -0,0 +1,151 @@
import logging
from logging.handlers import RotatingFileHandler
import os
import time
class SimulationTrader:
def __init__(self, logger=None):
self.logger = logger or self._setup_default_logger()
# 添加模拟持仓字典,用于追踪模拟交易的持仓
self.sim_positions = {}
# 模拟资金账户信息
self.sim_balance = {"cash": 1000000.00, "frozen": 0.00, "total": 1000000.00}
def _setup_default_logger(self):
"""设置默认日志记录器"""
sim_logger = logging.getLogger('trade_simulation')
sim_logger.setLevel(logging.INFO)
# 确保没有重复的处理器
for handler in sim_logger.handlers[:]:
sim_logger.removeHandler(handler)
# 文件处理器
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
file_handler = RotatingFileHandler(
os.path.join(log_dir, 'trade_simulation.log'),
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
sim_logger.addHandler(file_handler)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
sim_logger.addHandler(console_handler)
return sim_logger
def login(self):
self.logger.info("模拟交易:登录成功")
return True
def logout(self):
self.logger.info("模拟交易:登出成功")
return True
def buy(self, code, price, amount):
message = f"模拟买入 - 代码: {code}, 价格: {price}, 数量: {amount}"
self.logger.info(message)
# 更新模拟持仓
if code not in self.sim_positions:
self.sim_positions[code] = {
"stock_code": code,
"volume": 0,
"can_use_volume": 0,
"frozen_volume": 0,
"avg_price": 0.0,
"market_value": 0.0
}
# 计算新的平均成本
current_cost = self.sim_positions[code]["avg_price"] * self.sim_positions[code]["volume"]
new_cost = price * amount
total_volume = self.sim_positions[code]["volume"] + amount
# 更新持仓信息
self.sim_positions[code]["volume"] += amount
self.sim_positions[code]["can_use_volume"] += amount
self.sim_positions[code]["avg_price"] = (current_cost + new_cost) / total_volume if total_volume > 0 else 0
self.sim_positions[code]["market_value"] = self.sim_positions[code]["volume"] * price
# 更新资金
self.sim_balance["cash"] -= price * amount
self.sim_balance["total"] = self.sim_balance["cash"] + sum(pos["market_value"] for pos in self.sim_positions.values())
return {"order_id": "simulation", "message": message}
def sell(self, code, price, amount):
message = f"模拟卖出 - 代码: {code}, 价格: {price}, 数量: {amount}"
self.logger.info(message)
# 更新模拟持仓
if code in self.sim_positions:
# 确保可用数量足够
if self.sim_positions[code]["can_use_volume"] >= amount:
# 更新持仓信息
self.sim_positions[code]["volume"] -= amount
self.sim_positions[code]["can_use_volume"] -= amount
self.sim_positions[code]["market_value"] = self.sim_positions[code]["volume"] * price
# 如果持仓为0删除该股票
if self.sim_positions[code]["volume"] <= 0:
del self.sim_positions[code]
# 更新资金
self.sim_balance["cash"] += price * amount
self.sim_balance["total"] = self.sim_balance["cash"] + sum(pos["market_value"] for pos in self.sim_positions.values())
else:
message = f"模拟卖出失败 - 代码: {code}, 可用数量不足"
self.logger.warning(message)
else:
message = f"模拟卖出失败 - 代码: {code}, 无持仓"
self.logger.warning(message)
return {"order_id": "simulation", "message": message}
def cancel(self, entrust_no):
message = f"模拟撤单 - 委托号: {entrust_no}"
self.logger.info(message)
return {"order_id": "simulation", "message": message}
def get_balance(self):
message = "模拟交易:查询余额"
self.logger.info(message)
return self.sim_balance
def get_positions(self):
message = "模拟交易:查询持仓"
self.logger.info(message)
# 返回与XtTrader格式一致的持仓数据
return [
{
"account_id": "simulation",
"stock_code": code,
"volume": pos["volume"],
"can_use_volume": pos["can_use_volume"],
"open_price": pos["avg_price"],
"avg_price": pos["avg_price"],
"market_value": pos["market_value"],
"frozen_volume": pos["frozen_volume"],
"on_road_volume": 0
} for code, pos in self.sim_positions.items()
]
def get_today_trades(self):
message = "模拟交易:查询今日成交"
self.logger.info(message)
return []
def get_today_entrust(self):
message = "模拟交易:查询今日委托"
self.logger.info(message)
return []
def is_trading_time(self):
return True

View File

@ -11,36 +11,61 @@ import os
from logging.handlers import RotatingFileHandler
import concurrent.futures
from concurrent.futures import TimeoutError
import json
import atexit
from simulation_trader import SimulationTrader
import datetime
# 策略仓位管理
strategy_positions = {} # 存储各策略持仓
strategy_trades = {} # 存储各策略交易记录
pending_orders = {} # 存储未完成委托
# 配置模拟交易日志
def setup_simulation_logger():
sim_logger = logging.getLogger('trade_simulation')
sim_logger.setLevel(logging.INFO)
# 获取交易实例
def get_trader(use_sim_trader=False):
"""获取交易实例
# 确保没有重复的处理器
for handler in sim_logger.handlers[:]:
sim_logger.removeHandler(handler)
Args:
use_sim_trader (bool): 是否强制使用模拟交易True表示必定返回模拟交易实例
Returns:
返回交易实例根据参数和配置决定是模拟交易还是实盘交易
"""
# 如果强制使用模拟交易,直接返回
if use_sim_trader:
return SimulationTrader()
# 文件处理器
log_dir = Config.LOG_DIR
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 如果配置为仅模拟交易,直接返回模拟交易实例
if Config.SIMULATION_ONLY:
return SimulationTrader()
file_handler = RotatingFileHandler(
os.path.join(log_dir, 'trade_simulation.log'),
maxBytes=Config.LOG_MAX_BYTES,
backupCount=Config.LOG_BACKUP_COUNT
)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
sim_logger.addHandler(file_handler)
# 判断当前是否为交易时间
now = datetime.datetime.now()
current_time = now.time()
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
sim_logger.addHandler(console_handler)
# 是否在交易时间段内9:30-11:30, 13:00-15:00
morning_start = datetime.time(9, 30)
morning_end = datetime.time(11, 30)
afternoon_start = datetime.time(13, 0)
afternoon_end = datetime.time(15, 0)
return sim_logger
is_trading_hour = (morning_start <= current_time <= morning_end) or (afternoon_start <= current_time <= afternoon_end)
# 尝试导入chinese_calendar判断是否为交易日
try:
from chinese_calendar import is_workday, is_holiday
is_trading_day = is_workday(now) and not is_holiday(now)
except ImportError:
# 如果无法导入chinese_calendar则简单地用工作日判断
is_trading_day = now.weekday() < 5 # 0-4 为周一至周五
# 如果不是交易日或不在交易时间内,返回模拟交易实例
if not is_trading_day or not is_trading_hour:
logger.info(f"当前非交易时段 - 日期: {now.date()}, 时间: {current_time}, 使用模拟交易")
return SimulationTrader()
# 否则返回真实交易实例
return XtTrader()
# 配置日志
def setup_logger():
@ -79,17 +104,176 @@ def setup_logger():
return logger
logger = setup_logger()
sim_logger = setup_simulation_logger()
def run_daily(time_str, job_func):
schedule.every().monday.at(time_str).do(job_func)
schedule.every().tuesday.at(time_str).do(job_func)
schedule.every().wednesday.at(time_str).do(job_func)
schedule.every().thursday.at(time_str).do(job_func)
schedule.every().friday.at(time_str).do(job_func)
"""设置每天在指定时间运行的任务
Args:
time_str: 运行时间格式为"HH:MM"
job_func: 要运行的函数
"""
# 不再区分周一到周五,而是每天执行
# 交易日判断逻辑已移到get_trader函数中
schedule.every().day.at(time_str).do(job_func)
# 策略持仓管理辅助函数
def update_strategy_position(strategy_name, code, direction, amount):
"""更新策略持仓
Args:
strategy_name: 策略名称
code: 股票代码
direction: 'buy''sell'
amount: 交易数量
"""
if not strategy_name:
return
# 确保策略在字典中
if strategy_name not in strategy_positions:
strategy_positions[strategy_name] = {}
try:
# 获取交易实例持仓情况(不论真实还是模拟)
actual_positions = get_trader().get_positions()
code_position = next((pos for pos in actual_positions if pos.get('stock_code') == code), None)
# 记录实际持仓总量
actual_total = code_position.get('volume', 0) if code_position else 0
actual_can_use = code_position.get('can_use_volume', 0) if code_position else 0
logger.info(f"实际持仓 - 代码: {code}, 总量: {actual_total}, 可用: {actual_can_use}")
# 如果股票代码在持仓字典中不存在,初始化它
if code not in strategy_positions[strategy_name]:
strategy_positions[strategy_name][code] = {
'total_amount': 0,
'closeable_amount': 0
}
# 直接使用交易实例返回的持仓数据更新策略持仓
strategy_positions[strategy_name][code]['total_amount'] = actual_total
strategy_positions[strategy_name][code]['closeable_amount'] = actual_can_use
logger.info(f"更新策略持仓 - 策略: {strategy_name}, 代码: {code}, 总量: {strategy_positions[strategy_name][code]['total_amount']}, 可用: {strategy_positions[strategy_name][code]['closeable_amount']}")
except Exception as e:
logger.error(f"获取实际持仓失败: {str(e)}")
# 异常情况下只记录错误,不尝试更新持仓
# 移除total_amount为0的持仓
if code in strategy_positions[strategy_name] and strategy_positions[strategy_name][code]['total_amount'] <= 0:
del strategy_positions[strategy_name][code]
def update_pending_orders():
"""更新未完成委托状态"""
try:
# 获取今日委托
current_trader = get_trader()
today_entrusts = current_trader.get_today_entrust()
# 更新委托状态
for order_id, order_info in list(pending_orders.items()):
entrust = next((e for e in today_entrusts if e.get('委托编号') == order_id), None)
if entrust:
if entrust.get('状态') in ['已成', '部分成交']:
# 成交量计算
traded_amount = int(entrust.get('成交数量', 0))
# 更新策略持仓
update_strategy_position(
order_info['strategy_name'],
order_info['code'],
order_info['direction'],
traded_amount
)
# 如果完全成交,从待处理列表中移除
if entrust.get('状态') == '已成':
del pending_orders[order_id]
# 如果已撤单、废单等终态,也从待处理列表中移除
elif entrust.get('状态') in ['已撤', '废单']:
del pending_orders[order_id]
except Exception as e:
logger.error(f"更新未完成委托状态失败: {str(e)}")
def add_pending_order(order_id, strategy_name, code, price, amount, direction):
"""添加未完成委托
Args:
order_id: 委托编号
strategy_name: 策略名称
code: 股票代码
price: 委托价格
amount: 委托数量
direction: 交易方向'buy''sell'
"""
if not order_id or order_id == 'simulation':
return
# 添加到未完成委托列表
pending_orders[order_id] = {
'strategy_name': strategy_name,
'code': code,
'price': price,
'amount': amount,
'direction': direction,
'created_time': time.time()
}
# 同时记录到交易历史
if strategy_name:
if strategy_name not in strategy_trades:
strategy_trades[strategy_name] = []
strategy_trades[strategy_name].append({
'time': time.strftime('%Y-%m-%d %H:%M:%S'),
'type': direction,
'code': code,
'price': price,
'amount': amount,
'order_id': order_id,
'status': 'pending'
})
logger.info(f"添加未完成委托: {order_id}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}")
def clean_timeout_orders():
"""清理超时委托"""
current_time = time.time()
for order_id, order_info in list(pending_orders.items()):
# 超过24小时的委托视为超时
if current_time - order_info['created_time'] > 24 * 60 * 60:
del pending_orders[order_id]
def load_strategy_data():
"""加载策略数据"""
global strategy_positions, strategy_trades, pending_orders
try:
if os.path.exists('strategy_data.json'):
with open('strategy_data.json', 'r') as f:
data = json.load(f)
strategy_positions = data.get('positions', {})
strategy_trades = data.get('trades', {})
pending_orders = data.get('pending_orders', {})
except Exception as e:
logger.error(f"加载策略数据失败: {str(e)}")
def save_strategy_data():
"""保存策略数据"""
try:
with open('strategy_data.json', 'w') as f:
json.dump({
'positions': strategy_positions,
'trades': strategy_trades,
'pending_orders': pending_orders
}, f)
except Exception as e:
logger.error(f"保存策略数据失败: {str(e)}")
def run_pending_tasks():
while True:
try:
@ -102,68 +286,27 @@ def run_pending_tasks():
# Run the task scheduler in a new thread
threading.Thread(target=run_pending_tasks).start()
# 创建模拟交易类
class SimulationTrader:
def __init__(self):
self.logger = sim_logger
def login(self):
self.logger.info("模拟交易:登录成功")
return True
def logout(self):
self.logger.info("模拟交易:登出成功")
return True
def buy(self, code, price, amount):
message = f"模拟买入 - 代码: {code}, 价格: {price}, 数量: {amount}"
self.logger.info(message)
return {"order_id": "simulation", "message": message}
def sell(self, code, price, amount):
message = f"模拟卖出 - 代码: {code}, 价格: {price}, 数量: {amount}"
self.logger.info(message)
return {"order_id": "simulation", "message": message}
def cancel(self, entrust_no):
message = f"模拟撤单 - 委托号: {entrust_no}"
self.logger.info(message)
return {"order_id": "simulation", "message": message}
def get_balance(self):
message = "模拟交易:查询余额"
self.logger.info(message)
return {"cash": 1000000.00, "frozen": 0.00, "total": 1000000.00}
def get_positions(self):
message = "模拟交易:查询持仓"
self.logger.info(message)
return []
def get_today_trades(self):
message = "模拟交易:查询今日成交"
self.logger.info(message)
return []
def get_today_entrust(self):
message = "模拟交易:查询今日委托"
self.logger.info(message)
return []
def is_trading_time(self):
return True
# 根据配置选择交易实例
trader = SimulationTrader() if Config.SIMULATION_ONLY else XtTrader()
trader.login()
# 初始化交易环境
get_trader().login()
# 添加请求频率限制
app = Flask(__name__)
# 添加策略数据相关的定期任务
schedule.every(5).minutes.do(update_pending_orders) # 每5分钟更新未完成委托状态
schedule.every().day.at("00:01").do(clean_timeout_orders) # 每天清理超时委托
schedule.every().day.at("15:30").do(save_strategy_data) # 每天收盘后保存策略数据
# 程序启动时加载策略数据
load_strategy_data()
# 程序退出时保存策略数据
atexit.register(save_strategy_data)
# 使用配置文件中的时间
run_daily(Config.MARKET_OPEN_TIME, lambda: trader.login())
run_daily(Config.MARKET_ACTIVE_TIME, lambda: trader.get_balance())
run_daily(Config.MARKET_CLOSE_TIME, lambda: trader.logout())
run_daily(Config.MARKET_OPEN_TIME, lambda: get_trader().login())
run_daily(Config.MARKET_ACTIVE_TIME, lambda: get_trader().get_balance())
run_daily(Config.MARKET_CLOSE_TIME, lambda: get_trader().logout())
@app.route("/yu/healthcheck", methods=["GET"])
@ -179,21 +322,19 @@ def should_use_simulation():
should_simulate: 是否应该使用模拟交易
simulation_reason: 使用模拟交易的原因
"""
should_simulate = True
simulation_reason = "未知原因"
# 直接使用get_trader()返回的实例类型判断
trader = get_trader()
if not Config.SIMULATION_ONLY:
try:
if trader.is_trading_time():
should_simulate = False
else:
simulation_reason = "当前为非交易时段"
except Exception as e:
simulation_reason = f"检查交易时间发生错误: {str(e)}"
else:
simulation_reason = "配置为仅模拟交易"
return should_simulate, simulation_reason
if isinstance(trader, SimulationTrader):
# 获取原因
if Config.SIMULATION_ONLY:
return True, "配置为仅模拟交易"
else:
now = datetime.datetime.now()
return True, f"当前非交易时段 - {now.strftime('%Y-%m-%d %H:%M:%S')}"
# 如果是实盘交易实例
return False, ""
@app.route("/yu/buy", methods=["POST"])
@ -205,6 +346,7 @@ def buy():
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get("strategy_name", "") # 新增策略名称参数,默认为空
try:
if not all([code, price_str, amount_str]):
@ -221,28 +363,73 @@ def buy():
if should_simulate:
# 使用模拟交易
sim_message = f"模拟买入 - {simulation_reason} - 代码: {code}, 价格: {price}, 数量: {amount}"
sim_logger.info(sim_message)
return jsonify({"success": True, "data": {"order_id": "simulation", "message": sim_message}}), 200
logger.info(f"使用模拟交易 - {simulation_reason} - 代码: {code}, 价格: {price}, 数量: {amount}")
# 获取模拟交易实例并执行买入操作
sim_trader = get_trader(True)
result = sim_trader.buy(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 模拟交易立即生效,更新策略持仓
update_strategy_position(strategy_name, code, 'buy', amount)
return jsonify({"success": True, "data": result}), 200
# 尝试实盘交易
logger.info(f"Executing buy order: code={code}, price={price}, amount={amount}")
logger.info(f"Executing buy order: code={code}, price={price}, amount={amount}, strategy_name={strategy_name}")
try:
result = execute_with_timeout(trader.buy, Config.TRADE_TIMEOUT, code, price, amount)
current_trader = get_trader()
result = execute_with_timeout(current_trader.buy, Config.TRADE_TIMEOUT, code, price, amount)
if result is None:
# 超时时使用模拟交易
sim_message = f"模拟买入 - 交易超时 - 代码: {code}, 价格: {price}, 数量: {amount}"
sim_logger.info(sim_message)
logger.warning(f"Buy order timeout after {Config.TRADE_TIMEOUT} seconds, switching to simulation mode")
return jsonify({"success": True, "data": {"order_id": "simulation", "message": sim_message}}), 200
# 创建模拟交易实例并执行买入操作
sim_trader = get_trader(True)
result = sim_trader.buy(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 超时情况下,使用模拟交易,立即更新策略持仓
update_strategy_position(strategy_name, code, 'buy', amount)
return jsonify({"success": True, "data": result}), 200
# 如果指定了策略名称且是真实交易
if strategy_name and 'order_id' in result and result['order_id'] != 'simulation':
order_id = result['order_id']
# 添加到未完成委托
add_pending_order(
order_id,
strategy_name,
code,
price,
amount,
'buy'
)
# 注意不在这里调用update_strategy_position
# 持仓更新将由update_pending_orders函数处理
# 这避免了持仓更新的冗余操作
logger.info(f"Buy order result: {result}")
return jsonify({"success": True, "data": result}), 200
except Exception as e:
# 发生错误时使用模拟交易
sim_message = f"模拟买入 - 交易失败({str(e)}) - 代码: {code}, 价格: {price}, 数量: {amount}"
sim_logger.info(sim_message)
logger.error(f"Buy order failed: {str(e)}, switching to simulation mode")
return jsonify({"success": True, "data": {"order_id": "simulation", "message": sim_message}}), 200
# 创建模拟交易实例并执行买入操作
sim_trader = get_trader(True)
result = sim_trader.buy(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 错误情况下,使用模拟交易,立即更新策略持仓
update_strategy_position(strategy_name, code, 'buy', amount)
return jsonify({"success": True, "data": result}), 200
except ValueError as e:
logger.error(f"Invalid request parameters: {str(e)}")
abort(400, description=str(e))
@ -260,6 +447,7 @@ def sell():
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get("strategy_name", "") # 新增策略名称参数,默认为空
try:
if not all([code, price_str, amount_str]):
@ -276,28 +464,69 @@ def sell():
if should_simulate:
# 使用模拟交易
sim_message = f"模拟卖出 - {simulation_reason} - 代码: {code}, 价格: {price}, 数量: {amount}"
sim_logger.info(sim_message)
return jsonify({"success": True, "data": {"order_id": "simulation", "message": sim_message}}), 200
logger.info(f"使用模拟交易 - {simulation_reason} - 代码: {code}, 价格: {price}, 数量: {amount}")
# 获取模拟交易实例并执行卖出操作
sim_trader = get_trader(True)
result = sim_trader.sell(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 模拟交易下,使用简单更新模式
update_strategy_position(strategy_name, code, 'sell', amount)
return jsonify({"success": True, "data": result}), 200
# 尝试实盘交易
logger.info(f"Executing sell order: code={code}, price={price}, amount={amount}")
logger.info(f"Executing sell order: code={code}, price={price}, amount={amount}, strategy_name={strategy_name}")
try:
result = execute_with_timeout(trader.sell, Config.TRADE_TIMEOUT, code, price, amount)
current_trader = get_trader()
result = execute_with_timeout(current_trader.sell, Config.TRADE_TIMEOUT, code, price, amount)
if result is None:
# 超时时使用模拟交易
sim_message = f"模拟卖出 - 交易超时 - 代码: {code}, 价格: {price}, 数量: {amount}"
sim_logger.info(sim_message)
logger.warning(f"Sell order timeout after {Config.TRADE_TIMEOUT} seconds, switching to simulation mode")
return jsonify({"success": True, "data": {"order_id": "simulation", "message": sim_message}}), 200
# 创建模拟交易实例并执行卖出操作
sim_trader = get_trader(True)
result = sim_trader.sell(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 超时情况下,使用简单更新模式
update_strategy_position(strategy_name, code, 'sell', amount)
return jsonify({"success": True, "data": result}), 200
# 如果指定了策略名称,记录到未完成委托
if strategy_name and 'order_id' in result and result['order_id'] != 'simulation':
order_id = result['order_id']
# 添加到未完成委托
add_pending_order(
order_id,
strategy_name,
code,
price,
amount,
'sell'
)
logger.info(f"Sell order result: {result}")
return jsonify({"success": True, "data": result}), 200
except Exception as e:
# 发生错误时使用模拟交易
sim_message = f"模拟卖出 - 交易失败({str(e)}) - 代码: {code}, 价格: {price}, 数量: {amount}"
sim_logger.info(sim_message)
logger.error(f"Sell order failed: {str(e)}, switching to simulation mode")
return jsonify({"success": True, "data": {"order_id": "simulation", "message": sim_message}}), 200
# 创建模拟交易实例并执行卖出操作
sim_trader = get_trader(True)
result = sim_trader.sell(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 错误情况下,使用简单更新模式
update_strategy_position(strategy_name, code, 'sell', amount)
return jsonify({"success": True, "data": result}), 200
except ValueError as e:
logger.error(f"Invalid request parameters: {str(e)}")
abort(400, description=str(e))
@ -310,7 +539,7 @@ def sell():
def cancel(entrust_no):
logger.info(f"Received cancel request for entrust_no={entrust_no}")
try:
result = trader.cancel(entrust_no)
result = get_trader().cancel(entrust_no)
logger.info(f"Cancel result: {result}")
response = {"success": True, "data": result}
@ -325,7 +554,7 @@ def get_balance():
"""Get the balance of the account."""
logger.info("Received balance request")
try:
balance = trader.get_balance()
balance = get_trader().get_balance()
logger.info(f"Balance: {balance}")
response = {"success": True, "data": balance}
@ -341,9 +570,54 @@ def get_positions():
"""Get the positions of the account."""
logger.info("Received positions request")
try:
positions = trader.get_positions()
# 获取查询参数中的策略名称
strategy_name = request.args.get("strategy_name", "")
# 如果指定了策略名称,返回该策略的持仓
if strategy_name:
# 更新未完成委托状态
update_pending_orders()
# 获取真实账户持仓,用于计算可交易量
current_trader = get_trader()
real_positions = current_trader.get_positions()
real_positions_map = {}
for pos in real_positions:
# 使用xt_trader返回的字段名
if 'stock_code' in pos and 'can_use_volume' in pos:
real_positions_map[pos['stock_code']] = pos
# 如果该策略没有记录,返回空列表
if strategy_name not in strategy_positions:
logger.info(f"Strategy {strategy_name} has no positions")
return jsonify({"success": True, "data": []}), 200
# 合并策略持仓和真实持仓的可交易量
result = []
for code, pos_info in strategy_positions[strategy_name].items():
# 忽略total_amount为0的持仓
if pos_info['total_amount'] <= 0:
continue
# 使用真实账户的可交易量作为策略的可交易量上限
real_pos = real_positions_map.get(code, {})
closeable = min(pos_info['total_amount'], real_pos.get('can_use_volume', 0))
result.append({
code: {
'total_amount': pos_info['total_amount'],
'closeable_amount': closeable
}
})
logger.info(f"Strategy {strategy_name} positions: {result}")
return jsonify({"success": True, "data": result}), 200
# 否则返回原始持仓
current_trader = get_trader()
positions = current_trader.get_positions()
logger.info(f"Positions: {positions}")
response = {"success": True, "data": positions}
return jsonify(response), 200
except Exception as e:
@ -356,7 +630,7 @@ def get_today_trades():
"""Get the today's trades of the account."""
logger.info("Received today trades request")
try:
trades = trader.get_today_trades()
trades = get_trader().get_today_trades()
logger.info(f"Today trades: {trades}")
response = {"success": True, "data": trades}
@ -371,7 +645,7 @@ def get_today_entrust():
"""Get the today's entrust of the account."""
logger.info("Received today entrust request")
try:
entrust = trader.get_today_entrust()
entrust = get_trader().get_today_entrust()
logger.info(f"Today entrust: {entrust}")
response = {"success": True, "data": entrust}