Skip to content

Commit

Permalink
Merge branch 'ad-freiburg:master' into words-and-docs-file-parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
Flixtastic authored Jan 4, 2025
2 parents d0ec708 + 2953f16 commit a7823fb
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 77 deletions.
170 changes: 101 additions & 69 deletions src/engine/GroupBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ ProtoResult GroupBy::computeResult(bool requestLaziness) {
if (useHashMapOptimization) {
const auto* child = _subtree->getRootOperation()->getChildren().at(0);
// Skip sorting
subresult = child->getResult();
subresult = child->getResult(true);
// Update runtime information
auto runTimeInfoChildren =
child->getRootOperation()->getRuntimeInfoPointer();
Expand Down Expand Up @@ -366,13 +366,28 @@ ProtoResult GroupBy::computeResult(bool requestLaziness) {
}

if (useHashMapOptimization) {
auto localVocab = subresult->getCopyOfLocalVocab();
IdTable idTable = CALL_FIXED_SIZE(
groupByCols.size(), &GroupBy::computeGroupByForHashMapOptimization,
this, metadataForUnsequentialData->aggregateAliases_,
subresult->idTable(), groupByCols, &localVocab);
// Helper lambda that calls `computeGroupByForHashMapOptimization` for the
// given `subresults`.
auto computeWithHashMap = [this, &metadataForUnsequentialData,
&groupByCols](auto&& subresults) {
auto doCompute = [&]<int NumCols> {
return computeGroupByForHashMapOptimization<NumCols>(
metadataForUnsequentialData->aggregateAliases_, AD_FWD(subresults),
groupByCols);
};
return ad_utility::callFixedSize(groupByCols.size(), doCompute);
};

return {std::move(idTable), resultSortedOn(), std::move(localVocab)};
// Now call `computeWithHashMap` and return the result. It expects a range
// of results, so if the result is fully materialized, we create an array
// with a single element.
if (subresult->isFullyMaterialized()) {
return computeWithHashMap(
std::array{std::pair{std::cref(subresult->idTable()),
std::cref(subresult->localVocab())}});
} else {
return computeWithHashMap(std::move(subresult->idTables()));
}
}

size_t inWidth = _subtree->getResultWidth();
Expand Down Expand Up @@ -846,7 +861,7 @@ std::optional<IdTable> GroupBy::computeGroupByForJoinWithFullScan() const {
const auto& index = getExecutionContext()->getIndex();

// TODO<joka921, C++23> Simplify the following pattern by using
// `ql::views::chunkd_by` and implement a lazy version of this view for
// `ql::views::chunk_by` and implement a lazy version of this view for
// input iterators.

// Take care of duplicate values in the input.
Expand Down Expand Up @@ -1487,78 +1502,95 @@ static constexpr auto makeProcessGroupsVisitor =

// _____________________________________________________________________________
template <size_t NUM_GROUP_COLUMNS>
IdTable GroupBy::computeGroupByForHashMapOptimization(
std::vector<HashMapAliasInformation>& aggregateAliases,
const IdTable& subresult, const std::vector<size_t>& columnIndices,
LocalVocab* localVocab) const {
AD_CONTRACT_CHECK(columnIndices.size() == NUM_GROUP_COLUMNS ||
NUM_GROUP_COLUMNS == 0);

// Initialize aggregation data
Result GroupBy::computeGroupByForHashMapOptimization(
std::vector<HashMapAliasInformation>& aggregateAliases, auto subresults,
const std::vector<size_t>& columnIndices) const {
AD_CORRECTNESS_CHECK(columnIndices.size() == NUM_GROUP_COLUMNS ||
NUM_GROUP_COLUMNS == 0);
LocalVocab localVocab;

// Initialize the data for the aggregates of the GROUP BY operation.
HashMapAggregationData<NUM_GROUP_COLUMNS> aggregationData(
getExecutionContext()->getAllocator(), aggregateAliases,
columnIndices.size());

// Initialize evaluation context
sparqlExpression::EvaluationContext evaluationContext(
*getExecutionContext(), _subtree->getVariableColumns(), subresult,
getExecutionContext()->getAllocator(), *localVocab, cancellationHandle_,
deadline_);

evaluationContext._groupedVariables = ad_utility::HashSet<Variable>{
_groupByVariables.begin(), _groupByVariables.end()};
evaluationContext._isPartOfGroupBy = true;

// Process the input blocks (pairs of `IdTable` and `LocalVocab`) one after
// the other.
ad_utility::Timer lookupTimer{ad_utility::Timer::Stopped};
ad_utility::Timer aggregationTimer{ad_utility::Timer::Stopped};
for (size_t i = 0; i < subresult.size(); i += GROUP_BY_HASH_MAP_BLOCK_SIZE) {
checkCancellation();

evaluationContext._beginIndex = i;
evaluationContext._endIndex =
std::min(i + GROUP_BY_HASH_MAP_BLOCK_SIZE, subresult.size());

auto currentBlockSize = evaluationContext.size();

// Perform HashMap lookup once for all groups in current block
using U = HashMapAggregationData<NUM_GROUP_COLUMNS>::template ArrayOrVector<
std::span<const Id>>;
U groupValues;
resizeIfVector(groupValues, columnIndices.size());

// TODO<C++23> use views::enumerate
size_t j = 0;
for (auto& idx : columnIndices) {
groupValues[j] = subresult.getColumn(idx).subspan(
evaluationContext._beginIndex, currentBlockSize);
++j;
}
lookupTimer.cont();
auto hashEntries = aggregationData.getHashEntries(groupValues);
lookupTimer.stop();

aggregationTimer.cont();
for (auto& aggregateAlias : aggregateAliases) {
for (auto& aggregate : aggregateAlias.aggregateInfo_) {
sparqlExpression::ExpressionResult expressionResult =
GroupBy::evaluateChildExpressionOfAggregateFunction(
aggregate, evaluationContext);

auto& aggregationDataVariant =
aggregationData.getAggregationDataVariant(
aggregate.aggregateDataIndex_);

std::visit(makeProcessGroupsVisitor(currentBlockSize,
&evaluationContext, hashEntries),
std::move(expressionResult), aggregationDataVariant);
for (const auto& [inputTableRef, inputLocalVocabRef] : subresults) {
const IdTable& inputTable = inputTableRef;
const LocalVocab& inputLocalVocab = inputLocalVocabRef;

// Merge the local vocab of each input block.
//
// NOTE: If the input blocks have very similar or even identical non-empty
// local vocabs, no deduplication is performed.
localVocab.mergeWith(std::span{&inputLocalVocab, 1});

// Setup the `EvaluationContext` for this input block.
sparqlExpression::EvaluationContext evaluationContext(
*getExecutionContext(), _subtree->getVariableColumns(), inputTable,
getExecutionContext()->getAllocator(), localVocab, cancellationHandle_,
deadline_);
evaluationContext._groupedVariables = ad_utility::HashSet<Variable>{
_groupByVariables.begin(), _groupByVariables.end()};
evaluationContext._isPartOfGroupBy = true;

// Iterate of the rows of this input block. Process (up to)
// `GROUP_BY_HASH_MAP_BLOCK_SIZE` rows at a time.
for (size_t i = 0; i < inputTable.size();
i += GROUP_BY_HASH_MAP_BLOCK_SIZE) {
checkCancellation();

evaluationContext._beginIndex = i;
evaluationContext._endIndex =
std::min(i + GROUP_BY_HASH_MAP_BLOCK_SIZE, inputTable.size());

auto currentBlockSize = evaluationContext.size();

// Perform HashMap lookup once for all groups in current block
using U = HashMapAggregationData<
NUM_GROUP_COLUMNS>::template ArrayOrVector<std::span<const Id>>;
U groupValues;
resizeIfVector(groupValues, columnIndices.size());

// TODO<C++23> use views::enumerate
size_t j = 0;
for (auto& idx : columnIndices) {
groupValues[j] = inputTable.getColumn(idx).subspan(
evaluationContext._beginIndex, currentBlockSize);
++j;
}
lookupTimer.cont();
auto hashEntries = aggregationData.getHashEntries(groupValues);
lookupTimer.stop();

aggregationTimer.cont();
for (auto& aggregateAlias : aggregateAliases) {
for (auto& aggregate : aggregateAlias.aggregateInfo_) {
sparqlExpression::ExpressionResult expressionResult =
GroupBy::evaluateChildExpressionOfAggregateFunction(
aggregate, evaluationContext);

auto& aggregationDataVariant =
aggregationData.getAggregationDataVariant(
aggregate.aggregateDataIndex_);

std::visit(makeProcessGroupsVisitor(currentBlockSize,
&evaluationContext, hashEntries),
std::move(expressionResult), aggregationDataVariant);
}
}
aggregationTimer.stop();
}
aggregationTimer.stop();
}

runtimeInfo().addDetail("timeMapLookup", lookupTimer.msecs());
runtimeInfo().addDetail("timeAggregation", aggregationTimer.msecs());

return createResultFromHashMap(aggregationData, aggregateAliases, localVocab);
IdTable resultTable =
createResultFromHashMap(aggregationData, aggregateAliases, &localVocab);
return {std::move(resultTable), resultSortedOn(), std::move(localVocab)};
}

// _____________________________________________________________________________
Expand Down
14 changes: 6 additions & 8 deletions src/engine/GroupBy.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
// Copyright 2018, University of Freiburg,
// Copyright 2018 - 2024, University of Freiburg
// Chair of Algorithms and Data Structures.
// Author:
// 2018 Florian Kramer ([email protected])
// 2020- Johannes Kalmbach ([email protected])
// Authors: Florian Kramer [2018]
// Johannes Kalmbach <[email protected]>

#pragma once

Expand Down Expand Up @@ -316,10 +315,9 @@ class GroupBy : public Operation {
// Create result IdTable by using a HashMap mapping groups to aggregation data
// and subsequently calling `createResultFromHashMap`.
template <size_t NUM_GROUP_COLUMNS>
IdTable computeGroupByForHashMapOptimization(
std::vector<HashMapAliasInformation>& aggregateAliases,
const IdTable& subresult, const std::vector<size_t>& columnIndices,
LocalVocab* localVocab) const;
Result computeGroupByForHashMapOptimization(
std::vector<HashMapAliasInformation>& aggregateAliases, auto subresults,
const std::vector<size_t>& columnIndices) const;

using AggregationData =
std::variant<AvgAggregationData, CountAggregationData, MinAggregationData,
Expand Down
46 changes: 46 additions & 0 deletions test/GroupByTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Authors: Florian Kramer ([email protected])
// Johannes Kalmbach ([email protected])

#include <engine/SpatialJoinAlgorithms.h>
#include <gmock/gmock.h>

#include <cstdio>
Expand Down Expand Up @@ -36,6 +37,7 @@ using ::testing::Optional;

namespace {
auto I = IntId;
auto D = DoubleId;

// Return a matcher that checks, whether a given `std::optional<IdTable` has a
// value and that value is equal to `makeIdTableFromVector(table)`.
Expand Down Expand Up @@ -747,6 +749,50 @@ TEST_F(GroupByOptimizations, correctResultForHashMapOptimization) {
resultWithoutOptimization->asDebugString());
}

// _____________________________________________________________________________
TEST_F(GroupByOptimizations, hashMapOptimizationLazyAndMaterializedInputs) {
/* Setup query:
SELECT ?x (AVG(?y) as ?avg) WHERE {
# explicitly defined subresult.
} GROUP BY ?x
*/
// Setup three unsorted input blocks. The first column will be the grouped
// `?x`, and the second column the variable `?y` of which we compute the
// average.
auto runTest = [this](bool inputIsLazy) {
std::vector<IdTable> tables;
tables.push_back(makeIdTableFromVector({{3, 6}, {8, 27}, {5, 7}}, I));
tables.push_back(makeIdTableFromVector({{8, 27}, {5, 9}}, I));
tables.push_back(makeIdTableFromVector({{5, 2}, {3, 4}}, I));
// The expected averages are as follows: (3 -> 5.0), (5 -> 6.0), (8
// -> 27.0).
auto subtree = ad_utility::makeExecutionTree<ValuesForTesting>(
qec, std::move(tables),
std::vector<std::optional<Variable>>{Variable{"?x"}, Variable{"?y"}});
auto& values =
dynamic_cast<ValuesForTesting&>(*subtree->getRootOperation());
values.forceFullyMaterialized() = !inputIsLazy;

SparqlExpressionPimpl avgYPimpl = makeAvgPimpl(varY);
std::vector<Alias> aliasesAvgY{Alias{avgYPimpl, Variable{"?avg"}}};

// Calculate result with optimization
qec->getQueryTreeCache().clearAll();
RuntimeParameters().set<"group-by-hash-map-enabled">(true);
GroupBy groupBy{qec, variablesOnlyX, aliasesAvgY, std::move(subtree)};
auto result = groupBy.computeResultOnlyForTesting();
ASSERT_TRUE(result.isFullyMaterialized());
EXPECT_THAT(
result.idTable(),
matchesIdTableFromVector({{I(3), D(5)}, {I(5), D(6)}, {I(8), D(27)}}));
};
runTest(true);
runTest(false);

// Disable optimization for following tests
RuntimeParameters().set<"group-by-hash-map-enabled">(false);
}

// _____________________________________________________________________________
TEST_F(GroupByOptimizations, correctResultForHashMapOptimizationForCountStar) {
/* Setup query:
Expand Down
2 changes: 2 additions & 0 deletions test/engine/ValuesForTesting.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class ValuesForTesting : public Operation {
}
bool supportsLimit() const override { return supportsLimit_; }

bool& forceFullyMaterialized() { return forceFullyMaterialized_; }

private:
// ___________________________________________________________________________
string getCacheKeyImpl() const override {
Expand Down

0 comments on commit a7823fb

Please sign in to comment.