diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 7622eb80d..52af6c216 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -20,10 +20,10 @@ jobs: url: ${{ steps.deployment.outputs.page_url }} steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup dotnet - uses: actions/setup-dotnet@v3 + uses: actions/setup-dotnet@v4 with: dotnet-version: '8.x' diff --git a/.github/workflows/format.yml b/.github/workflows/format.yml index 2b47db47e..46ec6ef4b 100644 --- a/.github/workflows/format.yml +++ b/.github/workflows/format.yml @@ -9,10 +9,10 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup dotnet - uses: actions/setup-dotnet@v3 + uses: actions/setup-dotnet@v4 with: dotnet-version: '8.x' diff --git a/.github/workflows/perf.yml b/.github/workflows/perf.yml index c814e556a..5825821f2 100644 --- a/.github/workflows/perf.yml +++ b/.github/workflows/perf.yml @@ -41,10 +41,10 @@ jobs: nats-server -v - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup dotnet - uses: actions/setup-dotnet@v3 + uses: actions/setup-dotnet@v4 with: dotnet-version: '8.x' diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b9073efdf..f1a6e7f95 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,7 +14,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - id: tag name: Determine tag diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3636ec585..d0baf8a07 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,8 +7,8 @@ on: - main jobs: - dotnet: - name: dotnet + linux_test: + name: Linux strategy: fail-fast: false matrix: @@ -35,36 +35,77 @@ jobs: run: nats-server -v - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup dotnet - uses: actions/setup-dotnet@v3 + uses: actions/setup-dotnet@v4 with: dotnet-version: '8.x' - name: Build - run: dotnet build -c Debug + run: dotnet build -c Release - name: Test Core - run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj + run: | + killall nats-server 2> /dev/null | echo -n + nats-server -v + cd tests/NATS.Client.Core.Tests + dotnet test -c Release --no-build --logger:"console;verbosity=normal" + + - name: Test Core2 + run: | + killall nats-server 2> /dev/null | echo -n + nats-server -v + cd tests/NATS.Client.Core2.Tests + dotnet test -c Release --no-build + + - name: Test CoreUnit + run: | + killall nats-server 2> /dev/null | echo -n + nats-server -v + cd tests/NATS.Client.CoreUnit.Tests + dotnet test -c Release --no-build - name: Test JetStream - run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.JetStream.Tests/NATS.Client.JetStream.Tests.csproj + run: | + killall nats-server 2> /dev/null | echo -n + nats-server -v + cd tests/NATS.Client.JetStream.Tests + dotnet test -c Release --no-build - - name: Test Key/Value Store - run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.KeyValueStore.Tests/NATS.Client.KeyValueStore.Tests.csproj + - name: Test KeyValueStore + run: | + killall nats-server 2> /dev/null | echo -n + nats-server -v + cd tests/NATS.Client.KeyValueStore.Tests + dotnet test -c Release --no-build - name: Test Object Store - run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.ObjectStore.Tests/NATS.Client.ObjectStore.Tests.csproj + run: | + killall nats-server 2> /dev/null | echo -n + nats-server -v + cd tests/NATS.Client.ObjectStore.Tests + dotnet test -c Release --no-build - name: Test Services - run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.Services.Tests/NATS.Client.Services.Tests.csproj + run: | + killall nats-server 2> /dev/null | echo -n + nats-server -v + cd tests/NATS.Client.Services.Tests + dotnet test -c Release --no-build - name: Test OpenTelemetry - run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Net.OpenTelemetry.Tests/NATS.Net.OpenTelemetry.Tests.csproj + run: | + killall nats-server 2> /dev/null | echo -n + nats-server -v + cd tests/NATS.Net.OpenTelemetry.Tests + dotnet test -c Release --no-build - name: Check Native AOT run: | + killall nats-server 2> /dev/null | echo -n + nats-server -v + cd tests/NATS.Client.CheckNativeAot rm -rf bin obj dotnet publish -r linux-x64 -c Release -o dist | tee output.txt @@ -84,8 +125,8 @@ jobs: ./NATS.Client.CheckNativeAot - memory_test: - name: memory test + windows_test: + name: Windows strategy: fail-fast: false matrix: @@ -98,14 +139,15 @@ jobs: DOTNET_CLI_TELEMETRY_OPTOUT: 1 DOTNET_SKIP_FIRST_TIME_EXPERIENCE: 1 NUGET_XMLDOC_MODE: skip + MSYS_NO_PATHCONV: 1 steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Setup dotnet - uses: actions/setup-dotnet@v3 + uses: actions/setup-dotnet@v4 with: dotnet-version: '8.x' @@ -114,11 +156,7 @@ jobs: run: | mkdir tools-nats-server && cd tools-nats-server branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }}) - for i in 1 2 3 - do - curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30 - done - mv nats-server nats-server.exe + curl -sL -o nats-server.exe "https://binaries.nats.dev/binary/github.com/nats-io/nats-server/v2?os=windows&arch=amd64&version=$branch" cygpath -w "$(pwd)" | tee -a "$GITHUB_PATH" - name: Check nats-server @@ -133,11 +171,39 @@ jobs: dotnet tool install --global NUnit.ConsoleRunner.NetCore - name: Build - run: dotnet build -c Release + run: dotnet build -c Release - name: Memory Test (net6.0) - run: dotMemoryUnit $env:userprofile\.dotnet\tools\nunit.exe --propagate-exit-code -- .\tests\NATS.Client.Core.MemoryTests\bin\Release\net6.0\NATS.Client.Core.MemoryTests.dll + run: | + tasklist | grep -i nats-server && taskkill -F -IM nats-server.exe + nats-server.exe -v + dotMemoryUnit $env:userprofile\.dotnet\tools\nunit.exe --propagate-exit-code -- .\tests\NATS.Client.Core.MemoryTests\bin\Release\net6.0\NATS.Client.Core.MemoryTests.dll - - name: Platform Test (Windows net481) - run: dotnet test -c Release --no-build --logger:"console;verbosity=normal" -f net481 .\tests\NATS.Client.Platform.Windows.Tests\NATS.Client.Platform.Windows.Tests.csproj + - name: Platform Test + run: | + tasklist | grep -i nats-server && taskkill -F -IM nats-server.exe + nats-server.exe -v + cd tests\NATS.Client.Platform.Windows.Tests + dotnet test -c Release --no-build + + # Not working on Windows reliably yet + #- name: Test Core + # run: | + # tasklist | grep -i nats-server && taskkill -F -IM nats-server.exe + # nats-server.exe -v + # cd tests/NATS.Client.Core.Tests + # dotnet test -c Release --no-build + + - name: Test Core2 + run: | + tasklist | grep -i nats-server && taskkill -F -IM nats-server.exe + nats-server.exe -v + cd tests/NATS.Client.Core2.Tests + dotnet test -c Release --no-build + - name: Test CoreUnit + run: | + tasklist | grep -i nats-server && taskkill -F -IM nats-server.exe + nats-server.exe -v + cd tests/NATS.Client.CoreUnit.Tests + dotnet test -c Release --no-build diff --git a/NATS.Client.sln b/NATS.Client.sln index 27c245115..9c78cc340 100644 --- a/NATS.Client.sln +++ b/NATS.Client.sln @@ -17,12 +17,21 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NATS.Client.Core.Tests", "t EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MicroBenchmark", "sandbox\MicroBenchmark\MicroBenchmark.csproj", "{A10F0D89-13F3-49B3-ACF7-66E45DC95225}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "files", "files", "{899BE3EA-C5CA-4394-9B62-C45CBFF3AF4E}" +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".", ".", "{899BE3EA-C5CA-4394-9B62-C45CBFF3AF4E}" ProjectSection(SolutionItems) = preProject .editorconfig = .editorconfig - .github\workflows\build-debug.yml = .github\workflows\build-debug.yml - .github\workflows\build-release.yml = .github\workflows\build-release.yml Directory.Build.props = Directory.Build.props + README.md = README.md + version.txt = version.txt + .gitattributes = .gitattributes + .gitignore = .gitignore + CODE-OF-CONDUCT.md = CODE-OF-CONDUCT.md + CONTRIBUTING.md = CONTRIBUTING.md + dependencies.md = dependencies.md + global.json = global.json + Icon.png = Icon.png + LICENSE = LICENSE + REPO_RENAME.md = REPO_RENAME.md EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NATS.Client.Hosting", "src\NATS.Client.Hosting\NATS.Client.Hosting.csproj", "{D3F09B30-1ED5-48C2-81CD-A2AD88E751AC}" @@ -115,6 +124,21 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Simplified.Test EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Simplified", "src\NATS.Client.Simplified\NATS.Client.Simplified.csproj", "{227C88B1-0510-4010-B142-C44725578FCD}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Core2.Tests", "tests\NATS.Client.Core2.Tests\NATS.Client.Core2.Tests.csproj", "{8A676AAA-FEE3-4C18-870A-66E59AD9069F}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.CoreUnit.Tests", "tests\NATS.Client.CoreUnit.Tests\NATS.Client.CoreUnit.Tests.csproj", "{9521D9E0-642A-4C7E-BD10-372DF235CF62}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".github", ".github", "{B9EF0111-6720-46DF-B11A-8F8C88C3F5C1}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{0B7F1286-4426-45DE-82C2-FE7CF13CA0DA}" + ProjectSection(SolutionItems) = preProject + .github\workflows\docs.yml = .github\workflows\docs.yml + .github\workflows\format.yml = .github\workflows\format.yml + .github\workflows\perf.yml = .github\workflows\perf.yml + .github\workflows\release.yml = .github\workflows\release.yml + .github\workflows\test.yml = .github\workflows\test.yml + EndProjectSection +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -309,6 +333,14 @@ Global {227C88B1-0510-4010-B142-C44725578FCD}.Debug|Any CPU.Build.0 = Debug|Any CPU {227C88B1-0510-4010-B142-C44725578FCD}.Release|Any CPU.ActiveCfg = Release|Any CPU {227C88B1-0510-4010-B142-C44725578FCD}.Release|Any CPU.Build.0 = Release|Any CPU + {8A676AAA-FEE3-4C18-870A-66E59AD9069F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8A676AAA-FEE3-4C18-870A-66E59AD9069F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8A676AAA-FEE3-4C18-870A-66E59AD9069F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8A676AAA-FEE3-4C18-870A-66E59AD9069F}.Release|Any CPU.Build.0 = Release|Any CPU + {9521D9E0-642A-4C7E-BD10-372DF235CF62}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9521D9E0-642A-4C7E-BD10-372DF235CF62}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9521D9E0-642A-4C7E-BD10-372DF235CF62}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9521D9E0-642A-4C7E-BD10-372DF235CF62}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -362,6 +394,10 @@ Global {A15CCDD5-B707-4142-B99A-64F0AB62318A} = {95A69671-16CA-4133-981C-CC381B7AAA30} {6DAAAA87-8DDF-4E60-81CE-D8900327DE33} = {C526E8AB-739A-48D7-8FC4-048978C9B650} {227C88B1-0510-4010-B142-C44725578FCD} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} + {8A676AAA-FEE3-4C18-870A-66E59AD9069F} = {C526E8AB-739A-48D7-8FC4-048978C9B650} + {9521D9E0-642A-4C7E-BD10-372DF235CF62} = {C526E8AB-739A-48D7-8FC4-048978C9B650} + {B9EF0111-6720-46DF-B11A-8F8C88C3F5C1} = {899BE3EA-C5CA-4394-9B62-C45CBFF3AF4E} + {0B7F1286-4426-45DE-82C2-FE7CF13CA0DA} = {B9EF0111-6720-46DF-B11A-8F8C88C3F5C1} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA} diff --git a/sandbox/BlazorWasm/Client/BlazorWasm.Client.csproj b/sandbox/BlazorWasm/Client/BlazorWasm.Client.csproj index 82c02f0cf..9c6205c4e 100644 --- a/sandbox/BlazorWasm/Client/BlazorWasm.Client.csproj +++ b/sandbox/BlazorWasm/Client/BlazorWasm.Client.csproj @@ -8,8 +8,8 @@ - - + + diff --git a/sandbox/BlazorWasm/Server/BlazorWasm.Server.csproj b/sandbox/BlazorWasm/Server/BlazorWasm.Server.csproj index 46ebcd35b..5ec643823 100644 --- a/sandbox/BlazorWasm/Server/BlazorWasm.Server.csproj +++ b/sandbox/BlazorWasm/Server/BlazorWasm.Server.csproj @@ -9,7 +9,7 @@ - + diff --git a/sandbox/MicroBenchmark/MicroBenchmark.csproj b/sandbox/MicroBenchmark/MicroBenchmark.csproj index 859e55d7d..96d4079ab 100644 --- a/sandbox/MicroBenchmark/MicroBenchmark.csproj +++ b/sandbox/MicroBenchmark/MicroBenchmark.csproj @@ -14,6 +14,7 @@ + diff --git a/src/NATS.Client.Core/NATS.Client.Core.csproj b/src/NATS.Client.Core/NATS.Client.Core.csproj index 8244f952c..204c7512e 100644 --- a/src/NATS.Client.Core/NATS.Client.Core.csproj +++ b/src/NATS.Client.Core/NATS.Client.Core.csproj @@ -65,6 +65,8 @@ + + diff --git a/tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj b/tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj index 420d82e11..0b964f3f3 100644 --- a/tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj +++ b/tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj @@ -43,7 +43,7 @@ - Always + PreserveNewest diff --git a/tests/NATS.Client.Core.Tests/ProtocolTest.cs b/tests/NATS.Client.Core.Tests/ProtocolTest.cs index 37866de35..bb7d2ef71 100644 --- a/tests/NATS.Client.Core.Tests/ProtocolTest.cs +++ b/tests/NATS.Client.Core.Tests/ProtocolTest.cs @@ -1,6 +1,3 @@ -using System.Buffers; -using System.Collections.Concurrent; -using System.Text; using Microsoft.Extensions.Logging; using NATS.Client.TestUtilities; @@ -8,378 +5,6 @@ namespace NATS.Client.Core.Tests; public class ProtocolTest { - private readonly ITestOutputHelper _output; - - public ProtocolTest(ITestOutputHelper output) => _output = output; - - [Fact] - public async Task Subscription_with_same_subject() - { - await using var server = NatsServer.Start(_output, TransportType.Tcp); - var nats1 = server.CreateClientConnection(); - var (nats2, proxy) = server.CreateProxiedClientConnection(); - - var sub1 = await nats2.SubscribeCoreAsync("foo.bar"); - var sub2 = await nats2.SubscribeCoreAsync("foo.bar"); - var sub3 = await nats2.SubscribeCoreAsync("foo.baz"); - - var sync1 = 0; - var sync2 = 0; - var sync3 = 0; - var count = new WaitSignal(3); - - var reg1 = sub1.Register(m => - { - if (m.Data == 0) - { - Interlocked.Exchange(ref sync1, 1); - return; - } - - count.Pulse(m.Subject == "foo.bar" ? null : new Exception($"Subject mismatch {m.Subject}")); - }); - - var reg2 = sub2.Register(m => - { - if (m.Data == 0) - { - Interlocked.Exchange(ref sync2, 1); - return; - } - - count.Pulse(m.Subject == "foo.bar" ? null : new Exception($"Subject mismatch {m.Subject}")); - }); - - var reg3 = sub3.Register(m => - { - if (m.Data == 0) - { - Interlocked.Exchange(ref sync3, 1); - return; - } - - count.Pulse(m.Subject == "foo.baz" ? null : new Exception($"Subject mismatch {m.Subject}")); - }); - - // Since subscription and publishing are sent through different connections there is - // a race where one or more subscriptions are made after the publishing happens. - // So, we make sure subscribers are accepted by the server before we send any test data. - await Retry.Until( - "all subscriptions are active", - () => Volatile.Read(ref sync1) + Volatile.Read(ref sync2) + Volatile.Read(ref sync3) == 3, - async () => - { - await nats1.PublishAsync("foo.bar", 0); - await nats1.PublishAsync("foo.baz", 0); - }); - - await nats1.PublishAsync("foo.bar", 1); - await nats1.PublishAsync("foo.baz", 1); - - // Wait until we received all test data - await count; - - var frames = proxy.ClientFrames.OrderBy(f => f.Message).ToList(); - - foreach (var frame in frames) - { - _output.WriteLine($"[PROXY] {frame}"); - } - - Assert.Equal(3, frames.Count); - Assert.StartsWith("SUB foo.bar", frames[0].Message); - Assert.StartsWith("SUB foo.bar", frames[1].Message); - Assert.StartsWith("SUB foo.baz", frames[2].Message); - Assert.False(frames[0].Message.Equals(frames[1].Message), "Should have different SIDs"); - - await sub1.DisposeAsync(); - await reg1; - await sub2.DisposeAsync(); - await reg2; - await sub3.DisposeAsync(); - await reg3; - await nats1.DisposeAsync(); - await nats2.DisposeAsync(); - proxy.Dispose(); - } - - [Fact] - public async Task Subscription_queue_group() - { - await using var server = NatsServer.Start(); - var (nats, proxy) = server.CreateProxiedClientConnection(); - - await using var sub1 = await nats.SubscribeCoreAsync("foo", queueGroup: "group1"); - await using var sub2 = await nats.SubscribeCoreAsync("foo", queueGroup: "group2"); - - await Retry.Until( - "frames collected", - () => proxy.ClientFrames.Count(f => f.Message.StartsWith("SUB foo")) == 2); - - var frames = proxy.ClientFrames.Select(f => f.Message).ToList(); - - foreach (var frame in frames) - { - _output.WriteLine($"frame: {frame}"); - } - - Assert.StartsWith("SUB foo group1 ", frames[0]); - Assert.StartsWith("SUB foo group2 ", frames[1]); - - await nats.DisposeAsync(); - } - - [Fact] - public async Task Publish_empty_message_for_notifications() - { - void Log(string text) - { - _output.WriteLine($"[TESTS] {DateTime.Now:HH:mm:ss.fff} {text}"); - } - - await using var server = NatsServer.Start(_output, new NatsServerOptsBuilder().UseTransport(TransportType.Tcp).Trace().Build()); - var (nats, proxy) = server.CreateProxiedClientConnection(); - - var sync = 0; - var signal1 = new WaitSignal>(); - var signal2 = new WaitSignal>(); - var sub = await nats.SubscribeCoreAsync("foo.*"); - var reg = sub.Register(m => - { - switch (m.Subject) - { - case "foo.sync": - Interlocked.Exchange(ref sync, 1); - break; - case "foo.signal1": - signal1.Pulse(m); - break; - case "foo.signal2": - signal2.Pulse(m); - break; - } - }); - - await Retry.Until( - "subscription is active", - () => Volatile.Read(ref sync) == 1, - async () => await nats.PublishAsync("foo.sync"), - retryDelay: TimeSpan.FromSeconds(1)); - - Log("PUB notifications"); - await nats.PublishAsync("foo.signal1"); - var msg1 = await signal1; - Assert.Equal(0, msg1.Data); - Assert.Null(msg1.Headers); - var pubFrame1 = proxy.Frames.First(f => f.Message.StartsWith("PUB foo.signal1")); - Assert.Equal("PUB foo.signal1 0␍␊", pubFrame1.Message); - var msgFrame1 = proxy.Frames.First(f => f.Message.StartsWith("MSG foo.signal1")); - Assert.Matches(@"^MSG foo.signal1 \w+ 0␍␊$", msgFrame1.Message); - - Log("HPUB notifications"); - await nats.PublishAsync("foo.signal2", headers: new NatsHeaders()); - var msg2 = await signal2; - Assert.Equal(0, msg2.Data); - Assert.NotNull(msg2.Headers); - Assert.Empty(msg2.Headers!); - var pubFrame2 = proxy.Frames.First(f => f.Message.StartsWith("HPUB foo.signal2")); - Assert.Equal("HPUB foo.signal2 12 12␍␊NATS/1.0␍␊␍␊", pubFrame2.Message); - var msgFrame2 = proxy.Frames.First(f => f.Message.StartsWith("HMSG foo.signal2")); - Assert.Matches(@"^HMSG foo.signal2 \w+ 12 12␍␊NATS/1.0␍␊␍␊$", msgFrame2.Message); - - await sub.DisposeAsync(); - await reg; - } - - [Fact] - public async Task Unsubscribe_max_msgs() - { - const int maxMsgs = 10; - const int pubMsgs = 5; - const int extraMsgs = 3; - - void Log(string text) - { - _output.WriteLine($"[TESTS] {DateTime.Now:HH:mm:ss.fff} {text}"); - } - - // Use a single server to test multiple scenarios to make test runs more efficient - await using var server = NatsServer.Start(_output, new NatsServerOptsBuilder().UseTransport(TransportType.Tcp).Trace().Build()); - var (nats, proxy) = server.CreateProxiedClientConnection(); - var sid = 0; - - Log("### Auto-unsubscribe after consuming max-msgs"); - { - var opts = new NatsSubOpts { MaxMsgs = maxMsgs }; - await using var sub = await nats.SubscribeCoreAsync("foo", opts: opts); - sid++; - - await Retry.Until("all frames arrived", () => proxy.Frames.Count >= 2); - Assert.Equal($"SUB foo {sid}", proxy.Frames[0].Message); - Assert.Equal($"UNSUB {sid} {maxMsgs}", proxy.Frames[1].Message); - - Log("Send more messages than max to check we only get max"); - for (var i = 0; i < maxMsgs + extraMsgs; i++) - { - await nats.PublishAsync("foo", i); - } - - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - var cancellationToken = cts.Token; - var count = 0; - await foreach (var natsMsg in sub.Msgs.ReadAllAsync(cancellationToken)) - { - Assert.Equal(count, natsMsg.Data); - count++; - } - - Assert.Equal(maxMsgs, count); - Assert.Equal(NatsSubEndReason.MaxMsgs, ((NatsSubBase)sub).EndReason); - } - - Log("### Manual unsubscribe"); - { - await proxy.FlushFramesAsync(nats); - - await using var sub = await nats.SubscribeCoreAsync("foo2"); - sid++; - await sub.UnsubscribeAsync(); - - await Retry.Until("all frames arrived", () => proxy.ClientFrames.Count == 2); - - Assert.Equal($"SUB foo2 {sid}", proxy.ClientFrames[0].Message); - Assert.Equal($"UNSUB {sid}", proxy.ClientFrames[1].Message); - - Log("Send messages to check we receive none since we're already unsubscribed"); - for (var i = 0; i < pubMsgs; i++) - { - await nats.PublishAsync("foo2", i); - } - - await Retry.Until("all pub frames arrived", () => proxy.Frames.Count(f => f.Message.StartsWith("PUB foo2")) == pubMsgs); - - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - var cancellationToken = cts.Token; - var count = 0; - await foreach (var unused in sub.Msgs.ReadAllAsync(cancellationToken)) - { - count++; - } - - Assert.Equal(0, count); - Assert.Equal(NatsSubEndReason.None, ((NatsSubBase)sub).EndReason); - } - - Log("### Reconnect"); - { - proxy.Reset(); - - var opts = new NatsSubOpts { MaxMsgs = maxMsgs }; - var sub = await nats.SubscribeCoreAsync("foo3", opts: opts); - sid++; - var count = 0; - var reg = sub.Register(_ => Interlocked.Increment(ref count)); - await Retry.Until("subscribed", () => proxy.Frames.Any(f => f.Message == $"SUB foo3 {sid}")); - - for (var i = 0; i < pubMsgs; i++) - { - await nats.PublishAsync("foo3", i); - } - - await Retry.Until("published", () => proxy.Frames.Count(f => f.Message.StartsWith("PUB foo3")) == pubMsgs); - await Retry.Until("received", () => Volatile.Read(ref count) == pubMsgs); - - var pending = maxMsgs - pubMsgs; - Assert.Equal(pending, ((NatsSubBase)sub).PendingMsgs); - - proxy.Reset(); - - Log("Expect SUB + UNSUB"); - await Retry.Until("re-subscribed", () => proxy.ClientFrames.Count == 2); - - Log("Make sure we're still using the same SID"); - Assert.Equal($"SUB foo3 {sid}", proxy.ClientFrames[0].Message); - Assert.Equal($"UNSUB {sid} {pending}", proxy.ClientFrames[1].Message); - - Log("We already published a few, this should exceed max-msgs"); - for (var i = 0; i < maxMsgs; i++) - { - await nats.PublishAsync("foo3", i); - } - - await Retry.Until( - "published more", - () => proxy.ClientFrames.Count(f => f.Message.StartsWith("PUB foo3")) == maxMsgs); - - await Retry.Until( - "unsubscribed with max-msgs", - () => ((NatsSubBase)sub).EndReason == NatsSubEndReason.MaxMsgs); - - // Wait until msg channel is completed and drained - await reg; - - Assert.Equal(maxMsgs, Volatile.Read(ref count)); - - await sub.DisposeAsync(); - } - } - - [Fact] - public async Task Reconnect_with_sub_and_additional_commands() - { - await using var server = NatsServer.Start(); - var (nats, proxy) = server.CreateProxiedClientConnection(); - - const string subject = "foo"; - - var sync = 0; - await using var sub = new NatsSubReconnectTest(nats, subject, i => Interlocked.Exchange(ref sync, i)); - await nats.AddSubAsync(sub); - - await Retry.Until( - "subscribed", - () => Volatile.Read(ref sync) == 1, - async () => await nats.PublishAsync(subject, 1)); - - var disconnected = new WaitSignal(); - nats.ConnectionDisconnected += (_, _) => - { - disconnected.Pulse(); - return default; - }; - - proxy.Reset(); - - await disconnected; - - await Retry.Until( - "re-subscribed", - () => Volatile.Read(ref sync) == 2, - async () => await nats.PublishAsync(subject, 2)); - - await Retry.Until( - "frames collected", - () => proxy.ClientFrames.Any(f => f.Message.StartsWith("PUB foo"))); - - var frames = proxy.ClientFrames.Select(f => f.Message).ToList(); - - foreach (var frame in frames) - { - _output.WriteLine($"frame: {frame}"); - } - - Assert.StartsWith("SUB foo", frames[0]); - - for (var i = 0; i < 100; i++) - { - Assert.StartsWith($"PUB bar{i}", frames[i + 1]); - } - - Assert.StartsWith("PUB foo", frames[101]); - - await nats.DisposeAsync(); - } - [Theory] [InlineData(1)] [InlineData(1024)] @@ -476,109 +101,4 @@ public async Task Protocol_parser_under_load(int size) counts.Should().BeGreaterOrEqualTo(3); } - - [Fact] - public async Task Proactively_reject_payloads_over_the_threshold_set_by_server() - { - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(); - - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - - var sync = 0; - var count = 0; - var signal1 = new WaitSignal>(); - var signal2 = new WaitSignal>(); - var subTask = Task.Run( - async () => - { - await foreach (var m in nats.SubscribeAsync("foo.*", cancellationToken: cts.Token)) - { - if (m.Subject == "foo.sync") - { - Interlocked.Exchange(ref sync, 1); - continue; - } - - Interlocked.Increment(ref count); - - if (m.Subject == "foo.signal1") - { - signal1.Pulse(m); - } - else if (m.Subject == "foo.signal2") - { - signal2.Pulse(m); - } - else if (m.Subject == "foo.end") - { - break; - } - } - }, - cancellationToken: cts.Token); - - await Retry.Until( - reason: "subscription is active", - condition: () => Volatile.Read(ref sync) == 1, - action: async () => await nats.PublishAsync("foo.sync", cancellationToken: cts.Token), - retryDelay: TimeSpan.FromSeconds(.3)); - { - var payload = new byte[nats.ServerInfo!.MaxPayload]; - await nats.PublishAsync("foo.signal1", payload, cancellationToken: cts.Token); - var msg1 = await signal1; - Assert.Equal(payload.Length, msg1.Data!.Length); - } - - { - var payload = new byte[nats.ServerInfo!.MaxPayload + 1]; - var exception = await Assert.ThrowsAsync(async () => - await nats.PublishAsync("foo.none", payload, cancellationToken: cts.Token)); - Assert.Matches(@"Payload size \d+ exceeds server's maximum payload size \d+", exception.Message); - } - - { - var payload = new byte[123]; - await nats.PublishAsync("foo.signal2", payload, cancellationToken: cts.Token); - var msg1 = await signal2; - Assert.Equal(payload.Length, msg1.Data!.Length); - } - - await nats.PublishAsync("foo.end", cancellationToken: cts.Token); - - await subTask; - - Assert.Equal(3, Volatile.Read(ref count)); - } - - private sealed class NatsSubReconnectTest : NatsSubBase - { - private readonly Action _callback; - - internal NatsSubReconnectTest(NatsConnection connection, string subject, Action callback) - : base(connection, connection.SubscriptionManager, subject, queueGroup: default, opts: default) => - _callback = callback; - - internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter, int sid) - { - await base.WriteReconnectCommandsAsync(commandWriter, sid); - - // Any additional commands to send on reconnect - for (var i = 0; i < 100; i++) - { - await commandWriter.PublishAsync($"bar{i}", default, default, default, NatsRawSerializer.Default, default); - } - } - - protected override ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) - { - _callback(int.Parse(Encoding.UTF8.GetString(payloadBuffer))); - DecrementMaxMsgs(); - return ValueTask.CompletedTask; - } - - protected override void TryComplete() - { - } - } } diff --git a/tests/NATS.Client.Core.Tests/TlsClientTest.cs b/tests/NATS.Client.Core.Tests/TlsClientTest.cs index 1547c006e..8f39c8bf7 100644 --- a/tests/NATS.Client.Core.Tests/TlsClientTest.cs +++ b/tests/NATS.Client.Core.Tests/TlsClientTest.cs @@ -24,9 +24,12 @@ public async Task Client_connect_using_certificate(TransportType transportType) var clientOpts = server.ClientOpts(NatsOpts.Default with { Name = "tls-test-client" }); await using var nats = new NatsConnection(clientOpts); - await nats.ConnectAsync(); - var rtt = await nats.PingAsync(); - Assert.True(rtt > TimeSpan.Zero); + + await Task.Run(async () => + { + await nats.ConnectAsync(); + await nats.PingAsync(); + }).WaitAsync(TimeSpan.FromSeconds(10)); } [Fact] @@ -52,10 +55,13 @@ public async Task Client_connect_using_certificate_and_revocation_check() }; await using var nats = new NatsConnection(clientOpts); - // At the moment I don't know of a good way of checking if the revocation check is working - // except to check if the connection fails. So we are expecting an exception here. - var exception = await Assert.ThrowsAnyAsync(async () => await nats.ConnectAsync()); - Assert.Contains("remote certificate was rejected", exception.InnerException!.InnerException!.Message); + await Task.Run(async () => + { + // At the moment I don't know of a good way of checking if the revocation check is working + // except to check if the connection fails. So we are expecting an exception here. + var exception = await Assert.ThrowsAnyAsync(async () => await nats.ConnectAsync()); + Assert.Contains("remote certificate was rejected", exception.InnerException!.InnerException!.Message); + }).WaitAsync(TimeSpan.FromSeconds(10)); } [Theory] diff --git a/tests/NATS.Client.Core.Tests/CancellationTest.cs b/tests/NATS.Client.Core2.Tests/CancellationTest.cs similarity index 83% rename from tests/NATS.Client.Core.Tests/CancellationTest.cs rename to tests/NATS.Client.Core2.Tests/CancellationTest.cs index 4d7ae848e..7b2673c13 100644 --- a/tests/NATS.Client.Core.Tests/CancellationTest.cs +++ b/tests/NATS.Client.Core2.Tests/CancellationTest.cs @@ -1,18 +1,25 @@ +using NATS.Client.Core2.Tests; +using NATS.Client.Platform.Windows.Tests; + namespace NATS.Client.Core.Tests; +[Collection("nats-server")] public class CancellationTest { private readonly ITestOutputHelper _output; + private readonly NatsServerFixture _server; - public CancellationTest(ITestOutputHelper output) => _output = output; + public CancellationTest(ITestOutputHelper output, NatsServerFixture server) + { + _output = output; + _server = server; + } // check CommandTimeout [Fact] public async Task CommandTimeoutTest() { - await using var server = NatsServer.Start(); - - await using var conn = server.CreateClientConnection(NatsOpts.Default with { CommandTimeout = TimeSpan.FromMilliseconds(1) }); + await using var conn = new NatsConnection(NatsOpts.Default with { Url = _server.Url, CommandTimeout = TimeSpan.FromMilliseconds(1) }); await conn.ConnectAsync(); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); @@ -40,9 +47,9 @@ await Assert.ThrowsAsync(async () => [Fact] public async Task CommandConnectCancellationTest() { - var server = NatsServer.Start(_output, TransportType.Tcp); + var server = await NatsServerProcess.StartAsync(); - await using var conn = server.CreateClientConnection(); + await using var conn = new NatsConnection(new NatsOpts { Url = server.Url }); await conn.ConnectAsync(); // kill the server diff --git a/tests/NATS.Client.Core.Tests/JsonSerializerTests.cs b/tests/NATS.Client.Core2.Tests/JsonSerializerTests.cs similarity index 76% rename from tests/NATS.Client.Core.Tests/JsonSerializerTests.cs rename to tests/NATS.Client.Core2.Tests/JsonSerializerTests.cs index 47acefc42..f23480240 100644 --- a/tests/NATS.Client.Core.Tests/JsonSerializerTests.cs +++ b/tests/NATS.Client.Core2.Tests/JsonSerializerTests.cs @@ -1,23 +1,29 @@ +using NATS.Client.Core2.Tests; using NATS.Client.Serializers.Json; namespace NATS.Client.Core.Tests; +[Collection("nats-server")] public class JsonSerializerTests { private readonly ITestOutputHelper _output; + private readonly NatsServerFixture _server; - public JsonSerializerTests(ITestOutputHelper output) => _output = output; + public JsonSerializerTests(ITestOutputHelper output, NatsServerFixture server) + { + _output = output; + _server = server; + } [Fact] public async Task Serialize_any_type() { var natsOpts = NatsOpts.Default with { + Url = _server.Url, SerializerRegistry = NatsJsonSerializerRegistry.Default, }; - - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(natsOpts); + await using var nats = new NatsConnection(natsOpts); // in local runs server start is taking too long when running // the whole suite. @@ -32,7 +38,7 @@ public async Task Serialize_any_type() Assert.Equal("bar", msg.Data?.Name); // Default serializer won't work with random types - await using var nats1 = server.CreateClientConnection(); + await using var nats1 = new NatsConnection(new NatsOpts { Url = _server.Url }); var exception = await Assert.ThrowsAsync(() => nats1.PublishAsync( subject: "would.not.work", diff --git a/tests/NATS.Client.Core.Tests/MessageInterfaceTest.cs b/tests/NATS.Client.Core2.Tests/MessageInterfaceTest.cs similarity index 80% rename from tests/NATS.Client.Core.Tests/MessageInterfaceTest.cs rename to tests/NATS.Client.Core2.Tests/MessageInterfaceTest.cs index 0291b679e..094b84d6a 100644 --- a/tests/NATS.Client.Core.Tests/MessageInterfaceTest.cs +++ b/tests/NATS.Client.Core2.Tests/MessageInterfaceTest.cs @@ -1,12 +1,22 @@ +using NATS.Client.Core2.Tests; +using NATS.Client.Platform.Windows.Tests; + namespace NATS.Client.Core.Tests; +[Collection("nats-server")] public class MessageInterfaceTest { + private readonly NatsServerFixture _server; + + public MessageInterfaceTest(NatsServerFixture server) + { + _server = server; + } + [Fact] public async Task Sub_custom_builder_test() { - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); var sync = 0; var sub = Task.Run(async () => diff --git a/tests/NATS.Client.Core2.Tests/NATS.Client.Core2.Tests.csproj b/tests/NATS.Client.Core2.Tests/NATS.Client.Core2.Tests.csproj new file mode 100644 index 000000000..53dc4e386 --- /dev/null +++ b/tests/NATS.Client.Core2.Tests/NATS.Client.Core2.Tests.csproj @@ -0,0 +1,62 @@ + + + + net6.0;net8.0 + $(TargetFrameworks);net481 + enable + false + $(NoWarn);CS8002 + enable + $(MSBuildProjectDirectory)\test.runsettings + false + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + + + + + + + + + + + + + + + resources\%(RecursiveDir)%(Filename)%(Extension) + PreserveNewest + + + + + + + + + + + + + diff --git a/tests/NATS.Client.Core2.Tests/NatsServerFixture.cs b/tests/NATS.Client.Core2.Tests/NatsServerFixture.cs new file mode 100644 index 000000000..ea92dae6c --- /dev/null +++ b/tests/NATS.Client.Core2.Tests/NatsServerFixture.cs @@ -0,0 +1,32 @@ +using NATS.Client.Platform.Windows.Tests; + +namespace NATS.Client.Core2.Tests; + +// https://xunit.net/docs/shared-context#collection-fixture +public class NatsServerFixture : IDisposable +{ + private int _next; + + public NatsServerFixture() + { + Server = NatsServerProcess.Start(); + } + + public NatsServerProcess Server { get; } + + public int Port => new Uri(Server.Url).Port; + + public string Url => Server.Url; + + public string GetNextId() => $"test{Interlocked.Increment(ref _next)}"; + + public void Dispose() + { + Server.Dispose(); + } +} + +[CollectionDefinition("nats-server")] +public class DatabaseCollection : ICollectionFixture +{ +} diff --git a/tests/NATS.Client.Core2.Tests/ProtocolTest.cs b/tests/NATS.Client.Core2.Tests/ProtocolTest.cs new file mode 100644 index 000000000..f14b463d4 --- /dev/null +++ b/tests/NATS.Client.Core2.Tests/ProtocolTest.cs @@ -0,0 +1,501 @@ +using System.Buffers; +using System.Text; +using NATS.Client.Core2.Tests; +#if !NET6_0_OR_GREATER +using NATS.Client.Core.Internal.NetStandardExtensions; +#endif + +namespace NATS.Client.Core.Tests; + +[Collection("nats-server")] +public class ProtocolTest +{ + private readonly ITestOutputHelper _output; + private readonly NatsServerFixture _server; + + public ProtocolTest(ITestOutputHelper output, NatsServerFixture server) + { + _output = output; + _server = server; + } + + [Fact] + public async Task Subscription_with_same_subject() + { + var nats1 = new NatsConnection(new NatsOpts { Url = _server.Url }); + var proxy = new NatsProxy(_server.Port); + var nats2 = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{proxy.Port}", ConnectTimeout = TimeSpan.FromSeconds(10) }); + + var sub1 = await nats2.SubscribeCoreAsync("foo.bar"); + var sub2 = await nats2.SubscribeCoreAsync("foo.bar"); + var sub3 = await nats2.SubscribeCoreAsync("foo.baz"); + + var sync1 = 0; + var sync2 = 0; + var sync3 = 0; + var count = new WaitSignal(3); + + var reg1 = sub1.Register(m => + { + if (m.Data == 0) + { + Interlocked.Exchange(ref sync1, 1); + return; + } + + count.Pulse(m.Subject == "foo.bar" ? null : new Exception($"Subject mismatch {m.Subject}")); + }); + + var reg2 = sub2.Register(m => + { + if (m.Data == 0) + { + Interlocked.Exchange(ref sync2, 1); + return; + } + + count.Pulse(m.Subject == "foo.bar" ? null : new Exception($"Subject mismatch {m.Subject}")); + }); + + var reg3 = sub3.Register(m => + { + if (m.Data == 0) + { + Interlocked.Exchange(ref sync3, 1); + return; + } + + count.Pulse(m.Subject == "foo.baz" ? null : new Exception($"Subject mismatch {m.Subject}")); + }); + + // Since subscription and publishing are sent through different connections there is + // a race where one or more subscriptions are made after the publishing happens. + // So, we make sure subscribers are accepted by the server before we send any test data. + await Retry.Until( + "all subscriptions are active", + () => Volatile.Read(ref sync1) + Volatile.Read(ref sync2) + Volatile.Read(ref sync3) == 3, + async () => + { + await nats1.PublishAsync("foo.bar", 0); + await nats1.PublishAsync("foo.baz", 0); + }); + + await nats1.PublishAsync("foo.bar", 1); + await nats1.PublishAsync("foo.baz", 1); + + // Wait until we received all test data + await count; + + var frames = proxy.ClientFrames.OrderBy(f => f.Message).ToList(); + + foreach (var frame in frames) + { + _output.WriteLine($"[PROXY] {frame}"); + } + + Assert.Equal(3, frames.Count); + Assert.StartsWith("SUB foo.bar", frames[0].Message); + Assert.StartsWith("SUB foo.bar", frames[1].Message); + Assert.StartsWith("SUB foo.baz", frames[2].Message); + Assert.False(frames[0].Message.Equals(frames[1].Message), "Should have different SIDs"); + + await sub1.DisposeAsync(); + await reg1; + await sub2.DisposeAsync(); + await reg2; + await sub3.DisposeAsync(); + await reg3; + await nats1.DisposeAsync(); + await nats2.DisposeAsync(); + proxy.Dispose(); + } + + [Fact] + public async Task Subscription_queue_group() + { + var proxy = new NatsProxy(_server.Port); + var nats = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{proxy.Port}", ConnectTimeout = TimeSpan.FromSeconds(10) }); + var subject = $"{_server.GetNextId()}.foo"; + + await using var sub1 = await nats.SubscribeCoreAsync(subject, queueGroup: "group1"); + await using var sub2 = await nats.SubscribeCoreAsync(subject, queueGroup: "group2"); + + await Retry.Until( + "frames collected", + () => proxy.ClientFrames.Count(f => f.Message.StartsWith($"SUB {subject}")) == 2); + + var frames = proxy.ClientFrames.Select(f => f.Message).ToList(); + + foreach (var frame in frames) + { + _output.WriteLine($"frame: {frame}"); + } + + Assert.StartsWith($"SUB {subject} group1 ", frames[0]); + Assert.StartsWith($"SUB {subject} group2 ", frames[1]); + + await nats.DisposeAsync(); + } + + [Fact] + public async Task Publish_empty_message_for_notifications() + { + void Log(string text) + { + _output.WriteLine($"[TESTS] {DateTime.Now:HH:mm:ss.fff} {text}"); + } + + var proxy = new NatsProxy(_server.Port); + var nats = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{proxy.Port}", ConnectTimeout = TimeSpan.FromSeconds(10) }); + + var prefix = $"{_server.GetNextId()}.foo"; + + var sync = 0; + var signal1 = new WaitSignal>(); + var signal2 = new WaitSignal>(); + var sub = await nats.SubscribeCoreAsync($"{prefix}.*"); + var reg = sub.Register(m => + { + if (m.Subject == $"{prefix}.sync") + { + Interlocked.Exchange(ref sync, 1); + } + else if (m.Subject == $"{prefix}.signal1") + { + signal1.Pulse(m); + } + else if (m.Subject == $"{prefix}.signal2") + { + signal2.Pulse(m); + } + }); + + await Retry.Until( + "subscription is active", + () => Volatile.Read(ref sync) == 1, + async () => await nats.PublishAsync($"{prefix}.sync"), + retryDelay: TimeSpan.FromSeconds(1)); + + Log("PUB notifications"); + await nats.PublishAsync($"{prefix}.signal1"); + var msg1 = await signal1; + Assert.Equal(0, msg1.Data); + Assert.Null(msg1.Headers); + var pubFrame1 = proxy.Frames.First(f => f.Message.StartsWith($"PUB {prefix}.signal1")); + Assert.Equal($"PUB {prefix}.signal1 0␍␊", pubFrame1.Message); + var msgFrame1 = proxy.Frames.First(f => f.Message.StartsWith($"MSG {prefix}.signal1")); + Assert.Matches($@"^MSG {prefix}.signal1 \w+ 0␍␊$", msgFrame1.Message); + + Log("HPUB notifications"); + await nats.PublishAsync($"{prefix}.signal2", headers: new NatsHeaders()); + var msg2 = await signal2; + Assert.Equal(0, msg2.Data); + Assert.NotNull(msg2.Headers); + Assert.Empty(msg2.Headers!); + var pubFrame2 = proxy.Frames.First(f => f.Message.StartsWith($"HPUB {prefix}.signal2")); + Assert.Equal($"HPUB {prefix}.signal2 12 12␍␊NATS/1.0␍␊␍␊", pubFrame2.Message); + var msgFrame2 = proxy.Frames.First(f => f.Message.StartsWith($"HMSG {prefix}.signal2")); + Assert.Matches($@"^HMSG {prefix}.signal2 \w+ 12 12␍␊NATS/1.0␍␊␍␊$", msgFrame2.Message); + + await sub.DisposeAsync(); + await reg; + } + + [Fact] + public async Task Unsubscribe_max_msgs() + { + const int maxMsgs = 10; + const int pubMsgs = 5; + const int extraMsgs = 3; + + void Log(string text) + { + _output.WriteLine($"[TESTS] {DateTime.Now:HH:mm:ss.fff} {text}"); + } + + // Use a single server to test multiple scenarios to make test runs more efficient + var proxy = new NatsProxy(_server.Port); + var nats = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{proxy.Port}", ConnectTimeout = TimeSpan.FromSeconds(10) }); + var sid = 0; + + Log("### Auto-unsubscribe after consuming max-msgs"); + { + var opts = new NatsSubOpts { MaxMsgs = maxMsgs }; + await using var sub = await nats.SubscribeCoreAsync("foo", opts: opts); + sid++; + + await Retry.Until("all frames arrived", () => proxy.Frames.Count >= 2); + Assert.Equal($"SUB foo {sid}", proxy.Frames[0].Message); + Assert.Equal($"UNSUB {sid} {maxMsgs}", proxy.Frames[1].Message); + + Log("Send more messages than max to check we only get max"); + for (var i = 0; i < maxMsgs + extraMsgs; i++) + { + await nats.PublishAsync("foo", i); + } + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + var count = 0; + await foreach (var natsMsg in sub.Msgs.ReadAllAsync(cancellationToken)) + { + Assert.Equal(count, natsMsg.Data); + count++; + } + + Assert.Equal(maxMsgs, count); + Assert.Equal(NatsSubEndReason.MaxMsgs, ((NatsSubBase)sub).EndReason); + } + + Log("### Manual unsubscribe"); + { + await proxy.FlushFramesAsync(nats); + + await using var sub = await nats.SubscribeCoreAsync("foo2"); + sid++; + await sub.UnsubscribeAsync(); + + await Retry.Until("all frames arrived", () => proxy.ClientFrames.Count == 2); + + Assert.Equal($"SUB foo2 {sid}", proxy.ClientFrames[0].Message); + Assert.Equal($"UNSUB {sid}", proxy.ClientFrames[1].Message); + + Log("Send messages to check we receive none since we're already unsubscribed"); + for (var i = 0; i < pubMsgs; i++) + { + await nats.PublishAsync("foo2", i); + } + + await Retry.Until("all pub frames arrived", () => proxy.Frames.Count(f => f.Message.StartsWith("PUB foo2")) == pubMsgs); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + var count = 0; + await foreach (var unused in sub.Msgs.ReadAllAsync(cancellationToken)) + { + count++; + } + + Assert.Equal(0, count); + Assert.Equal(NatsSubEndReason.None, ((NatsSubBase)sub).EndReason); + } + + Log("### Reconnect"); + { + proxy.Reset(); + + var opts = new NatsSubOpts { MaxMsgs = maxMsgs }; + var sub = await nats.SubscribeCoreAsync("foo3", opts: opts); + sid++; + var count = 0; + var reg = sub.Register(_ => Interlocked.Increment(ref count)); + await Retry.Until("subscribed", () => proxy.Frames.Any(f => f.Message == $"SUB foo3 {sid}")); + + for (var i = 0; i < pubMsgs; i++) + { + await nats.PublishAsync("foo3", i); + } + + await Retry.Until("published", () => proxy.Frames.Count(f => f.Message.StartsWith("PUB foo3")) == pubMsgs); + await Retry.Until("received", () => Volatile.Read(ref count) == pubMsgs); + + var pending = maxMsgs - pubMsgs; + Assert.Equal(pending, ((NatsSubBase)sub).PendingMsgs); + + proxy.Reset(); + + Log("Expect SUB + UNSUB"); + await Retry.Until("re-subscribed", () => proxy.ClientFrames.Count == 2); + + Log("Make sure we're still using the same SID"); + Assert.Equal($"SUB foo3 {sid}", proxy.ClientFrames[0].Message); + Assert.Equal($"UNSUB {sid} {pending}", proxy.ClientFrames[1].Message); + + Log("We already published a few, this should exceed max-msgs"); + for (var i = 0; i < maxMsgs; i++) + { + await nats.PublishAsync("foo3", i); + } + + await Retry.Until( + "published more", + () => proxy.ClientFrames.Count(f => f.Message.StartsWith("PUB foo3")) == maxMsgs); + + await Retry.Until( + "unsubscribed with max-msgs", + () => ((NatsSubBase)sub).EndReason == NatsSubEndReason.MaxMsgs); + + // Wait until msg channel is completed and drained + await reg; + + Assert.Equal(maxMsgs, Volatile.Read(ref count)); + + await sub.DisposeAsync(); + } + } + + [Fact] + public async Task Reconnect_with_sub_and_additional_commands() + { + var proxy = new NatsProxy(_server.Port); + var nats = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{proxy.Port}", ConnectTimeout = TimeSpan.FromSeconds(10) }); + + var subject = $"{_server.GetNextId()}.foo"; + var cmdSubject = $"{_server.GetNextId()}.bar"; + + var sync = 0; + await using var sub = new NatsSubReconnectTest(nats, subject, cmdSubject, i => Interlocked.Exchange(ref sync, i)); + await nats.AddSubAsync(sub); + + await Retry.Until( + "subscribed", + () => Volatile.Read(ref sync) == 1, + async () => await nats.PublishAsync(subject, 1)); + + var disconnected = new WaitSignal(); + nats.ConnectionDisconnected += (_, _) => + { + disconnected.Pulse(); + return default; + }; + + proxy.Reset(); + + await disconnected; + + await Retry.Until( + "re-subscribed", + () => Volatile.Read(ref sync) == 2, + async () => await nats.PublishAsync(subject, 2)); + + await Retry.Until( + "frames collected", + () => proxy.ClientFrames.Any(f => f.Message.StartsWith($"PUB {subject}"))); + + var frames = proxy.ClientFrames.Select(f => f.Message).ToList(); + + foreach (var frame in frames) + { + _output.WriteLine($"frame: {frame}"); + } + + Assert.StartsWith($"SUB {subject}", frames[0]); + + for (var i = 0; i < 100; i++) + { + Assert.StartsWith($"PUB {cmdSubject}{i}", frames[i + 1]); + } + + Assert.StartsWith($"PUB {subject}", frames[101]); + + await nats.DisposeAsync(); + } + + [Fact] + public async Task Proactively_reject_payloads_over_the_threshold_set_by_server() + { + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var sync = 0; + var count = 0; + var signal1 = new WaitSignal>(); + var signal2 = new WaitSignal>(); + var subTask = Task.Run( + async () => + { + await foreach (var m in nats.SubscribeAsync("foo.*", cancellationToken: cts.Token)) + { + if (m.Subject == "foo.sync") + { + Interlocked.Exchange(ref sync, 1); + continue; + } + + Interlocked.Increment(ref count); + + if (m.Subject == "foo.signal1") + { + signal1.Pulse(m); + } + else if (m.Subject == "foo.signal2") + { + signal2.Pulse(m); + } + else if (m.Subject == "foo.end") + { + break; + } + } + }, + cancellationToken: cts.Token); + + await Retry.Until( + reason: "subscription is active", + condition: () => Volatile.Read(ref sync) == 1, + action: async () => await nats.PublishAsync("foo.sync", cancellationToken: cts.Token), + retryDelay: TimeSpan.FromSeconds(.3)); + { + var payload = new byte[nats.ServerInfo!.MaxPayload]; + await nats.PublishAsync("foo.signal1", payload, cancellationToken: cts.Token); + var msg1 = await signal1; + Assert.Equal(payload.Length, msg1.Data!.Length); + } + + { + var payload = new byte[nats.ServerInfo!.MaxPayload + 1]; + var exception = await Assert.ThrowsAsync(async () => + await nats.PublishAsync("foo.none", payload, cancellationToken: cts.Token)); + Assert.Matches(@"Payload size \d+ exceeds server's maximum payload size \d+", exception.Message); + } + + { + var payload = new byte[123]; + await nats.PublishAsync("foo.signal2", payload, cancellationToken: cts.Token); + var msg1 = await signal2; + Assert.Equal(payload.Length, msg1.Data!.Length); + } + + await nats.PublishAsync("foo.end", cancellationToken: cts.Token); + + await subTask; + + Assert.Equal(3, Volatile.Read(ref count)); + } + + private sealed class NatsSubReconnectTest : NatsSubBase + { + private readonly string _cmdSubject; + private readonly Action _callback; + + internal NatsSubReconnectTest(NatsConnection connection, string subject, string cmdSubject, Action callback) + : base(connection, connection.SubscriptionManager, subject, queueGroup: default, opts: default) + { + _cmdSubject = cmdSubject; + _callback = callback; + } + + internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter, int sid) + { + await base.WriteReconnectCommandsAsync(commandWriter, sid); + + // Any additional commands to send on reconnect + for (var i = 0; i < 100; i++) + { + await commandWriter.PublishAsync($"{_cmdSubject}{i}", default, default, default, NatsRawSerializer.Default, default); + } + } + + protected override ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) + { + _callback(int.Parse(Encoding.UTF8.GetString(payloadBuffer))); + DecrementMaxMsgs(); + return default; + } + + protected override void TryComplete() + { + } + } +} diff --git a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs b/tests/NATS.Client.Core2.Tests/RequestReplyTest.cs similarity index 87% rename from tests/NATS.Client.Core.Tests/RequestReplyTest.cs rename to tests/NATS.Client.Core2.Tests/RequestReplyTest.cs index 0f9eee201..0a012485b 100644 --- a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs +++ b/tests/NATS.Client.Core2.Tests/RequestReplyTest.cs @@ -1,19 +1,28 @@ using System.Buffers; using System.Text; +using NATS.Client.Core2.Tests; +#if !NET6_0_OR_GREATER +using NATS.Client.Core.Internal.NetStandardExtensions; +#endif namespace NATS.Client.Core.Tests; +[Collection("nats-server")] public class RequestReplyTest { private readonly ITestOutputHelper _output; + private readonly NatsServerFixture _server; - public RequestReplyTest(ITestOutputHelper output) => _output = output; + public RequestReplyTest(ITestOutputHelper output, NatsServerFixture server) + { + _output = output; + _server = server; + } [Fact] public async Task Simple_request_reply_test() { - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); const string subject = "foo"; var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); @@ -41,12 +50,11 @@ public async Task Simple_request_reply_test() [Fact] public async Task Request_reply_command_timeout_test() { - await using var server = NatsServer.Start(); - // Request timeout as default timeout { - await using var nats = server.CreateClientConnection(NatsOpts.Default with + await using var nats = new NatsConnection(new NatsOpts { + Url = _server.Url, RequestTimeout = TimeSpan.FromSeconds(1), }); @@ -66,7 +74,7 @@ await Assert.ThrowsAsync(async () => // Cancellation token usage { - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); var sub = await nats.SubscribeCoreAsync("foo"); @@ -89,11 +97,9 @@ await Assert.ThrowsAsync(async () => [Fact] public async Task Request_reply_no_responders_test() { - await using var server = NatsServer.Start(); - - // Enable no responders, and do not set a timeout. We should get a response with a 503 header code. + // Enable no responders, and do not set a timeout. We should get a response with a 503-header code. { - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); await Assert.ThrowsAsync(async () => await nats.RequestAsync(Guid.NewGuid().ToString(), 0)); } } @@ -103,8 +109,7 @@ public async Task Request_reply_many_test() { const int msgs = 10; - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); var sub = await nats.SubscribeCoreAsync("foo"); var reg = sub.Register(async msg => @@ -135,8 +140,7 @@ public async Task Request_reply_many_test() [Fact] public async Task Request_reply_many_test_overall_timeout() { - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); var sub = await nats.SubscribeCoreAsync("foo"); var reg = sub.Register(async msg => @@ -166,8 +170,7 @@ public async Task Request_reply_many_test_overall_timeout() [Fact] public async Task Request_reply_many_test_idle_timeout() { - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); var sub = await nats.SubscribeCoreAsync("foo"); var reg = sub.Register(async msg => @@ -200,8 +203,7 @@ public async Task Request_reply_many_test_idle_timeout() [Fact] public async Task Request_reply_many_test_start_up_timeout() { - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); var sub = await nats.SubscribeCoreAsync("foo"); var reg = sub.Register(async msg => @@ -230,8 +232,7 @@ public async Task Request_reply_many_test_start_up_timeout() [Fact] public async Task Request_reply_many_test_max_count() { - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); var sub = await nats.SubscribeCoreAsync("foo"); var reg = sub.Register(async msg => @@ -263,8 +264,7 @@ public async Task Request_reply_many_test_max_count() [Fact] public async Task Request_reply_many_test_sentinel() { - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); var sub = await nats.SubscribeCoreAsync("foo"); var reg = sub.Register(async msg => @@ -294,13 +294,13 @@ public async Task Request_reply_binary_test() { static string ToStr(ReadOnlyMemory input) { - return Encoding.ASCII.GetString(input.Span); + return Encoding.ASCII.GetString(input.Span.ToArray()); } + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(); await using var sub = await nats.SubscribeCoreAsync("foo", cancellationToken: cts.Token); var reg = sub.Register(async m => { @@ -313,7 +313,7 @@ static string ToStr(ReadOnlyMemory input) } }); - var writer = new ArrayBufferWriter(); + var writer = new NatsBufferWriter(); await foreach (var msg in nats.RequestManyAsync("foo", "1", cancellationToken: cts.Token)) { writer.Write(Encoding.UTF8.GetBytes(msg.Data!)); @@ -329,8 +329,7 @@ static string ToStr(ReadOnlyMemory input) [Fact] public async Task Request_reply_many_multiple_with_timeout_test() { - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); // connect to avoid race to subscribe and publish await nats.ConnectAsync(); @@ -396,8 +395,7 @@ public async Task Request_reply_many_multiple_with_timeout_test() [Fact] public async Task Simple_empty_request_reply_test() { - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); const string subject = "foo"; var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); diff --git a/tests/NATS.Client.Core.Tests/SendBufferTest.cs b/tests/NATS.Client.Core2.Tests/SendBufferTest.cs similarity index 98% rename from tests/NATS.Client.Core.Tests/SendBufferTest.cs rename to tests/NATS.Client.Core2.Tests/SendBufferTest.cs index 9676ed605..7d42ecff5 100644 --- a/tests/NATS.Client.Core.Tests/SendBufferTest.cs +++ b/tests/NATS.Client.Core2.Tests/SendBufferTest.cs @@ -2,6 +2,9 @@ using System.Net.Sockets; using Microsoft.Extensions.Logging; using NATS.Client.TestUtilities; +#if !NET6_0_OR_GREATER +using NATS.Client.Core.Internal.NetStandardExtensions; +#endif namespace NATS.Client.Core.Tests; diff --git a/tests/NATS.Client.Core.Tests/SerializerTest.cs b/tests/NATS.Client.Core2.Tests/SerializerTest.cs similarity index 94% rename from tests/NATS.Client.Core.Tests/SerializerTest.cs rename to tests/NATS.Client.Core2.Tests/SerializerTest.cs index d594213a5..eabc317df 100644 --- a/tests/NATS.Client.Core.Tests/SerializerTest.cs +++ b/tests/NATS.Client.Core2.Tests/SerializerTest.cs @@ -1,17 +1,27 @@ using System.Buffers; using System.Text; +using NATS.Client.Core2.Tests; // ReSharper disable RedundantTypeArgumentsOfMethod // ReSharper disable ReturnTypeCanBeNotNullable namespace NATS.Client.Core.Tests; +[Collection("nats-server")] public class SerializerTest { + private readonly NatsServerFixture _server; + + public SerializerTest(NatsServerFixture server) + { + _server = server; + } + [Fact] public async Task Serializer_exceptions() { - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + + await nats.ConnectAsync(); await Assert.ThrowsAsync(() => nats.PublishAsync( @@ -22,9 +32,6 @@ await Assert.ThrowsAsync(() => // Check that our connection isn't affected by the exceptions await using var sub = await nats.SubscribeCoreAsync("foo"); - var rtt = await nats.PingAsync(); - Assert.True(rtt > TimeSpan.Zero); - await nats.PublishAsync("foo", 1); var result = (await sub.Msgs.ReadAsync()).Data; @@ -35,8 +42,7 @@ await Assert.ThrowsAsync(() => [Fact] public async Task NatsMemoryOwner_empty_payload_should_not_throw() { - await using var server = NatsServer.Start(); - var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; @@ -97,7 +103,7 @@ void Serialize(INatsSerialize serializer, T value, string expected) { var buffer = new NatsBufferWriter(); serializer.Serialize(buffer, value); - var actual = Encoding.UTF8.GetString(buffer.WrittenMemory.Span); + var actual = Encoding.UTF8.GetString(buffer.WrittenMemory.Span.ToArray()); Assert.Equal(expected, actual); } @@ -226,8 +232,7 @@ void Deserialize() [Fact] public async Task Deserialize_with_empty_should_still_go_through_the_deserializer() { - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; @@ -252,9 +257,9 @@ public async Task Deserialize_with_empty_should_still_go_through_the_deserialize [Fact] public async Task Deserialize_chained_with_empty() { - await using var server = NatsServer.Start(); - await using var nats = server.CreateClientConnection(new NatsOpts + await using var nats = new NatsConnection(new NatsOpts { + Url = _server.Url, SerializerRegistry = new TestSerializerRegistry(), }); @@ -309,7 +314,7 @@ public class TestSerializerWithEmpty : INatsSerializer { public T? Deserialize(in ReadOnlySequence buffer) => (T)(object)(buffer.IsEmpty ? new TestData("__EMPTY__") - : new TestData(Encoding.ASCII.GetString(buffer))); + : new TestData(Encoding.ASCII.GetString(buffer.ToArray()))); public void Serialize(IBufferWriter bufferWriter, T value) => throw new Exception("not used"); diff --git a/tests/NATS.Client.Core.Tests/SlowConsumerTest.cs b/tests/NATS.Client.Core2.Tests/SlowConsumerTest.cs similarity index 86% rename from tests/NATS.Client.Core.Tests/SlowConsumerTest.cs rename to tests/NATS.Client.Core2.Tests/SlowConsumerTest.cs index b47b3c0ae..294df0b5f 100644 --- a/tests/NATS.Client.Core.Tests/SlowConsumerTest.cs +++ b/tests/NATS.Client.Core2.Tests/SlowConsumerTest.cs @@ -1,16 +1,27 @@ +using NATS.Client.Core2.Tests; + namespace NATS.Client.Core.Tests; +[Collection("nats-server")] public class SlowConsumerTest { private readonly ITestOutputHelper _output; + private readonly NatsServerFixture _server; - public SlowConsumerTest(ITestOutputHelper output) => _output = output; + public SlowConsumerTest(ITestOutputHelper output, NatsServerFixture server) + { + _output = output; + _server = server; + } [Fact] public async Task Slow_consumer() { - await using var server = NatsServer.Start(); - var nats = server.CreateClientConnection(new NatsOpts { SubPendingChannelCapacity = 3 }); + await using var nats = new NatsConnection(new NatsOpts + { + Url = _server.Url, + SubPendingChannelCapacity = 3, + }); var lost = 0; nats.MessageDropped += (_, e) => diff --git a/tests/NATS.Client.Core2.Tests/test.runsettings b/tests/NATS.Client.Core2.Tests/test.runsettings new file mode 100644 index 000000000..27c41ad33 --- /dev/null +++ b/tests/NATS.Client.Core2.Tests/test.runsettings @@ -0,0 +1,7 @@ + + + + 1 + 300000 + + diff --git a/tests/NATS.Client.CoreUnit.Tests/NATS.Client.CoreUnit.Tests.csproj b/tests/NATS.Client.CoreUnit.Tests/NATS.Client.CoreUnit.Tests.csproj new file mode 100644 index 000000000..501e49cc0 --- /dev/null +++ b/tests/NATS.Client.CoreUnit.Tests/NATS.Client.CoreUnit.Tests.csproj @@ -0,0 +1,50 @@ + + + + net6.0;net8.0 + $(TargetFrameworks);net481 + enable + false + $(NoWarn);CS8002 + enable + $(MSBuildProjectDirectory)\test.runsettings + false + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/NATS.Client.Core.Tests/NKeyTests.cs b/tests/NATS.Client.CoreUnit.Tests/NKeyTests.cs similarity index 73% rename from tests/NATS.Client.Core.Tests/NKeyTests.cs rename to tests/NATS.Client.CoreUnit.Tests/NKeyTests.cs index ef7f75ff5..1393022ea 100644 --- a/tests/NATS.Client.Core.Tests/NKeyTests.cs +++ b/tests/NATS.Client.CoreUnit.Tests/NKeyTests.cs @@ -1,3 +1,4 @@ +using System.Text; using NATS.Client.Core.NaCl; namespace NATS.Client.Core.Tests; @@ -15,12 +16,12 @@ public void NKey_Seed_And_Sign() var nKey = NKeys.FromSeed(seed); // Sanity check public key - var actualPublicKey = Convert.ToHexString(nKey.PublicKey); + var actualPublicKey = ToHexString(nKey.PublicKey); Assert.Equal(expectedPublicKey, actualPublicKey); // Sign and verify var signedData = nKey.Sign(dataToSign.ToArray()); - var actual = Convert.ToHexString(signedData); + var actual = ToHexString(signedData); Assert.Equal(expectedSignedResult, actual); } @@ -32,8 +33,19 @@ public void Sha512_Hash() const string ExpectedHash = "B8B57504AD522AC43AF52CB86BB10D315840C7D1B80BDF3A2524654F7C2C3B07C601ADD320E9F870A6FA8DA3003CFA1BE330133D0ABED7CE49F9251D2BB97421"; var dataArray = dataToHash.ToArray(); - var actual = Convert.ToHexString(Sha512.Hash(dataArray, 0, dataArray.Length)); + var actual = ToHexString(Sha512.Hash(dataArray, 0, dataArray.Length)); Assert.Equal(ExpectedHash, actual); } + + private static string ToHexString(byte[] bytes) + { + var hex = new StringBuilder(bytes.Length * 2); + foreach (var b in bytes) + { + hex.Append($"{b:X2}"); + } + + return hex.ToString(); + } } diff --git a/tests/NATS.Client.Core.Tests/NatsHeaderTest.cs b/tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs similarity index 98% rename from tests/NATS.Client.Core.Tests/NatsHeaderTest.cs rename to tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs index 3ae66433f..240a0700e 100644 --- a/tests/NATS.Client.Core.Tests/NatsHeaderTest.cs +++ b/tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs @@ -31,7 +31,7 @@ public async Task WriterTests() await pipe.Writer.FlushAsync(); var result = await pipe.Reader.ReadAtLeastAsync((int)written); Assert.True(expected.ToSpan().SequenceEqual(result.Buffer.ToSpan())); - _output.WriteLine($"Buffer:\n{result.Buffer.FirstSpan.Dump()}"); + _output.WriteLine($"Buffer:\n{result.Buffer.First.Span.Dump()}"); } [Fact] @@ -49,7 +49,7 @@ public async Task WriterEmptyTests() await pipe.Writer.FlushAsync(); var result = await pipe.Reader.ReadAtLeastAsync((int)written); Assert.True(expected.ToSpan().SequenceEqual(result.Buffer.ToSpan())); - _output.WriteLine($"Buffer:\n{result.Buffer.FirstSpan.Dump()}"); + _output.WriteLine($"Buffer:\n{result.Buffer.First.Span.Dump()}"); } [Fact] diff --git a/tests/NATS.Client.Core.Tests/NuidTests.cs b/tests/NATS.Client.CoreUnit.Tests/NuidTests.cs similarity index 96% rename from tests/NATS.Client.Core.Tests/NuidTests.cs rename to tests/NATS.Client.CoreUnit.Tests/NuidTests.cs index 4a7a84a03..eb4effb70 100644 --- a/tests/NATS.Client.Core.Tests/NuidTests.cs +++ b/tests/NATS.Client.CoreUnit.Tests/NuidTests.cs @@ -46,7 +46,7 @@ public void GetNextNuid_ReturnsNuidOfLength22_Char() // Assert ReadOnlySpan lower = buffer.Slice(0, 22); - string resultAsString = new(lower); + string resultAsString = new(lower.ToArray()); ReadOnlySpan upper = buffer.Slice(22); Assert.True(result); @@ -112,7 +112,7 @@ public void GetNextNuid_ContainsOnlyValidCharacters_Char() // Assert Assert.True(result); - string resultAsString = new(nuid); + string resultAsString = new(nuid.ToArray()); Assert.Matches("[A-z0-9]{22}", resultAsString); } @@ -149,7 +149,7 @@ public void GetPrefix_PrefixAsExpected() DeterministicRng rng = new(new Queue(new[] { rngBytes, rngBytes })); var mi = typeof(Nuid).GetMethod("GetPrefix", BindingFlags.Static | BindingFlags.NonPublic); - var mGetPrefix = mi!.CreateDelegate>(); + var mGetPrefix = (Func)mi!.CreateDelegate(typeof(Func)); // Act var prefix = mGetPrefix(rng); @@ -207,7 +207,7 @@ public void DifferentThreads_DifferentPrefixes() foreach (var (nuid, threadId) in nuids.ToList()) { - var prefix = new string(nuid.AsSpan(0, prefixLength)); + var prefix = new string(nuid.AsSpan(0, prefixLength).ToArray()); Assert.True(uniquePrefixes.Add(prefix), $"Unique prefix {prefix}"); Assert.True(uniqueThreadIds.Add(threadId), $"Unique thread id {threadId}"); } @@ -265,7 +265,7 @@ public void AllNuidsAreUnique_SmallSequentials() return; } - var nuid = new string(buffer); + var nuid = new string(buffer.ToArray()); if (!nuids.Add(nuid)) { @@ -307,7 +307,7 @@ public void AllNuidsAreUnique_ZeroSequential() return; } - var nuid = new string(buffer); + var nuid = new string(buffer.ToArray()); if (!nuids.Add(nuid)) { diff --git a/tests/NATS.Client.Core.Tests/ObjectPoolTest.cs b/tests/NATS.Client.CoreUnit.Tests/ObjectPoolTest.cs similarity index 100% rename from tests/NATS.Client.Core.Tests/ObjectPoolTest.cs rename to tests/NATS.Client.CoreUnit.Tests/ObjectPoolTest.cs diff --git a/tests/NATS.Client.Core.Tests/SequenceBuilderTest.cs b/tests/NATS.Client.CoreUnit.Tests/SequenceBuilderTest.cs similarity index 100% rename from tests/NATS.Client.Core.Tests/SequenceBuilderTest.cs rename to tests/NATS.Client.CoreUnit.Tests/SequenceBuilderTest.cs diff --git a/tests/NATS.Client.CoreUnit.Tests/test.runsettings b/tests/NATS.Client.CoreUnit.Tests/test.runsettings new file mode 100644 index 000000000..27c41ad33 --- /dev/null +++ b/tests/NATS.Client.CoreUnit.Tests/test.runsettings @@ -0,0 +1,7 @@ + + + + 1 + 300000 + + diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs index bcb464256..c86df1662 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs @@ -1,15 +1,22 @@ using System.Text.RegularExpressions; using Microsoft.Extensions.Logging; using NATS.Client.Core.Tests; +using NATS.Client.Core2.Tests; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream.Tests; +[Collection("nats-server")] public class ConsumerConsumeTest { private readonly ITestOutputHelper _output; + private readonly NatsServerFixture _server; - public ConsumerConsumeTest(ITestOutputHelper output) => _output = output; + public ConsumerConsumeTest(ITestOutputHelper output, NatsServerFixture server) + { + _output = output; + _server = server; + } [Theory] [InlineData("Invalid.DotName")] @@ -261,14 +268,14 @@ await Retry.Until( [Fact] public async Task Consume_dispose_test() { - await using var server = NatsServer.StartJS(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + var prefix = _server.GetNextId(); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); var js = new NatsJSContext(nats); - var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); - var consumer = (NatsJSConsumer)await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + var stream = await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token); + var consumer = (NatsJSConsumer)await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); var consumerOpts = new NatsJSConsumeOpts { @@ -279,7 +286,7 @@ public async Task Consume_dispose_test() for (var i = 0; i < 10; i++) { - var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); + var ack = await js.PublishAsync($"{prefix}s1.foo", new TestData { Test = i }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); ack.EnsureSuccess(); } @@ -310,7 +317,7 @@ await Retry.Until( "ack pending 9", async () => { - var c = await js.GetConsumerAsync("s1", "c1", cts.Token); + var c = await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token); return c.Info.NumAckPending == 9; }, retryDelay: TimeSpan.FromSeconds(1), @@ -326,7 +333,7 @@ await Retry.Until( "ack pending 0", async () => { - var c = await js.GetConsumerAsync("s1", "c1", cts.Token); + var c = await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token); return c.Info.NumAckPending == 0; }, retryDelay: TimeSpan.FromSeconds(1), diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs index fb14c5904..ff085bc48 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs @@ -1,31 +1,37 @@ using NATS.Client.Core.Tests; -using NATS.Client.JetStream.Models; +using NATS.Client.Core2.Tests; namespace NATS.Client.JetStream.Tests; +[Collection("nats-server")] public class ConsumerFetchTest { private readonly ITestOutputHelper _output; + private readonly NatsServerFixture _server; - public ConsumerFetchTest(ITestOutputHelper output) => _output = output; + public ConsumerFetchTest(ITestOutputHelper output, NatsServerFixture server) + { + _output = output; + _server = server; + } [Fact] public async Task Fetch_test() { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await using var server = NatsServer.StartJS(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + var prefix = _server.GetNextId(); var js = new NatsJSContext(nats); - await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); - await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token); + await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); for (var i = 0; i < 10; i++) { - var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); + var ack = await js.PublishAsync($"{prefix}s1.foo", new TestData { Test = i }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); ack.EnsureSuccess(); } - var consumer = (NatsJSConsumer)await js.GetConsumerAsync("s1", "c1", cts.Token); + var consumer = (NatsJSConsumer)await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token); var count = 0; await using var fc = await consumer.FetchInternalAsync(serializer: TestDataJsonSerializer.Default, opts: new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token); @@ -43,19 +49,19 @@ public async Task Fetch_test() public async Task FetchNoWait_test() { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await using var server = NatsServer.StartJS(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + var prefix = _server.GetNextId(); var js = new NatsJSContext(nats); - await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); - await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token); + await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); for (var i = 0; i < 10; i++) { - var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); + var ack = await js.PublishAsync($"{prefix}s1.foo", new TestData { Test = i }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); ack.EnsureSuccess(); } - var consumer = (NatsJSConsumer)await js.GetConsumerAsync("s1", "c1", cts.Token); + var consumer = (NatsJSConsumer)await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token); var count = 0; await foreach (var msg in consumer.FetchNoWaitAsync(serializer: TestDataJsonSerializer.Default, opts: new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token)) { @@ -70,14 +76,14 @@ public async Task FetchNoWait_test() [Fact] public async Task Fetch_dispose_test() { - await using var server = NatsServer.StartJS(); - await using var nats = server.CreateClientConnection(); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + var prefix = _server.GetNextId(); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); var js = new NatsJSContext(nats); - var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); - var consumer = (NatsJSConsumer)await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + var stream = await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token); + var consumer = (NatsJSConsumer)await js.CreateOrUpdateConsumerAsync($"{prefix}s1", $"{prefix}c1", cancellationToken: cts.Token); var fetchOpts = new NatsJSFetchOpts { @@ -88,7 +94,7 @@ public async Task Fetch_dispose_test() for (var i = 0; i < 100; i++) { - var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); + var ack = await js.PublishAsync($"{prefix}s1.foo", new TestData { Test = i }, serializer: TestDataJsonSerializer.Default, cancellationToken: cts.Token); ack.EnsureSuccess(); } @@ -117,7 +123,7 @@ await Retry.Until( "ack pending 9", async () => { - var c = await js.GetConsumerAsync("s1", "c1", cts.Token); + var c = await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token); return c.Info.NumAckPending == 9; }, retryDelay: TimeSpan.FromSeconds(1), @@ -133,7 +139,7 @@ await Retry.Until( "ack pending 0", async () => { - var c = await js.GetConsumerAsync("s1", "c1", cts.Token); + var c = await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token); return c.Info.NumAckPending == 0; }, retryDelay: TimeSpan.FromSeconds(1), diff --git a/tests/NATS.Client.JetStream.Tests/ErrorHandlerTest.cs b/tests/NATS.Client.JetStream.Tests/ErrorHandlerTest.cs index da264a0db..566c56b17 100644 --- a/tests/NATS.Client.JetStream.Tests/ErrorHandlerTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ErrorHandlerTest.cs @@ -1,28 +1,36 @@ using NATS.Client.Core.Tests; +using NATS.Client.Core2.Tests; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream.Tests; +[Collection("nats-server")] public class ErrorHandlerTest { private readonly ITestOutputHelper _output; + private readonly NatsServerFixture _server; - public ErrorHandlerTest(ITestOutputHelper output) => _output = output; + public ErrorHandlerTest(ITestOutputHelper output, NatsServerFixture server) + { + _output = output; + _server = server; + } [Fact] public async Task Consumer_fetch_error_handling() { - await using var server = NatsServer.StartJS(); - var (nats1, proxy) = server.CreateProxiedClientConnection(); - await using var nats = nats1; + var proxy = new NatsProxy(_server.Port); + await using var nats = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{proxy.Port}", ConnectTimeout = TimeSpan.FromSeconds(10) }); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - var stream = await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token); - var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("c1"), cts.Token); + var stream = await js.CreateStreamAsync(new StreamConfig($"{prefix}s1", new[] { $"{prefix}s1.*" }), cts.Token); + var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig($"{prefix}c1"), cts.Token); - (await js.PublishAsync("s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess(); + (await js.PublishAsync($"{prefix}s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess(); var timeoutNotifications = 0; var opts = new NatsJSNextOpts @@ -44,7 +52,7 @@ public async Task Consumer_fetch_error_handling() var next = await consumer.NextAsync(opts: opts, cancellationToken: cts.Token); if (next is { } msg) { - msg.Subject.Should().Be("s1.1"); + msg.Subject.Should().Be($"{prefix}s1.1"); msg.Data.Should().Be(1); await msg.AckAsync(cancellationToken: cts.Token); } @@ -57,8 +65,8 @@ public async Task Consumer_fetch_error_handling() proxy.ServerInterceptors.Add(m => m?.Contains("Idle Heartbeat") ?? false ? null : m); // Create an empty stream to potentially reduce the chance of having a message. - var stream2 = await js.CreateStreamAsync(new StreamConfig("s2", new[] { "s2.*" }), cts.Token); - var consumer2 = await stream2.CreateOrUpdateConsumerAsync(new ConsumerConfig("c2"), cts.Token); + var stream2 = await js.CreateStreamAsync(new StreamConfig($"{prefix}s2", new[] { $"{prefix}s2.*" }), cts.Token); + var consumer2 = await stream2.CreateOrUpdateConsumerAsync(new ConsumerConfig($"{prefix}c2"), cts.Token); // reduce heartbeat time out to increase the chance of receiving notification. var opts2 = new NatsJSNextOpts @@ -84,17 +92,18 @@ public async Task Consumer_fetch_error_handling() [Fact] public async Task Consumer_consume_handling() { - await using var server = NatsServer.StartJS(); - var (nats1, proxy) = server.CreateProxiedClientConnection(); - await using var nats = nats1; + var proxy = new NatsProxy(_server.Port); + await using var nats = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{proxy.Port}", ConnectTimeout = TimeSpan.FromSeconds(10) }); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - var stream = await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token); - var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("c1"), cts.Token); + var stream = await js.CreateStreamAsync(new StreamConfig($"{prefix}s1", new[] { $"{prefix}s1.*" }), cts.Token); + var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig($"{prefix}c1"), cts.Token); - (await js.PublishAsync("s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess(); + (await js.PublishAsync($"{prefix}s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess(); var timeoutNotifications = 0; var opts = new NatsJSConsumeOpts @@ -116,7 +125,7 @@ public async Task Consumer_consume_handling() await foreach (var msg in consumer.ConsumeAsync(opts: opts, cancellationToken: cts.Token)) { msg.Data.Should().Be(1); - msg.Subject.Should().Be("s1.1"); + msg.Subject.Should().Be($"{prefix}s1.1"); await msg.AckAsync(cancellationToken: cts.Token); break; } @@ -149,17 +158,18 @@ public async Task Consumer_consume_handling() [Fact] public async Task Ordered_consumer_fetch_error_handling() { - await using var server = NatsServer.StartJS(); - var (nats1, proxy) = server.CreateProxiedClientConnection(); - await using var nats = nats1; + var proxy = new NatsProxy(_server.Port); + await using var nats = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{proxy.Port}", ConnectTimeout = TimeSpan.FromSeconds(10) }); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - var stream = await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token); + var stream = await js.CreateStreamAsync(new StreamConfig($"{prefix}s1", new[] { $"{prefix}s1.*" }), cts.Token); var consumer = (NatsJSOrderedConsumer)await stream.CreateOrderedConsumerAsync(cancellationToken: cts.Token); - (await js.PublishAsync("s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess(); + (await js.PublishAsync($"{prefix}s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess(); var timeoutNotifications = 0; var opts = new NatsJSFetchOpts @@ -181,7 +191,7 @@ public async Task Ordered_consumer_fetch_error_handling() var count1 = 0; await foreach (var msg in consumer.FetchAsync(opts: opts, cancellationToken: cts.Token)) { - msg.Subject.Should().Be("s1.1"); + msg.Subject.Should().Be($"{prefix}s1.1"); msg.Data.Should().Be(1); await msg.AckAsync(cancellationToken: cts.Token); count1++; @@ -193,7 +203,7 @@ public async Task Ordered_consumer_fetch_error_handling() proxy.ServerInterceptors.Add(m => m?.Contains("Idle Heartbeat") ?? false ? null : m); // Create an empty stream since ordered consumer will pick up messages from beginning everytime. - var stream2 = await js.CreateStreamAsync(new StreamConfig("s2", new[] { "s2.*" }), cts.Token); + var stream2 = await js.CreateStreamAsync(new StreamConfig($"{prefix}s2", new[] { $"{prefix}s2.*" }), cts.Token); var consumer2 = (NatsJSOrderedConsumer)await stream2.CreateOrderedConsumerAsync(cancellationToken: cts.Token); // reduce heartbeat time out to increase the chance of receiving notification. @@ -226,17 +236,18 @@ public async Task Ordered_consumer_fetch_error_handling() [Fact] public async Task Ordered_consumer_consume_handling() { - await using var server = NatsServer.StartJS(); - var (nats1, proxy) = server.CreateProxiedClientConnection(); - await using var nats = nats1; + var proxy = new NatsProxy(_server.Port); + await using var nats = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{proxy.Port}", ConnectTimeout = TimeSpan.FromSeconds(10) }); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - var stream = await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token); + var stream = await js.CreateStreamAsync(new StreamConfig($"{prefix}s1", new[] { $"{prefix}s1.*" }), cts.Token); var consumer = (NatsJSOrderedConsumer)await stream.CreateOrderedConsumerAsync(cancellationToken: cts.Token); - (await js.PublishAsync("s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess(); + (await js.PublishAsync($"{prefix}s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess(); var timeoutNotifications = 0; var opts = new NatsJSConsumeOpts @@ -258,7 +269,7 @@ public async Task Ordered_consumer_consume_handling() await foreach (var msg in consumer.ConsumeAsync(opts: opts, cancellationToken: cts.Token)) { msg.Data.Should().Be(1); - msg.Subject.Should().Be("s1.1"); + msg.Subject.Should().Be($"{prefix}s1.1"); break; } @@ -304,14 +315,15 @@ public async Task Ordered_consumer_consume_handling() [Fact] public async Task Exception_propagation_handling() { - await using var server = NatsServer.StartJS(); - var (nats1, proxy) = server.CreateProxiedClientConnection(); - await using var nats = nats1; + var proxy = new NatsProxy(_server.Port); + await using var nats = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{proxy.Port}", ConnectTimeout = TimeSpan.FromSeconds(10) }); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - var stream = await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token); + var stream = await js.CreateStreamAsync(new StreamConfig($"{prefix}s1", new[] { $"{prefix}s1.*" }), cts.Token); // reduce heartbeat time out to increase the chance of receiving notification. var opts = new NatsJSConsumeOpts @@ -327,7 +339,7 @@ public async Task Exception_propagation_handling() try { - var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("c1"), cts.Token); + var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig($"{prefix}c1"), cts.Token); await foreach (var unused in consumer.ConsumeAsync(opts: opts, cancellationToken: cts.Token)) { } diff --git a/tests/NATS.Client.JetStream.Tests/NATS.Client.JetStream.Tests.csproj b/tests/NATS.Client.JetStream.Tests/NATS.Client.JetStream.Tests.csproj index be86630aa..19a65ee77 100644 --- a/tests/NATS.Client.JetStream.Tests/NATS.Client.JetStream.Tests.csproj +++ b/tests/NATS.Client.JetStream.Tests/NATS.Client.JetStream.Tests.csproj @@ -33,4 +33,8 @@ + + + + diff --git a/tests/NATS.Client.Platform.Windows.Tests/BaseNatsServerFixture.cs b/tests/NATS.Client.Platform.Windows.Tests/BaseNatsServerFixture.cs new file mode 100644 index 000000000..f81dccfa8 --- /dev/null +++ b/tests/NATS.Client.Platform.Windows.Tests/BaseNatsServerFixture.cs @@ -0,0 +1,15 @@ +namespace NATS.Client.Platform.Windows.Tests; + +public class BaseNatsServerFixture : IDisposable +{ + private readonly NatsServerProcess _server; + private int _next; + + protected BaseNatsServerFixture(string? config = default) => _server = NatsServerProcess.Start(config: config); + + public string Url => _server.Url; + + public string GetNextId() => $"test{Interlocked.Increment(ref _next)}"; + + public void Dispose() => _server.Dispose(); +} diff --git a/tests/NATS.Client.Platform.Windows.Tests/BasicTests.cs b/tests/NATS.Client.Platform.Windows.Tests/BasicTests.cs index 6d3fdd52d..56e8a8103 100644 --- a/tests/NATS.Client.Platform.Windows.Tests/BasicTests.cs +++ b/tests/NATS.Client.Platform.Windows.Tests/BasicTests.cs @@ -9,24 +9,29 @@ namespace NATS.Client.Platform.Windows.Tests; -public class BasicTests +public class BasicTests : IClassFixture { private readonly ITestOutputHelper _output; + private readonly BasicTestsNatsServerFixture _server; - public BasicTests(ITestOutputHelper output) => _output = output; + public BasicTests(ITestOutputHelper output, BasicTestsNatsServerFixture server) + { + _output = output; + _server = server; + } [Fact] public async Task Core() { - await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, SerializerRegistry = NatsJsonSerializerRegistry.Default }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, SerializerRegistry = NatsJsonSerializerRegistry.Default }); + var prefix = _server.GetNextId(); await nats.PingAsync(); - await using var sub = await nats.SubscribeCoreAsync("foo"); + await using var sub = await nats.SubscribeCoreAsync($"{prefix}.foo"); for (var i = 0; i < 16; i++) { - await nats.PublishAsync("foo", new TestData { Id = i }); + await nats.PublishAsync($"{prefix}.foo", new TestData { Id = i }); Assert.Equal(i, (await sub.Msgs.ReadAsync()).Data!.Id); } } @@ -34,19 +39,19 @@ public async Task Core() [Fact] public async Task JetStream() { - await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + var prefix = _server.GetNextId(); var js = new NatsJSContext(nats); - var stream = await js.CreateStreamAsync(new StreamConfig("s1", ["s1.>"])); + var stream = await js.CreateStreamAsync(new StreamConfig($"{prefix}s1", [$"{prefix}s1.>"])); for (var i = 0; i < 16; i++) { - var ack = await js.PublishAsync("s1.foo", $"bar{i}"); + var ack = await js.PublishAsync($"{prefix}s1.foo", $"bar{i}"); ack.EnsureSuccess(); } - var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("c1")); + var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig($"{prefix}c1")); var count = 0; await foreach (var msg in consumer.ConsumeAsync()) @@ -59,7 +64,7 @@ public async Task JetStream() } } - var orderedConsumer = await js.CreateOrderedConsumerAsync("s1"); + var orderedConsumer = await js.CreateOrderedConsumerAsync($"{prefix}s1"); count = 0; await foreach (var msg in orderedConsumer.ConsumeAsync()) { @@ -74,12 +79,13 @@ public async Task JetStream() [Fact] public async Task KV() { - await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); - var store = await kv.CreateStoreAsync("b1"); + var store = await kv.CreateStoreAsync($"{prefix}b1"); for (var i = 0; i < 16; i++) { @@ -96,12 +102,13 @@ public async Task KV() [Fact] public async Task ObjectStore() { - await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + var prefix = _server.GetNextId(); + var js = new NatsJSContext(nats); var obj = new NatsObjContext(js); - var store = await obj.CreateObjectStoreAsync("b1"); + var store = await obj.CreateObjectStoreAsync($"{prefix}b1"); for (var i = 0; i < 16; i++) { @@ -118,22 +125,23 @@ public async Task ObjectStore() [Fact] public async Task Services() { - await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + var prefix = _server.GetNextId(); + var svc = new NatsSvcContext(nats); - var s1 = await svc.AddServiceAsync("s1", "1.0.0"); + var s1 = await svc.AddServiceAsync($"{prefix}s1", "1.0.0"); await s1.AddEndpointAsync( async msg => { await msg.ReplyAsync(msg.Data * 2); }, - "multiply"); + $"{prefix}.multiply"); for (var i = 0; i < 16; i++) { - var reply = await nats.RequestAsync("multiply", i); + var reply = await nats.RequestAsync($"{prefix}.multiply", i); Assert.Equal(i * 2, reply.Data); } } @@ -143,3 +151,5 @@ private class TestData public int Id { get; set; } } } + +public class BasicTestsNatsServerFixture : BaseNatsServerFixture; diff --git a/tests/NATS.Client.Platform.Windows.Tests/NATS.Client.Platform.Windows.Tests.csproj b/tests/NATS.Client.Platform.Windows.Tests/NATS.Client.Platform.Windows.Tests.csproj index 3d4f73f62..faaf762a1 100644 --- a/tests/NATS.Client.Platform.Windows.Tests/NATS.Client.Platform.Windows.Tests.csproj +++ b/tests/NATS.Client.Platform.Windows.Tests/NATS.Client.Platform.Windows.Tests.csproj @@ -1,7 +1,7 @@ - net481 + net6.0;net8.0;net481 enable enable $(NoWarn);CS8002;VSTHRD200;VSTHRD111 diff --git a/tests/NATS.Client.Platform.Windows.Tests/NatsServerProcess.cs b/tests/NATS.Client.Platform.Windows.Tests/NatsServerProcess.cs index 1ca4d2910..17536d5a2 100644 --- a/tests/NATS.Client.Platform.Windows.Tests/NatsServerProcess.cs +++ b/tests/NATS.Client.Platform.Windows.Tests/NatsServerProcess.cs @@ -1,5 +1,6 @@ using System.ComponentModel; using System.Diagnostics; +using System.Net.Sockets; using System.Runtime.InteropServices; using System.Text.RegularExpressions; using Exception = System.Exception; @@ -16,23 +17,28 @@ namespace NATS.Client.Platform.Windows.Tests; -public class NatsServerProcess : IAsyncDisposable +public class NatsServerProcess : IAsyncDisposable, IDisposable { private readonly Action _logger; private readonly Process _process; private readonly string _scratch; + private readonly bool _withJs; - private NatsServerProcess(Action logger, Process process, string url, string scratch) + private NatsServerProcess(Action logger, Process process, string url, string scratch, bool withJs) { Url = url; _logger = logger; _process = process; _scratch = scratch; + _withJs = withJs; } public string Url { get; } - public static async ValueTask StartAsync(Action? logger = null, string? config = null) + public static ValueTask StartAsync(Action? logger = null, string? config = null, bool withJs = true) + => new(Start(logger, config, withJs)); + + public static NatsServerProcess Start(Action? logger = null, string? config = null, bool withJs = true) { var isLoggingEnabled = logger != null; var log = logger ?? (_ => { }); @@ -42,36 +48,56 @@ public static async ValueTask StartAsync(Action? logg var portsFileDir = Path.Combine(scratch, "port"); Directory.CreateDirectory(portsFileDir); - var sd = Path.Combine(scratch, "data"); - Directory.CreateDirectory(sd); + var isWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows); - var natsServerExe = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? "nats-server.exe" : "nats-server"; + var natsServerExe = isWindows ? "nats-server.exe" : "nats-server"; var configFlag = config == null ? string.Empty : $"-c \"{config}\""; var portsFileDirEsc = portsFileDir.Replace(@"\", @"\\"); - var sdEsc = sd.Replace(@"\", @"\\"); + + string? sdEsc = null; + if (withJs) + { + var sd = Path.Combine(scratch, "data"); + Directory.CreateDirectory(sd); + sdEsc = sd.Replace(@"\", @"\\"); + } + var info = new ProcessStartInfo { FileName = natsServerExe, - Arguments = $"{configFlag} -a 127.0.0.1 -p -1 -js -sd \"{sdEsc}\" --ports_file_dir \"{portsFileDirEsc}\"", + Arguments = withJs + ? $"{configFlag} -a 127.0.0.1 -p -1 -js -sd \"{sdEsc}\" --ports_file_dir \"{portsFileDirEsc}\"" + : $"{configFlag} -a 127.0.0.1 -p -1 --ports_file_dir \"{portsFileDirEsc}\"", UseShellExecute = false, CreateNoWindow = false, - RedirectStandardError = isLoggingEnabled, - RedirectStandardOutput = isLoggingEnabled, + + // RedirectStandardError = isLoggingEnabled, + // RedirectStandardOutput = isLoggingEnabled, + RedirectStandardError = true, + RedirectStandardOutput = true, }; var process = new Process { StartInfo = info, }; if (isLoggingEnabled) { +#pragma warning disable CS8604 // Possible null reference argument. DataReceivedEventHandler outputHandler = (_, e) => log(e.Data); +#pragma warning restore CS8604 // Possible null reference argument. process.OutputDataReceived += outputHandler; process.ErrorDataReceived += outputHandler; } + else + { + process.OutputDataReceived += (_, e) => { }; + process.ErrorDataReceived += (_, e) => { }; + } process.Start(); - ChildProcessTracker.AddProcess(process); + if (isWindows) + ChildProcessTracker.AddProcess(process); - if (isLoggingEnabled) + // if (isLoggingEnabled) { process.BeginOutputReadLine(); process.BeginErrorReadLine(); @@ -92,7 +118,7 @@ public static async ValueTask StartAsync(Action? logg catch (Exception e) { exception = e; - await Task.Delay(100 + (500 * i)); + Thread.Sleep(100 + (500 * i)); } } @@ -105,18 +131,52 @@ public static async ValueTask StartAsync(Action? logg log($"ports={ports}"); log($"url={url}"); - return new NatsServerProcess(log, process, url, scratch); + for (var i = 0; i < 10; i++) + { + try + { + using var tcpClient = new TcpClient(); + tcpClient.Connect("127.0.0.1", new Uri(url).Port); + using var networkStream = tcpClient.GetStream(); + using var streamReader = new StreamReader(networkStream); + var readLine = streamReader.ReadLine(); + if (readLine == null || !readLine.StartsWith("INFO", StringComparison.OrdinalIgnoreCase)) + { + continue; + } + + return new NatsServerProcess(log, process, url, scratch, withJs); + } + catch + { + Thread.Sleep(1_000 + (i * 500)); + } + } + + throw new Exception("Failed to setup the server."); } - public async ValueTask DisposeAsync() + public ValueTask DisposeAsync() { - try - { - _process.Kill(); - } - catch + Dispose(); + return default; + } + + public void Dispose() + { + for (var i = 0; i < 10; i++) { - // best effort + try + { + _process.Kill(); + } + catch + { + // best effort + } + + if (_process.WaitForExit(1_000)) + break; } for (var i = 0; i < 3; i++) @@ -128,7 +188,7 @@ public async ValueTask DisposeAsync() } catch { - await Task.Delay(100); + Thread.Sleep(100); } } diff --git a/tests/NATS.Client.Platform.Windows.Tests/TlsTests.cs b/tests/NATS.Client.Platform.Windows.Tests/TlsTests.cs index 8683c3216..593eda5ef 100644 --- a/tests/NATS.Client.Platform.Windows.Tests/TlsTests.cs +++ b/tests/NATS.Client.Platform.Windows.Tests/TlsTests.cs @@ -4,17 +4,21 @@ namespace NATS.Client.Platform.Windows.Tests; -public class TlsTests +public class TlsTests : IClassFixture { private readonly ITestOutputHelper _output; + private readonly TlsTestsNatsServerFixture _server; - public TlsTests(ITestOutputHelper output) => _output = output; + public TlsTests(ITestOutputHelper output, TlsTestsNatsServerFixture server) + { + _output = output; + _server = server; + } [Fact] public async Task Tls_fails_without_certificates() { - await using var server = await NatsServerProcess.StartAsync(config: "resources/configs/tls.conf"); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); var exception = await Assert.ThrowsAsync(async () => await nats.ConnectAsync()); Assert.Contains("TLS authentication failed", exception.InnerException?.Message); @@ -26,6 +30,7 @@ public async Task Tls_with_certificates() { const string caCertFile = "resources/certs/ca-cert.pem"; const string clientCertBundleFile = "resources/certs/client-cert-bundle.pfx"; + var prefix = _server.GetNextId(); var tlsOpts = new NatsTlsOpts { @@ -33,16 +38,17 @@ public async Task Tls_with_certificates() CertBundleFile = clientCertBundleFile, }; - await using var server = await NatsServerProcess.StartAsync(config: "resources/configs/tls.conf"); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, TlsOpts = tlsOpts }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, TlsOpts = tlsOpts }); await nats.PingAsync(); - await using var sub = await nats.SubscribeCoreAsync("foo"); + await using var sub = await nats.SubscribeCoreAsync($"{prefix}.foo"); for (var i = 0; i < 64; i++) { - await nats.PublishAsync("foo", i); + await nats.PublishAsync($"{prefix}.foo", i); Assert.Equal(i, (await sub.Msgs.ReadAsync()).Data); } } } + +public class TlsTestsNatsServerFixture() : BaseNatsServerFixture("resources/configs/tls.conf"); diff --git a/tests/NATS.Client.TestUtilities/InMemoryTestLoggerFactory.cs b/tests/NATS.Client.TestUtilities/InMemoryTestLoggerFactory.cs index 46f39b375..5be0989ea 100644 --- a/tests/NATS.Client.TestUtilities/InMemoryTestLoggerFactory.cs +++ b/tests/NATS.Client.TestUtilities/InMemoryTestLoggerFactory.cs @@ -48,7 +48,9 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except public bool IsEnabled(LogLevel logLevel) => logLevel >= level; #if NET8_0_OR_GREATER - public IDisposable? BeginScope(TState state) +#pragma warning disable CS8633 // Nullability in constraints for type parameter doesn't match the constraints for type parameter in implicitly implemented interface method'. + public IDisposable BeginScope(TState state) +#pragma warning restore CS8633 // Nullability in constraints for type parameter doesn't match the constraints for type parameter in implicitly implemented interface method'. where TState : notnull => new NullDisposable(); #else public IDisposable BeginScope(TState state) => new NullDisposable(); diff --git a/tests/NATS.Client.TestUtilities/MockServer.cs b/tests/NATS.Client.TestUtilities/MockServer.cs index 634154295..d7a5252fe 100644 --- a/tests/NATS.Client.TestUtilities/MockServer.cs +++ b/tests/NATS.Client.TestUtilities/MockServer.cs @@ -3,6 +3,10 @@ using System.Text; using System.Text.RegularExpressions; +#if !NET6_0_OR_GREATER +using NATS.Client.Core.Internal.NetStandardExtensions; +#endif + namespace NATS.Client.TestUtilities; public class MockServer : IAsyncDisposable @@ -32,7 +36,11 @@ public MockServer( var n = 0; while (!cancellationToken.IsCancellationRequested) { +#if NET6_0_OR_GREATER var tcpClient = await _server.AcceptTcpClientAsync(cancellationToken); +#else + var tcpClient = await _server.AcceptTcpClientAsync(); +#endif var client = new Client(this, tcpClient); n++; Log($"[S] [{n}] New client connected"); @@ -150,6 +158,9 @@ public async ValueTask DisposeAsync() catch (IOException) { } + catch (ObjectDisposedException) + { + } } public void Log(string m) => _logger(m); diff --git a/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj b/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj index b9421f412..437d1b3c6 100644 --- a/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj +++ b/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj @@ -14,6 +14,8 @@ + + @@ -29,4 +31,8 @@ + + + + diff --git a/tests/NATS.Client.TestUtilities/NatsProxy.cs b/tests/NATS.Client.TestUtilities/NatsProxy.cs index ad2c887d9..62a21c85b 100644 --- a/tests/NATS.Client.TestUtilities/NatsProxy.cs +++ b/tests/NATS.Client.TestUtilities/NatsProxy.cs @@ -1,3 +1,4 @@ +using System.Buffers; using System.Diagnostics; using System.Net; using System.Net.Sockets; @@ -8,7 +9,7 @@ namespace NATS.Client.Core.Tests; public class NatsProxy : IDisposable { - private readonly ITestOutputHelper _outputHelper; + private readonly ITestOutputHelper? _outputHelper; private readonly bool _trace; private readonly TcpListener _tcpListener; private readonly List _clients = new(); @@ -16,7 +17,7 @@ public class NatsProxy : IDisposable private readonly Stopwatch _watch = new(); private int _syncCount; - public NatsProxy(int port, ITestOutputHelper outputHelper, bool trace) + public NatsProxy(int port, ITestOutputHelper? outputHelper = null, bool trace = false) { _outputHelper = outputHelper; _trace = trace; @@ -262,7 +263,23 @@ void Write(string? rawFrame) var span = buffer.AsSpan(); while (true) { +#if !NET6_0_OR_GREATER + var bytes = ArrayPool.Shared.Rent(span.Length); + var read = sr.Read(bytes, 0, span.Length); + + if (read > 0) + { + for (var i = 0; i < read; i++) + { + span[i] = bytes[i]; + } + } + + ArrayPool.Shared.Return(bytes); +#else var read = sr.Read(span); +#endif + if (read == 0) break; if (read == -1) @@ -301,7 +318,7 @@ private void AddFrame(Frame frame) _frames.Add(frame); } - private void Log(string text) => _outputHelper.WriteLine($"[PROXY] {DateTime.Now:HH:mm:ss.fff} {text}"); + private void Log(string text) => _outputHelper?.WriteLine($"[PROXY] {DateTime.Now:HH:mm:ss.fff} {text}"); public record Frame(TimeSpan Timestamp, int Client, string Origin, string Message); } diff --git a/tests/NATS.Client.TestUtilities/Utils.cs b/tests/NATS.Client.TestUtilities/Utils.cs index 47024a41e..117efb7a0 100644 --- a/tests/NATS.Client.TestUtilities/Utils.cs +++ b/tests/NATS.Client.TestUtilities/Utils.cs @@ -2,6 +2,9 @@ using System.Net; using System.Net.Sockets; using System.Text; +#if !NET6_0_OR_GREATER +using NATS.Client.Core.Internal.NetStandardExtensions; +#endif namespace NATS.Client.Core.Tests; @@ -136,7 +139,7 @@ await Retry.Until("service is found", async () => { var count = 0; - // nats cli sends an empty JSON object '{}' as the request payload so we do the same here + // NATS cli sends an empty JSON object '{}' as the request payload, so we do the same here await foreach (var msg in nats.RequestManyAsync(subject, "{}", replySerializer: serializer, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false)) { if (++count == limit) diff --git a/tests/NATS.Client.TestUtilities/WaitSignal.cs b/tests/NATS.Client.TestUtilities/WaitSignal.cs index 8a81ed97c..abeae9b47 100644 --- a/tests/NATS.Client.TestUtilities/WaitSignal.cs +++ b/tests/NATS.Client.TestUtilities/WaitSignal.cs @@ -1,4 +1,7 @@ using System.Runtime.CompilerServices; +#if !NET6_0_OR_GREATER +using NATS.Client.Core.Internal.NetStandardExtensions; +#endif namespace NATS.Client.Core.Tests; @@ -134,6 +137,20 @@ public void Pulse(T result, Exception? exception = null) public TaskAwaiter GetAwaiter() { - return _tcs.Task.WaitAsync(_timeout).GetAwaiter(); + var timeoutTask = Task.Delay(_timeout); + return Task.WhenAny(_tcs.Task, timeoutTask).ContinueWith( +#pragma warning restore VSTHRD105 + // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local + completedTask => + { +#pragma warning disable VSTHRD103 + if (completedTask.Result == timeoutTask) +#pragma warning restore VSTHRD103 + { + throw new TimeoutException("The operation has timed out."); + } + + return _tcs.Task; + }).Unwrap().GetAwaiter(); } } diff --git a/tests/NATS.Extensions.Microsoft.DependencyInjection.Tests/NatsHostingExtensionsTests.cs b/tests/NATS.Extensions.Microsoft.DependencyInjection.Tests/NatsHostingExtensionsTests.cs index a2a62a5cc..cb76c4f2c 100644 --- a/tests/NATS.Extensions.Microsoft.DependencyInjection.Tests/NatsHostingExtensionsTests.cs +++ b/tests/NATS.Extensions.Microsoft.DependencyInjection.Tests/NatsHostingExtensionsTests.cs @@ -116,11 +116,11 @@ public async Task AddNatsClient_ConfigureConnectionResolvesServices() .ConfigureConnection((_, _) => { }) // Add multiple to test chaining .ConfigureConnection((serviceProvider, conn) => { - conn.OnConnectingAsync = async instance => + conn.OnConnectingAsync = instance => { var resolved = serviceProvider.GetRequiredService().GetValue(); - return (resolved, instance.Port); + return new ValueTask<(string Host, int Port)>((resolved, instance.Port)); }; })); diff --git a/tools/Schema.Generation/Schema.Generation.csproj b/tools/Schema.Generation/Schema.Generation.csproj index 03a0db923..e26ff8244 100644 --- a/tools/Schema.Generation/Schema.Generation.csproj +++ b/tools/Schema.Generation/Schema.Generation.csproj @@ -10,6 +10,9 @@ + + +