From 64707229a5fac7650b19ff5154ae3ea5d4d9219c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A1=9C=E8=8F=AF?= Date: Sat, 15 Jul 2023 15:18:58 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=98=E5=8C=96=E4=BA=8B=E4=BB=B6=E7=9B=91?= =?UTF-8?q?=E5=90=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 1 + routers/websocket.go | 175 +++++++++++++++++++++++++++++++++++++++++++ test_ws.sh | 27 +++++++ 3 files changed, 203 insertions(+) create mode 100644 routers/websocket.go create mode 100755 test_ws.sh diff --git a/main.go b/main.go index e90d9f3..4df5e47 100644 --- a/main.go +++ b/main.go @@ -89,6 +89,7 @@ func main() { r.HandleFunc("/api/params", routers.ParamsListGet).Methods("GET") r.HandleFunc("/api/params/model", routers.ParamsModelsGet).Methods("GET") r.HandleFunc("/api/account", routers.AccountGet).Methods("GET") + r.HandleFunc("/api/ws", routers.WebSocket).Methods("GET") r.HandleFunc("/img/{id}", routers.WebpGet).Methods("GET") diff --git a/routers/websocket.go b/routers/websocket.go new file mode 100644 index 0000000..f21fb73 --- /dev/null +++ b/routers/websocket.go @@ -0,0 +1,175 @@ +package routers + +import ( + "fmt" + "log" + "net/http" + "sync" + + "github.com/gorilla/websocket" +) + +// 用户 websocket 连接池, 用于向用户推送其关注的对象状态变化 +// 用户可以添加关注的对象, 当对象状态变化时, 会向用户推送消息 +// 用户可以取消关注的对象, 当用户没有关注时, 会从连接池中移除用户的连接 +// 用户关注分为两种情况: 1. 当前用户正在查看的对象; 2. 当前用户关注的对象(两个集合的并集, 才是此处的对象推送列表) +// 关注的对象变化事件为有记录的消息, 因而并不需要监听对象的变化, 只需要监听消息盒子的变化即可 + +type WebSocketManager struct { + connections map[*websocket.Conn]map[string]string // ws:object_name:object_id + objects map[string]map[string]*websocket.Conn // object_name:object_id:ws + mutex sync.RWMutex +} + +// 获取连接的全部关注对象 +func (mgr *WebSocketManager) GetConnectionObjects(conn *websocket.Conn) map[string]string { + mgr.mutex.RLock() + defer mgr.mutex.RUnlock() + return mgr.connections[conn] +} + +// 获取对象的全部连接 +func (mgr *WebSocketManager) GetObjectConnections(object_name string) map[string]*websocket.Conn { + mgr.mutex.RLock() + defer mgr.mutex.RUnlock() + return mgr.objects[object_name] +} + +// 添加一个新连接 +func (mgr *WebSocketManager) AddConnection(conn *websocket.Conn) { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + fmt.Println("添加一个新连接", conn.RemoteAddr().String()) + mgr.connections[conn] = make(map[string]string) +} + +// 添加一个新对象 +func (mgr *WebSocketManager) AddObject(object_name string, object_id string, conn *websocket.Conn) { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + // 添加对象 + object, ok := mgr.objects[object_name] + if !ok { + object = make(map[string]*websocket.Conn) + mgr.objects[object_name] = object + } + object[object_id] = conn + + // 添加连接 + mgr.connections[conn][object_name] = object_id +} + +// 移除一个连接 +func (mgr *WebSocketManager) RemoveConnection(conn *websocket.Conn) { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + // 移除连接时, 需要将此连接关注的对象从连接池中移除 + for object_name, object_id := range mgr.connections[conn] { + delete(mgr.objects[object_name], object_id) + } + + delete(mgr.connections, conn) +} + +// 移除一个对象 +func (mgr *WebSocketManager) RemoveObject(object_name string, object_id string) { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + defer fmt.Println("移除对象:", object_name, object_id) + + // 先获取监听此对象的所有连接 + object, ok := mgr.objects[object_name] + if !ok { + return + } + + // 向监听此对象的所有连接发送消息, 通知对象已被移除, 并且移除每个连接对此对象的监听 + for _, conn := range object { + conn.WriteJSON(map[string]interface{}{ + "object_name": object_name, + "object_id": object_id, + "action": "remove", + }) + delete(mgr.connections[conn], object_name) + } +} + +// 通知对象状态变化 +func (mgr *WebSocketManager) NotifyObjectChange(object_name string, object_id string, data interface{}) { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + // 先获取监听此对象的所有连接 + object, ok := mgr.objects[object_name] + if !ok { + return + } + + // 向监听此对象的所有连接发送消息 + for _, conn := range object { + conn.WriteJSON(data) + } +} + +// 创建一个新的连接池 +func NewWebSocketManager() *WebSocketManager { + return &WebSocketManager{ + connections: make(map[*websocket.Conn]map[string]string), + objects: make(map[string]map[string]*websocket.Conn), + mutex: sync.RWMutex{}, + } +} + +var WebSocketMgr = NewWebSocketManager() + +// 连接 websocket +func WebSocket(w http.ResponseWriter, r *http.Request) { + // 是否 websocket + if r.Header.Get("Upgrade") != "websocket" { + w.WriteHeader(http.StatusOK) + w.Write([]byte("只支持 websocket 连接")) + return + } + + // 升级连接 + conn, err := (&websocket.Upgrader{}).Upgrade(w, r, nil) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("升级连接失败")) + return + } + + // 连接建立时, 加入连接 + WebSocketMgr.AddConnection(conn) + + // 连接关闭时, 移除连接 + conn.SetCloseHandler(func(code int, text string) error { + log.Println("连接关闭:", conn.RemoteAddr().String()) + WebSocketMgr.RemoveConnection(conn) // 移除连接 + return nil + }) + + // TODO: 载入用户关注的对象 + + // 读取连接发送的消息 + for { + fmt.Println("读取连接发送的消息", conn.RemoteAddr().String()) + var data map[string]interface{} + err := conn.ReadJSON(&data) + if err != nil { + log.Println(err) + return + } + // 处理消息(添加/移除监听对象) + switch data["action"] { + case "add": + WebSocketMgr.AddObject(data["object_name"].(string), data["object_id"].(string), conn) + case "remove": + WebSocketMgr.RemoveObject(data["object_name"].(string), data["object_id"].(string)) + default: + log.Println("未知消息:", data) + } + } +} diff --git a/test_ws.sh b/test_ws.sh new file mode 100755 index 0000000..ef52886 --- /dev/null +++ b/test_ws.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +go run main.go -procname go_test & +sleep 6 + +# 退出服務的函數 +function exit_service() { pkill -f go_test && echo "退出服務: $1" && exit 1; } + +# 安装websocketd +# 参考:https://github.com/joewalnes/websocketd/wiki/Installation +# 下载地址:https://github.com/joewalnes/websocketd/releases + +# 启动 WebSocket 服务 +# websocketd --port=8080 echo + +# 连接 WebSocket 客户端 +wscat -c ws://localhost:8080/api/ws + +## 发送消息 +#echo "Hello, WebSocket!" | nc localhost 8080 +# +## 接收消息 +#nc -l localhost 8080 + +# 退出 +sleep 20 +exit_service "test_ws.sh"