From 80fb9a409300dbfff4d4132853dc95cb41e51bbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A1=9C=E8=8F=AF?= Date: Sat, 11 Mar 2023 09:48:55 +0800 Subject: [PATCH] model --- ConnectionManager.py | 26 ------------ ObjectManager.py | 86 +++++++++++++++++++++++++++++++++++++++ TaskManager.py | 25 ------------ main.py | 29 +++++++++---- tencent.py | 96 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 203 insertions(+), 59 deletions(-) delete mode 100644 ConnectionManager.py create mode 100644 ObjectManager.py delete mode 100644 TaskManager.py create mode 100644 tencent.py diff --git a/ConnectionManager.py b/ConnectionManager.py deleted file mode 100644 index 4b8910e..0000000 --- a/ConnectionManager.py +++ /dev/null @@ -1,26 +0,0 @@ -from fastapi import WebSocket - -# 使用字典来存储websocket连接 -class ConnectionManager: - def __init__(self): - self.active_connections: map = {} - - # 加入一个新的websocket连接 - async def connect(self, websocket: WebSocket, client_id: int): - await websocket.accept() - self.active_connections[client_id] = websocket - - # 删除一个websocket连接(要先检查client_id是否存在) - def disconnect(self, client_id: int): - if client_id in self.active_connections: - del self.active_connections[client_id] - - # 向指定的websocket连接发送消息(要先检查client_id是否存在) - async def send_personal_message(self, message: str, client_id: int): - if client_id in self.active_connections: - await self.active_connections[client_id].send_text(message) - - # 向所有的websocket连接发送消息 - async def broadcast(self, message: str): - for client_id, ws in self.active_connections.items(): - await ws.send_text(message) diff --git a/ObjectManager.py b/ObjectManager.py new file mode 100644 index 0000000..54b01eb --- /dev/null +++ b/ObjectManager.py @@ -0,0 +1,86 @@ +import uuid +from fastapi import WebSocket +from pydantic import BaseModel +from datetime import datetime + +# 使用字典来存储websocket连接 +class ConnectionManager: + def __init__(self): + self.active_connections: map = {} + + # 加入一个新的websocket连接 + async def connect(self, websocket: WebSocket, client_id: int): + await websocket.accept() + self.active_connections[client_id] = websocket + + # 删除一个websocket连接(要先检查client_id是否存在) + def disconnect(self, client_id: int): + if client_id in self.active_connections: + del self.active_connections[client_id] + + # 向指定的websocket连接发送消息(要先检查client_id是否存在) + async def send_personal_message(self, message: str, client_id: int): + if client_id in self.active_connections: + await self.active_connections[client_id].send_text(message) + + # 向所有的websocket连接发送消息 + async def broadcast(self, message: str): + for client_id, ws in self.active_connections.items(): + await ws.send_text(message) + +# Task 基本模型 +class Task(BaseModel): + id: int + name: str + status: str + created_at: datetime + updated_at: datetime + + +# 使用字典来存储Task任务 +class TaskManager(object): + def __init__(self): + self.tasks = {} + # TOOD: 保持一个后台线程,定时检查任务的状态,并将任务的状态更新到数据中 + + def create(self, task: dict): + task["id"] = str(uuid.uuid4()) + self.tasks[task["id"]] = task + return task + + def delete(self, task_id: str): + del self.tasks[task_id] + + def update(self, task_id, task: dict): + self.tasks[task_id] = task + + def query(self, task_id: str): + return self.tasks[task_id] + + def query_all(self): + return self.tasks + +# 使用字典来存储服务器池(使用腾讯云的API来管理服务器) +class ServerManager(object): + def __init__(self): + self.servers = {} + # 维护一个服务器池,每个服务器都有一个状态,状态有三种:空闲,运行中,异常 + # 如果服务器异常,需要将其从服务器池中删除 + # 如果服务器空闲,需要通知弹性伸缩服务将其删除 + + # 添加一个新的服务器(如何得知增加了新服务器? 主动发现? 被动发现? 使用腾讯云的API主动增加服务器) + def add(self, server_name, server_info): + self.servers[server_name] = server_info + + def delete(self, server_name): + del self.servers[server_name] + + def update(self, server_name, server_info): + self.servers[server_name] = server_info + + def query(self, server_name): + return self.servers[server_name] + + def query_all(self): + return self.servers + \ No newline at end of file diff --git a/TaskManager.py b/TaskManager.py deleted file mode 100644 index 77d2b6b..0000000 --- a/TaskManager.py +++ /dev/null @@ -1,25 +0,0 @@ -# 使用字典来存储Task任务 -# 任务的状态有:未开始、进行中、已完成 -# 任务的状态可以通过TaskManager来进行修改 -# 任务的状态可以通过TaskManager来进行查询 -# 任务的状态可以通过TaskManager来进行删除 -# 任务的状态可以通过TaskManager来进行添加 - -class TaskManager(object): - def __init__(self): - self.tasks = {} - - def add(self, task_name): - self.tasks[task_name] = 'not started' - - def delete(self, task_name): - del self.tasks[task_name] - - def update(self, task_name, status): - self.tasks[task_name] = status - - def query(self, task_name): - return self.tasks[task_name] - - def query_all(self): - return self.tasks diff --git a/main.py b/main.py index e98ada8..c8b4f03 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,8 @@ import uvicorn from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException from fastapi.responses import HTMLResponse -import ConnectionManager +from ObjectManager import ConnectionManager, TaskManager, ServerManager, Task +from typing import List ''' @@ -55,8 +56,8 @@ app = FastAPI() async def get(): return HTMLResponse(html) -manager = ConnectionManager.ConnectionManager() - +connection_manager = ConnectionManager() +task_manager = TaskManager() # 接收客户端的websocket连接 @app.websocket("/ws/{client_id}") @@ -67,16 +68,17 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int): # if token is None: # raise HTTPException(status_code=401, detail="Unauthorized") - await manager.connect(websocket=websocket, client_id=client_id) + await connection_manager.connect(websocket=websocket, client_id=client_id) try: while True: data = await websocket.receive_text() - await manager.send_personal_message(f"You wrote: {data}", client_id=client_id) - await manager.broadcast(f"Client #{client_id} says: {data}") + await connection_manager.send_personal_message(f"You wrote: {data}", client_id=client_id) + await connection_manager.broadcast(f"Client #{client_id} says: {data}") # TODO: 处理客户端的请求变化(理论上并没有) except WebSocketDisconnect: - manager.disconnect(websocket) - await manager.broadcast(f"Client #{client_id} left the chat") + connection_manager.disconnect(client_id=client_id) + await connection_manager.broadcast(f"Client #{client_id} left the chat") + # 通知所有的ws客户端 @app.post("/notify") @@ -84,5 +86,16 @@ async def notify(): pass +@app.get("/tasks", response_model=List[Task]) +async def get_tasks(): + return task_manager.query_all() + + +@app.post("/tasks", response_model=Task) +async def create_task(): + return task_manager.add({ + "name": "test", + }) + # 维护一个任务队列, 任务队列中的任务会被分发给worker节点 # 任务状态变化时通知对应的客户端 diff --git a/tencent.py b/tencent.py new file mode 100644 index 0000000..4bfbe1e --- /dev/null +++ b/tencent.py @@ -0,0 +1,96 @@ +import json +from tencentcloud.common import credential +from tencentcloud.common.profile.client_profile import ClientProfile +from tencentcloud.common.profile.http_profile import HttpProfile +from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException +from tencentcloud.cvm.v20170312 import cvm_client, models + +# 从腾讯云自动弹性伸缩 +''' +1. 通过腾讯云API创建一个服务器 +2. 通过腾讯云API获取服务器的IP地址 +3. 通过腾讯云API获取服务器的用户名和密码 +4. 通过腾讯云API获取服务器的端口号 +5. 通过腾讯云API获取服务器的状态 +6. 通过腾讯云API删除一个服务器 + +1. 如果服务器闲置数大于两台,需要通知腾讯云API删除一台闲置服务器 +2. 如果服务器负载全满,需要通知腾讯云API创建一台新的服务器 +3. 如果服务器负载不全满,需要通知腾讯云API删除一台闲置服务器 +4. 每天的早上9点到下午5点, 至少保留一台空闲服务器 +5. 每天的晚上5点到早上9点, 移除所有空闲服务器 +''' + +try: + # 实例化一个认证对象,入参需要传入腾讯云账户 SecretId 和 SecretKey,此处还需注意密钥对的保密 + # 代码泄露可能会导致 SecretId 和 SecretKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考,建议采用更安全的方式来使用密钥,请参见:https://cloud.tencent.com/document/product/1278/85305 + # 密钥可前往官网控制台 https://console.cloud.tencent.com/cam/capi 进行获取 + cred = credential.Credential("AKIDgO0UrfO2b4mS6io7Yx9Yq1Pjz8AA4qUA", "jVBtV1SnprOviRK5teR7GC82thbTKqrv") + # 实例化一个http选项,可选的,没有特殊需求可以跳过 + httpProfile = HttpProfile() + httpProfile.endpoint = "cvm.ap-guangzhou.tencentcloudapi.com" + + # 实例化一个client选项,可选的,没有特殊需求可以跳过 + clientProfile = ClientProfile() + clientProfile.httpProfile = httpProfile + # 实例化要请求产品的client对象,clientProfile是可选的 + client = cvm_client.CvmClient(cred, "ap-guangzhou", clientProfile) + + # 实例化一个请求对象,每个接口都会对应一个request对象 + req = models.RunInstancesRequest() + params = { + "InstanceChargeType": "SPOTPAID", + "DisableApiTermination": False, + "Placement": { + "Zone": "ap-guangzhou-6", + "ProjectId": 0 + }, + "InstanceMarketOptions": { + "SpotOptions": { + "MaxPrice": "1000" + } + }, + "VirtualPrivateCloud": { + "AsVpcGateway": False, + "VpcId": "vpc-fyjby1gt", + "SubnetId": "subnet-oqdeakts", + "Ipv6AddressCount": 0 + }, + "InstanceType": "S6.MEDIUM2", + "ImageId": "img-eb30mz89", + "SystemDisk": { + "DiskSize": 50, + "DiskType": "CLOUD_BSSD" + }, + "InternetAccessible": { + "InternetMaxBandwidthOut": 100, + "PublicIpAssigned": True, + "InternetChargeType": "TRAFFIC_POSTPAID_BY_HOUR" + }, + "InstanceName": "未命名", + "LoginSettings": { + "KeyIds": [ "skey-672qeot7" ] + }, + "SecurityGroupIds": [ "sg-fkkd8qw9" ], + "InstanceCount": 1, + "EnhancedService": { + "SecurityService": { + "Enabled": True + }, + "MonitorService": { + "Enabled": True + }, + "AutomationService": { + "Enabled": True + } + } + } + req.from_json_string(json.dumps(params)) + + # 返回的resp是一个RunInstancesResponse的实例,与请求对象对应 + resp = client.RunInstances(req) + # 输出json格式的字符串回包 + print(resp.to_json_string()) + +except TencentCloudSDKException as err: + print(err)