diff --git a/src/position_manager.py b/src/position_manager.py index 1cbeec8..250ea4b 100644 --- a/src/position_manager.py +++ b/src/position_manager.py @@ -72,8 +72,9 @@ class PositionManager: if code in self.positions and self.positions[code].total_amount <= 0: del self.positions[code] logger.info(f"移除空持仓 - 策略: {self.strategy_name}, 代码: {code}") - - self.save_data() + + # 在锁之外异步保存数据 + threading.Thread(target=self.save_data).start() def add_pending_order( self, order_id, code, price, amount, direction, order_type=ORDER_TYPE_LIMIT @@ -92,7 +93,8 @@ class PositionManager: f"数量: {amount}, 价格: {price}, 类型: {order_type}" ) - self.save_data() + # 在锁之外异步保存数据 + threading.Thread(target=self.save_data).start() def update_order_status(self, order_id, filled, new_status): with self._lock: @@ -120,10 +122,17 @@ class PositionManager: # 保留订单信息以供参考,但标记为已完成 del self.pending_orders[order_id] logger.info(f"订单已删除 - ID: {order_id}, 状态: {new_status}") - self.save_data() - return True - - return False + + has_changes = True + else: + has_changes = False + + # 如果有修改,在锁外异步保存数据 + if has_changes: + threading.Thread(target=self.save_data).start() + return True + + return False def get_pending_order(self, order_id) -> LocalOrder: """获取未完成委托信息 @@ -135,7 +144,9 @@ class PositionManager: dict: 委托信息,如果不存在返回None """ with self._lock: - return self.pending_orders.get(order_id) + order = self.pending_orders.get(order_id) + # 如果找到订单,返回它的副本而不是直接引用 + return order.copy() if order else None def get_pending_orders(self): """获取所有未完成委托 @@ -143,9 +154,19 @@ class PositionManager: Returns: dict: 订单ID到委托信息的映射 """ + # 创建临时变量存储锁内读取的数据 + orders_copy = {} with self._lock: - # 返回副本以避免外部修改 - return self.pending_orders.copy() + # 快速获取数据并立即释放锁 + for order_id, order in self.pending_orders.items(): + orders_copy[order_id] = order + + # 锁外创建副本,避免外部修改影响内部数据 + result = {} + for order_id, order in orders_copy.items(): + result[order_id] = order.copy() if hasattr(order, 'copy') else order + + return result def get_positions(self) -> Dict[str, LocalPosition]: """获取策略持仓 @@ -154,12 +175,23 @@ class PositionManager: Dict[str, LocalPosition]: key为股票代码(str),value为LocalPosition对象,若无持仓则返回空字典。 """ + # 创建临时变量存储锁内读取的数据 + positions_copy = {} with self._lock: - # 返回副本以避免外部修改 - return self.positions.copy() + # 快速获取数据并立即释放锁 + for code, pos in self.positions.items(): + positions_copy[code] = pos + + # 锁外创建副本,避免外部修改影响内部数据 + result = {} + for code, pos in positions_copy.items(): + result[code] = LocalPosition(pos.code, pos.total_amount, pos.closeable_amount) + + return result def save_data(self): """保存策略数据""" + # 在锁内准备要保存的数据 with self._lock: try: # 将对象转换为可序列化的字典 @@ -205,90 +237,98 @@ class PositionManager: ), }) - with open(self.data_path, "w") as f: - json.dump( - { - "positions": positions_dict, - "pending_orders": pending_orders_dict, - "all_orders": all_orders_array, - }, - f, - ) - logger.info("成功保存实盘策略数据") + # 准备好要保存的数据 + data_to_save = { + "positions": positions_dict, + "pending_orders": pending_orders_dict, + "all_orders": all_orders_array, + } except Exception as e: - logger.error(f"保存实盘策略数据失败: {str(e)}") + logger.error(f"准备保存数据失败: {str(e)}") + return + + # 锁外执行文件I/O操作 + try: + with open(self.data_path, "w") as f: + json.dump(data_to_save, f) + logger.info("成功保存实盘策略数据") + except Exception as e: + logger.error(f"保存实盘策略数据失败: {str(e)}") def load_data(self): """加载策略数据""" - with self._lock: - try: - if os.path.exists(self.data_path): - from datetime import datetime + try: + # 文件I/O操作在锁外执行 + if os.path.exists(self.data_path): + from datetime import datetime + with open(self.data_path, "r") as f: + data = json.load(f) + + # 在锁内更新内存中的数据结构 + with self._lock: + # 还原positions对象 + self.positions = {} + positions_dict = data.get("positions", {}) + for code, pos_data in positions_dict.items(): + self.positions[code] = LocalPosition( + pos_data["code"], + int(pos_data["total_amount"]), + int(pos_data["closeable_amount"]), + ) - with open(self.data_path, "r") as f: - data = json.load(f) - - # 还原positions对象 - self.positions = {} - positions_dict = data.get("positions", {}) - for code, pos_data in positions_dict.items(): - self.positions[code] = LocalPosition( - pos_data["code"], - int(pos_data["total_amount"]), - int(pos_data["closeable_amount"]), - ) - - # 还原pending_orders对象 - self.pending_orders = {} - pending_orders_dict = data.get("pending_orders", {}) - for order_id, order_data in pending_orders_dict.items(): - order = LocalOrder( - order_data["order_id"], - order_data["code"], - float(order_data["price"]), - int(order_data["amount"]), - order_data["direction"], - order_data["order_type"], - int(order_data["filled"]), - order_data["status"], - ) - if order_data.get("created_time"): - try: - order.created_time = datetime.fromisoformat( - order_data["created_time"] - ) - except (ValueError, TypeError): - order.created_time = datetime.now() - - self.pending_orders[order_id] = order - - # 还原all_orders对象 - self.all_orders = [] - all_orders_array = data.get("all_orders", []) - for order_data in all_orders_array: - order = LocalOrder( - order_data["order_id"], - order_data["code"], - float(order_data["price"]), - int(order_data["amount"]), - order_data["direction"], - order_data["order_type"], - created_time=datetime.fromisoformat( + # 还原pending_orders对象 + self.pending_orders = {} + pending_orders_dict = data.get("pending_orders", {}) + for order_id, order_data in pending_orders_dict.items(): + order = LocalOrder( + order_data["order_id"], + order_data["code"], + float(order_data["price"]), + int(order_data["amount"]), + order_data["direction"], + order_data["order_type"], + int(order_data["filled"]), + order_data["status"], + ) + if order_data.get("created_time"): + try: + order.created_time = datetime.fromisoformat( order_data["created_time"] - ) if order_data.get("created_time") else datetime.now() - ) - self.all_orders.append(order) + ) + except (ValueError, TypeError): + order.created_time = datetime.now() - logger.info("已加载实盘策略数据") - logger.info(f"本策略: {self.strategy_name} 持仓股票个数: {len(self.positions)} 未完成委托数: {len(self.pending_orders)} 历史订单数: {len(self.all_orders)}") - else: - logger.info(f"实盘策略数据文件不存在: {self.data_path}") + self.pending_orders[order_id] = order + + # 还原all_orders对象 + self.all_orders = [] + all_orders_array = data.get("all_orders", []) + for order_data in all_orders_array: + order = LocalOrder( + order_data["order_id"], + order_data["code"], + float(order_data["price"]), + int(order_data["amount"]), + order_data["direction"], + order_data["order_type"], + created_time=datetime.fromisoformat( + order_data["created_time"] + ) if order_data.get("created_time") else datetime.now() + ) + self.all_orders.append(order) + + logger.info("已加载实盘策略数据") + logger.info(f"本策略: {self.strategy_name} 持仓股票个数: {len(self.positions)} 未完成委托数: {len(self.pending_orders)} 历史订单数: {len(self.all_orders)}") + else: + logger.info(f"实盘策略数据文件不存在: {self.data_path}") + with self._lock: self.positions = {} self.pending_orders = {} self.all_orders = [] - except Exception as e: - logger.error(f"加载实盘策略数据失败: {str(e)}") - # 初始化空数据结构 + except Exception as e: + logger.error(f"加载实盘策略数据失败: {str(e)}") + # 初始化空数据结构 + with self._lock: self.positions = {} self.pending_orders = {} self.all_orders = [] @@ -299,22 +339,51 @@ class PositionManager: self.positions = {} self.pending_orders = {} self.all_orders = [] - self.save_data() + + # 在锁之外异步保存数据 + threading.Thread(target=self.save_data).start() def update_closeable_amount(self): """更新可卖持仓""" + need_save = False with self._lock: for _, position in self.positions.items(): if position.closeable_amount != position.total_amount: position.closeable_amount = position.total_amount + need_save = True + + # 只有在有更改时才保存 + if need_save: + threading.Thread(target=self.save_data).start() def clear_pending_orders(self): """清除所有未完成订单""" with self._lock: - self.pending_orders = {} + if self.pending_orders: # 只有在有挂单时才清除并保存 + self.pending_orders = {} + need_save = True + else: + need_save = False + + # 只有在有更改时才保存 + if need_save: + threading.Thread(target=self.save_data).start() def get_all_orders(self): - """获取所有订单""" + """获取所有订单 + + Returns: + list: 所有订单列表的副本 + """ + # 创建临时变量存储锁内读取的数据 + orders_copy = [] with self._lock: - # 返回副本以避免外部修改 - return self.all_orders.copy() + # 快速获取数据并立即释放锁 + orders_copy = self.all_orders.copy() + + # 锁外创建深拷贝,避免外部修改影响内部数据 + result = [] + for order in orders_copy: + result.append(order.copy() if hasattr(order, 'copy') else order) + + return result