Skip to content

Commit

Permalink
enhance: pass partition key scalar info if enable for vector mem index
Browse files Browse the repository at this point in the history
Signed-off-by: chasingegg <[email protected]>
  • Loading branch information
chasingegg committed Jan 9, 2025
1 parent aceb972 commit d9accac
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 57 deletions.
13 changes: 13 additions & 0 deletions internal/core/src/index/VectorMemIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,13 @@ VectorMemIndex<T>::Build(const Config& config) {
auto field_datas =
file_manager_->CacheRawDataToMemory(insert_files.value());

auto opt_fields = GetValueFromConfig<OptFieldT>(config, VEC_OPT_FIELDS);
std::vector<std::vector<uint32_t>> scalar_info;
if (opt_fields.has_value() && index_.IsAdditionalScalarSupported() &&
config["partition_key_isolation"]) {
scalar_info = file_manager_->CacheOptFieldToMemory(opt_fields.value());
}

Config build_config;
build_config.update(config);
build_config.erase("insert_files");
Expand Down Expand Up @@ -316,6 +323,9 @@ VectorMemIndex<T>::Build(const Config& config) {
field_datas.clear();

auto dataset = GenDataset(total_num_rows, dim, buf.get());
if (!scalar_info.empty()) {
dataset->Set(knowhere::meta::SCALAR_INFO, std::move(scalar_info));
}
BuildWithDataset(dataset, build_config);
} else {
// sparse
Expand Down Expand Up @@ -346,6 +356,9 @@ VectorMemIndex<T>::Build(const Config& config) {
}
auto dataset = GenDataset(total_rows, dim, vec.data());
dataset->SetIsSparse(true);
if (!scalar_info.empty()) {
dataset->Set(knowhere::meta::SCALAR_INFO, std::move(scalar_info));
}
BuildWithDataset(dataset, build_config);
}
}
Expand Down
52 changes: 3 additions & 49 deletions internal/core/src/storage/DiskFileManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,16 +357,6 @@ DiskFileManagerImpl::CacheTextLogToDisk(
}
}

// Sorts `paths` in place, ascending by the integer parsed from the text that
// follows the last '/' of each path (the whole string when there is no '/').
// Parsing uses std::stol, so a non-numeric trailing component makes it throw.
void
SortByPath(std::vector<std::string>& paths) {
    // Numeric key of one path: everything after the final '/'.
    const auto trailing_number = [](const std::string& path) {
        const auto tail_begin = path.find_last_of("/") + 1;
        return std::stol(path.substr(tail_begin));
    };
    const auto by_trailing_number = [&trailing_number](
                                        const std::string& lhs,
                                        const std::string& rhs) {
        return trailing_number(lhs) < trailing_number(rhs);
    };
    std::sort(paths.begin(), paths.end(), by_trailing_number);
}

template <typename DataType>
std::string
DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
Expand Down Expand Up @@ -478,20 +468,6 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
return local_data_path;
}

template <typename T, typename = void>
struct has_native_type : std::false_type {};
template <typename T>
struct has_native_type<T, std::void_t<typename T::NativeType>>
: std::true_type {};
template <DataType T>
using DataTypeNativeOrVoid =
typename std::conditional<has_native_type<TypeTraits<T>>::value,
typename TypeTraits<T>::NativeType,
void>::type;
template <DataType T>
using DataTypeToOffsetMap =
std::unordered_map<DataTypeNativeOrVoid<T>, int64_t>;

