Skip to content

Commit b83fa63

Browse files
committed
transparently decompress .gz/.bz2/.xz files
1 parent 66febfb commit b83fa63

3 files changed

Lines changed: 180 additions & 67 deletions

File tree

.travis.yml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
language: cpp
22

3-
branches:
4-
only:
5-
- master
6-
73
dist: trusty
84

95
services:

src/importdata.cpp

Lines changed: 171 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -245,21 +245,173 @@ bool ImportData::insert_line(const std::string& line)
245245
}
246246

247247
//! process an input stream (file or stdin), cache lines or insert directly.
248-
void ImportData::process_stream(std::istream& is)
248+
void ImportData::process_stream(std::istream& is, const char* fname)
249249
{
250+
m_count = 0;
251+
252+
std::string line;
253+
while (std::getline(is,line))
254+
{
255+
if (!process_line(line))
256+
return;
257+
}
258+
259+
if (mopt_firstline) {
260+
OUT("Imported " << m_count << " rows of data from " << fname);
261+
}
262+
else {
263+
OUT("Cached " << m_count << " rows of data from " << fname);
264+
}
265+
}
266+
267+
//! Scan the first \a size bytes of \a str for \a ch, starting at offset
//! \a pos.
//!
//! \param str   buffer to search (need not be NUL-terminated)
//! \param size  number of valid bytes in \a str
//! \param ch    byte to look for
//! \param pos   offset at which the search starts
//! \return offset of the first match at or after \a pos, or
//!         std::string::npos if there is none (including pos >= size)
static inline
size_t string_find(const char* str, size_t size, char ch, size_t pos) {
    // guard first: size - pos below must not wrap around
    if (pos >= size)
        return std::string::npos;

    const void* hit = memchr(str + pos, ch, size - pos);
    if (hit == NULL)
        return std::string::npos;

    return static_cast<size_t>(static_cast<const char*>(hit) - str);
}
278+
279+
//! process an input stream (file or stdin), cache lines or insert directly.
280+
void ImportData::process_stream(FILE* in, const char* fname)
281+
{
282+
m_count = 0;
283+
284+
char buffer[64 * 1024u];
250285
std::string line;
251286

252-
while ( std::getline(is,line) )
287+
while (!feof(in))
253288
{
254-
if (!mopt_all_lines && is_result_line(line) == 0)
255-
continue;
289+
size_t rb = fread(buffer, 1, sizeof(buffer), in);
290+
291+
std::string::size_type pos = 0, nl;
292+
while ((nl = string_find(buffer, rb, '\n', pos)) != std::string::npos) {
293+
line.append(buffer + pos, nl - pos);
294+
if (!process_line(line))
295+
return;
296+
line.clear();
297+
pos = nl + 1;
298+
}
299+
line.append(buffer + pos, rb - pos);
300+
}
256301

257-
if (mopt_verbose >= 2)
258-
OUT("line: " << line);
302+
if (mopt_firstline) {
303+
OUT("Imported " << m_count << " rows of data from " << fname);
304+
}
305+
else {
306+
OUT("Cached " << m_count << " rows of data from " << fname);
307+
}
308+
}
259309

260-
std::set<std::string> keyset;
310+
//! Check whether \a str ends with the NUL-terminated string \a match.
//!
//! \param str    string whose tail is inspected
//! \param match  suffix to look for; an empty suffix always matches
//! \return true if the last strlen(match) characters of \a str equal \a match
static inline
bool ends_with(const std::string& str, const char* match) {
    const size_t str_size = str.size(), match_size = strlen(match);
    if (match_size > str_size)
        return false;

    // compare exactly the trailing match_size characters against match
    return str.compare(str_size - match_size, match_size, match) == 0;
}
261324

