Basic implementation of data channel multiplexing

This commit is contained in:
Mitchell McCaffrey 2020-07-16 10:58:30 +10:00
parent 57c1d01fc7
commit 25aa24199b
5 changed files with 86 additions and 53 deletions

View File

@ -31,7 +31,7 @@
"react-spring": "^8.0.27",
"react-use-gesture": "^7.0.15",
"shortid": "^2.2.15",
"simple-peer": "^9.6.2",
"simple-peer": "feross/simple-peer#694/head",
"simplebar-react": "^2.1.0",
"simplify-js": "^1.2.4",
"socket.io-client": "^2.3.0",

View File

@ -12,62 +12,89 @@ class Peer extends SimplePeer {
constructor(props) {
super(props);
this.currentChunks = {};
this.on("data", (packed) => {
const unpacked = decode(packed);
// If the special property __chunked is set and true
// The data is a partial chunk of the a larger file
// So wait until all chunks are collected and assembled
// before emitting the dataComplete event
if (unpacked.__chunked) {
let chunk = this.currentChunks[unpacked.id] || {
data: [],
count: 0,
total: unpacked.total,
};
chunk.data[unpacked.index] = unpacked.data;
chunk.count++;
this.currentChunks[unpacked.id] = chunk;
this.emit("dataProgress", {
id: unpacked.id,
count: chunk.count,
total: chunk.total,
});
// All chunks have been loaded
if (chunk.count === chunk.total) {
// Merge chunks with a blob
// TODO: Look at a more efficient way to recombine buffer data
const merged = new Blob(chunk.data);
blobToBuffer(merged).then((buffer) => {
this.emit("dataComplete", decode(buffer));
delete this.currentChunks[unpacked.id];
});
}
} else {
this.emit("dataComplete", unpacked);
}
});
this.dataChannels = {};
this.on("data", this.handleData);
this.on("datachannel", this.handleDataChannel);
}
send(data) {
// Intercept the data event with decoding and chunking support
handleData(packed) {
const unpacked = decode(packed);
// If the special property __chunked is set and true
// The data is a partial chunk of the a larger file
// So wait until all chunks are collected and assembled
// before emitting the dataComplete event
if (unpacked.__chunked) {
let chunk = this.currentChunks[unpacked.id] || {
data: [],
count: 0,
total: unpacked.total,
};
chunk.data[unpacked.index] = unpacked.data;
chunk.count++;
this.currentChunks[unpacked.id] = chunk;
this.emit("dataProgress", {
id: unpacked.id,
count: chunk.count,
total: chunk.total,
});
// All chunks have been loaded
if (chunk.count === chunk.total) {
// Merge chunks with a blob
// TODO: Look at a more efficient way to recombine buffer data
const merged = new Blob(chunk.data);
blobToBuffer(merged).then((buffer) => {
this.emit("dataComplete", decode(buffer));
delete this.currentChunks[unpacked.id];
});
}
} else {
this.emit("dataComplete", unpacked);
}
}
// Override the send function with encoding, chunking and data channel support
send(data, channel) {
try {
const packedData = encode(data);
if (packedData.byteLength > MAX_BUFFER_SIZE) {
const chunks = this.chunk(packedData);
for (let chunk of chunks) {
super.send(encode(chunk));
if (this.dataChannels[channel]) {
this.dataChannels[channel].send(encode(chunk));
} else {
super.send(encode(chunk));
}
}
return;
} else {
super.send(packedData);
if (this.dataChannels[channel]) {
this.dataChannels[channel].send(packedData);
} else {
super.send(packedData);
}
}
} catch (error) {
console.error(error);
}
}
// Override the create data channel function to store our own named reference to it
// and to use our custom data handler
createDataChannel(channelName, channelConfig, opts) {
const channel = super.createDataChannel(channelName, channelConfig, opts);
this.dataChannels[channelName] = channel;
channel.on("data", this.handleData.bind(this));
return channel;
}
handleDataChannel(channel) {
this.dataChannels[channel.channelName] = channel;
channel.on("data", this.handleData.bind(this));
}
// Converted from https://github.com/peers/peerjs/
chunk(data) {
const chunks = [];

View File

@ -159,6 +159,10 @@ function useSession(
trickle: true,
config: { iceServers },
});
if (initiator) {
connection.createDataChannel("map", { iceServers });
connection.createDataChannel("token", { iceServers });
}
setPeers((prevPeers) => ({
...prevPeers,
[id]: { id, connection, initiator, sync },

View File

@ -108,7 +108,7 @@ function Game() {
setCurrentMap(newMap);
for (let peer of Object.values(peers)) {
// Clear the map so the new map state isn't shown on an old map
peer.connection.send({ id: "map", data: null });
peer.connection.send({ id: "map", data: null }, "map");
peer.connection.send({ id: "mapState", data: newMapState });
sendMapDataToPeer(peer, newMap);
sendTokensToPeer(peer, newMapState);
@ -120,9 +120,9 @@ function Game() {
// they have an outdated version
if (mapData.type === "file") {
const { file, resolutions, ...rest } = mapData;
peer.connection.send({ id: "map", data: { ...rest } });
peer.connection.send({ id: "map", data: { ...rest } }, "map");
} else {
peer.connection.send({ id: "map", data: mapData });
peer.connection.send({ id: "map", data: mapData }, "map");
}
}
@ -326,7 +326,7 @@ function Game() {
setCurrentMap(cachedMap);
} else {
putMap(newMap).then(() => {
peer.connection.send({ id: "mapRequest", data: newMap.id });
peer.connection.send({ id: "mapRequest", data: newMap.id }, "map");
});
}
} else {
@ -338,10 +338,13 @@ function Game() {
const map = getMap(data.data);
function respond(file) {
peer.connection.send({
id: "mapResponse",
data: { id: map.id, file },
});
peer.connection.send(
{
id: "mapResponse",
data: { id: map.id, file },
},
"map"
);
}
switch (map.quality) {

View File

@ -10213,10 +10213,9 @@ signal-exit@^3.0.0, signal-exit@^3.0.2:
resolved "https://registry.yarnpkg.com/signal-exit/-/signal-exit-3.0.2.tgz#b5fdc08f1287ea1178628e415e25132b73646c6d"
integrity sha1-tf3AjxKH6hF4Yo5BXiUTK3NkbG0=
simple-peer@^9.6.2:
version "9.6.2"
resolved "https://registry.yarnpkg.com/simple-peer/-/simple-peer-9.6.2.tgz#42418e77cf8f9184e4fa22ef1017b195c2bf84d7"
integrity sha512-EOKoImCaqtNvXIntxT1CBBK/3pVi7tMAoJ3shdyd9qk3zLm3QPiRLb/sPC1G2xvKJkJc5fkQjCXqRZ0AknwTig==
simple-peer@feross/simple-peer#694/head:
version "9.7.2"
resolved "https://codeload.github.com/feross/simple-peer/tar.gz/b8a4ec0210547414c52857343f10089e80710f03"
dependencies:
debug "^4.0.1"
get-browser-rtc "^1.0.0"