template <DataType T>
bool
WriteOptFieldIvfDataImpl(
Expand All @@ -513,7 +489,7 @@ WriteOptFieldIvfDataImpl(
}

// Do not write to disk if there is only one value
if (mp.size() == 1) {
if (mp.size() <= 1) {
return false;
}

Expand Down Expand Up @@ -624,23 +600,10 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) {
local_chunk_manager, segment_id, vec_field_id) +
std::string(VEC_OPT_FIELDS);
local_chunk_manager->CreateFile(local_data_path);

std::vector<FieldDataPtr> field_datas;
std::vector<std::string> batch_files;
uint64_t write_offset = 0;
WriteOptFieldsIvfMeta(
local_chunk_manager, local_data_path, num_of_fields, write_offset);

auto FetchRawData = [&]() {
auto fds = GetObjectData(rcm_.get(), batch_files);
for (size_t i = 0; i < batch_files.size(); ++i) {
auto data = fds[i].get()->GetFieldData();
field_datas.emplace_back(data);
}
};

auto parallel_degree =
uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
std::unordered_set<int64_t> actual_field_ids;
for (auto& [field_id, tup] : fields_map) {
const auto& field_type = std::get<1>(tup);
Expand All @@ -650,19 +613,10 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) {
return "";
}

std::vector<FieldDataPtr>().swap(field_datas);
SortByPath(field_paths);
std::vector<FieldDataPtr> field_datas =
FetchFieldData(rcm_.get(), field_paths);

for (auto& file : field_paths) {
if (batch_files.size() >= parallel_degree) {
FetchRawData();
batch_files.clear();
}
batch_files.emplace_back(file);
}
if (batch_files.size() > 0) {
FetchRawData();
}
if (WriteOptFieldIvfData(field_type,
field_id,
local_chunk_manager,
Expand Down
90 changes: 84 additions & 6 deletions internal/core/src/storage/MemFileManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,7 @@ MemFileManagerImpl::LoadIndexToMemory(
std::vector<FieldDataPtr>
MemFileManagerImpl::CacheRawDataToMemory(
std::vector<std::string> remote_files) {
std::sort(remote_files.begin(),
remote_files.end(),
[](const std::string& a, const std::string& b) {
return std::stol(a.substr(a.find_last_of("/") + 1)) <
std::stol(b.substr(b.find_last_of("/") + 1));
});
SortByPath(remote_files);

auto parallel_degree =
uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
Expand Down Expand Up @@ -160,6 +155,89 @@ MemFileManagerImpl::CacheRawDataToMemory(
return field_datas;
}

// Groups the row offsets of an optional scalar field by distinct value.
//
// Every row of every entry in `field_datas` is visited in order; the row's raw
// value is read as the native type associated with `T`, and the running row
// offset (0-based, continuous across all entries) is appended to the bucket
// of that value.
//
// Returns one offset list per distinct value, in the unspecified iteration
// order of the underlying hash map. Returns an empty result when there is at
// most one distinct value, since the grouping carries no information then.
template <DataType T>
std::vector<std::vector<uint32_t>>
GetOptFieldIvfDataImpl(const std::vector<FieldDataPtr>& field_datas) {
    using FieldDataT = DataTypeNativeOrVoid<T>;
    std::unordered_map<FieldDataT, std::vector<uint32_t>> mp;
    uint32_t offset = 0;
    for (const auto& field_data : field_datas) {
        const auto num_rows = field_data->get_num_rows();
        for (int64_t i = 0; i < num_rows; ++i) {
            // Bind by reference: the key is copied only when a new bucket is
            // inserted, not once per row (matters for string-typed fields).
            const auto& val =
                *reinterpret_cast<const FieldDataT*>(field_data->RawValue(i));
            mp[val].push_back(offset++);
        }
    }

    // opt field data is not used if there is only one value
    if (mp.size() <= 1) {
        return {};
    }
    std::vector<std::vector<uint32_t>> scalar_info;
    scalar_info.reserve(mp.size());
    for (auto& [value, offsets] : mp) {
        scalar_info.emplace_back(std::move(offsets));
    }
    LOG_INFO("Get opt fields with {} categories", scalar_info.size());
    return scalar_info;
}

// Dispatches on the runtime data type `dt` to the matching
// GetOptFieldIvfDataImpl instantiation and returns its result.
//
// For any data type not listed below, a warning naming the type is logged and
// an empty result is returned.
std::vector<std::vector<uint32_t>>
GetOptFieldIvfData(const DataType& dt,
                   const std::vector<FieldDataPtr>& field_datas) {
    switch (dt) {
        case DataType::BOOL:
            return GetOptFieldIvfDataImpl<DataType::BOOL>(field_datas);
        case DataType::INT8:
            return GetOptFieldIvfDataImpl<DataType::INT8>(field_datas);
        case DataType::INT16:
            return GetOptFieldIvfDataImpl<DataType::INT16>(field_datas);
        case DataType::INT32:
            return GetOptFieldIvfDataImpl<DataType::INT32>(field_datas);
        case DataType::INT64:
            return GetOptFieldIvfDataImpl<DataType::INT64>(field_datas);
        case DataType::FLOAT:
            return GetOptFieldIvfDataImpl<DataType::FLOAT>(field_datas);
        case DataType::DOUBLE:
            return GetOptFieldIvfDataImpl<DataType::DOUBLE>(field_datas);
        case DataType::STRING:
            return GetOptFieldIvfDataImpl<DataType::STRING>(field_datas);
        case DataType::VARCHAR:
            return GetOptFieldIvfDataImpl<DataType::VARCHAR>(field_datas);
        default:
            // The "{}" placeholder is required for `dt` to appear in the
            // message; without it the argument is silently dropped.
            LOG_WARN("Unsupported data type in optional scalar field: {}",
                     dt);
            return {};
    }
}

// Loads the single optional scalar field described by `fields_map` and
// returns its row offsets grouped by distinct value (see GetOptFieldIvfData).
//
// - An empty map yields an empty result.
// - More than one field is rejected through PanicInfo (NotImplemented).
// - A field with no file paths logs a warning and yields an empty result.
//
// Note: the field's path list inside `fields_map` is sorted in place before
// the data is fetched.
std::vector<std::vector<uint32_t>>
MemFileManagerImpl::CacheOptFieldToMemory(OptFieldT& fields_map) {
    const uint32_t num_of_fields = fields_map.size();
    if (0 == num_of_fields) {
        return {};
    }
    if (num_of_fields > 1) {
        PanicInfo(
            ErrorCode::NotImplemented,
            "vector index build with multiple fields is not supported yet");
    }

    // Only the first entry is processed; address it directly so that every
    // path through the function ends in an explicit return.
    auto& [field_id, tup] = *fields_map.begin();
    const auto& field_type = std::get<1>(tup);
    auto& field_paths = std::get<2>(tup);
    if (field_paths.empty()) {
        LOG_WARN("optional field {} has no data", field_id);
        return {};
    }

    SortByPath(field_paths);
    const std::vector<FieldDataPtr> field_datas =
        FetchFieldData(rcm_.get(), field_paths);
    return GetOptFieldIvfData(field_type, field_datas);
}

std::optional<bool>
MemFileManagerImpl::IsExisted(const std::string& filename) noexcept {
// TODO: implement this interface
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/storage/MemFileManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class MemFileManagerImpl : public FileManagerImpl {
return remote_paths_to_size_;
}

std::vector<std::vector<uint32_t>>
CacheOptFieldToMemory(OptFieldT& fields_map);

private:
// remote file path
std::map<std::string, int64_t> remote_paths_to_size_;
Expand Down
28 changes: 28 additions & 0 deletions internal/core/src/storage/Util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -865,4 +865,32 @@ MergeFieldData(std::vector<FieldDataPtr>& data_array) {
return merged_data;
}

// Fetches the field data of every file in `remote_files` through `cm`,
// preserving the input order in the returned vector.
//
// Files are requested in batches: a batch is flushed through GetObjectData
// once it holds DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE entries, and
// any remainder is flushed after the last file.
std::vector<FieldDataPtr>
FetchFieldData(ChunkManager* cm, const std::vector<std::string>& remote_files) {
    const auto max_batch =
        uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);

    std::vector<FieldDataPtr> fetched;
    std::vector<std::string> pending;

    // Requests everything currently in `pending` and appends the results,
    // in request order, to `fetched`. The caller clears `pending`.
    const auto flush_pending = [&]() {
        auto results = GetObjectData(cm, pending);
        for (size_t idx = 0; idx < pending.size(); ++idx) {
            fetched.emplace_back(results[idx].get()->GetFieldData());
        }
    };

    for (const auto& path : remote_files) {
        if (pending.size() >= max_batch) {
            flush_pending();
            pending.clear();
        }
        pending.emplace_back(path);
    }
    if (!pending.empty()) {
        flush_pending();
    }
    return fetched;
}

} // namespace milvus::storage
27 changes: 27 additions & 0 deletions internal/core/src/storage/Util.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,31 @@ CollectFieldDataChannel(FieldDataChannelPtr& channel);
FieldDataPtr
MergeFieldData(std::vector<FieldDataPtr>& data_array);

template <typename T, typename = void>
struct has_native_type : std::false_type {};
template <typename T>
struct has_native_type<T, std::void_t<typename T::NativeType>>
: std::true_type {};
template <DataType T>
using DataTypeNativeOrVoid =
typename std::conditional<has_native_type<TypeTraits<T>>::value,
typename TypeTraits<T>::NativeType,
void>::type;
template <DataType T>
using DataTypeToOffsetMap =
std::unordered_map<DataTypeNativeOrVoid<T>, int64_t>;

std::vector<FieldDataPtr>
FetchFieldData(ChunkManager* cm, const std::vector<std::string>& batch_files);

// Sorts `paths` in place, ascending by the integer parsed from the text that
// follows the last '/' of each path (the whole string when there is no '/').
// Parsing uses std::stol, so a non-numeric trailing component makes it throw.
inline void
SortByPath(std::vector<std::string>& paths) {
    // Numeric key of one path: everything after the final '/'.
    const auto numeric_suffix = [](const std::string& path) {
        const auto suffix_start = path.find_last_of("/") + 1;
        return std::stol(path.substr(suffix_start));
    };
    std::sort(paths.begin(),
              paths.end(),
              [&numeric_suffix](const std::string& lhs,
                                const std::string& rhs) {
                  return numeric_suffix(lhs) < numeric_suffix(rhs);
              });
}

} // namespace milvus::storage
4 changes: 2 additions & 2 deletions internal/core/thirdparty/knowhere/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
# Update KNOWHERE_VERSION for the first occurrence
milvus_add_pkg_config("knowhere")
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
set( KNOWHERE_VERSION 9a6a8dfb )
set( GIT_REPOSITORY "https://github.com/zilliztech/knowhere.git")
set( KNOWHERE_VERSION support-mv )
set( GIT_REPOSITORY "https://github.com/chasingegg/knowhere.git")
message(STATUS "Knowhere repo: ${GIT_REPOSITORY}")
message(STATUS "Knowhere version: ${KNOWHERE_VERSION}")

Expand Down

0 comments on commit d9accac

Please sign in to comment.