Files
drawing/ObjectManager.py
2023-03-13 14:08:42 +08:00

121 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import uuid
from fastapi import WebSocket
from pydantic import BaseModel
from datetime import datetime
# 使用字典来存储websocket连接
class ConnectionManager:
def __init__(self):
self.active_connections: map = {}
# 加入一个新的websocket连接
async def connect(self, websocket: WebSocket, client_id: int):
await websocket.accept()
self.active_connections[client_id] = websocket
# 删除一个websocket连接(要先检查client_id是否存在)
def disconnect(self, client_id: int):
if client_id in self.active_connections:
del self.active_connections[client_id]
# 向指定的websocket连接发送消息(要先检查client_id是否存在)
async def send_personal_message(self, message: str, client_id: int):
if client_id in self.active_connections:
await self.active_connections[client_id].send_text(message)
# 向所有的websocket连接发送消息
async def broadcast(self, message: str):
for client_id, ws in self.active_connections.items():
await ws.send_text(message)
'''
任务管理器(观察者模式)
使用字典来存储Task任务
'''
# Task 基本模型
class Task(BaseModel):
id: str=''
name: str=''
status: str='pending'
created_at: datetime=datetime.now()
updated_at: datetime=datetime.now()
__observers = [] # 观察者列表
# 属性发生变化时更新updated_at并通知观察者
def __setattr__(self, name, value):
super().__setattr__(name, value)
self.event_observer(f"Task {self.id} updated at {self.updated_at}")
# 添加观察者
def add_observer(self, websocket: WebSocket):
self.__observers.append(websocket)
# 移除观察者
def remove_observer(self, websocket: WebSocket):
self.__observers.remove(websocket)
# 通知观察者
def event_observer(self, message: str):
for observer in self.__observers:
observer.send_text(message)
# 使用字典来存储Task任务
class TaskManager:
def __init__(self):
self.tasks = {} # 任务ID的映射
# TOOD: 保持一个后台线程,定时检查任务的状态,并将任务的状态更新到数据中
def add_observer(self, task_id: str, websocket: WebSocket):
self.tasks[task_id].add_observer(websocket)
def remove_observer(self, task_id: str, websocket: WebSocket):
self.tasks[task_id].remove_observer(websocket)
def has_task(self, task_id: str):
return task_id in self.tasks
def add(self, task: Task):
task.id = str(uuid.uuid4())
self.tasks[task.id] = task
return task
def delete(self, task_id: str):
del self.tasks[task_id]
def update(self, task_id, task: Task):
self.tasks[task_id] = task
def query(self, task_id: str):
return self.tasks[task_id]
def query_all(self):
return self.tasks
# 使用字典来存储服务器池(使用腾讯云的API来管理服务器)
class ServerManager:
def __init__(self):
self.servers = {}
# 维护一个服务器池,每个服务器都有一个状态,状态有三种:空闲,运行中,异常
# 如果服务器异常,需要将其从服务器池中删除
# 如果服务器空闲,需要通知弹性伸缩服务将其删除
# 添加一个新的服务器(如何得知增加了新服务器? 主动发现? 被动发现? 使用腾讯云的API主动增加服务器)
def add(self, server_name, server_info):
self.servers[server_name] = server_info
def delete(self, server_name):
del self.servers[server_name]
def update(self, server_name, server_info):
self.servers[server_name] = server_info
def query(self, server_name):
return self.servers[server_name]
def query_all(self):
return self.servers