Implement task restart policies #280
* Test: test_add_task_restart_policy_patterns
* Test: test_get_task_restart_policy_patterns
* Test: test_remove_task_restart_policy_patterns
* Test: test_clear_task_restart_policy_patterns
* Test: test_task_resolve_restarts

* TaskRestartPattern
* TaskRestartPolicy
* TaskHistory

* Removed TaskRestartPolicy and TaskHistory
* Added Traceback

* TaskReturnPattern: Confirm that the input pattern is a string type and that it is not empty.
* Traceback: Confirm that the input is a list of strings and that none of them are empty.
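For readers following along, the two validation rules above could look roughly like the sketch below. The function names `validate_pattern` and `validate_tracebacks` are hypothetical and only illustrate the checks; the models in this PR may structure them differently.

```python
def validate_pattern(pattern):
    """Return `pattern` unchanged, or raise ValueError unless it is a non-empty string."""
    if not isinstance(pattern, str) or pattern == "":
        raise ValueError("`pattern` must be a non-empty string")
    return pattern


def validate_tracebacks(tracebacks):
    """Return `tracebacks` unchanged, or raise ValueError unless it is a list of non-empty strings."""
    if not isinstance(tracebacks, list):
        raise ValueError("`tracebacks` must be a list of strings")
    if not all(isinstance(tb, str) and tb != "" for tb in tracebacks):
        raise ValueError("`tracebacks` must contain only non-empty strings")
    return tracebacks
```

Raising on the first violated rule keeps the error message specific to what was wrong with the input.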
7e82f54 to 6a167f1
Similar to `TaskHub`s, the `TaskRestartPattern` needs additional hashed data to uniquely identify it as a Neo4j node (via the gufe key). The unit tests have been updated to reflect this change.
`statestore` methods have been added to modify the database state:
* add_task_restart_patterns
* remove_task_restart_patterns
* get_task_restart_patterns

Tests were added for each method in the integration tests for the statestore.
The `add_task_restart_patterns` method now establishes the APPLIES relationship between each new pattern and all Tasks ACTIONED on the corresponding TaskHub. Added testing for creation of the APPLIES relationship, asserting the number of created connections over multiple TaskHubs and Tasks. Further subdivided the test classes. Additionally added a `set_task_restart_patterns_max_retries` method for updating the max_retries of a TaskRestartPattern.
Actioning a Task on a TaskHub with preexisting TaskRestartPatterns now creates the APPLIES relationship between them with a num_retries value of 0. This behavior is tested in the test_action_task function in the statestore tests.
When an actioned Task is canceled and also has an APPLIES relationship with a TaskRestartPattern, APPLIES is removed between the two nodes. Removed org, project, and campaign fields since they are not necessary for the APPLIES relationship.
Setting the status of an actioned Task to any of the following now removes the APPLIES relationship from attached TaskRestartPatterns:
* complete
* invalid
* deleted

NOTE: tests have not been added for this yet
Confirming that changing the status of an actioned Task to any of the following removes the APPLIES relationship:
* complete
* invalid
* deleted
New statestore method placeholders:
- add_task_traceback
- resolve_task_restarts

The compute API will add a Task Traceback and resolve restarts for returned failed Tasks. When a list of restart patterns is added, restarts are also resolved.
* Renamed add_task_traceback to add_protocol_dag_result_ref_traceback
* Added tests for add_protocol_dag_result_ref_traceback
Implemented half of the resolve_task_restarts test
With this decorator, if a transaction isn't passed as a keyword arg, one is automatically created (and closed). This allows a chaining behavior where many method calls share a single transaction object.
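A minimal sketch of the decorator behavior described above, using in-memory stand-ins instead of a real Neo4j session. Every name here (`chainable`, `FakeTransaction`, `FakeStore`, and its methods) is hypothetical and only illustrates the pattern; it is not the decorator added in this PR.

```python
import functools


class FakeTransaction:
    """Hypothetical stand-in for a database transaction."""

    def __init__(self):
        self.closed = False
        self.log = []

    def run(self, statement):
        self.log.append(statement)

    def close(self):
        self.closed = True


def chainable(func):
    """Create (and close) a transaction when the caller does not pass `tx=`."""

    @functools.wraps(func)
    def wrapper(self, *args, tx=None, **kwargs):
        if tx is not None:
            # The caller owns the transaction: reuse it and leave it open.
            return func(self, *args, tx=tx, **kwargs)
        tx = self.begin()
        try:
            return func(self, *args, tx=tx, **kwargs)
        finally:
            tx.close()

    return wrapper


class FakeStore:
    """Hypothetical store; only the transaction handling matters here."""

    def __init__(self):
        self.opened = []  # every transaction this store created itself

    def begin(self):
        tx = FakeTransaction()
        self.opened.append(tx)
        return tx

    @chainable
    def add_traceback(self, task, *, tx=None):
        tx.run(f"add traceback for {task}")

    @chainable
    def resolve_restarts(self, task, *, tx=None):
        tx.run(f"resolve restarts for {task}")

    @chainable
    def handle_failure(self, task, *, tx=None):
        # Both calls share the transaction handed to this method.
        self.add_traceback(task, tx=tx)
        self.resolve_restarts(task, tx=tx)


store = FakeStore()
store.handle_failure("task-1")
```

In this sketch only the outermost call opens a transaction, and it is closed once that call returns, so the chained methods all write through the same object.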
* Removed custom tokenization
* Implemented _defaults to allow default tokenization to work
cancel_map has been changed from a defaultdict to a plain dict, and lookups now use the dict.get method, which returns None for missing keys. Additionally, a set of all task/taskhub pairs was added; it is later used to determine what should be canceled. I've also added grouping on taskhubs so that the number of calls to cancel_tasks is minimized.
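To illustrate the grouping idea, here is a rough sketch under assumed shapes: pairs are `(task, taskhub)` tuples of strings, and `cancel_tasks(tasks, taskhub)` returns the tasks it actually canceled. The helper names and the fake backend are hypothetical, not the code in this PR.

```python
from itertools import groupby
from operator import itemgetter


def group_by_taskhub(pairs):
    """Turn a set of (task, taskhub) pairs into {taskhub: [task, ...]}."""
    # groupby only merges adjacent items, so sort by taskhub (then task) first.
    ordered = sorted(pairs, key=itemgetter(1, 0))
    return {
        hub: [task for task, _ in group]
        for hub, group in groupby(ordered, key=itemgetter(1))
    }


def cancel_pairs(pairs, cancel_tasks):
    """Call `cancel_tasks` once per taskhub and record canceled tasks by pair."""
    cancel_map = {}
    for hub, tasks in group_by_taskhub(pairs).items():
        for task in cancel_tasks(tasks, hub):
            cancel_map[(task, hub)] = task
    return cancel_map


calls = []


def fake_cancel_tasks(tasks, hub):
    # Hypothetical backend: records the call and cancels everything except "task-b".
    calls.append((hub, tasks))
    return [task for task in tasks if task != "task-b"]


pairs = {("task-a", "hub-1"), ("task-b", "hub-1"), ("task-c", "hub-2")}
cancel_map = cancel_pairs(pairs, fake_cancel_tasks)

# A plain dict plus .get() yields None for pairs that were not canceled,
# without inserting a new key the way indexing a defaultdict with a factory would.
not_canceled = cancel_map.get(("task-b", "hub-1"))
```

Three pairs across two taskhubs result in two calls to the cancel function rather than three.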
…olicy_resolve_restarts Restart policy: resolve restarts
The addition of source_keys and failure_keys was not included in the unit tests so all initializations of Tracebacks failed. I've added default values for the test class.
* add_task_restart_patterns
* remove_task_restart_patterns
* get_task_restart_patterns
* set_task_restart_patterns_max_retries

Additionally, I added the get_taskhubs method to Neo4jStore since get_taskhub will only get the taskhub for a single network at a time. It might make sense to replace the old method with this new one.
1071369 to f03417c
…t pattern endpoints
Thanks for this impressive feature @ianmkenney! I have a few notes, and I've made some modifications where it was obvious to me what to do.
Can you address the notes and fix any broken tests? After that, I think we should be good to merge!
// only proceed for cases where task is not already actioned on hub
// and where the task is either in 'waiting', 'running', or 'error' status
WITH th, an, task
WHERE NOT (th)-[:ACTIONS]->(task)
- AND task.status IN ['{TaskStatusEnum.waiting.value}', '{TaskStatusEnum.running.value}', '{TaskStatusEnum.error.value}']
+ AND task.status IN [$waiting, $running, $error]
Much nicer!
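For context, a small sketch of how the parameter map for the `$waiting`, `$running`, and `$error` placeholders could be built. The `TaskStatusEnum` below is a hypothetical subset defined only for this example, and the query text is just the fragment from the diff, so nothing is sent to a database here.

```python
from enum import Enum


class TaskStatusEnum(Enum):
    # Hypothetical subset of statuses, defined only for this sketch.
    waiting = "waiting"
    running = "running"
    error = "error"
    complete = "complete"


# Fragment of the Cypher query from the diff, with parameter placeholders.
QUERY_FRAGMENT = """
WITH th, an, task
WHERE NOT (th)-[:ACTIONS]->(task)
  AND task.status IN [$waiting, $running, $error]
"""

ACTIONABLE = (TaskStatusEnum.waiting, TaskStatusEnum.running, TaskStatusEnum.error)

# One parameter per placeholder; the driver substitutes the values itself,
# so nothing is interpolated into the query string.
params = {status.name: status.value for status in ACTIONABLE}

# With the Neo4j Python driver these would typically be passed alongside
# the full query, e.g. driver.execute_query(query, **params).
```

Keeping the query text constant also means it no longer needs to be an f-string.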
alchemiscale/storage/statestore.py
Outdated
@@ -1411,30 +1461,51 @@ def action_tasks(
# so we can properly return `None` if needed
task_map = {str(task): None for task in tasks}

q = f"""
query_safe_task_list = [str(task) for task in tasks if task]
Why the trailing `if task`? It's unclear under what conditions this would apply.
return ScopedKey.from_str(node["_scoped_key"])

transform_function = _node_to_gufe if return_gufe else _node_to_scoped_key
transform_results = defaultdict(None)
Is there a reason this is a `defaultdict` instead of just a `dict`? Where it gets used below, you can use `transform_results.get(...)` to safely ask for the value for a given key, and `None` will be given if there is no matching key. Using a `defaultdict` doesn't give any obvious behavior advantage here.
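A quick standalone illustration of the point: `defaultdict(None)` has no default factory, so indexing a missing key still raises `KeyError`, while `dict.get` returns `None`. The variable names are only for this example.

```python
from collections import defaultdict

# defaultdict(None) sets default_factory to None, so it behaves like a
# plain dict on missing keys: indexing raises KeyError.
no_factory = defaultdict(None)
try:
    no_factory["missing"]
    raised_key_error = False
except KeyError:
    raised_key_error = True

# A plain dict with .get() returns None for a missing key and does not
# insert the key as a side effect.
plain = {}
value = plain.get("missing")
```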
# TODO: should this also compare taskhub scoped keys?
def __eq__(self, other):
    if not isinstance(other, self.__class__):
        return False
    return self.pattern == other.pattern
Hmmm... where do we use equality for this model? That would tell me what the most reasonable approach is here.
    max_retries=max_retries,
)

# TODO: validation of taskhubs variable, will fail in weird ways if not enforced
Can you add this if it's a likely failure mode?
try:
    dict_trp.pop(":version:")
except KeyError:
    raise AssertionError("expected to find :version:")
See above.
assert tb_dict.pop("__qualname__") == "Tracebacks"
assert tb_dict.pop("__module__") == "alchemiscale.storage.models"

# light test of the version key
try:
    tb_dict.pop(":version:")
except KeyError:
    raise AssertionError("expected to find :version:")
See above.
applies_count = n4js.execute_query(
    query, taskhub_scoped_key=str(taskhub_sk)
).records[0]["applies_count"]
Doesn't look like `applies_count` gets used after this?
@pytest.mark.xfail(raises=NotImplementedError)
def test_task_actioning_applies_relationship(self):
    raise NotImplementedError
Are we intending to fill this in?
Ah, I see we check this behavior in `test_action_task`.
@pytest.mark.xfail(raises=NotImplementedError)
def test_task_deaction_applies_relationship(self):
    raise NotImplementedError
Are we intending to fill this in?
Ah, I see we check this behavior in `test_cancel_task`.
closes #277