-
Notifications
You must be signed in to change notification settings - Fork 135
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: luyuncheng <[email protected]>
- Loading branch information
1 parent
5139b16
commit 2a61fcd
Showing
9 changed files
with
1,121 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
259 changes: 259 additions & 0 deletions
259
src/main/java/org/opensearch/knn/index/fetch/KNNFetchSubPhase.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,259 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
* Modifications Copyright OpenSearch Contributors. See | ||
* GitHub history for details. | ||
*/ | ||
|
||
package org.opensearch.knn.index.fetch; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
import lombok.extern.log4j.Log4j2; | ||
import org.apache.lucene.index.LeafReaderContext; | ||
import org.apache.lucene.search.DocIdSetIterator; | ||
import org.apache.lucene.search.Query; | ||
import org.apache.lucene.search.ScoreMode; | ||
import org.apache.lucene.search.Scorer; | ||
import org.apache.lucene.search.Weight; | ||
import org.apache.lucene.util.BitSet; | ||
import org.opensearch.common.io.stream.BytesStreamOutput; | ||
import org.opensearch.common.lucene.search.Queries; | ||
import org.opensearch.common.xcontent.XContentType; | ||
import org.opensearch.core.common.bytes.BytesReference; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.index.IndexSettings; | ||
import org.opensearch.index.mapper.DocValueFetcher; | ||
import org.opensearch.index.mapper.DocumentMapper; | ||
import org.opensearch.index.mapper.MappedFieldType; | ||
import org.opensearch.index.mapper.MapperService; | ||
import org.opensearch.index.mapper.ObjectMapper; | ||
import org.opensearch.index.mapper.ValueFetcher; | ||
import org.opensearch.knn.index.KNNSettings; | ||
import org.opensearch.knn.index.mapper.KNNVectorFieldMapper; | ||
import org.opensearch.search.SearchHit; | ||
import org.opensearch.search.fetch.FetchContext; | ||
import org.opensearch.search.fetch.FetchSubPhase; | ||
import org.opensearch.search.fetch.FetchSubPhaseProcessor; | ||
import org.opensearch.search.internal.ContextIndexSearcher; | ||
import org.opensearch.search.lookup.SourceLookup; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static org.opensearch.knn.common.KNNConstants.BYTES_PER_KILOBYTES; | ||
|
||
/** | ||
* Fetch sub phase which pull data from doc values. | ||
* and fulfill the value into source map | ||
*/ | ||
@Log4j2 | ||
public class KNNFetchSubPhase implements FetchSubPhase { | ||
|
||
@Override | ||
public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) throws IOException { | ||
IndexSettings indexSettings = fetchContext.getIndexSettings(); | ||
if (!KNNSettings.isKNNSyntheticSourceEnabled(indexSettings)) { | ||
log.debug("Synthetic is disabled for index: {}", fetchContext.getIndexName()); | ||
return null; | ||
} | ||
MapperService mapperService = fetchContext.mapperService(); | ||
|
||
List<DocValueField> fields = new ArrayList<>(); | ||
for (MappedFieldType mappedFieldType : mapperService.fieldTypes()) { | ||
if (mappedFieldType != null && mappedFieldType instanceof KNNVectorFieldMapper.KNNVectorFieldType) { | ||
String fieldName = mappedFieldType.name(); | ||
ValueFetcher fetcher = new DocValueFetcher( | ||
mappedFieldType.docValueFormat(null, null), | ||
fetchContext.searchLookup().doc().getForField(mappedFieldType) | ||
); | ||
fields.add(new DocValueField(fieldName, fetcher)); | ||
} | ||
} | ||
return new KNNFetchSubPhaseProcessor(fetchContext, fields); | ||
} | ||
|
||
@AllArgsConstructor | ||
@Getter | ||
class KNNFetchSubPhaseProcessor implements FetchSubPhaseProcessor { | ||
|
||
private final FetchContext fetchContext; | ||
private final List<DocValueField> fields; | ||
|
||
@Override | ||
public void setNextReader(LeafReaderContext readerContext) throws IOException { | ||
for (DocValueField f : fields) { | ||
f.fetcher.setNextReader(readerContext); | ||
} | ||
} | ||
|
||
@Override | ||
public void process(HitContext hitContext) throws IOException { | ||
MapperService mapperService = fetchContext.mapperService(); | ||
final boolean hasNested = mapperService.hasNested(); | ||
SearchHit hit = hitContext.hit(); | ||
Map<String, Object> maps = hit.getSourceAsMap(); | ||
if (maps == null) { | ||
// when source is disabled, return | ||
return; | ||
} | ||
|
||
if (hasNested) { | ||
syntheticNestedFieldWithDocValues(mapperService, hitContext, maps); | ||
} | ||
for (DocValueField f : fields) { | ||
if (maps.containsKey(f.field)) { | ||
continue; | ||
} | ||
List<Object> docValuesSource = f.fetcher.fetchValues(hitContext.sourceLookup()); | ||
if (docValuesSource.size() > 0) { | ||
maps.put(f.field, docValuesSource.get(0)); | ||
} | ||
} | ||
BytesStreamOutput streamOutput = new BytesStreamOutput(BYTES_PER_KILOBYTES); | ||
XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), streamOutput); | ||
builder.value(maps); | ||
hitContext.hit().sourceRef(BytesReference.bytes(builder)); | ||
} | ||
|
||
protected void syntheticNestedFieldWithDocValues(MapperService mapperService, HitContext hitContext, Map<String, Object> sourceMaps) | ||
throws IOException { | ||
DocumentMapper documentMapper = mapperService.documentMapper(); | ||
Map<String, ObjectMapper> mapperMap = documentMapper.objectMappers(); | ||
|
||
for (ObjectMapper objectMapper : mapperMap.values()) { | ||
if (objectMapper == null) { | ||
continue; | ||
} | ||
if (!objectMapper.nested().isNested()) { | ||
continue; | ||
} | ||
String path = objectMapper.fullPath(); | ||
for (DocValueField f : fields) { | ||
if (!checkNestedField(path, f, sourceMaps)) { | ||
continue; | ||
} | ||
// nested array in one nested path | ||
Object nestedObj = sourceMaps.get(path); | ||
ArrayList nestedDocList = (ArrayList) nestedObj; | ||
|
||
log.debug( | ||
"object mapper: nested:" | ||
+ objectMapper.nested().isNested() | ||
+ " Value:" | ||
+ objectMapper.fullPath() | ||
+ " field:" | ||
+ f.field | ||
); | ||
|
||
innerProcessOneNestedField(objectMapper, hitContext, nestedDocList, f, path); | ||
} | ||
} | ||
} | ||
|
||
private void innerProcessOneNestedField( | ||
ObjectMapper objectMapper, | ||
HitContext hitContext, | ||
ArrayList nestedDocList, | ||
DocValueField f, | ||
String path | ||
) throws IOException { | ||
|
||
BitSet parentBits = getParentDocBitSet(hitContext); | ||
DocIdSetIterator childIter = getChildDocIdSetIterator(objectMapper, hitContext); | ||
LeafReaderContext subReaderContext = hitContext.readerContext(); | ||
|
||
SearchHit hit = hitContext.hit(); | ||
int currentParent = hit.docId() - subReaderContext.docBase; | ||
int previousParent = parentBits.prevSetBit(currentParent - 1); | ||
int childDocId = childIter.advance(previousParent + 1); | ||
SourceLookup nestedVecSourceLookup = new SourceLookup(); | ||
|
||
// when nested field only have vector field and exclude source, list is empty | ||
boolean isEmpty = nestedDocList.isEmpty(); | ||
|
||
for (int offset = 0; childDocId < currentParent && childDocId != DocIdSetIterator.NO_MORE_DOCS; childDocId = childIter | ||
.nextDoc(), offset++) { | ||
nestedVecSourceLookup.setSegmentAndDocument(subReaderContext, childDocId); | ||
List<Object> nestedVecDocValuesSource = f.fetcher.fetchValues(nestedVecSourceLookup); | ||
if (nestedVecDocValuesSource == null || nestedVecDocValuesSource.isEmpty()) { | ||
continue; | ||
} | ||
if (isEmpty) { | ||
nestedDocList.add(new HashMap<String, Object>()); | ||
} | ||
if (offset < nestedDocList.size()) { | ||
Object sourceObj = nestedDocList.get(offset); | ||
if (sourceObj instanceof Map) { | ||
Map<String, Object> sourceMap = (Map<String, Object>) sourceObj; | ||
String suffix = f.field.substring(path.length() + 1); | ||
sourceMap.put(suffix, nestedVecDocValuesSource.get(0)); | ||
} | ||
} else { | ||
/** | ||
* TODO nested field partial doc only have vector and source exclude | ||
* this source map nestedDocList would out-of-order, can not fill the vector into right offset | ||
* "nested_field" : [ | ||
* {"nested_vector": [2.6, 2.6]}, | ||
* {"nested_numeric": 2, "nested_vector": [3.1, 2.3]} | ||
* ] | ||
*/ | ||
throw new UnsupportedOperationException( | ||
String.format("\"Nested Path \"%s\" in Field \"%s\" with _ID \"%s\" can not be empty\"", path, f.field, hit.getId()) | ||
); | ||
} | ||
} | ||
} | ||
|
||
private BitSet getParentDocBitSet(HitContext hitContext) throws IOException { | ||
Query parentFilter = Queries.newNonNestedFilter(); | ||
LeafReaderContext subReaderContext = hitContext.readerContext(); | ||
BitSet parentBits = fetchContext.getQueryShardContext().bitsetFilter(parentFilter).getBitSet(subReaderContext); | ||
return parentBits; | ||
} | ||
|
||
private DocIdSetIterator getChildDocIdSetIterator(ObjectMapper objectMapper, HitContext hitContext) throws IOException { | ||
Query childFilter = objectMapper.nestedTypeFilter(); | ||
ContextIndexSearcher searcher = fetchContext.searcher(); | ||
LeafReaderContext subReaderContext = hitContext.readerContext(); | ||
final Weight childWeight = searcher.createWeight(searcher.rewrite(childFilter), ScoreMode.COMPLETE_NO_SCORES, 1f); | ||
Scorer childScorer = childWeight.scorer(subReaderContext); | ||
DocIdSetIterator childIter = childScorer.iterator(); | ||
return childIter; | ||
} | ||
|
||
private boolean checkNestedField(String path, DocValueField f, Map<String, Object> sourceMaps) { | ||
if (!f.field.startsWith(path)) { | ||
return false; | ||
} | ||
if (!sourceMaps.containsKey(path)) { | ||
return false; | ||
} | ||
|
||
// path to nested field: | ||
Object nestedObj = sourceMaps.get(path); | ||
if (!(nestedObj instanceof ArrayList)) { | ||
return false; | ||
} | ||
return true; | ||
} | ||
} | ||
|
||
@Getter | ||
public static class DocValueField { | ||
private final String field; | ||
private final ValueFetcher fetcher; | ||
|
||
DocValueField(String field, ValueFetcher fetcher) { | ||
this.field = field; | ||
this.fetcher = fetcher; | ||
} | ||
} | ||
} |
Oops, something went wrong.