Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ResumeAtRevision support to KV Watcher #491

Merged
merged 14 commits into from
Jun 6, 2024
3 changes: 3 additions & 0 deletions src/NATS.Client.KeyValueStore/INatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public interface INatsKVStore
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>An asynchronous enumerable which can be used in <c>await foreach</c> loops</returns>
/// <exception cref="InvalidOperationException">There was a conflict in options, e.g. IncludeHistory and UpdatesOnly are only valid when ResumeAtRevision is not set.</exception>
IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(IEnumerable<string> keys, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
Expand All @@ -112,6 +113,7 @@ public interface INatsKVStore
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>An async enumerable of entries to be used in an <c>await foreach</c></returns>
/// <exception cref="InvalidOperationException">There was a conflict in options, e.g. IncludeHistory and UpdatesOnly are only valid when ResumeAtRevision is not set.</exception>
IAsyncEnumerable<NatsKVEntry<T>> HistoryAsync<T>(string key, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
Expand All @@ -134,5 +136,6 @@ public interface INatsKVStore
/// <param name="opts">Watch options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>An async enumerable of keys to be used in an <c>await foreach</c></returns>
/// <exception cref="InvalidOperationException">There was a conflict in options, e.g. IncludeHistory and UpdatesOnly are only valid when ResumeAtRevision is not set.</exception>
IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
}
6 changes: 6 additions & 0 deletions src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,17 @@ private async ValueTask<INatsJSConsumer> CreatePushConsumer(string origin)
config.HeadersOnly = true;
}

// Resume from a specific revision ?
if (sequence > 0)
{
config.DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartSequence;
config.OptStartSeq = sequence + 1;
}
else if (_opts.ResumeAtRevision > 0)
{
config.DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartSequence;
config.OptStartSeq = _opts.ResumeAtRevision;
}

var consumer = await _context.CreateOrUpdateConsumerAsync(
_stream,
Expand Down
35 changes: 35 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVOpts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,19 @@ public record NatsKVWatchOpts
/// <summary>
/// Include history of the entries
/// </summary>
/// <remarks>
/// Setting this will cause the watcher to throw <see cref="InvalidOperationException"/>
/// if the values for <see cref="UpdatesOnly"/> and/or <see cref="ResumeAtRevision"/> are set.
/// </remarks>
public bool IncludeHistory { get; init; } = false;

/// <summary>
/// Only retrieve updates, not current values
/// </summary>
/// <remarks>
/// Setting this will cause the watcher to throw <see cref="InvalidOperationException"/>
/// if the values for <see cref="IncludeHistory"/> and/or <see cref="ResumeAtRevision"/> are set.
/// </remarks>
public bool UpdatesOnly { get; init; } = false;

/// <summary>
Expand All @@ -41,6 +49,33 @@ public record NatsKVWatchOpts
/// Async function called when the enumerator reaches the end of data. Return True to break the async enumeration, False to allow the enumeration to continue.
/// </summary>
public Func<CancellationToken, ValueTask<bool>>? OnNoData { get; init; }

/// <summary>
/// The revision to start from, if set to 0 (default) this will be ignored.
/// <remarks>
/// Setting this to a non-zero value will cause the watcher to throw <see cref="InvalidOperationException"/>
/// if the values for <see cref="IncludeHistory"/> and/or <see cref="UpdatesOnly"/> are set.
/// </remarks>
/// </summary>
public ulong ResumeAtRevision { get; init; }

internal void ThrowIfInvalid()
{
if (ResumeAtRevision > 0)
{
if (IncludeHistory || UpdatesOnly)
{
throw new InvalidOperationException("IncludeHistory and UpdatesOnly are only valid when ResumeAtRevision is not set.");
}
}
else
{
if (IncludeHistory && UpdatesOnly)
{
throw new InvalidOperationException("IncludeHistory and UpdatesOnly are mutually exclusive.");
}
}
}
}

public record NatsKVDeleteOpts
Expand Down
4 changes: 4 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ public async IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(IEnumerable<string>
}
}

