This commit is contained in:
2023-03-13 20:57:05 +08:00
parent 70def0599b
commit a0d286cc4f
2 changed files with 115 additions and 70 deletions

View File

@@ -4,34 +4,35 @@ from pydantic import BaseModel
from datetime import datetime from datetime import datetime
# 使用字典来存储websocket连接 ## 使用字典来存储websocket连接
class ConnectionManager: #class ConnectionManager:
def __init__(self): # def __init__(self):
self.active_connections: map = {} # self.active_connections: map = {}
#
# 加入一个新的websocket连接 # # 加入一个新的websocket连接
async def connect(self, websocket: WebSocket, client_id: int): # async def connect(self, websocket: WebSocket, client_id: int):
await websocket.accept() # await websocket.accept()
self.active_connections[client_id] = websocket # self.active_connections[client_id] = websocket
#
# 删除一个websocket连接(要先检查client_id是否存在) # # 删除一个websocket连接(要先检查client_id是否存在)
def disconnect(self, client_id: int): # def disconnect(self, client_id: int):
if client_id in self.active_connections: # if client_id in self.active_connections:
del self.active_connections[client_id] # del self.active_connections[client_id]
#
# 向指定的websocket连接发送消息(要先检查client_id是否存在) # # 向指定的websocket连接发送消息(要先检查client_id是否存在)
async def send_personal_message(self, message: str, client_id: int): # async def send_personal_message(self, message: str, client_id: int):
if client_id in self.active_connections: # if client_id in self.active_connections:
await self.active_connections[client_id].send_text(message) # await self.active_connections[client_id].send_text(message)
#
# 向所有的websocket连接发送消息 # # 向所有的websocket连接发送消息
async def broadcast(self, message: str): # async def broadcast(self, message: str):
for client_id, ws in self.active_connections.items(): # for client_id, ws in self.active_connections.items():
await ws.send_text(message) # await ws.send_text(message)
''' '''
任务管理器(观察者模式) 任务管理器(观察者模式)
使用字典来存储Task任务 维护一个任务队列, 任务队列中的任务会被分发给worker节点
任务状态变化时通知对应的客户端
''' '''
# Task 基本模型 # Task 基本模型
@@ -62,60 +63,109 @@ class Task(BaseModel):
for observer in self.__observers: for observer in self.__observers:
observer.send_text(message) observer.send_text(message)
class Server(BaseModel):
id: str=''
name: str=''
status: str='pending'
created_at: datetime=datetime.now()
ip: str=''
port: int=0
# 使用字典来存储Task任务 # 任务列表
__tasks = []
# 添加任务
def add_task(self, task: Task):
self.__tasks.append(task)
# 移除任务
def remove_task(self, task: Task):
self.__tasks.remove(task)
'''
任务管理器
就绪节点过少时增加节点数量, 就绪节点过多时减少节点数量(1-5个节点)
任务分发到就绪节点, 需要从工作节点中获取任务的状态
'''
# 使用字典来存储Task任务(保持一个后台线程,定时检查任务的状态,并将任务的状态更新到数据中)
class TaskManager: class TaskManager:
def __init__(self): def __init__(self):
self.tasks = {} # 任务ID的映射 self.tasks = {} # 任务ID的映射
# TOOD: 保持一个后台线程,定时检查任务的状态,并将任务的状态更新到数据中 self.servers = {} # 服务器ID的映射
self.websockets = {} # websocket连接ID的映射
# 初始化管理器时,需检查现有的服务器及其状态,将其添加到服务器池中
def add_observer(self, task_id: str, websocket: WebSocket): # 添加服务器
self.tasks[task_id].add_observer(websocket) def add_server(self, server: Server):
server.id = str(uuid.uuid4())
self.servers[server.id] = server
return server
def remove_observer(self, task_id: str, websocket: WebSocket): # 移除服务器
self.tasks[task_id].remove_observer(websocket) def remove_server(self, server_id: str):
# TENCENT CLOUD API
del self.servers[server_id]
def has_task(self, task_id: str): # 添加任务(并为任务分配工作节点)
return task_id in self.tasks def add_task(self, task: Task):
def add(self, task: Task):
task.id = str(uuid.uuid4()) task.id = str(uuid.uuid4())
self.tasks[task.id] = task self.tasks[task.id] = task
# 调起任务检查事件
return task return task
def delete(self, task_id: str): # 移除任务
def remove_task(self, task_id: str):
del self.tasks[task_id] del self.tasks[task_id]
del self.websockets[task_id]
# 更新任务
def update(self, task_id, task: Task): def update(self, task_id, task: Task):
self.tasks[task_id] = task self.tasks[task_id] = task
# 查询任务
def query(self, task_id: str): def query(self, task_id: str):
return self.tasks[task_id] return self.tasks[task_id]
# 查询所有任务
def query_all(self): def query_all(self):
return self.tasks return self.tasks
# 使用字典来存储服务器池(使用腾讯云的API来管理服务器) # 添加观察者
class ServerManager: def add_observer(self, task_id: str, websocket: WebSocket):
def __init__(self): self.tasks[task_id].add_observer(websocket)
self.servers = {}
# 维护一个服务器池,每个服务器都有一个状态,状态有三种:空闲,运行中,异常
# 如果服务器异常,需要将其从服务器池中删除
# 如果服务器空闲,需要通知弹性伸缩服务将其删除
# 添加一个新的服务器(如何得知增加了新服务器? 主动发现? 被动发现? 使用腾讯云的API主动增加服务器) # 移除观察者
def add(self, server_name, server_info): def remove_observer(self, task_id: str, websocket: WebSocket):
self.servers[server_name] = server_info self.tasks[task_id].remove_observer(websocket)
def delete(self, server_name): # 检查任务是否存在
del self.servers[server_name] def has_task(self, task_id: str):
return task_id in self.tasks
def update(self, server_name, server_info):
self.servers[server_name] = server_info
def query(self, server_name): ## 使用字典来存储服务器池(使用腾讯云的API来管理服务器)
return self.servers[server_name] #class ServerManager:
# def __init__(self):
def query_all(self): # self.servers = {}
return self.servers # # 维护一个服务器池,每个服务器都有一个状态,状态有三种:空闲,运行中,异常
# # 如果服务器异常,需要将其从服务器池中删除
# # 如果服务器空闲,需要通知弹性伸缩服务将其删除
#
# # 添加一个新的服务器(如何得知增加了新服务器? 主动发现? 被动发现? 使用腾讯云的API主动增加服务器)
# def add(self, server_name, server_info):
# self.servers[server_name] = server_info
#
# def delete(self, server_name):
# del self.servers[server_name]
#
# def update(self, server_name, server_info):
# self.servers[server_name] = server_info
#
# def query(self, server_name):
# return self.servers[server_name]
#
# def query_all(self):
# return self.servers

