Skip to content

Commit ba521e2

Browse files
authored
Merge pull request #2 from e-e-e/read-stream
Add offset, limit, filter and optimise for large graphs
2 parents 054e04b + 9caf986 commit ba521e2

10 files changed

Lines changed: 405 additions & 126 deletions

index.js

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,49 @@
1-
const PassThrough = require('readable-stream').PassThrough
2-
const Transform = require('readable-stream').Transform
1+
const hyperdb = require('hyperdb')
2+
const stream = require('readable-stream')
33
const pump = require('pump')
4+
const inherits = require('inherits')
5+
const events = require('events')
46

57
const utils = require('./lib/utils')
68
const Variable = require('./lib/Variable')
7-
const HyperdbDiffTransform = require('./lib/HyperdbDiffTransform')
9+
const HyperdbReadTransform = require('./lib/HyperdbReadTransform')
810
const JoinStream = require('./lib/JoinStream')
911
const planner = require('./lib/planner')
12+
const attachCreateReadStream = require('./lib/hyperdbModifier').attachCreateReadStream
1013

11-
function Graph (db, opts) {
12-
if (!(this instanceof Graph)) return new Graph(db, opts)
13-
this.db = db
14+
const Transform = stream.Transform
15+
const PassThrough = stream.PassThrough
16+
17+
// temporarily augment hyperdb prototype to include createReadStream
18+
if (!hyperdb.createReadStream) {
19+
attachCreateReadStream(hyperdb)
1420
}
1521

22+
function Graph (storage, key, opts) {
23+
if (!(this instanceof Graph)) return new Graph(storage, key, opts)
24+
events.EventEmitter.call(this)
25+
this.db = hyperdb(storage, key, opts)
26+
27+
this.db.on('error', (e) => {
28+
this.emit('error', e)
29+
})
30+
this.db.on('ready', (e) => {
31+
this.emit('ready', e)
32+
})
33+
}
34+
35+
inherits(Graph, events.EventEmitter)
36+
1637
Graph.prototype.v = (name) => new Variable(name)
1738

18-
Graph.prototype.getStream = function (triple) {
19-
const stream = this.db.createDiffStream(utils.createQuery(triple))
20-
return stream.pipe(new HyperdbDiffTransform(this.db))
39+
Graph.prototype.getStream = function (triple, opts) {
40+
const stream = this.db.createReadStream(utils.createQuery(triple))
41+
return stream.pipe(new HyperdbReadTransform(this.db, opts))
2142
}
2243

23-
Graph.prototype.get = function (triple, callback) {
24-
utils.collect(this.getStream(triple), callback)
44+
Graph.prototype.get = function (triple, opts, callback) {
45+
if (typeof opts === 'function') return this.get(triple, undefined, opts)
46+
utils.collect(this.getStream(triple, opts), callback)
2547
}
2648

2749
function doAction (action) {
@@ -55,12 +77,11 @@ function doActionStream (action) {
5577
}
5678
}
5779

58-
// this is not implemented in hyperdb yet
59-
// for now we just put a null value in the db
60-
6180
Graph.prototype.put = doAction('put')
6281
Graph.prototype.putStream = doActionStream('put')
6382

83+
// this is not implemented in hyperdb yet
84+
// for now we just put a null value in the db
6485
Graph.prototype.del = doAction('del')
6586
Graph.prototype.delStream = doActionStream('del')
6687

@@ -75,8 +96,14 @@ Graph.prototype.searchStream = function (query, options) {
7596
}
7697
const plannedQuery = planner(query)
7798

78-
var streams = plannedQuery.map((triple) => {
79-
return new JoinStream({ triple, db: this })
99+
var streams = plannedQuery.map((triple, i) => {
100+
const limit = (options && i === plannedQuery.length - 1) ? options.limit : undefined
101+
return new JoinStream({
102+
triple: utils.filterTriple(triple),
103+
filter: triple.filter,
104+
db: this,
105+
limit
106+
})
80107
})
81108

82109
streams[0].start = true
@@ -87,8 +114,12 @@ Graph.prototype.searchStream = function (query, options) {
87114
return result
88115
}
89116

90-
Graph.prototype.search = function (query, callback) {
91-
utils.collect(this.searchStream(query), callback)
117+
Graph.prototype.search = function (query, options, callback) {
118+
if (typeof options === 'function') {
119+
callback = options
120+
options = undefined
121+
}
122+
utils.collect(this.searchStream(query, options), callback)
92123
}
93124

94125
Graph.prototype.generateBatch = utils.generateBatch

lib/HyperdbDiffTransform.js

Lines changed: 0 additions & 36 deletions
This file was deleted.

lib/HyperdbReadTransform.js

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
const Transform = require('readable-stream').Transform
2+
const inherits = require('inherits')
3+
4+
function HyperdbReadTransform (db, options) {
5+
if (!(this instanceof HyperdbReadTransform)) {
6+
return new HyperdbReadTransform(db, options)
7+
}
8+
var opts = options || {}
9+
this.db = db
10+
this._finished = false
11+
this._count = 0
12+
this._filter = opts.filter
13+
this._offset = opts.offset || 0
14+
this._limit = opts.limit && opts.limit + this._offset
15+
Transform.call(this, Object.assign(opts, { objectMode: true }))
16+
this._sources = []
17+
this.once('pipe', (source) => {
18+
source.on('error', e => this.emit('error', e))
19+
this._sources.push(source)
20+
})
21+
}
22+
23+
inherits(HyperdbReadTransform, Transform)
24+
25+
HyperdbReadTransform.prototype._transform = function transform (nodes, encoding, done) {
26+
if (this._finished) return done()
27+
if (this._limit && this._count >= this._limit) {
28+
this.push(null)
29+
this._sources.forEach(source => source.destroy())
30+
this._finished = true
31+
return
32+
}
33+
var value = nodes[0].value && JSON.parse(nodes[0].value.toString())
34+
if (value !== null && (!this._filter || this._filter(value))) {
35+
if (this._count >= this._offset) {
36+
this.push(value)
37+
}
38+
this._count++
39+
}
40+
done()
41+
}
42+
43+
module.exports = HyperdbReadTransform

lib/JoinStream.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@ function JoinStream (options) {
1616
this.matcher = matcher(options.triple)
1717
this.mask = queryMask(options.triple)
1818
this.maskUpdater = maskUpdater(options.triple)
19-
this.limit = options.limit
19+
this.limit = options && options.limit
2020
this._limitCounter = 0
2121
this.db = options.db
22-
this._index = options.index
2322
this._ended = false
23+
this.filter = options && options.filter
24+
this.offset = options && options.offset
2425

2526
this.once('pipe', (source) => {
2627
source.on('error', (err) => {
@@ -48,7 +49,7 @@ function JoinStream (options) {
4849
}
4950
}
5051

51-
this._indexPreferences = { index: this._index }
52+
this._options = { filter: this.filter, offset: this.offset }
5253
}
5354

5455
inherits(JoinStream, Transform)
@@ -60,7 +61,7 @@ JoinStream.prototype._transform = function transform (solution, encoding, done)
6061
var newMask = this.maskUpdater(solution, this.mask)
6162

6263
this._lastSolution = solution
63-
this._readStream = this.db.getStream(newMask, this._indexPreferences)
64+
this._readStream = this.db.getStream(newMask, this._options)
6465

6566
this._readStream.on('data', this._onDataStream)
6667
this._readStream.on('error', this._onErrorStream)

0 commit comments

Comments
 (0)