Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tree/dataframe/inc/ROOT/RCsvDS.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
namespace ROOT::Internal::RDF {
class R__CLING_PTRCHECK(off) RCsvDSColumnReader final : public ROOT::Detail::RDF::RColumnReaderBase {
void *fValuePtr;
void *GetImpl(Long64_t) final { return fValuePtr; }
void *GetImpl(std::size_t) final { return fValuePtr; }
void LoadImpl(Long64_t, bool) final {}

public:
RCsvDSColumnReader(void *valuePtr) : fValuePtr(valuePtr) {}
Expand Down
20 changes: 11 additions & 9 deletions tree/dataframe/inc/ROOT/RDF/RAction.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -99,31 +99,33 @@ public:
}

/// Return a reference to the value of column number `readerIdx` for processing slot `slot`.
/// The value is requested from the slot's column reader via TryGet; if TryGet yields a null pointer,
/// a std::out_of_range exception naming the action and the column is thrown instead.
// NOTE(review): this span shows two variants of the signature, the TryGet call and the std::to_string
// argument one after the other (one taking `Long64_t entry`, one taking `std::size_t idx`); it looks like
// a diff view without +/- markers -- confirm which variant is the current one.
// NOTE(review): in the `idx` variant the message still reads "for entry " but prints `idx`. The visible
// caller passes /*idx=*/0u, so the number reported to the user would not be the entry number -- confirm.
template <typename ColType>
auto GetValueChecked(unsigned int slot, std::size_t readerIdx, Long64_t entry) -> ColType &
auto GetValueChecked(unsigned int slot, std::size_t readerIdx, std::size_t idx) -> ColType &
{
// A non-null pointer means the reader could provide the value.
if (auto *val = fValues[slot][readerIdx]->template TryGet<ColType>(entry))
if (auto *val = fValues[slot][readerIdx]->template TryGet<ColType>(idx))
return *val;

// No value available: report which action and which column failed, and suggest the available remedies.
throw std::out_of_range{"RDataFrame: Action (" + fHelper.GetActionName() +
") could not retrieve value for column '" + fColumnNames[readerIdx] + "' for entry " +
std::to_string(entry) +
std::to_string(idx) +
". You can use the DefaultValueFor operation to provide a default value, or "
"FilterAvailable/FilterMissing to discard/keep entries with missing values instead."};
}

template <typename... ColTypes, std::size_t... S>
void CallExec(unsigned int slot, Long64_t entry, TypeList<ColTypes...>, std::index_sequence<S...>)
void CallExec(unsigned int slot, std::size_t idx, TypeList<ColTypes...>, std::index_sequence<S...>)
{
ROOT::Internal::RDF::CallGuaranteedOrder{[&](auto &&...args) { return fHelper.Exec(slot, args...); },
GetValueChecked<ColTypes>(slot, S, entry)...};
(void)entry; // avoid unused parameter warning (gcc 12.1)
GetValueChecked<ColTypes>(slot, S, idx)...};
(void)idx; // avoid unused parameter warning (gcc 12.1)
}

/// Process one entry on the given slot: ask the previous node whether the entry passes the filters and,
/// if so, execute the action through CallExec.
/// \param slot The processing slot.
/// \param entry The entry to process.
// NOTE(review): two variants of the body appear back to back below (a direct `if (CheckFilters) CallExec`
// and a `mask`-based one); this looks like a diff view without +/- markers -- confirm which one is current.
void Run(unsigned int slot, Long64_t entry) final
{
// check if entry passes all filters
if (fPrevNode.CheckFilters(slot, entry))
CallExec(slot, entry, ColumnTypes_t{}, TypeInd_t{});
// Mask-based variant: the filter result is kept as `mask` and handed to every column reader of this slot
// through Load(entry, mask) before any value is retrieved.
const auto mask = fPrevNode.CheckFilters(slot, entry);
std::for_each(fValues[slot].begin(), fValues[slot].end(), [entry, mask](auto *v) { v->Load(entry, mask); });

// Values are then retrieved with index 0 rather than with the entry number.
if (mask)
CallExec(slot, /*idx=*/0u, ColumnTypes_t{}, TypeInd_t{});
}

void TriggerChildrenCount() final { fPrevNode.IncrChildrenCount(); }
Expand Down
19 changes: 11 additions & 8 deletions tree/dataframe/inc/ROOT/RDF/RActionSnapshot.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -166,27 +166,27 @@ public:
fHelper.InitTask(r, slot);
}

