Skip to content

Commit

Permalink
Merge pull request #743 from Project-MONAI/AC-1914
Browse files Browse the repository at this point in the history
Ac 1914
  • Loading branch information
neildsouth authored Apr 14, 2023
2 parents 1c86cef + ff0c754 commit e972799
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 4 deletions.
11 changes: 9 additions & 2 deletions src/Shared/Configuration/MessageBrokerConfigurationKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,23 @@ public class MessageBrokerConfigurationKeys

/// <summary>
/// Gets or sets the topic for publishing task update events.
/// Defaults to `md.tasks.update`.
/// Defaults to `md.tasks.cancellation`.
/// </summary>
[ConfigurationKeyName("taskCancellation")]
public string TaskCancellationRequest { get; set; } = "md.tasks.cancellation";

/// <summary>
/// Gets or sets the topic for publishing clinical review request events.
/// Defaults to `md.tasks.update`.
/// Defaults to `aide.clinical_review.request`.
/// </summary>
[ConfigurationKeyName("aideClinicalReviewRequest")]
public string AideClinicalReviewRequest { get; set; } = "aide.clinical_review.request";

/// <summary>
/// Gets or sets the topic for publishing clinical review cancelation events.
/// Defaults to `aide.clinical_review.cancellation`.
/// </summary>
[ConfigurationKeyName("aideClinicalReviewCancelation")]
public string AideClinicalReviewCancelation { get; set; } = "aide.clinical_review.cancellation";
}
}
5 changes: 5 additions & 0 deletions src/Shared/Configuration/WorkflowManagerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

namespace Monai.Deploy.WorkflowManager.Configuration
Expand Down Expand Up @@ -47,6 +48,10 @@ public class WorkflowManagerOptions : PagedOptions
[ConfigurationKeyName("taskTimeoutMinutes")]
public double TaskTimeoutMinutes { get; set; } = 60;

[ConfigurationKeyName("perTaskTypeTimeoutMinutes")]
public Dictionary<string, double> PerTaskTypeTimeoutMinutes { get; set; }


public TimeSpan TaskTimeout { get => TimeSpan.FromMinutes(TaskTimeoutMinutes); }

