real_trader/src/core/position_manager.py
2025-05-22 22:30:08 +08:00

381 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import threading
from datetime import datetime
from logger_config import get_logger
from config import Config
from core.trade_constants import (
ORDER_DIRECTION_BUY,
ORDER_TYPE_LIMIT,
ORDER_TYPE_MARKET,
ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED,
ORDER_STATUS_FAILED,
)
from core.local_position import LocalPosition
from core.local_order import LocalOrder
from utils.t0_stocks import is_t0
from typing import Dict
# 获取日志记录器
logger = get_logger("position_manager")
class PositionManager:
"""实盘策略持仓管理器,负责管理不同策略在实盘环境下的持仓情况"""
def __init__(self, strategy_name="default_strategy"):
"""初始化实盘持仓管理器"""
super().__init__()
self.strategy_name = strategy_name
# 创建线程锁
self._lock = threading.Lock()
# 策略持仓信息
self.positions: Dict[str, LocalPosition] = {} # {股票代码 -> LocalPosition}
# 待处理订单信息
self.pending_orders = {} # {order_id -> LocalOrder}
# 所有订单
self.all_orders = [] # [LocalOrder]
self.data_path = os.path.join(Config.DATA_DIR, self.strategy_name + "_positions.json")
# 确保数据目录存在
os.makedirs(os.path.dirname(self.data_path), exist_ok=True)
# 如果文件不存在,创建一个空文件
if not os.path.exists(self.data_path):
with open(self.data_path, 'w') as f:
f.write('{}')
else:
self.load_data()
def update_position(self, code, direction, amount):
with self._lock:
# 如果股票代码在持仓字典中不存在,初始化它
if code not in self.positions:
self.positions[code] = LocalPosition(code, 0, 0)
# 根据方向更新持仓
position = self.positions[code]
is_t0_stock = is_t0(code)
if direction == ORDER_DIRECTION_BUY:
position.total_amount += amount
if is_t0_stock:
position.closeable_amount += amount
else: # sell
position.total_amount -= amount
position.closeable_amount -= amount
logger.info(
f"更新策略持仓 - 策略: {self.strategy_name}, 代码: {code}, 方向: {direction}, 数量: {amount}, "
f"更新后总量: {position.total_amount}, "
f"可用: {position.closeable_amount}"
)
# 移除total_amount为0的持仓
if code in self.positions and self.positions[code].total_amount <= 0:
del self.positions[code]
logger.info(f"移除空持仓 - 策略: {self.strategy_name}, 代码: {code}")
# 在锁之外异步保存数据
threading.Thread(target=self.save_data).start()
def add_pending_order(
self, order_id, code, price, amount, direction, order_type=ORDER_TYPE_LIMIT
):
if not self.strategy_name:
return
with self._lock:
order = LocalOrder(order_id, code, price, amount, direction, order_type, created_time=datetime.now())
self.pending_orders[order_id] = order
self.all_orders.append(order)
logger.info(
f"添加订单 - ID: {order_id}, 策略: {self.strategy_name}, 代码: {code}, 方向: {direction}, "
f"数量: {amount}, 价格: {price}, 类型: {order_type}"
)
# 在锁之外异步保存数据
threading.Thread(target=self.save_data).start()
def update_order_status(self, order_id, filled, new_status):
with self._lock:
if order_id in self.pending_orders:
_order = self.pending_orders[order_id]
# 记录之前的状态用于日志
previous_status = _order.status
# 更新状态
_order.status = new_status
_order.filled = filled
# 记录状态变化日志
if previous_status != new_status:
code = self.pending_orders[order_id].code
logger.info(
f"订单状态变化: ID={order_id}, 代码={code}, 旧状态={previous_status}, 新状态={new_status}"
)
# 如果订单已完成,移除它
if new_status in [
ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED,
ORDER_STATUS_FAILED,
]:
# 保留订单信息以供参考,但标记为已完成
del self.pending_orders[order_id]
logger.info(f"订单已删除 - ID: {order_id}, 状态: {new_status}")
has_changes = True
else:
has_changes = False
# 如果有修改,在锁外异步保存数据
if has_changes:
threading.Thread(target=self.save_data).start()
return True
return False
def get_pending_order(self, order_id) -> LocalOrder:
"""获取未完成委托信息
Args:
order_id: 订单ID
Returns:
dict: 委托信息如果不存在返回None
"""
with self._lock:
order = self.pending_orders.get(order_id)
# 如果找到订单,返回它的副本而不是直接引用
return order
def get_pending_orders(self):
"""获取所有未完成委托
Returns:
dict: 订单ID到委托信息的映射
"""
# 创建临时变量存储锁内读取的数据
orders_copy = {}
with self._lock:
# 快速获取数据并立即释放锁
for order_id, order in self.pending_orders.items():
orders_copy[order_id] = order
# 锁外创建副本,避免外部修改影响内部数据
result = {}
for order_id, order in orders_copy.items():
result[order_id] = order
return result
def get_positions(self) -> Dict[str, LocalPosition]:
"""获取策略持仓
Returns:
Dict[str, LocalPosition]:
key为股票代码strvalue为LocalPosition对象若无持仓则返回空字典。
"""
# 创建临时变量存储锁内读取的数据
positions_copy = {}
with self._lock:
# 快速获取数据并立即释放锁
for code, pos in self.positions.items():
positions_copy[code] = pos
# 锁外创建副本,避免外部修改影响内部数据
result = {}
for code, pos in positions_copy.items():
result[code] = LocalPosition(pos.code, pos.total_amount, pos.closeable_amount)
return result
def save_data(self):
"""保存策略数据"""
# 在锁内准备要保存的数据
with self._lock:
try:
# 将对象转换为可序列化的字典
positions_dict = {}
for code, pos in self.positions.items():
positions_dict[code] = {
"code": pos.code,
"total_amount": pos.total_amount,
"closeable_amount": pos.closeable_amount,
}
pending_orders_dict = {}
for order_id, order in self.pending_orders.items():
pending_orders_dict[order_id] = {
"order_id": order.order_id,
"code": order.code,
"price": order.price,
"amount": order.amount,
"filled": order.filled,
"direction": order.direction,
"order_type": order.order_type,
"status": order.status,
"created_time": (
order.created_time.isoformat()
if hasattr(order, "created_time")
else None
),
}
all_orders_array = []
for order in self.all_orders:
all_orders_array.append({
"order_id": order.order_id,
"code": order.code,
"price": order.price,
"amount": order.amount,
"direction": order.direction,
"order_type": order.order_type,
"created_time": (
order.created_time.isoformat()
if hasattr(order, "created_time")
else None
),
})
# 准备好要保存的数据
data_to_save = {
"positions": positions_dict,
"pending_orders": pending_orders_dict,
"all_orders": all_orders_array,
}
except Exception as e:
logger.error(f"准备保存数据失败: {str(e)}")
return
# 锁外执行文件I/O操作
try:
with open(self.data_path, "w") as f:
json.dump(data_to_save, f)
logger.debug("成功保存实盘策略数据")
except Exception as e:
logger.error(f"保存实盘策略数据失败: {str(e)}")
def load_data(self):
"""加载策略数据"""
try:
# 文件I/O操作在锁外执行
if os.path.exists(self.data_path):
from datetime import datetime
with open(self.data_path, "r") as f:
data = json.load(f)
# 在锁内更新内存中的数据结构
with self._lock:
# 还原positions对象
self.positions = {}
positions_dict = data.get("positions", {})
for code, pos_data in positions_dict.items():
self.positions[code] = LocalPosition(
pos_data["code"],
int(pos_data["total_amount"]),
int(pos_data["closeable_amount"]),
)
# 还原pending_orders对象
self.pending_orders = {}
pending_orders_dict = data.get("pending_orders", {})
for order_id, order_data in pending_orders_dict.items():
order = LocalOrder(
order_data["order_id"],
order_data["code"],
float(order_data["price"]),
int(order_data["amount"]),
order_data["direction"],
order_data["order_type"],
int(order_data["filled"]),
order_data["status"],
)
if order_data.get("created_time"):
try:
order.created_time = datetime.fromisoformat(
order_data["created_time"]
)
except (ValueError, TypeError):
order.created_time = datetime.now()
self.pending_orders[order_id] = order
# 还原all_orders对象
self.all_orders = []
all_orders_array = data.get("all_orders", [])
for order_data in all_orders_array:
order = LocalOrder(
order_data["order_id"],
order_data["code"],
float(order_data["price"]),
int(order_data["amount"]),
order_data["direction"],
order_data["order_type"],
created_time=datetime.fromisoformat(
order_data["created_time"]
) if order_data.get("created_time") else datetime.now()
)
self.all_orders.append(order)
logger.info("已加载实盘策略数据")
logger.info(f"本策略: {self.strategy_name} 持仓股票个数: {len(self.positions)} 未完成委托数: {len(self.pending_orders)} 历史订单数: {len(self.all_orders)}")
else:
logger.info(f"实盘策略数据文件不存在: {self.data_path}")
with self._lock:
self.positions = {}
self.pending_orders = {}
self.all_orders = []
except Exception as e:
logger.error(f"加载实盘策略数据失败: {str(e)}")
# 初始化空数据结构
with self._lock:
self.positions = {}
self.pending_orders = {}
self.all_orders = []
def clear(self):
"""清除所有持仓管理数据"""
with self._lock:
self.positions = {}
self.pending_orders = {}
self.all_orders = []
# 在锁之外异步保存数据
threading.Thread(target=self.save_data).start()
def update_closeable_amount(self):
"""更新可卖持仓"""
need_save = False
with self._lock:
for _, position in self.positions.items():
if position.closeable_amount != position.total_amount:
position.closeable_amount = position.total_amount
need_save = True
# 只有在有更改时才保存
if need_save:
threading.Thread(target=self.save_data).start()
def clear_pending_orders(self):
"""清除所有未完成订单"""
with self._lock:
if self.pending_orders: # 只有在有挂单时才清除并保存
self.pending_orders = {}
need_save = True
else:
need_save = False
# 只有在有更改时才保存
if need_save:
threading.Thread(target=self.save_data).start()
def get_all_orders(self):
"""获取所有订单
Returns:
list: 所有订单列表的副本
"""
return self.all_orders