void *GetValue(unsigned int slot, std::size_t readerIdx, Long64_t entry)
void *GetValue(unsigned int slot, std::size_t readerIdx, std::size_t idx)
{
assert(slot < fValues.size());
assert(readerIdx < fValues[slot].size());
if (auto *val = fValues[slot][readerIdx]->template TryGet<void>(entry))
if (auto *val = fValues[slot][readerIdx]->template TryGet<void>(idx))
return val;

throw std::out_of_range{"RDataFrame: Action (" + fHelper.GetActionName() +
") could not retrieve value for column '" + fColumnNames[readerIdx] + "' for entry " +
std::to_string(entry) +
std::to_string(idx) +
". You can use the DefaultValueFor operation to provide a default value, or "
"FilterAvailable/FilterMissing to discard/keep entries with missing values instead."};
}

void CallExec(unsigned int slot, Long64_t entry)
void CallExec(unsigned int slot, std::size_t idx)
{
std::vector<void *> untypedValues;
auto nReaders = fValues[slot].size();
untypedValues.reserve(nReaders);
for (decltype(nReaders) readerIdx{}; readerIdx < nReaders; readerIdx++)
untypedValues.push_back(GetValue(slot, readerIdx, entry));
untypedValues.push_back(GetValue(slot, readerIdx, idx));

fHelper.Exec(slot, untypedValues);
}
Expand All @@ -207,14 +207,17 @@ public:
std::vector<void *> untypedValues;
auto nReaders = fValues[slot].size();
untypedValues.reserve(nReaders);
std::for_each(fValues[slot].begin(), fValues[slot].end(), [entry](auto *v) { v->Load(entry, true); });
for (decltype(nReaders) readerIdx{}; readerIdx < nReaders; readerIdx++)
untypedValues.push_back(GetValue(slot, readerIdx, entry));
untypedValues.push_back(GetValue(slot, readerIdx, /*idx=*/0u));

fHelper.Exec(slot, untypedValues, filterPassed);
}
} else {
if (fPrevNodes.front()->CheckFilters(slot, entry))
CallExec(slot, entry);
const auto mask = fPrevNodes.front()->CheckFilters(slot, entry);
std::for_each(fValues[slot].begin(), fValues[slot].end(), [entry, mask](auto *v) { v->Load(entry, mask); });
if (mask)
CallExec(slot, /*idx=*/0u);
}
}

Expand Down
21 changes: 18 additions & 3 deletions tree/dataframe/inc/ROOT/RDF/RColumnReaderBase.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,38 @@ This pure virtual class provides a common base class for the different column re
RDSColumnReader.
**/
class R__CLING_PTRCHECK(off) RColumnReaderBase {
Long64_t fLoadedEntry = -1;

public:
virtual ~RColumnReaderBase() = default;

/// Make the column value for the given entry available through this reader.
/// \param entry The entry number to load.
/// \param mask The entry mask. When it is false nothing is loaded and no state is changed.
void Load(Long64_t entry, bool mask)
{
   // `mask` is a single boolean for now, so a false mask means there is nothing to do at all:
   // bail out before touching the bookkeeping or the reader implementation.
   if (!mask)
      return;

   // Remember which entry was requested, then let the concrete reader do the actual loading.
   fLoadedEntry = entry;
   this->LoadImpl(entry, mask);
}

// NOTE(review): two variants of the signature and of the return statement appear back to back below
// (`Long64_t entry` and `std::size_t idx`); this looks like a diff view without +/- markers.
/// Return a pointer to the column value, as provided by GetImpl and cast to `T *`.
/// \tparam T The column type
/// \param idx Value forwarded unchanged to GetImpl (named `entry` in the `Long64_t` variant).
///            NOTE(review): callers visible alongside this code pass 0 after calling Load(entry, mask), so
///            this is presumably an index into the loaded values rather than an entry number -- confirm.
///
/// The caller is responsible for checking that the returned value actually
/// exists.
template <typename T>
T *TryGet(Long64_t entry)
T *TryGet(std::size_t idx)
{
return static_cast<T *>(GetImpl(entry));
return static_cast<T *>(GetImpl(idx));
}

private:
virtual void *GetImpl(Long64_t entry) = 0;
virtual void *GetImpl(std::size_t idx) = 0;
virtual void LoadImpl(Long64_t /*entry*/, bool /*mask*/) = 0;
};

} // namespace RDF
Expand Down
3 changes: 2 additions & 1 deletion tree/dataframe/inc/ROOT/RDF/RDSColumnReader.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ template <typename T>
class R__CLING_PTRCHECK(off) RDSColumnReader final : public ROOT::Detail::RDF::RColumnReaderBase {
T **fDSValuePtr = nullptr;

void *GetImpl(Long64_t) final { return *fDSValuePtr; }
void *GetImpl(std::size_t) final { return *fDSValuePtr; }
void LoadImpl(Long64_t, bool) final {}

public:
RDSColumnReader(void *DSValuePtr) : fDSValuePtr(static_cast<T **>(DSValuePtr)) {}
Expand Down
13 changes: 8 additions & 5 deletions tree/dataframe/inc/ROOT/RDF/RDefaultValueFor.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ class R__CLING_PTRCHECK(off) RDefaultValueFor final : public RDefineBase {
/// The map key is the full variation name, e.g. "pt:up".
std::unordered_map<std::string, std::unique_ptr<RDefineBase>> fVariedDefines;

T &GetValueOrDefault(unsigned int slot, Long64_t entry)
T &GetValueOrDefault(unsigned int slot, std::size_t idx)
{
if (auto *value = fValues[slot]->template TryGet<T>(entry))
if (auto *value = fValues[slot]->template TryGet<T>(idx))
return *value;
else
return fDefaultValue;
Expand Down Expand Up @@ -104,12 +104,15 @@ public:
}

/// Update the value at the address returned by GetValuePtr with the content corresponding to the given entry
/// \param slot The processing slot whose cached result is updated.
/// \param entry The entry to evaluate; nothing is done if it equals the last entry cached for this slot.
/// \param mask (three-argument variant only) Forwarded to the column reader's Load; the cached result and
///             the cached entry number are only updated when it is true.
// NOTE(review): two variants of the signature and of the body appear back to back below (with and without
// `mask`); this looks like a diff view without +/- markers -- confirm which one is current.
void Update(unsigned int slot, Long64_t entry) final
void Update(unsigned int slot, Long64_t entry, bool mask) final
{
if (entry != fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()]) {
// evaluate this define expression, cache the result
fLastResults[slot * RDFInternal::CacheLineStep<T>()] = GetValueOrDefault(slot, entry);
fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()] = entry;
// Mask-based variant: the reader is asked to load the entry first, then the value is fetched with index 0.
fValues[slot]->Load(entry, mask);
if (mask) {
fLastResults[slot * RDFInternal::CacheLineStep<T>()] = GetValueOrDefault(slot, /*idx=*/0u);
// The entry is recorded as checked only here, so a call with mask == false leaves the cache untouched.
fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()] = entry;
}
}
}

