176 lines
4.9 KiB
Go
176 lines
4.9 KiB
Go
package routers
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"sync"
|
|
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
// 用户 websocket 连接池, 用于向用户推送其关注的对象状态变化
|
|
// 用户可以添加关注的对象, 当对象状态变化时, 会向用户推送消息
|
|
// 用户可以取消关注的对象, 当用户没有关注时, 会从连接池中移除用户的连接
|
|
// 用户关注分为两种情况: 1. 当前用户正在查看的对象; 2. 当前用户关注的对象(两个集合的并集, 才是此处的对象推送列表)
|
|
// 关注的对象变化事件为有记录的消息, 因而并不需要监听对象的变化, 只需要监听消息盒子的变化即可
|
|
|
|
type WebSocketManager struct {
|
|
connections map[*websocket.Conn]map[string]string // ws:object_name:object_id
|
|
objects map[string]map[string]*websocket.Conn // object_name:object_id:ws
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// 获取连接的全部关注对象
|
|
func (mgr *WebSocketManager) GetConnectionObjects(conn *websocket.Conn) map[string]string {
|
|
mgr.mutex.RLock()
|
|
defer mgr.mutex.RUnlock()
|
|
return mgr.connections[conn]
|
|
}
|
|
|
|
// 获取对象的全部连接
|
|
func (mgr *WebSocketManager) GetObjectConnections(object_name string) map[string]*websocket.Conn {
|
|
mgr.mutex.RLock()
|
|
defer mgr.mutex.RUnlock()
|
|
return mgr.objects[object_name]
|
|
}
|
|
|
|
// 添加一个新连接
|
|
func (mgr *WebSocketManager) AddConnection(conn *websocket.Conn) {
|
|
mgr.mutex.Lock()
|
|
defer mgr.mutex.Unlock()
|
|
fmt.Println("添加一个新连接", conn.RemoteAddr().String())
|
|
mgr.connections[conn] = make(map[string]string)
|
|
}
|
|
|
|
// 添加一个新对象
|
|
func (mgr *WebSocketManager) AddObject(object_name string, object_id string, conn *websocket.Conn) {
|
|
mgr.mutex.Lock()
|
|
defer mgr.mutex.Unlock()
|
|
|
|
// 添加对象
|
|
object, ok := mgr.objects[object_name]
|
|
if !ok {
|
|
object = make(map[string]*websocket.Conn)
|
|
mgr.objects[object_name] = object
|
|
}
|
|
object[object_id] = conn
|
|
|
|
// 添加连接
|
|
mgr.connections[conn][object_name] = object_id
|
|
}
|
|
|
|
// 移除一个连接
|
|
func (mgr *WebSocketManager) RemoveConnection(conn *websocket.Conn) {
|
|
mgr.mutex.Lock()
|
|
defer mgr.mutex.Unlock()
|
|
|
|
// 移除连接时, 需要将此连接关注的对象从连接池中移除
|
|
for object_name, object_id := range mgr.connections[conn] {
|
|
delete(mgr.objects[object_name], object_id)
|
|
}
|
|
|
|
delete(mgr.connections, conn)
|
|
}
|
|
|
|
// 移除一个对象
|
|
func (mgr *WebSocketManager) RemoveObject(object_name string, object_id string) {
|
|
mgr.mutex.Lock()
|
|
defer mgr.mutex.Unlock()
|
|
defer fmt.Println("移除对象:", object_name, object_id)
|
|
|
|
// 先获取监听此对象的所有连接
|
|
object, ok := mgr.objects[object_name]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// 向监听此对象的所有连接发送消息, 通知对象已被移除, 并且移除每个连接对此对象的监听
|
|
for _, conn := range object {
|
|
conn.WriteJSON(map[string]interface{}{
|
|
"object_name": object_name,
|
|
"object_id": object_id,
|
|
"action": "remove",
|
|
})
|
|
delete(mgr.connections[conn], object_name)
|
|
}
|
|
}
|
|
|
|
// 通知对象状态变化
|
|
func (mgr *WebSocketManager) NotifyObjectChange(object_name string, object_id string, data interface{}) {
|
|
mgr.mutex.Lock()
|
|
defer mgr.mutex.Unlock()
|
|
|
|
// 先获取监听此对象的所有连接
|
|
object, ok := mgr.objects[object_name]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// 向监听此对象的所有连接发送消息
|
|
for _, conn := range object {
|
|
conn.WriteJSON(data)
|
|
}
|
|
}
|
|
|
|
// 创建一个新的连接池
|
|
func NewWebSocketManager() *WebSocketManager {
|
|
return &WebSocketManager{
|
|
connections: make(map[*websocket.Conn]map[string]string),
|
|
objects: make(map[string]map[string]*websocket.Conn),
|
|
mutex: sync.RWMutex{},
|
|
}
|
|
}
|
|
|
|
var WebSocketMgr = NewWebSocketManager()
|
|
|
|
// 连接 websocket
|
|
func WebSocket(w http.ResponseWriter, r *http.Request) {
|
|
// 是否 websocket
|
|
if r.Header.Get("Upgrade") != "websocket" {
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write([]byte("只支持 websocket 连接"))
|
|
return
|
|
}
|
|
|
|
// 升级连接
|
|
conn, err := (&websocket.Upgrader{}).Upgrade(w, r, nil)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
w.Write([]byte("升级连接失败"))
|
|
return
|
|
}
|
|
|
|
// 连接建立时, 加入连接
|
|
WebSocketMgr.AddConnection(conn)
|
|
|
|
// 连接关闭时, 移除连接
|
|
conn.SetCloseHandler(func(code int, text string) error {
|
|
log.Println("连接关闭:", conn.RemoteAddr().String())
|
|
WebSocketMgr.RemoveConnection(conn) // 移除连接
|
|
return nil
|
|
})
|
|
|
|
// TODO: 载入用户关注的对象
|
|
|
|
// 读取连接发送的消息
|
|
for {
|
|
fmt.Println("读取连接发送的消息", conn.RemoteAddr().String())
|
|
var data map[string]interface{}
|
|
err := conn.ReadJSON(&data)
|
|
if err != nil {
|
|
log.Println(err)
|
|
return
|
|
}
|
|
// 处理消息(添加/移除监听对象)
|
|
switch data["action"] {
|
|
case "add":
|
|
WebSocketMgr.AddObject(data["object_name"].(string), data["object_id"].(string), conn)
|
|
case "remove":
|
|
WebSocketMgr.RemoveObject(data["object_name"].(string), data["object_id"].(string))
|
|
default:
|
|
log.Println("未知消息:", data)
|
|
}
|
|
}
|
|
}
|