forked from pull-stream/pull-ws
-
Notifications
You must be signed in to change notification settings - Fork 0
/
source.js
86 lines (69 loc) · 1.72 KB
/
source.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
### `source(socket)`
Create a pull-stream `Source` that will read data from the `socket`.
<<< examples/read.js
**/
var Buffer = require('safe-buffer').Buffer;
// copied from github.com/feross/buffer
// Some ArrayBuffers are not passing the instanceof check, so we need to do a bit more work :(
function isArrayBuffer (obj) {
return obj instanceof ArrayBuffer ||
(obj != null && obj.constructor != null && obj.constructor.name === 'ArrayBuffer' &&
typeof obj.byteLength === 'number')
}
module.exports = function(socket, cb) {
var buffer = [];
var receiver;
var ended;
var started = false;
socket.addEventListener('message', function(evt) {
var data = evt.data;
if (isArrayBuffer(data)) {
data = Buffer.from(data);
}
if (receiver) {
return receiver(null, data);
}
buffer.push(data);
});
socket.addEventListener('close', function(evt) {
if (ended) return
if (receiver) {
receiver(ended = true)
}
});
socket.addEventListener('error', function (evt) {
if (ended) return;
ended = evt;
if(!started) {
started = true
cb && cb(evt)
}
if (receiver) {
receiver(ended)
}
});
socket.addEventListener('open', function (evt) {
if(started || ended) return
started = true
})
function read(abort, cb) {
receiver = null;
//if stream has already ended.
if (ended)
return cb(ended);
// if ended, abort
else if (abort) {
//this will callback when socket closes
receiver = cb
socket.close()
}
// return data, if any
else if(buffer.length > 0)
cb(null, buffer.shift());
// wait for more data (or end)
else
receiver = cb;
};
return read;
};