Expand Down
38 changes: 22 additions & 16 deletions tree/dataframe/inc/ROOT/RDF/RDefine.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -71,39 +71,43 @@ class R__CLING_PTRCHECK(off) RDefine final : public RDefineBase {
std::unordered_map<std::string, std::unique_ptr<RDefineBase>> fVariedDefines;

template <typename ColType>
auto GetValueChecked(unsigned int slot, std::size_t readerIdx, Long64_t entry) -> ColType &
auto GetValueChecked(unsigned int slot, std::size_t readerIdx, std::size_t idx) -> ColType &
{
if (auto *val = fValues[slot][readerIdx]->template TryGet<ColType>(entry))
if (auto *val = fValues[slot][readerIdx]->template TryGet<ColType>(idx))
return *val;

throw std::out_of_range{"RDataFrame: Define could not retrieve value for column '" + fColumnNames[readerIdx] +
"' for entry " + std::to_string(entry) +
"' for entry " + std::to_string(idx) +
". You can use the DefaultValueFor operation to provide a default value, or "
"FilterAvailable/FilterMissing to discard/keep entries with missing values instead."};
}

template <typename... ColTypes, std::size_t... S>
void UpdateHelper(unsigned int slot, Long64_t entry, TypeList<ColTypes...>, std::index_sequence<S...>, NoneTag)
void UpdateHelper(unsigned int slot, std::size_t idx, Long64_t /*entry*/, TypeList<ColTypes...>,
std::index_sequence<S...>, NoneTag)
{
fLastResults[slot * RDFInternal::CacheLineStep<ret_type>()] =
fExpression(GetValueChecked<ColTypes>(slot, S, entry)...);
(void)entry; // avoid unused parameter warning (gcc 12.1)
fExpression(GetValueChecked<ColTypes>(slot, S, idx)...);
(void)idx; // avoid unused parameter warning (gcc 12.1)
}

template <typename... ColTypes, std::size_t... S>
void UpdateHelper(unsigned int slot, Long64_t entry, TypeList<ColTypes...>, std::index_sequence<S...>, SlotTag)
void UpdateHelper(unsigned int slot, std::size_t idx, Long64_t /*entry*/, TypeList<ColTypes...>,
std::index_sequence<S...>, SlotTag)
{
fLastResults[slot * RDFInternal::CacheLineStep<ret_type>()] =
fExpression(slot, GetValueChecked<ColTypes>(slot, S, entry)...);
(void)entry; // avoid unused parameter warning (gcc 12.1)
fExpression(slot, GetValueChecked<ColTypes>(slot, S, idx)...);
(void)idx; // avoid unused parameter warning (gcc 12.1)
}

/// Evaluate fExpression for the overload selected by SlotAndEntryTag and store the result in this slot's
/// cell of fLastResults. The expression receives the slot, an entry number and the column values obtained
/// through GetValueChecked.
// NOTE(review): two variants of the signature and of the fExpression call appear back to back below (one
// taking `Long64_t entry`, one taking `std::size_t idx` plus `Long64_t batchFirstEntry`); this looks like a
// diff view without +/- markers -- confirm which one is current.
// NOTE(review): `batchFirstEntry + idx` mixes a Long64_t with a std::size_t. If Long64_t is a signed 64-bit
// type and std::size_t an unsigned 64-bit one, the sum is computed as unsigned -- confirm this is intended.
template <typename... ColTypes, std::size_t... S>
void
UpdateHelper(unsigned int slot, Long64_t entry, TypeList<ColTypes...>, std::index_sequence<S...>, SlotAndEntryTag)
void UpdateHelper(unsigned int slot, std::size_t idx, Long64_t batchFirstEntry, TypeList<ColTypes...>,
std::index_sequence<S...>, SlotAndEntryTag)
{
fLastResults[slot * RDFInternal::CacheLineStep<ret_type>()] =
fExpression(slot, entry, GetValueChecked<ColTypes>(slot, S, entry)...);
fExpression(slot, batchFirstEntry + idx, GetValueChecked<ColTypes>(slot, S, idx)...);
(void)idx; // avoid unused parameter warning (gcc 12.1)
(void)batchFirstEntry; // avoid unused parameter warning (gcc 12.1)
}

public:
Expand Down Expand Up @@ -134,12 +138,14 @@ public:
}

