Skip to content

Commit

Permalink
vector: refine vector search (#9709)
Browse files Browse the repository at this point in the history
ref #9032

Signed-off-by: Lloyd-Pottiger <[email protected]>
Signed-off-by: JaySon-Huang <[email protected]>

Co-authored-by: JaySon <[email protected]>
  • Loading branch information
Lloyd-Pottiger and JaySon-Huang authored Dec 23, 2024
1 parent bfe0366 commit a55423c
Show file tree
Hide file tree
Showing 25 changed files with 412 additions and 576 deletions.
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include <Operators/NullSourceOp.h>
#include <Operators/UnorderedSourceOp.h>
#include <Parsers/makeDummyQuery.h>
#include <Storages/DeltaMerge/ReadThread/UnorderedInputStream.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot.h>
#include <Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.h>
#include <Storages/DeltaMerge/ScanContext.h>
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Operators/UnorderedSourceOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/AddExtraTableIDColumnTransformAction.h>
#include <Flash/Pipeline/Schedule/TaskScheduler.h>
#include <Flash/Pipeline/Schedule/Tasks/Impls/RFWaitTask.h>
#include <Operators/UnorderedSourceOp.h>
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Operators/UnorderedSourceOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#pragma once

#include <Common/Logger.h>
#include <DataStreams/AddExtraTableIDColumnTransformAction.h>
#include <Flash/Coprocessor/RuntimeFilterMgr.h>
#include <Operators/Operator.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class BitmapFilter
private:
void set(std::span<const UInt32> row_ids, const FilterPtr & f);

std::vector<UInt8> filter;
IColumn::Filter filter;
bool all_match;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,40 +32,18 @@ BitmapFilterBlockInputStream::BitmapFilterBlockInputStream(

Block BitmapFilterBlockInputStream::read()
{
FilterPtr block_filter = nullptr;
auto block = children.at(0)->read(block_filter, true);
auto block = children.at(0)->read();
if (!block)
return block;

filter.resize(block.rows());
bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows());
if (!block_filter)
{
if (all_match)
return block;
size_t passed_count = countBytesInFilter(filter);
for (auto & col : block)
{
col.column = col.column->filter(filter, passed_count);
}
}
else
if (bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows()); all_match)
return block;

size_t passed_count = countBytesInFilter(filter);
for (auto & col : block)
{
RUNTIME_CHECK(filter.size() == block_filter->size(), filter.size(), block_filter->size());
if (!all_match)
{
std::transform(
filter.begin(),
filter.end(),
block_filter->begin(),
block_filter->begin(),
[](UInt8 a, UInt8 b) { return a && b; });
}
size_t passed_count = countBytesInFilter(*block_filter);
for (auto & col : block)
{
col.column = col.column->filter(*block_filter, passed_count);
}
col.column = col.column->filter(filter, passed_count);
}
return block;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class BitmapFilterBlockInputStream : public IBlockInputStream
private:
Block header;
BitmapFilterPtr bitmap_filter;
IColumn::Filter filter;
IColumn::Filter filter; // reuse the memory allocated among all `read`
};

} // namespace DB::DM
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ class BitmapFilterView
return BitmapFilterView(filter, filter_offset + offset, size);
}

// Build an `IColumn::Filter` from the bytes of the underlying bitmap that
// correspond to rows [offset, offset + size) of this view.
// The runtime check fails when the requested range goes past `filter_size`.
IColumn::Filter getRawSubFilter(UInt32 offset, UInt32 size) const
{
    RUNTIME_CHECK(offset + size <= filter_size, offset, size, filter_size);
    // `filter_offset` is where this view starts inside the underlying bitmap.
    const auto * range_begin = filter->filter.data() + filter_offset + offset;
    const auto * range_end = range_begin + size;
    return IColumn::Filter{range_begin, range_end};
}

// Caller should ensure n in [0, size).
inline bool get(UInt32 n) const { return filter->get(filter_offset + n); }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,23 @@ SkippableBlockInputStreamPtr ColumnFileSetWithVectorIndexInputStream::tryBuild(
rest_columns->emplace_back(cd);
}

// No vector index column is specified, just use the normal logic.
// No vector index column is specified, fallback.
if (!vec_cd.has_value())
return fallback();

bool has_vector_index = false;
for (const auto & file : delta_snap->getColumnFiles())
{
if (auto * tiny_file = file->tryToTinyFile(); tiny_file && tiny_file->hasIndex(ann_query_info->index_id()))
{
has_vector_index = true;
break;
}
}
// No file has vector index, fallback.
if (!has_vector_index)
return fallback();

// All check passed. Let's read via vector index.
return std::make_shared<ColumnFileSetWithVectorIndexInputStream>(
context,
Expand All @@ -68,25 +81,6 @@ SkippableBlockInputStreamPtr ColumnFileSetWithVectorIndexInputStream::tryBuild(
read_tag_);
}

