Skip to content

Commit

Permalink
actually only process cancelled events that were cancelled during the…
Browse files Browse the repository at this point in the history
… current run
  • Loading branch information
jasquat committed Oct 25, 2023
1 parent be0b1d6 commit c090742
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ def update_task_model_with_spiff_task(
self,
spiff_task: SpiffTask,
start_and_end_times: StartAndEndTimes | None = None,
cancelled_spiff_task_guids_with_events: list[str] | None = None,
) -> TaskModel:
new_bpmn_process = None
if str(spiff_task.id) in self.task_models:
Expand Down Expand Up @@ -211,29 +210,22 @@ def update_task_model_with_spiff_task(

# let failed tasks raise and we will log the event then
if task_model.state in ["COMPLETED", "CANCELLED"]:
skip_event_entry = False
event_type = ProcessInstanceEventType.task_completed.value
if task_model.state == "CANCELLED":
if (
cancelled_spiff_task_guids_with_events
and task_model.guid in cancelled_spiff_task_guids_with_events
):
skip_event_entry = True
event_type = ProcessInstanceEventType.task_cancelled.value

if skip_event_entry is False:
timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time()
(
process_instance_event,
_process_instance_error_detail,
) = ProcessInstanceTmpService.add_event_to_process_instance(
self.process_instance,
event_type,
task_guid=task_model.guid,
timestamp=timestamp,
add_to_db_session=False,
)
self.process_instance_events[task_model.guid] = process_instance_event
timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time()
(
process_instance_event,
_process_instance_error_detail,
) = ProcessInstanceTmpService.add_event_to_process_instance(
self.process_instance,
event_type,
task_guid=task_model.guid,
timestamp=timestamp,
add_to_db_session=False,
)
self.process_instance_events[task_model.guid] = process_instance_event

self.update_bpmn_process(spiff_task.workflow, bpmn_process)
return task_model
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance_correlation import MessageInstanceCorrelationRuleModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.services.assertion_service import safe_assertion
from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService
Expand Down Expand Up @@ -202,6 +201,8 @@ def __init__(
self.spiff_tasks_to_process: set[UUID] = set()
self.spiff_task_timestamps: dict[UUID, StartAndEndTimes] = {}

self.run_started_at = time.time()

self.task_service = TaskService(
process_instance=self.process_instance,
serializer=self.serializer,
Expand Down Expand Up @@ -260,16 +261,16 @@ def add_object_to_db_session(self, bpmn_process_instance: BpmnWorkflow) -> None:
):
self.task_service.update_task_model_with_spiff_task(waiting_spiff_task)

cancelled_spiff_tasks = bpmn_process_instance.get_tasks(state=TaskState.CANCELLED)
cancelled_spiff_task_guids = [str(sp.id) for sp in cancelled_spiff_tasks]
existing_cancelled_events = ProcessInstanceEventModel.query.filter(
ProcessInstanceEventModel.task_guid.in_(cancelled_spiff_task_guids) # type: ignore
).all()
cancelled_spiff_task_guids_with_events = [event.task_guid for event in existing_cancelled_events]
# only process cancelled tasks that were cancelled during this run
# NOTE: this could mean we do not add task models that we should be adding
# in which case we may have to remove the updated_ts filter here and
# instead just avoid creating the event in update_task_model_with_spiff_task
cancelled_spiff_tasks = bpmn_process_instance.get_tasks(
state=TaskState.CANCELLED, updated_ts=self.run_started_at
)
for cancelled_spiff_task in cancelled_spiff_tasks:
self.task_service.update_task_model_with_spiff_task(
spiff_task=cancelled_spiff_task,
cancelled_spiff_task_guids_with_events=cancelled_spiff_task_guids_with_events,
)

self.task_service.save_objects_to_database()
Expand Down

0 comments on commit c090742

Please sign in to comment.