15
main.py
View File

@@ -89,14 +89,14 @@ async def get_tasks():
@app.post("/tasks", response_model=Task) @app.post("/tasks", response_model=Task)
async def create_task(task: Task): async def create_task(task: Task):
return task_manager.add(task) return task_manager.add_task(task)
''' '''
监听任务进度 监听任务进度
可能有多个客户端监听同一个任务(向任务的观察者列表中添加websocket连接) 可能有多个客户端监听同一个任务(向任务的观察者列表中添加websocket连接)
可能有多个任务被同一客户端监听(向客户端的观察目标列表中添加任务) 可能有多个任务被同一客户端监听(向客户端的观察目标列表中添加任务)
应检查目标任务是否存在 应检查目标任务是否存在, 连接断开时应从观察者列表中移除
''' '''
@@ -105,24 +105,19 @@ async def task_endpoint(websocket: WebSocket, task_id: str):
await websocket.accept() await websocket.accept()
if not task_manager.has_task(task_id): if not task_manager.has_task(task_id):
await websocket.close() await websocket.close()
print(f"close websocket: {task_id}")
return return
task_manager.add_observer(task_id, websocket) task_manager.add_observer(task_id, websocket)
try: try:
while True: while True:
data = await websocket.receive_text() data = await websocket.receive_text()
print(f"Client #says: {data}") print(f"收到客户端消息: {data}")
except WebSocketDisconnect: except WebSocketDisconnect:
task_manager.remove_observer(task_id, websocket) task_manager.remove_observer(task_id, websocket)
print(f"close websocket: {task_id}") print(f"close websocket: {task_id}")
'''
维护一个任务队列, 任务队列中的任务会被分发给worker节点
任务状态变化时通知对应的客户端
'''
# 启动服务 # 启动服务
if __name__ == '__main__': if __name__ == '__main__':
port = 8000 if len(sys.argv) < 2 else int(sys.argv[1]) port = 8000 if len(sys.argv) < 2 else int(sys.argv[1])
uvicorn.run(app='main:app', host='0.0.0.0', port=port, reload=True, workers=1) uvicorn.run(app='main:app', host='0.0.0.0',
port=port, reload=True, workers=1)