减少锁的持有时间

This commit is contained in:
zhiyong 2025-05-15 10:13:14 +08:00
parent 2f0d7e5a7e
commit a6a081e773

View File

@ -72,8 +72,9 @@ class PositionManager:
if code in self.positions and self.positions[code].total_amount <= 0: if code in self.positions and self.positions[code].total_amount <= 0:
del self.positions[code] del self.positions[code]
logger.info(f"移除空持仓 - 策略: {self.strategy_name}, 代码: {code}") logger.info(f"移除空持仓 - 策略: {self.strategy_name}, 代码: {code}")
self.save_data() # 在锁之外异步保存数据
threading.Thread(target=self.save_data).start()
def add_pending_order( def add_pending_order(
self, order_id, code, price, amount, direction, order_type=ORDER_TYPE_LIMIT self, order_id, code, price, amount, direction, order_type=ORDER_TYPE_LIMIT
@ -92,7 +93,8 @@ class PositionManager:
f"数量: {amount}, 价格: {price}, 类型: {order_type}" f"数量: {amount}, 价格: {price}, 类型: {order_type}"
) )
self.save_data() # 在锁之外异步保存数据
threading.Thread(target=self.save_data).start()
def update_order_status(self, order_id, filled, new_status): def update_order_status(self, order_id, filled, new_status):
with self._lock: with self._lock:
@ -120,10 +122,17 @@ class PositionManager:
# 保留订单信息以供参考,但标记为已完成 # 保留订单信息以供参考,但标记为已完成
del self.pending_orders[order_id] del self.pending_orders[order_id]
logger.info(f"订单已删除 - ID: {order_id}, 状态: {new_status}") logger.info(f"订单已删除 - ID: {order_id}, 状态: {new_status}")
self.save_data()
return True has_changes = True
else:
return False has_changes = False
# 如果有修改,在锁外异步保存数据
if has_changes:
threading.Thread(target=self.save_data).start()
return True
return False
def get_pending_order(self, order_id) -> LocalOrder: def get_pending_order(self, order_id) -> LocalOrder:
"""获取未完成委托信息 """获取未完成委托信息
@ -135,7 +144,9 @@ class PositionManager:
dict: 委托信息如果不存在返回None dict: 委托信息如果不存在返回None
""" """
with self._lock: with self._lock:
return self.pending_orders.get(order_id) order = self.pending_orders.get(order_id)
# 如果找到订单,返回它的副本而不是直接引用
return order.copy() if order else None
def get_pending_orders(self): def get_pending_orders(self):
"""获取所有未完成委托 """获取所有未完成委托
@ -143,9 +154,19 @@ class PositionManager:
Returns: Returns:
dict: 订单ID到委托信息的映射 dict: 订单ID到委托信息的映射
""" """
# 创建临时变量存储锁内读取的数据
orders_copy = {}
with self._lock: with self._lock:
# 返回副本以避免外部修改 # 快速获取数据并立即释放锁
return self.pending_orders.copy() for order_id, order in self.pending_orders.items():
orders_copy[order_id] = order
# 锁外创建副本,避免外部修改影响内部数据
result = {}
for order_id, order in orders_copy.items():
result[order_id] = order.copy() if hasattr(order, 'copy') else order
return result
def get_positions(self) -> Dict[str, LocalPosition]: def get_positions(self) -> Dict[str, LocalPosition]:
"""获取策略持仓 """获取策略持仓
@ -154,12 +175,23 @@ class PositionManager:
Dict[str, LocalPosition]: Dict[str, LocalPosition]:
key为股票代码strvalue为LocalPosition对象若无持仓则返回空字典 key为股票代码strvalue为LocalPosition对象若无持仓则返回空字典
""" """
# 创建临时变量存储锁内读取的数据
positions_copy = {}
with self._lock: with self._lock:
# 返回副本以避免外部修改 # 快速获取数据并立即释放锁
return self.positions.copy() for code, pos in self.positions.items():
positions_copy[code] = pos
# 锁外创建副本,避免外部修改影响内部数据
result = {}
for code, pos in positions_copy.items():
result[code] = LocalPosition(pos.code, pos.total_amount, pos.closeable_amount)
return result
def save_data(self): def save_data(self):
"""保存策略数据""" """保存策略数据"""
# 在锁内准备要保存的数据
with self._lock: with self._lock:
try: try:
# 将对象转换为可序列化的字典 # 将对象转换为可序列化的字典
@ -205,90 +237,98 @@ class PositionManager:
), ),
}) })
with open(self.data_path, "w") as f: # 准备好要保存的数据
json.dump( data_to_save = {
{ "positions": positions_dict,
"positions": positions_dict, "pending_orders": pending_orders_dict,
"pending_orders": pending_orders_dict, "all_orders": all_orders_array,
"all_orders": all_orders_array, }
},
f,
)
logger.info("成功保存实盘策略数据")
except Exception as e: except Exception as e:
logger.error(f"保存实盘策略数据失败: {str(e)}") logger.error(f"准备保存数据失败: {str(e)}")
return
# 锁外执行文件I/O操作
try:
with open(self.data_path, "w") as f:
json.dump(data_to_save, f)
logger.info("成功保存实盘策略数据")
except Exception as e:
logger.error(f"保存实盘策略数据失败: {str(e)}")
def load_data(self): def load_data(self):
"""加载策略数据""" """加载策略数据"""
with self._lock: try:
try: # 文件I/O操作在锁外执行
if os.path.exists(self.data_path): if os.path.exists(self.data_path):
from datetime import datetime from datetime import datetime
with open(self.data_path, "r") as f:
data = json.load(f)
# 在锁内更新内存中的数据结构
with self._lock:
# 还原positions对象
self.positions = {}
positions_dict = data.get("positions", {})
for code, pos_data in positions_dict.items():
self.positions[code] = LocalPosition(
pos_data["code"],
int(pos_data["total_amount"]),
int(pos_data["closeable_amount"]),
)
with open(self.data_path, "r") as f: # 还原pending_orders对象
data = json.load(f) self.pending_orders = {}
pending_orders_dict = data.get("pending_orders", {})
# 还原positions对象 for order_id, order_data in pending_orders_dict.items():
self.positions = {} order = LocalOrder(
positions_dict = data.get("positions", {}) order_data["order_id"],
for code, pos_data in positions_dict.items(): order_data["code"],
self.positions[code] = LocalPosition( float(order_data["price"]),
pos_data["code"], int(order_data["amount"]),
int(pos_data["total_amount"]), order_data["direction"],
int(pos_data["closeable_amount"]), order_data["order_type"],
) int(order_data["filled"]),
order_data["status"],
# 还原pending_orders对象 )
self.pending_orders = {} if order_data.get("created_time"):
pending_orders_dict = data.get("pending_orders", {}) try:
for order_id, order_data in pending_orders_dict.items(): order.created_time = datetime.fromisoformat(
order = LocalOrder(
order_data["order_id"],
order_data["code"],
float(order_data["price"]),
int(order_data["amount"]),
order_data["direction"],
order_data["order_type"],
int(order_data["filled"]),
order_data["status"],
)
if order_data.get("created_time"):
try:
order.created_time = datetime.fromisoformat(
order_data["created_time"]
)
except (ValueError, TypeError):
order.created_time = datetime.now()
self.pending_orders[order_id] = order
# 还原all_orders对象
self.all_orders = []
all_orders_array = data.get("all_orders", [])
for order_data in all_orders_array:
order = LocalOrder(
order_data["order_id"],
order_data["code"],
float(order_data["price"]),
int(order_data["amount"]),
order_data["direction"],
order_data["order_type"],
created_time=datetime.fromisoformat(
order_data["created_time"] order_data["created_time"]
) if order_data.get("created_time") else datetime.now() )
) except (ValueError, TypeError):
self.all_orders.append(order) order.created_time = datetime.now()
logger.info("已加载实盘策略数据") self.pending_orders[order_id] = order
logger.info(f"本策略: {self.strategy_name} 持仓股票个数: {len(self.positions)} 未完成委托数: {len(self.pending_orders)} 历史订单数: {len(self.all_orders)}")
else: # 还原all_orders对象
logger.info(f"实盘策略数据文件不存在: {self.data_path}") self.all_orders = []
all_orders_array = data.get("all_orders", [])
for order_data in all_orders_array:
order = LocalOrder(
order_data["order_id"],
order_data["code"],
float(order_data["price"]),
int(order_data["amount"]),
order_data["direction"],
order_data["order_type"],
created_time=datetime.fromisoformat(
order_data["created_time"]
) if order_data.get("created_time") else datetime.now()
)
self.all_orders.append(order)
logger.info("已加载实盘策略数据")
logger.info(f"本策略: {self.strategy_name} 持仓股票个数: {len(self.positions)} 未完成委托数: {len(self.pending_orders)} 历史订单数: {len(self.all_orders)}")
else:
logger.info(f"实盘策略数据文件不存在: {self.data_path}")
with self._lock:
self.positions = {} self.positions = {}
self.pending_orders = {} self.pending_orders = {}
self.all_orders = [] self.all_orders = []
except Exception as e: except Exception as e:
logger.error(f"加载实盘策略数据失败: {str(e)}") logger.error(f"加载实盘策略数据失败: {str(e)}")
# 初始化空数据结构 # 初始化空数据结构
with self._lock:
self.positions = {} self.positions = {}
self.pending_orders = {} self.pending_orders = {}
self.all_orders = [] self.all_orders = []
@ -299,22 +339,51 @@ class PositionManager:
self.positions = {} self.positions = {}
self.pending_orders = {} self.pending_orders = {}
self.all_orders = [] self.all_orders = []
self.save_data()
# 在锁之外异步保存数据
threading.Thread(target=self.save_data).start()
def update_closeable_amount(self): def update_closeable_amount(self):
"""更新可卖持仓""" """更新可卖持仓"""
need_save = False
with self._lock: with self._lock:
for _, position in self.positions.items(): for _, position in self.positions.items():
if position.closeable_amount != position.total_amount: if position.closeable_amount != position.total_amount:
position.closeable_amount = position.total_amount position.closeable_amount = position.total_amount
need_save = True
# 只有在有更改时才保存
if need_save:
threading.Thread(target=self.save_data).start()
def clear_pending_orders(self): def clear_pending_orders(self):
"""清除所有未完成订单""" """清除所有未完成订单"""
with self._lock: with self._lock:
self.pending_orders = {} if self.pending_orders: # 只有在有挂单时才清除并保存
self.pending_orders = {}
need_save = True
else:
need_save = False
# 只有在有更改时才保存
if need_save:
threading.Thread(target=self.save_data).start()
def get_all_orders(self): def get_all_orders(self):
"""获取所有订单""" """获取所有订单
Returns:
list: 所有订单列表的副本
"""
# 创建临时变量存储锁内读取的数据
orders_copy = []
with self._lock: with self._lock:
# 返回副本以避免外部修改 # 快速获取数据并立即释放锁
return self.all_orders.copy() orders_copy = self.all_orders.copy()
# 锁外创建深拷贝,避免外部修改影响内部数据
result = []
for order in orders_copy:
result.append(order.copy() if hasattr(order, 'copy') else order)
return result