Skip to content

Commit

Permalink
making some progress
Browse files Browse the repository at this point in the history
Signed-off-by: Francisco Javier Arceo <[email protected]>
  • Loading branch information
franciscojavierarceo committed Dec 14, 2024
1 parent a28fe1f commit 70b630a
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
)

from feast import Entity
from feast.type_map import PROTO_VALUE_TO_VALUE_TYPE_MAP
from feast.feature_view import FeatureView
from feast.infra.infra_object import InfraObject
from feast.infra.key_encoding_utils import (
Expand All @@ -25,11 +24,12 @@
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.type_map import PROTO_VALUE_TO_VALUE_TYPE_MAP
from feast.types import VALUE_TYPES_TO_FEAST_TYPES, Array, PrimitiveFeastType, ValueType
from feast.utils import (
_build_retrieve_online_document_record,
to_naive_utc,
)
from feast.types import VALUE_TYPES_TO_FEAST_TYPES, PrimitiveFeastType, Array, ValueType

PROTO_TO_MILVUS_TYPE_MAPPING = {
PROTO_VALUE_TO_VALUE_TYPE_MAP['bytes_val']: DataType.STRING,
Expand Down Expand Up @@ -75,6 +75,7 @@ class MilvusOnlineStoreConfig(FeastConfigBaseModel, VectorStoreConfig):
metric_type: Optional[str] = "L2"
embedding_dim: Optional[int] = 128
vector_enabled: Optional[bool] = True
nlist: Optional[int] = 128


class MilvusOnlineStore(OnlineStore):
Expand Down Expand Up @@ -132,7 +133,7 @@ def _get_collection(self, config: RepoConfig, table: FeatureView) -> Collection:
index_params = {
"index_type": config.online_store.index_type,
"metric_type": config.online_store.metric_type,
"params": {"nlist": 128},
"params": {"nlist": config.online_store.nlist},
}
for vector_field in schema.fields:
if vector_field.dtype in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR]:
Expand Down Expand Up @@ -160,10 +161,12 @@ def online_write_batch(

entity_batch_to_insert = []
for entity_key, values_dict, timestamp, created_ts in data:
# need to construct the composite primary key also need to handle the fact that entities are a list
entity_key_str = serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
composite_key_name = '_'.join([str(value) for value in entity_key.join_keys]) + "_pk"
timestamp_int = int(to_naive_utc(timestamp).timestamp() * 1e6)
created_ts_int = (
int(to_naive_utc(created_ts).timestamp() * 1e6) if created_ts else 0
Expand All @@ -173,12 +176,12 @@ def online_write_batch(
vector_list = getattr(values_dict[feature_name], vector_list_type_name, None)
if vector_list:
vector_values = getattr(values_dict[feature_name], vector_list_type_name).val
if vector_values != []:
if vector_values != []:
# Note here we are over-writing the feature and collapsing the list into a single value
values_dict[feature_name] = vector_values

single_entity_record = {
"entity_key": entity_key_str,
composite_key_name: entity_key_str,
"event_ts": timestamp_int,
"created_ts": created_ts_int,
}
Expand Down
6 changes: 2 additions & 4 deletions sdk/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,9 @@ def environment(request, worker_id):
e.teardown()


from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
from tests.integration.feature_repos.universal.online_store.milvus import (
MilvusOnlineStoreCreator,
)
from tests.integration.feature_repos.universal.online_store.milvus import MilvusOnlineStoreCreator
from tests.integration.feature_repos.universal.data_sources.file import FileDataSourceCreator


@pytest.fixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ def create_online_store(self) -> Dict[str, Any]:
"port": int(port),
"index_type": "IVF_FLAT",
"metric_type": "L2",
"embedding_dim": 128,
"embedding_dim": 3,
"vector_enabled": True,
"nlist": 1,
}

def teardown(self):
Expand Down

0 comments on commit 70b630a

Please sign in to comment.