/// <summary>
Expand Down
3 changes: 2 additions & 1 deletion src/TaskManager/TaskManager/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
"exportComplete": "md.export.complete",
"exportRequestPrefix": "md.export.request",
"taskCallback": "md.tasks.callback",
"aideClinicalReviewRequest": "aide.clinical_review.request"
"aideClinicalReviewRequest": "aide.clinical_review.request",
"aideClinicalReviewCancelation": "aide.clinical_review.cancellation"
},
"dicomAgents": {
"dicomWebAgentName": "monaidicomweb",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System.Globalization;
using System.Linq;
using System.Runtime.CompilerServices;
using Ardalis.GuardClauses;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -52,9 +53,11 @@ public class WorkflowExecuterService : IWorkflowExecuterService
private readonly IPayloadService _payloadService;
private readonly StorageServiceConfiguration _storageConfiguration;
private readonly double _defaultTaskTimeoutMinutes;
private readonly Dictionary<string, double> _defaultPerTaskTypeTimeoutMinutes = new Dictionary<string, double>();

private string TaskDispatchRoutingKey { get; }
private string ExportRequestRoutingKey { get; }
private string TaskTimeoutRoutingKey { get; }

public WorkflowExecuterService(
ILogger<WorkflowExecuterService> logger,
Expand All @@ -81,7 +84,9 @@ public WorkflowExecuterService(

_storageConfiguration = storageConfiguration.Value;
_defaultTaskTimeoutMinutes = configuration.Value.TaskTimeoutMinutes;
_defaultPerTaskTypeTimeoutMinutes = configuration.Value.PerTaskTypeTimeoutMinutes;
TaskDispatchRoutingKey = configuration.Value.Messaging.Topics.TaskDispatchRequest;
TaskTimeoutRoutingKey = configuration.Value.Messaging.Topics.AideClinicalReviewCancelation;
ExportRequestRoutingKey = $"{configuration.Value.Messaging.Topics.ExportRequestPrefix}.{configuration.Value.Messaging.DicomAgents.ScuAgentName}";

_logger = logger ?? throw new ArgumentNullException(nameof(logger));
Expand Down Expand Up @@ -275,6 +280,7 @@ public async Task<bool> ProcessTaskUpdate(TaskUpdateEvent message)
if (message.Reason == FailureReason.TimedOut && currentTask.Status == TaskExecutionStatus.Failed)
{
_logger.TaskTimedOut(message.TaskId, message.WorkflowInstanceId, currentTask.Timeout);
await TimeOutEvent(workflowInstance, currentTask, message.CorrelationId);

return false;
}
Expand Down Expand Up @@ -750,6 +756,15 @@ private async Task<bool> ExportRequest(WorkflowInstance workflowInstance, TaskEx
return true;
}

private async Task<bool> TimeOutEvent(WorkflowInstance workflowInstance, TaskExecution taskExec, string correlationId)
{
var exportRequestEvent = EventMapper.GenerateTaskCancellationEvent("", taskExec.ExecutionId, workflowInstance.Id, taskExec.TaskId, FailureReason.TimedOut, "Timed out");
var jsonMesssage = new JsonMessage<TaskCancellationEvent>(exportRequestEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, correlationId, Guid.NewGuid().ToString());

await _messageBrokerPublisherService.Publish(TaskTimeoutRoutingKey, jsonMesssage.ToMessage());
return true;
}

private async Task<WorkflowInstance> CreateWorkflowInstanceAsync(WorkflowRequestEvent message, WorkflowRevision workflow)
{
Guard.Against.Null(message, nameof(message));
Expand Down Expand Up @@ -822,6 +837,10 @@ public async Task<TaskExecution> CreateTaskExecutionAsync(TaskObject task,
{
task.TimeoutMinutes = _defaultTaskTimeoutMinutes;
}
if (_defaultPerTaskTypeTimeoutMinutes is not null && _defaultPerTaskTypeTimeoutMinutes.ContainsKey(task.Type))
{
task.TimeoutMinutes = _defaultPerTaskTypeTimeoutMinutes[task.Type];
}

var inputArtifacts = new Dictionary<string, string>();
var artifactFound = true;
Expand Down
3 changes: 3 additions & 0 deletions src/WorkflowManager/WorkflowManager/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
"TaskManager": {
"concurrency": 1,
"taskTimeoutMinutes": 60,
"perTaskTypeTimeoutMinutes": {
"aide_clinical_review": 5760
},
"plug-ins": {
"argo": "Monai.Deploy.WorkflowManager.TaskManager.Argo.ArgoPlugin, Monai.Deploy.WorkflowManager.TaskManager.Argo",
"aide_clinical_review": "Monai.Deploy.WorkflowManager.TaskManager.AideClinicalReview.AideClinicalReviewPlugin, Monai.Deploy.WorkflowManager.TaskManager.AideClinicalReview",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class WorkflowExecuterServiceTests
private readonly Mock<IWorkflowService> _workflowService;
private readonly IOptions<WorkflowManagerOptions> _configuration;
private readonly IOptions<StorageServiceConfiguration> _storageConfiguration;
private readonly int _timeoutForTypeTask = 999;
private readonly int _timeoutForDefault = 966;

public WorkflowExecuterServiceTests()
{
Expand All @@ -71,7 +73,7 @@ public WorkflowExecuterServiceTests()
_payloadService = new Mock<IPayloadService>();
_workflowService = new Mock<IWorkflowService>();

_configuration = Options.Create(new WorkflowManagerOptions() { Messaging = new MessageBrokerConfiguration { Topics = new MessageBrokerConfigurationKeys { TaskDispatchRequest = "md.task.dispatch", ExportRequestPrefix = "md.export.request" }, DicomAgents = new DicomAgentConfiguration { DicomWebAgentName = "monaidicomweb" } } });
_configuration = Options.Create(new WorkflowManagerOptions() { TaskTimeoutMinutes = _timeoutForDefault, PerTaskTypeTimeoutMinutes = new Dictionary<string, double> { { "taskType", _timeoutForTypeTask } }, Messaging = new MessageBrokerConfiguration { Topics = new MessageBrokerConfigurationKeys { TaskDispatchRequest = "md.task.dispatch", ExportRequestPrefix = "md.export.request" }, DicomAgents = new DicomAgentConfiguration { DicomWebAgentName = "monaidicomweb" } } });
_storageConfiguration = Options.Create(new StorageServiceConfiguration() { Settings = new Dictionary<string, string> { { "bucket", "testbucket" }, { "endpoint", "localhost" }, { "securedConnection", "False" } } });

var dicom = new Mock<IDicomService>();
Expand Down Expand Up @@ -1906,6 +1908,65 @@ public async Task ProcessTaskUpdate_ValidTaskUpdateEventWorkflowDoesNotExist_Ret
response.Should().BeTrue();
}

[Fact]
public async Task ProcessTaskUpdate_Timout_Sends_Message_To_TaskTimeoutRoutingKey()
{
var workflowInstanceId = Guid.NewGuid().ToString();

var metadata = new Dictionary<string, object>();
metadata.Add("a", "b");
metadata.Add("c", "d");

var updateEvent = new TaskUpdateEvent
{
WorkflowInstanceId = workflowInstanceId,
TaskId = "pizza",
ExecutionId = Guid.NewGuid().ToString(),
Status = TaskExecutionStatus.Failed,
Reason = FailureReason.TimedOut,
Message = "This is a message",
Metadata = metadata,
CorrelationId = Guid.NewGuid().ToString()
};

var workflowId = Guid.NewGuid().ToString();

var workflow = new WorkflowRevision
{
Id = Guid.NewGuid().ToString(),
WorkflowId = workflowId,
Revision = 1,
Workflow = new Workflow
{
Name = "Workflowname2",
Description = "Workflowdesc2",
Version = "1",
}
};

var workflowInstance = new WorkflowInstance
{
Id = workflowInstanceId,
WorkflowId = workflowId,
WorkflowName = workflow.Workflow.Name,
PayloadId = Guid.NewGuid().ToString(),
Status = Status.Created,
BucketId = "bucket",
Tasks = new List<TaskExecution>
{
new TaskExecution
{
TaskId = "pizza",
Status = TaskExecutionStatus.Failed
}
}
};

_workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(workflowInstance.Id)).ReturnsAsync(workflowInstance);
var response = await WorkflowExecuterService.ProcessTaskUpdate(updateEvent);
_messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.AideClinicalReviewCancelation, It.IsAny<Message>()), Times.Exactly(1));
}

[Fact]
public async Task ProcessExportComplete_ValidExportCompleteEventMultipleTaskDestinationsDispatched_ReturnsTrue()
{
Expand Down Expand Up @@ -2259,5 +2320,57 @@ public void AttachPatientMetaData_AtachesDataToTaskExec_TaskExecShouldHavePatien
taskExec.TaskPluginArguments[PatientKeys.PatientHospitalId].Should().BeSameAs(patientDetails.PatientHospitalId);
taskExec.TaskPluginArguments[PatientKeys.PatientName].Should().BeSameAs(patientDetails.PatientName);
}

[Fact]
public async Task TaskExecShouldHaveCorrectTimeout()
{
var workflowId = Guid.NewGuid().ToString();
var payloadId = Guid.NewGuid().ToString();
var workflowInstanceId = Guid.NewGuid().ToString();

var pizzaTask = new TaskObject
{
Id = "pizza",
Type = "taskType",
Description = "taskdesc",
};

var workflowInstance = new WorkflowInstance
{
Id = workflowInstanceId,
WorkflowId = workflowId,
};
var bucket = "bucket";

var newPizza = await WorkflowExecuterService.CreateTaskExecutionAsync(pizzaTask, workflowInstance, bucket, payloadId);
Assert.Equal(_timeoutForTypeTask, newPizza.TimeoutInterval);

}

[Fact]
public async Task TaskExecShouldPickUpConfiguredDefaultTimeout()
{
var workflowId = Guid.NewGuid().ToString();
var payloadId = Guid.NewGuid().ToString();
var workflowInstanceId = Guid.NewGuid().ToString();

var pizzaTask = new TaskObject
{
Id = "pizza",
Type = "someothertype",
Description = "taskdesc",
};

var workflowInstance = new WorkflowInstance
{
Id = workflowInstanceId,
WorkflowId = workflowId,
};
var bucket = "bucket";

var newPizza = await WorkflowExecuterService.CreateTaskExecutionAsync(pizzaTask, workflowInstance, bucket, payloadId);
Assert.Equal(_timeoutForDefault, newPizza.TimeoutInterval);

}
}
}

0 comments on commit e972799

Please sign in to comment.