262-
if (!mopt_firstline)
325+
//! process a line: cache lines or insert directly.
326+
void ImportData::process_file(const std::string& fname)
327+
{
328+
if (ends_with(fname, ".gz")) {
329+
FILE* in = popen(("gzip -dc " + fname).c_str(), "r");
330+
if (in == NULL) {
331+
if (mopt_empty_okay)
332+
OUT("Error reading " << fname << ": " << strerror(errno));
333+
else
334+
OUT_THROW("Error reading " << fname << ": " << strerror(errno));
335+
}
336+
else {
337+
process_stream(in, fname.c_str());
338+
pclose(in);
339+
}
340+
}
341+
else if (ends_with(fname, ".bz2")) {
342+
FILE* in = popen(("bzip2 -dc " + fname).c_str(), "r");
343+
if (in == NULL) {
344+
if (mopt_empty_okay)
345+
OUT("Error reading " << fname << ": " << strerror(errno));
346+
else
347+
OUT_THROW("Error reading " << fname << ": " << strerror(errno));
348+
}
349+
else {
350+
process_stream(in, fname.c_str());
351+
pclose(in);
352+
}
353+
}
354+
else if (ends_with(fname, ".xz")) {
355+
FILE* in = popen(("xz -dc " + fname).c_str(), "r");
356+
if (in == NULL) {
357+
if (mopt_empty_okay)
358+
OUT("Error reading " << fname << ": " << strerror(errno));
359+
else
360+
OUT_THROW("Error reading " << fname << ": " << strerror(errno));
361+
}
362+
else {
363+
process_stream(in, fname.c_str());
364+
pclose(in);
365+
}
366+
}
367+
else {
368+
std::ifstream in(fname.c_str());
369+
if (!in.good()) {
370+
if (mopt_empty_okay)
371+
OUT("Error reading " << fname << ": " << strerror(errno));
372+
else
373+
OUT_THROW("Error reading " << fname << ": " << strerror(errno));
374+
}
375+
else {
376+
process_stream(in, fname.c_str());
377+
}
378+
}
379+
}
380+
381+
//! process a line: cache lines or insert directly.
382+
bool ImportData::process_line(const std::string& line)
383+
{
384+
if (!mopt_all_lines && is_result_line(line) == 0)
385+
return true;
386+
387+
if (mopt_verbose >= 2)
388+
OUT("line: " << line);
389+
390+
std::set<std::string> keyset;
391+
392+
if (!mopt_firstline)
393+
{
394+
// split line and detect types of each field
395+
slist_type slist = split_result_line(line);
396+
397+
size_t col = 0;
398+
for (slist_type::iterator si = slist.begin();
399+
si != slist.end(); ++si, ++col)
400+
{
401+
if (si->size() == 0) return true;
402+
std::string key, value;
403+
split_keyvalue(*si, col, key, value);
404+
key = dedup_key(key, keyset);
405+
m_fieldset.add_field(key, value);
406+
}
407+
408+
// cache line
409+
m_linedata.push_back(line);
410+
++m_count, ++m_total_count;
411+
}
412+
else
413+
{
414+
if (m_total_count == 0)
263415
{
264416
// split line and detect types of each field
265417
slist_type slist = split_result_line(line);
@@ -268,44 +420,23 @@ void ImportData::process_stream(std::istream& is)
268420
for (slist_type::iterator si = slist.begin();
269421
si != slist.end(); ++si, ++col)
270422
{
271-
if (si->size() == 0) continue;
423+
if (si->size() == 0) return true;
272424
std::string key, value;
273425
split_keyvalue(*si, col, key, value);
274426
key = dedup_key(key, keyset);
275427
m_fieldset.add_field(key, value);
276428
}
277429

278-
// cache line
279-
m_linedata.push_back(line);
280-
++m_count, ++m_total_count;
430+
// immediately create table from first row
431+
if (!create_table()) return false;
281432
}
282-
else
283-
{
284-
if (m_total_count == 0)
285-
{
286-
// split line and detect types of each field
287-
slist_type slist = split_result_line(line);
288-
289-
size_t col = 0;
290-
for (slist_type::iterator si = slist.begin();
291-
si != slist.end(); ++si, ++col)
292-
{
293-
if (si->size() == 0) continue;
294-
std::string key, value;
295-
split_keyvalue(*si, col, key, value);
296-
key = dedup_key(key, keyset);
297-
m_fieldset.add_field(key, value);
298-
}
299-
300-
// immediately create table from first row
301-
if (!create_table()) return;
302-
}
303433

304-
if (insert_line(line)) {
305-
++m_count, ++m_total_count;
306-
}
434+
if (insert_line(line)) {
435+
++m_count, ++m_total_count;
307436
}
308437
}
438+
439+
return true;
309440
}
310441

311442
//! process cached data lines
@@ -479,34 +610,13 @@ int ImportData::main(int argc, char* argv[])
479610
}
480611

481612
for (int fi = 0; fi < glob.FileCount(); ++fi)
482-
{
483-
const char* fname = glob.File(fi);
484-
485-
m_count = 0;
486-
std::ifstream in(fname);
487-
if (!in.good()) {
488-
if (mopt_empty_okay)
489-
OUT("Error reading " << fname << ": " << strerror(errno));
490-
else
491-
OUT_THROW("Error reading " << fname << ": " << strerror(errno));
492-
}
493-
else {
494-
process_stream(in);
495-
496-
if (mopt_firstline) {
497-
OUT("Imported " << m_count << " rows of data from " << fname);
498-
}
499-
else {
500-
OUT("Cached " << m_count << " rows of data from " << fname);
501-
}
502-
}
503-
}
613+
process_file(glob.File(fi));
504614
}
505-
else // no file arguments -> process stdin
615+
else
506616
{
617+
// no file arguments -> process stdin
507618
OUT("Reading data from stdin ...");
508-
509-
process_stream(std::cin);
619+
process_stream(stdin, "<stdin>");
510620
}
511621

512622
// process cached data lines

src/importdata.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,15 @@ class ImportData
9292
//! insert a line into the database table
9393
bool insert_line(const std::string& line);
9494

95-
//! process an input stream (file or stdin), cache lines or insert directly.
96-
void process_stream(std::istream& is);
95+
//! process a line: cache lines or insert directly.
96+
bool process_line(const std::string& line);
97+
98+
//! process an input stream and split into lines
99+
void process_stream(FILE* in, const char* fname);
100+
void process_stream(std::istream& in, const char* fname);
101+
102+
//! process a file: decompress .gz/.bz2/.xz transparently, then read it as a stream.
103+
void process_file(const std::string& fname);
97104

98105
//! process cached data lines
99106
void process_linedata();

0 commit comments

Comments
 (0)