193 lines
8.1 KiB
JavaScript
193 lines
8.1 KiB
JavaScript
/**
 * Entanglement shares named data stores with other peers.
 *
 * Signalling messages (`list`, `push`, `pull`, `offer`, `answer`, `candidate`)
 * are exchanged as JSON over a WebSocket opened against `/entanglement` on the
 * current host. For every remote user an RTCPeerConnection is created with one
 * data channel per entry of `this.channels`. Objects registered through
 * `set()` are wrapped in proxies that broadcast each property write to all
 * known peers through `send_all()`.
 */
export default class Entanglement {
    /**
     * @param {object} params
     * @param {object} params.options - Passed unchanged to every `RTCPeerConnection` constructor.
     */
    constructor({ options }) {
        this.event = {}
        this.options = options
        this.store = {}
        this.users = []
        this.channels = [{ name: 'json' }]
        // Always the most recently created signalling socket (never a Promise).
        this.ws = null
        // Handle of the pending reconnect timer, or null when none is scheduled.
        this.__reconnect_timer = null
        this.__create_websocket().catch(error => {
            console.error('websocket 创建失败:', error)
        })
    }

    /**
     * Serialise a signalling message and send it over the current WebSocket.
     *
     * @param {object} message - JSON-serialisable signalling payload.
     */
    __signal(message) {
        this.ws.send(JSON.stringify(message))
    }

    /**
     * Find the registered peer record whose `id` equals `user.id`.
     *
     * @param {object} user - User descriptor received from the signalling server.
     * @returns {object|undefined} The matching entry of `this.users`, if any.
     */
    __find_peer(user) {
        return this.users.find(peer => peer.id === user.id)
    }

    /**
     * Close a peer's connection and remove its record from `this.users`.
     *
     * @param {object} peer - An entry of `this.users`.
     */
    __drop_peer(peer) {
        const index = this.users.indexOf(peer)
        if (index !== -1) {
            this.users.splice(index, 1)
        }
        if (peer.webrtc && typeof peer.webrtc.close === 'function') {
            peer.webrtc.close()
        }
    }

    /**
     * Apply the ICE candidates that were queued while the peer had no remote
     * description yet.
     *
     * @param {object} peer - An entry of `this.users`.
     * @returns {Promise<void>}
     */
    async __flush_candidates(peer) {
        const queued = (peer.pending_candidates || []).splice(0)
        for (const candidate of queued) {
            await peer.webrtc.addIceCandidate(candidate)
        }
    }

    /**
     * Create an RTCPeerConnection for `user`, wire its logging handlers and
     * open one outgoing data channel per entry of `this.channels`.
     *
     * Side effect: `user.channels` is overwritten with the created channels
     * (`{ channel, ...channelConfig }` for each configured channel).
     *
     * @param {object} user - User descriptor; `user.name` is used for logging.
     * @returns {Promise<RTCPeerConnection>} The configured connection.
     */
    async __create_webrtc(user) {
        const pc = new RTCPeerConnection(this.options)

        // Fires when a local ICE candidate is available; forward it to the peer.
        pc.onicecandidate = (event) => {
            if (event.candidate) {
                console.debug(user.name, '发出 candidate 候选通道')
                this.__signal({ type: 'candidate', user, candidate: event.candidate })
            }
        }

        // Fires when the ICE connection state changes.
        pc.oniceconnectionstatechange = () => {
            const state = pc.iceConnectionState
            if (state === 'disconnected' || state === 'failed') {
                console.error(user.name, '需要添加新的 candidate')
            } else if (state === 'connected' || state === 'completed') {
                console.debug(user.name, 'WebRTC 连接已经建立成功')
            }
        }

        // Fires when (re)negotiation is needed. Renegotiation is not implemented:
        // offers are only created in response to the `list` signalling message.
        pc.onnegotiationneeded = async (event) => {
            console.log('onnegotiationneeded', event)
        }

        // Fires when the remote peer adds a media track.
        pc.ontrack = (event) => {
            console.log('ontrack', event)
        }

        // Fires when the remote peer opens a data channel towards us.
        pc.ondatachannel = event => {
            const incoming = event.channel
            console.log(user.name, '建立', incoming.label, '接收通道')
            incoming.onmessage = message => {
                console.log(user.name, '发来', message.target.label, '通道消息', message.data)
            }
            incoming.onopen = () => {
                console.log(user.name, '打开', incoming.label, '接收通道')
            }
            incoming.onclose = () => {
                console.log(user.name, '关闭', incoming.label, '接收通道')
            }
            incoming.onerror = () => {
                console.log(user.name, '通道', incoming.label, '发生错误')
            }
        }

        // Create the outgoing data channels.
        // NOTE(review): `reliable` is not a standard RTCDataChannelInit member —
        // confirm whether `ordered` / `maxRetransmits` was intended.
        user.channels = this.channels.map(item => {
            const channel = pc.createDataChannel(item.name, { reliable: true })
            channel.onopen = () => {
                console.log('打开数据发送通道')
                channel.send(JSON.stringify({ name: 'sato', hello: 'world' }))
            }
            channel.onmessage = event => {
                console.log('收到数据发送通道消息', event.data)
            }
            channel.onclose = () => {
                console.log('关闭数据发送通道')
            }
            channel.onerror = () => {
                console.log('发送通道发生错误')
            }
            return { channel, ...item }
        })

        return pc
    }

