Skip to content

Commit a4ff1c8

Browse files
taiyang-liwilliamhyun
authored andcommitted
ORC-1950: [C++] Make sure dictionary is sorted before flushed into ORC file to follow ORC specs
### What changes were proposed in this pull request? Make sure dictionary is sorted before flushed into ORC file to follow ORC specs. The [issue](#2321 (comment)) was brought by #2336. ### Why are the changes needed? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? Closes #2337 from taiyang-li/make_dict_sorted. Authored-by: taiyang-li <654010905@qq.com> Signed-off-by: William Hyun <william@apache.org>
1 parent 725fbc5 commit a4ff1c8

11 files changed

Lines changed: 364 additions & 178 deletions

CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ option (BUILD_LIBHDFSPP
4545
"Include LIBHDFSPP library in the build process"
4646
OFF)
4747

48+
option (BUILD_SPARSEHASH
49+
"Include sparsehash library in the build process"
50+
OFF)
51+
4852
option(BUILD_CPP_TESTS
4953
"Build the googletest unit tests"
5054
ON)

c++/src/CMakeLists.txt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ set(SOURCE_FILES
170170
Compression.cc
171171
ConvertColumnReader.cc
172172
CpuInfoUtil.cc
173+
Dictionary.cc
173174
Exceptions.cc
174175
Geospatial.cc
175176
Int128.cc
@@ -212,8 +213,8 @@ target_link_libraries (orc
212213
$<BUILD_INTERFACE:orc::snappy>
213214
$<BUILD_INTERFACE:orc::lz4>
214215
$<BUILD_INTERFACE:orc::zstd>
215-
$<BUILD_INTERFACE:orc::sparsehash>
216216
$<BUILD_INTERFACE:${LIBHDFSPP_LIBRARIES}>
217+
$<BUILD_INTERFACE:${SPARSEHASH_LIBRARIES}>
217218
)
218219

219220
target_include_directories (orc
@@ -232,6 +233,10 @@ if (BUILD_LIBHDFSPP)
232233
target_compile_definitions(orc PUBLIC -DBUILD_LIBHDFSPP)
233234
endif (BUILD_LIBHDFSPP)
234235

236+
if (BUILD_SPARSEHASH)
237+
target_compile_definitions(orc PUBLIC -DBUILD_SPARSEHASH)
238+
endif (BUILD_SPARSEHASH)
239+
235240
if (BUILD_CPP_ENABLE_METRICS)
236241
message(STATUS "Enable the metrics collection")
237242
target_compile_definitions(orc PUBLIC ENABLE_METRICS=1)

c++/src/ColumnWriter.cc

Lines changed: 8 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,12 @@
2424
#include <memory>
2525
#include "ByteRLE.hh"
2626
#include "ColumnWriter.hh"
27+
#include "Dictionary.hh"
2728
#include "RLE.hh"
2829
#include "Statistics.hh"
2930
#include "Timezone.hh"
3031
#include "Utils.hh"
3132

32-
#include <sparsehash/dense_hash_map>
33-
3433
namespace orc {
3534
StreamsFactory::~StreamsFactory() {
3635
// PASS
@@ -927,104 +926,6 @@ namespace orc {
927926
ColumnWriter::finishStreams();
928927
dataStream_->finishStream();
929928
}
930-
931-
/**
932-
* Implementation of increasing sorted string dictionary
933-
*/
934-
class SortedStringDictionary {
935-
public:
936-
struct DictEntry {
937-
DictEntry(const char* str, size_t len) : data(std::make_unique<std::string>(str, len)) {}
938-
939-
std::unique_ptr<std::string> data;
940-
};
941-
942-
SortedStringDictionary() : totalLength_(0) {
943-
/// Need to set empty key otherwise dense_hash_map will not work correctly
944-
keyToIndex_.set_empty_key(std::string_view{});
945-
}
946-
947-
// insert a new string into dictionary, return its insertion order
948-
size_t insert(const char* str, size_t len);
949-
950-
// write dictionary data & length to output buffer
951-
void flush(AppendOnlyBufferedStream* dataStream, RleEncoder* lengthEncoder) const;
952-
953-
// get dict entries in insertion order
954-
const std::vector<DictEntry>& getEntriesInInsertionOrder() const;
955-
956-
// return count of entries
957-
size_t size() const;
958-
959-
// return total length of strings in the dictioanry
960-
uint64_t length() const;
961-
962-
void clear();
963-
964-
private:
965-
// store dictionary entries in insertion order
966-
mutable std::vector<DictEntry> flatDict_;
967-
968-
// map from string to its insertion order index
969-
google::dense_hash_map<std::string_view, size_t> keyToIndex_;
970-
uint64_t totalLength_;
971-
972-
// use friend class here to avoid being bothered by const function calls
973-
friend class StringColumnWriter;
974-
friend class CharColumnWriter;
975-
friend class VarCharColumnWriter;
976-
// store indexes of insertion order in the dictionary for not-null rows
977-
std::vector<int64_t> idxInDictBuffer_;
978-
};
979-
980-
// insert a new string into dictionary, return its insertion order
981-
size_t SortedStringDictionary::insert(const char* str, size_t len) {
982-
size_t index = flatDict_.size();
983-
984-
auto it = keyToIndex_.find(std::string_view{str, len});
985-
if (it != keyToIndex_.end()) {
986-
return it->second;
987-
} else {
988-
flatDict_.emplace_back(str, len);
989-
totalLength_ += len;
990-
991-
const auto& lastEntry = flatDict_.back();
992-
keyToIndex_.emplace(std::string_view{lastEntry.data->data(), lastEntry.data->size()}, index);
993-
return index;
994-
}
995-
}
996-
997-
// write dictionary data & length to output buffer
998-
void SortedStringDictionary::flush(AppendOnlyBufferedStream* dataStream,
999-
RleEncoder* lengthEncoder) const {
1000-
for (const auto& entry : flatDict_) {
1001-
dataStream->write(entry.data->data(), entry.data->size());
1002-
lengthEncoder->write(static_cast<int64_t>(entry.data->size()));
1003-
}
1004-
}
1005-
1006-
// get dict entries in insertion order
1007-
const std::vector<SortedStringDictionary::DictEntry>&
1008-
SortedStringDictionary::getEntriesInInsertionOrder() const {
1009-
return flatDict_;
1010-
}
1011-
1012-
// return count of entries
1013-
size_t SortedStringDictionary::size() const {
1014-
return flatDict_.size();
1015-
}
1016-
1017-
// return total length of strings in the dictioanry
1018-
uint64_t SortedStringDictionary::length() const {
1019-
return totalLength_;
1020-
}
1021-
1022-
void SortedStringDictionary::clear() {
1023-
totalLength_ = 0;
1024-
keyToIndex_.clear();
1025-
flatDict_.clear();
1026-
}
1027-
1028929
class StringColumnWriter : public ColumnWriter {
1029930
public:
1030931
StringColumnWriter(const Type& type, const StreamsFactory& factory,
@@ -1324,6 +1225,9 @@ namespace orc {
13241225
// flush dictionary data & length streams
13251226
dictionary.flush(dictStream.get(), dictLengthEncoder.get());
13261227

1228+
// convert index from insertion order to dictionary order
1229+
dictionary.reorder(dictionary.idxInDictBuffer_);
1230+
13271231
// write data sequences
13281232
int64_t* data = dictionary.idxInDictBuffer_.data();
13291233
if (enableIndex) {
@@ -1367,14 +1271,15 @@ namespace orc {
13671271
}
13681272

13691273
// get dictionary entries in insertion order
1370-
const auto& entries = dictionary.getEntriesInInsertionOrder();
1274+
std::vector<const SortedStringDictionary::DictEntry*> entries;
1275+
dictionary.getEntriesInInsertionOrder(entries);
13711276

13721277
// store each length of the data into a vector
13731278
for (uint64_t i = 0; i != dictionary.idxInDictBuffer_.size(); ++i) {
13741279
// write one row data in direct encoding
13751280
const auto& dictEntry = entries[static_cast<size_t>(dictionary.idxInDictBuffer_[i])];
1376-
directDataStream->write(dictEntry.data->data(), dictEntry.data->size());
1377-
directLengthEncoder->write(static_cast<int64_t>(dictEntry.data->size()));
1281+
directDataStream->write(dictEntry->data->data(), dictEntry->data->size());
1282+
directLengthEncoder->write(static_cast<int64_t>(dictEntry->data->size()));
13781283
}
13791284

13801285
deleteDictStreams();

c++/src/Dictionary.cc

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
#include "Dictionary.hh"
20+
21+
namespace orc {
22+
23+
// insert a new string into dictionary, return its insertion order
24+
size_t SortedStringDictionary::insert(const char* str, size_t len) {
25+
size_t index = flatDict_.size();
26+
27+
auto it = keyToIndex_.find(std::string_view{str, len});
28+
if (it != keyToIndex_.end()) {
29+
return it->second;
30+
} else {
31+
flatDict_.emplace_back(str, len, index);
32+
totalLength_ += len;
33+
34+
const auto& lastEntry = flatDict_.back().entry;
35+
keyToIndex_.emplace(std::string_view{lastEntry.data->data(), lastEntry.data->size()}, index);
36+
return index;
37+
}
38+
}
39+
40+
// write dictionary data & length to output buffer
41+
void SortedStringDictionary::flush(AppendOnlyBufferedStream* dataStream,
42+
RleEncoder* lengthEncoder) const {
43+
std::sort(flatDict_.begin(), flatDict_.end(), LessThan());
44+
45+
for (const auto& entryWithIndex : flatDict_) {
46+
dataStream->write(entryWithIndex.entry.data->data(), entryWithIndex.entry.data->size());
47+
lengthEncoder->write(static_cast<int64_t>(entryWithIndex.entry.data->size()));
48+
}
49+
}
50+
51+
/**
52+
* Reorder input index buffer from insertion order to dictionary order
53+
*
54+
* We require this function because string values are buffered by indexes
55+
* in their insertion order. Until the entire dictionary is complete can
56+
* we get their sorted indexes in the dictionary in that ORC specification
57+
* demands dictionary should be ordered. Therefore this function transforms
58+
* the indexes from insertion order to dictionary value order for final
59+
* output.
60+
*/
61+
void SortedStringDictionary::reorder(std::vector<int64_t>& idxBuffer) const {
62+
// iterate the dictionary to get mapping from insertion order to value order
63+
std::vector<size_t> mapping(flatDict_.size());
64+
for (size_t i = 0; i < flatDict_.size(); ++i) {
65+
mapping[flatDict_[i].index] = i;
66+
}
67+
68+
// do the transformation
69+
for (size_t i = 0; i != idxBuffer.size(); ++i) {
70+
idxBuffer[i] = static_cast<int64_t>(mapping[static_cast<size_t>(idxBuffer[i])]);
71+
}
72+
}
73+
74+
// get dict entries in insertion order
75+
void SortedStringDictionary::getEntriesInInsertionOrder(
76+
std::vector<const DictEntry*>& entries) const {
77+
/// flatDict_ is sorted in insertion order before [[SortedStringDictionary::flush]] is invoked.
78+
entries.resize(flatDict_.size());
79+
for (size_t i = 0; i < flatDict_.size(); ++i) {
80+
entries[i] = &(flatDict_[i].entry);
81+
}
82+
}
83+
84+
// return count of entries
85+
size_t SortedStringDictionary::size() const {
86+
return flatDict_.size();
87+
}
88+
89+
// return total length of strings in the dictioanry
90+
uint64_t SortedStringDictionary::length() const {
91+
return totalLength_;
92+
}
93+
94+
void SortedStringDictionary::clear() {
95+
totalLength_ = 0;
96+
keyToIndex_.clear();
97+
flatDict_.clear();
98+
}
99+
} // namespace orc

c++/src/Dictionary.hh

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
#include <cstddef>
20+
#include <memory>
21+
#include <string>
22+
23+
#ifdef BUILD_SPARSEHASH
24+
#include <sparsehash/dense_hash_map>
25+
#else
26+
#include <unordered_map>
27+
#endif
28+
29+
#include "RLE.hh"
30+
31+
namespace orc {
32+
/**
33+
* Implementation of increasing sorted string dictionary
34+
*/
35+
class SortedStringDictionary {
36+
public:
37+
struct DictEntry {
38+
DictEntry(const char* str, size_t len) : data(std::make_unique<std::string>(str, len)) {}
39+
40+
std::unique_ptr<std::string> data;
41+
};
42+
43+
struct DictEntryWithIndex {
44+
DictEntryWithIndex(const char* str, size_t len, size_t index)
45+
: entry(str, len), index(index) {}
46+
47+
DictEntry entry;
48+
size_t index;
49+
};
50+
51+
SortedStringDictionary() : totalLength_(0) {
52+
#ifdef BUILD_SPARSEHASH
53+
/// Need to set empty key otherwise dense_hash_map will not work correctly
54+
keyToIndex_.set_empty_key(std::string_view{});
55+
#endif
56+
}
57+
58+
// insert a new string into dictionary, return its insertion order
59+
size_t insert(const char* str, size_t len);
60+
61+
// write dictionary data & length to output buffer
62+
void flush(AppendOnlyBufferedStream* dataStream, RleEncoder* lengthEncoder) const;
63+
64+
// reorder input index buffer from insertion order to dictionary order
65+
void reorder(std::vector<int64_t>& idxBuffer) const;
66+
67+
// get dict entries in insertion order
68+
void getEntriesInInsertionOrder(std::vector<const DictEntry*>&) const;
69+
70+
// return count of entries
71+
size_t size() const;
72+
73+
// return total length of strings in the dictioanry
74+
uint64_t length() const;
75+
76+
void clear();
77+
78+
private:
79+
struct LessThan {
80+
bool operator()(const DictEntryWithIndex& l, const DictEntryWithIndex& r) {
81+
return *l.entry.data < *r.entry.data; // use std::string's operator<
82+
}
83+
};
84+
// store dictionary entries in insertion order
85+
mutable std::vector<DictEntryWithIndex> flatDict_;
86+
87+
#ifdef BUILD_SPARSEHASH
88+
// map from string to its insertion order index
89+
google::dense_hash_map<std::string_view, size_t> keyToIndex_;
90+
#else
91+
std::unordered_map<std::string_view, size_t> keyToIndex_;
92+
#endif
93+
94+
uint64_t totalLength_;
95+
96+
// use friend class here to avoid being bothered by const function calls
97+
friend class StringColumnWriter;
98+
friend class CharColumnWriter;
99+
friend class VarCharColumnWriter;
100+
// store indexes of insertion order in the dictionary for not-null rows
101+
std::vector<int64_t> idxInDictBuffer_;
102+
};
103+
104+
} // namespace orc

0 commit comments

Comments
 (0)