// Read the next block.
// - return_filter == true: the block is returned as produced by `readImpl`, and
//   `res_filter` is whatever `readImpl` set it to (the caller applies it).
// - return_filter == false: the filter is applied here and `res_filter` is left untouched.
Block ColumnFileSetWithVectorIndexInputStream::read(FilterPtr & res_filter, bool return_filter)
{
    if (return_filter)
        return readImpl(res_filter);

    // The caller does not want a filter back, so filtering is our job.
    FilterPtr local_filter = nullptr;
    Block result = readImpl(local_filter);

    // A null filter means all rows are valid: nothing to filter out.
    if (local_filter == nullptr)
        return result;

    const auto kept_rows = countBytesInFilter(*local_filter);
    for (auto & column : result)
        column.column = column.column->filter(*local_filter, kept_rows);
    return result;
}

Block ColumnFileSetWithVectorIndexInputStream::readOtherColumns()
{
auto reset_column_file_reader = (*cur_column_file_reader)->createNewReader(rest_col_defs, ReadTag::Query);
Expand All @@ -102,7 +96,7 @@ void ColumnFileSetWithVectorIndexInputStream::toNextFile(size_t current_file_ind
tiny_readers[current_file_index].reset();
}

Block ColumnFileSetWithVectorIndexInputStream::readImpl(FilterPtr & res_filter)
Block ColumnFileSetWithVectorIndexInputStream::read()
{
load();

Expand All @@ -115,7 +109,7 @@ Block ColumnFileSetWithVectorIndexInputStream::readImpl(FilterPtr & res_filter)
continue;
}
auto current_file_index = std::distance(reader.column_file_readers.begin(), cur_column_file_reader);
// If has index, we can read the column by vector index.
// If file has index, we can read the column by vector index.
if (tiny_readers[current_file_index] != nullptr)
{
const auto file_rows = column_files[current_file_index]->getRows();
Expand All @@ -133,44 +127,43 @@ Block ColumnFileSetWithVectorIndexInputStream::readImpl(FilterPtr & res_filter)
auto tiny_reader = tiny_readers[current_file_index];
auto vec_column = vec_cd.type->createColumn();
const std::span file_selected_rows{selected_row_begin, selected_row_end};
tiny_reader->read(vec_column, file_selected_rows, /* rowid_start_offset= */ read_rows, file_rows);
assert(vec_column->size() == file_rows);
tiny_reader->read(vec_column, file_selected_rows, /* rowid_start_offset= */ read_rows);
assert(vec_column->size() == selected_rows);

// read other columns if needed
Block block;
if (!rest_col_defs->empty())
{
block = readOtherColumns();
assert(block.rows() == vec_column->size());
}

auto index = header.getPositionByName(vec_cd.name);
block.insert(index, ColumnWithTypeAndName(std::move(vec_column), vec_cd.type, vec_cd.name));

// Fill res_filter
if (selected_rows == file_rows)
{
res_filter = nullptr;
}
else
{
filter.clear();
filter.resize_fill(file_rows, 0);
for (const auto rowid : file_selected_rows)
filter[rowid - read_rows] = 1;
res_filter = &filter;
for (auto & col : block)
col.column = col.column->filter(filter, selected_rows);

assert(block.rows() == selected_rows);
}

auto index = header.getPositionByName(vec_cd.name);
block.insert(index, ColumnWithTypeAndName(std::move(vec_column), vec_cd.type, vec_cd.name));

// All rows in this ColumnFileTiny have been read.
block.setStartOffset(read_rows);
toNextFile(current_file_index, file_rows);
return block;
}
// If file does not have index, read by cur_column_file_reader.
auto block = (*cur_column_file_reader)->readNextBlock();
if (block)
{
block.setStartOffset(read_rows);
read_rows += block.rows();
res_filter = nullptr;
size_t rows = block.rows();
filter = valid_rows.getRawSubFilter(read_rows, rows);
size_t passed_count = countBytesInFilter(filter);
for (auto & col : block)
col.column = col.column->filter(filter, passed_count);
read_rows += rows;
return block;
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,13 @@ class ColumnFileSetWithVectorIndexInputStream : public VectorIndexBlockInputStre
String getName() const override { return "ColumnFileSetWithVectorIndex"; }
Block getHeader() const override { return header; }

// Convenience overload: read the next block without handing a filter back to the caller.
// It forwards to the two-argument `read` with `return_filter = false`, so any
// filter produced while reading is applied before the block is returned.
Block read() override
{
// Scratch filter pointer; required by the signature but not exposed to the caller.
FilterPtr filter = nullptr;
return read(filter, false);
}

// When all rows in block are not filtered out,
// `res_filter` will be set to null.
// The caller needs to do handle this situation.
Block read(FilterPtr & res_filter, bool return_filter) override;
Block read() override;

std::vector<VectorIndexViewer::SearchResult> load() override;

void setSelectedRows(const std::span<const UInt32> & selected_rows) override;

private:
Block readImpl(FilterPtr & res_filter);

Block readOtherColumns();

void toNextFile(size_t current_file_index, size_t current_file_rows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,18 @@ namespace DB::DM
void ColumnFileTinyVectorIndexReader::read(
MutableColumnPtr & vec_column,
const std::span<const VectorIndexViewer::Key> & read_rowids,
size_t rowid_start_offset,
size_t read_rows)
size_t rowid_start_offset)
{
RUNTIME_CHECK(loaded);

Stopwatch watch;
vec_column->reserve(read_rows);
vec_column->reserve(read_rowids.size());
std::vector<Float32> value;
size_t current_rowid = rowid_start_offset;
for (const auto & rowid : read_rowids)
{
// Each ColumnFileTiny has its own vector index, rowid_start_offset is the offset of the ColumnFilePersistSet.
vec_index->get(rowid - rowid_start_offset, value);
if (rowid > current_rowid)
{
UInt32 nulls = rowid - current_rowid;
// Insert [] if column is Not Null, or NULL if column is Nullable
vec_column->insertManyDefaults(nulls);
}
vec_column->insertData(reinterpret_cast<const char *>(value.data()), value.size() * sizeof(Float32));
current_rowid = rowid + 1;
}
if (current_rowid < rowid_start_offset + read_rows)
{
UInt32 nulls = rowid_start_offset + read_rows - current_rowid;
vec_column->insertManyDefaults(nulls);
}

perf_stat.returned_rows = read_rowids.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,11 @@ class ColumnFileTinyVectorIndexReader

~ColumnFileTinyVectorIndexReader();

// Read vector column data and set filter.
// The column will have the same number of rows as the tiny file,
// but only the rows in selected_rows will be filled,
// others will be filled with default values.
// Read vector column data with the specified rowids.
void read(
MutableColumnPtr & vec_column,
const std::span<const VectorIndexViewer::Key> & read_rowids,
size_t rowid_start_offset,
size_t read_rows);
size_t rowid_start_offset);

// Load vector index and search results.
// Return the rowids of the selected rows.
Expand Down
32 changes: 28 additions & 4 deletions dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Columns/countBytesInFilter.h>
#include <Storages/DeltaMerge/ConcatSkippableBlockInputStream.h>
#include <Storages/DeltaMerge/ScanContext.h>

Expand Down Expand Up @@ -132,13 +133,13 @@ Block ConcatSkippableBlockInputStream<need_row_id>::readWithFilter(const IColumn
}

template <bool need_row_id>
Block ConcatSkippableBlockInputStream<need_row_id>::read(FilterPtr & res_filter, bool return_filter)
Block ConcatSkippableBlockInputStream<need_row_id>::read()
{
Block res;

while (current_stream != children.end())
{
res = (*current_stream)->read(res_filter, return_filter);
res = (*current_stream)->read();
if (res)
{
res.setStartOffset(res.startOffset() + precede_stream_rows);
Expand Down Expand Up @@ -239,12 +240,34 @@ void ConcatVectorIndexBlockInputStream::load()
sr_it = end;
}

// Not used anymore, release memory.
index_streams.clear();
loaded = true;
}

// Read the next block from the concatenated stream.
//
// Blocks produced by a `VectorIndexBlockInputStream` child contain only the
// selected rows and are returned as is. Blocks produced by any other child are
// filtered by `bitmap_filter` before being returned.
//
// Returns an empty block when the underlying stream has no more data.
Block ConcatVectorIndexBlockInputStream::read()
{
    load();
    auto block = stream->read();

    // An empty block has nothing to filter. It is also what the concatenated
    // stream returns once every child is exhausted; at that point
    // `stream->current_stream` may be `children.end()`, so using it to index
    // `index_streams` below would read past the last element.
    if (!block)
        return block;

    // The block read from a `VectorIndexBlockInputStream` only contains the selected rows. Return it directly.
    const auto index = std::distance(stream->children.begin(), stream->current_stream);
    if (index_streams[index])
        return block;

    // For streams which are not `VectorIndexBlockInputStream`, the block should be filtered by bitmap.
    filter.resize(block.rows());
    if (bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows()); all_match)
        return block;

    size_t passed_count = countBytesInFilter(filter);
    for (auto & col : block)
    {
        col.column = col.column->filter(filter, passed_count);
    }

    return block;
}

SkippableBlockInputStreamPtr ConcatVectorIndexBlockInputStream::build(
const BitmapFilterPtr & bitmap_filter,
std::shared_ptr<ConcatSkippableBlockInputStream<false>> stream,
const ANNQueryInfoPtr & ann_query_info)
{
Expand All @@ -267,6 +290,7 @@ SkippableBlockInputStreamPtr ConcatVectorIndexBlockInputStream::build(
return stream;

return std::make_shared<ConcatVectorIndexBlockInputStream>(
bitmap_filter,
stream,
std::move(index_streams),
ann_query_info->top_k());
Expand Down
Loading

0 comments on commit a55423c

Please sign in to comment.