Skip to content

Commit

Permalink
refine iterator params to differentiate limit for clients(#26358 #263…
Browse files Browse the repository at this point in the history
…97) (#1651)

Signed-off-by: MrPresent-Han <[email protected]>
  • Loading branch information
MrPresent-Han authored Aug 22, 2023
1 parent e353eac commit 0a70b58
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 36 deletions.
55 changes: 46 additions & 9 deletions examples/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
DIM = 8
CLEAR_EXIST = False


def re_create_collection():
if utility.has_collection(COLLECTION_NAME) and CLEAR_EXIST:
utility.drop_collection(COLLECTION_NAME)
Expand Down Expand Up @@ -77,8 +76,7 @@ def prepare_data(collection):
def query_iterate_collection_no_offset(collection):
expr = f"10 <= {AGE} <= 14"
query_iterator = collection.query_iterator(expr=expr, output_fields=[USER_ID, AGE],
offset=0, limit=5, consistency_level=CONSISTENCY_LEVEL,
iteration_extension_reduce_rate=10)
offset=0, batch_size=5, consistency_level=CONSISTENCY_LEVEL)
page_idx = 0
while True:
res = query_iterator.next()
Expand All @@ -94,8 +92,23 @@ def query_iterate_collection_no_offset(collection):
def query_iterate_collection_with_offset(collection):
expr = f"10 <= {AGE} <= 14"
query_iterator = collection.query_iterator(expr=expr, output_fields=[USER_ID, AGE],
offset=10, limit=5, consistency_level=CONSISTENCY_LEVEL,
iteration_extension_reduce_rate=10)
offset=10, batch_size=50, consistency_level=CONSISTENCY_LEVEL)
page_idx = 0
while True:
res = query_iterator.next()
if len(res) == 0:
print("query iteration finished, close")
query_iterator.close()
break
for i in range(len(res)):
print(res[i])
page_idx += 1
print(f"page{page_idx}-------------------------")

