Skip to content

Commit

Permalink
fix(repeater): msgpack serialization
Browse files Browse the repository at this point in the history
fixes #165
  • Loading branch information
derevnjuk committed Oct 4, 2023
1 parent b664a7e commit 44886cc
Show file tree
Hide file tree
Showing 15 changed files with 131 additions and 87 deletions.
2 changes: 1 addition & 1 deletion src/SecTester.Repeater/Bus/DefaultRepeaterBusFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public IRepeaterBus Create(string repeaterId)
{
Serializer = new SocketIOMessagePackSerializer()
};
var wrapper = new SocketIoClientWrapper(client);
var wrapper = new SocketIoConnection(client);

var scope = _scopeFactory.CreateAsyncScope();
var timerProvider = scope.ServiceProvider.GetRequiredService<ITimerProvider>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

namespace SecTester.Repeater.Bus;

internal interface ISocketIoClient : IDisposable
internal interface ISocketIoConnection : IDisposable
{
public bool Connected { get; }
public Task Connect();
public Task Disconnect();
public void On(string eventName, Action<ISocketIoResponse> callback);
public void On(string eventName, Action<ISocketIoMessage> callback);
public void Off(string eventName);
public Task EmitAsync(string eventName, params object[] data);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

namespace SecTester.Repeater.Bus;

internal interface ISocketIoResponse
internal interface ISocketIoMessage
{
public T GetValue<T>(int i = 0);
public T GetValue<T>(int index = 0);
public Task CallbackAsync(params object[] data);
public Task CallbackAsync(CancellationToken cancellationToken, params object[] data);
}
13 changes: 7 additions & 6 deletions src/SecTester.Repeater/Bus/IncomingRequest.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using MessagePack;
using SecTester.Core.Bus;
using SecTester.Repeater.Runners;

namespace SecTester.Repeater.Bus;

[MessagePackObject(true)]
public record IncomingRequest(Uri Url) : Event, IRequest
{
public string? Body { get; init; }
public HttpMethod Method { get; init; } = HttpMethod.Get;
public Protocol Protocol { get; init; } = Protocol.Http;
public Uri Url { get; init; } = Url ?? throw new ArgumentNullException(nameof(Url));

public IEnumerable<KeyValuePair<string, IEnumerable<string>>> Headers { get; init; } =
public string? Body { get; set; }
public HttpMethod Method { get; set; } = HttpMethod.Get;
public Protocol Protocol { get; set; } = Protocol.Http;
public Uri Url { get; set; } = Url ?? throw new ArgumentNullException(nameof(Url));
public IEnumerable<KeyValuePair<string, IEnumerable<string>>> Headers { get; set; } =
new List<KeyValuePair<string, IEnumerable<string>>>();
}
18 changes: 8 additions & 10 deletions src/SecTester.Repeater/Bus/OutgoingResponse.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
using System.Collections.Generic;
using MessagePack;
using SecTester.Repeater.Runners;

namespace SecTester.Repeater.Bus;

[MessagePackObject(true)]
public record OutgoingResponse : IResponse
{
public int? StatusCode { get; init; }

public string? Body { get; init; }
public string? Message { get; init; }

public string? ErrorCode { get; init; }

public Protocol Protocol { get; init; } = Protocol.Http;

public IEnumerable<KeyValuePair<string, IEnumerable<string>>> Headers { get; init; } =
public int? StatusCode { get; set; }
public string? Body { get; set; }
public string? Message { get; set; }
public string? ErrorCode { get; set; }
public Protocol Protocol { get; set; } = Protocol.Http;
public IEnumerable<KeyValuePair<string, IEnumerable<string>>> Headers { get; set; } =
new List<KeyValuePair<string, IEnumerable<string>>>();
}
9 changes: 9 additions & 0 deletions src/SecTester.Repeater/Bus/RepeaterError.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using MessagePack;

namespace SecTester.Repeater.Bus;

[MessagePackObject(true)]
public sealed record RepeaterError
{
public string Message { get; set; } = null!;
}
9 changes: 9 additions & 0 deletions src/SecTester.Repeater/Bus/RepeaterInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using MessagePack;

namespace SecTester.Repeater.Bus;

[MessagePackObject(true)]
public sealed record RepeaterInfo
{
public string RepeaterId { get; set; } = null!;
}
9 changes: 9 additions & 0 deletions src/SecTester.Repeater/Bus/RepeaterVersion.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using MessagePack;

namespace SecTester.Repeater.Bus;

[MessagePackObject(true)]
public sealed record RepeaterVersion
{
public string Version { get; set; } = null!;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

namespace SecTester.Repeater.Bus;

internal sealed class SocketIoClientWrapper : ISocketIoClient
internal sealed class SocketIoConnection : ISocketIoConnection
{
private readonly SocketIOClient.SocketIO _socketIo;

public SocketIoClientWrapper(SocketIOClient.SocketIO socketIo) => _socketIo = socketIo ?? throw new ArgumentNullException(nameof(socketIo));
public SocketIoConnection(SocketIOClient.SocketIO socketIo) => _socketIo = socketIo ?? throw new ArgumentNullException(nameof(socketIo));

public void Dispose()
{
Expand All @@ -21,7 +21,7 @@ public void Dispose()

public Task Disconnect() => _socketIo.DisconnectAsync();

public void On(string eventName, Action<ISocketIoResponse> callback) => _socketIo.On(eventName, x => callback(x as ISocketIoResponse));
public void On(string eventName, Action<ISocketIoMessage> callback) => _socketIo.On(eventName, x => callback(new SocketIoMessage(x)));

public void Off(string eventName) => _socketIo.Off(eventName);

Expand Down
22 changes: 22 additions & 0 deletions src/SecTester.Repeater/Bus/SocketIoMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using SocketIOClient;

namespace SecTester.Repeater.Bus;

internal class SocketIoMessage : ISocketIoMessage
{
private readonly SocketIOResponse _response;

public SocketIoMessage(SocketIOResponse response)
{
_response = response ?? throw new ArgumentNullException(nameof(response));
}

public virtual T GetValue<T>(int index = 0) => _response.GetValue<T>(index);

public virtual Task CallbackAsync(params object[] data) => _response.CallbackAsync(data);

public virtual Task CallbackAsync(CancellationToken cancellationToken, params object[] data) => _response.CallbackAsync(cancellationToken, data);
}
34 changes: 15 additions & 19 deletions src/SecTester.Repeater/Bus/SocketIoRepeaterBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,29 @@ internal sealed class SocketIoRepeaterBus : IRepeaterBus
private static readonly TimeSpan PingInterval = TimeSpan.FromSeconds(10);

private readonly ITimerProvider _heartbeat;
private readonly ISocketIoClient _client;
private readonly ISocketIoConnection _connection;
private readonly ILogger<IRepeaterBus> _logger;
private readonly SocketIoRepeaterBusOptions _options;

internal SocketIoRepeaterBus(SocketIoRepeaterBusOptions options, ISocketIoClient client, ITimerProvider heartbeat, ILogger<IRepeaterBus> logger)
internal SocketIoRepeaterBus(SocketIoRepeaterBusOptions options, ISocketIoConnection connection, ITimerProvider heartbeat, ILogger<IRepeaterBus> logger)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_client = client ?? throw new ArgumentNullException(nameof(client));
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
_heartbeat = heartbeat ?? throw new ArgumentNullException(nameof(heartbeat));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

internal sealed record RepeaterVersion(string Version);
internal sealed record RepeaterError(string Message);
internal sealed record RepeaterInfo(string RepeaterId);

public event Func<IncomingRequest, Task<OutgoingResponse>>? RequestReceived;
public event Action<Exception>? ErrorOccurred;
public event Action<Version>? UpgradeAvailable;

public async Task Connect()
{
if (_client is not { Connected: true })
if (_connection is not { Connected: true })
{
DelegateEvents();

await _client.Connect().ConfigureAwait(false);
await _connection.Connect().ConfigureAwait(false);

await SchedulePing().ConfigureAwait(false);

Expand All @@ -48,19 +44,19 @@ public async Task Connect()

private void DelegateEvents()
{
_client.On("error", response =>
_connection.On("error", response =>
{
var err = response.GetValue<RepeaterError>();
ErrorOccurred?.Invoke(new(err.Message));
});

_client.On("update-available", response =>
_connection.On("update-available", response =>
{
var version = response.GetValue<RepeaterVersion>();
UpgradeAvailable?.Invoke(new(version.Version));
});

_client.On("request", async response =>
_connection.On("request", async response =>
{
if (RequestReceived == null)
{
Expand All @@ -76,15 +72,15 @@ private void DelegateEvents()

public async ValueTask DisposeAsync()
{
if (_client is { Connected: true })
if (_connection is { Connected: true })
{
_heartbeat.Elapsed -= Ping;
_heartbeat.Stop();
await _client.Disconnect().ConfigureAwait(false);
await _connection.Disconnect().ConfigureAwait(false);
_logger.LogDebug("Repeater disconnected from {BaseUrl}", _options.BaseUrl);
}

_client.Dispose();
_connection.Dispose();

RequestReceived = null;
ErrorOccurred = null;
Expand All @@ -99,9 +95,9 @@ public async Task Deploy(string repeaterId, CancellationToken? cancellationToken
{
var tcs = new TaskCompletionSource<RepeaterInfo>();

_client.On("deployed", response => tcs.TrySetResult(response.GetValue<RepeaterInfo>()));
_connection.On("deployed", response => tcs.TrySetResult(response.GetValue<RepeaterInfo>()));

await _client.EmitAsync("deploy", new RepeaterInfo(repeaterId)).ConfigureAwait(false);
await _connection.EmitAsync("deploy", new RepeaterInfo { RepeaterId = repeaterId }).ConfigureAwait(false);

using var _ = cancellationToken?.Register(() => tcs.TrySetCanceled());

Expand All @@ -111,7 +107,7 @@ public async Task Deploy(string repeaterId, CancellationToken? cancellationToken
}
finally
{
_client.Off("deployed");
_connection.Off("deployed");
}
}

Expand All @@ -130,6 +126,6 @@ private async void Ping(object sender, ElapsedEventArgs args)

private async Task Ping()
{
await _client.EmitAsync("ping").ConfigureAwait(false);
await _connection.EmitAsync("ping").ConfigureAwait(false);
}
}
10 changes: 5 additions & 5 deletions src/SecTester.Repeater/Runners/IRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ namespace SecTester.Repeater.Runners;

public interface IRequest
{
string? Body { get; init; }
HttpMethod Method { get; init; }
Protocol Protocol { get; init; }
Uri Url { get; init; }
IEnumerable<KeyValuePair<string, IEnumerable<string>>> Headers { get; init; }
string? Body { get; set; }
HttpMethod Method { get; set; }
Protocol Protocol { get; set; }
Uri Url { get; set; }
IEnumerable<KeyValuePair<string, IEnumerable<string>>> Headers { get; set; }
}
12 changes: 6 additions & 6 deletions src/SecTester.Repeater/Runners/IResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ namespace SecTester.Repeater.Runners;

public interface IResponse
{
int? StatusCode { get; init; }
string? Body { get; init; }
string? Message { get; init; }
string? ErrorCode { get; init; }
Protocol Protocol { get; init; }
IEnumerable<KeyValuePair<string, IEnumerable<string>>> Headers { get; init; }
int? StatusCode { get; set; }
string? Body { get; set; }
string? Message { get; set; }
string? ErrorCode { get; set; }
Protocol Protocol { get; set; }
IEnumerable<KeyValuePair<string, IEnumerable<string>>> Headers { get; set; }
}
6 changes: 3 additions & 3 deletions src/SecTester.Scan/Target.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public Target WithBody(FormUrlEncodedContent value)
{
return WithBody(value, async () =>
{
var text = await value.ReadAsStringAsync().ConfigureAwait(true);
var text = await value.ReadAsStringAsync().ConfigureAwait(false);
var query = UrlUtils.ParseQuery(text);
var parameters = query.Select(k => new PostDataParameter(k.Key, k.Value)).ToList();

Expand All @@ -140,7 +140,7 @@ public Target WithBody(MultipartContent value)
{
return WithBody(value, async () =>
{
var text = await value.ReadAsStringAsync().ConfigureAwait(true);
var text = await value.ReadAsStringAsync().ConfigureAwait(false);
var tasks = value.Select(async k =>
{
var contentDisposition = k.Headers.ContentDisposition;
Expand Down Expand Up @@ -172,7 +172,7 @@ public Target WithBody(HttpContent value)
{
return WithBody(value, async () =>
{
var text = await value.ReadAsStringAsync().ConfigureAwait(true);
var text = await value.ReadAsStringAsync().ConfigureAwait(false);

return new PostData
{
Expand Down
Loading

0 comments on commit 44886cc

Please sign in to comment.