import uuid from fastapi import WebSocket from pydantic import BaseModel from datetime import datetime ## 使用字典来存储websocket连接 #class ConnectionManager: # def __init__(self): # self.active_connections: map = {} # # # 加入一个新的websocket连接 # async def connect(self, websocket: WebSocket, client_id: int): # await websocket.accept() # self.active_connections[client_id] = websocket # # # 删除一个websocket连接(要先检查client_id是否存在) # def disconnect(self, client_id: int): # if client_id in self.active_connections: # del self.active_connections[client_id] # # # 向指定的websocket连接发送消息(要先检查client_id是否存在) # async def send_personal_message(self, message: str, client_id: int): # if client_id in self.active_connections: # await self.active_connections[client_id].send_text(message) # # # 向所有的websocket连接发送消息 # async def broadcast(self, message: str): # for client_id, ws in self.active_connections.items(): # await ws.send_text(message) ''' 任务管理器(观察者模式) 维护一个任务队列, 任务队列中的任务会被分发给worker节点 任务状态变化时通知对应的客户端 ''' # Task 基本模型 class Task(BaseModel): id: str='' name: str='' status: str='pending' created_at: datetime=datetime.now() updated_at: datetime=datetime.now() __observers = [] # 观察者列表 # 属性发生变化时,更新updated_at并通知观察者 def __setattr__(self, name, value): super().__setattr__(name, value) self.event_observer(f"Task {self.id} updated at {self.updated_at}") # 添加观察者 def add_observer(self, websocket: WebSocket): self.__observers.append(websocket) # 移除观察者 def remove_observer(self, websocket: WebSocket): self.__observers.remove(websocket) # 通知观察者 def event_observer(self, message: str): for observer in self.__observers: observer.send_text(message) class Server(BaseModel): id: str='' name: str='' status: str='pending' created_at: datetime=datetime.now() ip: str='' port: int=0 # 任务列表 __tasks = [] # 添加任务 def add_task(self, task: Task): self.__tasks.append(task) # 移除任务 def remove_task(self, task: Task): self.__tasks.remove(task) ''' 任务管理器 就绪节点过少时增加节点数量, 就绪节点过多时减少节点数量(1-5个节点) 任务分发到就绪节点, 需要从工作节点中获取任务的状态 ''' # 使用字典来存储Task任务(保持一个后台线程,定时检查任务的状态,并将任务的状态更新到数据中) class TaskManager: def __init__(self): self.tasks = {} # 任务ID的映射 self.servers = {} # 服务器ID的映射 self.websockets = {} # websocket连接ID的映射 # 初始化管理器时,需检查现有的服务器及其状态,将其添加到服务器池中 # 添加服务器 def add_server(self, server: Server): server.id = str(uuid.uuid4()) self.servers[server.id] = server return server # 移除服务器 def remove_server(self, server_id: str): # TENCENT CLOUD API del self.servers[server_id] # 添加任务(并为任务分配工作节点) def add_task(self, task: Task): task.id = str(uuid.uuid4()) self.tasks[task.id] = task # 调起任务检查事件 return task # 移除任务 def remove_task(self, task_id: str): del self.tasks[task_id] del self.websockets[task_id] # 更新任务 def update(self, task_id, task: Task): self.tasks[task_id] = task # 查询任务 def query(self, task_id: str): return self.tasks[task_id] # 查询所有任务 def query_all(self): return self.tasks # 添加观察者 def add_observer(self, task_id: str, websocket: WebSocket): self.tasks[task_id].add_observer(websocket) # 移除观察者 def remove_observer(self, task_id: str, websocket: WebSocket): self.tasks[task_id].remove_observer(websocket) # 检查任务是否存在 def has_task(self, task_id: str): return task_id in self.tasks ## 使用字典来存储服务器池(使用腾讯云的API来管理服务器) #class ServerManager: # def __init__(self): # self.servers = {} # # 维护一个服务器池,每个服务器都有一个状态,状态有三种:空闲,运行中,异常 # # 如果服务器异常,需要将其从服务器池中删除 # # 如果服务器空闲,需要通知弹性伸缩服务将其删除 # # # 添加一个新的服务器(如何得知增加了新服务器? 主动发现? 被动发现? 使用腾讯云的API主动增加服务器) # def add(self, server_name, server_info): # self.servers[server_name] = server_info # # def delete(self, server_name): # del self.servers[server_name] # # def update(self, server_name, server_info): # self.servers[server_name] = server_info # # def query(self, server_name): # return self.servers[server_name] # # def query_all(self): # return self.servers