/// Update the value at the address returned by GetValuePtr with the content corresponding to the given entry
/// \param slot The processing slot whose cached result is updated.
/// \param entry The entry to evaluate; nothing is done if it equals the last entry cached for this slot.
/// \param mask (three-argument variant only) Forwarded to every column reader's Load; the expression is
///             evaluated and the cached entry number updated only when it is true.
// NOTE(review): two variants of the signature and of the body appear back to back below (with and without
// `mask`); this looks like a diff view without +/- markers -- confirm which one is current.
void Update(unsigned int slot, Long64_t entry) final
void Update(unsigned int slot, Long64_t entry, bool mask) final
{
if (entry != fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()]) {
// evaluate this define expression, cache the result
UpdateHelper(slot, entry, ColumnTypes_t{}, TypeInd_t{}, ExtraArgsTag{});
fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()] = entry;
// Mask-based variant: all input readers of this slot are asked to load the entry before evaluation.
std::for_each(fValues[slot].begin(), fValues[slot].end(), [entry, mask](auto *v) { v->Load(entry, mask); });
if (mask) {
// Inputs are read with index 0; `entry` is still passed along so the helper can hand an entry number
// to expressions that take one.
UpdateHelper(slot, /*idx=*/0u, entry, ColumnTypes_t{}, TypeInd_t{}, ExtraArgsTag{});
// The entry is recorded as checked only here, so a call with mask == false leaves the cache untouched.
fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()] = entry;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion tree/dataframe/inc/ROOT/RDF/RDefineBase.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public:
std::string GetName() const;
std::string GetTypeName() const;
/// Update the value at the address returned by GetValuePtr with the content corresponding to the given entry
virtual void Update(unsigned int slot, Long64_t entry) = 0;
virtual void Update(unsigned int slot, Long64_t entry, bool mask) = 0;
/// Update function to be called once per sample, used if the derived type is a RDefinePerSample
virtual void Update(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &/*id*/) {}
/// Clean-up operations to be performed at the end of a task.
Expand Down
2 changes: 1 addition & 1 deletion tree/dataframe/inc/ROOT/RDF/RDefinePerSample.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public:
return static_cast<void *>(&fLastResults[slot * RDFInternal::CacheLineStep<RetType_t>()]);
}

void Update(unsigned int, Long64_t) final
void Update(unsigned int, Long64_t, bool) final
{
// no-op
}
Expand Down
8 changes: 3 additions & 5 deletions tree/dataframe/inc/ROOT/RDF/RDefineReader.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@ class R__CLING_PTRCHECK(off) RDefineReader final : public ROOT::Detail::RDF::RCo
/// The slot this value belongs to.
unsigned int fSlot = std::numeric_limits<unsigned int>::max();

