Skip to content
This repository was archived by the owner on Mar 25, 2026. It is now read-only.

Commit 470b26b

Browse files
committed
elasticsearch 7.x support
1 parent ee708da commit 470b26b

6 files changed

Lines changed: 454 additions & 38 deletions

File tree

README.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,11 @@ elasticbulk.import(data, {
257257
## Tests
258258

259259
```bash
260-
# It tests importing with your ES version. Don't run tests against a production environment
261-
ES_URL=http://localhost:9200 mocha tests/indexSpec.js -t 100000
260+
# Test ES 1.7
261+
docker run -it -d -p 9200:9200 -p 9300:9300 -v $HOME/elasticsearch1.7/data:/data -v $HOME/elasticsearch1.7/logs:/logs barnybug/elasticsearch:1.7.2
262+
mocha --exit -t 15000 tests/elasticitemsSpec.js
263+
264+
# Test ES 7.x
265+
docker run -it -d -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.10.1
266+
mocha --exit -t 15000 tests/elasticitems7xSpec.js
262267
```

lib.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const Promise = require('bluebird');
33
//const isStream = require('is-stream');
44

55
const elasticsearch = require('./src/elasticitems');
6+
const elasticsearch7x = require('./src/elasticitems7x');
67
const itemsapi = require('./src/itemsapi');
78
const meilisearch = require('./src/meilisearch');
89

@@ -17,6 +18,8 @@ module.exports.import = function(data, options, schema) {
1718
return itemsapi.import(data, options, schema);
1819
} else if (options.engine === 'meilisearch') {
1920
return meilisearch.import(data, options, schema);
21+
} else if (options.engine === 'elasticsearch7x') {
22+
return elasticsearch7x.import(data, options, schema);
2023
} else {
2124
return elasticsearch.import(data, options, schema);
2225
}

package-lock.json

Lines changed: 20 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "elasticbulk",
3-
"version": "1.0.22",
3+
"version": "1.0.24",
44
"description": "Add data in bulk to ItemsAPI or Elasticsearch. It supports data streaming from PostgreSQL or filesystem",
55
"main": "lib.js",
66
"scripts": {
@@ -10,7 +10,7 @@
1010
"license": "MIT",
1111
"dependencies": {
1212
"bluebird": "^3.5.1",
13-
"elasticsearch": "^13.3.1",
13+
"elasticsearch": "^16.7.2",
1414
"is-stream": "^1.1.0",
1515
"itemsapi": "^2.1.0",
1616
"lodash": "^4.17.15",

src/elasticitems7x.js

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
'use strict';
2+
3+
const _ = require('lodash');
4+
const Promise = require('bluebird');
5+
const isStream = require('is-stream');
6+
const elasticsearch = require('elasticsearch');
7+
8+
/**
 * Imports data into Elasticsearch 7.x.
 *
 * @param {Array|stream.Readable} data - array of documents, or an object stream
 * @param {Object} options - host (required), index (required), type,
 *   chunk_size (bulk size, default 100), concurrency (parallel bulk requests,
 *   default 1), debug
 * @param {Object} [schema] - optional index settings / mappings; when given,
 *   the index is created before any documents are sent
 * @returns {Promise} resolves when all documents have been sent; rejects on
 *   invalid options or on any indexing failure
 */
module.exports.import = function(data, options, schema) {

  options = options || {}
  options.concurrency = options.concurrency || 1
  // without a default, _.chunk() falls back to chunks of size 1,
  // which defeats the purpose of bulk indexing
  options.chunk_size = options.chunk_size || 100
  options.type = options.type || options.index

  if (!options.host) {
    // reject with an Error (not a bare string) so callers get a stack trace
    return Promise.reject(new Error('Please define host name'))
  }

  if (!options.index) {
    return Promise.reject(new Error('Please provide index name'))
  }

  // hold the stream back until the (optional) schema creation has finished;
  // addItemsStream resumes it
  if (isStream(data)) {
    data.pause();
  }

  var elastic = new elasticsearch.Client({
    host: options.host,
    // make the legacy elasticsearch client produce bluebird promises
    defer: function () {
      return Promise.defer();
    }
  });

  return Promise.resolve()
  .then(function(res) {

    if (schema) {

      return elastic.indices.create({
        index: options.index,
        body: schema
      })
    }
  })
  .then(function(res) {

    if (isStream(data)) {
      return module.exports.addItemsStream(elastic, data, options)
    } else {

      // bluebird's .map with a concurrency cap over the chunked array
      return Promise.all(_.chunk(data, options.chunk_size))
      .map(items => {
        return module.exports.addBulkItems(elastic, items, options)
      }, {
        concurrency: options.concurrency
      })
    }
  })
  .catch(function(err) {
    // log AND rethrow — swallowing here would make every failed
    // import look like a success to the caller
    console.log(err);
    throw err;
  })
}
65+
66+
/**
 * Imports data from a stream (file, json, psql, etc) in bulk chunks.
 *
 * @param {Object} elastic - elasticsearch client instance
 * @param {stream.Readable} stream - paused object stream of documents
 * @param {Object} options - index; chunk_size / limit (bulk size, default 100); debug
 * @returns {Promise} resolves when the stream has been fully consumed;
 *   rejects on a stream error or a failed bulk request
 */
module.exports.addItemsStream = function(elastic, stream, options) {


  return new Promise(function(resolve, reject) {
    var counter = 0;
    var items = [];
    var counter_limit = options.chunk_size || options.limit || 100
    var added = 1

    stream.on('data', function (item) {

      ++counter
      items.push(item)

      if (counter >= counter_limit) {
        // stop reading until the current chunk has been indexed
        stream.pause();

        return module.exports.addBulkItems(elastic, items, options)
        .then(function(res) {
          counter = 0;
          items = []
          console.log(added + ' series added!');
          added++
          stream.resume()
        })
        // without this, a failed bulk request would be an unhandled
        // rejection and the stream would stay paused forever
        .catch(reject)
      }
    })

    stream.on('end', function (data) {

      if (!items.length) {
        return resolve()
      }

      // flush the remaining (incomplete) chunk
      module.exports.addBulkItems(elastic, items, options)
      .then(function(res) {
        console.log('Last ' + added + ' series added!');
        return resolve()
      })
      .catch(reject)
    })

    stream.on('close', function (data) {
      //console.log('close');
      return resolve()
    })

    stream.on('error', function (err) {
      //console.log('error');
      return reject(err)
    })

    /**
     * it waits until creating schema is not finished
     */
    stream.resume();
  })
}
128+
129+
/**
 * Sends one bulk indexing request for the given documents.
 * An item's `_id` property (when present) is used as the document id.
 *
 * @param {Object} elastic - elasticsearch client instance
 * @param {Array} items - documents to index in this request
 * @param {Object} options - index (target index name); debug (log bulk errors)
 * @returns {Promise} resolves when the bulk call has finished
 */
module.exports.addBulkItems = function(elastic, items, options) {

  // the bulk API expects alternating action / document rows
  var body = [];
  for (const item of items) {
    body.push({ index: { _id: item ? item._id : undefined } });
    body.push(item);
  }

  return elastic.bulk({
    index: options.index,
    //type: options.type,
    body: body
  })
  .then(function(v) {

    if (options.debug && v.errors) {
      console.log(JSON.stringify(v, null, 2));
    }
  })
}

0 commit comments

Comments
 (0)