This commit is contained in:
satori 2022-02-11 15:58:57 +08:00
parent dedb87fe38
commit f1263af85b
3 changed files with 74 additions and 37 deletions

View File

@ -1,37 +1,9 @@
import interrelated from './interrelated.js'
import level from 'level'
import fs from 'fs'
import path from 'path'
function getStat(path) {
return new Promise((resolve, reject) => {
fs.stat(path, (err, stats) => {
err ? resolve(false) : resolve(stats)
})
})
}
function mkdir(dir) {
return new Promise((resolve, reject) => {
fs.mkdir(dir, err => {
err ? resolve(false) : resolve(true)
})
})
}
async function dirExists(dir) {
let isExists = await getStat(dir)
if (isExists && isExists.isDirectory()) {
return true
} else if (isExists) {
return false
}
let status = await dirExists(path.parse(dir).dir)
return status ? await mkdir(dir) : null
}
import tools from './tools.js'
// 检查并创建文件夹
await dirExists('data/level')
await tools.dirExists('data/level')
// 初始化 leveldb
const db = level("./data/level/fmhub")
@ -47,10 +19,13 @@ const db = level("./data/level/fmhub")
// })
//})
export default class {
// 订阅记录, 每个频道可能被多次订阅因而产生大量查询
export default class fmhub {
constructor() {
this.用户订阅 = new interrelated()
this.用户会话 = new interrelated()
this.终端注视 = new interrelated()
}
订阅频道(fid, uid) {
@ -102,17 +77,17 @@ export default class {
}
}
发送消息(fm, uid, data) {
let msg = JSON.stringify({ fm, uid, data })
发送消息(频道, 来源UID, 数据) {
let msg = JSON.stringify({ fm: 频道, uid: 来源UID, data: 数据 })
// 订阅列表中
// A 是用户, 所以是 A 下 B 的集合
// B 是频道, 向频道下所有用户的会话发送消息, 所以是 B下A的集合用于查询会话列表
// 会话列表中:
// A 是用户, 所以是 A 下 B 的 集合
// B 是 WS, 向用户的每个 WS 发送消息
this.用户订阅.B中取A(fm, (uid) => {
this.用户订阅.B中取A(频道, (目标UID) => {
//console.log(`用户 ${uid} 订阅过此频道`, fm)
this.用户会话.A中取B(uid, (ws) => {
this.用户会话.A中取B(目标UID, (ws) => {
//console.log(`用户 ${uid} 的会话`)
ws.send(msg)
})
@ -135,4 +110,9 @@ export default class {
JSON.parse(value).forEach(item => this.用户订阅.关联数据(uid, item))
})
}
// FM 通道状态监听分为两种情况
// 1. 当前正在观看某一对象, 因此变更都推送(包括删除, 仅针对当前场景的会话)
// 2. 订阅此对象的变化, 触发关键变化时收到通知(不包括删除, 所有在线会话都收到推送)
}

View File

@ -54,12 +54,11 @@ function websocketer(ws, req) {
let uid = req.session?.account?.uid || "0"
console.log(`用户 ${uid} 连接了服务器`)
//FM.加载订阅记录(uid)
FM.增加会话(uid, ws)
ws.on('message', function (msg) {
if (typeof (msg) !== "string") return console.log("消息不是字符串")
let { fm, data } = JSON.parse(msg)
let { fm, data } = JSON.parse(msg) // 消息不是JSON
FM.发送消息(fm, uid, data)
})
ws.on('close', (code) => FM.移除会话(uid, ws))
@ -294,6 +293,31 @@ function object_patch(req, res, next) {
return db(req.params.name).update({ _id: req.params._id }, { $set: req.body }, function (err, count) {
if (!count) return res.status(500).send('修改失败')
// 对象发生了修改, 收集通知用户
// 执行通知所有关注者
// 构建消息内容
let data = { name: req.params.name, _id: req.params._id }
// 如何加入订阅和取消订阅? 如何判断自己是否已经订阅?
// 关注了此对象的用户们(如果存在)
if (Array.isArray(doc.fm)) {
doc.fm.forEach(uid => {
FM.发送消息("PATCH", req.session.account.uid, data)
// 应当是向每个用户发送消息, 而不是向整个频道发送消息
})
}
// 这个范围过大, 应当是关注此对象的, 而不是关注 PATCH 频道的, 因此 PATCH 是此对象消息的内容
// 但直接使用对象ID与其它对象重复, 还需要标记对象类型..
// 对象发生了修改, 收集通知终端(也许需要另建一个注视状态绑定)
let 注视着此对象的终端们 = new Map()
// 如果已经关注, 则排除对注视终端的重复通知
// 此处插入 hook
// 使用方法:
// kana.item(name || all).patch.

33
tools.js Normal file
View File

@ -0,0 +1,33 @@
import fs from 'fs'
import path from 'path'
function getStat(path) {
return new Promise((resolve, reject) => {
fs.stat(path, (err, stats) => {
err ? resolve(false) : resolve(stats)
})
})
}
function mkdir(dir) {
return new Promise((resolve, reject) => {
fs.mkdir(dir, err => {
err ? resolve(false) : resolve(true)
})
})
}
async function dirExists(dir) {
let isExists = await getStat(dir)
if (isExists && isExists.isDirectory()) {
return true
} else if (isExists) {
return false
}
let status = await dirExists(path.parse(dir).dir)
return status ? await mkdir(dir) : null
}
export default {
dirExists
}