diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 3aa77d7b64c..0dd45d77f5b 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -41,7 +41,6 @@ #include #include #include -#include #include #include #include diff --git a/dbms/src/Operators/UnorderedSourceOp.cpp b/dbms/src/Operators/UnorderedSourceOp.cpp index 09533826dc6..2f05bf2a142 100644 --- a/dbms/src/Operators/UnorderedSourceOp.cpp +++ b/dbms/src/Operators/UnorderedSourceOp.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include diff --git a/dbms/src/Operators/UnorderedSourceOp.h b/dbms/src/Operators/UnorderedSourceOp.h index 65f0bb68550..cf771822d20 100644 --- a/dbms/src/Operators/UnorderedSourceOp.h +++ b/dbms/src/Operators/UnorderedSourceOp.h @@ -15,7 +15,6 @@ #pragma once #include -#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h index e060b22e1b0..eb5154bd231 100644 --- a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h +++ b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h @@ -50,7 +50,7 @@ class BitmapFilter private: void set(std::span row_ids, const FilterPtr & f); - std::vector filter; + IColumn::Filter filter; bool all_match; }; diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.cpp index b38a9eabaa7..3020cb9119e 100644 --- a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.cpp @@ -32,40 +32,18 @@ BitmapFilterBlockInputStream::BitmapFilterBlockInputStream( Block BitmapFilterBlockInputStream::read() { - FilterPtr block_filter = nullptr; - auto block = children.at(0)->read(block_filter, true); + auto block = children.at(0)->read(); if (!block) return block; filter.resize(block.rows()); - bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows()); - if (!block_filter) - { - if (all_match) - return block; - size_t passed_count = countBytesInFilter(filter); - for (auto & col : block) - { - col.column = col.column->filter(filter, passed_count); - } - } - else + if (bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows()); all_match) + return block; + + size_t passed_count = countBytesInFilter(filter); + for (auto & col : block) { - RUNTIME_CHECK(filter.size() == block_filter->size(), filter.size(), block_filter->size()); - if (!all_match) - { - std::transform( - filter.begin(), - filter.end(), - block_filter->begin(), - block_filter->begin(), - [](UInt8 a, UInt8 b) { return a && b; }); - } - size_t passed_count = countBytesInFilter(*block_filter); - for (auto & col : block) - { - col.column = col.column->filter(*block_filter, passed_count); - } + col.column = col.column->filter(filter, passed_count); } return block; } diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.h b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.h index f3648f8daea..9c3274187a1 100644 --- a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.h @@ -41,7 +41,7 @@ class BitmapFilterBlockInputStream : public IBlockInputStream private: Block header; BitmapFilterPtr bitmap_filter; - IColumn::Filter filter; + IColumn::Filter filter; // reuse the memory allocated among all `read` }; } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h index 91122e39ab8..a2f85a7ad40 100644 --- a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h +++ b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h @@ -52,6 +52,14 @@ class BitmapFilterView return BitmapFilterView(filter, filter_offset + offset, size); } + IColumn::Filter getRawSubFilter(UInt32 offset, UInt32 size) const + { + RUNTIME_CHECK(offset + size <= filter_size, offset, size, filter_size); + return IColumn::Filter{ + filter->filter.data() + filter_offset + offset, + filter->filter.data() + filter_offset + offset + size}; + } + // Caller should ensure n in [0, size). inline bool get(UInt32 n) const { return filter->get(filter_offset + n); } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp index 811e295e917..825477d5e67 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp @@ -50,10 +50,23 @@ SkippableBlockInputStreamPtr ColumnFileSetWithVectorIndexInputStream::tryBuild( rest_columns->emplace_back(cd); } - // No vector index column is specified, just use the normal logic. + // No vector index column is specified, fallback. if (!vec_cd.has_value()) return fallback(); + bool has_vector_index = false; + for (const auto & file : delta_snap->getColumnFiles()) + { + if (auto * tiny_file = file->tryToTinyFile(); tiny_file && tiny_file->hasIndex(ann_query_info->index_id())) + { + has_vector_index = true; + break; + } + } + // No file has vector index, fallback. + if (!has_vector_index) + return fallback(); + // All check passed. Let's read via vector index. return std::make_shared( context, @@ -68,25 +81,6 @@ SkippableBlockInputStreamPtr ColumnFileSetWithVectorIndexInputStream::tryBuild( read_tag_); } -Block ColumnFileSetWithVectorIndexInputStream::read(FilterPtr & res_filter, bool return_filter) -{ - if (return_filter) - return readImpl(res_filter); - - // If return_filter == false, we must filter by ourselves. - - FilterPtr filter = nullptr; - auto res = readImpl(filter); - if (filter != nullptr) - { - auto passed_count = countBytesInFilter(*filter); - for (auto & col : res) - col.column = col.column->filter(*filter, passed_count); - } - // filter == nullptr means all rows are valid and no need to filter. - return res; -} - Block ColumnFileSetWithVectorIndexInputStream::readOtherColumns() { auto reset_column_file_reader = (*cur_column_file_reader)->createNewReader(rest_col_defs, ReadTag::Query); @@ -102,7 +96,7 @@ void ColumnFileSetWithVectorIndexInputStream::toNextFile(size_t current_file_ind tiny_readers[current_file_index].reset(); } -Block ColumnFileSetWithVectorIndexInputStream::readImpl(FilterPtr & res_filter) +Block ColumnFileSetWithVectorIndexInputStream::read() { load(); @@ -115,7 +109,7 @@ Block ColumnFileSetWithVectorIndexInputStream::readImpl(FilterPtr & res_filter) continue; } auto current_file_index = std::distance(reader.column_file_readers.begin(), cur_column_file_reader); - // If has index, we can read the column by vector index. + // If file has index, we can read the column by vector index. if (tiny_readers[current_file_index] != nullptr) { const auto file_rows = column_files[current_file_index]->getRows(); @@ -133,44 +127,43 @@ Block ColumnFileSetWithVectorIndexInputStream::readImpl(FilterPtr & res_filter) auto tiny_reader = tiny_readers[current_file_index]; auto vec_column = vec_cd.type->createColumn(); const std::span file_selected_rows{selected_row_begin, selected_row_end}; - tiny_reader->read(vec_column, file_selected_rows, /* rowid_start_offset= */ read_rows, file_rows); - assert(vec_column->size() == file_rows); + tiny_reader->read(vec_column, file_selected_rows, /* rowid_start_offset= */ read_rows); + assert(vec_column->size() == selected_rows); + // read other columns if needed Block block; if (!rest_col_defs->empty()) { block = readOtherColumns(); - assert(block.rows() == vec_column->size()); - } - - auto index = header.getPositionByName(vec_cd.name); - block.insert(index, ColumnWithTypeAndName(std::move(vec_column), vec_cd.type, vec_cd.name)); - - // Fill res_filter - if (selected_rows == file_rows) - { - res_filter = nullptr; - } - else - { filter.clear(); filter.resize_fill(file_rows, 0); for (const auto rowid : file_selected_rows) filter[rowid - read_rows] = 1; - res_filter = &filter; + for (auto & col : block) + col.column = col.column->filter(filter, selected_rows); + + assert(block.rows() == selected_rows); } + auto index = header.getPositionByName(vec_cd.name); + block.insert(index, ColumnWithTypeAndName(std::move(vec_column), vec_cd.type, vec_cd.name)); + // All rows in this ColumnFileTiny have been read. block.setStartOffset(read_rows); toNextFile(current_file_index, file_rows); return block; } + // If file does not have index, reader by cur_column_file_reader. auto block = (*cur_column_file_reader)->readNextBlock(); if (block) { block.setStartOffset(read_rows); - read_rows += block.rows(); - res_filter = nullptr; + size_t rows = block.rows(); + filter = valid_rows.getRawSubFilter(read_rows, rows); + size_t passed_count = countBytesInFilter(filter); + for (auto & col : block) + col.column = col.column->filter(filter, passed_count); + read_rows += rows; return block; } else diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.h index 905200f62b7..4b9bca9bdd3 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.h @@ -92,24 +92,13 @@ class ColumnFileSetWithVectorIndexInputStream : public VectorIndexBlockInputStre String getName() const override { return "ColumnFileSetWithVectorIndex"; } Block getHeader() const override { return header; } - Block read() override - { - FilterPtr filter = nullptr; - return read(filter, false); - } - - // When all rows in block are not filtered out, - // `res_filter` will be set to null. - // The caller needs to do handle this situation. - Block read(FilterPtr & res_filter, bool return_filter) override; + Block read() override; std::vector load() override; void setSelectedRows(const std::span & selected_rows) override; private: - Block readImpl(FilterPtr & res_filter); - Block readOtherColumns(); void toNextFile(size_t current_file_index, size_t current_file_rows); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.cpp index 079c14ae9ca..fe07ad9da12 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.cpp @@ -27,32 +27,18 @@ namespace DB::DM void ColumnFileTinyVectorIndexReader::read( MutableColumnPtr & vec_column, const std::span & read_rowids, - size_t rowid_start_offset, - size_t read_rows) + size_t rowid_start_offset) { RUNTIME_CHECK(loaded); Stopwatch watch; - vec_column->reserve(read_rows); + vec_column->reserve(read_rowids.size()); std::vector value; - size_t current_rowid = rowid_start_offset; for (const auto & rowid : read_rowids) { // Each ColomnFileTiny has its own vector index, rowid_start_offset is the offset of the ColmnFilePersistSet. vec_index->get(rowid - rowid_start_offset, value); - if (rowid > current_rowid) - { - UInt32 nulls = rowid - current_rowid; - // Insert [] if column is Not Null, or NULL if column is Nullable - vec_column->insertManyDefaults(nulls); - } vec_column->insertData(reinterpret_cast(value.data()), value.size() * sizeof(Float32)); - current_rowid = rowid + 1; - } - if (current_rowid < rowid_start_offset + read_rows) - { - UInt32 nulls = rowid_start_offset + read_rows - current_rowid; - vec_column->insertManyDefaults(nulls); } perf_stat.returned_rows = read_rowids.size(); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.h index 993b0a072a3..cb6c09c6576 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.h @@ -74,15 +74,11 @@ class ColumnFileTinyVectorIndexReader ~ColumnFileTinyVectorIndexReader(); - // Read vector column data and set filter. - // The column will be as same as as the rows of the tiny file, - // but only the rows in selected_rows will be filled, - // others will be filled with default values. + // Read vector column data with the specified rowids. void read( MutableColumnPtr & vec_column, const std::span & read_rowids, - size_t rowid_start_offset, - size_t read_rows); + size_t rowid_start_offset); // Load vector index and search results. // Return the rowids of the selected rows. diff --git a/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp index 33206792603..6cdc5e2a32f 100644 --- a/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -132,13 +133,13 @@ Block ConcatSkippableBlockInputStream::readWithFilter(const IColumn } template -Block ConcatSkippableBlockInputStream::read(FilterPtr & res_filter, bool return_filter) +Block ConcatSkippableBlockInputStream::read() { Block res; while (current_stream != children.end()) { - res = (*current_stream)->read(res_filter, return_filter); + res = (*current_stream)->read(); if (res) { res.setStartOffset(res.startOffset() + precede_stream_rows); @@ -239,12 +240,34 @@ void ConcatVectorIndexBlockInputStream::load() sr_it = end; } - // Not used anymore, release memory. - index_streams.clear(); loaded = true; } +Block ConcatVectorIndexBlockInputStream::read() +{ + load(); + auto block = stream->read(); + + // The block read from `VectorIndexBlockInputStream` only return the selected rows. Return it directly. + // For streams which are not `VectorIndexBlockInputStream`, the block should be filtered by bitmap. + if (auto index = std::distance(stream->children.begin(), stream->current_stream); !index_streams[index]) + { + filter.resize(block.rows()); + if (bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows()); all_match) + return block; + + size_t passed_count = countBytesInFilter(filter); + for (auto & col : block) + { + col.column = col.column->filter(filter, passed_count); + } + } + + return block; +} + SkippableBlockInputStreamPtr ConcatVectorIndexBlockInputStream::build( + const BitmapFilterPtr & bitmap_filter, std::shared_ptr> stream, const ANNQueryInfoPtr & ann_query_info) { @@ -267,6 +290,7 @@ SkippableBlockInputStreamPtr ConcatVectorIndexBlockInputStream::build( return stream; return std::make_shared( + bitmap_filter, stream, std::move(index_streams), ann_query_info->top_k()); diff --git a/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.h b/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.h index 5a17d44ec85..48d988fe11e 100644 --- a/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.h @@ -47,13 +47,7 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream Block readWithFilter(const IColumn::Filter & filter) override; - Block read() override - { - FilterPtr filter = nullptr; - return read(filter, false); - } - - Block read(FilterPtr & res_filter, bool return_filter) override; + Block read() override; private: friend class ConcatVectorIndexBlockInputStream; @@ -71,16 +65,20 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream class ConcatVectorIndexBlockInputStream : public SkippableBlockInputStream { public: + // Only return the rows that match `bitmap_filter_` ConcatVectorIndexBlockInputStream( + const BitmapFilterPtr & bitmap_filter_, std::shared_ptr> stream, std::vector && index_streams, UInt32 topk_) : stream(std::move(stream)) , index_streams(std::move(index_streams)) , topk(topk_) + , bitmap_filter(bitmap_filter_) {} static SkippableBlockInputStreamPtr build( + const BitmapFilterPtr & bitmap_filter, std::shared_ptr> stream, const ANNQueryInfoPtr & ann_query_info); @@ -97,17 +95,7 @@ class ConcatVectorIndexBlockInputStream : public SkippableBlockInputStream throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); } - Block read() override - { - FilterPtr filter = nullptr; - return read(filter, false); - } - - Block read(FilterPtr & res_filter, bool return_filter) override - { - load(); - return stream->read(res_filter, return_filter); - } + Block read() override; private: void load(); @@ -117,6 +105,9 @@ class ConcatVectorIndexBlockInputStream : public SkippableBlockInputStream std::vector index_streams; UInt32 topk = 0; bool loaded = false; + + BitmapFilterPtr bitmap_filter; + IColumn::Filter filter; // reuse the memory allocated among all `read` }; } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index e2b28f896e8..1f4dff89f00 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -213,7 +213,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn file_provider, read_limiter, rows_threshold_per_read, - read_one_pack_every_time, + false, // read multiple packs at once tracing_id, enable_read_thread, scan_context, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index 430edd1c971..6e21fa99dbb 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -71,7 +71,6 @@ class DMFilePackFilter const RSResults & getHandleRes() const { return handle_res; } const RSResults & getPackResConst() const { return pack_res; } - RSResults & getPackRes() { return pack_res; } UInt64 countUsePack() const; Handle getMinHandle(size_t pack_id) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 21b697b71df..b26cf5b5321 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -79,6 +79,7 @@ DMFileReader::DMFileReader( , file_provider(file_provider_) , log(Logger::get(tracing_id_)) { + // Initialize column_streams for (const auto & cd : read_columns) { // New inserted column, will be filled with default value later @@ -100,112 +101,75 @@ DMFileReader::DMFileReader( const auto data_type = dmfile->getColumnStat(cd.id).type; data_type->enumerateStreams(callback, {}); } + + // Initialize data_sharing_col_data_cache if needed if (max_sharing_column_bytes > 0) { data_sharing_col_data_cache = std::make_unique(ColumnCacheType::DataSharingCache); } - initAllMatchBlockInfo(); + + initPackOffset(); + + initReadBlockInfos(); +} + +void DMFileReader::initPackOffset() +{ + const auto & pack_stats = dmfile->getPackStats(); + pack_offset.resize(pack_stats.size()); + { + size_t offset = 0; + for (size_t i = 0; i < pack_stats.size(); ++i) + { + pack_offset[i] = offset; + offset += pack_stats[i].rows; + } + } } bool DMFileReader::getSkippedRows(size_t & skip_rows) { skip_rows = 0; - const auto & pack_res = pack_filter.getPackResConst(); const auto & pack_stats = dmfile->getPackStats(); - for (; next_pack_id < pack_res.size() && !pack_res[next_pack_id].isUse(); ++next_pack_id) - { + const auto end_pack_id = read_block_infos.empty() ? pack_stats.size() : read_block_infos.front().start_pack_id; + for (; next_pack_id < end_pack_id; ++next_pack_id) skip_rows += pack_stats[next_pack_id].rows; - addSkippedRows(pack_stats[next_pack_id].rows); - } - next_row_offset += skip_rows; + addSkippedRows(skip_rows); + // return false if it is the end of stream. - return next_pack_id < pack_res.size(); + return !read_block_infos.empty(); } // Skip the block which should be returned by next read() size_t DMFileReader::skipNextBlock() { - // find the first pack which is used - if (size_t skip_rows; !getSkippedRows(skip_rows)) - { - // no block left in the stream - return 0; - } - - // move forward next_pack_id and next_row_offset - const auto [read_rows, rs_result] = getReadRows(); - if (read_rows == 0) + if (size_t skip_rows = 0; !getSkippedRows(skip_rows)) return 0; + const auto [start_pack_id, pack_count, rs_result, read_rows] = read_block_infos.front(); + read_block_infos.pop_front(); + next_pack_id = start_pack_id + pack_count; addSkippedRows(read_rows); - scan_context->late_materialization_skip_rows += read_rows; return read_rows; } -// Get the number of rows to read in the next block -// Move forward next_pack_id and next_row_offset -std::pair DMFileReader::getReadRows() -{ - const auto & pack_res = pack_filter.getPackResConst(); - const size_t start_pack_id = next_pack_id; - const size_t read_pack_limit = getReadPackLimit(start_pack_id); - const auto & pack_stats = dmfile->getPackStats(); - size_t read_rows = 0; - auto last_pack_res = RSResult::All; - for (; next_pack_id < pack_res.size() && pack_res[next_pack_id].isUse() && read_rows < rows_threshold_per_read; - ++next_pack_id) - { - if (next_pack_id - start_pack_id >= read_pack_limit) - break; - last_pack_res = last_pack_res && pack_res[next_pack_id]; - read_rows += pack_stats[next_pack_id].rows; - } - - next_row_offset += read_rows; - if (read_tag == ReadTag::Query && last_pack_res.allMatch()) - scan_context->rs_dmfile_read_with_all += next_pack_id - start_pack_id; - return {read_rows, last_pack_res}; -} - Block DMFileReader::readWithFilter(const IColumn::Filter & filter) { - /// 1. Skip filtered out packs. - if (size_t skip_rows; !getSkippedRows(skip_rows)) - { - // no block left in the stream + if (size_t skip_rows = 0; !getSkippedRows(skip_rows)) return {}; - } - /// 2. Mark pack_res[i] = None if all rows in the i-th pack are filtered out by filter. + /// 1. Get start_pack_id, rs_result, read_rows, and new read block infos - const auto & pack_stats = dmfile->getPackStats(); - auto & pack_res = pack_filter.getPackRes(); + // The read_block_info before splited by `filter` + const auto ori_block_info = read_block_infos.front(); + read_block_infos.pop_front(); + // do not update next_pack_id here, it will be updated in readImpl(). + // next_pack_id = start_pack_id + pack_count; + RUNTIME_CHECK(ori_block_info.read_rows == filter.size(), ori_block_info.read_rows, filter.size()); + const auto new_block_infos = splitReadBlockInfos(ori_block_info, filter); + const size_t start_row_offset = pack_offset[ori_block_info.start_pack_id]; - size_t start_row_offset = next_row_offset; - size_t start_pack_id = next_pack_id; - const auto [read_rows, rs_result] = getReadRows(); - RUNTIME_CHECK(read_rows == filter.size(), read_rows, filter.size()); - size_t last_pack_id = next_pack_id; - { - size_t offset = 0; - for (size_t i = start_pack_id; i < last_pack_id; ++i) - { - if (countBytesInFilter(filter, offset, pack_stats[i].rows) == 0) - pack_res[i] = RSResult::None; - offset += pack_stats[i].rows; - } - } - - /// 3. Mark the pack_res[last_pack_id] as None temporarily to avoid reading it and its following packs in this round - - auto next_pack_id_pack_res_cp = RSResult::None; - if (last_pack_id < pack_res.size()) - { - next_pack_id_pack_res_cp = pack_res[last_pack_id]; - pack_res[last_pack_id] = RSResult::None; - } - - /// 4. Read and filter packs + /// 2. Read and filter packs MutableColumns columns; columns.reserve(read_columns.size()); @@ -217,63 +181,46 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter) columns.emplace_back(std::move(col)); } - size_t offset = 0; - // reset next_pack_id to start_pack_id, next_row_offset to start_row_offset - next_pack_id = start_pack_id; - next_row_offset = start_row_offset; - for (size_t pack_id = start_pack_id; pack_id < last_pack_id; ++pack_id) + auto block_pack_res = RSResult::All; + for (const auto & block_info : new_block_infos) { - // When the next pack is not used or the pack is the last pack, call read() to read theses packs and filter them - // For example: - // When next_pack_id_cp = pack_res.size() and pack_res[next_pack_id:next_pack_id_cp] = [true, true, false, true, true, true] - // The algorithm runs as follows: - // When i = next_pack_id + 2, call read() to read {next_pack_id, next_pack_id + 1}th packs - // When i = next_pack_id + 5, call read() to read {next_pack_id + 3, next_pack_id + 4, next_pack_id + 5}th packs - if (pack_res[pack_id].isUse() && (pack_id + 1 == pack_res.size() || !pack_res[pack_id + 1].isUse())) - { - Block block = read(); - size_t rows = block.rows(); + const size_t new_start_row_offset = pack_offset[block_info.start_pack_id]; + + const auto offset_begin = new_start_row_offset - start_row_offset; + const auto offset_end = offset_begin + block_info.read_rows; - if (size_t passed_count = countBytesInFilter(filter, offset, rows); passed_count != rows) + Block block = readImpl(block_info); + block_pack_res = block_pack_res && block_info.rs_result; + if (size_t passed_count = countBytesInFilter(filter, offset_begin, block_info.read_rows); + passed_count != block_info.read_rows) + { + std::vector positions; + positions.reserve(passed_count); + for (size_t i = offset_begin; i < offset_end; ++i) { - std::vector positions; - positions.reserve(passed_count); - for (size_t p = offset; p < offset + rows; ++p) - { - if (filter[p]) - positions.push_back(p - offset); - } - for (size_t i = 0; i < block.columns(); ++i) - { - columns[i]->insertDisjunctFrom(*block.getByPosition(i).column, positions); - } + if (filter[i]) + positions.push_back(i - offset_begin); } - else + for (size_t i = 0; i < block.columns(); ++i) { - for (size_t i = 0; i < block.columns(); ++i) - { - columns[i]->insertRangeFrom( - *block.getByPosition(i).column, - 0, - block.getByPosition(i).column->size()); - } + auto column = block.getByPosition(i).column; + columns[i]->insertDisjunctFrom(*column, positions); } - offset += rows; } - else if (!pack_res[pack_id].isUse()) + else { - offset += pack_stats[pack_id].rows; + for (size_t i = 0; i < block.columns(); ++i) + { + columns[i]->insertRangeFrom(*block.getByPosition(i).column, 0, passed_count); + } } } - /// 5. Restore the pack_res[last_pack_id] - - if (last_pack_id < pack_res.size()) - pack_res[last_pack_id] = next_pack_id_pack_res_cp; - Block res = getHeader().cloneWithColumns(std::move(columns)); res.setStartOffset(start_row_offset); - res.setRSResult(rs_result); + res.setRSResult(block_pack_res); + addSkippedRows(ori_block_info.read_rows - total_passed_count); + return res; } @@ -283,37 +230,45 @@ bool DMFileReader::isCacheableColumn(const ColumnDefine & cd) } Block DMFileReader::read() +{ + if (size_t skip_rows = 0; !getSkippedRows(skip_rows)) + return {}; + + const auto read_info = read_block_infos.front(); + read_block_infos.pop_front(); + return readImpl(read_info); +} + +Block DMFileReader::readImpl(const ReadBlockInfo & read_info) { Stopwatch watch; SCOPE_EXIT(scan_context->total_dmfile_read_time_ns += watch.elapsed();); - /// 1. Skip filtered out packs. - if (size_t skip_rows; !getSkippedRows(skip_rows)) - return {}; + /// 1. Update next_pack_id and add scanned rows + const auto & [start_pack_id, pack_count, rs_result, read_rows] = read_info; + if (read_tag == ReadTag::Query && rs_result.allMatch()) + scan_context->rs_dmfile_read_with_all += pack_count; - /// 2. Find the max continuous rows can be read. - - size_t start_pack_id = next_pack_id; - size_t start_row_offset = next_row_offset; - const auto [read_rows, rs_result] = getReadRows(); - if (read_rows == 0) - return {}; + next_pack_id = start_pack_id + pack_count; + const size_t start_row_offset = pack_offset[start_pack_id]; addScannedRows(read_rows); - /// 3. Find packs can do clean read. + /// 2. Find packs can do clean read. + const bool need_read_extra_columns = std::any_of(read_columns.cbegin(), read_columns.cend(), [](const auto & cd) { + return cd.id == EXTRA_HANDLE_COLUMN_ID || cd.id == TAG_COLUMN_ID || cd.id == VERSION_COLUMN_ID; + }); const auto & pack_stats = dmfile->getPackStats(); const auto & pack_properties = dmfile->getPackProperties(); const auto & handle_res = pack_filter.getHandleRes(); // alias of handle_res in pack_filter - const size_t read_packs = next_pack_id - start_pack_id; std::vector handle_column_clean_read_packs; std::vector del_column_clean_read_packs; std::vector version_column_clean_read_packs; - if (is_fast_scan) + if (need_read_extra_columns && is_fast_scan) { if (enable_del_clean_read) { - del_column_clean_read_packs.reserve(read_packs); + del_column_clean_read_packs.reserve(pack_count); for (size_t i = start_pack_id; i < next_pack_id; ++i) { // If delete rows is 0, we do not need to read del column. @@ -328,7 +283,7 @@ Block DMFileReader::read() } if (enable_handle_clean_read) { - handle_column_clean_read_packs.reserve(read_packs); + handle_column_clean_read_packs.reserve(pack_count); for (size_t i = start_pack_id; i < next_pack_id; ++i) { // If all handle in a pack are in the given range, and del column do clean read, we do not need to read handle column. @@ -346,11 +301,11 @@ Block DMFileReader::read() } } } - else if (enable_handle_clean_read) + else if (need_read_extra_columns && enable_handle_clean_read) { - handle_column_clean_read_packs.reserve(read_packs); - version_column_clean_read_packs.reserve(read_packs); - del_column_clean_read_packs.reserve(read_packs); + handle_column_clean_read_packs.reserve(pack_count); + version_column_clean_read_packs.reserve(pack_count); + del_column_clean_read_packs.reserve(pack_count); for (size_t i = start_pack_id; i < next_pack_id; ++i) { // If all handle in a pack are in the given range, no not_clean rows, and max version <= max_read_version, @@ -365,11 +320,12 @@ Block DMFileReader::read() } } - /// 4. Read columns. + /// 3. Read columns. ColumnsWithTypeAndName columns; columns.reserve(read_columns.size()); - for (auto & cd : read_columns) + + for (const auto & cd : read_columns) { try { @@ -378,16 +334,16 @@ Block DMFileReader::read() switch (cd.id) { case EXTRA_HANDLE_COLUMN_ID: - col = readExtraColumn(cd, start_pack_id, read_packs, read_rows, handle_column_clean_read_packs); + col = readExtraColumn(cd, start_pack_id, pack_count, read_rows, handle_column_clean_read_packs); break; case TAG_COLUMN_ID: - col = readExtraColumn(cd, start_pack_id, read_packs, read_rows, del_column_clean_read_packs); + col = readExtraColumn(cd, start_pack_id, pack_count, read_rows, del_column_clean_read_packs); break; case VERSION_COLUMN_ID: - col = readExtraColumn(cd, start_pack_id, read_packs, read_rows, version_column_clean_read_packs); + col = readExtraColumn(cd, start_pack_id, pack_count, read_rows, version_column_clean_read_packs); break; default: - col = readColumn(cd, start_pack_id, read_packs, read_rows); + col = readColumn(cd, start_pack_id, pack_count, read_rows); break; } columns.emplace_back(std::move(col), cd.type, cd.name, cd.id); @@ -508,6 +464,7 @@ ColumnPtr DMFileReader::readExtraColumn( column->insertRangeFrom(*src_col, 0, src_col->size()); } } + return column; } @@ -528,8 +485,8 @@ ColumnPtr DMFileReader::readColumn(const ColumnDefine & cd, size_t start_pack_id return readFromDiskOrSharingCache(cd, type_on_disk, 0, dmfile->getPacks(), dmfile->getRows()); }); - auto column = type_on_disk->createColumn(); - column->insertRangeFrom(*column_all_data, next_row_offset - read_rows, read_rows); + auto column = column_all_data->cut(pack_offset[start_pack_id], read_rows); + // Cast column's data from DataType in disk to what we need now return convertColumnByColumnDefineIfNeed(type_on_disk, std::move(column), cd); } @@ -537,6 +494,7 @@ ColumnPtr DMFileReader::readColumn(const ColumnDefine & cd, size_t start_pack_id if (!enable_column_cache || !isCacheableColumn(cd)) { auto column = readFromDiskOrSharingCache(cd, type_on_disk, start_pack_id, pack_count, read_rows); + // Cast column's data from DataType in disk to what we need now return convertColumnByColumnDefineIfNeed(type_on_disk, std::move(column), cd); } @@ -612,11 +570,9 @@ ColumnPtr DMFileReader::readFromDiskOrSharingCache( { GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_total_bytes_limit).Increment(); } - bool enable_sharing_column = has_concurrent_reader && !reach_sharing_column_memory_limit; - ColumnPtr column; - if (enable_sharing_column) + if (has_concurrent_reader && !reach_sharing_column_memory_limit) { - column = getColumnFromCache( + auto column = getColumnFromCache( data_sharing_col_data_cache, cd, type_on_disk, @@ -635,20 +591,14 @@ ColumnPtr DMFileReader::readFromDiskOrSharingCache( MemoryTrackerSetter mem_tracker_guard(true, nullptr); return readFromDisk(cd, type_on_disk, start_pack_id, read_rows); }); - } - else - { - column = readFromDisk(cd, type_on_disk, start_pack_id, read_rows); - } - - // Set the column to DMFileReaderPool to share the column data. - if (enable_sharing_column && data_sharing_col_data_cache != nullptr) - { + // Set the column to DMFileReaderPool to share the column data. DMFileReaderPool::instance().set(*this, cd.id, start_pack_id, pack_count, column); // Delete column from local cache since it is not used anymore. - data_sharing_col_data_cache->delColumn(cd.id, next_pack_id); + data_sharing_col_data_cache->delColumn(cd.id, start_pack_id + pack_count); + return column; } - return column; + + return readFromDisk(cd, type_on_disk, start_pack_id, read_rows); } void DMFileReader::addColumnToCache( @@ -754,58 +704,86 @@ void DMFileReader::addSkippedRows(UInt64 rows) } } -void DMFileReader::initAllMatchBlockInfo() +void DMFileReader::initReadBlockInfos() { const auto & pack_res = pack_filter.getPackResConst(); const auto & pack_stats = dmfile->getPackStats(); - // Get continuous packs with RSResult::All - auto get_all_match_block = [&](size_t start_pack) { - size_t count = 0; - size_t rows = 0; - for (size_t i = start_pack; i < pack_res.size(); ++i) - { - if (!pack_res[i].allMatch() || rows >= rows_threshold_per_read) - break; + const size_t read_pack_limit = read_one_pack_every_time ? 1 : std::numeric_limits::max(); + size_t start_pack_id = 0; + size_t read_rows = 0; + auto prev_block_pack_res = RSResult::All; + for (size_t pack_id = 0; pack_id < pack_res.size(); ++pack_id) + { + bool is_use = pack_res[pack_id].isUse(); + bool reach_limit = pack_id - start_pack_id >= read_pack_limit || read_rows >= rows_threshold_per_read; + // Get continuous packs with RSResult::All but don't split the read if it is too small. + // Too small block may hurts performance. + bool break_all_match = prev_block_pack_res.allMatch() && !pack_res[pack_id].allMatch() + && read_rows >= rows_threshold_per_read / 2; - ++count; - rows += pack_stats[i].rows; + if (!is_use) + { + if (read_rows > 0) + read_block_infos.emplace_back(start_pack_id, pack_id - start_pack_id, prev_block_pack_res, read_rows); + // Current pack is not included in the next read_block_info + start_pack_id = pack_id + 1; + read_rows = 0; + prev_block_pack_res = RSResult::All; } - return std::make_pair(count, rows); - }; - - for (size_t i = 0; i < pack_res.size();) - { - if (!pack_res[i].allMatch()) + else if (reach_limit || break_all_match) { - ++i; - continue; + if (read_rows > 0) + read_block_infos.emplace_back(start_pack_id, pack_id - start_pack_id, prev_block_pack_res, read_rows); + // Current pack must be included in the next read_block_info + start_pack_id = pack_id; + read_rows = pack_stats[pack_id].rows; + prev_block_pack_res = pack_res[pack_id]; + } + else + { + prev_block_pack_res = prev_block_pack_res && pack_res[pack_id]; + read_rows += pack_stats[pack_id].rows; } - auto [pack_count, rows] = get_all_match_block(i); - // Do not read block too small, it may hurts performance - if (rows >= rows_threshold_per_read / 2) - all_match_block_infos.emplace(i, pack_count); - i += pack_count; } + if (read_rows > 0) + read_block_infos.emplace_back(start_pack_id, pack_res.size() - start_pack_id, prev_block_pack_res, read_rows); } -size_t DMFileReader::getReadPackLimit(size_t start_pack_id) +std::vector DMFileReader::splitReadBlockInfos( + const ReadBlockInfo & read_info, + const IColumn::Filter & filter) const { - if (all_match_block_infos.empty() || read_one_pack_every_time) - return read_one_pack_every_time ? 1 : std::numeric_limits::max(); - - const auto [next_all_match_block_start_pack_id, pack_count] = all_match_block_infos.front(); - // Read packs with RSResult::All - if (next_all_match_block_start_pack_id == start_pack_id) + const auto pack_end = read_info.start_pack_id + read_info.pack_count; + const size_t start_row_offset = pack_offset[read_info.start_pack_id]; + const auto & pack_res = pack_filter.getPackResConst(); + const auto & pack_stats = dmfile->getPackStats(); + std::vector new_read_block_infos; + new_read_block_infos.reserve(pack_end - read_info.start_pack_id); + size_t read_rows = 0; + size_t start_pack_id = read_info.start_pack_id; + auto last_pack_res = RSResult::All; + for (size_t pack_id = read_info.start_pack_id; pack_id < pack_end; ++pack_id) { - all_match_block_infos.pop(); - return pack_count; + if (countBytesInFilter(filter, pack_offset[pack_id] - start_row_offset, pack_stats[pack_id].rows) == 0) + { + // no rows should be returned in this pack according to the `filter` + if (read_rows > 0) + new_read_block_infos.emplace_back(start_pack_id, pack_id - start_pack_id, last_pack_res, read_rows); + start_pack_id = pack_id + 1; + read_rows = 0; + last_pack_res = RSResult::All; + } + else + { + read_rows += pack_stats[pack_id].rows; + last_pack_res = last_pack_res && pack_res[pack_id]; + } } - // Read packs until next_all_match_block_start_pack_id - RUNTIME_CHECK( - next_all_match_block_start_pack_id > start_pack_id, - next_all_match_block_start_pack_id, - start_pack_id); - return next_all_match_block_start_pack_id - start_pack_id; + if (read_rows > 0) + new_read_block_infos.emplace_back(start_pack_id, pack_end - start_pack_id, last_pack_res, read_rows); + new_read_block_infos.shrink_to_fit(); + return new_read_block_infos; } + } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 2cba9b63c87..85c49951b9e 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -100,7 +100,25 @@ class DMFileReader friend class tests::DMFileMetaV2Test; private: - std::pair getReadRows(); + // Initialize, called by constructor + // Initialize pack_offset before initializing read_block_infos + void initPackOffset(); + void initReadBlockInfos(); + + struct ReadBlockInfo + { + size_t start_pack_id = 0; + size_t pack_count = 0; + RSResult rs_result = RSResult::All; + size_t read_rows = 0; + }; + // Split the first read block info to multiple read block infos accroding to `filter` + // Used by readWithFilter, return new read block infos. + std::vector splitReadBlockInfos(const ReadBlockInfo & read_info, const IColumn::Filter & filter) + const; + + Block readImpl(const ReadBlockInfo & read_info); + ColumnPtr readExtraColumn( const ColumnDefine & cd, size_t start_pack_id, @@ -143,9 +161,7 @@ class DMFileReader void addScannedRows(UInt64 rows); void addSkippedRows(UInt64 rows); - void initAllMatchBlockInfo(); - size_t getReadPackLimit(size_t start_pack_id); - +private: DMFilePtr dmfile; ColumnDefines read_columns; ColumnReadStreamMap column_streams; @@ -167,7 +183,6 @@ class DMFileReader const UInt64 max_read_version; -private: /// Filters DMFilePackFilter pack_filter; @@ -181,9 +196,6 @@ class DMFileReader const size_t rows_threshold_per_read; const size_t max_sharing_column_bytes; - size_t next_pack_id = 0; - size_t next_row_offset = 0; - FileProviderPtr file_provider; LoggerPtr log; @@ -191,11 +203,11 @@ class DMFileReader // DataSharing ColumnCachePtr data_sharing_col_data_cache; - // - // Each pair object indicates several continuous packs with RSResult::All and will be read as a Block. - // It is sorted by start_pack. - std::queue> all_match_block_infos; - std::unordered_map last_read_from_cache{}; + std::deque read_block_infos; + // row_offset of the given pack_id + std::vector pack_offset; + // last read pack_id + 1, used by getSkippedRows + size_t next_pack_id = 0; public: void setColumnCacheLongTerm(ColumnCacheLongTermPtr column_cache_long_term_, ColumnID pk_col_id_) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.cpp index 9cbfedba955..f25090b390a 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.cpp @@ -198,33 +198,19 @@ std::vector DMFileVectorIndexReader::loadVector void DMFileVectorIndexReader::read( MutableColumnPtr & vec_column, - const std::span & selected_rows, - size_t start_offset, - size_t column_size) + const std::span & selected_rows) { Stopwatch watch; RUNTIME_CHECK(loaded); - vec_column->reserve(column_size); + vec_column->reserve(selected_rows.size()); std::vector value; - size_t current_rowid = start_offset; for (auto rowid : selected_rows) { vec_index->get(rowid, value); - if (rowid > current_rowid) - { - UInt32 nulls = rowid - current_rowid; - // Insert [] if column is Not Null, or NULL if column is Nullable - vec_column->insertManyDefaults(nulls); - } vec_column->insertData(reinterpret_cast(value.data()), value.size() * sizeof(Float32)); - current_rowid = rowid + 1; - } - if (current_rowid < start_offset + column_size) - { - UInt32 nulls = column_size + start_offset - current_rowid; - vec_column->insertManyDefaults(nulls); } + perf_stat.duration_read_vec_column += watch.elapsedSeconds(); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.h index b19850a1180..3c8a3630e0b 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.h @@ -70,16 +70,8 @@ class DMFileVectorIndexReader ~DMFileVectorIndexReader(); - // Read vector column data and set filter. - // The column will be as same as as the rows of the tiny file, - // but only the rows in sorted_results will be filled, - // others will be filled with default values. - // return the real number of rows read. - void read( - MutableColumnPtr & vec_column, - const std::span & selected_rows, - size_t start_offset, - size_t column_size); + // Read vector column data with the specified rowids. + void read(MutableColumnPtr & vec_column, const std::span & selected_rows); // Load vector index and search results. // Return the rowids of the selected rows. diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp index 4f008c7454f..6f4ac3b10d2 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp @@ -65,147 +65,55 @@ DMFileWithVectorIndexBlockInputStream::~DMFileWithVectorIndexBlockInputStream() valid_packs_after_search); } -Block DMFileWithVectorIndexBlockInputStream::read(FilterPtr & res_filter, bool return_filter) -{ - if (return_filter) - return readImpl(res_filter); - - // If return_filter == false, we must filter by ourselves. - - FilterPtr filter = nullptr; - auto res = readImpl(filter); - if (filter != nullptr) - { - for (auto & col : res) - col.column = col.column->filter(*filter, -1); - } - // filter == nullptr means all rows are valid and no need to filter. - - return res; -} - -Block DMFileWithVectorIndexBlockInputStream::readImpl(FilterPtr & res_filter) +Block DMFileWithVectorIndexBlockInputStream::read() { internalLoad(); - auto [res, real_rows] = reader.read_columns.empty() ? readByIndexReader() : readByFollowingOtherColumns(); - - if (!res) + if (reader.read_block_infos.empty()) return {}; - // If all rows are valid, res_filter is nullptr. - if (real_rows == res.rows()) - { - res_filter = nullptr; - return res; - } - - // Assign output filter according to sorted_results. - // - // For example, if sorted_results is [3, 10], the complete filter array is: - // [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1] - // And we should only return filter array starting from res.startOffset(), like: - // [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1] - // ^startOffset ^startOffset+rows - // filter: [0, 0, 0, 0, 0] - // - // We will use startOffset as lowerBound (inclusive), ans startOffset+rows - // as upperBound (exclusive) to find whether this range has a match in sorted_results. - - const auto start_offset = res.startOffset(); - const auto max_rowid_exclusive = start_offset + res.rows(); - - filter.clear(); - filter.resize_fill(res.rows(), 0); - - auto it = std::lower_bound(sorted_results.begin(), sorted_results.end(), start_offset); - while (it != sorted_results.end()) - { - auto rowid = *it; - if (rowid >= max_rowid_exclusive) - break; - filter[rowid - start_offset] = 1; - ++it; - } + const auto [start_pack_id, pack_count, rs_result, read_rows] = reader.read_block_infos.front(); + const auto start_row_offset = reader.pack_offset[start_pack_id]; - res_filter = &filter; - return res; -} + auto vec_column = vec_cd.type->createColumn(); + auto begin = std::lower_bound(sorted_results.cbegin(), sorted_results.cend(), start_row_offset); + auto end = std::lower_bound(begin, sorted_results.cend(), start_row_offset + read_rows); + const std::span block_selected_rows{begin, end}; + if (block_selected_rows.empty()) + return {}; -std::tuple DMFileWithVectorIndexBlockInputStream::readByIndexReader() -{ - const auto & pack_stats = dmfile->getPackStats(); - size_t all_packs = pack_stats.size(); - const auto & pack_res = reader.pack_filter.getPackResConst(); + // read vector column + vec_index_reader->read(vec_column, block_selected_rows); - RUNTIME_CHECK(pack_res.size() == all_packs); + Block block; - // Skip as many packs as possible according to Pack Filter - while (index_reader_next_pack_id < all_packs) + // read other columns if needed + if (!reader.read_columns.empty()) { - if (pack_res[index_reader_next_pack_id].isUse()) - break; - index_reader_next_row_id += pack_stats[index_reader_next_pack_id].rows; - ++index_reader_next_pack_id; - } + Stopwatch w; - if (index_reader_next_pack_id >= all_packs) - // Finished - return {}; + filter.clear(); + filter.resize_fill(read_rows, 0); + for (const auto rowid : block_selected_rows) + filter[rowid - start_row_offset] = 1; - auto block_start_row_id = index_reader_next_row_id; - while (index_reader_next_pack_id < all_packs) + block = reader.read(); + for (auto & col : block) + col.column = col.column->filter(filter, block_selected_rows.size()); + duration_read_from_other_columns_seconds += w.elapsedSeconds(); + } + else { - if (!pack_res[index_reader_next_pack_id].isUse()) - break; - index_reader_next_row_id += pack_stats[index_reader_next_pack_id].rows; - ++index_reader_next_pack_id; + // Since we do not call `reader.read()` here, we need to pop the read_block_infos manually. + reader.read_block_infos.pop_front(); } - Block block; - block.setStartOffset(block_start_row_id); - - size_t read_rows = index_reader_next_row_id - block_start_row_id; - auto vec_column = vec_cd.type->createColumn(); - - auto begin = std::lower_bound(sorted_results.cbegin(), sorted_results.cend(), block_start_row_id); - auto end = std::lower_bound(begin, sorted_results.cend(), index_reader_next_row_id); - const std::span block_selected_rows{begin, end}; - vec_index_reader->read(vec_column, block_selected_rows, block_start_row_id, read_rows); - - block.insert(ColumnWithTypeAndName{std::move(vec_column), vec_cd.type, vec_cd.name, vec_cd.id}); - return {block, block_selected_rows.size()}; -} - -std::tuple DMFileWithVectorIndexBlockInputStream::readByFollowingOtherColumns() -{ - // First read other columns. - Stopwatch w; - auto block_others = reader.read(); - duration_read_from_other_columns_seconds += w.elapsedSeconds(); - - if (!block_others) - return {}; - - auto read_rows = block_others.rows(); - - // Using vec_cd.type to construct a Column directly instead of using - // the type from dmfile, so that we don't need extra transforms - // (e.g. wrap with a Nullable). vec_index_reader is compatible with - // both Nullable and NotNullable. - auto vec_column = vec_cd.type->createColumn(); - - // Then read from vector index for the same pack. - auto begin = std::lower_bound(sorted_results.cbegin(), sorted_results.cend(), block_others.startOffset()); - auto end = std::lower_bound(begin, sorted_results.cend(), block_others.startOffset() + read_rows); - const std::span block_selected_rows{begin, end}; - vec_index_reader->read(vec_column, block_selected_rows, block_others.startOffset(), read_rows); - - // Re-assemble block using the same layout as header. - // Insert the vector column into the block. auto index = header.getPositionByName(vec_cd.name); - block_others.insert(index, ColumnWithTypeAndName(std::move(vec_column), vec_cd.type, vec_cd.name)); - return {block_others, block_selected_rows.size()}; + block.insert(index, ColumnWithTypeAndName{std::move(vec_column), vec_cd.type, vec_cd.name, vec_cd.id}); + + block.setStartOffset(start_row_offset); + block.setRSResult(rs_result); + return block; } std::vector DMFileWithVectorIndexBlockInputStream::load() @@ -227,46 +135,77 @@ void DMFileWithVectorIndexBlockInputStream::internalLoad() for (const auto & row : search_results) sorted_results.push_back(row.key); - updateRSResult(); + updateReadBlockInfos(); } -void DMFileWithVectorIndexBlockInputStream::updateRSResult() +void DMFileWithVectorIndexBlockInputStream::updateReadBlockInfos() { // Vector index is very likely to filter out some packs. For example, // if we query for Top 1, then only 1 pack will be remained. So we - // update the pack filter used by the DMFileReader to avoid reading - // unnecessary data for other columns. + // update the reader's read_block_infos to avoid reading unnecessary data for other columns. + + // The following logic is nearly the same with DMFileReader::initReadBlockInfos. + + auto & read_block_infos = reader.read_block_infos; + const auto & pack_offset = reader.pack_offset; + + read_block_infos.clear(); const auto & pack_stats = dmfile->getPackStats(); - auto & pack_res = reader.pack_filter.getPackRes(); + const auto & pack_res = reader.pack_filter.getPackResConst(); - auto results_it = sorted_results.begin(); - UInt32 pack_start = 0; - for (size_t pack_id = 0; pack_id < dmfile->getPacks(); ++pack_id) + // Update valid_packs_before_search + for (const auto res : pack_res) + valid_packs_before_search += res.isUse(); + + // Update read_block_infos + size_t start_pack_id = 0; + size_t read_rows = 0; + auto prev_block_pack_res = RSResult::All; + auto sorted_results_it = sorted_results.cbegin(); + size_t pack_id = 0; + for (; pack_id < pack_stats.size(); ++pack_id) { - if (pack_res[pack_id].isUse()) - ++valid_packs_before_search; - - bool pack_has_result = false; - UInt32 pack_end = pack_start + pack_stats[pack_id].rows; - while (results_it != sorted_results.end() && *results_it >= pack_start && *results_it < pack_end) + if (sorted_results_it == sorted_results.cend()) + break; + auto begin = std::lower_bound(sorted_results_it, sorted_results.cend(), pack_offset[pack_id]); + auto end = std::lower_bound(begin, sorted_results.cend(), pack_offset[pack_id] + pack_stats[pack_id].rows); + bool is_use = begin != end; + bool reach_limit = read_rows >= reader.rows_threshold_per_read; + bool break_all_match = prev_block_pack_res.allMatch() && !pack_res[pack_id].allMatch() + && read_rows >= reader.rows_threshold_per_read / 2; + + if (!is_use) { - pack_has_result = true; - ++results_it; + if (read_rows > 0) + read_block_infos.emplace_back(start_pack_id, pack_id - start_pack_id, prev_block_pack_res, read_rows); + start_pack_id = pack_id + 1; + read_rows = 0; + prev_block_pack_res = RSResult::All; + } + else if (reach_limit || break_all_match) + { + if (read_rows > 0) + read_block_infos.emplace_back(start_pack_id, pack_id - start_pack_id, prev_block_pack_res, read_rows); + start_pack_id = pack_id; + read_rows = pack_stats[pack_id].rows; + prev_block_pack_res = pack_res[pack_id]; + } + else + { + prev_block_pack_res = prev_block_pack_res && pack_res[pack_id]; + read_rows += pack_stats[pack_id].rows; } - if (!pack_has_result) - pack_res[pack_id] = RSResult::None; - - if (pack_res[pack_id].isUse()) - ++valid_packs_after_search; - - pack_start = pack_end; + sorted_results_it = end; } + if (read_rows > 0) + read_block_infos.emplace_back(start_pack_id, pack_id - start_pack_id, prev_block_pack_res, read_rows); - RUNTIME_CHECK_MSG( - results_it == sorted_results.end(), - "All packs has been visited but not all results are consumed"); + // Update valid_packs_after_search + for (const auto & block_info : read_block_infos) + valid_packs_after_search += block_info.pack_count; + RUNTIME_CHECK_MSG(sorted_results_it == sorted_results.cend(), "All results are not consumed"); loaded = true; } @@ -276,7 +215,7 @@ void DMFileWithVectorIndexBlockInputStream::setSelectedRows(const std::span & selected_rows) override; private: - // Only used in readByIndexReader() - size_t index_reader_next_pack_id = 0; - // Only used in readByIndexReader() - size_t index_reader_next_row_id = 0; - - // Read data totally from the VectorColumnFromIndexReader. This is used - // when there is no other column to read. - std::tuple readByIndexReader(); - - // Read data from other columns first, then read from VectorColumnFromIndexReader. This is used - // when there are other columns to read. - std::tuple readByFollowingOtherColumns(); - // Load vector index and update sorted_results. void internalLoad(); - // Update the RSResult according to the sorted_results. - void updateRSResult(); + // Update the read_block_infos according to the sorted_results. + void updateReadBlockInfos(); private: const LoggerPtr log; diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.cpp b/dbms/src/Storages/DeltaMerge/ScanContext.cpp index 64191ac89f9..f1b9e095988 100644 --- a/dbms/src/Storages/DeltaMerge/ScanContext.cpp +++ b/dbms/src/Storages/DeltaMerge/ScanContext.cpp @@ -139,7 +139,6 @@ String ScanContext::toJson() const json->set("mvcc_input_rows", mvcc_input_rows.load()); json->set("mvcc_input_bytes", mvcc_input_bytes.load()); json->set("mvcc_skip_rows", mvcc_input_rows.load() - mvcc_output_rows.load()); - json->set("late_materialization_skip_rows", late_materialization_skip_rows.load()); json->set("learner_read_time", fmt::format("{:.3f}ms", learner_read_ns.load() / NS_TO_MS_SCALE)); json->set("create_snapshot_time", fmt::format("{:.3f}ms", create_snapshot_time_ns.load() / NS_TO_MS_SCALE)); diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.h b/dbms/src/Storages/DeltaMerge/ScanContext.h index 5084296d34a..f963104adab 100644 --- a/dbms/src/Storages/DeltaMerge/ScanContext.h +++ b/dbms/src/Storages/DeltaMerge/ScanContext.h @@ -78,7 +78,6 @@ class ScanContext std::atomic mvcc_input_rows{0}; std::atomic mvcc_input_bytes{0}; std::atomic mvcc_output_rows{0}; - std::atomic late_materialization_skip_rows{0}; // Learner read std::atomic learner_read_ns{0}; @@ -133,7 +132,6 @@ class ScanContext mvcc_input_rows = tiflash_scan_context_pb.mvcc_input_rows(); mvcc_input_bytes = tiflash_scan_context_pb.mvcc_input_bytes(); mvcc_output_rows = tiflash_scan_context_pb.mvcc_output_rows(); - late_materialization_skip_rows = tiflash_scan_context_pb.lm_skip_rows(); build_bitmap_time_ns = tiflash_scan_context_pb.total_build_bitmap_ms() * 1000000; num_stale_read = tiflash_scan_context_pb.stale_read_regions(); build_inputstream_time_ns = tiflash_scan_context_pb.total_build_inputstream_ms() * 1000000; @@ -186,7 +184,6 @@ class ScanContext tiflash_scan_context_pb.set_mvcc_input_rows(mvcc_input_rows); tiflash_scan_context_pb.set_mvcc_input_bytes(mvcc_input_bytes); tiflash_scan_context_pb.set_mvcc_output_rows(mvcc_output_rows); - tiflash_scan_context_pb.set_lm_skip_rows(late_materialization_skip_rows); tiflash_scan_context_pb.set_total_build_bitmap_ms(build_bitmap_time_ns / 1000000); tiflash_scan_context_pb.set_stale_read_regions(num_stale_read); tiflash_scan_context_pb.set_total_build_inputstream_ms(build_inputstream_time_ns / 1000000); @@ -243,7 +240,6 @@ class ScanContext mvcc_input_rows += other.mvcc_input_rows; mvcc_input_bytes += other.mvcc_input_bytes; mvcc_output_rows += other.mvcc_output_rows; - late_materialization_skip_rows += other.late_materialization_skip_rows; learner_read_ns += other.learner_read_ns; create_snapshot_time_ns += other.create_snapshot_time_ns; @@ -300,7 +296,6 @@ class ScanContext mvcc_input_rows += other.mvcc_input_rows(); mvcc_input_bytes += other.mvcc_input_bytes(); mvcc_output_rows += other.mvcc_output_rows(); - late_materialization_skip_rows += other.lm_skip_rows(); build_bitmap_time_ns += other.total_build_bitmap_ms() * 1000000; num_stale_read += other.stale_read_regions(); build_inputstream_time_ns += other.total_build_inputstream_ms() * 1000000; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index c355f12d46d..237716abe81 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -3355,7 +3355,7 @@ SkippableBlockInputStreamPtr Segment::getConcatSkippableBlockInputStream( assert(stream != nullptr); stream->appendChild(persisted_files_stream, persisted_files->getRows()); stream->appendChild(mem_table_stream, memtable->getRows()); - return ConcatVectorIndexBlockInputStream::build(stream, ann_query_info); + return ConcatVectorIndexBlockInputStream::build(bitmap_filter, stream, ann_query_info); } BlockInputStreamPtr Segment::getLateMaterializationStream( @@ -3519,7 +3519,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( read_data_block_rows); } - auto skippable_stream = getConcatSkippableBlockInputStream( + auto stream = getConcatSkippableBlockInputStream( bitmap_filter, segment_snap, dm_context, @@ -3529,8 +3529,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( start_ts, read_data_block_rows, ReadTag::Query); - auto stream = std::make_shared(columns_to_read, skippable_stream, bitmap_filter); - if (auto * vector_index_stream = dynamic_cast(skippable_stream.get()); + if (auto * vector_index_stream = dynamic_cast(stream.get()); vector_index_stream) { // For vector search, there are more likely to return small blocks from different @@ -3542,7 +3541,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( /*min_block_size_bytes=*/0, dm_context.tracing_id); } - return stream; + return std::make_shared(columns_to_read, stream, bitmap_filter); } // clipBlockRows try to limit the block size not exceed settings.max_block_bytes. diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index 738eb68d0c8..0672993e4f4 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -170,9 +170,15 @@ class DMFileMetaV2Test : public DB::base::TiFlashStorageTestBasic ASSERT_EQ(n, s.size()); } - static RSResults & getReaderPackRes(DMFileBlockInputStreamPtr & stream) + static void skipFirstPack(DMFileBlockInputStreamPtr & stream) { - return stream->reader.pack_filter.getPackRes(); + const auto & pack_stats = stream->reader.dmfile->getPackStats(); + auto [pack_id, pack_count, res, rows] = stream->reader.read_block_infos.front(); + stream->reader.read_block_infos.pop_front(); + stream->reader.read_block_infos + .emplace_front(pack_id + 1, pack_count - 1, res, rows - pack_stats[pack_id].rows); + stream->reader.read_block_infos.emplace_front(pack_id, 1, res, pack_stats[pack_id].rows); + ASSERT_EQ(stream->skipNextBlock(), pack_stats[pack_id].rows); } protected: @@ -910,10 +916,7 @@ try auto stream = builder.setColumnCache(column_cache) .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}, std::make_shared()); - auto & pack_res = getReaderPackRes(stream); - pack_res[1] = RSResult::None; - stream->skipNextBlock(); - pack_res[1] = RSResult::Some; + skipFirstPack(stream); std::vector partial_expect_arr_values; partial_expect_arr_values.insert( partial_expect_arr_values.cend(), @@ -1125,10 +1128,7 @@ try auto stream = builder.setColumnCache(column_cache) .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}, std::make_shared()); - auto & pack_res = getReaderPackRes(stream); - pack_res[1] = RSResult::None; - ASSERT_EQ(stream->skipNextBlock(), num_rows_write / 3); - pack_res[1] = RSResult::Some; + skipFirstPack(stream); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name}),