def query_iterate_collection_with_limit(collection):
expr = f"10 <= {AGE} <= 44"
query_iterator = collection.query_iterator(expr=expr, output_fields=[USER_ID, AGE],
batch_size=80, limit=530, consistency_level=CONSISTENCY_LEVEL)
page_idx = 0
while True:
res = query_iterator.next()
Expand All @@ -117,28 +130,52 @@ def search_iterator_collection(collection):
"metric_type": "L2",
"params": {"nprobe": 10, "radius": 1.0},
}
search_iterator = collection.search_iterator(vectors_to_search, PICTURE, search_params, limit=5,
search_iterator = collection.search_iterator(vectors_to_search, PICTURE, search_params, batch_size=500,
output_fields=[USER_ID])
page_idx = 0
while True:
res = search_iterator.next()
if len(res[0]) == 0:
if len(res) == 0:
print("query iteration finished, close")
search_iterator.close()
break
for i in range(len(res[0])):
print(res[0][i])
for i in range(len(res)):
print(res[i])
page_idx += 1
print(f"page{page_idx}-------------------------")

def search_iterator_collection_with_limit(collection):
    """Page through an L2 range search, passing both ``batch_size`` and ``limit``.

    A single random query vector (fixed seed, so runs are repeatable) is
    searched against the ``PICTURE`` field. The iterator is created with
    ``batch_size=200`` and ``limit=755``; every hit of every returned page is
    printed, followed by a page separator.

    Args:
        collection: object exposing ``search_iterator(...)``; in this example,
            the collection produced by ``prepare_data``.
    """
    SEARCH_NQ = 1
    rng = np.random.default_rng(seed=19530)
    # Use the module-level DIM rather than a local copy so the query vector
    # cannot drift from the dimension used by the rest of this example.
    vectors_to_search = rng.random((SEARCH_NQ, DIM))
    search_params = {
        "metric_type": "L2",
        "params": {"nprobe": 10, "radius": 1.0},
    }
    search_iterator = collection.search_iterator(
        vectors_to_search,
        PICTURE,
        search_params,
        batch_size=200,
        limit=755,
        output_fields=[USER_ID],
    )
    page_idx = 0
    try:
        while True:
            res = search_iterator.next()
            # An empty page signals that the iterator is exhausted.
            if len(res) == 0:
                print("search iteration finished, close")
                break
            for hit in res:
                print(hit)
            page_idx += 1
            print(f"page{page_idx}-------------------------")
    finally:
        # Release the iterator even if fetching or printing a page raised.
        search_iterator.close()

def main():
    """Connect to the server, build the demo collection, and run each iterator example in order."""
    connections.connect("default", host=HOST, port=PORT)
    collection = prepare_data(re_create_collection())
    # Examples run sequentially against the same prepared collection.
    examples = (
        query_iterate_collection_no_offset,
        query_iterate_collection_with_offset,
        query_iterate_collection_with_limit,
        search_iterator_collection,
        search_iterator_collection_with_limit,
    )
    for run_example in examples:
        run_example(collection)


if __name__ == '__main__':
Expand Down
9 changes: 1 addition & 8 deletions pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from . import blob, entity_helper, ts_utils
from .check import check_pass_param, is_legal_collection_properties
from .constants import DEFAULT_CONSISTENCY_LEVEL, ITERATION_EXTENSION_REDUCE_RATE
from .constants import DEFAULT_CONSISTENCY_LEVEL
from .types import DataType, PlaceholderType, get_consistency_level
from .utils import traverse_info, traverse_rows_info

Expand Down Expand Up @@ -835,13 +835,6 @@ def query_request(
req.query_params.append(
common_types.KeyValuePair(key="ignore_growing", value=str(ignore_growing))
)

use_iteration_extension_reduce_rate = kwargs.get(ITERATION_EXTENSION_REDUCE_RATE, 0)
req.query_params.append(
common_types.KeyValuePair(
key=ITERATION_EXTENSION_REDUCE_RATE, value=str(use_iteration_extension_reduce_rate)
)
)
return req

@classmethod
Expand Down
11 changes: 10 additions & 1 deletion pymilvus/orm/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from pymilvus.settings import Config

from .connections import connections
from .constants import UNLIMITED
from .future import MutationFuture, SearchFuture
from .index import Index
from .iterator import QueryIterator, SearchIterator
Expand Down Expand Up @@ -798,7 +799,8 @@ def search_iterator(
data: List,
anns_field: str,
param: Dict,
limit: int,
batch_size: Optional[int] = 1000,
limit: Optional[int] = UNLIMITED,
expr: Optional[str] = None,
partition_names: Optional[List[str]] = None,
output_fields: Optional[List[str]] = None,
Expand All @@ -814,6 +816,7 @@ def search_iterator(
data=data,
ann_field=anns_field,
param=param,
batch_size=batch_size,
limit=limit,
expr=expr,
partition_names=partition_names,
Expand Down Expand Up @@ -919,15 +922,21 @@ def query(

def query_iterator(
self,
batch_size: Optional[int] = 1000,
limit: Optional[int] = UNLIMITED,
expr: Optional[str] = None,
output_fields: Optional[List[str]] = None,
partition_names: Optional[List[str]] = None,
timeout: Optional[float] = None,
**kwargs,
):
if expr is not None and not isinstance(expr, str):
raise DataTypeNotMatchException(message=ExceptionsMessage.ExprType % type(expr))
return QueryIterator(
connection=self._get_connection(),
collection_name=self._name,
batch_size=batch_size,
limit=limit,
expr=expr,
output_fields=output_fields,
partition_names=partition_names,
Expand Down
5 changes: 4 additions & 1 deletion pymilvus/orm/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
CALC_DIST_DIM = "dim"

OFFSET = "offset"
LIMIT = "limit"
MILVUS_LIMIT = "limit"
BATCH_SIZE = "batch_size"
ID = "id"
METRIC_TYPE = "metric_type"
PARAMS = "params"
Expand All @@ -43,3 +44,5 @@
DEFAULT_MIN_COSINE_DISTANCE = -2.0
MAX_FILTERED_IDS_COUNT_ITERATION = 100000
INT64_MAX = 9223372036854775807
# Largest batch size constant for iterators.
# NOTE(review): presumably enforced where iterators validate `batch_size`;
# that check is not visible here — confirm against iterator.py.
MAX_BATCH_SIZE: int = 16384
# Sentinel used as the default `limit` of `search_iterator` and
# `query_iterator` in collection.py, i.e. the value callers get when they do
# not pass a limit.
UNLIMITED: int = -1
Loading

0 comments on commit 0a70b58

Please sign in to comment.