From a0d286cc4f39257a37cc6c2079d0e7612cd73bc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A1=9C=E8=8F=AF?= Date: Mon, 13 Mar 2023 20:57:05 +0800 Subject: [PATCH] emmm --- ObjectManager.py | 170 ++++++++++++++++++++++++++++++----------------- main.py | 15 ++--- 2 files changed, 115 insertions(+), 70 deletions(-) diff --git a/ObjectManager.py b/ObjectManager.py index 2ee036b..6fcfd69 100644 --- a/ObjectManager.py +++ b/ObjectManager.py @@ -4,34 +4,35 @@ from pydantic import BaseModel from datetime import datetime -# 使用字典来存储websocket连接 -class ConnectionManager: - def __init__(self): - self.active_connections: map = {} - - # 加入一个新的websocket连接 - async def connect(self, websocket: WebSocket, client_id: int): - await websocket.accept() - self.active_connections[client_id] = websocket - - # 删除一个websocket连接(要先检查client_id是否存在) - def disconnect(self, client_id: int): - if client_id in self.active_connections: - del self.active_connections[client_id] - - # 向指定的websocket连接发送消息(要先检查client_id是否存在) - async def send_personal_message(self, message: str, client_id: int): - if client_id in self.active_connections: - await self.active_connections[client_id].send_text(message) - - # 向所有的websocket连接发送消息 - async def broadcast(self, message: str): - for client_id, ws in self.active_connections.items(): - await ws.send_text(message) +## 使用字典来存储websocket连接 +#class ConnectionManager: +# def __init__(self): +# self.active_connections: map = {} +# +# # 加入一个新的websocket连接 +# async def connect(self, websocket: WebSocket, client_id: int): +# await websocket.accept() +# self.active_connections[client_id] = websocket +# +# # 删除一个websocket连接(要先检查client_id是否存在) +# def disconnect(self, client_id: int): +# if client_id in self.active_connections: +# del self.active_connections[client_id] +# +# # 向指定的websocket连接发送消息(要先检查client_id是否存在) +# async def send_personal_message(self, message: str, client_id: int): +# if client_id in self.active_connections: +# await self.active_connections[client_id].send_text(message) +# +# # 向所有的websocket连接发送消息 +# async def broadcast(self, message: str): +# for client_id, ws in self.active_connections.items(): +# await ws.send_text(message) ''' 任务管理器(观察者模式) -使用字典来存储Task任务 +维护一个任务队列, 任务队列中的任务会被分发给worker节点 +任务状态变化时通知对应的客户端 ''' # Task 基本模型 @@ -62,60 +63,109 @@ class Task(BaseModel): for observer in self.__observers: observer.send_text(message) +class Server(BaseModel): + id: str='' + name: str='' + status: str='pending' + created_at: datetime=datetime.now() + ip: str='' + port: int=0 -# 使用字典来存储Task任务 + # 任务列表 + __tasks = [] + + # 添加任务 + def add_task(self, task: Task): + self.__tasks.append(task) + + # 移除任务 + def remove_task(self, task: Task): + self.__tasks.remove(task) + + +''' +任务管理器 +就绪节点过少时增加节点数量, 就绪节点过多时减少节点数量(1-5个节点) +任务分发到就绪节点, 需要从工作节点中获取任务的状态 +''' + +# 使用字典来存储Task任务(保持一个后台线程,定时检查任务的状态,并将任务的状态更新到数据中) class TaskManager: def __init__(self): self.tasks = {} # 任务ID的映射 - # TOOD: 保持一个后台线程,定时检查任务的状态,并将任务的状态更新到数据中 - - def add_observer(self, task_id: str, websocket: WebSocket): - self.tasks[task_id].add_observer(websocket) - - def remove_observer(self, task_id: str, websocket: WebSocket): - self.tasks[task_id].remove_observer(websocket) - - def has_task(self, task_id: str): - return task_id in self.tasks + self.servers = {} # 服务器ID的映射 + self.websockets = {} # websocket连接ID的映射 + # 初始化管理器时,需检查现有的服务器及其状态,将其添加到服务器池中 - def add(self, task: Task): + # 添加服务器 + def add_server(self, server: Server): + server.id = str(uuid.uuid4()) + self.servers[server.id] = server + return server + + # 移除服务器 + def remove_server(self, server_id: str): + # TENCENT CLOUD API + del self.servers[server_id] + + # 添加任务(并为任务分配工作节点) + def add_task(self, task: Task): task.id = str(uuid.uuid4()) self.tasks[task.id] = task + # 调起任务检查事件 return task - def delete(self, task_id: str): + # 移除任务 + def remove_task(self, task_id: str): del self.tasks[task_id] + del self.websockets[task_id] + # 更新任务 def update(self, task_id, task: Task): self.tasks[task_id] = task + # 查询任务 def query(self, task_id: str): return self.tasks[task_id] + # 查询所有任务 def query_all(self): return self.tasks -# 使用字典来存储服务器池(使用腾讯云的API来管理服务器) -class ServerManager: - def __init__(self): - self.servers = {} - # 维护一个服务器池,每个服务器都有一个状态,状态有三种:空闲,运行中,异常 - # 如果服务器异常,需要将其从服务器池中删除 - # 如果服务器空闲,需要通知弹性伸缩服务将其删除 + # 添加观察者 + def add_observer(self, task_id: str, websocket: WebSocket): + self.tasks[task_id].add_observer(websocket) - # 添加一个新的服务器(如何得知增加了新服务器? 主动发现? 被动发现? 使用腾讯云的API主动增加服务器) - def add(self, server_name, server_info): - self.servers[server_name] = server_info - - def delete(self, server_name): - del self.servers[server_name] - - def update(self, server_name, server_info): - self.servers[server_name] = server_info - - def query(self, server_name): - return self.servers[server_name] - - def query_all(self): - return self.servers + # 移除观察者 + def remove_observer(self, task_id: str, websocket: WebSocket): + self.tasks[task_id].remove_observer(websocket) + + # 检查任务是否存在 + def has_task(self, task_id: str): + return task_id in self.tasks + + +## 使用字典来存储服务器池(使用腾讯云的API来管理服务器) +#class ServerManager: +# def __init__(self): +# self.servers = {} +# # 维护一个服务器池,每个服务器都有一个状态,状态有三种:空闲,运行中,异常 +# # 如果服务器异常,需要将其从服务器池中删除 +# # 如果服务器空闲,需要通知弹性伸缩服务将其删除 +# +# # 添加一个新的服务器(如何得知增加了新服务器? 主动发现? 被动发现? 使用腾讯云的API主动增加服务器) +# def add(self, server_name, server_info): +# self.servers[server_name] = server_info +# +# def delete(self, server_name): +# del self.servers[server_name] +# +# def update(self, server_name, server_info): +# self.servers[server_name] = server_info +# +# def query(self, server_name): +# return self.servers[server_name] +# +# def query_all(self): +# return self.servers \ No newline at end of file diff --git a/main.py b/main.py index ba0dd5b..b957c25 100644 --- a/main.py +++ b/main.py @@ -89,14 +89,14 @@ async def get_tasks(): @app.post("/tasks", response_model=Task) async def create_task(task: Task): - return task_manager.add(task) + return task_manager.add_task(task) ''' 监听任务进度 可能有多个客户端监听同一个任务(向任务的观察者列表中添加websocket连接) 可能有多个任务被同一客户端监听(向客户端的观察目标列表中添加任务) -应检查目标任务是否存在 +应检查目标任务是否存在, 连接断开时应从观察者列表中移除 ''' @@ -105,24 +105,19 @@ async def task_endpoint(websocket: WebSocket, task_id: str): await websocket.accept() if not task_manager.has_task(task_id): await websocket.close() - print(f"close websocket: {task_id}") return task_manager.add_observer(task_id, websocket) try: while True: data = await websocket.receive_text() - print(f"Client #says: {data}") + print(f"收到客户端消息: {data}") except WebSocketDisconnect: task_manager.remove_observer(task_id, websocket) print(f"close websocket: {task_id}") -''' -维护一个任务队列, 任务队列中的任务会被分发给worker节点 -任务状态变化时通知对应的客户端 -''' - # 启动服务 if __name__ == '__main__': port = 8000 if len(sys.argv) < 2 else int(sys.argv[1]) - uvicorn.run(app='main:app', host='0.0.0.0', port=port, reload=True, workers=1) + uvicorn.run(app='main:app', host='0.0.0.0', + port=port, reload=True, workers=1)