Skip to content

Commit

Permalink
Implement processing & monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
dsame committed Nov 14, 2023
1 parent 3eeea9c commit 878273c
Show file tree
Hide file tree
Showing 15 changed files with 176 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public async Task Should_ReturnUri()
var provider = _containerFixture.GetProvider();
var fileObject = _fileProviderFixture.GetFileObject("test.bin",1);
// Act
var actual = await provider.StoreBlobAsync(fileObject.Name, fileObject.Path);
var actual = await provider.UploadBlobAsync(fileObject.Name, fileObject.Path);
_containerFixture.DisposableBag.Add(fileObject.Name);
// Assert
var readResponse = await _containerFixture.ContainerClient.GetBlobClient(fileObject.Name).ExistsAsync();
Expand All @@ -42,7 +42,7 @@ public async Task Should_ThrowException()
var provider = _containerFixture.GetProvider();
var fileObject = new IFileProviderService.FileObject("test.bin","/nonexistent");
// Act
var exception = await Record.ExceptionAsync(() => provider.StoreBlobAsync(fileObject.Name, fileObject.Path));
var exception = await Record.ExceptionAsync(() => provider.UploadBlobAsync(fileObject.Name, fileObject.Path));
// Assert
Assert.NotNull(exception);
Assert.IsType<AggregateException>(exception);
Expand Down
19 changes: 19 additions & 0 deletions az-appservice-dotnet/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using az_appservice_dotnet.services.v1;
using az_appservice_dotnet.services.v1.Blob;
using az_appservice_dotnet.services.v1.Blob.dependencies;
using az_appservice_dotnet.services.v1.ImageProcessing;
using az_appservice_dotnet.services.v1.Monitor;
using az_appservice_dotnet.services.v1.State;
using az_appservice_dotnet.services.v1.State.dependencies;
using az_appservice_dotnet.services.v1.UploadedFiles;
Expand All @@ -19,12 +21,16 @@ public static async Task Main(string[] args)
builder.Services.AddSingleton<IBlobProvider, AzureBlobProvider>();
builder.Services.AddSingleton<IPersistProcessingStateProvider, CosmosDbPersistProcessingStateProvider>();
builder.Services.AddSingleton<IPublishProcessingStateProvider, AzureSbPublishProcessingStateProvider>();
builder.Services.AddSingleton<ISubscribeProcessingStateProvider, AzureSbSubscribeProcessingStateProvider>();

builder.Services.AddSingleton<IFileProviderService, FakeFileProviderService>();
builder.Services.AddSingleton<IBlobService, BlobService>();
builder.Services.AddSingleton<IProcessingStateService, ProcessingStateService>();
builder.Services.AddSingleton<IStateMonitor, ConsoleMonitor>();
builder.Services.AddSingleton<IImageProcessorService, NullImageProcessorService>();

builder.Services.AddSingleton<ProducerService>();
builder.Services.AddSingleton<ProcessorService>();

var app = builder.Build();

Expand All @@ -37,7 +43,20 @@ public static async Task Main(string[] args)

app.MapGroup("/1")
.MapApi1(app);

var monitorService = app.Services.GetService<IStateMonitor>();
if (monitorService == null)
{
throw new Exception("StateMonitor is not registered");
}
monitorService.StartStateMonitor();

var processorService = app.Services.GetService<ProcessorService>();
if (processorService == null)
{
throw new Exception("ProcessorService is not registered");
}
processorService.StartWaitForImagesToProcess();
await app.RunAsync();
}

Expand Down
14 changes: 12 additions & 2 deletions az-appservice-dotnet/providers/Azure/v1/AzureBlobProvider.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using az_appservice_dotnet.services;
using az_appservice_dotnet.services.v1.Blob.dependencies;
using Azure.Storage.Blobs;

Expand All @@ -23,7 +22,7 @@ public AzureBlobProvider(BlobContainerClient containerClient)
_containerClient = containerClient;
}

public Task<Uri> StoreBlobAsync(string name, string localFilePath)
public Task<Uri> UploadBlobAsync(string name, string localFilePath)
{
var blobClient = _containerClient.GetBlobClient(name);
return blobClient.UploadAsync(localFilePath, true)
Expand All @@ -33,4 +32,15 @@ public Task<Uri> StoreBlobAsync(string name, string localFilePath)
return blobClient.Uri;
});
}

