diff --git a/ConnectionManager.py b/ConnectionManager.py new file mode 100644 index 0000000..4b8910e --- /dev/null +++ b/ConnectionManager.py @@ -0,0 +1,26 @@ +from fastapi import WebSocket + +# 使用字典来存储websocket连接 +class ConnectionManager: + def __init__(self): + self.active_connections: map = {} + + # 加入一个新的websocket连接 + async def connect(self, websocket: WebSocket, client_id: int): + await websocket.accept() + self.active_connections[client_id] = websocket + + # 删除一个websocket连接(要先检查client_id是否存在) + def disconnect(self, client_id: int): + if client_id in self.active_connections: + del self.active_connections[client_id] + + # 向指定的websocket连接发送消息(要先检查client_id是否存在) + async def send_personal_message(self, message: str, client_id: int): + if client_id in self.active_connections: + await self.active_connections[client_id].send_text(message) + + # 向所有的websocket连接发送消息 + async def broadcast(self, message: str): + for client_id, ws in self.active_connections.items(): + await ws.send_text(message) diff --git a/TaskManager.py b/TaskManager.py new file mode 100644 index 0000000..77d2b6b --- /dev/null +++ b/TaskManager.py @@ -0,0 +1,25 @@ +# 使用字典来存储Task任务 +# 任务的状态有:未开始、进行中、已完成 +# 任务的状态可以通过TaskManager来进行修改 +# 任务的状态可以通过TaskManager来进行查询 +# 任务的状态可以通过TaskManager来进行删除 +# 任务的状态可以通过TaskManager来进行添加 + +class TaskManager(object): + def __init__(self): + self.tasks = {} + + def add(self, task_name): + self.tasks[task_name] = 'not started' + + def delete(self, task_name): + del self.tasks[task_name] + + def update(self, task_name, status): + self.tasks[task_name] = status + + def query(self, task_name): + return self.tasks[task_name] + + def query_all(self): + return self.tasks diff --git a/main.py b/main.py new file mode 100644 index 0000000..e98ada8 --- /dev/null +++ b/main.py @@ -0,0 +1,88 @@ +import uvicorn +from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException +from fastapi.responses import HTMLResponse +import ConnectionManager + + +''' +1. 创建一个任务, 则分发任务给worker节点 +2. 客户端建立一个websocket连接, 提供所有的任务的状态更新 +''' + + +html = """ + + + + Chat + + +

WebSocket Chat

+

Your ID:

