From 03ae91278d88420403960f0c63fd405d516e6478 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A1=9C=E8=8F=AF?= Date: Mon, 13 Mar 2023 14:08:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A0=87=E5=87=86Task=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ObjectManager.py | 53 +++++++++++++++++++++++++------ main.py | 82 +++++++++++++++++++++++++++++++++--------------- message.py | 42 +++++++++++++++++++++++++ 3 files changed, 142 insertions(+), 35 deletions(-) create mode 100644 message.py diff --git a/ObjectManager.py b/ObjectManager.py index 70dcc8d..2ee036b 100644 --- a/ObjectManager.py +++ b/ObjectManager.py @@ -3,6 +3,7 @@ from fastapi import WebSocket from pydantic import BaseModel from datetime import datetime + # 使用字典来存储websocket连接 class ConnectionManager: def __init__(self): @@ -28,22 +29,56 @@ class ConnectionManager: for client_id, ws in self.active_connections.items(): await ws.send_text(message) +''' +任务管理器(观察者模式) +使用字典来存储Task任务 +''' + # Task 基本模型 class Task(BaseModel): - id: int - name: str - status: str - created_at: datetime - updated_at: datetime + id: str='' + name: str='' + status: str='pending' + created_at: datetime=datetime.now() + updated_at: datetime=datetime.now() + + __observers = [] # 观察者列表 + + # 属性发生变化时,更新updated_at并通知观察者 + def __setattr__(self, name, value): + super().__setattr__(name, value) + self.event_observer(f"Task {self.id} updated at {self.updated_at}") + + # 添加观察者 + def add_observer(self, websocket: WebSocket): + self.__observers.append(websocket) + + # 移除观察者 + def remove_observer(self, websocket: WebSocket): + self.__observers.remove(websocket) + + # 通知观察者 + def event_observer(self, message: str): + for observer in self.__observers: + observer.send_text(message) # 使用字典来存储Task任务 -class TaskManager(object): +class TaskManager: def __init__(self): - self.tasks = {} + self.tasks = {} # 任务ID的映射 # TOOD: 保持一个后台线程,定时检查任务的状态,并将任务的状态更新到数据中 + + def add_observer(self, task_id: str, websocket: WebSocket): + self.tasks[task_id].add_observer(websocket) + + def remove_observer(self, task_id: str, websocket: WebSocket): + self.tasks[task_id].remove_observer(websocket) + + def has_task(self, task_id: str): + return task_id in self.tasks - def create(self, task: Task): + def add(self, task: Task): task.id = str(uuid.uuid4()) self.tasks[task.id] = task return task @@ -61,7 +96,7 @@ class TaskManager(object): return self.tasks # 使用字典来存储服务器池(使用腾讯云的API来管理服务器) -class ServerManager(object): +class ServerManager: def __init__(self): self.servers = {} # 维护一个服务器池,每个服务器都有一个状态,状态有三种:空闲,运行中,异常 diff --git a/main.py b/main.py index 4f257bd..e19d203 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,5 @@ import uvicorn +import sys from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException from fastapi.responses import HTMLResponse from ObjectManager import ConnectionManager, TaskManager, ServerManager, Task @@ -26,10 +27,23 @@ html = """ -