-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Validate index refreshing state before cancel job #2728
Validate index refreshing state before cancel job #2728
Conversation
Signed-off-by: Sean Kao <[email protected]>
Signed-off-by: Sean Kao <[email protected]>
Signed-off-by: Sean Kao <[email protected]>
Signed-off-by: Sean Kao <[email protected]>
Signed-off-by: Sean Kao <[email protected]>
@@ -70,7 +70,7 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { | |||
MockFlintSparkJob flintIndexJob = | |||
new MockFlintSparkJob( | |||
flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); | |||
flintIndexJob.active(); | |||
flintIndexJob.refreshing(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changing these because
- Auto refresh index's normal state is
REFRESHING
. It shouldn't beACTIVE
- This PR introduce a change where cancelStreamingJob will only be called if index state is initially
REFRESHING
Signed-off-by: Sean Kao <[email protected]>
flintIndexJob.assertState(FlintIndexState.ACTIVE); | ||
localEMRSClient.startJobRunCalled(1); | ||
localEMRSClient.getJobRunResultCalled(1); | ||
localEMRSClient.cancelJobRunCalled(0); | ||
Map<String, Object> mappings = mockDS.getIndexMappings(); | ||
Map<String, Object> meta = (HashMap<String, Object>) mappings.get("_meta"); | ||
Map<String, Object> options = (Map<String, Object>) meta.get("options"); | ||
Assertions.assertEquals("false", options.get("auto_refresh")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
auto_refresh and index state should be updated in EMR job, not remain unchanged
@@ -479,7 +479,7 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { | |||
/** | |||
* Index state is stable, Drop Index operation is retryable, expectation is | |||
* | |||
* <p>(1) call EMR-S (2) change index state to: DELETED | |||
* <p>(1) not call EMR-S (2) change index state to: DELETED |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR prevents cancelling job for active index
Signed-off-by: Sean Kao <[email protected]>
@@ -143,6 +146,13 @@ private void commit(FlintIndexStateModel flintIndex) { | |||
*/ | |||
public void cancelStreamingJob(FlintIndexStateModel flintIndexStateModel) | |||
throws InterruptedException, TimeoutException { | |||
// TODO: null handling | |||
if (initialFlintIndexStateModel.getIndexState() != FlintIndexState.REFRESHING) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the condition check should in validate(FlintIndexState state)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but validate for DROP and ALTER both allows for states other than REFRESHING. Need additional check for cancelStreamingJob itself
// TODO: null handling | ||
if (initialFlintIndexStateModel.getIndexState() != FlintIndexState.REFRESHING) { | ||
// Only refreshing index will have a job to cancel. Adding this validation prevents cancelling | ||
// the interactive job where a non-auto-refresh index is created. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add log.info is exist early.
Synced up with @seankao-az. Moving index jobs to FlintJob might be a cleaner solution..also ideally jobId should be correctly populated. |
drafting another PR for a different fix per discussion offline |
Description
Validate index state when cancelling job. Only cancel when index is refreshing.
Updated behavior of several test cases (please see comments in code)
Issues Resolved
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.