    /**
     * Schedule a single reconnect attempt three seconds from now.
     *
     * A failing WebSocket fires both `error` and `close`; the timer guard makes
     * sure that pair results in one new socket only, and events coming from a
     * socket that has already been replaced are ignored.
     *
     * @param {WebSocket} socket - The socket that reported the close/error.
     */
    __schedule_reconnect(socket) {
        if (this.ws !== socket || this.__reconnect_timer !== null) {
            return
        }
        this.__reconnect_timer = setTimeout(() => {
            this.__reconnect_timer = null
            this.__create_websocket().catch(error => {
                console.error('websocket 创建失败:', error)
            })
        }, 3000)
    }

    /**
     * Open the signalling WebSocket, store it in `this.ws` and wire its handlers.
     *
     * The socket is stored synchronously, before the returned promise settles.
     *
     * @returns {Promise<WebSocket>} Resolves with the newly created socket.
     */
    async __create_websocket() {
        const host = window.location.host
        const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'
        const ws = new WebSocket(`${protocol}://${host}/entanglement?name=sato&channel=chat`)
        this.ws = ws

        ws.onopen = () => {
            console.log('websocket 连接成功')
        }
        ws.onclose = () => {
            console.log('websocket 连接关闭, 3s后尝试重新连接...')
            this.__schedule_reconnect(ws)
        }
        ws.onerror = () => {
            console.log('websocket 连接错误, 3s后尝试重新连接...')
            this.__schedule_reconnect(ws)
        }
        // Failures while handling one message must not surface as unhandled rejections.
        ws.onmessage = event => this.__handle_message(event).catch(error => {
            console.error('处理信令消息失败:', error)
        })

        return ws
    }

    /**
     * Handle one signalling message received from the WebSocket.
     *
     * @param {MessageEvent} event - `event.data` is expected to be a JSON string.
     * @returns {Promise<void>}
     */
    async __handle_message(event) {
        let data
        try {
            data = JSON.parse(event.data)
        } catch (error) {
            console.error('收到无法解析的数据:', event.data, error)
            return
        }
        if (data === null || typeof data !== 'object') {
            console.error('收到未知数据:', data)
            return
        }

        if (data.type === 'list') {
            console.debug('收到在线列表', data.list)
            await Promise.all(data.list.map(async user => {
                console.debug('发送给', user.name, 'offer')
                try {
                    const pc = await this.__create_webrtc(user)
                    const offer = await pc.createOffer()
                    await pc.setLocalDescription(offer)
                    // Must be registered before the offer is sent so that the
                    // answer and candidates coming back can find this peer.
                    this.users.push({ ...user, webrtc: pc, pending_candidates: [] })
                    this.__signal({ type: 'offer', user, offer })
                } catch (error) {
                    // One failing peer must not prevent offers to the others.
                    console.error(user.name, 'offer 发送失败:', error)
                }
            }))
            return
        }

        if (data.type === 'push') {
            console.debug(data.user.name, '上线', data)
            return
        }

        if (data.type === 'pull') {
            console.debug(data.user.name, '下线', data)
            // Release the connections of the user that went offline. Skipped when
            // the message carries no id, because every id-less record would match.
            if (data.user.id !== undefined && data.user.id !== null) {
                this.users
                    .filter(peer => peer.id === data.user.id)
                    .forEach(peer => this.__drop_peer(peer))
            }
            return
        }

        if (data.type === 'offer') {
            console.debug(data.user.name, '发来 offer')
            const pc = await this.__create_webrtc(data.user)
            // Register before the awaits below: candidates from the offering side
            // can arrive while the remote description is still being applied.
            const peer = { ...data.user, webrtc: pc, pending_candidates: [] }
            this.users.push(peer)
            try {
                await pc.setRemoteDescription(data.offer)
                await this.__flush_candidates(peer)
                const answer = await pc.createAnswer()
                await pc.setLocalDescription(answer)
                this.__signal({ type: 'answer', user: data.user, answer })
            } catch (error) {
                this.__drop_peer(peer)
                throw error
            }
            return
        }

        if (data.type === 'answer') {
            console.debug(data.user.name, '发来 answer')
            const peer = this.__find_peer(data.user)
            if (!peer) {
                console.error(data.user.name, '尚未建立连接, 忽略 answer')
                return
            }
            await peer.webrtc.setRemoteDescription(data.answer)
            await this.__flush_candidates(peer)
            return
        }

        if (data.type === 'candidate') {
            console.debug(data.user.name, '发来 candidate 候选通道', JSON.stringify(this.users))
            const peer = this.__find_peer(data.user)
            if (!peer) {
                console.error(data.user.name, '尚未建立连接, 忽略 candidate')
                return
            }
            if (!peer.webrtc.remoteDescription) {
                // Candidates cannot be applied before the remote description is
                // set; keep them until the offer/answer handler flushes the queue.
                peer.pending_candidates = peer.pending_candidates || []
                peer.pending_candidates.push(data.candidate)
                return
            }
            await peer.webrtc.addIceCandidate(data.candidate)
            return
        }

        console.error('收到未知数据:', data)
    }

