from fastapi import WebSocket


class ConnectionManager:
    """Keep track of open WebSocket connections, keyed by client id.

    Connections are stored in a plain dictionary mapping ``client_id`` to
    the corresponding ``WebSocket`` object.
    """

    def __init__(self):
        # client_id -> WebSocket for every connection registered via connect().
        self.active_connections: dict[int, WebSocket] = {}

    async def connect(self, websocket: WebSocket, client_id: int):
        """Accept *websocket* and register it under *client_id*.

        If *client_id* is already registered, the stored connection is
        replaced by the new one.
        """
        await websocket.accept()
        self.active_connections[client_id] = websocket

    def disconnect(self, client_id: int):
        """Remove the connection registered under *client_id*.

        Does nothing when *client_id* is not registered.
        """
        self.active_connections.pop(client_id, None)

    async def send_personal_message(self, message: str, client_id: int):
        """Send *message* as text to the connection registered under *client_id*.

        Does nothing when *client_id* is not registered.
        """
        websocket = self.active_connections.get(client_id)
        if websocket is not None:
            await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send *message* as text to every registered connection.

        Errors raised by ``send_text`` are not caught here; they propagate
        to the caller and stop the broadcast.
        """
        # Iterate over a snapshot: each ``await`` yields to the event loop,
        # during which connect()/disconnect() may add or remove entries.
        # Iterating the live dict view would then raise
        # "RuntimeError: dictionary changed size during iteration".
        for websocket in list(self.active_connections.values()):
            await websocket.send_text(message)