Skip to content

Commit

Permalink
Pass SdkClient and tenant id to util used for access control checks
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Dec 14, 2024
1 parent f611e41 commit e8832d6
Show file tree
Hide file tree
Showing 15 changed files with 105 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.plugins.PluginsService;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -69,6 +70,7 @@ public class CreateWorkflowTransportAction extends HandledTransportAction<Workfl
private final WorkflowProcessSorter workflowProcessSorter;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final Client client;
private final SdkClient sdkClient;
private final FlowFrameworkSettings flowFrameworkSettings;
private final PluginsService pluginsService;
private volatile Boolean filterByEnabled;
Expand Down Expand Up @@ -96,6 +98,7 @@ public CreateWorkflowTransportAction(
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
FlowFrameworkSettings flowFrameworkSettings,
Client client,
SdkClient sdkClient,
PluginsService pluginsService,
ClusterService clusterService,
NamedXContentRegistry xContentRegistry,
Expand All @@ -106,6 +109,7 @@ public CreateWorkflowTransportAction(
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.flowFrameworkSettings = flowFrameworkSettings;
this.client = client;
this.sdkClient = sdkClient;
this.pluginsService = pluginsService;
filterByEnabled = FILTER_BY_BACKEND_ROLES.get(settings);
this.clusterService = clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.TenantAwareHelper;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand All @@ -47,6 +48,7 @@ public class DeleteWorkflowTransportAction extends HandledTransportAction<Workfl
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final FlowFrameworkSettings flowFrameworkSettings;
private final Client client;
private final SdkClient sdkClient;
private volatile Boolean filterByEnabled;
private final ClusterService clusterService;
private final NamedXContentRegistry xContentRegistry;
Expand All @@ -69,6 +71,7 @@ public DeleteWorkflowTransportAction(
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
FlowFrameworkSettings flowFrameworkSettings,
Client client,
SdkClient sdkClient,
ClusterService clusterService,
NamedXContentRegistry xContentRegistry,
Settings settings
Expand All @@ -77,6 +80,7 @@ public DeleteWorkflowTransportAction(
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.flowFrameworkSettings = flowFrameworkSettings;
this.client = client;
this.sdkClient = sdkClient;
filterByEnabled = FILTER_BY_BACKEND_ROLES.get(settings);
this.xContentRegistry = xContentRegistry;
this.clusterService = clusterService;
Expand All @@ -98,10 +102,13 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Dele
resolveUserAndExecute(
user,
workflowId,
tenantId,
filterByEnabled,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
() -> executeDeleteRequest(request, listener, context),
client,
sdkClient,
clusterService,
xContentRegistry
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStep;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class DeprovisionWorkflowTransportAction extends HandledTransportAction<W

private final ThreadPool threadPool;
private final Client client;
private final SdkClient sdkClient;
private final WorkflowStepFactory workflowStepFactory;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final FlowFrameworkSettings flowFrameworkSettings;
Expand All @@ -99,6 +101,7 @@ public DeprovisionWorkflowTransportAction(
ActionFilters actionFilters,
ThreadPool threadPool,
Client client,
SdkClient sdkClient,
WorkflowStepFactory workflowStepFactory,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
FlowFrameworkSettings flowFrameworkSettings,
Expand All @@ -109,6 +112,7 @@ public DeprovisionWorkflowTransportAction(
super(DeprovisionWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.threadPool = threadPool;
this.client = client;
this.sdkClient = sdkClient;
this.workflowStepFactory = workflowStepFactory;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.flowFrameworkSettings = flowFrameworkSettings;
Expand All @@ -120,19 +124,25 @@ public DeprovisionWorkflowTransportAction(

@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<WorkflowResponse> listener) {
String tenantId = request.getTemplate() == null ? null : request.getTemplate().getTenantId();
if (!TenantAwareHelper.validateTenantId(flowFrameworkSettings.isMultiTenancyEnabled(), tenantId, listener)) {
return;
}
String workflowId = request.getWorkflowId();

User user = getUserContext(client);

// Stash thread context to interact with system index
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
resolveUserAndExecute(
user,
workflowId,
tenantId,
filterByEnabled,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
() -> executeDeprovisionRequest(request, listener, context),
client,
sdkClient,
clusterService,
xContentRegistry
);
Expand All @@ -149,10 +159,6 @@ private void executeDeprovisionRequest(
ActionListener<WorkflowResponse> listener,
ThreadContext.StoredContext context
) {
String tenantId = request.getTemplate() == null ? null : request.getTemplate().getTenantId();
if (!TenantAwareHelper.validateTenantId(flowFrameworkSettings.isMultiTenancyEnabled(), tenantId, listener)) {
return;
}
String workflowId = request.getWorkflowId();
String allowDelete = request.getParams().get(ALLOW_DELETE);
GetWorkflowStateRequest getStateRequest = new GetWorkflowStateRequest(workflowId, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand All @@ -47,7 +49,9 @@ public class GetWorkflowStateTransportAction extends HandledTransportAction<GetW

private final Logger logger = LogManager.getLogger(GetWorkflowStateTransportAction.class);

private final FlowFrameworkSettings flowFrameworkSettings;
private final Client client;
private final SdkClient sdkClient;
private final NamedXContentRegistry xContentRegistry;
private volatile Boolean filterByEnabled;
private final ClusterService clusterService;
Expand All @@ -65,13 +69,17 @@ public class GetWorkflowStateTransportAction extends HandledTransportAction<GetW
public GetWorkflowStateTransportAction(
TransportService transportService,
ActionFilters actionFilters,
FlowFrameworkSettings flowFrameworkSettings,
Client client,
SdkClient sdkClient,
NamedXContentRegistry xContentRegistry,
ClusterService clusterService,
Settings settings
) {
super(GetWorkflowStateAction.NAME, transportService, actionFilters, GetWorkflowStateRequest::new);
this.flowFrameworkSettings = flowFrameworkSettings;
this.client = client;
this.sdkClient = sdkClient;
this.xContentRegistry = xContentRegistry;
filterByEnabled = FILTER_BY_BACKEND_ROLES.get(settings);
this.clusterService = clusterService;
Expand All @@ -88,10 +96,13 @@ protected void doExecute(Task task, GetWorkflowStateRequest request, ActionListe
resolveUserAndExecute(
user,
workflowId,
null, // TODO: Get Tenant ID in through this request
filterByEnabled,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
() -> executeGetWorkflowStateRequest(request, listener, context),
client,
sdkClient,
clusterService,
xContentRegistry
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.flowframework.util.TenantAwareHelper;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand All @@ -49,6 +50,7 @@ public class GetWorkflowTransportAction extends HandledTransportAction<WorkflowR
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final FlowFrameworkSettings flowFrameworkSettings;
private final Client client;
private final SdkClient sdkClient;
private final EncryptorUtils encryptorUtils;
private volatile Boolean filterByEnabled;
private final ClusterService clusterService;
Expand All @@ -73,6 +75,7 @@ public GetWorkflowTransportAction(
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
FlowFrameworkSettings flowFrameworkSettings,
Client client,
SdkClient sdkClient,
EncryptorUtils encryptorUtils,
ClusterService clusterService,
NamedXContentRegistry xContentRegistry,
Expand All @@ -82,6 +85,7 @@ public GetWorkflowTransportAction(
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.flowFrameworkSettings = flowFrameworkSettings;
this.client = client;
this.sdkClient = sdkClient;
this.encryptorUtils = encryptorUtils;
filterByEnabled = FILTER_BY_BACKEND_ROLES.get(settings);
this.xContentRegistry = xContentRegistry;
Expand All @@ -105,10 +109,13 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<GetW
resolveUserAndExecute(
user,
workflowId,
tenantId,
filterByEnabled,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
() -> executeGetRequest(request, listener, context),
client,
sdkClient,
clusterService,
xContentRegistry
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.plugins.PluginsService;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class ProvisionWorkflowTransportAction extends HandledTransportAction<Wor

private final ThreadPool threadPool;
private final Client client;
private final SdkClient sdkClient;
private final WorkflowProcessSorter workflowProcessSorter;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final FlowFrameworkSettings flowFrameworkSettings;
Expand Down Expand Up @@ -101,6 +103,7 @@ public ProvisionWorkflowTransportAction(
ActionFilters actionFilters,
ThreadPool threadPool,
Client client,
SdkClient sdkClient,
WorkflowProcessSorter workflowProcessSorter,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
FlowFrameworkSettings flowFrameworkSettings,
Expand All @@ -113,6 +116,7 @@ public ProvisionWorkflowTransportAction(
super(ProvisionWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.threadPool = threadPool;
this.client = client;
this.sdkClient = sdkClient;
this.workflowProcessSorter = workflowProcessSorter;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.flowFrameworkSettings = flowFrameworkSettings;
Expand Down Expand Up @@ -140,10 +144,13 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
resolveUserAndExecute(
user,
workflowId,
tenantId,
filterByEnabled,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
() -> executeProvisionRequest(request, listener, context),
client,
sdkClient,
clusterService,
xContentRegistry
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.plugins.PluginsService;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class ReprovisionWorkflowTransportAction extends HandledTransportAction<R

private final ThreadPool threadPool;
private final Client client;
private final SdkClient sdkClient;
private final WorkflowStepFactory workflowStepFactory;
private final WorkflowProcessSorter workflowProcessSorter;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
Expand Down Expand Up @@ -104,6 +106,7 @@ public ReprovisionWorkflowTransportAction(
ActionFilters actionFilters,
ThreadPool threadPool,
Client client,
SdkClient sdkClient,
WorkflowStepFactory workflowStepFactory,
WorkflowProcessSorter workflowProcessSorter,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
Expand All @@ -117,6 +120,7 @@ public ReprovisionWorkflowTransportAction(
super(ReprovisionWorkflowAction.NAME, transportService, actionFilters, ReprovisionWorkflowRequest::new);
this.threadPool = threadPool;
this.client = client;
this.sdkClient = sdkClient;
this.workflowStepFactory = workflowStepFactory;
this.workflowProcessSorter = workflowProcessSorter;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
Expand All @@ -142,10 +146,13 @@ protected void doExecute(Task task, ReprovisionWorkflowRequest request, ActionLi
resolveUserAndExecute(
user,
workflowId,
tenantId,
filterByEnabled,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
() -> executeReprovisionRequest(request, listener, context),
client,
sdkClient,
clusterService,
xContentRegistry
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.ml.repackage.com.google.common.collect.ImmutableList;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.FileNotFoundException;
Expand Down Expand Up @@ -283,20 +284,26 @@ public static SearchSourceBuilder addUserBackendRolesFilter(User user, SearchSou
* Resolve user and execute the function
* @param requestedUser the user to execute the request
* @param workflowId workflow id
* @param tenantId tenant id
* @param filterByEnabled filter by enabled setting
* @param isMultitenancyEnabled whether multitenancy is enabled
* @param listener action listener
* @param function workflow function
* @param client node client
* @param sdkClient multitenant client
* @param clusterService cluster service
* @param xContentRegistry contentRegister to parse get response
*/
public static void resolveUserAndExecute(
User requestedUser,
String workflowId,
String tenantId,
Boolean filterByEnabled,
boolean isMultitenancyEnabled,
ActionListener<? extends ActionResponse> listener,
Runnable function,
Client client,
SdkClient sdkClient,
ClusterService clusterService,
NamedXContentRegistry xContentRegistry
) {
Expand Down
Loading

0 comments on commit e8832d6

Please sign in to comment.