void *GetImpl(Long64_t entry) final
{
fDefine.Update(fSlot, entry);
return fValuePtr;
}
/// Return the address stored in fValuePtr; the index argument is ignored.
void *GetImpl(std::size_t /*idx*/) final { return fValuePtr; }

/// Forward the load request to the define: calls fDefine.Update for this reader's slot (fSlot) with the
/// given entry and mask.
void LoadImpl(Long64_t entry, bool mask) final { fDefine.Update(fSlot, entry, mask); }

public:
RDefineReader(unsigned int slot, RDFDetail::RDefineBase &define)
Expand Down
52 changes: 28 additions & 24 deletions tree/dataframe/inc/ROOT/RDF/RFilter.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -95,43 +95,47 @@ public:

bool CheckFilters(unsigned int slot, Long64_t entry) final
{
if (entry != fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()]) {
if (!fPrevNode.CheckFilters(slot, entry)) {
// a filter upstream returned false, cache the result
fLastResult[slot * RDFInternal::CacheLineStep<int>()] = false;
} else {
// evaluate this filter, cache the result
auto passed = CheckFilterHelper(slot, entry, ColumnTypes_t{}, TypeInd_t{});
passed ? ++fAccepted[slot * RDFInternal::CacheLineStep<ULong64_t>()]
auto &newMask = fLastResult[slot * RDFInternal::CacheLineStep<int>()];
auto &lastEntry = fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()];

if (entry != lastEntry) {
newMask = fPrevNode.CheckFilters(slot, entry);

// evaluate this filter, cache the result
std::for_each(fValues[slot].begin(), fValues[slot].end(),
[entry, newMask](auto *v) { v->Load(entry, newMask); });
CheckFilterHelper(slot, /*idx=*/0u, newMask, ColumnTypes_t{}, TypeInd_t{});

lastEntry = entry;
}

return newMask;
}

template <typename... ColTypes, std::size_t... S>
void CheckFilterHelper(unsigned int slot, std::size_t idx, int &entryMask, TypeList<ColTypes...>,
std::index_sequence<S...>)
{
if (entryMask) {
entryMask = fFilter(GetValueChecked<ColTypes>(slot, S, idx)...);
entryMask ? ++fAccepted[slot * RDFInternal::CacheLineStep<ULong64_t>()]
: ++fRejected[slot * RDFInternal::CacheLineStep<ULong64_t>()];
fLastResult[slot * RDFInternal::CacheLineStep<int>()] = passed;
}
fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()] = entry;
}
return fLastResult[slot * RDFInternal::CacheLineStep<int>()];
(void)idx; // avoid unused parameter warning (gcc 12.1)
}

template <typename ColType>
auto GetValueChecked(unsigned int slot, std::size_t readerIdx, Long64_t entry) -> ColType &
auto GetValueChecked(unsigned int slot, std::size_t readerIdx, std::size_t idx) -> ColType &
{
if (auto *val = fValues[slot][readerIdx]->template TryGet<ColType>(entry))
if (auto *val = fValues[slot][readerIdx]->template TryGet<ColType>(idx))
return *val;

throw std::out_of_range{"RDataFrame: Filter could not retrieve value for column '" + fColumnNames[readerIdx] +
"' for entry " + std::to_string(entry) +
"' for entry " + std::to_string(idx) +
". You can use the DefaultValueFor operation to provide a default value, or "
"FilterAvailable/FilterMissing to discard/keep entries with missing values instead."};
}

/// Evaluate the filter callable on the column values retrieved for this slot.
/// \tparam ColTypes Types of the columns passed to the filter, in order.
/// \tparam S Indices selecting, for each argument, the column reader to query.
/// \param slot The processing slot whose column readers are queried.
/// \param entry Passed to GetValueChecked for every column.
/// \return The value returned by fFilter, converted to bool.
template <typename... ColTypes, std::size_t... S>
bool CheckFilterHelper(unsigned int slot, Long64_t entry, TypeList<ColTypes...>, std::index_sequence<S...>)
{
   // avoid unused parameter warnings (gcc 12.1) -- presumably relevant when the column pack is empty.
   // These casts are placed before the return statement; after it they would be unreachable.
   (void)slot;
   (void)entry;
   return fFilter(GetValueChecked<ColTypes>(slot, S, entry)...);
}

void InitSlot(TTreeReader *r, unsigned int slot) final
{
RDFInternal::RColumnReadersInfo info{fColumnNames, fColRegister, fIsDefine.data(), *fLoopManager};
Expand Down
Loading
Loading