Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch Sending of Webhooks #61

Merged
merged 4 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,22 @@ public UserCreatedWebhookFactory(IUserResolver userResolver) {
this.userResolver = userResolver;
}

public async Task<IdentityWebhook> CreateAsync(IWebhookSubscription subscription, EventInfo eventInfo, CancellationToken cancellationToken = default) {
var userCreated = (UserCreatedEvent?)eventInfo.Data;
public async Task<IList<IdentityWebhook>> CreateAsync(IWebhookSubscription subscription, EventNotification notification, CancellationToken cancellationToken = default) {
var @event = notification.Events[0];

var userCreated = (UserCreatedEvent?)@event.Data;
var user = await userResolver.ResolveUserAsync(userCreated!.UserId, cancellationToken);

if (user == null)
throw new InvalidOperationException();

return new IdentityWebhook {
EventId = eventInfo.Id,
EventType = "user_created",
TimeStamp = eventInfo.TimeStamp,
User = user
return new [] {
new IdentityWebhook {
EventId = @event.Id,
EventType = @event.EventType,
TimeStamp = @event.TimeStamp,
User = user
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Collections.Concurrent;
using System.Linq.Dynamic.Core;
using System.Linq.Expressions;

Expand All @@ -23,7 +24,7 @@ namespace Deveel.Webhooks {
/// </summary>
/// <typeparam name="TWebhook"></typeparam>
public sealed class LinqWebhookFilterEvaluator<TWebhook> : IWebhookFilterEvaluator<TWebhook> where TWebhook : class {
private readonly IDictionary<string, Func<object, bool>> filterCache;
private readonly IDictionary<FilterKey, Func<object, bool>> filterCache;
private readonly WebhookSenderOptions<TWebhook> senderOptions;

/// <summary>
Expand All @@ -35,7 +36,7 @@ public sealed class LinqWebhookFilterEvaluator<TWebhook> : IWebhookFilterEvaluat
/// the filter evaluator.
/// </param>
public LinqWebhookFilterEvaluator(IOptions<WebhookSenderOptions<TWebhook>> senderOptions) {
filterCache = new Dictionary<string, Func<object, bool>>();
filterCache = new ConcurrentDictionary<FilterKey, Func<object, bool>>();
this.senderOptions = senderOptions.Value;
}

Expand All @@ -51,15 +52,16 @@ static LinqWebhookFilterEvaluator() {
string IWebhookFilterEvaluator<TWebhook>.Format => "linq";

private Func<object, bool> Compile(Type objType, string filter) {
if (!filterCache.TryGetValue(filter, out var compiled)) {
var key = new FilterKey(objType.FullName!, filter);
if (!filterCache.TryGetValue(key, out var compiled)) {
var config = ParsingConfig.Default;

var parameters = new[] {
Expression.Parameter(objType, "hook")
};
var parsed = DynamicExpressionParser.ParseLambda(config, parameters, typeof(bool), filter).Compile();
compiled = hook => (bool)(parsed.DynamicInvoke(hook)!);
filterCache[filter] = compiled;
filterCache[key] = compiled;
}

return compiled;
Expand All @@ -84,10 +86,8 @@ private Func<object, bool> Compile(Type objType, IList<string> filters) {

/// <inheritdoc/>
public async Task<bool> MatchesAsync(WebhookSubscriptionFilter filter, TWebhook webhook, CancellationToken cancellationToken) {
if (filter is null)
throw new ArgumentNullException(nameof(filter));
if (webhook is null)
throw new ArgumentNullException(nameof(webhook));
ArgumentNullException.ThrowIfNull(filter, nameof(filter));
ArgumentNullException.ThrowIfNull(webhook, nameof(webhook));

if (filter.FilterFormat != "linq")
throw new ArgumentException($"Filter format '{filter.FilterFormat}' not supported by the LINQ evaluator");
Expand All @@ -111,7 +111,27 @@ public async Task<bool> MatchesAsync(WebhookSubscriptionFilter filter, TWebhook
} catch(Exception ex) {
throw new WebhookException("Unable to evaluate the filter", ex);
}
}

readonly struct FilterKey {
public FilterKey(string typeName, string filter) : this() {
TypeName = typeName;
Filter = filter;
}

public string TypeName { get; }

public string Filter { get; }

public override bool Equals(object? obj) {
return obj is FilterKey key &&
TypeName == key.TypeName &&
Filter == key.Filter;
}

public override int GetHashCode() {
return HashCode.Combine(TypeName, Filter);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public class DefaultMongoWebhookConverter<TWebhook> : IMongoWebhookConverter<TWe
/// Converts the given <see cref="EventInfo"/> and the webhook object into
/// a <see cref="MongoWebhook"/> object.
/// </summary>
/// <param name="eventInfo">
/// The event information that is being notified.
/// <param name="notification">
/// The event notification that was sent to the subscribers.
/// </param>
/// <param name="webhook">
/// The webhook that was notified to the subscribers.
Expand All @@ -42,7 +42,7 @@ public class DefaultMongoWebhookConverter<TWebhook> : IMongoWebhookConverter<TWe
/// Returns an instance of <see cref="MongoWebhook"/> that represents the
/// webhook that can be stored into the database.
/// </returns>
public MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook) {
public MongoWebhook ConvertWebhook(EventNotification notification, TWebhook webhook) {
if (webhook is IWebhook obj) {
return new MongoWebhook {
WebhookId = obj.Id,
Expand All @@ -52,10 +52,14 @@ public MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook) {
};
}

// TODO: we should support multiple events in a single notification

var firstEvent = notification.Events.First();

return new MongoWebhook {
EventType = eventInfo.EventType,
TimeStamp = eventInfo.TimeStamp,
WebhookId = eventInfo.Id,
EventType = notification.EventType,
TimeStamp = firstEvent.TimeStamp,
WebhookId = notification.NotificationId,
Data = BsonValueUtil.ConvertData(webhook)
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ public interface IMongoWebhookConverter<TWebhook>
/// Converts the given webhook to an object that can be stored
/// in a MongoDB database.
/// </summary>
/// <param name="eventInfo">
/// The information about the event that triggered the
/// notification of the webhook.
/// <param name="notification">
/// The event notification that was sent to the subscribers.
/// </param>
/// <param name="webhook">
/// The instance of the webhook to be converted.
Expand All @@ -37,6 +36,6 @@ public interface IMongoWebhookConverter<TWebhook>
/// Returns an instance of <see cref="MongoWebhook"/>
/// that can be stored in a MongoDB database.
/// </returns>
MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook);
MongoWebhook ConvertWebhook(EventNotification notification, TWebhook webhook);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Collections;
using System.Reflection;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

using MongoDB.Bson;

namespace Deveel.Webhooks {
/// <summary>
/// An implementation of <see cref="IWebhookDeliveryResultLogger{TWebhook}"/> that
Expand Down Expand Up @@ -75,6 +70,9 @@ public MongoDbWebhookDeliveryResultLogger(
/// Converts the given result to an object that can be stored in the
/// MongoDB database collection.
/// </summary>
/// <param name="notification">
/// The aggregate of the events that are being delivered to the receiver.
/// </param>
/// <param name="eventInfo">
/// The information about the event that triggered the delivery of the webhook.
/// </param>
Expand All @@ -87,14 +85,14 @@ public MongoDbWebhookDeliveryResultLogger(
/// <returns>
/// Returns an object that can be stored in the MongoDB database collection.
/// </returns>
protected virtual TResult ConvertResult(EventInfo eventInfo, IWebhookSubscription subscription, WebhookDeliveryResult<TWebhook> result) {
protected virtual TResult ConvertResult(EventNotification notification, EventInfo eventInfo, IWebhookSubscription subscription, WebhookDeliveryResult<TWebhook> result) {
var obj = new TResult();

obj.TenantId = subscription.TenantId;
obj.OperationId = result.OperationId;
obj.EventInfo = CreateEvent(eventInfo);
obj.Receiver = CreateReceiver(subscription);
obj.Webhook = ConvertWebhook(eventInfo, result.Webhook);
obj.Webhook = ConvertWebhook(notification, result.Webhook);
obj.DeliveryAttempts = result.Attempts?.Select(ConvertDeliveryAttempt).ToList()
?? new List<MongoWebhookDeliveryAttempt>();

Expand Down Expand Up @@ -162,8 +160,8 @@ protected virtual MongoWebhookDeliveryAttempt ConvertDeliveryAttempt(WebhookDeli
/// Converts the given webhook to an object that can be stored in the
/// MongoDB database collection.
/// </summary>
/// <param name="eventInfo">
/// The information about the event that triggered the delivery of the webhook.
/// <param name="notification">
/// The aggregate of the events that are being delivered to the receiver.
/// </param>
/// <param name="webhook">
/// The instance of the webhook to convert.
Expand All @@ -175,15 +173,15 @@ protected virtual MongoWebhookDeliveryAttempt ConvertDeliveryAttempt(WebhookDeli
/// Thrown when the given type of webhook is not supported by this instance and
/// no converter was provided.
/// </exception>
protected virtual MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook) {
protected virtual MongoWebhook ConvertWebhook(EventNotification notification, TWebhook webhook) {
if (webhookConverter != null)
return webhookConverter.ConvertWebhook(eventInfo, webhook);
return webhookConverter.ConvertWebhook(notification, webhook);

throw new NotSupportedException("The given type of webhook is not supported by this instance of the logger");
}

/// <inheritdoc/>
public async Task LogResultAsync(EventInfo eventInfo, IWebhookSubscription subscription, WebhookDeliveryResult<TWebhook> result, CancellationToken cancellationToken) {
public async Task LogResultAsync(EventNotification notification, IWebhookSubscription subscription, WebhookDeliveryResult<TWebhook> result, CancellationToken cancellationToken) {
ArgumentNullException.ThrowIfNull(result, nameof(result));
ArgumentNullException.ThrowIfNull(subscription, nameof(subscription));

Expand All @@ -195,11 +193,11 @@ public async Task LogResultAsync(EventInfo eventInfo, IWebhookSubscription subsc
typeof(TWebhook), subscription.TenantId);

try {
var resultObj = ConvertResult(eventInfo, subscription, result);
var results = notification.Select(e => ConvertResult(notification, e, subscription, result));

var repository = await RepositoryProvider.GetRepositoryAsync(subscription.TenantId, cancellationToken);

await repository.AddAsync(resultObj, cancellationToken);
await repository.AddRangeAsync(results, cancellationToken);
} catch (Exception ex) {
Logger.LogError(ex, "Could not log the result of the delivery of the Webhook of type '{WebhookType}' for tenant '{TenantId}' because of an error",
typeof(TWebhook), subscription.TenantId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public MongoDbWebhookStorageBuilder<TSubscription> UseWebhookConverter<TWebhook,
/// <returns>
/// Returns the current instance of the builder for chaining.
/// </returns>
public MongoDbWebhookStorageBuilder<TSubscription> UseWebhookConverter<TWebhook>(Func<EventInfo, TWebhook, MongoWebhook> converter)
public MongoDbWebhookStorageBuilder<TSubscription> UseWebhookConverter<TWebhook>(Func<EventNotification, TWebhook, MongoWebhook> converter)
where TWebhook : class {

Services.AddSingleton<IMongoWebhookConverter<TWebhook>>(new MongoWebhookConverterWrapper<TWebhook>(converter));
Expand All @@ -211,13 +211,14 @@ public MongoDbWebhookStorageBuilder<TSubscription> UseWebhookConverter<TWebhook>
}

private class MongoWebhookConverterWrapper<TWebhook> : IMongoWebhookConverter<TWebhook> where TWebhook : class {
private readonly Func<EventInfo, TWebhook, MongoWebhook> converter;
private readonly Func<EventNotification, TWebhook, MongoWebhook> converter;

public MongoWebhookConverterWrapper(Func<EventInfo, TWebhook, MongoWebhook> converter) {
public MongoWebhookConverterWrapper(Func<EventNotification, TWebhook, MongoWebhook> converter) {
this.converter = converter;
}

public MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook) => converter.Invoke(eventInfo, webhook);
public MongoWebhook ConvertWebhook(EventNotification notification, TWebhook webhook)
=> converter.Invoke(notification, webhook);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public async Task<IList<IWebhookSubscription>> ResolveSubscriptionsAsync(string
var list = await GetCachedAsync(eventType, cancellationToken);

if (list == null) {
logger.LogTrace("No webhook subscriptions to event {EventType} of tenant {TenantId} were found in cache", eventType);
logger.LogTrace("No webhook subscriptions to event {EventType} were found in cache", eventType);

var result = await repository.GetByEventTypeAsync(eventType, activeOnly, cancellationToken);
list = result.Cast<IWebhookSubscription>().ToList();
Expand Down
4 changes: 4 additions & 0 deletions src/Deveel.Webhooks/Webhooks/DefaultWebhookFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Microsoft.Extensions.Options;

namespace Deveel.Webhooks {
/// <summary>
/// A default implementation of the <see cref="IWebhookFactory{TWebhook}"/>
/// that creates a <see cref="Webhook"/> instance using the information
/// provided by the subscription and the event.
/// </summary>
public sealed class DefaultWebhookFactory : DefaultWebhookFactory<Webhook> {
public DefaultWebhookFactory(IOptions<WebhookFactoryOptions<Webhook>> options) : base(options) {

Check warning on line 24 in src/Deveel.Webhooks/Webhooks/DefaultWebhookFactory.cs

View workflow job for this annotation

GitHub Actions / Build and Test

Missing XML comment for publicly visible type or member 'DefaultWebhookFactory.DefaultWebhookFactory(IOptions<WebhookFactoryOptions<Webhook>>)'

Check warning on line 24 in src/Deveel.Webhooks/Webhooks/DefaultWebhookFactory.cs

View workflow job for this annotation

GitHub Actions / Build and Test

Missing XML comment for publicly visible type or member 'DefaultWebhookFactory.DefaultWebhookFactory(IOptions<WebhookFactoryOptions<Webhook>>)'
}
}
}
Loading
Loading