Skip to content

Commit a345b26

Browse files
author
Benjamin Forster
committed
add readStream for hyperdb [temporary]; modified constructor to make hyperdb instance from opts.
will remove createReadStream once hyperdb merges it.
1 parent 8449b0c commit a345b26

8 files changed

Lines changed: 260 additions & 51 deletions

File tree

index.js

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,44 @@
1-
const PassThrough = require('readable-stream').PassThrough
2-
const Transform = require('readable-stream').Transform
1+
const hyperdb = require('hyperdb')
2+
const stream = require('readable-stream')
33
const pump = require('pump')
4+
const inherits = require('inherits')
5+
const events = require('events')
46

57
const utils = require('./lib/utils')
68
const Variable = require('./lib/Variable')
7-
const HyperdbDiffTransform = require('./lib/HyperdbDiffTransform')
9+
const HyperdbReadTransform = require('./lib/HyperdbReadTransform')
810
const JoinStream = require('./lib/JoinStream')
911
const planner = require('./lib/planner')
12+
const attachCreateReadStream = require('./lib/hyperdbModifier').attachCreateReadStream
1013

11-
function Graph (db, opts) {
12-
if (!(this instanceof Graph)) return new Graph(db, opts)
13-
this.db = db
14+
const Transform = stream.Transform
15+
const PassThrough = stream.PassThrough
16+
17+
// temporarily augment hyperdb prototype to include createReadStream
18+
if (!hyperdb.createReadStream) {
19+
attachCreateReadStream(hyperdb)
20+
}
21+
22+
function Graph (storage, key, opts) {
23+
if (!(this instanceof Graph)) return new Graph(storage, key, opts)
24+
events.EventEmitter.call(this)
25+
this.db = hyperdb(storage, key, opts)
26+
27+
this.db.on('error', (e) => {
28+
this.emit('error', e)
29+
})
30+
this.db.on('ready', (e) => {
31+
this.emit('ready', e)
32+
})
1433
}
1534

35+
inherits(Graph, events.EventEmitter)
36+
1637
Graph.prototype.v = (name) => new Variable(name)
1738

1839
Graph.prototype.getStream = function (triple, opts) {
1940
const stream = this.db.createReadStream(utils.createQuery(triple))
20-
return stream.pipe(new HyperdbDiffTransform(this.db, opts))
41+
return stream.pipe(new HyperdbReadTransform(this.db, opts))
2142
}
2243

2344
Graph.prototype.get = function (triple, opts, callback) {
@@ -56,12 +77,11 @@ function doActionStream (action) {
5677
}
5778
}
5879

59-
// this is not implemented in hyperdb yet
60-
// for now we just put a null value in the db
61-
6280
Graph.prototype.put = doAction('put')
6381
Graph.prototype.putStream = doActionStream('put')
6482

83+
// this is not implemented in hyperdb yet
84+
// for now we just put a null value in the db
6585
Graph.prototype.del = doAction('del')
6686
Graph.prototype.delStream = doActionStream('del')
6787

Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
const Transform = require('readable-stream').Transform
22
const inherits = require('inherits')
33

4-
function HyperdbDiffTransform (db, options) {
5-
if (!(this instanceof HyperdbDiffTransform)) {
6-
return new HyperdbDiffTransform(db, options)
4+
function HyperdbReadTransform (db, options) {
5+
if (!(this instanceof HyperdbReadTransform)) {
6+
return new HyperdbReadTransform(db, options)
77
}
88
var opts = options || {}
99
this.db = db
@@ -20,24 +20,9 @@ function HyperdbDiffTransform (db, options) {
2020
})
2121
}
2222

23-
inherits(HyperdbDiffTransform, Transform)
23+
inherits(HyperdbReadTransform, Transform)
2424

25-
HyperdbDiffTransform.prototype._transform = function transform (nodes, encoding, done) {
26-
// if (chunk.type === 'put') {
27-
// const seq = chunk.nodes[0].seq
28-
// const feedSeq = chunk.nodes[0].feedSeq
29-
// this.db.get(chunk.name, (err, nodes) => {
30-
// if (err) {
31-
// this.emit('error', err)
32-
// done()
33-
// }
34-
// const node = nodes[0]
35-
// if (node.feedSeq === feedSeq && node.seq === seq) {
36-
// if (node.value !== null) this.push(JSON.parse(node.value.toString()))
37-
// }
38-
// done()
39-
// })
40-
// }
25+
HyperdbReadTransform.prototype._transform = function transform (nodes, encoding, done) {
4126
if (this._finished) return done()
4227
if (this._limit && this._count >= this._limit) {
4328
this.push(null)
@@ -55,4 +40,4 @@ HyperdbDiffTransform.prototype._transform = function transform (nodes, encoding,
5540
done()
5641
}
5742

58-
module.exports = HyperdbDiffTransform
43+
module.exports = HyperdbReadTransform

lib/hyperdbModifier.js

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
var hash = require('hyperdb/lib/hash')
2+
var Readable = require('readable-stream').Readable
3+
var LRU = require('lru')
4+
5+
module.exports = { attachCreateReadStream }
6+
7+
function attachCreateReadStream (DB) {
8+
DB.prototype.createReadStream = createReadStream
9+
}
10+
11+
function createReadStream (key, opts) {
12+
if (!opts) opts = {}
13+
var self = this
14+
var path = hash(key, true)
15+
var cacheMax = opts.cacheSize || 128
16+
var keyCache = new LRU(cacheMax)
17+
var streamQueue
18+
var queueNeedsSorting = true
19+
var stream = new Readable({ objectMode: true })
20+
stream._read = read
21+
22+
return stream
23+
24+
function read () {
25+
if (stream.destroyed) return
26+
// if no heads - get heads and process first tries
27+
if (!streamQueue) {
28+
self.heads(function (err, heads) {
29+
if (err) stream.emit('error', err)
30+
if (!heads.length) {
31+
stream.push(null)
32+
return
33+
}
34+
streamQueue = heads.map(h => ({ node: h, index: 0 }))
35+
next()
36+
})
37+
return
38+
}
39+
next()
40+
}
41+
42+
function next () {
43+
if (!streamQueue.length) {
44+
stream.push(null)
45+
return
46+
}
47+
// sort stream queue first to ensure that you always get the latest node
48+
// this requires offsetting feeds sequences based on when it started in relation to others
49+
if (queueNeedsSorting) streamQueue.sort(sortQueueByClockAndSeq)
50+
var data = streamQueue.pop()
51+
var node = data.node
52+
readNext(node, data.index, (err, match) => {
53+
if (err) {
54+
return stream.emit('error', err)
55+
}
56+
if (!match) return next()
57+
// check if really a match and not encountered before
58+
check(node, (matchingNode) => {
59+
if (!matchingNode) next()
60+
else {
61+
keyCache.set(node.key, true)
62+
stream.push(matchingNode)
63+
}
64+
})
65+
})
66+
}
67+
68+
function sortQueueByClockAndSeq (a, b) {
69+
a = a.node
70+
b = b.node
71+
var sortValue = sortNodesByClock(a, b)
72+
if (sortValue !== 0) return sortValue
73+
// same time, so use sequence to order
74+
if (a.feed === b.feed) return a.seq - b.seq
75+
var bOffset = b.clock.reduce((p, v) => p + v, b.seq)
76+
var aOffset = a.clock.reduce((p, v) => p + v, a.seq)
77+
// if real sequence is the same then return sort on feed
78+
if (bOffset === aOffset) return b.feed - a.feed
79+
return aOffset - bOffset
80+
}
81+
82+
function check (node, cb) {
83+
// is it actually a match and not a collision
84+
if (!(node && node.key && node.key.indexOf(key) === 0)) return cb()
85+
// have we encountered this node before
86+
if (keyCache.get(node.key)) return cb()
87+
// it is not in the cache but might still be a duplicate if cache is full
88+
// if (keyCache.length === cacheMax) {
89+
// so check if this is the first instance of the node
90+
// TODO: Atm this is a bit of a hack to get conflicting values
91+
// ideally this should not need to retraverse the trie.
92+
// Potential issue here when db is updated after stream was created!
93+
return self._get(node.key, false, [], noop, (err, latest) => {
94+
if (err) return stream.emit('error', err)
95+
if (sortNodesByClock(node, Array.isArray(latest) ? latest[0] : latest) >= 0) {
96+
cb(latest)
97+
} else {
98+
cb()
99+
}
100+
})
101+
}
102+
103+
function readNext (node, i, cb) {
104+
var writers = self._writers
105+
var trie
106+
var missing = 0
107+
var error
108+
var vals
109+
for (; i < path.length - 1; i++) {
110+
if (node.path[i] === path[i]) continue
111+
// check trie
112+
trie = node.trie[i]
113+
if (!trie) {
114+
return cb(null)
115+
}
116+
vals = trie[path[i]]
117+
// not found
118+
if (!vals || !vals.length) {
119+
return cb(null)
120+
}
121+
122+
missing = vals.length
123+
error = null
124+
for (var j = 0; j < vals.length; j++) {
125+
// fetch potential
126+
writers[vals[j].feed].get(vals[j].seq, (err, val) => {
127+
if (err) {
128+
error = err
129+
} else {
130+
pushToQueue({ node: val, index: i })
131+
}
132+
missing--
133+
if (!missing) {
134+
cb(error)
135+
}
136+
})
137+
}
138+
return
139+
}
140+
141+
// Traverse the rest of the node's trie, recursively,
142+
// hunting for more nodes with the desired prefix.
143+
for (; i < node.trie.length; i++) {
144+
trie = node.trie[i] || []
145+
for (j = 0; j < trie.length; j++) {
146+
var entrySet = trie[j] || []
147+
for (var el = 0; el < entrySet.length; el++) {
148+
var entry = entrySet[el]
149+
missing++
150+
writers[entry.feed].get(entry.seq, (err, val) => {
151+
if (err) {
152+
error = err
153+
} else if (val.key && val.value) {
154+
pushToQueue({ node: val, index: i + 1 })
155+
}
156+
missing--
157+
if (!missing) {
158+
if (i < node.trie.length) {
159+
pushToQueue({ node: node, index: i + 1 })
160+
cb(error, false)
161+
} else {
162+
cb(error, true)
163+
}
164+
}
165+
})
166+
}
167+
}
168+
if (missing > 0) return
169+
}
170+
return cb(null, true)
171+
}
172+
173+
function pushToQueue (item) {
174+
queueNeedsSorting = streamQueue.length > 0 && (sortQueueByClockAndSeq(item, streamQueue[streamQueue.length - 1]) < 0)
175+
streamQueue.push(item)
176+
}
177+
}
178+
179+
function sortNodesByClock (a, b) {
180+
var isGreater = false
181+
var isLess = false
182+
var length = a.clock.length
183+
if (b.clock.length > length) length = b.clock.length
184+
for (var i = 0; i < length; i++) {
185+
var diff = (a.clock[i] || 0) - (b.clock[i] || 0)
186+
if (diff > 0) isGreater = true
187+
if (diff < 0) isLess = true
188+
}
189+
if (isGreater && isLess) return 0
190+
if (isLess) return -1
191+
if (isGreater) return 1
192+
return 0
193+
}
194+
195+
function noop () {}

lib/utils.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ const Variable = require('./Variable')
22

33
const defs = {
44
spo: ['subject', 'predicate', 'object'],
5-
sop: ['subject', 'object', 'predicate'],
5+
sop: ['subject', 'object', 'predicate'], // [optional]
66
pos: ['predicate', 'object', 'subject'],
7-
pso: ['predicate', 'subject', 'object'],
8-
ops: ['object', 'predicate', 'subject'],
7+
pso: ['predicate', 'subject', 'object'], // [optional]
8+
ops: ['object', 'predicate', 'subject'], // [optional]
99
osp: ['object', 'subject', 'predicate']
1010
}
1111
const defKeys = Object.keys(defs)

package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
},
1313
"dependencies": {
1414
"hyperdb": "^2.0.0",
15-
"readable-stream": "^2.3.3",
16-
"pump": "^2.0.0"
15+
"lru": "^3.1.0",
16+
"pump": "^2.0.0",
17+
"readable-stream": "^2.3.3"
1718
},
1819
"devDependencies": {
1920
"chai": "^4.1.2",

readme.md

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@ This requires node v6.x.x or greater.
1717
## basic usage
1818

1919
```js
20-
var hyperdb = require('hyperdb')
2120
var hypergraph = require('hyper-graph-db')
2221

23-
var db = hypergraph(hyperdb('./my.db', {valueEncoding: 'utf-8'}))
22+
var db = hypergraph('./my.db', { valueEncoding: 'utf-8' })
2423

2524
var triple = { subject: 'a', predicate: 'b', object: 'c' }
2625

@@ -34,9 +33,23 @@ db.put(triple, function (err) {
3433

3534
## API
3635

37-
#### `var db = hypergraph(hyperdb)`
36+
#### `var db = hypergraph(storage, [key], [options])`
3837

39-
Returns an instance of hyper-graph-db using the hyperdb passed to it.
38+
Returns an instance of hyper-graph-db. Arguments are passed directly to hyperdb, look at its constructor [API](https://github.com/mafintosh/hyperdb#var-db--hyperdbstorage-key-options) for configuration options.
39+
40+
41+
#### `db.on('ready')`
42+
43+
*This event is passed on from underlying hyperdb instance.*
44+
45+
Emitted exactly once: when the db is fully ready and all static properties have
46+
been set. You do not need to wait for this when calling any async functions.
47+
48+
#### `db.on('error', err)`
49+
50+
*This event is passed on from underlying hyperdb instance.*
51+
52+
Emitted if there was a critical error before `db` is ready.
4053

4154
#### `db.put(triple, [callback])`
4255

test/join-stream.spec.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
const expect = require('chai').expect
44
const ram = require('random-access-memory')
55
const hypergraph = require('../index')
6-
const hyperdb = require('hyperdb')
76
const fixture = require('./fixture/foaf')
87

98
function ramStore (filename) {
@@ -16,7 +15,7 @@ function ramStore (filename) {
1615
describe('JoinStream', () => {
1716
let db
1817
beforeEach((done) => {
19-
db = hypergraph(hyperdb(ramStore))
18+
db = hypergraph(ramStore)
2019
db.put(fixture, done)
2120
})
2221

0 commit comments

Comments
 (0)