From 12e6aa740a62421da16e729e6707d744b5aa7644 Mon Sep 17 00:00:00 2001 From: Brian Flores Date: Fri, 10 Jan 2025 19:34:35 -0800 Subject: [PATCH 1/8] undeploy models with no WorkerNodes This commit aims to undeploy modelIds that have no nodes associated to them so as to keep the intention of undeploy truthful. Signed-off-by: Brian Flores --- .../TransportUndeployModelsAction.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java index 579fa51a38..340d5c2a87 100644 --- a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java +++ b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java @@ -7,16 +7,20 @@ import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX; +import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; import org.opensearch.OpenSearchStatusException; import org.opensearch.action.ActionRequest; +import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.update.UpdateRequest; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; @@ -32,6 +36,7 @@ import org.opensearch.index.query.TermsQueryBuilder; import org.opensearch.ml.cluster.DiscoveryNodeHelper; import org.opensearch.ml.common.MLModel; +import org.opensearch.ml.common.model.MLModelState; import org.opensearch.ml.common.transport.deploy.MLDeployModelRequest; import org.opensearch.ml.common.transport.undeploy.MLUndeployModelAction; import org.opensearch.ml.common.transport.undeploy.MLUndeployModelNodesRequest; @@ -51,6 +56,7 @@ import org.opensearch.transport.TransportService; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import lombok.extern.log4j.Log4j2; @@ -157,10 +163,36 @@ private void undeployModels(String[] targetNodeIds, String[] modelIds, ActionLis MLUndeployModelNodesRequest mlUndeployModelNodesRequest = new MLUndeployModelNodesRequest(targetNodeIds, modelIds); client.execute(MLUndeployModelAction.INSTANCE, mlUndeployModelNodesRequest, ActionListener.wrap(r -> { + if (r.getNodes().isEmpty()) { + bulkSetModelIndexToUndeploy(modelIds); + } listener.onResponse(new MLUndeployModelsResponse(r)); }, listener::onFailure)); } + private void bulkSetModelIndexToUndeploy(String[] modelIds) { + BulkRequest bulkUpdateRequest = new BulkRequest(); + for (String modelId : modelIds) { + UpdateRequest updateRequest = new UpdateRequest(); + Instant now = Instant.now(); + ImmutableMap.Builder builder = ImmutableMap.builder(); + builder.put(MLModel.MODEL_STATE_FIELD, MLModelState.UNDEPLOYED.name()); + + builder.put(MLModel.PLANNING_WORKER_NODES_FIELD, List.of()); + builder.put(MLModel.PLANNING_WORKER_NODE_COUNT_FIELD, 0); + + builder.put(MLModel.LAST_UPDATED_TIME_FIELD, now.toEpochMilli()); + builder.put(MLModel.CURRENT_WORKER_NODE_COUNT_FIELD, 0); + updateRequest.index(ML_MODEL_INDEX).id(modelId).doc(builder.build()); + bulkUpdateRequest.add(updateRequest); + } + bulkUpdateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + log.info("No models service: {}", modelIds.toString()); + client.bulk(bulkUpdateRequest, ActionListener.wrap(br -> { log.debug("Successfully set modelIds to UNDEPLOY in index"); }, e -> { + log.error("Failed to set modelIds to UNDEPLOY in index", e); + })); + } + private void validateAccess(String modelId, ActionListener listener) { User user = RestActionUtils.getUserContext(client); boolean isSuperAdmin = isSuperAdminUserWrapper(clusterService, client); From 9104cb806e7d51c475d35e5adf9b7264c82535aa Mon Sep 17 00:00:00 2001 From: Brian Flores Date: Fri, 10 Jan 2025 20:50:38 -0800 Subject: [PATCH 2/8] Exit early when no nodes service the model Now when entering this method its guaranteed to write to index first before sending back the MLUndeploy response. And will also send back a exception if the write back fails Signed-off-by: Brian Flores --- .../TransportUndeployModelsAction.java | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java index 340d5c2a87..dc93419070 100644 --- a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java +++ b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java @@ -40,6 +40,7 @@ import org.opensearch.ml.common.transport.deploy.MLDeployModelRequest; import org.opensearch.ml.common.transport.undeploy.MLUndeployModelAction; import org.opensearch.ml.common.transport.undeploy.MLUndeployModelNodesRequest; +import org.opensearch.ml.common.transport.undeploy.MLUndeployModelNodesResponse; import org.opensearch.ml.common.transport.undeploy.MLUndeployModelsAction; import org.opensearch.ml.common.transport.undeploy.MLUndeployModelsRequest; import org.opensearch.ml.common.transport.undeploy.MLUndeployModelsResponse; @@ -162,15 +163,20 @@ protected void doExecute(Task task, ActionRequest request, ActionListener listener) { MLUndeployModelNodesRequest mlUndeployModelNodesRequest = new MLUndeployModelNodesRequest(targetNodeIds, modelIds); - client.execute(MLUndeployModelAction.INSTANCE, mlUndeployModelNodesRequest, ActionListener.wrap(r -> { - if (r.getNodes().isEmpty()) { - bulkSetModelIndexToUndeploy(modelIds); + client.execute(MLUndeployModelAction.INSTANCE, mlUndeployModelNodesRequest, ActionListener.wrap(response -> { + if (response.getNodes().isEmpty()) { + bulkSetModelIndexToUndeploy(modelIds, listener, response); + return; } - listener.onResponse(new MLUndeployModelsResponse(r)); + listener.onResponse(new MLUndeployModelsResponse(response)); }, listener::onFailure)); } - private void bulkSetModelIndexToUndeploy(String[] modelIds) { + private void bulkSetModelIndexToUndeploy( + String[] modelIds, + ActionListener listener, + MLUndeployModelNodesResponse response + ) { BulkRequest bulkUpdateRequest = new BulkRequest(); for (String modelId : modelIds) { UpdateRequest updateRequest = new UpdateRequest(); @@ -186,10 +192,16 @@ private void bulkSetModelIndexToUndeploy(String[] modelIds) { updateRequest.index(ML_MODEL_INDEX).id(modelId).doc(builder.build()); bulkUpdateRequest.add(updateRequest); } + bulkUpdateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - log.info("No models service: {}", modelIds.toString()); - client.bulk(bulkUpdateRequest, ActionListener.wrap(br -> { log.debug("Successfully set modelIds to UNDEPLOY in index"); }, e -> { + log.info("No nodes service: {}", modelIds.toString()); + + client.bulk(bulkUpdateRequest, ActionListener.wrap(br -> { + log.debug("Successfully set modelIds to UNDEPLOY in index"); + listener.onResponse(new MLUndeployModelsResponse(response)); + }, e -> { log.error("Failed to set modelIds to UNDEPLOY in index", e); + listener.onFailure(e); })); } From 6d3b398e24b204cf1acbb8a4636bb861ffc37297 Mon Sep 17 00:00:00 2001 From: Brian Flores Date: Sat, 11 Jan 2025 20:30:10 -0800 Subject: [PATCH 3/8] add UTs for undeploy stale model index fix Added UTs for the 2 scenarios 1. Check that the bulk operation occured when no nodes are returned from the Undeploy response is , 2. Check that the bulk operation did not occur when there are nodes that have found the model within their cache. Signed-off-by: Brian Flores --- .../TransportUndeployModelsAction.java | 2 +- .../TransportUndeployModelsActionTests.java | 152 ++++++++++++++++++ 2 files changed, 153 insertions(+), 1 deletion(-) diff --git a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java index dc93419070..bf9f630097 100644 --- a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java +++ b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java @@ -194,7 +194,7 @@ private void bulkSetModelIndexToUndeploy( } bulkUpdateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - log.info("No nodes service: {}", modelIds.toString()); + log.info("No nodes service: {}", Arrays.toString(modelIds)); client.bulk(bulkUpdateRequest, ActionListener.wrap(br -> { log.debug("Successfully set modelIds to UNDEPLOY in index"); diff --git a/plugin/src/test/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsActionTests.java b/plugin/src/test/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsActionTests.java index 42152f473d..c0405d766c 100644 --- a/plugin/src/test/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsActionTests.java +++ b/plugin/src/test/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsActionTests.java @@ -13,14 +13,17 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX; import static org.opensearch.ml.task.MLPredictTaskRunnerTests.USER_STRING; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.junit.Before; import org.junit.Rule; @@ -29,7 +32,10 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opensearch.action.FailedNodeException; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.update.UpdateRequest; import org.opensearch.client.Client; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.service.ClusterService; @@ -42,6 +48,7 @@ import org.opensearch.ml.cluster.DiscoveryNodeHelper; import org.opensearch.ml.common.FunctionName; import org.opensearch.ml.common.MLModel; +import org.opensearch.ml.common.model.MLModelState; import org.opensearch.ml.common.transport.undeploy.MLUndeployModelNodeResponse; import org.opensearch.ml.common.transport.undeploy.MLUndeployModelNodesResponse; import org.opensearch.ml.common.transport.undeploy.MLUndeployModelsRequest; @@ -164,6 +171,129 @@ public void setup() throws IOException { }).when(mlModelManager).getModel(any(), any(), any(), isA(ActionListener.class)); } + public void testDoExecute_undeployModelIndex_WhenNoNodesServiceModel() { + String modelId = "someModelId"; + MLModel mlModel = MLModel + .builder() + .user(User.parse(USER_STRING)) + .modelGroupId("111") + .version("111") + .name("Test Model") + .modelId(modelId) + .algorithm(FunctionName.BATCH_RCF) + .content("content") + .totalChunks(2) + .isHidden(true) + .build(); + + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(3); + listener.onResponse(mlModel); + return null; + }).when(mlModelManager).getModel(any(), any(), any(), isA(ActionListener.class)); + + doReturn(true).when(transportUndeployModelsAction).isSuperAdminUserWrapper(clusterService, client); + + List responseList = new ArrayList<>(); + List failuresList = new ArrayList<>(); + MLUndeployModelNodesResponse nodesResponse = new MLUndeployModelNodesResponse(clusterName, responseList, failuresList); + + // Send back a response with no nodes associated to the model. Thus, will write back to the model index that its UNDEPLOYED + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(2); + listener.onResponse(nodesResponse); + return null; + }).when(client).execute(any(), any(), isA(ActionListener.class)); + + ArgumentCaptor bulkRequestCaptor = ArgumentCaptor.forClass(BulkRequest.class); + + // mock the bulk response that can be captured for inspecting the contents of the write to index + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(1); + BulkResponse bulkResponse = mock(BulkResponse.class); + when(bulkResponse.hasFailures()).thenReturn(false); + listener.onResponse(bulkResponse); + return null; + }).when(client).bulk(bulkRequestCaptor.capture(), any(ActionListener.class)); + + String[] modelIds = new String[] { modelId }; + String[] nodeIds = new String[] { "test_node_id1", "test_node_id2" }; + MLUndeployModelsRequest request = new MLUndeployModelsRequest(modelIds, nodeIds); + + transportUndeployModelsAction.doExecute(task, request, actionListener); + + BulkRequest capturedBulkRequest = bulkRequestCaptor.getValue(); + assertEquals(1, capturedBulkRequest.numberOfActions()); + UpdateRequest updateRequest = (UpdateRequest) capturedBulkRequest.requests().get(0); + + @SuppressWarnings("unchecked") + Map updateDoc = updateRequest.doc().sourceAsMap(); + String modelIdFromBulkRequest = updateRequest.id(); + String indexNameFromBulkRequest = updateRequest.index(); + + assertEquals("Check that the write happened at the model index", ML_MODEL_INDEX, indexNameFromBulkRequest); + assertEquals("Check that the result bulk write hit this specific modelId", modelId, modelIdFromBulkRequest); + + assertEquals(MLModelState.UNDEPLOYED.name(), updateDoc.get(MLModel.MODEL_STATE_FIELD)); + assertEquals(0, updateDoc.get(MLModel.CURRENT_WORKER_NODE_COUNT_FIELD)); + assertEquals(0, updateDoc.get(MLModel.PLANNING_WORKER_NODE_COUNT_FIELD)); + assertEquals(List.of(), updateDoc.get(MLModel.PLANNING_WORKER_NODES_FIELD)); + assertTrue(updateDoc.containsKey(MLModel.LAST_UPDATED_TIME_FIELD)); + + verify(actionListener).onResponse(any(MLUndeployModelsResponse.class)); + verify(client).bulk(any(BulkRequest.class), any(ActionListener.class)); + } + + public void testDoExecute_noBulkRequestFired_WhenSomeNodesServiceModel() { + String modelId = "someModelId"; + MLModel mlModel = MLModel + .builder() + .user(User.parse(USER_STRING)) + .modelGroupId("111") + .version("111") + .name("Test Model") + .modelId(modelId) + .algorithm(FunctionName.BATCH_RCF) + .content("content") + .totalChunks(2) + .isHidden(true) + .build(); + + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(3); + listener.onResponse(mlModel); + return null; + }).when(mlModelManager).getModel(any(), any(), any(), isA(ActionListener.class)); + + doReturn(true).when(transportUndeployModelsAction).isSuperAdminUserWrapper(clusterService, client); + + List responseList = new ArrayList<>(); + responseList.add(mock(MLUndeployModelNodeResponse.class)); + responseList.add(mock(MLUndeployModelNodeResponse.class)); + List failuresList = new ArrayList<>(); + failuresList.add(mock(FailedNodeException.class)); + failuresList.add(mock(FailedNodeException.class)); + + MLUndeployModelNodesResponse nodesResponse = new MLUndeployModelNodesResponse(clusterName, responseList, failuresList); + + // Send back a response with nodes associated to the model + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(2); + listener.onResponse(nodesResponse); + return null; + }).when(client).execute(any(), any(), isA(ActionListener.class)); + + String[] modelIds = new String[] { modelId }; + String[] nodeIds = new String[] { "test_node_id1", "test_node_id2" }; + MLUndeployModelsRequest request = new MLUndeployModelsRequest(modelIds, nodeIds); + + transportUndeployModelsAction.doExecute(task, request, actionListener); + + verify(actionListener).onResponse(any(MLUndeployModelsResponse.class)); + // Check that no bulk write occurred Since there were nodes servicing the model + verify(client, never()).bulk(any(BulkRequest.class), any(ActionListener.class)); + } + public void testHiddenModelSuccess() { MLModel mlModel = MLModel .builder() @@ -186,16 +316,28 @@ public void testHiddenModelSuccess() { List responseList = new ArrayList<>(); List failuresList = new ArrayList<>(); MLUndeployModelNodesResponse response = new MLUndeployModelNodesResponse(clusterName, responseList, failuresList); + doAnswer(invocation -> { ActionListener listener = invocation.getArgument(2); listener.onResponse(response); return null; }).when(client).execute(any(), any(), isA(ActionListener.class)); + // Mock the client.bulk call + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(1); + BulkResponse bulkResponse = mock(BulkResponse.class); + when(bulkResponse.hasFailures()).thenReturn(false); + listener.onResponse(bulkResponse); + return null; + }).when(client).bulk(any(BulkRequest.class), any(ActionListener.class)); + doReturn(true).when(transportUndeployModelsAction).isSuperAdminUserWrapper(clusterService, client); MLUndeployModelsRequest request = new MLUndeployModelsRequest(modelIds, nodeIds); transportUndeployModelsAction.doExecute(task, request, actionListener); + verify(actionListener).onResponse(any(MLUndeployModelsResponse.class)); + verify(client).bulk(any(BulkRequest.class), any(ActionListener.class)); } public void testHiddenModelPermissionError() { @@ -249,9 +391,19 @@ public void testDoExecute() { listener.onResponse(response); return null; }).when(client).execute(any(), any(), isA(ActionListener.class)); + // Mock the client.bulk call + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(1); + BulkResponse bulkResponse = mock(BulkResponse.class); + when(bulkResponse.hasFailures()).thenReturn(false); + listener.onResponse(bulkResponse); + return null; + }).when(client).bulk(any(BulkRequest.class), any(ActionListener.class)); + MLUndeployModelsRequest request = new MLUndeployModelsRequest(modelIds, nodeIds); transportUndeployModelsAction.doExecute(task, request, actionListener); verify(actionListener).onResponse(any(MLUndeployModelsResponse.class)); + verify(client).bulk(any(BulkRequest.class), any(ActionListener.class)); } public void testDoExecute_modelAccessControl_notEnabled() { From fc8ded64b5950ef74a16ca5507abdcb3277f8dae Mon Sep 17 00:00:00 2001 From: Brian Flores Date: Mon, 13 Jan 2025 11:54:20 -0800 Subject: [PATCH 4/8] update code change with comment explaining the change Signed-off-by: Brian Flores --- .../undeploy/TransportUndeployModelsAction.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java index bf9f630097..3032bb85b7 100644 --- a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java +++ b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java @@ -164,6 +164,16 @@ private void undeployModels(String[] targetNodeIds, String[] modelIds, ActionLis MLUndeployModelNodesRequest mlUndeployModelNodesRequest = new MLUndeployModelNodesRequest(targetNodeIds, modelIds); client.execute(MLUndeployModelAction.INSTANCE, mlUndeployModelNodesRequest, ActionListener.wrap(response -> { + /* + * The method TransportUndeployModelsAction.processUndeployModelResponseAndUpdate(...) performs + * undeploy action of models by removing the models from the nodes cache and updating the index when it's able to find it. + * + * The problem becomes when the models index is incorrect and no node(s) are servicing the model. This results in + * `{}` responses (on undeploy action), with no update to the model index thus, causing incorrect model state status. + * + * Having this change enables a check that this edge case occurs along with having access to the model id + * allowing us to update the stale model index correctly to `UNDEPLOYED` since no nodes service the model. + */ if (response.getNodes().isEmpty()) { bulkSetModelIndexToUndeploy(modelIds, listener, response); return; @@ -180,14 +190,14 @@ private void bulkSetModelIndexToUndeploy( BulkRequest bulkUpdateRequest = new BulkRequest(); for (String modelId : modelIds) { UpdateRequest updateRequest = new UpdateRequest(); - Instant now = Instant.now(); + ImmutableMap.Builder builder = ImmutableMap.builder(); builder.put(MLModel.MODEL_STATE_FIELD, MLModelState.UNDEPLOYED.name()); builder.put(MLModel.PLANNING_WORKER_NODES_FIELD, List.of()); builder.put(MLModel.PLANNING_WORKER_NODE_COUNT_FIELD, 0); - builder.put(MLModel.LAST_UPDATED_TIME_FIELD, now.toEpochMilli()); + builder.put(MLModel.LAST_UPDATED_TIME_FIELD, Instant.now().toEpochMilli()); builder.put(MLModel.CURRENT_WORKER_NODE_COUNT_FIELD, 0); updateRequest.index(ML_MODEL_INDEX).id(modelId).doc(builder.build()); bulkUpdateRequest.add(updateRequest); From 6323086e83a685281e3b8862ad644d255afc2bd8 Mon Sep 17 00:00:00 2001 From: Brian Flores Date: Mon, 13 Jan 2025 16:43:48 -0800 Subject: [PATCH 5/8] add context stash/restore to write operation Signed-off-by: Brian Flores --- .../TransportUndeployModelsAction.java | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java index 3032bb85b7..961c2d0893 100644 --- a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java +++ b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java @@ -15,6 +15,7 @@ import org.opensearch.OpenSearchStatusException; import org.opensearch.action.ActionRequest; import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.ActionFilters; @@ -206,13 +207,22 @@ private void bulkSetModelIndexToUndeploy( bulkUpdateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); log.info("No nodes service: {}", Arrays.toString(modelIds)); - client.bulk(bulkUpdateRequest, ActionListener.wrap(br -> { - log.debug("Successfully set modelIds to UNDEPLOY in index"); - listener.onResponse(new MLUndeployModelsResponse(response)); - }, e -> { - log.error("Failed to set modelIds to UNDEPLOY in index", e); + try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) { + ActionListener listenerWithContextRestoration = ActionListener.runBefore(listener, () -> threadContext.restore()); + ActionListener bulkResponseListener = ActionListener.wrap(br -> { + log.debug("Successfully set modelIds to UNDEPLOY in index"); + listenerWithContextRestoration.onResponse(new MLUndeployModelsResponse(response)); + }, e -> { + log.error("Failed to set modelIds to UNDEPLOY in index", e); + listenerWithContextRestoration.onFailure(e); + }); + + client.bulk(bulkUpdateRequest, bulkResponseListener); + } catch (Exception e) { + log.error("Unexpected error while setting modelIds to UNDEPLOY status to index", e); listener.onFailure(e); - })); + } + } private void validateAccess(String modelId, ActionListener listener) { From 46363ef03011b709225bd7bfc37e940494ef8cf6 Mon Sep 17 00:00:00 2001 From: Brian Flores Date: Mon, 13 Jan 2025 16:45:07 -0800 Subject: [PATCH 6/8] Apply spotless Signed-off-by: Brian Flores --- .../ml/action/undeploy/TransportUndeployModelsAction.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java index 961c2d0893..917645d5d6 100644 --- a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java +++ b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java @@ -208,7 +208,8 @@ private void bulkSetModelIndexToUndeploy( log.info("No nodes service: {}", Arrays.toString(modelIds)); try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) { - ActionListener listenerWithContextRestoration = ActionListener.runBefore(listener, () -> threadContext.restore()); + ActionListener listenerWithContextRestoration = ActionListener + .runBefore(listener, () -> threadContext.restore()); ActionListener bulkResponseListener = ActionListener.wrap(br -> { log.debug("Successfully set modelIds to UNDEPLOY in index"); listenerWithContextRestoration.onResponse(new MLUndeployModelsResponse(response)); From 77f6e5bbb837191fe804ee26c5abd89a9942727e Mon Sep 17 00:00:00 2001 From: Brian Flores Date: Mon, 13 Jan 2025 17:09:07 -0800 Subject: [PATCH 7/8] Add better logging to write request Signed-off-by: Brian Flores --- .../ml/action/undeploy/TransportUndeployModelsAction.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java index 917645d5d6..950b98f9a1 100644 --- a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java +++ b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java @@ -205,22 +205,22 @@ private void bulkSetModelIndexToUndeploy( } bulkUpdateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - log.info("No nodes service: {}", Arrays.toString(modelIds)); + log.info("No nodes running these models: {}", Arrays.toString(modelIds)); try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) { ActionListener listenerWithContextRestoration = ActionListener .runBefore(listener, () -> threadContext.restore()); ActionListener bulkResponseListener = ActionListener.wrap(br -> { - log.debug("Successfully set modelIds to UNDEPLOY in index"); + log.debug("Successfully set the following modelId(s) to UNDEPLOY in index: {}", Arrays.toString(modelIds)); listenerWithContextRestoration.onResponse(new MLUndeployModelsResponse(response)); }, e -> { - log.error("Failed to set modelIds to UNDEPLOY in index", e); + log.error("Failed to set the following modelId(s) to UNDEPLOY in index: {}", Arrays.toString(modelIds), e); listenerWithContextRestoration.onFailure(e); }); client.bulk(bulkUpdateRequest, bulkResponseListener); } catch (Exception e) { - log.error("Unexpected error while setting modelIds to UNDEPLOY status to index", e); + log.error("Unexpected error while setting the following modelId(s) to UNDEPLOY in index: {}", Arrays.toString(modelIds), e); listener.onFailure(e); } From 24896bf04c02a42317c5e7b12cd167b6b8cb5e19 Mon Sep 17 00:00:00 2001 From: Brian Flores Date: Mon, 13 Jan 2025 21:12:46 -0800 Subject: [PATCH 8/8] wrap exception into 5xx Signed-off-by: Brian Flores --- .../undeploy/TransportUndeployModelsAction.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java index 950b98f9a1..21fc9ca9f5 100644 --- a/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java +++ b/plugin/src/main/java/org/opensearch/ml/action/undeploy/TransportUndeployModelsAction.java @@ -214,8 +214,15 @@ private void bulkSetModelIndexToUndeploy( log.debug("Successfully set the following modelId(s) to UNDEPLOY in index: {}", Arrays.toString(modelIds)); listenerWithContextRestoration.onResponse(new MLUndeployModelsResponse(response)); }, e -> { - log.error("Failed to set the following modelId(s) to UNDEPLOY in index: {}", Arrays.toString(modelIds), e); - listenerWithContextRestoration.onFailure(e); + String modelsNotFoundMessage = String + .format("Failed to set the following modelId(s) to UNDEPLOY in index: %s", Arrays.toString(modelIds)); + log.error(modelsNotFoundMessage, e); + + OpenSearchStatusException exception = new OpenSearchStatusException( + modelsNotFoundMessage + e.getMessage(), + RestStatus.INTERNAL_SERVER_ERROR + ); + listenerWithContextRestoration.onFailure(exception); }); client.bulk(bulkUpdateRequest, bulkResponseListener);