public Task<bool> DownloadBlobAsync(string name, string localFilePath)
{
var blobClient = _containerClient.GetBlobClient(name);
return blobClient.DownloadToAsync(localFilePath)
.ContinueWith(task =>
{
if (task.Exception != null) throw task.Exception;
return true;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@

namespace az_appservice_dotnet.providers.Azure.v1;

/**
* <summary>
* Known bug/feature: the provider makes a lookup to IPersistProcessingStateProvider in order
* to find the state by the ID extracted from the message. By the moment the message is
* read from the queue, the state might be already changed in the database.
*
* So the message sent for change to State1 can cause the propagation of the State2. It would not
* be a problem if the State2 had not its own message sent to the queue. In this case the subscriber
* will receive 2 notification for State2 and none for State1.
*
* Generally speaking, it it correct behaviour, but still might cause the problem with handling the
* some states twice and missing the others.
* </summary>
*/

public class AzureSbSubscribeProcessingStateProvider : ISubscribeProcessingStateProvider
{
private readonly ServiceBusProcessor _processor;
Expand Down
11 changes: 9 additions & 2 deletions az-appservice-dotnet/services/v1/Blob/BlobService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ namespace az_appservice_dotnet.services.v1.Blob;
* <summary>
* This is a wrapper around a blob provider to implement some possible business logic common
* for differnt blob providers.
*
* For ex. logging error handling should be placed here.
* </summary>
*/
public class BlobService: IBlobService
Expand All @@ -17,8 +19,13 @@ public BlobService(IBlobProvider blobProvider)
_blobProvider = blobProvider;
}

public Task<Uri> StoreBlobAsync(string name, string localFilePath)
public Task<Uri> UploadBlobAsync(string name, string localFilePath)
{
return _blobProvider.StoreBlobAsync(name, localFilePath);
return _blobProvider.UploadBlobAsync(name, localFilePath);
}

public Task<bool> DownloadBlobAsync(string name, string localFilePath)
{
return _blobProvider.DownloadBlobAsync(name, localFilePath);
}
}
3 changes: 2 additions & 1 deletion az-appservice-dotnet/services/v1/Blob/IBlobService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ namespace az_appservice_dotnet.services.v1.Blob;

public interface IBlobService
{
Task<Uri> StoreBlobAsync(string name, string localFilePath);
Task<Uri> UploadBlobAsync(string name, string localFilePath);
Task<bool> DownloadBlobAsync(string name, string localFilePath);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ namespace az_appservice_dotnet.services.v1.Blob.dependencies;

public interface IBlobProvider
{
Task<Uri> StoreBlobAsync(string name, string localFilePath);
Task<Uri> UploadBlobAsync(string name, string localFilePath);
Task<Boolean> DownloadBlobAsync(string name, string localFilePath);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace az_appservice_dotnet.services.v1.ImageProcessing;

public interface IImageProcessorService
{
Task<string> ProcessImageAsync(string imageFilePath);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
namespace az_appservice_dotnet.services.v1.ImageProcessing;

public class NullImageProcessorService: IImageProcessorService
{
public Task<string> ProcessImageAsync(string imageFilePath)
{
if (!File.Exists(imageFilePath))
{
throw new FileNotFoundException($"NullImageProcessorService: File not found: {imageFilePath}");
}

return Task.Run(() =>
{
// get tmp file path
var tmpFilePath = Path.GetTempFileName();
// copy imageFilePath to tmpFilePath
File.Copy(imageFilePath, tmpFilePath, true);
return tmpFilePath;
});
}
}
21 changes: 21 additions & 0 deletions az-appservice-dotnet/services/v1/Monitor/ConsoleMonitor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using az_appservice_dotnet.services.v1.State;

namespace az_appservice_dotnet.services.v1.Monitor;

public class ConsoleMonitor: IStateMonitor
{
private readonly IProcessingStateService _processingStateService;

public ConsoleMonitor(IProcessingStateService processingStateService)
{
_processingStateService = processingStateService;
}

public void StartStateMonitor()
{
_processingStateService.ListenToStateChanges(state =>
{
Console.WriteLine($"State changed to {state.Status}: file={state.FileName}, originalUrl={state.OriginalFileUrl}, processedUrl={state.ProcessedFileUrl}");
});
}
}
6 changes: 6 additions & 0 deletions az-appservice-dotnet/services/v1/Monitor/IStateMonitor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace az_appservice_dotnet.services.v1.Monitor;

public interface IStateMonitor
{
void StartStateMonitor();
}
54 changes: 54 additions & 0 deletions az-appservice-dotnet/services/v1/ProcessorService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using az_appservice_dotnet.services.v1.Blob;
using az_appservice_dotnet.services.v1.ImageProcessing;
using az_appservice_dotnet.services.v1.State;

namespace az_appservice_dotnet.services.v1;

public class ProcessorService
{
private IBlobService _blobService;
private IProcessingStateService _processingStateService;
private IImageProcessorService _imageProcessorService;

public ProcessorService(IBlobService blobService, IProcessingStateService processingStateService,
IImageProcessorService imageProcessorService)
{
_imageProcessorService = imageProcessorService;
_blobService = blobService;
_processingStateService = processingStateService;
}

public void StartWaitForImagesToProcess()
{
_processingStateService.ListenToStateChanges(async state =>
{
if (state.Status == IProcessingStateService.Status.WaitingForProcessing)
{
Console.WriteLine(state.OriginalFileUrl);
var tmpFilePath = Path.GetTempFileName();
var stateProcessing = await _processingStateService.MoveToProcessingStateAsync(state);
try
{
var ok = await _blobService.DownloadBlobAsync(state.FileName, tmpFilePath);
var processedFilePath = await _imageProcessorService.ProcessImageAsync(tmpFilePath);
var uploadedUri =
await _blobService.UploadBlobAsync($"processed-{state.FileName}", processedFilePath);
await _processingStateService.MoveToCompletedStateAsync(stateProcessing, uploadedUri.ToString());
}
catch (Exception e)
{
await _processingStateService.MoveToFailedStateAsync(stateProcessing, e.Message);
Console.WriteLine(e);
throw;
}
finally
{
if (File.Exists(tmpFilePath))
{
File.Delete(tmpFilePath);
}
}
}
});
}
}
2 changes: 1 addition & 1 deletion az-appservice-dotnet/services/v1/ProducerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void StartProcessImage(IFileProviderService.FileObject fileObject)
var state2 = await _processingStateService.MoveToUploadingStateAsync(state1);
try
{
var blobUri = await _blobService.StoreBlobAsync(fileObject.Name, fileObject.Path);
var blobUri = await _blobService.UploadBlobAsync(fileObject.Name, fileObject.Path);
// TODO: not sure is it necessary to await the last call
await _processingStateService.MoveToWaitingForProcessingStateAsync(state2, blobUri.ToString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public State WithFailedStatus(string? failureReason)
}
}

public delegate void StateChangeHandler(in State state);
public delegate void StateChangeHandler(State state);

public Task<State> CreateInitialState(in TaskId taskId, in string fileName);
public Task<State> MoveToUploadingStateAsync(in State state);
Expand All @@ -109,5 +109,5 @@ public State WithFailedStatus(string? failureReason)
public Task<State> MoveToFailedStateAsync(in State state, string? failureReason);
public Task<ImmutableDictionary<StateId, State>> GetStates();

public void ListenToStateChanges(in StateChangeHandler onStateChange);
public void ListenToStateChanges(StateChangeHandler onStateChange);
}
23 changes: 4 additions & 19 deletions az-appservice-dotnet/services/v1/State/ProcessingStateService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace az_appservice_dotnet.services.v1.State;

public class ProcessingStateService : IProcessingStateService, IDisposable
public class ProcessingStateService : IProcessingStateService
{
private readonly IPublishProcessingStateProvider _publishProcessingStateProvider;
private readonly ISubscribeProcessingStateProvider _subscribeProcessingStateProvider;
Expand Down Expand Up @@ -83,24 +83,9 @@ private IProcessingStateService.State PublishTask(in Task<IProcessingStateServic
return _persistProcessingStateProvider.ListStatesAsync();
}


private IProcessingStateService.StateChangeHandler? _stateChangeHandler;
public void ListenToStateChanges(in IProcessingStateService.StateChangeHandler onStateChange)
public void ListenToStateChanges(IProcessingStateService.StateChangeHandler onStateChange)
{
if (_stateChangeHandler != null)
{
_subscribeProcessingStateProvider.RemoveStateChangeHandler(_stateChangeHandler);
}

_stateChangeHandler = onStateChange;
_subscribeProcessingStateProvider.AddStateChangeHandler(_stateChangeHandler);
}

public void Dispose()
{
if (_stateChangeHandler != null)
{
_subscribeProcessingStateProvider.RemoveStateChangeHandler(_stateChangeHandler);
}
_subscribeProcessingStateProvider.RemoveStateChangeHandler(onStateChange);
_subscribeProcessingStateProvider.AddStateChangeHandler(onStateChange);
}
}

0 comments on commit 878273c

Please sign in to comment.