-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
69 lines (58 loc) · 1.65 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
const pull = require('pull-stream')
const Log = require('flumelog-array')
const Obv = require('obv')
const debug = require('debug')('flumeview-array')
module.exports = (map) => () => {
// This view is backed up by a simple in-memory flumelog.
let log = Log()
// Here we record the sequence number of the latest item in the view.
// When the view is empty, the sequence number is set to `-1`.
// Our first item will have a sequence number of `0`.
const since = Obv().set(-1)
// When this view is destroyed (e.g. `flumedb.rebuild()`) we need a way to
// abort the pull-stream sink created with `flumeview.createSink()`.
// XXX: Can multiple sinks be open at the same time?
let abortSink
const api = {
close: (cb) => cb(null),
createSink: (cb) => {
debug('createSink')
abortSink = cb
return pull.drain((item) => {
let value
// If value was deleted upstream, add a blank message.
if (item.value === undefined) {
value = undefined
} else {
value = map(item.value)
}
log.append(value, (err) => {
if (err) return cb(err)
since.set(item.seq)
})
}, cb)
},
del: (seqs, cb) => log.del(seqs, cb),
destroy: (cb) => {
debug('destroy')
// Re-initialize `flumelogArray` and reset `since`.
log = Log()
since.set(-1)
abortSink(null)
cb(null)
},
get: (seq, cb) => {
log.get(seq, (err, item) => {
if (err) return cb(err)
cb(null, item)
})
},
methods: {
get: 'async',
del: 'async'
},
ready: (cb) => cb(null),
since
}
return api
}