Skip to content

Commit

Permalink
colexec: optimize any_not_null func for hash aggregation
Browse files Browse the repository at this point in the history
When we're performing hash aggregation, we know that there will be
exactly one group. This insight allows us to optimize the aggregate
functions, and this commit performs the optimization of any_not_null.
Now we will have a separate generated file for each aggregator kind (hash
and ordered); this is handled by a preprocessing step that replaces the
`_AGGKIND` template "variable" with the corresponding aggregator kind.

I looked into introducing some base struct for aggregates to reuse, but
it is rather complicated at the moment - some aggregate functions
iterate over `[]*oneArgOverload`, and we could flatten that out.
However, other functions operate with a custom "tmpl info" struct.
Furthermore, extending the struct to include an `AggKind` variable would
require refactoring the templates themselves because we assume that `.`
(dot) of the template usually refers to a `lastArgWidthOverload` struct.
Probably in order to provide some "aggregate base tmpl info" struct that
would help with hash vs ordered aggregation generation we would need to
introduce an interface which custom tmpl infos will implement.

Release note: None
  • Loading branch information
yuzefovich committed Jun 16, 2020
1 parent 70e04ec commit 2bf8764
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 31 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,6 @@ DOCGEN_TARGETS := bin/.docgen_bnfs bin/.docgen_functions docs/generated/redact_s
EXECGEN_TARGETS = \
pkg/col/coldata/vec.eg.go \
pkg/sql/colexec/and_or_projection.eg.go \
pkg/sql/colexec/any_not_null_agg.eg.go \
pkg/sql/colexec/avg_agg.eg.go \
pkg/sql/colexec/bool_and_or_agg.eg.go \
pkg/sql/colexec/cast.eg.go \
Expand All @@ -842,6 +841,7 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/hashtable_full_default.eg.go \
pkg/sql/colexec/hashtable_full_deleting.eg.go \
pkg/sql/colexec/hash_aggregator.eg.go \
pkg/sql/colexec/hash_any_not_null_agg.eg.go \
pkg/sql/colexec/hash_utils.eg.go \
pkg/sql/colexec/like_ops.eg.go \
pkg/sql/colexec/mergejoinbase.eg.go \
Expand All @@ -854,6 +854,7 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/mergejoiner_leftsemi.eg.go \
pkg/sql/colexec/mergejoiner_rightouter.eg.go \
pkg/sql/colexec/min_max_agg.eg.go \
pkg/sql/colexec/ordered_any_not_null_agg.eg.go \
pkg/sql/colexec/ordered_synchronizer.eg.go \
pkg/sql/colexec/overloads_test_utils.eg.go \
pkg/sql/colexec/proj_const_left_ops.eg.go \
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/colexec/aggregate_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,18 @@ func newAggregateFuncsAlloc(
aggTyps [][]*types.T,
aggFns []execinfrapb.AggregatorSpec_Func,
allocSize int64,
isHashAgg bool,
) (*aggregateFuncsAlloc, error) {
funcAllocs := make([]aggregateFuncAlloc, len(aggFns))
for i := range aggFns {
var err error
switch aggFns[i] {
case execinfrapb.AggregatorSpec_ANY_NOT_NULL:
funcAllocs[i], err = newAnyNotNullAggAlloc(allocator, aggTyps[i][0], allocSize)
if isHashAgg {
funcAllocs[i], err = newAnyNotNullHashAggAlloc(allocator, aggTyps[i][0], allocSize)
} else {
funcAllocs[i], err = newAnyNotNullOrderedAggAlloc(allocator, aggTyps[i][0], allocSize)
}
case execinfrapb.AggregatorSpec_AVG:
funcAllocs[i], err = newAvgAggAlloc(allocator, aggTyps[i][0], allocSize)
case execinfrapb.AggregatorSpec_SUM:
Expand Down
76 changes: 50 additions & 26 deletions pkg/sql/colexec/any_not_null_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,17 @@ const _TYPE_WIDTH = 0

// */}}

func newAnyNotNullAggAlloc(
func newAnyNotNull_AGGKINDAggAlloc(
allocator *colmem.Allocator, t *types.T, allocSize int64,
) (aggregateFuncAlloc, error) {
allocBase := aggAllocBase{allocator: allocator, allocSize: allocSize}
switch typeconv.TypeFamilyToCanonicalTypeFamily(t.Family()) {
// {{range .}}
case _CANONICAL_TYPE_FAMILY:
switch t.Width() {
// {{range .WidthOverloads}}
case _TYPE_WIDTH:
return &anyNotNull_TYPEAggAlloc{allocator: allocator, allocSize: allocSize}, nil
return &anyNotNull_TYPE_AGGKINDAggAlloc{aggAllocBase: allocBase}, nil
// {{end}}
}
// {{end}}
Expand All @@ -65,11 +66,13 @@ func newAnyNotNullAggAlloc(
// {{range .}}
// {{range .WidthOverloads}}

// anyNotNull_TYPEAgg implements the ANY_NOT_NULL aggregate, returning the
// anyNotNull_TYPE_AGGKINDAgg implements the ANY_NOT_NULL aggregate, returning the
// first non-null value in the input column.
type anyNotNull_TYPEAgg struct {
allocator *colmem.Allocator
groups []bool
type anyNotNull_TYPE_AGGKINDAgg struct {
allocator *colmem.Allocator
// {{if eq "_AGGKIND" "Ordered"}}
groups []bool
// {{end}}
vec coldata.Vec
col _GOTYPESLICE
nulls *coldata.Nulls
Expand All @@ -78,33 +81,44 @@ type anyNotNull_TYPEAgg struct {
foundNonNullForCurrentGroup bool
}

var _ aggregateFunc = &anyNotNull_TYPEAgg{}
var _ aggregateFunc = &anyNotNull_TYPE_AGGKINDAgg{}

const sizeOfAnyNotNull_TYPEAgg = int64(unsafe.Sizeof(anyNotNull_TYPEAgg{}))
const sizeOfAnyNotNull_TYPE_AGGKINDAgg = int64(unsafe.Sizeof(anyNotNull_TYPE_AGGKINDAgg{}))

func (a *anyNotNull_TYPEAgg) Init(groups []bool, vec coldata.Vec) {
func (a *anyNotNull_TYPE_AGGKINDAgg) Init(groups []bool, vec coldata.Vec) {
// {{if eq "_AGGKIND" "Ordered"}}
a.groups = groups
// {{end}}
a.vec = vec
a.col = vec.TemplateType()
a.nulls = vec.Nulls()
a.Reset()
}

func (a *anyNotNull_TYPEAgg) Reset() {
func (a *anyNotNull_TYPE_AGGKINDAgg) Reset() {
a.curIdx = -1
a.foundNonNullForCurrentGroup = false
a.nulls.UnsetNulls()
}

func (a *anyNotNull_TYPEAgg) CurrentOutputIndex() int {
func (a *anyNotNull_TYPE_AGGKINDAgg) CurrentOutputIndex() int {
return a.curIdx
}

func (a *anyNotNull_TYPEAgg) SetOutputIndex(idx int) {
func (a *anyNotNull_TYPE_AGGKINDAgg) SetOutputIndex(idx int) {
a.curIdx = idx
}

func (a *anyNotNull_TYPEAgg) Compute(b coldata.Batch, inputIdxs []uint32) {
func (a *anyNotNull_TYPE_AGGKINDAgg) Compute(b coldata.Batch, inputIdxs []uint32) {
// {{if eq "_AGGKIND" "Hash"}}
if a.foundNonNullForCurrentGroup {
// We have already seen non-null for the current group, and since there
// is at most a single group when performing hash aggregation, we can
// finish computing.
return
}
// {{end}}

inputLen := b.Length()
vec, sel := b.ColVec(int(inputIdxs[0])), b.Selection()
col, nulls := vec.TemplateType(), vec.Nulls()
Expand Down Expand Up @@ -141,7 +155,7 @@ func (a *anyNotNull_TYPEAgg) Compute(b coldata.Batch, inputIdxs []uint32) {
)
}

func (a *anyNotNull_TYPEAgg) Flush() {
func (a *anyNotNull_TYPE_AGGKINDAgg) Flush() {
// If we haven't found any non-nulls for this group so far, the output for
// this group should be null.
if !a.foundNonNullForCurrentGroup {
Expand All @@ -152,22 +166,21 @@ func (a *anyNotNull_TYPEAgg) Flush() {
a.curIdx++
}

func (a *anyNotNull_TYPEAgg) HandleEmptyInputScalar() {
func (a *anyNotNull_TYPE_AGGKINDAgg) HandleEmptyInputScalar() {
a.nulls.SetNull(0)
}

type anyNotNull_TYPEAggAlloc struct {
allocator *colmem.Allocator
allocSize int64
aggFuncs []anyNotNull_TYPEAgg
type anyNotNull_TYPE_AGGKINDAggAlloc struct {
aggAllocBase
aggFuncs []anyNotNull_TYPE_AGGKINDAgg
}

var _ aggregateFuncAlloc = &anyNotNull_TYPEAggAlloc{}
var _ aggregateFuncAlloc = &anyNotNull_TYPE_AGGKINDAggAlloc{}

func (a *anyNotNull_TYPEAggAlloc) newAggFunc() aggregateFunc {
func (a *anyNotNull_TYPE_AGGKINDAggAlloc) newAggFunc() aggregateFunc {
if len(a.aggFuncs) == 0 {
a.allocator.AdjustMemoryUsage(sizeOfAnyNotNull_TYPEAgg * a.allocSize)
a.aggFuncs = make([]anyNotNull_TYPEAgg, a.allocSize)
a.allocator.AdjustMemoryUsage(sizeOfAnyNotNull_TYPE_AGGKINDAgg * a.allocSize)
a.aggFuncs = make([]anyNotNull_TYPE_AGGKINDAgg, a.allocSize)
}
f := &a.aggFuncs[0]
f.allocator = a.allocator
Expand All @@ -183,9 +196,12 @@ func (a *anyNotNull_TYPEAggAlloc) newAggFunc() aggregateFunc {
// row. If a non-null value was already found, then it does nothing. If this is
// the first row of a new group, and no non-nulls have been found for the
// current group, then the output for the current group is set to null.
func _FIND_ANY_NOT_NULL(a *anyNotNull_TYPEAgg, nulls *coldata.Nulls, i int, _HAS_NULLS bool) { // */}}
func _FIND_ANY_NOT_NULL(
a *anyNotNull_TYPE_AGGKINDAgg, nulls *coldata.Nulls, i int, _HAS_NULLS bool,
) { // */}}
// {{define "findAnyNotNull" -}}

// {{if eq "_AGGKIND" "Ordered"}}
if a.groups[i] {
// The `a.curIdx` check is necessary because for the first
// group in the result set there is no "current group."
Expand All @@ -203,6 +219,8 @@ func _FIND_ANY_NOT_NULL(a *anyNotNull_TYPEAgg, nulls *coldata.Nulls, i int, _HAS
a.curIdx++
a.foundNonNullForCurrentGroup = false
}
// {{end}}

var isNull bool
// {{if .HasNulls}}
isNull = nulls.NullAt(i)
Expand All @@ -211,13 +229,19 @@ func _FIND_ANY_NOT_NULL(a *anyNotNull_TYPEAgg, nulls *coldata.Nulls, i int, _HAS
// {{end}}
if !a.foundNonNullForCurrentGroup && !isNull {
// If we haven't seen any non-nulls for the current group yet, and the
// current value is non-null, then we can pick the current value to be the
// output.
// current value is non-null, then we can pick the current value to be
// the output.
// {{with .Global}}
val := execgen.UNSAFEGET(col, i)
execgen.COPYVAL(a.curAgg, val)
// {{end}}
a.foundNonNullForCurrentGroup = true
// {{if eq "_AGGKIND" "Hash"}}
// We have already seen non-null for the current group, and since there
// is at most a single group when performing hash aggregation, we can
// finish computing.
return
// {{end}}
}
// {{end}}

Expand Down
42 changes: 42 additions & 0 deletions pkg/sql/colexec/execgen/cmd/execgen/agg_gen_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package main

import (
"fmt"
"io"
"strings"
)

const (
// aggKindTmplVar specifies the template "variable" that describes the kind
// of aggregator using an aggregate function. It is replaced with either
// "Hash" or "Ordered" before executing the template.
aggKindTmplVar = "_AGGKIND"
// hashAggKind is the value substituted for aggKindTmplVar when generating
// code for the hash aggregator. Its lowercased form is also used as the
// prefix of the generated file name.
hashAggKind = "Hash"
// orderedAggKind is the value substituted for aggKindTmplVar when
// generating code for the ordered aggregator. Its lowercased form is also
// used as the prefix of the generated file name.
orderedAggKind = "Ordered"
)

// registerAggGenerator registers aggGen once for every aggregator kind (hash
// and ordered). Each registration wraps aggGen so that all occurrences of
// aggKindTmplVar in the template contents are replaced with the aggregator
// kind before aggGen runs. The output file for a kind is named
// "<lowercased kind>_<filenameSuffix>", and dep is passed through unchanged.
func registerAggGenerator(aggGen generator, filenameSuffix, dep string) {
	aggKinds := [...]string{hashAggKind, orderedAggKind}
	for idx := range aggKinds {
		// kind is declared inside the loop body so that every closure below
		// captures its own copy regardless of the Go version's loop variable
		// semantics.
		kind := aggKinds[idx]
		var specialized generator = func(tmpl string, out io.Writer) error {
			return aggGen(strings.ReplaceAll(tmpl, aggKindTmplVar, kind), out)
		}
		outputFile := fmt.Sprintf("%s_%s", strings.ToLower(kind), filenameSuffix)
		registerGenerator(specialized, outputFile, dep)
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ func genAnyNotNullAgg(inputFileContents string, wr io.Writer) error {
}

func init() {
registerGenerator(genAnyNotNullAgg, "any_not_null_agg.eg.go", anyNotNullAggTmpl)
registerAggGenerator(genAnyNotNullAgg, "any_not_null_agg.eg.go", anyNotNullAggTmpl)
}
2 changes: 1 addition & 1 deletion pkg/sql/colexec/hash_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func NewHashAggregator(
groupTypes[i] = typs[colIdx]
}

aggFnsAlloc, err := newAggregateFuncsAlloc(allocator, aggTyps, aggFns, hashAggregatorAllocSize)
aggFnsAlloc, err := newAggregateFuncsAlloc(allocator, aggTyps, aggFns, hashAggregatorAllocSize, true /* isHashAgg */)

return &hashAggregator{
OneInputNode: NewOneInputNode(input),
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/ordered_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func NewOrderedAggregator(

// We will be reusing the same aggregate functions, so we use 1 as the
// allocation size.
funcsAlloc, err := newAggregateFuncsAlloc(a.allocator, aggTypes, aggFns, 1 /* allocSize */)
funcsAlloc, err := newAggregateFuncsAlloc(a.allocator, aggTypes, aggFns, 1 /* allocSize */, false /* isHashAgg */)
if err != nil {
return nil, errors.AssertionFailedf(
"this error should have been checked in isAggregateSupported\n%+v", err,
Expand Down

0 comments on commit 2bf8764

Please sign in to comment.