[Backport 2.x] Add reindex integration tests (#1075) #1081

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -29,5 +29,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix CI for JDK upgrade towards 21 ([#835](https://github.com/opensearch-project/neural-search/pull/835))
### Documentation
### Maintenance
- Add reindex integration tests for ingest processors ([#1075](https://github.com/opensearch-project/neural-search/pull/1075))
- Fix github CI by adding eclipse dependency in formatting.gradle ([#1079](https://github.com/opensearch-project/neural-search/pull/1079))
### Refactoring
SparseEncodingProcessIT.java
@@ -4,20 +4,11 @@
*/
package org.opensearch.neuralsearch.processor;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import org.apache.http.HttpHeaders;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.junit.Before;
import org.opensearch.client.Response;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.neuralsearch.BaseNeuralSearchIT;

import com.google.common.collect.ImmutableList;
import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder;

public class SparseEncodingProcessIT extends BaseNeuralSearchIT {
@@ -26,6 +17,20 @@ public class SparseEncodingProcessIT extends BaseNeuralSearchIT {

private static final String PIPELINE_NAME = "pipeline-sparse-encoding";

private static final String INGEST_DOCUMENT = "{\n"
+ " \"title\": \"This is a good day\",\n"
+ " \"description\": \"daily logging\",\n"
+ " \"favor_list\": [\n"
+ " \"test\",\n"
+ " \"hello\",\n"
+ " \"mock\"\n"
+ " ],\n"
+ " \"favorites\": {\n"
+ " \"game\": \"overwatch\",\n"
+ " \"movie\": null\n"
+ " }\n"
+ "}\n";

@Before
public void setUp() throws Exception {
super.setUp();
@@ -37,8 +42,8 @@ public void testSparseEncodingProcessor() throws Exception {
try {
modelId = prepareSparseEncodingModel();
createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.SPARSE_ENCODING);
createSparseEncodingIndex();
ingestDocument();
createIndexWithPipeline(INDEX_NAME, "SparseEncodingIndexMappings.json", PIPELINE_NAME);
ingestDocument(INDEX_NAME, INGEST_DOCUMENT);
assertEquals(1, getDocCount(INDEX_NAME));

NeuralSparseQueryBuilder neuralSparseQueryBuilder = new NeuralSparseQueryBuilder();
@@ -58,8 +63,8 @@ public void testSparseEncodingProcessorWithPrune() throws Exception {
try {
modelId = prepareSparseEncodingModel();
createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.SPARSE_ENCODING_PRUNE);
createSparseEncodingIndex();
ingestDocument();
createIndexWithPipeline(INDEX_NAME, "SparseEncodingIndexMappings.json", PIPELINE_NAME);
ingestDocument(INDEX_NAME, INGEST_DOCUMENT);
assertEquals(1, getDocCount(INDEX_NAME));

NeuralSparseQueryBuilder neuralSparseQueryBuilder = new NeuralSparseQueryBuilder();
@@ -74,42 +79,22 @@ public void testSparseEncodingProcessorWithPrune() throws Exception {
}
}

private void createSparseEncodingIndex() throws Exception {
createIndexWithConfiguration(
INDEX_NAME,
Files.readString(Path.of(classLoader.getResource("processor/SparseEncodingIndexMappings.json").toURI())),
PIPELINE_NAME
);
}

private void ingestDocument() throws Exception {
String ingestDocument = "{\n"
+ " \"title\": \"This is a good day\",\n"
+ " \"description\": \"daily logging\",\n"
+ " \"favor_list\": [\n"
+ " \"test\",\n"
+ " \"hello\",\n"
+ " \"mock\"\n"
+ " ],\n"
+ " \"favorites\": {\n"
+ " \"game\": \"overwatch\",\n"
+ " \"movie\": null\n"
+ " }\n"
+ "}\n";
Response response = makeRequest(
client(),
"POST",
INDEX_NAME + "/_doc?refresh",
null,
toHttpEntity(ingestDocument),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Map<String, Object> map = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(response.getEntity()),
false
);
assertEquals("created", map.get("result"));
public void testSparseEncodingProcessorWithReindex() throws Exception {
// create a plain source index and ingest one document into it
String fromIndexName = "test-reindex-from";
createIndexWithConfiguration(fromIndexName, "{ \"settings\": { \"number_of_shards\": 1, \"number_of_replicas\": 0 } }", null);
ingestDocument(fromIndexName, "{ \"text\": \"hello world\" }");
// create a sparse encoding index as the reindex destination
String modelId = null;
try {
modelId = prepareSparseEncodingModel();
String toIndexName = "test-reindex-to";
createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.SPARSE_ENCODING);
createIndexWithPipeline(toIndexName, "SparseEncodingIndexMappings.json", PIPELINE_NAME);
reindex(fromIndexName, toIndexName);
assertEquals(1, getDocCount(toIndexName));
} finally {
wipeOfTestResources(fromIndexName, PIPELINE_NAME, modelId, null);
}
}

}
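The refactored tests above rely on two helpers that do not appear in this diff, createIndexWithPipeline and reindex, which presumably live in BaseNeuralSearchIT. Below is a minimal sketch of what they might look like, reconstructed from the createSparseEncodingIndex method deleted above and the standard _reindex REST endpoint; the call sites are real, but the bodies, visibility, and exact signatures shown here are assumptions rather than the actual base-class code. The sketch reuses the makeRequest, toHttpEntity, and createIndexWithConfiguration utilities visible in this file's removed code.

    // Sketch only (assumed BaseNeuralSearchIT helpers). Requires the imports removed from the
    // test class above: java.nio.file.Files, java.nio.file.Path, org.opensearch.client.Response,
    // org.apache.http.HttpHeaders, org.apache.http.message.BasicHeader,
    // com.google.common.collect.ImmutableList.
    protected void createIndexWithPipeline(String indexName, String indexMappingFileName, String pipelineName) throws Exception {
        // Assumed to resolve the mapping file under the processor/ test-resource folder,
        // exactly as the deleted createSparseEncodingIndex() did.
        String indexConfiguration = Files.readString(
            Path.of(classLoader.getResource("processor/" + indexMappingFileName).toURI())
        );
        createIndexWithConfiguration(indexName, indexConfiguration, pipelineName);
    }

    protected void reindex(String fromIndexName, String toIndexName) throws Exception {
        // Copy every document from the source index into the destination index via _reindex;
        // the destination's default ingest pipeline (sparse encoding here) runs on each document.
        String requestBody = "{ \"source\": { \"index\": \"" + fromIndexName + "\" }, "
            + "\"dest\": { \"index\": \"" + toIndexName + "\" } }";
        Response response = makeRequest(
            client(),
            "POST",
            "_reindex?refresh=true",
            null,
            toHttpEntity(requestBody),
            ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
        );
        assertEquals(200, response.getStatusLine().getStatusCode());
    }

Because the destination index is created with the sparse encoding pipeline as its default, reindexing from the plain source index is enough to exercise the processor, which the assertEquals(1, getDocCount(toIndexName)) check then verifies.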
TextChunkingProcessorIT.java
@@ -4,11 +4,7 @@
*/
package org.opensearch.neuralsearch.processor;

import com.google.common.collect.ImmutableList;
import lombok.SneakyThrows;
import org.apache.http.HttpHeaders;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.junit.Before;

import java.net.URL;
@@ -19,9 +15,6 @@
import java.util.Map;
import java.util.Objects;

import org.opensearch.client.Response;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.neuralsearch.BaseNeuralSearchIT;

@@ -73,7 +66,9 @@ public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmStandardToken
try {
createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME);
createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME);
ingestDocument(TEST_DOCUMENT);

String document = getDocumentFromFilePath(TEST_DOCUMENT);
ingestDocument(INDEX_NAME, document);

List<String> expectedPassages = new ArrayList<>();
expectedPassages.add("This is an example document to be chunked. The document ");
@@ -90,7 +85,9 @@ public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmLetterTokeniz
try {
createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_LETTER_TOKENIZER_NAME);
createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_LETTER_TOKENIZER_NAME);
ingestDocument(TEST_DOCUMENT);

String document = getDocumentFromFilePath(TEST_DOCUMENT);
ingestDocument(INDEX_NAME, document);

List<String> expectedPassages = new ArrayList<>();
expectedPassages.add("This is an example document to be chunked. The document ");
@@ -107,7 +104,9 @@ public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmLowercaseToke
try {
createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_LOWERCASE_TOKENIZER_NAME);
createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_LOWERCASE_TOKENIZER_NAME);
ingestDocument(TEST_DOCUMENT);

String document = getDocumentFromFilePath(TEST_DOCUMENT);
ingestDocument(INDEX_NAME, document);

List<String> expectedPassages = new ArrayList<>();
expectedPassages.add("This is an example document to be chunked. The document ");
@@ -124,7 +123,10 @@ public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmStandardToken
try {
createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME);
createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME);
Exception exception = assertThrows(Exception.class, () -> ingestDocument(TEST_LONG_DOCUMENT));
Exception exception = assertThrows(Exception.class, () -> {
String document = getDocumentFromFilePath(TEST_LONG_DOCUMENT);
ingestDocument(INDEX_NAME, document);
});
// max_token_count is 100 by index settings
assert (exception.getMessage()
.contains("The number of tokens produced by calling _analyze has exceeded the allowed maximum of [100]."));
@@ -139,7 +141,9 @@ public void testTextChunkingProcessor_withDelimiterAlgorithm_successful() {
try {
createPipelineProcessor(DELIMITER_PIPELINE_NAME);
createTextChunkingIndex(INDEX_NAME, DELIMITER_PIPELINE_NAME);
ingestDocument(TEST_DOCUMENT);

String document = getDocumentFromFilePath(TEST_DOCUMENT);
ingestDocument(INDEX_NAME, document);

List<String> expectedPassages = new ArrayList<>();
expectedPassages.add("This is an example document to be chunked.");
@@ -157,7 +161,9 @@ public void testTextChunkingProcessor_withCascadePipeline_successful() {
try {
createPipelineProcessor(CASCADE_PIPELINE_NAME);
createTextChunkingIndex(INDEX_NAME, CASCADE_PIPELINE_NAME);
ingestDocument(TEST_DOCUMENT);

String document = getDocumentFromFilePath(TEST_DOCUMENT);
ingestDocument(INDEX_NAME, document);

List<String> expectedPassages = new ArrayList<>();
expectedPassages.add("This is an example document to be chunked.");
@@ -176,6 +182,23 @@ public void testTextChunkingProcessor_withCascadePipeline_successful() {
}
}

public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmStandardTokenizer_whenReindexingDocument_thenSuccessful()
throws Exception {
try {
String fromIndexName = "test-reindex-from";
createIndexWithConfiguration(fromIndexName, "{ \"settings\": { \"number_of_shards\": 1, \"number_of_replicas\": 0 } }", null);
String document = getDocumentFromFilePath(TEST_DOCUMENT);
ingestDocument(fromIndexName, document);

createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME);
createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME);
reindex(fromIndexName, INDEX_NAME);
assertEquals(1, getDocCount(INDEX_NAME));
} finally {
wipeOfTestResources(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME, null, null);
}
}

private void validateIndexIngestResults(String indexName, String fieldName, Object expected) {
assertEquals(1, getDocCount(indexName));
MatchAllQueryBuilder query = new MatchAllQueryBuilder();
@@ -205,23 +228,9 @@ private void createTextChunkingIndex(String indexName, String pipelineName) thro
createIndexWithConfiguration(indexName, Files.readString(Path.of(indexSettingsURLPath.toURI())), pipelineName);
}

private void ingestDocument(String documentPath) throws Exception {
URL documentURLPath = classLoader.getResource(documentPath);
private String getDocumentFromFilePath(String filePath) throws Exception {
URL documentURLPath = classLoader.getResource(filePath);
Objects.requireNonNull(documentURLPath);
String document = Files.readString(Path.of(documentURLPath.toURI()));
Response response = makeRequest(
client(),
"POST",
INDEX_NAME + "/_doc?refresh",
null,
toHttpEntity(document),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Map<String, Object> map = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(response.getEntity()),
false
);
assertEquals("created", map.get("result"));
return Files.readString(Path.of(documentURLPath.toURI()));
}
}
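Both test classes now call a two-argument ingestDocument(indexName, document) that is likewise absent from this diff and presumably shared through BaseNeuralSearchIT. A sketch of such a helper, reconstructed from the per-class ingestDocument bodies deleted above (its signature and placement are assumptions):

    // Sketch only (assumed shared BaseNeuralSearchIT helper); mirrors the logic removed from
    // SparseEncodingProcessIT and TextChunkingProcessorIT. Requires org.opensearch.client.Response,
    // org.apache.http.util.EntityUtils, org.opensearch.common.xcontent.XContentHelper,
    // org.opensearch.common.xcontent.XContentType, org.apache.http.HttpHeaders,
    // org.apache.http.message.BasicHeader, com.google.common.collect.ImmutableList, java.util.Map.
    protected void ingestDocument(String indexName, String document) throws Exception {
        // Index a single JSON document and refresh so getDocCount(...) sees it immediately.
        Response response = makeRequest(
            client(),
            "POST",
            indexName + "/_doc?refresh",
            null,
            toHttpEntity(document),
            ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
        );
        Map<String, Object> responseMap = XContentHelper.convertToMap(
            XContentType.JSON.xContent(),
            EntityUtils.toString(response.getEntity()),
            false
        );
        assertEquals("created", responseMap.get("result"));
    }

Taking the index name as a parameter is what lets the new reindex tests ingest into an arbitrary source index such as test-reindex-from instead of the hard-coded INDEX_NAME.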