package routers import ( "fmt" "log" "net/http" "sync" "github.com/gorilla/websocket" ) // 用户 websocket 连接池, 用于向用户推送其关注的对象状态变化 // 用户可以添加关注的对象, 当对象状态变化时, 会向用户推送消息 // 用户可以取消关注的对象, 当用户没有关注时, 会从连接池中移除用户的连接 // 用户关注分为两种情况: 1. 当前用户正在查看的对象; 2. 当前用户关注的对象(两个集合的并集, 才是此处的对象推送列表) // 关注的对象变化事件为有记录的消息, 因而并不需要监听对象的变化, 只需要监听消息盒子的变化即可 type WebSocketManager struct { connections map[*websocket.Conn]map[string]string // ws:object_name:object_id objects map[string]map[string]*websocket.Conn // object_name:object_id:ws mutex sync.RWMutex } // 获取连接的全部关注对象 func (mgr *WebSocketManager) GetConnectionObjects(conn *websocket.Conn) map[string]string { mgr.mutex.RLock() defer mgr.mutex.RUnlock() return mgr.connections[conn] } // 获取对象的全部连接 func (mgr *WebSocketManager) GetObjectConnections(object_name string) map[string]*websocket.Conn { mgr.mutex.RLock() defer mgr.mutex.RUnlock() return mgr.objects[object_name] } // 添加一个新连接 func (mgr *WebSocketManager) AddConnection(conn *websocket.Conn) { mgr.mutex.Lock() defer mgr.mutex.Unlock() fmt.Println("添加一个新连接", conn.RemoteAddr().String()) mgr.connections[conn] = make(map[string]string) } // 添加一个新对象 func (mgr *WebSocketManager) AddObject(object_name string, object_id string, conn *websocket.Conn) { mgr.mutex.Lock() defer mgr.mutex.Unlock() // 添加对象 object, ok := mgr.objects[object_name] if !ok { object = make(map[string]*websocket.Conn) mgr.objects[object_name] = object } object[object_id] = conn // 添加连接 mgr.connections[conn][object_name] = object_id } // 移除一个连接 func (mgr *WebSocketManager) RemoveConnection(conn *websocket.Conn) { mgr.mutex.Lock() defer mgr.mutex.Unlock() // 移除连接时, 需要将此连接关注的对象从连接池中移除 for object_name, object_id := range mgr.connections[conn] { delete(mgr.objects[object_name], object_id) } delete(mgr.connections, conn) } // 移除一个对象 func (mgr *WebSocketManager) RemoveObject(object_name string, object_id string) { mgr.mutex.Lock() defer mgr.mutex.Unlock() defer fmt.Println("移除对象:", object_name, object_id) // 先获取监听此对象的所有连接 object, ok := mgr.objects[object_name] if !ok { return } // 向监听此对象的所有连接发送消息, 通知对象已被移除, 并且移除每个连接对此对象的监听 for _, conn := range object { conn.WriteJSON(map[string]interface{}{ "object_name": object_name, "object_id": object_id, "action": "remove", }) delete(mgr.connections[conn], object_name) } } // 通知对象状态变化 func (mgr *WebSocketManager) NotifyObjectChange(object_name string, object_id string, data interface{}) { mgr.mutex.Lock() defer mgr.mutex.Unlock() // 先获取监听此对象的所有连接 object, ok := mgr.objects[object_name] if !ok { return } // 向监听此对象的所有连接发送消息 for _, conn := range object { conn.WriteJSON(data) } } // 创建一个新的连接池 func NewWebSocketManager() *WebSocketManager { return &WebSocketManager{ connections: make(map[*websocket.Conn]map[string]string), objects: make(map[string]map[string]*websocket.Conn), mutex: sync.RWMutex{}, } } var WebSocketMgr = NewWebSocketManager() // 连接 websocket func WebSocket(w http.ResponseWriter, r *http.Request) { // 是否 websocket if r.Header.Get("Upgrade") != "websocket" { w.WriteHeader(http.StatusOK) w.Write([]byte("只支持 websocket 连接")) return } // 升级连接 conn, err := (&websocket.Upgrader{}).Upgrade(w, r, nil) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("升级连接失败")) return } // 连接建立时, 加入连接 WebSocketMgr.AddConnection(conn) // 连接关闭时, 移除连接 conn.SetCloseHandler(func(code int, text string) error { log.Println("连接关闭:", conn.RemoteAddr().String()) WebSocketMgr.RemoveConnection(conn) // 移除连接 return nil }) // TODO: 载入用户关注的对象 // 读取连接发送的消息 for { fmt.Println("读取连接发送的消息", conn.RemoteAddr().String()) var data map[string]interface{} err := conn.ReadJSON(&data) if err != nil { log.Println(err) return } // 处理消息(添加/移除监听对象) switch data["action"] { case "add": WebSocketMgr.AddObject(data["object_name"].(string), data["object_id"].(string), conn) case "remove": WebSocketMgr.RemoveObject(data["object_name"].(string), data["object_id"].(string)) default: log.Println("未知消息:", data) } } }