/// <inheritdoc />
public async IAsyncEnumerable<NatsKVEntry<T>> HistoryAsync<T>(string key, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
try
Expand Down Expand Up @@ -382,6 +383,7 @@ public async ValueTask<NatsKVStatus> GetStatusAsync(CancellationToken cancellati
public IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default) =>
WatchAsync<T>([">"], serializer, opts, cancellationToken);

/// <inheritdoc />
public async ValueTask PurgeDeletesAsync(NatsKVPurgeOpts? opts = default, CancellationToken cancellationToken = default)
{
opts ??= NatsKVPurgeOpts.Default;
Expand Down Expand Up @@ -459,6 +461,8 @@ internal async ValueTask<NatsKVWatcher<T>> WatchInternalAsync<T>(IEnumerable<str
opts ??= NatsKVWatchOpts.Default;
serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer<T>();

opts.ThrowIfInvalid();

var watcher = new NatsKVWatcher<T>(
context: _context,
bucket: Bucket,
Expand Down
159 changes: 159 additions & 0 deletions tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -460,4 +460,163 @@ public async Task Watch_with_multiple_filter_on_old_server()
Assert.Equal(10094, exception.Error.ErrCode);
Assert.Equal("consumer delivery policy is deliver last per subject, but optional filter subject is not set", exception.Error.Description);
}

// Test that watch can resume from a specific revision
[Fact]
public async Task Watch_resume_at_revision()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();

const string bucket = "Watch_resume_at_revision";
var config = new NatsKVConfig(bucket) { History = 10 };

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
var store = await kv.CreateStoreAsync(config, cancellationToken: cancellationToken);

await store.PutAsync("k1", 1, cancellationToken: cancellationToken);
await store.PutAsync("k2", 2, cancellationToken: cancellationToken);
var revK3 = await store.PutAsync("k3", 3, cancellationToken: cancellationToken);
await store.PutAsync("k4", 3, cancellationToken: cancellationToken);

// Watch all
var watchOps = new NatsKVWatchOpts() { MetaOnly = true, };
var watchAll = store.WatchAsync<int>(opts: watchOps, cancellationToken: cancellationToken);

// Expect to see k1, k2, k3 and k4
var allEntries = new List<(ulong Revision, string key)>();
await foreach (var key in watchAll)
{
allEntries.Add((key.Revision, key.Key));
if (key.Delta == 0)
{
break;
}
}

// Expects k1, k2, k3 and k4
allEntries.Should().HaveCount(4);

// Watch from the revision of k3
var watchOpsFromRevK3 = watchOps with { ResumeAtRevision = revK3, };

var watchFromRevision = store.WatchAsync<int>(opts: watchOpsFromRevK3, cancellationToken: cancellationToken);

// Expect to see k2 and k3, and k4
var fromRevisionEntries = new List<(ulong Revision, string key)>();
await foreach (var key in watchFromRevision)
{
fromRevisionEntries.Add((key.Revision, key.Key));
if (key.Delta == 0)
{
break;
}
}

// Expects k2, k3 and k4
fromRevisionEntries.Should().HaveCount(2);

// Watch from none existing revision
var noData = false;
var watchOpsNoneExisting = watchOps with
{
ResumeAtRevision = 9999,
OnNoData = (_) =>
{
noData = true;
return ValueTask.FromResult(true);
},
};

var watchFromNoneExistingRevision =
store.WatchAsync<int>(opts: watchOpsNoneExisting, cancellationToken: cancellationToken);

// Expect to see no data
await foreach (var key in watchFromNoneExistingRevision)
{
// We should not see any entries, if we get here something is wrong
Assert.Fail("Should not return any entries, and OnNoData should have been called to bail out");
}

noData.Should().BeTrue();
}

[Fact]
public async Task Validate_watch_options()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();

const string bucket = nameof(Validate_watch_options);
var config = new NatsKVConfig(bucket) { History = 10 };

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
var store = await kv.CreateStoreAsync(config, cancellationToken: cancellationToken);

for (var i = 0; i < 10; i++)
{
await store.PutAsync("x", i, cancellationToken: cancellationToken);
}

// Valid options
foreach (var opts in new[]
{
new NatsKVWatchOpts { IncludeHistory = false, UpdatesOnly = false, ResumeAtRevision = 5 },
new NatsKVWatchOpts { IncludeHistory = true, UpdatesOnly = false, ResumeAtRevision = 0 },
new NatsKVWatchOpts { IncludeHistory = false, UpdatesOnly = true, ResumeAtRevision = 0 },
})
{
var count = 0;
var cts2 = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

if (opts.UpdatesOnly)
{
cts2.Cancel();
count++;
}

try
{
await foreach (var entry in store.WatchAsync<int>([">"], opts: opts, cancellationToken: cts2.Token))
{
count++;
_output.WriteLine($"entry: {entry.Key} ({entry.Revision}): {entry.Value}");
if (entry.Value == 9)
break;
}
}
catch (TaskCanceledException)
{
}
catch (OperationCanceledException)
{
}

count.Should().BeGreaterThan(0);
}

// Invalid options
foreach (var opts in new[]
{
new NatsKVWatchOpts { IncludeHistory = true, UpdatesOnly = false, ResumeAtRevision = 5 },
new NatsKVWatchOpts { IncludeHistory = true, UpdatesOnly = true, ResumeAtRevision = 5 },
new NatsKVWatchOpts { IncludeHistory = false, UpdatesOnly = true, ResumeAtRevision = 5 },
new NatsKVWatchOpts { IncludeHistory = true, UpdatesOnly = true, ResumeAtRevision = 0 },
})
{
await Assert.ThrowsAsync<InvalidOperationException>(async () =>
{
await foreach (var entry in store.WatchAsync<int>([">"], opts: opts, cancellationToken: cancellationToken))
{
}
});
}
}
}
Loading