Skip to content

Commit

Permalink
Fixed RBAC fetching from workflow state when template is not present
Browse files Browse the repository at this point in the history
Signed-off-by: Owais <[email protected]>
  • Loading branch information
owaiskazi19 committed Jan 15, 2025
1 parent 4b92fb8 commit 8d43327
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Enhancements
### Bug Fixes
- Remove useCase and defaultParams field in WorkflowRequest ([#758](https://github.com/opensearch-project/flow-framework/pull/758))
- Fix RBAC fetching from workflow state when template is not present ([#998](https://github.com/opensearch-project/flow-framework/pull/998))

### Infrastructure
### Documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,17 @@ private void resolveUserAndExecute(
boolean filterByBackendRole = requestedUser == null ? false : filterByEnabled;
// Update workflow request, check if user has permissions to update the workflow
// Get workflow and verify backend roles
getWorkflow(requestedUser, workflowId, filterByBackendRole, listener, function, client, clusterService, xContentRegistry);
getWorkflow(
requestedUser,
workflowId,
filterByBackendRole,
false,
listener,
function,
client,
clusterService,
xContentRegistry
);
} else {
// Create Workflow. No need to get current workflow.
function.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,15 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Dele
String workflowId = request.getWorkflowId();
User user = getUserContext(client);

final boolean clearStatus = Booleans.parseBoolean(request.getParams().get(CLEAR_STATUS), false);

ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext();

resolveUserAndExecute(
user,
workflowId,
filterByEnabled,
clearStatus,
listener,
() -> executeDeleteRequest(request, listener, context),
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
user,
workflowId,
filterByEnabled,
true,
listener,
() -> executeDeprovisionRequest(request, listener, context),
() -> executeDeprovisionRequest(request, listener, context, user),
client,
clusterService,
xContentRegistry
Expand All @@ -146,7 +147,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
private void executeDeprovisionRequest(
WorkflowRequest request,
ActionListener<WorkflowResponse> listener,
ThreadContext.StoredContext context
ThreadContext.StoredContext context,
User user
) {
String workflowId = request.getWorkflowId();
String allowDelete = request.getParams().get(ALLOW_DELETE);
Expand All @@ -163,7 +165,8 @@ private void executeDeprovisionRequest(
workflowId,
response.getWorkflowState().resourcesCreated(),
deleteAllowedResources,
listener
listener,
user
)
);
}, exception -> {
Expand All @@ -180,7 +183,8 @@ private void executeDeprovisionSequence(
String workflowId,
List<ResourceCreated> resourcesCreated,
Set<String> deleteAllowedResources,
ActionListener<WorkflowResponse> listener
ActionListener<WorkflowResponse> listener,
User user
) {
List<ResourceCreated> deleteNotAllowed = new ArrayList<>();
// Create a list of ProcessNodes with the corresponding deprovision workflow steps
Expand Down Expand Up @@ -294,26 +298,23 @@ private void executeDeprovisionSequence(
logger.info("Resources requiring allow_delete: {}.", deleteNotAllowed);
}
// This is a redundant best-effort backup to the incremental deletion done earlier
updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener);
updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener, user);
}

private void updateWorkflowState(
String workflowId,
List<ResourceCreated> remainingResources,
List<ResourceCreated> deleteNotAllowed,
ActionListener<WorkflowResponse> listener
ActionListener<WorkflowResponse> listener,
User user
) {
if (remainingResources.isEmpty() && deleteNotAllowed.isEmpty()) {
// Successful deprovision of all resources, reset state to initial
flowFrameworkIndicesHandler.doesTemplateExist(workflowId, templateExists -> {
if (Boolean.TRUE.equals(templateExists)) {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
workflowId,
getUserContext(client),
ActionListener.wrap(indexResponse -> {
logger.info("Reset workflow {} state to NOT_STARTED", workflowId);
}, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); })
);
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(workflowId, user, ActionListener.wrap(indexResponse -> {
logger.info("Reset workflow {} state to NOT_STARTED", workflowId);
}, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); }));
} else {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, ActionListener.wrap(deleteResponse -> {
logger.info("Deleted workflow {} state", workflowId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ protected void doExecute(Task task, GetWorkflowStateRequest request, ActionListe
user,
workflowId,
filterByEnabled,
true,
listener,
() -> executeGetWorkflowStateRequest(request, listener, context),
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<GetW
user,
workflowId,
filterByEnabled,
false,
listener,
() -> executeGetRequest(request, listener, context),
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
user,
workflowId,
filterByEnabled,
false,
listener,
() -> executeProvisionRequest(request, listener, context),
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ protected void doExecute(Task task, ReprovisionWorkflowRequest request, ActionLi
user,
workflowId,
filterByEnabled,
false,
listener,
() -> executeReprovisionRequest(request, listener, context),
client,
Expand Down
43 changes: 32 additions & 11 deletions src/main/java/org/opensearch/flowframework/util/ParseUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.transport.WorkflowResponse;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.index.query.BoolQueryBuilder;
Expand Down Expand Up @@ -67,6 +68,7 @@

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;

/**
* Utility methods for Template parsing
Expand Down Expand Up @@ -284,6 +286,7 @@ public static SearchSourceBuilder addUserBackendRolesFilter(User user, SearchSou
* @param requestedUser the user to execute the request
* @param workflowId workflow id
* @param filterByEnabled filter by enabled setting
* @param statePresent state present for the transport action
* @param listener action listener
* @param function workflow function
* @param client node client
Expand All @@ -294,6 +297,7 @@ public static void resolveUserAndExecute(
User requestedUser,
String workflowId,
Boolean filterByEnabled,
Boolean statePresent,
ActionListener<? extends ActionResponse> listener,
Runnable function,
Client client,
Expand All @@ -307,7 +311,17 @@ public static void resolveUserAndExecute(
// !filterByEnabled means security is enabled and filterByEnabled is disabled
function.run();
} else {
getWorkflow(requestedUser, workflowId, filterByEnabled, listener, function, client, clusterService, xContentRegistry);
getWorkflow(
requestedUser,
workflowId,
filterByEnabled,
statePresent,
listener,
function,
client,
clusterService,
xContentRegistry
);
}
} catch (Exception e) {
listener.onFailure(e);
Expand Down Expand Up @@ -368,6 +382,7 @@ public static void checkFilterByBackendRoles(User requestedUser) {
* @param requestUser the user to execute the request
* @param workflowId workflow id
* @param filterByEnabled filter by enabled setting
* @param statePresent state present for the transport action
* @param listener action listener
* @param function workflow function
* @param client node client
Expand All @@ -378,15 +393,17 @@ public static void getWorkflow(
User requestUser,
String workflowId,
Boolean filterByEnabled,
Boolean statePresent,
ActionListener listener,
Runnable function,
Client client,
ClusterService clusterService,
NamedXContentRegistry xContentRegistry
) {
if (clusterService.state().metadata().hasIndex(GLOBAL_CONTEXT_INDEX)) {
String index = statePresent ? WORKFLOW_STATE_INDEX : GLOBAL_CONTEXT_INDEX;
if (clusterService.state().metadata().hasIndex(index)) {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
GetRequest request = new GetRequest(GLOBAL_CONTEXT_INDEX, workflowId);
GetRequest request = new GetRequest(index, workflowId);
client.get(
request,
ActionListener.wrap(
Expand All @@ -395,9 +412,11 @@ public static void getWorkflow(
requestUser,
workflowId,
filterByEnabled,
statePresent,
listener,
function,
xContentRegistry
xContentRegistry,
context
),
exception -> {
logger.error("Failed to get workflow: {}", workflowId, exception);
Expand All @@ -407,10 +426,8 @@ public static void getWorkflow(
);
}
} else {
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to retrieve template ({}) from global context.",
workflowId
).getFormattedMessage();
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage("Failed to retrieve template ({}).", workflowId)
.getFormattedMessage();
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND));
}
Expand All @@ -422,26 +439,30 @@ public static void getWorkflow(
* @param response get response
* @param workflowId workflow id
* @param filterByEnabled filter by enabled setting
* @param statePresent state present for the transport action
* @param listener action listener
* @param function workflow function
* @param xContentRegistry contentRegister to parse get response
* @param context thread context
*/
public static void onGetWorkflowResponse(
GetResponse response,
User requestUser,
String workflowId,
Boolean filterByEnabled,
Boolean statePresent,
ActionListener<WorkflowResponse> listener,
Runnable function,
NamedXContentRegistry xContentRegistry
NamedXContentRegistry xContentRegistry,
ThreadContext.StoredContext context
) {
if (response.isExists()) {
try (
XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())
) {
context.restore();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
Template template = Template.parse(parser);
User resourceUser = template.getUser();
User resourceUser = statePresent ? WorkflowState.parse(parser).getUser() : Template.parse(parser).getUser();

if (!filterByEnabled || checkUserPermissions(requestUser, resourceUser, workflowId) || isAdmin(requestUser)) {
function.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ public Response createIndexRole(String role, String index) throws IOException {
TestHelpers.toHttpEntity(
"{\n"
+ "\"cluster_permissions\": [\n"
+ "\"cluster:admin/ingest/pipeline/put\",\n"
+ "\"cluster:admin/ingest/pipeline/delete\"\n"
+ "],\n"
+ "\"index_permissions\": [\n"
Expand All @@ -353,6 +354,7 @@ public Response createIndexRole(String role, String index) throws IOException {
+ "\"crud\",\n"
+ "\"indices:admin/create\",\n"
+ "\"indices:admin/aliases\",\n"
+ "\"indices:admin/settings/update\",\n"
+ "\"indices:admin/delete\"\n"
+ "]\n"
+ "}\n"
Expand Down

0 comments on commit 8d43327

Please sign in to comment.