-
Notifications
You must be signed in to change notification settings - Fork 3
/
index.js
347 lines (276 loc) · 9.59 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
/* jshint node: true */
'use strict';
var debug = require('cog/logger')('rtc-dcstream');
var stream = require('stream');
var toBuffer = require('typedarray-to-buffer');
var util = require('util');
var closingStates = ['closing', 'closed'];
var ENDOFSTREAM = '::endofstream';
var MAX_CHUNK_SIZE = 1024 * 64;
/**
# rtc-dcstream
Node streams2 interface for working with WebRTC data channels. This stream
implementation will cater for current data size limits in the WebRTC
data channels.
## Example Usage
The example below shows how to use the `rtc-dcstream` module to stream data
via a datachannel to *n* remote participants. In this case we are using
the W3C FileReader API and streaming dropped data files over the data
channel:
<<< examples/file-transfer.js
## Alternative Implementations
In addition to this module, the following are other modules that wrap
WebRTC data channel communication via a node streaming interface:
- [rtc-data-stream](https://github.com/kumavis/rtc-data-stream)
## Reference
To be completed.
**/
function RTCChannelStream(channel) {
if (! (this instanceof RTCChannelStream)) {
return new RTCChannelStream(channel);
}
// super
stream.Duplex.call(this, {
decodeStrings: false,
objectMode: true
});
// create the internal read and write queues
this._rq = [];
this._wq = [];
this._bytesWritten = 0;
// initialise the closed state
this._closed = channel.readyState === 'closed';
// save a reference to the channel
this.channel = channel;
// set the channel binaryType to arraybuffer
channel.binaryType = 'arraybuffer';
// initialise the message handlers
this._handlers = {
message: this._handleMessage.bind(this),
close: this._handleClose.bind(this),
open: this._handleOpen.bind(this)
};
// attach channel listeners
if (typeof channel.addEventListener == 'function') {
channel.addEventListener('message', this._handlers.message);
channel.addEventListener('close', this._handlers.close);
channel.addEventListener('open', this._handlers.open);
} else {
channel.onmessage = this._handlers.message;
channel.onclose = this._handlers.close;
channel.onopen = this._handlers.open;
}
// Check if the channel is already open, and if it is, fire the open handler
if (channel.readyState === 'open') {
this._handlers.open();
}
// send an ENDOFSTREAM marker on finish
this.once('finish', this._dcsend.bind(this, ENDOFSTREAM));
}
module.exports = RTCChannelStream;
util.inherits(RTCChannelStream, stream.Duplex);
var prot = RTCChannelStream.prototype;
prot._checkClear = function() {
var peer = this;
var bufferedAmount = this.channel.bufferedAmount;
var bytesWritten = this._bytesWritten;
if (bufferedAmount === 0) {
clearInterval(this._clearTimer);
this._clearTimer = undefined;
this._handleOpen();
}
// Detect any errors with the data channel whereby it just
// stops writing and the buffer never clears
else if (!this._bufferWriteTimer) {
this._bufferWriteTimer = setTimeout(function() {
// Check if our buffer is empty now
var currentBuffer = peer.channel.bufferedAmount;
if (currentBuffer === 0) return peer._checkClear();
// Raise an exception
var err = new Error('Buffer write timer failed. State at start: [Buffered] ' + bufferedAmount + ', [Written] ' + bytesWritten + '. State now: [Buffered] ' + peer.channel.bufferedAmount + ', [Written] ' + peer._bytesWritten);
err.name = 'BufferWriteError';
peer.emit('error', err);
}, 10000);
}
};
prot._ensureClearCheck = function() {
if (this._clearTimer) return;
this._clearTimer = setInterval(this._checkClear.bind(this), 100);
};
prot._debindChannel = function() {
var channel = this.channel;
// remove the message listener
if (typeof channel.removeEventListener == 'function') {
channel.removeEventListener('message', this._handlers.message);
channel.removeEventListener('close', this._handlers.close);
channel.removeEventListener('open', this._handlers.message);
} else {
channel.onmessage = null;
channel.onclose = null;
channel.onopen = null;
}
};
prot._isChannelClosed = function() {
return (! this.channel) || closingStates.indexOf(this.channel.readyState) >= 0;
};
prot._read = function(n) {
var ready = true;
var next;
// if we have no data queued, then wait until we have been told we
// do as _read will not be called again until we have pushed something
if (this._rq.length === 0) {
return this.once('readable', this._read.bind(this, n));
}
// TODO: honour the request for a particular number of bytes
// this.push(evt.data);
while (ready && this._rq.length > 0) {
// get the next chunk
next = this._rq.shift();
// if the next chunk is an array buffer, convert to a node buffer
if (next instanceof ArrayBuffer) {
this.push(toBuffer(new Uint8Array(next)));
}
else {
this.push(next);
}
}
return ready;
};
prot._write = function(chunk, encoding, callback) {
var closed = this._isChannelClosed();
// if closed then abort
if (closed) {
return false;
}
// process in chunks of an appropriate size for the data channel
var length = chunk.length || chunk.byteLength || chunk.size;
var numChunks = Math.ceil(length / MAX_CHUNK_SIZE);
var _returned = false;
// debug('_write ' + length + ' in ' + numChunks + ' chunks');
function progressiveCallback(e) {
if (_returned || !e) return;
// Capture errors for writes that are split into multiple chunks and
// return to the root callback
_returned = true;
return callback(e);
}
var result;
// To prevent overwhelming the data channel with a large write
// we ensure that writes are only for chunks within the MAX_CHUNK_SIZE
// If not, we split it up further into smaller chunks
for (var i = 0; i < numChunks; i++) {
var offset = i * MAX_CHUNK_SIZE;
var until = offset + MAX_CHUNK_SIZE;
var currentChunk = (numChunks === 1 ? chunk : chunk.slice(offset, until));
var ccLength = currentChunk.length || currentChunk.byteLength || currentChunk.size;
// Only callback after the entire attempted chunk is written
var currentCallback = (i + 1 === numChunks) ? callback : progressiveCallback;
// if we are connecting, then wait
if (this._wq.length || this.channel.readyState === 'connecting') {
result = this._wq.push([ currentChunk, encoding, currentCallback ]);
}
// if the channel is buffering, let's give it a rest
else if (this.channel.bufferedAmount > 0) {
debug('data channel buffering ' + this.channel.bufferedAmount + ', backing off');
this._ensureClearCheck();
result = this._wq.push([ currentChunk, encoding, currentCallback ]);
} else {
result = this._dcsend(currentChunk, encoding, currentCallback);
}
}
return result;
};
/**
### `_dcsend(chunk, encoding, callback)`
The internal function that is responsible for sending the data over the
underlying datachannel.
**/
prot._dcsend = function(chunk, encoding, callback) {
this._clearBufferWriteTimer();
// ensure we have a callback to use if not supplied
callback = callback || function() {};
// if the channel is closed, then return false
if (this._closed || this.channel.readyState !== 'open') {
return false;
}
var size = chunk.length || chunk.byteLength || chunk.size || 0;
try {
this.channel.send(chunk);
}
catch (e) {
// handle closed streams where we didn't get the memo
if (e.name == 'NetworkError') {
return this._handleClose();
}
return callback(e);
}
this._bytesWritten += size;
return callback();
};
/* event handlers */
prot._handleClose = function(evt) {
// flag the channel as closed
this._closed = true;
// emit the close and end events
this.emit('close');
this.emit('end');
return false;
};
prot._handleMessage = function(evt) {
/* jshint validthis: true */
var data = evt && evt.data;
// if we have an end of stream marker, end
if (typeof data == 'string' && data === ENDOFSTREAM) {
// remove the channel event bindings
this._debindChannel();
// emit the end
return this.emit('end');
}
this._rq.push(data);
this.emit('readable');
};
prot._clearBufferWriteTimer = function() {
if (!this._bufferWriteTimer) return;
clearTimeout(this._bufferWriteTimer);
this._bufferWriteTimer = undefined;
}
prot._handleOpen = function(evt) {
var peer = this;
var queue = this._wq;
// If the buffer write timer was active, we can now clear it
this._clearBufferWriteTimer();
function sendNext() {
// Check if the channel is closed in which case, cancel the attempt to send
if (peer._isChannelClosed()) {
debug('Channel has closed, queued writes are being discarded');
queue = [];
return;
}
// Check that we are ready to send, if not, restart the timer
if (peer.channel.readyState !== 'open' || peer.channel.bufferedAmount > 0) {
debug('Not yet ready to resume sending queued write, backing off');
return peer._ensureClearCheck();
}
var args = queue.shift();
var callback;
// if we have no args, then abort
if (! args) {
return queue.length ? sendNext() : null;
}
// save the callback
callback = args[2];
// replace with a new callback
args[2] = function() {
// Queue up the next clearing of the queue
setTimeout(sendNext, 0);
// trigger the callback
if (typeof callback == 'function') {
callback();
}
};
peer._dcsend.apply(peer, args);
}
// send the queued messages
debug('channel open, sending queued ' + queue.length + ' messages');
sendNext();
};