    /**
     * Broadcast a message to every known peer over the named data channel.
     *
     * A peer whose channel is still connecting is polled every 100 ms until the
     * channel leaves that state (temporary workaround). Peers without the
     * channel, or whose channel is closing/closed, are skipped.
     *
     * @param {string} channel_name - Name of the data channel to use.
     * @param {*} data - JSON-serialisable payload.
     * @returns {Promise<boolean[]>} One flag per peer, `true` when the message was sent.
     */
    send_all(channel_name, data) {
        console.log('向', channel_name, '频道广播消息:', data)
        console.log('在线用户:', this.users)
        const deliveries = this.users.map(async user => {
            try {
                console.log('向', user.name, '发送', channel_name, '频道消息:', data)
                const entry = (user.channels || []).find(item => item.name === channel_name)
                if (!entry) {
                    console.error(user.name, '没有', channel_name, '频道, 跳过发送')
                    return false
                }
                const ch = entry.channel
                // Wait for the data channel to open (temporary workaround).
                while (ch.readyState === 'connecting') {
                    await new Promise(resolve => setTimeout(resolve, 100))
                }
                if (ch.readyState !== 'open') {
                    // A closing/closed channel never opens again; stop waiting.
                    console.error(user.name, '的', channel_name, '频道已关闭, 放弃发送')
                    return false
                }
                console.log('完成发送', channel_name, '频道消息:', data)
                ch.send(JSON.stringify(data))
                return true
            } catch (error) {
                console.error('向', user.name, '发送', channel_name, '频道消息失败:', error)
                return false
            }
        })
        return Promise.all(deliveries)
    }

    /**
     * Register `data` under `name` and broadcast every later modification.
     *
     * Objects and arrays are wrapped (recursively, in place) in proxies whose
     * `set` trap calls `send_all(name, { key: path, value })`. Writes to an
     * array's `length` are not broadcast. `null` and primitive values are
     * stored as they are.
     *
     * @param {string} name - Store key; also used as the data channel name.
     * @param {*} data - Value to store.
     * @returns {boolean} Result of the underlying `Reflect.set`.
     */
    set(name, data) {
        // `typeof null` is 'object', but null cannot be the target of a Proxy.
        const isProxyable = value => typeof value === 'object' && value !== null

        // Recursively create the proxy objects.
        const createDeepProxy = (obj, path = []) => {
            const proxy = new Proxy(obj, {
                set: (target, key, value) => {
                    if (!Array.isArray(target) || key !== 'length') {
                        console.log('对象被修改', [...path, key], value)
                        // Broadcast the change to every online user.
                        this.send_all(name, { key: [...path, key], value })
                    }
                    return Reflect.set(target, key, value)
                },
                get: (target, key) => {
                    return target[key]
                }
            })
            Object.keys(obj).forEach(key => {
                if (isProxyable(obj[key])) {
                    obj[key] = createDeepProxy(obj[key], [...path, key])
                }
            })
            return proxy
        }

        return Reflect.set(this.store, name, isProxyable(data) ? createDeepProxy(data) : data)
    }

    /**
     * Read the value stored under `name`.
     *
     * @param {string} name - Store key.
     * @returns {*} The stored value, or `undefined` when nothing was stored.
     */
    get(name) {
        return this.store[name]
    }
}