分片传输数据

This commit is contained in:
2023-09-30 16:19:33 +08:00
parent ef436f2e93
commit 26ede0a73b
3 changed files with 109 additions and 102 deletions

View File

@ -12,27 +12,18 @@ export default class ClientList {
document.body.appendChild(this.ul) document.body.appendChild(this.ul)
this.websocket.onmessage = async event => { this.websocket.onmessage = async event => {
const data = JSON.parse(event.data) const data = JSON.parse(event.data)
const channels_init = (webrtc) => {
return Object.entries(this.channels).map(([name, callback]) => {
const channel = webrtc.createDataChannel(name, { reliable: true })
channel.onopen = callback.onopen
channel.onclose = callback.onclose
channel.onerror = callback.onerror
channel.onmessage = callback.onmessage
return channel
})
}
const webrtc_init = () => { const webrtc_init = () => {
const webrtc = new RTCPeerConnection() const webrtc = new RTCPeerConnection()
Object.entries(channels).forEach(([name, callback]) => {
const channel = webrtc.createDataChannel(name)
channel.onopen = event => {
//console.log('datachannel 已打开', event)
if (callback.onopen) callback.onopen(event)
}
channel.onclose = event => {
//console.log('datachannel 已关闭', event)
if (callback.onclose) callback.onclose(event)
}
channel.onerror = event => {
//console.log('datachannel 发生错误', event)
if (callback.onerror) callback.onerror(event)
}
channel.onmessage = event => {
//console.log('datachannel 收到数据', event)
if (callback.onmessage) callback.onmessage(event)
}
})
webrtc.onicecandidate = event => { webrtc.onicecandidate = event => {
if (event.candidate) { if (event.candidate) {
this.websocket.send(JSON.stringify({ this.websocket.send(JSON.stringify({
@ -43,11 +34,11 @@ export default class ClientList {
} }
} }
webrtc.ondatachannel = ({ channel }) => { webrtc.ondatachannel = ({ channel }) => {
//console.log('收到对方 datachannel', channel) console.log('收到对方 datachannel', channel)
channel.onmessage = event => { channel.onmessage = event => {
//console.log('收到对方 datachannel message', event) //console.log('收到对方 datachannel message', event)
if (channels[event.target.label]) { if (this.channels[event.target.label]) {
channels[event.target.label].onmessage(event, this.clientlist.find(x => x.id === data.id)) this.channels[event.target.label].onmessage(event, this.clientlist.find(x => x.id === data.id))
} }
} }
} }
@ -59,26 +50,28 @@ export default class ClientList {
if (data.type === 'list') { if (data.type === 'list') {
//console.log('取得在线对端列表:', data) //console.log('取得在线对端列表:', data)
const webrtc = webrtc_init() const webrtc = webrtc_init()
const channels = channels_init(webrtc)
//console.log('发送给对方 offer') //console.log('发送给对方 offer')
const offer = await webrtc.createOffer() const offer = await webrtc.createOffer()
await webrtc.setLocalDescription(offer) await webrtc.setLocalDescription(offer)
this.clientlist.push({ id: data.id, name: data.name, webrtc }) this.clientlist.push({ id: data.id, name: data.name, webrtc, channels })
this.websocket.send(JSON.stringify({ type: 'offer', id: data.id, offer })) this.websocket.send(JSON.stringify({ type: 'offer', id: data.id, offer }))
this.add(data) this.add(data)
return return
} }
if (data.type === 'push') { if (data.type === 'push') {
console.log('新上线客户端:', data) //console.log('新上线客户端:', data)
return this.add(data) return this.add(data)
} }
if (data.type === 'pull') { if (data.type === 'pull') {
console.log('移除客户端:', data) //console.log('移除客户端:', data)
return this.remove(data) return this.remove(data)
} }
if (data.type === 'offer') { if (data.type === 'offer') {
//console.log('收到对方 offer', data) //console.log('收到对方 offer', data)
const webrtc = webrtc_init() const webrtc = webrtc_init()
this.clientlist.push({ id: data.id, name: data.name, webrtc }) const channels = channels_init(webrtc)
this.clientlist.push({ id: data.id, name: data.name, webrtc, channels })
//console.log('发送给对方 answer') //console.log('发送给对方 answer')
await webrtc.setRemoteDescription(data.offer) await webrtc.setRemoteDescription(data.offer)
const answer = await webrtc.createAnswer() const answer = await webrtc.createAnswer()
@ -117,10 +110,6 @@ export default class ClientList {
this.clientlist = this.clientlist.filter(client => client.id !== item.id) this.clientlist = this.clientlist.filter(client => client.id !== item.id)
this.ul.removeChild(document.getElementById(item.id)) this.ul.removeChild(document.getElementById(item.id))
} }
update(item) { }
get(id) { }
getAll() { }
clear() { }
// 添加回调函数 // 添加回调函数
on(name, callback) { on(name, callback) {
this.EventListeners[name] = callback this.EventListeners[name] = callback
@ -133,11 +122,12 @@ export default class ClientList {
} }
// 通过指定通道发送数据(广播) // 通过指定通道发送数据(广播)
send(name, data) { send(name, data) {
console.log('广播数据:', data, '到通道:', name, '到所有客户端') //console.log('广播数据:', data, '到通道:', name, '到所有客户端')
this.clientlist.forEach(client => { this.clientlist.forEach(client => {
console.log('发送数据到客户端:', client.id) //console.log('发送数据到客户端:', client.id)
const channel = client.webrtc.getDataChannel(name) ?? client.webrtc.createDataChannel(name) client.channels.filter(ch => ch.label === name).forEach(ch => {
channel.send(data) ch.send(data)
})
}) })
} }
} }

View File

@ -9,7 +9,7 @@
<body> <body>
<div> <div>
<h1>webRTC</h1> <h1>webRTC</h1>
<p>选择音乐使频道内所有设备同步播放</p> <p>选择音乐使频道内所有设备同步播放 chrome://webrtc-internals/</p>
</div> </div>
<script type="module"> <script type="module">
import IndexedDB from './database.js' import IndexedDB from './database.js'
@ -34,78 +34,94 @@
// 初始化客户端列表 // 初始化客户端列表
const clientList = new ClientList({}) const clientList = new ClientList({})
clientList.setChannel('musicList', {
// 缓冲分片发送
const CHUNK_SIZE = 1024 * 128 // 每个块的大小为128KB
const THRESHOLD = 1024 * 1024 // 缓冲区的阈值为1MB
const DELAY = 500 // 延迟500ms
// 将两个ArrayBuffer合并成一个
function appendBuffer(buffer1, buffer2) {
const tmp = new Uint8Array(buffer1.byteLength + buffer2.byteLength)
tmp.set(new Uint8Array(buffer1), 0)
tmp.set(new Uint8Array(buffer2), buffer1.byteLength)
return tmp.buffer
}
// 只有一个基本信道, 用于交换和调度信息
clientList.setChannel('base', {
onopen: async event => { onopen: async event => {
const data = musicList.list.filter(item => { // 要求对方发送音乐列表
return !!item.arrayBuffer clientList.send('base', JSON.stringify({ type: 'get_music_list' }))
}).map(({ arrayBuffer, ...item }) => item)
console.log('发送 musicList:', data)
event.target.send(JSON.stringify(data))
}, },
onmessage: async (event, client) => { onmessage: async (event, client) => {
console.log('收到 musicList:', event) const { type, id, channel, list } = JSON.parse(event.data)
const data = JSON.parse(event.data) console.log('收到 base 基本信道数据:', type, id, channel)
if (type === 'get_music_list') {
console.log('发送音乐列表:', musicList.list)
clientList.send('base', JSON.stringify({
type: 'set_music_list',
list: musicList.list.map(({ id, name, size, type }) => ({ id, name, size, type }))
}))
return
}
if (type === 'set_music_list') {
console.log('接收音乐列表:', event)
// 将音乐列表添加到本地
const ids = musicList.list.map(item => item.id) const ids = musicList.list.map(item => item.id)
data.filter(item => !ids.includes(item.id)).forEach(item => { list.filter(item => !ids.includes(item.id)).forEach(item => {
musicList.add(item) musicList.add(item)
}) })
// 将数据设置到这个客户端 // 从获得的列表随机选一个音乐下载
console.log('设置 musicList:', data) const item = list[Math.floor(Math.random() * list.length)]
console.log('设置 musicList:', event) console.log('从获得的列表随机选一个音乐下载', item)
console.log('当前客户端', client) // 建立一个专用信道, 用于接收音乐数据(接收方已经准备好摘要信息)
client.musicList = data const channel = `music-data-${item.id}`
var buffer = new ArrayBuffer(0)
var count = 0
clientList.setChannel(channel, {
onmessage: async (event, client) => {
buffer = appendBuffer(buffer, event.data)
console.log('收到音乐数据 chunk', count, buffer.byteLength)
count++
if (buffer.byteLength >= item.size) {
console.log('音乐数据接收完毕')
item.ArrayBuffer = buffer
}
} }
}) })
clientList.setChannel('musicload', { // 要求对方从指定信道发送音乐数据
onopen: async event => { clientList.send('base', JSON.stringify({ type: 'get_music_data', id: item.id, channel }))
// 连接打开时要求发送某一条音乐数据? return
}
//console.log('发送 musicload') if (type === 'get_music_data') {
//const buffer = new ArrayBuffer(8) // 建立一个信道, 用于传输音乐数据(接收方已经准备好摘要信息)
//const json = { name: 'John', age: 30 } console.log('建立一个信道, 用于传输音乐数据', musicList.list)
//const jsonString = JSON.stringify(json) musicList.list.filter(item => item.id === id).forEach(item => {
//const jsonBuffer = new TextEncoder().encode(jsonString).buffer const ch = client.webrtc.createDataChannel(channel, { reliable: true })
//const lengthBuffer = new ArrayBuffer(8) ch.onopen = async event => {
//const lengthView = new DataView(lengthBuffer) console.log(`打开 ${channel} 信道, 传输音乐数据`)
//lengthView.setUint32(0, jsonBuffer.byteLength) // 将音乐数据分成多个小块,并逐个发送
//const mergedBuffer = new ArrayBuffer(lengthBuffer.byteLength + jsonBuffer.byteLength + buffer.byteLength) async function sendChunk(dataChannel, data, index = 0, buffer = new ArrayBuffer(0)) {
//const mergedView = new Uint8Array(mergedBuffer) while (index < data.byteLength) {
//mergedView.set(new Uint8Array(lengthBuffer), 0) if (dataChannel.bufferedAmount <= THRESHOLD) {
//mergedView.set(new Uint8Array(jsonBuffer), lengthBuffer.byteLength) const chunk = data.slice(index, index + CHUNK_SIZE)
//mergedView.set(new Uint8Array(buffer), lengthBuffer.byteLength + jsonBuffer.byteLength) dataChannel.send(chunk)
//event.target.send(mergedBuffer) index += CHUNK_SIZE
}, buffer = appendBuffer(buffer, chunk)
onmessage: async event => { }
console.log('收到 musicload') await new Promise((resolve) => setTimeout(resolve, DELAY))
const view = new DataView(event.data) }
const len = new ArrayBuffer(8) return buffer
// 解析出原始数据, 更简洁的方式 }
const lengthBuffer = event.data.slice(0, len.byteLength) await sendChunk(ch, item.arrayBuffer)
const lengthView = new DataView(lengthBuffer) console.log(`发送 ${channel} 信道数据结束`)
console.log('getUint32', lengthView.getUint32(0))
const jsonBuffer = event.data.slice(len.byteLength, len.byteLength + lengthView.getUint32(0))
const jsonView = new DataView(jsonBuffer)
console.log('json', JSON.parse(new TextDecoder().decode(jsonBuffer)))
const buffer = event.data.slice(len.byteLength + lengthView.getUint32(0))
console.log('buffer', buffer)
} }
}) })
return
musicList.on('load', item => { }
console.log('从来源加载音乐', item) console.log('未知类型:', type)
// 选择一个含有此音乐的客户端 }
const client = clientList.clientlist.find(client => {
console.log('client:', client)
if (!client.musicList) client.musicList = []
return client.musicList.some(music => music.id === item.id)
}) })
if (!client) return console.log('没有找到含有此音乐的客户端')
console.log('找到含有此音乐的客户端', client)
//clientList.send('musicList', JSON.stringify([item]))
})
// 获取对方的音乐列表
// like对方的条目时亮起(双方高亮)(本地缓存)(可由对比缓存实现) // like对方的条目时亮起(双方高亮)(本地缓存)(可由对比缓存实现)
// ban对方的条目时灰掉(也禁止对方播放)(并保持ban表)(由插件实现) // ban对方的条目时灰掉(也禁止对方播放)(并保持ban表)(由插件实现)
// 只需要在注册时拉取列表, 播放时才需要拉取音乐数据 // 只需要在注册时拉取列表, 播放时才需要拉取音乐数据

View File

@ -101,6 +101,7 @@ export default class MusicList {
async play(item) { async play(item) {
if (!item.arrayBuffer) { if (!item.arrayBuffer) {
await this.load(item) await this.load(item)
return console.log('等待载入缓存:', item)
} }
this.audio.src = URL.createObjectURL(new Blob([item.arrayBuffer], { type: item.type })) this.audio.src = URL.createObjectURL(new Blob([item.arrayBuffer], { type: item.type }))
this.audio.play() this.audio.play()