Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ResumeAtRevision support to KV Watcher #491

Merged
merged 14 commits into from
Jun 6, 2024
8 changes: 6 additions & 2 deletions src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,14 @@ private async ValueTask<INatsJSConsumer> CreatePushConsumer(string origin)
config.HeadersOnly = true;
}

if (sequence > 0)
// Resume from a specific revision ?
if (sequence > 0 || _opts.ResumeAtRevision > 0)
niklasfp marked this conversation as resolved.
Show resolved Hide resolved
{
config.DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartSequence;
config.OptStartSeq = sequence + 1;

// If Sequence is set, it means that the consumer is being recreated, and we should start
// from the next sequence, otherwise we should use the revision specified in the options.
config.OptStartSeq = sequence > 0 ? sequence + 1 : _opts.ResumeAtRevision;
}

var consumer = await _context.CreateOrUpdateConsumerAsync(
Expand Down
8 changes: 8 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVOpts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ public record NatsKVWatchOpts
/// Async function called when the enumerator reaches the end of data. Return True to break the async enumeration, False to allow the enumeration to continue.
/// </summary>
public Func<CancellationToken, ValueTask<bool>>? OnNoData { get; init; }

/// <summary>
/// The revision to start from, if set to 0 (default) this will be ignored.
/// <remarks>
/// Setting this to a non-zero value will cause the watcher to ignore the values for <see cref="IncludeHistory"/> and <see cref="UpdatesOnly"/>.
/// </remarks>
mtmk marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
public ulong ResumeAtRevision { get; set; }
niklasfp marked this conversation as resolved.
Show resolved Hide resolved
}

public record NatsKVDeleteOpts
Expand Down
84 changes: 84 additions & 0 deletions tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,88 @@ public async Task Serialization_errors()
break;
}
}

// Test that watch can resume from a specific revision
[Fact]
public async Task Watch_resume_at_revision()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();

const string bucket = "Watch_resume_at_revision";
var config = new NatsKVConfig(bucket) { History = 10 };

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
var store = await kv.CreateStoreAsync(config, cancellationToken: cancellationToken);

await store.PutAsync("k1", 1, cancellationToken: cancellationToken);
await store.PutAsync("k2", 2, cancellationToken: cancellationToken);
var revK3 = await store.PutAsync("k3", 3, cancellationToken: cancellationToken);
await store.PutAsync("k4", 3, cancellationToken: cancellationToken);

// Watch all
var watchOps = new NatsKVWatchOpts() { MetaOnly = true, };
var watchAll = store.WatchAsync<int>(opts: watchOps, cancellationToken: cancellationToken);

// Expect to see k1, k2, k3 and k4
var allEntries = new List<(ulong Revision, string key)>();
await foreach (var key in watchAll)
{
allEntries.Add((key.Revision, key.Key));
if (key.Delta == 0)
{
break;
}
}

// Expects k1, k2, k3 and k4
allEntries.Should().HaveCount(4);

// Watch from the revision of k3
var watchOpsFromRevK3 = watchOps with { ResumeAtRevision = revK3, };

var watchFromRevision = store.WatchAsync<int>(opts: watchOpsFromRevK3, cancellationToken: cancellationToken);

// Expect to see k2 and k3, and k4
var fromRevisionEntries = new List<(ulong Revision, string key)>();
await foreach (var key in watchFromRevision)
{
fromRevisionEntries.Add((key.Revision, key.Key));
if (key.Delta == 0)
{
break;
}
}

// Expects k2, k3 and k4
fromRevisionEntries.Should().HaveCount(2);

// Watch from none existing revision
var noData = false;
var watchOpsNoneExisting = watchOps with
{
ResumeAtRevision = 9999,
OnNoData = (_) =>
{
noData = true;
return ValueTask.FromResult(true);
},
};

var watchFromNoneExistingRevision =
store.WatchAsync<int>(opts: watchOpsNoneExisting, cancellationToken: cancellationToken);

// Expect to see no data
await foreach (var key in watchFromNoneExistingRevision)
{
// We should not see any entries, if we get here something is wrong
Assert.Fail("Should not return any entries, and OnNoData should have been called to bail out");
}

noData.Should().BeTrue();
}
}
Loading