This commit is contained in:
2023-03-11 09:48:55 +08:00
parent 0428a1b155
commit 80fb9a4093
5 changed files with 203 additions and 59 deletions

View File

@@ -1,26 +0,0 @@
from fastapi import WebSocket
# 使用字典来存储websocket连接
class ConnectionManager:
def __init__(self):
self.active_connections: map = {}
# 加入一个新的websocket连接
async def connect(self, websocket: WebSocket, client_id: int):
await websocket.accept()
self.active_connections[client_id] = websocket
# 删除一个websocket连接(要先检查client_id是否存在)
def disconnect(self, client_id: int):
if client_id in self.active_connections:
del self.active_connections[client_id]
# 向指定的websocket连接发送消息(要先检查client_id是否存在)
async def send_personal_message(self, message: str, client_id: int):
if client_id in self.active_connections:
await self.active_connections[client_id].send_text(message)
# 向所有的websocket连接发送消息
async def broadcast(self, message: str):
for client_id, ws in self.active_connections.items():
await ws.send_text(message)

86
ObjectManager.py Normal file
View File

@@ -0,0 +1,86 @@
import uuid
from fastapi import WebSocket
from pydantic import BaseModel
from datetime import datetime
# 使用字典来存储websocket连接
class ConnectionManager:
def __init__(self):
self.active_connections: map = {}
# 加入一个新的websocket连接
async def connect(self, websocket: WebSocket, client_id: int):
await websocket.accept()
self.active_connections[client_id] = websocket
# 删除一个websocket连接(要先检查client_id是否存在)
def disconnect(self, client_id: int):
if client_id in self.active_connections:
del self.active_connections[client_id]
# 向指定的websocket连接发送消息(要先检查client_id是否存在)
async def send_personal_message(self, message: str, client_id: int):
if client_id in self.active_connections:
await self.active_connections[client_id].send_text(message)
# 向所有的websocket连接发送消息
async def broadcast(self, message: str):
for client_id, ws in self.active_connections.items():
await ws.send_text(message)
# Task 基本模型
class Task(BaseModel):
id: int
name: str
status: str
created_at: datetime
updated_at: datetime
# 使用字典来存储Task任务
class TaskManager(object):
def __init__(self):
self.tasks = {}
# TOOD: 保持一个后台线程,定时检查任务的状态,并将任务的状态更新到数据中
def create(self, task: dict):
task["id"] = str(uuid.uuid4())
self.tasks[task["id"]] = task
return task
def delete(self, task_id: str):
del self.tasks[task_id]
def update(self, task_id, task: dict):
self.tasks[task_id] = task
def query(self, task_id: str):
return self.tasks[task_id]
def query_all(self):
return self.tasks
# 使用字典来存储服务器池(使用腾讯云的API来管理服务器)
class ServerManager(object):
def __init__(self):
self.servers = {}
# 维护一个服务器池,每个服务器都有一个状态,状态有三种:空闲,运行中,异常
# 如果服务器异常,需要将其从服务器池中删除
# 如果服务器空闲,需要通知弹性伸缩服务将其删除
# 添加一个新的服务器(如何得知增加了新服务器? 主动发现? 被动发现? 使用腾讯云的API主动增加服务器)
def add(self, server_name, server_info):
self.servers[server_name] = server_info
def delete(self, server_name):
del self.servers[server_name]
def update(self, server_name, server_info):
self.servers[server_name] = server_info
def query(self, server_name):
return self.servers[server_name]
def query_all(self):
return self.servers

View File

@@ -1,25 +0,0 @@
# 使用字典来存储Task任务
# 任务的状态有:未开始、进行中、已完成
# 任务的状态可以通过TaskManager来进行修改
# 任务的状态可以通过TaskManager来进行查询
# 任务的状态可以通过TaskManager来进行删除
# 任务的状态可以通过TaskManager来进行添加
class TaskManager(object):
def __init__(self):
self.tasks = {}
def add(self, task_name):
self.tasks[task_name] = 'not started'
def delete(self, task_name):
del self.tasks[task_name]
def update(self, task_name, status):
self.tasks[task_name] = status
def query(self, task_name):
return self.tasks[task_name]
def query_all(self):
return self.tasks

29
main.py
View File