+
+ + +
+ + + + +""" + + +app = FastAPI() + + +@app.get("/") +async def get(): + return HTMLResponse(html) + +manager = ConnectionManager.ConnectionManager() + + +# 接收客户端的websocket连接 +@app.websocket("/ws/{client_id}") +async def websocket_endpoint(websocket: WebSocket, client_id: int): + # TODO: 验证客户端的身份(使用TOKEN) + # 获取TOKEN (从数据库中获取用户的信息) + # token = request.headers.get('Authorization') + # if token is None: + # raise HTTPException(status_code=401, detail="Unauthorized") + + await manager.connect(websocket=websocket, client_id=client_id) + try: + while True: + data = await websocket.receive_text() + await manager.send_personal_message(f"You wrote: {data}", client_id=client_id) + await manager.broadcast(f"Client #{client_id} says: {data}") + # TODO: 处理客户端的请求变化(理论上并没有) + except WebSocketDisconnect: + manager.disconnect(websocket) + await manager.broadcast(f"Client #{client_id} left the chat") + +# 通知所有的ws客户端 +@app.post("/notify") +async def notify(): + pass + + +# 维护一个任务队列, 任务队列中的任务会被分发给worker节点 +# 任务状态变化时通知对应的客户端 diff --git a/nuxt.config.ts b/nuxt.config.ts index eca0a10..66c6290 100644 --- a/nuxt.config.ts +++ b/nuxt.config.ts @@ -13,17 +13,17 @@ export default defineNuxtConfig({ } } }, - nitro: { - devProxy: { - '/api/img': { - target: 'http://106.15.192.42:3000/api/img' - }, - '/api/drawing': { - target: 'http://106.15.192.42:3000/api/drawing' - }, - '/api/text': { - target: 'http://139.224.128.24:7861/api/text_to_text' - }, - }, - } + //nitro: { + // devProxy: { + // '/api/img': { + // target: 'http://106.15.192.42:3000/api/img' + // }, + // '/api/drawing': { + // target: 'http://106.15.192.42:3000/api/drawing' + // }, + // '/api/text': { + // target: 'http://139.224.128.24:7861/api/text_to_text' + // }, + // }, + //} }) diff --git a/package-lock.json b/package-lock.json index 4c31eed..3cadaac 100644 --- a/package-lock.json +++ b/package-lock.json @@ -814,8 +814,7 @@ "@types/node": { "version": "18.11.17", "resolved": "https://registry.npmjs.org/@types/node/-/node-18.11.17.tgz", - "integrity": "sha512-HJSUJmni4BeDHhfzn6nF0sVmd1SMezP7/4F0Lq+aXzmp2xm9O7WXrUtHW/CHlYVtZUbByEvWidHqRtcJXGF2Ng==", - "dev": true + "integrity": "sha512-HJSUJmni4BeDHhfzn6nF0sVmd1SMezP7/4F0Lq+aXzmp2xm9O7WXrUtHW/CHlYVtZUbByEvWidHqRtcJXGF2Ng==" }, "@types/resolve": { "version": "1.20.2", @@ -823,6 +822,14 @@ "integrity": "sha512-60BCwRFOZCQhDncwQdxxeOEEkbc5dIMccYLwbxsS4TUNeVECQ/pBJ0j09mrHOl/JJvpRPGwO9SvE4nR2Nb/a4Q==", "dev": true }, + "@types/ws": { + "version": "8.5.4", + "resolved": "https://registry.npmmirror.com/@types/ws/-/ws-8.5.4.tgz", + "integrity": "sha512-zdQDHKUgcX/zBc4GrwsE/7dVdAD8JR4EuiAXiiUhhfyIJXXb2+PrGshFyeXWQPMmmZ2XxgaqclgpIC7eTXc1mg==", + "requires": { + "@types/node": "*" + } + }, "@unhead/dom": { "version": "1.0.13", "resolved": "https://registry.npmjs.org/@unhead/dom/-/dom-1.0.13.tgz", @@ -6374,10 +6381,9 @@ "dev": true }, "ws": { - "version": "8.11.0", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.11.0.tgz", - "integrity": "sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==", - "dev": true + "version": "8.12.1", + "resolved": "https://registry.npmmirror.com/ws/-/ws-8.12.1.tgz", + "integrity": "sha512-1qo+M9Ba+xNhPB+YTWUlK6M17brTut5EXbcBaMRN5pH5dFrXz7lzz1ChFSUq3bOUl8yEvSenhHmYUNJxFzdJew==" }, "xxhashjs": { "version": "0.2.2", diff --git a/package.json b/package.json index 46fc99e..1e5350e 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,8 @@ "author": "satori.love", "license": "MIT", "dependencies": { - "axios": "^1.3.4" + "@types/ws": "^8.5.4", + "axios": "^1.3.4", + "ws": "^8.12.1" } } diff --git a/pages/index.vue b/pages/index.vue index 482db2e..6e3b2e1 100644 --- a/pages/index.vue +++ b/pages/index.vue @@ -328,6 +328,31 @@ const TaskSubmit = async () => { }) } +// 如果列表里有待执行的任务, 使用WS连接检查任务状态 +const TaskManagement = () => { + console.log('构建WS连接') + let ws = new WebSocket('ws://'+window.location.host+'/api/sendmessage') + ws.onopen = () => { + console.log('ws open') + ws.send('hello') + } + ws.onmessage = (e) => { + console.log('ws message') + let data = JSON.parse(e.data) + console.log(data.id, data.status, data.progress) + } + ws.onerror = () => { + console.log('ws error') + } + ws.onclose = () => { + console.log('ws close') + } +} + +onMounted(() => { + TaskManagement() +}) +