From 9340c99755c7246faac4a1cc584292967c46ea82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=A7=89?= Date: Fri, 6 Oct 2023 13:52:09 +0800 Subject: [PATCH] entanglement --- index.js | 45 +++++++++++ public/client.js | 2 - public/entanglement.js | 165 +++++++++++++++++++++++++++++++++++++++++ public/music.js | 4 +- public/test.html | 28 +++++++ 5 files changed, 240 insertions(+), 4 deletions(-) create mode 100644 public/entanglement.js create mode 100644 public/test.html diff --git a/index.js b/index.js index f6c37fa..dba0f1e 100644 --- a/index.js +++ b/index.js @@ -70,6 +70,51 @@ app.ws('/webrtc/:channel', (ws, req) => { }) }) +// Entanglement +app.ws('/entanglement', (ws, req) => { + ws.id = req.headers['sec-websocket-key'] + ws.channel = req.params.channel + ws.name = req.query.name + console.log('ws.name:', ws.name) + // 设备离开频道时广播给所有在线设备 + ws.on('close', () => { + console.log(ws.id, '设备离开频道:', ws.channel, wsInstance.getWss().clients.size) + wsInstance.getWss().clients.forEach(client => { + if (client !== ws && client.readyState === 1 && client.channel === ws.channel) { + client.send(JSON.stringify({ type: 'pull', id: ws.id, channel: ws.channel })) + } + }) + }) + // 设备发生错误时广播给所有在线设备 + ws.on('error', () => { + console.log(ws.id, '设备发生错误:', ws.channel, wsInstance.getWss().clients.size) + wsInstance.getWss().clients.forEach(client => { + if (client !== ws && client.readyState === 1 && client.channel === ws.channel) { + client.send(JSON.stringify({ type: 'error', id: ws.id, channel: ws.channel })) + } + }) + }) + // 设备发送信令时转发给指定在线设备 + ws.on('message', message => { + console.log(ws.id, '设备发送信令:', ws.channel, wsInstance.getWss().clients.size) + const data = JSON.parse(message) + wsInstance.getWss().clients.forEach(client => { + if (client !== ws && client.readyState === 1 && client.channel === ws.channel && client.id === data.id) { + client.send(JSON.stringify({ ...data, id: ws.id, name: ws.name })) + } + }) + }) + // 设备加入频道时广播给所有在线设备(也获取所有在线设备) + console.log(ws.id, '设备加入频道:', ws.channel, wsInstance.getWss().clients.size) + wsInstance.getWss().clients.forEach(client => { + if (client !== ws && client.readyState === 1 && client.channel === ws.channel) { + console.log(ws.name, '广播给在线设备:', client.name) + client.send(JSON.stringify({ type: 'push', id: ws.id, name: ws.name, channel: ws.channel })) + ws.send(JSON.stringify({ type: 'list', id: client.id, name: client.name, channel: client.channel })) + } + }) +}) + // WEBHOOK 处理 GitHub 事件 app.post('/webhook', (req, res) => { console.log('WEBHOOK:', req.body) diff --git a/public/client.js b/public/client.js index 54b9aee..2f7c91a 100644 --- a/public/client.js +++ b/public/client.js @@ -96,8 +96,6 @@ export default class ClientList { })) } } - webrtc.getReceivers - webrtc.getTransceivers webrtc.oniceconnectionstatechange = async event => { if (webrtc.iceConnectionState === 'disconnected' || webrtc.iceConnectionState === 'failed') { console.error(data.name, '需要添加新的 candidate') diff --git a/public/entanglement.js b/public/entanglement.js new file mode 100644 index 0000000..03368e4 --- /dev/null +++ b/public/entanglement.js @@ -0,0 +1,165 @@ +export default class Entanglement { + constructor({ options, server }) { + this.event = {} + this.options = options + this.server = server + this.channels = {} + this.client = {} + + this.store = {} + this.users = [] + this.account = {} + this.ws = this.__create_websocket() + } + // 当创建一个对象时,会调用这个方法 + // 当创建一个对象时,所有连接在一起的终端都会同步这个对象 + // 并不是所有的终端都会监听全局变化, 因此需要在这里注册监听(频道) + + async __create_websocket() { + const host = window.location.host + const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws' + const ws = new WebSocket(`${protocol}://${host}/entanglement`) + ws.onopen = () => console.log('websocket 连接成功') + ws.onclose = async () => { + console.log('websocket 连接关闭, 3s后尝试重新连接...') + await new Promise(resolve => setTimeout(resolve, 3000)) + this.ws = this.__create_websocket() + } + ws.onerror = async () => { + console.log('websocket 连接错误, 3s后尝试重新连接...') + await new Promise(resolve => setTimeout(resolve, 3000)) + this.ws = this.__create_websocket() + } + ws.onmessage = async event => { + const data = JSON.parse(event.data) + //console.log('收到消息', data) + if (data.type === 'list') { + console.debug('收到已在线对端列表', data.list) + this.users = Promise.all(data.list.map(async user => { + console.debug('发送给', user.name, 'offer') + const pc = this.__create_webrtc() + const offer = await pc.createOffer() + pc.setLocalDescription(offer) + this.ws.send(JSON.stringify({ type: 'offer', user, offer })) + return { ...user, webrtc: pc } + })) + return + } + if (data.type === 'push') { + console.debug(data.user.name, '上线', data) + return + } + if (data.type === 'pull') { + console.debug(data.user.name, '下线', data) + return + } + if (data.type === 'offer') { + console.debug(data.user.name, '发来 offer') + const pc = this.__create_webrtc() + pc.setRemoteDescription(data.offer) + const answer = await pc.createAnswer() + await pc.setLocalDescription(answer) + this.ws.send(JSON.stringify({ type: 'answer', user: data.user, answer })) + this.users.push({ ...data.user, webrtc: pc }) + return + } + if (data.type === 'answer') { + console.debug(data.user.name, '发来 answer') + const pc = this.users.find(user => user.id === data.user.id).webrtc + await pc.setRemoteDescription(data.answer) + return + } + if (data.type === 'icecandidate') { + console.debug(data.user.name, '发来 icecandidate 候选通道') + const pc = this.users.find(user => user.id === data.user.id).webrtc + await pc.addIceCandidate(data.icecandidate) + return + } + console.error('收到未知数据:', data) + } + return ws + } + + __create_webrtc() { + const pc = new RTCPeerConnection(this.options) + pc.onicecandidate = (event) => { + if (event.candidate) { + this.ws.send(JSON.stringify({ type: 'icecandidate', user: this.account, icecandidate: event.candidate })) + } + } + pc.oniceconnectionstatechange = (event) => { + if (webrtc.iceConnectionState === 'disconnected' || webrtc.iceConnectionState === 'failed') { + console.error(data.name, '需要添加新的 candidate') + } else if (webrtc.iceConnectionState === 'connected' || webrtc.iceConnectionState === 'completed') { + console.debug(data.name, 'WebRTC 连接已经建立成功') + } + } + pc.onnegotiationneeded = (event) => { + console.log('onnegotiationneeded', event) + } + pc.ontrack = (event) => { + console.log('ontrack', event) + } + pc.ondatachannel = event => { + console.log(data.user.name, '建立', event.channel.label, '通道') + event.channel.onmessage = event => { + console.log(data.user.name, '收到', event.channel.label, '通道消息', event.data) + } + event.channel.onopen = () => { + console.log(data.user.name, '打开', event.channel.label, '通道') + event.channel.send('hello') + } + event.channel.onclose = () => { + console.log(data.user.name, '关闭', event.channel.label, '通道') + } + event.channel.onerror = () => { + console.log(data.user.name, '通道', event.channel.label, '发生错误') + } + } + return pc + } + + // 向所有在线的用户广播消息(webrtc) + send_all(channel_name, data) { + console.log('向', channel_name, '频道广播消息:', data) + this.users.filter(user => { + const status = user.webrtc.iceConnectionState + return status === 'connected' || status === 'completed' + }).forEach(user => { + console.log('向', user.name, '发送', channel_name, '频道消息:', data) + const channel = user.webrtc.createDataChannel(channel_name) + channel.onopen = () => channel.send(JSON.stringify(data)) + }) + } + + // 数据被修改时触发 + set(name, data) { + // 递归创建代理对象 + const createDeepProxy = (obj, path = []) => { + const proxy = new Proxy(obj, { + set: (target, key, value) => { + if (!Array.isArray(target) || key !== 'length') { + console.log('对象被修改', [...path, key], value) + this.send_all(name, this.store[name]) // 向所有在线的用户广播消息 + } + return Reflect.set(target, key, value) + }, + get: (target, key) => { + return target[key] + } + }) + Object.keys(obj).forEach(key => { + if (typeof obj[key] === 'object') { + obj[key] = createDeepProxy(obj[key], [...path, key]) + } + }) + return proxy + } + return Reflect.set(this.store, name, createDeepProxy(data)) + } + + // 读取一个通道 + get(name) { + return this.store[name] + } +} diff --git a/public/music.js b/public/music.js index 8c60fcb..f4199d0 100644 --- a/public/music.js +++ b/public/music.js @@ -95,7 +95,7 @@ export default class MusicList { Span({ textContent: `${item.name} - ${bytesToSize(item.size)}`, onclick: event => { - event.stopPropagation() + event.stopPropagation() // !如果使用async则此处不能阻止冒泡传递 const li = event.target.parentElement // ListItem const ul = li.parentElement // List const list = Array.from(ul.children) // ListItems @@ -112,7 +112,7 @@ export default class MusicList { Button({ textContent: item.save ? '移除' : '缓存', onclick: event => { - event.stopPropagation() + event.stopPropagation() // !如果使用async则此处不能阻止冒泡传递 if (item.save) { event.target.textContent = '缓存' this.ul.querySelector(`#${item.id}`).classList.remove('cache') diff --git a/public/test.html b/public/test.html new file mode 100644 index 0000000..75fed4e --- /dev/null +++ b/public/test.html @@ -0,0 +1,28 @@ + + + + + + webRTC + + + +
+

Entanglement

+

同步

+
+ + + +