@@ -1,7 +1,8 @@
import uvicorn import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException
from fastapi.responses import HTMLResponse from fastapi.responses import HTMLResponse
import ConnectionManager from ObjectManager import ConnectionManager, TaskManager, ServerManager, Task
from typing import List
''' '''
@@ -55,8 +56,8 @@ app = FastAPI()
async def get(): async def get():
return HTMLResponse(html) return HTMLResponse(html)
manager = ConnectionManager.ConnectionManager() connection_manager = ConnectionManager()
task_manager = TaskManager()
# 接收客户端的websocket连接 # 接收客户端的websocket连接
@app.websocket("/ws/{client_id}") @app.websocket("/ws/{client_id}")
@@ -67,16 +68,17 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int):
# if token is None: # if token is None:
# raise HTTPException(status_code=401, detail="Unauthorized") # raise HTTPException(status_code=401, detail="Unauthorized")
await manager.connect(websocket=websocket, client_id=client_id) await connection_manager.connect(websocket=websocket, client_id=client_id)
try: try:
while True: while True:
data = await websocket.receive_text() data = await websocket.receive_text()
await manager.send_personal_message(f"You wrote: {data}", client_id=client_id) await connection_manager.send_personal_message(f"You wrote: {data}", client_id=client_id)
await manager.broadcast(f"Client #{client_id} says: {data}") await connection_manager.broadcast(f"Client #{client_id} says: {data}")
# TODO: 处理客户端的请求变化(理论上并没有) # TODO: 处理客户端的请求变化(理论上并没有)
except WebSocketDisconnect: except WebSocketDisconnect:
manager.disconnect(websocket) connection_manager.disconnect(client_id=client_id)
await manager.broadcast(f"Client #{client_id} left the chat") await connection_manager.broadcast(f"Client #{client_id} left the chat")
# 通知所有的ws客户端 # 通知所有的ws客户端
@app.post("/notify") @app.post("/notify")
@@ -84,5 +86,16 @@ async def notify():
pass pass
@app.get("/tasks", response_model=List[Task])
async def get_tasks():
return task_manager.query_all()
@app.post("/tasks", response_model=Task)
async def create_task():
return task_manager.add({
"name": "test",
})
# 维护一个任务队列, 任务队列中的任务会被分发给worker节点 # 维护一个任务队列, 任务队列中的任务会被分发给worker节点
# 任务状态变化时通知对应的客户端 # 任务状态变化时通知对应的客户端

96
tencent.py Normal file
View File

@@ -0,0 +1,96 @@
import json
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.cvm.v20170312 import cvm_client, models
# 从腾讯云自动弹性伸缩
'''
1. 通过腾讯云API创建一个服务器
2. 通过腾讯云API获取服务器的IP地址
3. 通过腾讯云API获取服务器的用户名和密码
4. 通过腾讯云API获取服务器的端口号
5. 通过腾讯云API获取服务器的状态
6. 通过腾讯云API删除一个服务器
1. 如果服务器闲置数大于两台需要通知腾讯云API删除一台闲置服务器
2. 如果服务器负载全满需要通知腾讯云API创建一台新的服务器
3. 如果服务器负载不全满需要通知腾讯云API删除一台闲置服务器
4. 每天的早上9点到下午5点, 至少保留一台空闲服务器
5. 每天的晚上5点到早上9点, 移除所有空闲服务器
'''
try:
# 实例化一个认证对象,入参需要传入腾讯云账户 SecretId 和 SecretKey此处还需注意密钥对的保密
# 代码泄露可能会导致 SecretId 和 SecretKey 泄露并威胁账号下所有资源的安全性。以下代码示例仅供参考建议采用更安全的方式来使用密钥请参见https://cloud.tencent.com/document/product/1278/85305
# 密钥可前往官网控制台 https://console.cloud.tencent.com/cam/capi 进行获取
cred = credential.Credential("AKIDgO0UrfO2b4mS6io7Yx9Yq1Pjz8AA4qUA", "jVBtV1SnprOviRK5teR7GC82thbTKqrv")
# 实例化一个http选项可选的没有特殊需求可以跳过
httpProfile = HttpProfile()
httpProfile.endpoint = "cvm.ap-guangzhou.tencentcloudapi.com"
# 实例化一个client选项可选的没有特殊需求可以跳过
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
# 实例化要请求产品的client对象,clientProfile是可选的
client = cvm_client.CvmClient(cred, "ap-guangzhou", clientProfile)
# 实例化一个请求对象,每个接口都会对应一个request对象
req = models.RunInstancesRequest()
params = {
"InstanceChargeType": "SPOTPAID",
"DisableApiTermination": False,
"Placement": {
"Zone": "ap-guangzhou-6",
"ProjectId": 0
},
"InstanceMarketOptions": {
"SpotOptions": {
"MaxPrice": "1000"
}
},
"VirtualPrivateCloud": {
"AsVpcGateway": False,
"VpcId": "vpc-fyjby1gt",
"SubnetId": "subnet-oqdeakts",
"Ipv6AddressCount": 0
},
"InstanceType": "S6.MEDIUM2",
"ImageId": "img-eb30mz89",
"SystemDisk": {
"DiskSize": 50,
"DiskType": "CLOUD_BSSD"
},
"InternetAccessible": {
"InternetMaxBandwidthOut": 100,
"PublicIpAssigned": True,
"InternetChargeType": "TRAFFIC_POSTPAID_BY_HOUR"
},
"InstanceName": "未命名",
"LoginSettings": {
"KeyIds": [ "skey-672qeot7" ]
},
"SecurityGroupIds": [ "sg-fkkd8qw9" ],
"InstanceCount": 1,
"EnhancedService": {
"SecurityService": {
"Enabled": True
},
"MonitorService": {
"Enabled": True
},
"AutomationService": {
"Enabled": True
}
}
}
req.from_json_string(json.dumps(params))
# 返回的resp是一个RunInstancesResponse的实例与请求对象对应
resp = client.RunInstances(req)
# 输出json格式的字符串回包
print(resp.to_json_string())
except TencentCloudSDKException as err:
print(err)