
feat: support query aggregation (#36380) #39177

Open · wants to merge 1 commit into master

Conversation

MrPresent-Han
Contributor

related: #36380
This PR supports the query aggregation feature for Milvus.

Functional features:

  1. group by on multiple scalar fields, e.g. `select a, b from collection group by a, b`
  2. aggregation combined with group by, e.g. `select a, b, sum(c), count(d) from collection group by a, b`
  3. the original `count(*)` path is replaced by an aggregation with `count`
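The semantics of the grouped-aggregation feature above can be illustrated with a minimal, self-contained Go sketch (the types and function here are hypothetical, not milvus code) that computes the equivalent of `select a, b, sum(c), count(d) from collection group by a, b` over in-memory rows:

```go
package main

import "fmt"

// Row is a hypothetical flat record with two group-by fields and two
// aggregated fields.
type Row struct {
	A, B string
	C, D int64
}

// GroupKey mirrors "group by a, b".
type GroupKey struct{ A, B string }

// Agg accumulates sum(c) and count(d) for one group.
type Agg struct{ SumC, CountD int64 }

// aggregate computes "select a, b, sum(c), count(d) ... group by a, b".
func aggregate(rows []Row) map[GroupKey]Agg {
	out := make(map[GroupKey]Agg)
	for _, r := range rows {
		k := GroupKey{r.A, r.B}
		a := out[k]
		a.SumC += r.C
		a.CountD++
		out[k] = a
	}
	return out
}

func main() {
	rows := []Row{{"x", "y", 1, 0}, {"x", "y", 2, 0}, {"x", "z", 5, 0}}
	fmt.Println(aggregate(rows))
}
```

The real implementation hashes the group-by columns into buckets instead of using a Go map, but the input/output contract is the same.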

Code changes:

  1. added a Project operator to retrieve scalar field values in the execution framework
  2. implemented a PhyAggregation operator modeled on Velox, bucketing with SIMD operations

@sre-ci-robot
Contributor

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: MrPresent-Han
To complete the pull request process, please assign tedxu after the PR has been reviewed.
You can assign the PR to them by writing /assign @tedxu in a comment when ready.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@mergify bot added the dco-passed (DCO check passed.) and kind/feature (Issues related to feature request from users) labels — Jan 12, 2025

codecov bot commented Jan 12, 2025

Codecov Report

Attention: Patch coverage is 72.33070% with 666 lines in your changes missing coverage. Please review.

Project coverage is 81.03%. Comparing base (a8a6564) to head (bd3f412).

Files with missing lines Patch % Lines
internal/agg/aggregate.go 63.16% 172 Missing and 21 partials ⚠️
internal/core/src/query/PlanProto.cpp 4.81% 79 Missing ⚠️
internal/core/src/segcore/SegmentGrowingImpl.cpp 0.00% 44 Missing ⚠️
internal/core/src/query/ExecPlanNodeVisitor.cpp 58.06% 39 Missing ⚠️
...rnal/core/src/segcore/ChunkedSegmentSealedImpl.cpp 0.00% 34 Missing ⚠️
internal/proxy/task_query.go 64.78% 21 Missing and 4 partials ⚠️
internal/core/src/exec/HashTable.h 67.64% 22 Missing ⚠️
internal/proxy/util.go 42.42% 17 Missing and 2 partials ⚠️
internal/core/src/common/FieldData.cpp 68.00% 16 Missing ⚠️
internal/util/typeutil/hash.go 50.00% 15 Missing ⚠️
... and 38 more
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff             @@
##           master   #39177       +/-   ##
===========================================
+ Coverage   69.64%   81.03%   +11.39%     
===========================================
  Files         296     1426     +1130     
  Lines       26633   199546   +172913     
===========================================
+ Hits        18548   161703   +143155     
- Misses       8085    32196    +24111     
- Partials        0     5647     +5647     
Components Coverage Δ
Client 79.53% <ø> (∅)
Core 69.90% <76.23%> (+0.25%) ⬆️
Go 83.00% <63.62%> (∅)
Files with missing lines Coverage Δ
internal/core/src/common/FieldData.h 100.00% <ø> (ø)
internal/core/src/common/Types.h 33.80% <100.00%> (+4.72%) ⬆️
internal/core/src/exec/Driver.cpp 81.72% <100.00%> (+0.32%) ⬆️
internal/core/src/exec/Driver.h 50.00% <ø> (ø)
internal/core/src/exec/VectorHasher.h 100.00% <100.00%> (ø)
internal/core/src/exec/expression/Utils.h 96.82% <100.00%> (+0.33%) ⬆️
internal/core/src/exec/operator/MvccNode.cpp 100.00% <100.00%> (+6.25%) ⬆️
internal/core/src/exec/operator/Operator.cpp 100.00% <100.00%> (ø)
internal/core/src/exec/operator/Operator.h 71.79% <100.00%> (+6.08%) ⬆️
internal/core/src/exec/operator/ProjectNode.cpp 100.00% <100.00%> (ø)
... and 76 more

... and 1084 files with indirect coverage changes

@mergify mergify bot added the ci-passed label Jan 12, 2025
@MrPresent-Han (Contributor, Author) left a comment

review comments, round 1

FieldID() int64
OriginalName() string
}


OriginalName is the user's output field name. Not every field that needs to be bucketed also needs to be returned. For example, in 'select a, sum(c) from collection group by a, b', the original output fields are 'a, sum(c)', but the proxy must receive three bucketed columns 'a, b, sum(c)', in that order, for correct reduction. We therefore keep the original name so that 'a, sum(c)' can finally be projected out of the reduced result.
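The final projection step described above can be sketched in Go (names and shapes here are illustrative, not milvus's actual types): the reduce phase yields columns `[a, b, sum(c)]`, and the remembered original names select `[a, sum(c)]` for the user.

```go
package main

import "fmt"

// projectOutput keeps only the user-requested columns from the reduced
// result. For "select a, sum(c) ... group by a, b" the reduce step
// produces columns [a, b, sum(c)] in group-by order; the final step
// projects [a, sum(c)] using the remembered original output names.
func projectOutput(reducedNames []string, reduced [][]string, wanted []string) [][]string {
	// map each wanted name to its column index in the reduced result
	idx := make([]int, 0, len(wanted))
	for _, w := range wanted {
		for i, n := range reducedNames {
			if n == w {
				idx = append(idx, i)
				break
			}
		}
	}
	out := make([][]string, len(reduced))
	for r, row := range reduced {
		proj := make([]string, len(idx))
		for j, i := range idx {
			proj[j] = row[i]
		}
		out[r] = proj
	}
	return out
}

func main() {
	reduced := [][]string{{"a1", "b1", "10"}, {"a1", "b2", "7"}}
	fmt.Println(projectOutput([]string{"a", "b", "sum(c)"}, reduced, []string{"a", "sum(c)"}))
}
```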

return nil, fmt.Errorf("invalid Aggregation operator %d", pb.Op)
}
}


In the Go layer, the aggregation reduction uses three components:

  1. Bucket: all rows with identical hash values
  2. Row: all columns' values for one group-by line, e.g. a_val, b_val, sum(c_val)
  3. Entry: one column value in one row, e.g. one value of a_val
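A minimal Go sketch of the three components above and of merging two rows of the same group (the type names follow the comment; the actual structs in the PR differ):

```go
package main

import "fmt"

// Entry is one column value in one row.
type Entry struct{ Val int64 }

// AggRow is one group-by line, e.g. a_val, b_val, sum(c_val).
type AggRow struct{ Entries []Entry }

// Bucket holds all rows whose group-by keys share one hash value.
type Bucket struct {
	Hash uint64
	Rows []AggRow
}

// mergeSum merges a peer row into an existing row by summing the
// aggregate entry at position aggIdx; the group-by entries are
// identical by construction and stay untouched.
func mergeSum(dst *AggRow, src AggRow, aggIdx int) {
	dst.Entries[aggIdx].Val += src.Entries[aggIdx].Val
}

func main() {
	b := Bucket{Hash: 42, Rows: []AggRow{{Entries: []Entry{{1}, {2}, {10}}}}}
	mergeSum(&b.Rows[0], AggRow{Entries: []Entry{{1}, {2}, {5}}}, 2)
	fmt.Println(b.Rows[0].Entries[2].Val)
}
```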

target.val = new.val
return nil
}
// ensure the value type outside

Inside the segcore execution framework, sum and count are both int64, so there is no type risk here.

}

const NONE int = -1


On a hash-key collision, we have to iterate over all rows inside the bucket to check for a match.
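The probe-on-collision behavior can be sketched as follows (hypothetical types; the PR's hash table uses SIMD bucketing rather than a Go map): distinct keys may share a hash, so every row in the bucket is compared field by field before inserting a new row.

```go
package main

import "fmt"

// groupRow is a hypothetical group-by row kept inside a bucket.
type groupRow struct {
	keys []int64 // group-by column values
	sum  int64   // running aggregate
}

func equalKeys(a, b []int64) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// upsert adds v into the row matching keys k in the bucket for hash h,
// iterating all rows in the bucket because hashes can collide.
func upsert(buckets map[uint64][]*groupRow, h uint64, k []int64, v int64) {
	for _, r := range buckets[h] {
		if equalKeys(r.keys, k) { // same hash: confirm the keys really match
			r.sum += v
			return
		}
	}
	buckets[h] = append(buckets[h], &groupRow{keys: append([]int64(nil), k...), sum: v})
}

func main() {
	buckets := map[uint64][]*groupRow{}
	// pretend both distinct keys hash to 7 to force a collision
	upsert(buckets, 7, []int64{1, 2}, 10)
	upsert(buckets, 7, []int64{3, 4}, 5)
	upsert(buckets, 7, []int64{1, 2}, 1)
	fmt.Println(len(buckets[7]), buckets[7][0].sum)
}
```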

hasher hash.Hash64
buffer []byte
}


This buffer is used for hash computation and has a fixed size, with one buffer per column, so there is no memory risk here.
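A sketch of the pattern: one `hash.Hash64` plus one fixed-size scratch buffer per column, reused for every value so no per-row allocation occurs (the constructor and method names here are hypothetical; the PR uses its own hasher type).

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash"
	"hash/fnv"
)

// columnHasher holds one hasher and one fixed-size scratch buffer,
// reused across all values of a single column.
type columnHasher struct {
	hasher hash.Hash64
	buffer []byte // fixed size: 8 bytes fits one int64 value
}

func newColumnHasher() *columnHasher {
	return &columnHasher{hasher: fnv.New64a(), buffer: make([]byte, 8)}
}

// hashInt64 serializes v into the reused buffer and hashes it.
func (c *columnHasher) hashInt64(v int64) uint64 {
	binary.LittleEndian.PutUint64(c.buffer, uint64(v))
	c.hasher.Reset()
	c.hasher.Write(c.buffer)
	return c.hasher.Sum64()
}

func main() {
	h := newColumnHasher()
	fmt.Println(h.hashInt64(42) == h.hashInt64(42), h.hashInt64(1) == h.hashInt64(2))
}
```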

append(const ColumnVector& other) {
values_->FillFieldData(other.GetRawData(), other.size());
}


In the iterative computing framework, result vectors are returned batch by batch, so each batch has to be appended to the final returned result. This method involves a memory copy and may need to be optimized.
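The batch-append pattern, in a trivial Go sketch (illustrative only): each per-batch slice is copied onto the tail of the accumulated result, which is the copy cost flagged above.

```go
package main

import "fmt"

// appendBatch copies one batch of column values onto the tail of the
// final result, analogous to ColumnVector::append above; the copy is
// the cost noted as a future optimization target.
func appendBatch(final []int64, batch []int64) []int64 {
	return append(final, batch...)
}

func main() {
	var final []int64
	for _, batch := range [][]int64{{1, 2}, {3}, {4, 5}} {
		final = appendBatch(final, batch)
	}
	fmt.Println(final)
}
```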

@@ -78,11 +80,21 @@ DriverFactory::CreateDriver(std::unique_ptr<DriverContext> ctx,
plannode)) {
operators.push_back(std::make_unique<PhyVectorSearchNode>(
id, ctx.get(), vectorsearchnode));
} else if (auto groupbynode =
std::dynamic_pointer_cast<const plan::GroupByNode>(

We differentiate group-by into search_group_by and query_group_by operators.

plannode)) {
operators.push_back(
std::make_unique<PhyGroupByNode>(id, ctx.get(), groupbynode));
std::make_unique<PhyProjectNode>(id, ctx.get(), projectNode));
}

For query_group_by with a filter expression, the pipeline is:
agg_operator ---> project_operator ---> filterbits_operator ---> mvcc_operator
No changes to the framework's existing operators are required.

@@ -135,6 +147,17 @@ Driver::Run(std::shared_ptr<Driver> self) {
}
}


Operators are initialized before the pipeline is launched; this is needed by agg_operator and matches what Velox does.
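The init-before-run pattern, sketched in Go (interface and names are hypothetical, standing in for the C++ Driver changes): every operator's initialization completes before the batch loop starts, so an operator like aggregation can set up its hash table up front.

```go
package main

import "fmt"

// Operator is a hypothetical pipeline operator with an explicit
// initialization phase that runs before any batch is processed.
type Operator interface {
	Init() error
	Name() string
}

type aggOperator struct{ initialized bool }

func (a *aggOperator) Init() error  { a.initialized = true; return nil }
func (a *aggOperator) Name() string { return "agg" }

// runPipeline initializes every operator up front (the behavior the
// comment describes), then would loop over batches; here it only
// reports the initialization order.
func runPipeline(ops []Operator) ([]string, error) {
	order := make([]string, 0, len(ops))
	for _, op := range ops {
		if err := op.Init(); err != nil {
			return nil, err
		}
		order = append(order, op.Name())
	}
	return order, nil
}

func main() {
	order, _ := runPipeline([]Operator{&aggOperator{}})
	fmt.Println(order)
}
```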

TargetBitmapView active_views(activeRows);
populateLookupRows(active_views, lookup.rows_);
}


Only kInsert is used, since bucket operations in the aggregation process never need to delete existing entries.

Labels
area/compilation, area/internal-api, area/test, ci-passed, dco-passed (DCO check passed.), kind/feature (Issues related to feature request from users), sig/testing, size/XXL (Denotes a PR that changes 1000+ lines.), test/integration (integration test)
2 participants