From 435752e7a9b48face59915089bc1a1a0310df87d Mon Sep 17 00:00:00 2001 From: Mitchell McCaffrey Date: Wed, 20 Jan 2021 18:07:23 +1100 Subject: [PATCH] Moved peer connection to fully use write for backpressure --- src/network/Connection.js | 14 +++++++------- src/network/Session.js | 16 +++++++++++++--- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/network/Connection.js b/src/network/Connection.js index 40c490d..4871094 100644 --- a/src/network/Connection.js +++ b/src/network/Connection.js @@ -55,26 +55,26 @@ class Connection extends SimplePeer { } } - // Override the send function with encoding, chunking and data channel support - send(data, channel) { + // Custom send function with encoding, chunking and data channel support + // Uses `write` to send the data to allow for buffer / backpressure handling + sendObject(object, channel) { try { - const packedData = encode(data); + const packedData = encode(object); if (packedData.byteLength > MAX_BUFFER_SIZE) { const chunks = this.chunk(packedData); for (let chunk of chunks) { if (this.dataChannels[channel]) { - // Write to the stream to allow for buffer / backpressure handling this.dataChannels[channel].write(encode(chunk)); } else { - super.send(encode(chunk)); + this.write(encode(chunk)); } } return; } else { if (this.dataChannels[channel]) { - this.dataChannels[channel].send(packedData); + this.dataChannels[channel].write(packedData); } else { - super.send(packedData); + this.write(packedData); } } } catch (error) { diff --git a/src/network/Session.js b/src/network/Session.js index f417432..e38ea6e 100644 --- a/src/network/Session.js +++ b/src/network/Session.js @@ -124,10 +124,16 @@ class Session extends EventEmitter { if (!this.peers[sessionId].ready) { this.peers[sessionId].connection.once("connect", () => { - this.peers[sessionId].connection.send({ id: eventId, data }, channel); + this.peers[sessionId].connection.sendObject( + { id: eventId, data }, + channel + ); }); } else { - this.peers[sessionId].connection.send({ id: eventId, data }, channel); + this.peers[sessionId].connection.sendObject( + { id: eventId, data }, + channel + ); } } @@ -193,10 +199,14 @@ class Session extends EventEmitter { trickle: true, config: { iceServers: this._iceServers }, }); + + // Up max listeners to 100 to account for up to 100 tokens on load + connection.setMaxListeners && connection.setMaxListeners(100); + const peer = { id, connection, initiator, ready: false }; function sendPeer(id, data, channel) { - peer.connection.send({ id, data }, channel); + peer.connection.sendObject({ id, data }, channel); } function handleSignal(signal) {