Skip to content

Commit 987ebfb

Browse files
authored
Merge pull request bingmann#18 from leoisl/load_complete_streaming
Allowing streaming of COBS indexes
2 parents 9601c8e + 51f21dd commit 987ebfb

13 files changed

Lines changed: 141 additions & 19 deletions

cobs/file/classic_index_header.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,20 @@ void ClassicIndexHeader::serialize(std::ostream& os) const {
3636
}
3737

3838
void ClassicIndexHeader::deserialize(std::istream& is) {
39-
deserialize_magic_begin(is, magic_word, version);
39+
std::streamsize nb_of_bytes_read = 0;
40+
nb_of_bytes_read += deserialize_magic_begin(is, magic_word, version);
4041

4142
uint32_t file_names_size;
42-
stream_get(is, term_size_, canonicalize_,
43+
nb_of_bytes_read += stream_get(is, term_size_, canonicalize_,
4344
file_names_size, signature_size_, num_hashes_);
4445
file_names_.resize(file_names_size);
4546
for (auto& file_name : file_names_) {
4647
std::getline(is, file_name);
48+
nb_of_bytes_read += file_name.size() + 1;
4749
}
4850

49-
deserialize_magic_end(is, magic_word);
51+
nb_of_bytes_read += deserialize_magic_end(is, magic_word);
52+
header_size_ = nb_of_bytes_read;
5053
}
5154

5255
void ClassicIndexHeader::write_file(std::ostream& os,

cobs/file/classic_index_header.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ class ClassicIndexHeader
2626
uint64_t num_hashes_;
2727
//! list of document file names
2828
std::vector<std::string> file_names_;
29+
//! header size in bytes
30+
std::streamsize header_size_;
2931

3032
public:
3133
static const std::string magic_word;

cobs/file/header.hpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@
2020
namespace cobs {
2121

2222
static inline
23-
void check_magic_word(std::istream& is, const std::string& magic_word) {
23+
std::streamsize check_magic_word(std::istream& is, const std::string& magic_word) {
2424
std::vector<char> mw_v(magic_word.size(), ' ');
2525
is.read(mw_v.data(), magic_word.size());
2626
std::string mw(mw_v.data(), mw_v.size());
2727
assert_throw<FileIOException>(mw == magic_word, "invalid file type");
2828
assert_throw<FileIOException>(is.good(), "input filestream broken");
29+
return is.gcount();
2930
}
3031

3132
static inline
@@ -43,19 +44,21 @@ void serialize_magic_end(
4344
}
4445

4546
static inline
46-
void deserialize_magic_begin(
47+
std::streamsize deserialize_magic_begin(
4748
std::istream& is, const std::string& magic_word, const uint32_t& version) {
48-
check_magic_word(is, "COBS:");
49-
check_magic_word(is, magic_word);
49+
std::streamsize nb_of_bytes_read = 0;
50+
nb_of_bytes_read += check_magic_word(is, "COBS:");
51+
nb_of_bytes_read += check_magic_word(is, magic_word);
5052
uint32_t v;
51-
stream_get(is, v);
53+
nb_of_bytes_read += stream_get(is, v);
5254
assert_throw<FileIOException>(v == version, "invalid file version");
55+
return nb_of_bytes_read;
5356
}
5457

5558
static inline
56-
void deserialize_magic_end(
59+
std::streamsize deserialize_magic_end(
5760
std::istream& is, const std::string& magic_word) {
58-
check_magic_word(is, magic_word);
61+
return check_magic_word(is, magic_word);
5962
}
6063

6164
} // namespace cobs

cobs/query/classic_index/mmap_search_file.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ ClassicIndexMMapSearchFile::ClassicIndexMMapSearchFile(const fs::path& path)
2020
data_ = handle_.data + stream_pos_.curr_pos;
2121
}
2222

23+
ClassicIndexMMapSearchFile::ClassicIndexMMapSearchFile(std::ifstream &ifs, int64_t index_file_size)
24+
: ClassicIndexSearchFile(ifs, index_file_size) {
25+
handle_ = initialize_stream(ifs, stream_pos_.size());
26+
data_ = handle_.data;
27+
}
28+
2329
ClassicIndexMMapSearchFile::~ClassicIndexMMapSearchFile() {
2430
destroy_mmap(handle_);
2531
}

cobs/query/classic_index/mmap_search_file.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class ClassicIndexMMapSearchFile : public ClassicIndexSearchFile
2525

2626
public:
2727
explicit ClassicIndexMMapSearchFile(const fs::path& path);
28+
explicit ClassicIndexMMapSearchFile(std::ifstream &ifs, int64_t index_file_size);
2829
~ClassicIndexMMapSearchFile();
2930
};
3031

cobs/query/classic_index/search_file.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ ClassicIndexSearchFile::ClassicIndexSearchFile(const fs::path& path) {
1818
stream_pos_ = get_stream_pos(ifs);
1919
}
2020

21+
ClassicIndexSearchFile::ClassicIndexSearchFile(std::ifstream &ifs, int64_t index_file_size) {
22+
header_ = deserialize_header<ClassicIndexHeader>(ifs);
23+
stream_pos_ = StreamPos { (uint64_t) header_.header_size_, (uint64_t) index_file_size};
24+
}
25+
26+
2127
uint64_t ClassicIndexSearchFile::counts_size() const {
2228
return 8 * header_.row_size();
2329
}

cobs/query/classic_index/search_file.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class ClassicIndexSearchFile : public IndexSearchFile
1818
{
1919
protected:
2020
explicit ClassicIndexSearchFile(const fs::path& path);
21+
explicit ClassicIndexSearchFile(std::ifstream &ifs, int64_t index_file_size);
2122

2223
uint32_t term_size() const final { return header_.term_size_; }
2324
uint8_t canonicalize() const final { return header_.canonicalize_; }

cobs/query/search.hpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,21 @@ static inline std::vector<std::shared_ptr<cobs::IndexSearchFile> > get_cobs_inde
7878
return indices;
7979
}
8080

81+
static inline std::vector<std::shared_ptr<cobs::IndexSearchFile> > get_cobs_indexes_given_streams (
82+
const std::vector<std::ifstream*> &streams, const std::vector<int64_t> &index_sizes) {
83+
84+
std::vector<std::shared_ptr<cobs::IndexSearchFile> > indices;
85+
for (size_t i=0; i<streams.size(); i++)
86+
{
87+
std::ifstream *stream = streams[i];
88+
int64_t index_file_size = index_sizes[i];
89+
indices.push_back(
90+
std::make_shared<cobs::ClassicIndexMMapSearchFile>(*stream, index_file_size));
91+
}
92+
93+
return indices;
94+
}
95+
8196
static inline void process_query(
8297
cobs::Search &s, double threshold, unsigned num_results,
8398
const std::string &query_line, const std::string &query_file,

cobs/util/file.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <cobs/util/fs.hpp>
1717

1818
#include <tlx/die.hpp>
19+
#include <tlx/logger.hpp>
1920

2021
namespace cobs {
2122

@@ -43,6 +44,14 @@ Header deserialize_header(std::ifstream& ifs, const fs::path& p) {
4344
return h;
4445
}
4546

47+
template <class Header>
48+
Header deserialize_header(std::ifstream& ifs) {
49+
LOG1 << "Deserializing header from stream";
50+
Header h;
51+
h.deserialize(ifs);
52+
return h;
53+
}
54+
4655
template <class Header>
4756
Header deserialize_header(const fs::path& p) {
4857
std::ifstream ifs;

cobs/util/query.cpp

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ int open_file(const fs::path& path, int flags) {
3030
}
3131

3232
void close_file(int fd) {
33-
if (close(fd)) {
34-
print_errno("could not close index file");
33+
if (fd>=0) {
34+
if (close(fd)) {
35+
print_errno("could not close index file");
36+
}
3537
}
3638
}
3739

@@ -88,6 +90,43 @@ MMapHandle initialize_mmap(const fs::path& path)
8890
}
8991
}
9092

93+
MMapHandle initialize_stream(std::ifstream& is, int64_t index_file_size)
94+
{
95+
off_t size = index_file_size;
96+
LOG1 << "Reading complete index from stream";
97+
void* ptr = nullptr;
98+
if (posix_memalign(&ptr, 2 * 1024 * 1024, size)) {
99+
print_errno("posix_memalign()");
100+
}
101+
char* data_ptr = reinterpret_cast<char*>(ptr);
102+
#if defined(MADV_HUGEPAGE)
103+
if (madvise(data_ptr, size, MADV_HUGEPAGE)) {
104+
print_errno("madvise failed for MADV_HUGEPAGE");
105+
}
106+
LOG1 << "Advising to use huge pages";
107+
#endif
108+
uint64_t remain = size;
109+
const uint64_t one_gb = 1024*1024*1024;
110+
uint64_t pos = 0;
111+
while (remain != 0) {
112+
is.read(data_ptr + pos, std::min(one_gb, remain));
113+
int64_t rb = is.gcount();
114+
if (rb < 0) {
115+
print_errno("read failed");
116+
break;
117+
}
118+
remain -= rb;
119+
pos += rb;
120+
LOG1 << "Read " << tlx::format_iec_units(pos)
121+
<< "B / " << tlx::format_iec_units(size) << "B - "
122+
<< pos * 100 / size << "%";
123+
}
124+
LOG1 << "Index loaded into RAM.";
125+
return MMapHandle {
126+
-1 /* not a valid fd, won't be closed */, reinterpret_cast<uint8_t*>(data_ptr), uint64_t(size)
127+
};
128+
}
129+
91130
void destroy_mmap(MMapHandle& handle)
92131
{
93132
if (!gopt_load_complete_index) {

0 commit comments

Comments
 (0)