From 612e45dc38578088350b55b9f550b6b13095e2a0 Mon Sep 17 00:00:00 2001 From: antyadev Date: Mon, 2 May 2022 11:05:45 +0300 Subject: [PATCH] added ReportingManager --- src/NBomber.Contracts/Contracts.fs | 4 +- src/NBomber/Api/CSharp.fs | 7 - src/NBomber/Api/FSharp.fs | 6 - src/NBomber/Contracts.fs | 7 + .../Scheduler/ScenarioScheduler.fs | 23 ++- .../Domain/Stats/ScenarioStatsActor.fs | 66 +++++---- src/NBomber/Domain/Stats/Statistics.fs | 28 ++-- src/NBomber/Domain/Step.fs | 6 +- .../TestHost/ReportingManager.fs | 126 ++++++++++++++++ ...ostReportingSinks.fs => ReportingSinks.fs} | 2 +- .../DomainServices/TestHost/TestHost.fs | 140 +++++++----------- .../TestHost/TestHostReportingActor.fs | 123 --------------- .../{TestHostPlugins.fs => WorkerPlugins.fs} | 2 +- src/NBomber/Extensions/Internal.fs | 20 ++- src/NBomber/NBomber.fsproj | 14 +- .../ConstantActorSchedulerTests.fs | 2 +- .../Concurrency/OneTimeActorSchedulerTests.fs | 2 +- .../Plugins/PluginTests.fs | 2 +- 18 files changed, 296 insertions(+), 284 deletions(-) create mode 100644 src/NBomber/DomainServices/TestHost/ReportingManager.fs rename src/NBomber/DomainServices/TestHost/{TestHostReportingSinks.fs => ReportingSinks.fs} (94%) delete mode 100644 src/NBomber/DomainServices/TestHost/TestHostReportingActor.fs rename src/NBomber/DomainServices/TestHost/{TestHostPlugins.fs => WorkerPlugins.fs} (97%) diff --git a/src/NBomber.Contracts/Contracts.fs b/src/NBomber.Contracts/Contracts.fs index 9c6517f9..b699d092 100644 --- a/src/NBomber.Contracts/Contracts.fs +++ b/src/NBomber.Contracts/Contracts.fs @@ -15,10 +15,10 @@ open NBomber.Contracts.Stats type Response = { StatusCode: Nullable IsError: bool - Message: string + mutable Message: string SizeBytes: int LatencyMs: float - Payload: obj + mutable Payload: obj } type ScenarioInfo = { diff --git a/src/NBomber/Api/CSharp.fs b/src/NBomber/Api/CSharp.fs index 92fe867c..7def9729 100644 --- a/src/NBomber/Api/CSharp.fs +++ b/src/NBomber/Api/CSharp.fs @@ -262,13 +262,6 @@ type NBomberRunner = static member WithReportingInterval(context: NBomberContext, interval: TimeSpan) = context |> FSharp.NBomberRunner.withReportingInterval interval - /// Sets reporting sinks. - /// Reporting sink is used to save real-time metrics to correspond database. - [] - static member WithReportingSinks(context: NBomberContext, []reportingSinks: IReportingSink[]) = - let sinks = reportingSinks |> Seq.toList - context |> FSharp.NBomberRunner.withReportingSinks sinks - /// Sets worker plugins. /// Worker plugin is a plugin that starts at the test start and works as a background worker. [] diff --git a/src/NBomber/Api/FSharp.fs b/src/NBomber/Api/FSharp.fs index 17a840d5..1874d3da 100644 --- a/src/NBomber/Api/FSharp.fs +++ b/src/NBomber/Api/FSharp.fs @@ -268,12 +268,6 @@ module NBomberRunner = let report = { context.Reporting with ReportingInterval = interval } { context with Reporting = report } - /// Sets reporting sinks. - /// Reporting sink is used to save real-time metrics to correspond database - let withReportingSinks (reportingSinks: IReportingSink list) (context: NBomberContext) = - let report = { context.Reporting with Sinks = reportingSinks } - { context with Reporting = report } - /// Sets worker plugins. /// Worker plugin is a plugin that starts at the test start and works as a background worker. let withWorkerPlugins (plugins: IWorkerPlugin list) (context: NBomberContext) = diff --git a/src/NBomber/Contracts.fs b/src/NBomber/Contracts.fs index 4e94f38b..e48f0f75 100644 --- a/src/NBomber/Contracts.fs +++ b/src/NBomber/Contracts.fs @@ -52,6 +52,7 @@ type NBomberContext = { namespace NBomber.Contracts.Internal +open System open CommandLine open NBomber.Contracts @@ -67,3 +68,9 @@ type StepResponse = { EndTimeMs: float LatencyMs: float } + +type ScenarioRawStats = { + ScenarioName: string + Data: StepResponse list + Timestamp: TimeSpan +} diff --git a/src/NBomber/Domain/Concurrency/Scheduler/ScenarioScheduler.fs b/src/NBomber/Domain/Concurrency/Scheduler/ScenarioScheduler.fs index bee89419..a15c6dd6 100644 --- a/src/NBomber/Domain/Concurrency/Scheduler/ScenarioScheduler.fs +++ b/src/NBomber/Domain/Concurrency/Scheduler/ScenarioScheduler.fs @@ -7,6 +7,7 @@ open FSharp.Control.Reactive open NBomber open NBomber.Contracts +open NBomber.Contracts.Internal open NBomber.Contracts.Stats open NBomber.Domain open NBomber.Domain.DomainTypes @@ -80,7 +81,7 @@ let emptyExec (dep: ActorDep) (actorPool: ScenarioActor list) (scheduledActorCou type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) = - let log = dep.Logger.ForContext() + let _log = dep.Logger.ForContext() let mutable _warmUp = false let mutable _scenario = dep.Scenario let mutable _currentSimulation = dep.Scenario.LoadTimeLine.Head.LoadSimulation @@ -110,8 +111,15 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) = getOneTimeActorCount() ) - let getRealtimeStats (executionTime) = - let executedDuration = _scenario |> Scenario.getDuration |> correctExecutedDuration executionTime + let addRawStats (rawStats: ScenarioRawStats) = + dep.ScenarioStatsActor.Publish(AddResponses rawStats.Data) + + let getRawStats (timestamp) = + let executedDuration = _scenario |> Scenario.getDuration |> correctExecutedDuration timestamp + dep.ScenarioStatsActor.GetRawStats executedDuration + + let getRealtimeStats (timestamp) = + let executedDuration = _scenario |> Scenario.getDuration |> correctExecutedDuration timestamp let simulationStats = getCurrentSimulationStats() dep.ScenarioStatsActor.GetRealtimeStats(simulationStats, executedDuration) @@ -198,13 +206,16 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) = member _.EventStream = _eventStream :> IObservable<_> member _.Scenario = dep.Scenario - member _.PublishStatsToCoordinator() = dep.ScenarioStatsActor.Publish(ActorMessage.PublishStatsToCoordinator) - member _.GetRealtimeStats(executionTime) = getRealtimeStats executionTime + + member _.AddRawStats(rawStats) = addRawStats rawStats + member _.GetRawStats(timestamp) = getRawStats timestamp + member _.GetRealtimeStats(timestamp) = getRealtimeStats timestamp member _.GetFinalStats() = getFinalStats() + member _.GetStatusMessages() = () interface IDisposable with member _.Dispose() = stop() _eventStream.Dispose() - log.Verbose $"{nameof ScenarioScheduler} disposed" + _log.Verbose $"{nameof ScenarioScheduler} disposed" diff --git a/src/NBomber/Domain/Stats/ScenarioStatsActor.fs b/src/NBomber/Domain/Stats/ScenarioStatsActor.fs index 44a7731a..82166e35 100644 --- a/src/NBomber/Domain/Stats/ScenarioStatsActor.fs +++ b/src/NBomber/Domain/Stats/ScenarioStatsActor.fs @@ -13,27 +13,34 @@ open NBomber.Domain.DomainTypes open NBomber.Domain.Stats.Statistics type ActorMessage = - | AddResponse of StepResponse - | AddResponses of StepResponse[] - | PublishStatsToCoordinator - | GetRealtimeStats of reply:TaskCompletionSource * LoadSimulationStats * duration:TimeSpan - | GetFinalStats of reply:TaskCompletionSource * LoadSimulationStats * duration:TimeSpan + | AddResponse of StepResponse + | AddResponses of StepResponse list + | GetRawStats of reply:TaskCompletionSource * timestamp:TimeSpan + | GetRealtimeStats of reply:TaskCompletionSource * LoadSimulationStats * duration:TimeSpan + | GetFinalStats of reply:TaskCompletionSource * LoadSimulationStats * duration:TimeSpan type IScenarioStatsActor = abstract Publish: ActorMessage -> unit + abstract GetRawStats: timestamp:TimeSpan -> Task abstract GetRealtimeStats: LoadSimulationStats * duration:TimeSpan -> Task abstract GetFinalStats: LoadSimulationStats * duration:TimeSpan -> Task -type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval: TimeSpan) = +type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval: TimeSpan, keepRawStats: bool) = let _allStepsData = Array.init scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty()) let mutable _intervalStepsData = Array.init scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty()) + let mutable _intervalRawStats = List.empty - let addResponse (allData: StepStatsRawData[]) (intervalData: StepStatsRawData[]) (resp: StepResponse) = - let allStData = allData.[resp.StepIndex] - let intervalStData = intervalData.[resp.StepIndex] - allData.[resp.StepIndex] <- StepStatsRawData.addResponse allStData resp - intervalData.[resp.StepIndex] <- StepStatsRawData.addResponse intervalStData resp + let addResponse (resp: StepResponse) = + let allStData = _allStepsData.[resp.StepIndex] + let intervalStData = _intervalStepsData.[resp.StepIndex] + _allStepsData.[resp.StepIndex] <- StepStatsRawData.addResponse allStData resp + _intervalStepsData.[resp.StepIndex] <- StepStatsRawData.addResponse intervalStData resp + + if keepRawStats then + resp.ClientResponse.Payload <- null // to prevent sending in cluster mode + resp.ClientResponse.Message <- null + _intervalRawStats <- resp :: _intervalRawStats let createScenarioStats (stepsData, simulationStats, operation, duration, interval) = ScenarioStats.create scenario stepsData simulationStats operation duration interval @@ -41,14 +48,13 @@ type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval: let _actor = ActionBlock(fun msg -> try match msg with - | AddResponse response -> - addResponse _allStepsData _intervalStepsData response - - | AddResponses responses -> - responses |> Array.iter(addResponse _allStepsData _intervalStepsData) + | AddResponse response -> addResponse response + | AddResponses responses -> responses |> List.iter addResponse - | PublishStatsToCoordinator -> - failwith "invalid operation" // it's only needed for cluster + | GetRawStats (reply, timestamp) -> + let stats = { ScenarioName = scenario.ScenarioName; Data = _intervalRawStats; Timestamp = timestamp } + reply.TrySetResult(stats) |> ignore + _intervalRawStats <- List.empty | GetRealtimeStats (reply, simulationStats, duration) -> let scnStats = createScenarioStats(_intervalStepsData, simulationStats, OperationType.Bombing, duration, reportingInterval) @@ -60,7 +66,7 @@ type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval: let scnStats = createScenarioStats(_allStepsData, simulationStats, OperationType.Complete, duration, duration) reply.TrySetResult(scnStats) |> ignore with - | ex -> logger.Error(ex, "GlobalScenarioStatsActor failed") + | ex -> logger.Error $"{nameof ScenarioStatsActor} failed: {ex.ToString()}" ) interface IScenarioStatsActor with @@ -68,15 +74,23 @@ type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval: [] member _.Publish(msg) = _actor.Post(msg) |> ignore + member _.GetRawStats(timestamp) = + let reply = TaskCompletionSource() + GetRawStats(reply, timestamp) |> _actor.Post |> ignore + reply.Task + member _.GetRealtimeStats(simulationStats, duration) = - let tcs = TaskCompletionSource() - GetRealtimeStats(tcs, simulationStats, duration) |> _actor.Post |> ignore - tcs.Task + let reply = TaskCompletionSource() + GetRealtimeStats(reply, simulationStats, duration) |> _actor.Post |> ignore + reply.Task member _.GetFinalStats(simulationStats, duration) = - let tcs = TaskCompletionSource() - GetFinalStats(tcs, simulationStats, duration) |> _actor.Post |> ignore - tcs.Task + let reply = TaskCompletionSource() + GetFinalStats(reply, simulationStats, duration) |> _actor.Post |> ignore + reply.Task let create (logger: ILogger) (scenario: Scenario) (reportingInterval: TimeSpan) = - ScenarioStatsActor(logger, scenario, reportingInterval) :> IScenarioStatsActor + ScenarioStatsActor(logger, scenario, reportingInterval, keepRawStats = false) :> IScenarioStatsActor + +let createWithRawStats (logger: ILogger) (scenario: Scenario) (reportingInterval: TimeSpan) = + ScenarioStatsActor(logger, scenario, reportingInterval, keepRawStats = true) :> IScenarioStatsActor diff --git a/src/NBomber/Domain/Stats/Statistics.fs b/src/NBomber/Domain/Stats/Statistics.fs index eefe12ad..ae8dce02 100644 --- a/src/NBomber/Domain/Stats/Statistics.fs +++ b/src/NBomber/Domain/Stats/Statistics.fs @@ -2,7 +2,6 @@ module internal NBomber.Domain.Stats.Statistics open System open System.Collections.Generic -open System.Data open HdrHistogram @@ -206,19 +205,20 @@ module ScenarioStats = module NodeStats = let create (testInfo: TestInfo) (nodeInfo: NodeInfo) (scnStats: ScenarioStats[]) = - - let maxDuration = scnStats |> Array.maxBy(fun x -> x.Duration) |> fun scn -> scn.Duration - - { RequestCount = scnStats |> Array.sumBy(fun x -> x.RequestCount) - OkCount = scnStats |> Array.sumBy(fun x -> x.OkCount) - FailCount = scnStats |> Array.sumBy(fun x -> x.FailCount) - AllBytes = scnStats |> Array.sumBy(fun x -> x.AllBytes) - ScenarioStats = scnStats - PluginStats = Array.empty - NodeInfo = nodeInfo - TestInfo = testInfo - ReportFiles = Array.empty - Duration = maxDuration } + if Array.isEmpty scnStats then + NodeStats.empty + else + let maxDuration = scnStats |> Array.maxBy(fun x -> x.Duration) |> fun scn -> scn.Duration + { RequestCount = scnStats |> Array.sumBy(fun x -> x.RequestCount) + OkCount = scnStats |> Array.sumBy(fun x -> x.OkCount) + FailCount = scnStats |> Array.sumBy(fun x -> x.FailCount) + AllBytes = scnStats |> Array.sumBy(fun x -> x.AllBytes) + ScenarioStats = scnStats + PluginStats = Array.empty + NodeInfo = nodeInfo + TestInfo = testInfo + ReportFiles = Array.empty + Duration = maxDuration } let round (stats: NodeStats) = { stats with ScenarioStats = stats.ScenarioStats |> Array.map(ScenarioStats.round) diff --git a/src/NBomber/Domain/Step.fs b/src/NBomber/Domain/Step.fs index 2c03d512..4c521438 100644 --- a/src/NBomber/Domain/Step.fs +++ b/src/NBomber/Domain/Step.fs @@ -176,6 +176,8 @@ module RunningStep = let execStep (dep: StepDep) (step: RunningStep) = backgroundTask { let! response = measureExec step dep.ScenarioGlobalTimer + let payload = response.ClientResponse.Payload + response.ClientResponse.Payload <- null // to prevent holding it for stats actor if not step.Value.DoNotTrack then dep.ScenarioStatsActor.Publish(AddResponse response) @@ -183,7 +185,7 @@ module RunningStep = if response.ClientResponse.IsError then dep.Logger.Fatal($"Step '{step.Value.StepName}' from Scenario: '{dep.ScenarioInfo.ScenarioName}' has failed. Error: {response.ClientResponse.Message}") else - dep.Data[Constants.StepResponseKey] <- response.ClientResponse.Payload + dep.Data[Constants.StepResponseKey] <- payload return response.ClientResponse @@ -194,7 +196,7 @@ module RunningStep = if response.ClientResponse.IsError then dep.Logger.Fatal($"Step '{step.Value.StepName}' from Scenario: '{dep.ScenarioInfo.ScenarioName}' has failed. Error: {response.ClientResponse.Message}") else - dep.Data[Constants.StepResponseKey] <- response.ClientResponse.Payload + dep.Data[Constants.StepResponseKey] <- payload return response.ClientResponse } diff --git a/src/NBomber/DomainServices/TestHost/ReportingManager.fs b/src/NBomber/DomainServices/TestHost/ReportingManager.fs new file mode 100644 index 00000000..2948429a --- /dev/null +++ b/src/NBomber/DomainServices/TestHost/ReportingManager.fs @@ -0,0 +1,126 @@ +module internal NBomber.DomainServices.TestHost.ReportingManager + +open System +open System.Diagnostics +open System.Threading.Tasks + +open FSharp.Control.Reactive +open FsToolkit.ErrorHandling + +open NBomber.Contracts.Stats +open NBomber.Extensions.Internal +open NBomber.Domain +open NBomber.Domain.LoadTimeLine +open NBomber.Domain.Stats.Statistics +open NBomber.Domain.Concurrency.Scheduler.ScenarioScheduler +open NBomber.Infra.Dependency +open NBomber.DomainServices.NBomberContext + +type IReportingManager = + inherit IDisposable + abstract IsWorking: bool + abstract RealTimeStatsStream: IObservable + abstract Start: unit -> unit + abstract Stop: unit -> Task + abstract GetCurrentSessionTime: unit -> TimeSpan + abstract GetSessionResult: NodeInfo -> Task + +type ReportingManager(dep: IGlobalDependency, schedulers: ScenarioScheduler list, sessionArgs: SessionArgs) = + + let _saveStatsTimer = new Timers.Timer(sessionArgs.ReportingInterval.TotalMilliseconds) + let _globalSessionTimer = Stopwatch() + let _statsStream = Subject.broadcast + let mutable _currentHistory = List.empty + let mutable _isWorking = false + + let _realtimeTimerMaxDuration = + schedulers |> Seq.map(fun x -> x.Scenario.PlanedDuration) |> Seq.max |> fun duration -> duration.Add(TimeSpan.FromSeconds 2) + + let getHints (finalStats: NodeStats) = + if sessionArgs.UseHintsAnalyzer then + dep.WorkerPlugins + |> WorkerPlugins.getHints + |> List.append(HintsAnalyzer.analyzeNodeStats finalStats) + |> List.toArray + else + Array.empty + + let getFinalStats (nodeInfo: NodeInfo) = backgroundTask { + let! scenarioStats = + schedulers + |> List.map(fun x -> x.GetFinalStats()) + |> Task.WhenAll + + let nodeStats = + NodeStats.create sessionArgs.TestInfo nodeInfo scenarioStats + |> NodeStats.round + + let! pluginStats = WorkerPlugins.getStats dep.Logger dep.WorkerPlugins nodeStats + return { nodeStats with PluginStats = pluginStats } + } + + let getRealtimeStats () = + schedulers + |> List.filter(fun x -> x.Working = true) + |> List.map(fun x -> + _globalSessionTimer.Elapsed + |> x.GetRealtimeStats + |> Task.map ScenarioStats.round + ) + |> Task.WhenAll + + let getRealtimeHistoryRecord () = backgroundTask { + let! realtimeStats = getRealtimeStats() + return TimeLineHistoryRecord.create realtimeStats + } + + let fetchAndPublishStats () = backgroundTask { + let! historyRecord = getRealtimeHistoryRecord() + _currentHistory <- historyRecord :: _currentHistory + + _statsStream.OnNext historyRecord.ScenarioStats + } + + let getSessionResult (nodeInfo: NodeInfo) = backgroundTask { + let history = + _currentHistory + |> TimeLineHistory.filterRealtime + |> List.toArray + + let! finalStats = getFinalStats nodeInfo + let hints = getHints finalStats + + let result = { FinalStats = finalStats; TimeLineHistory = history; Hints = hints } + return result + } + + do + _saveStatsTimer.Elapsed.Add(fun _ -> + if _globalSessionTimer.Elapsed <= _realtimeTimerMaxDuration then + fetchAndPublishStats() |> ignore + else + _isWorking <- false + ) + + interface IReportingManager with + member _.IsWorking = _isWorking + member _.RealTimeStatsStream = _statsStream :> IObservable + + member _.Start() = + _isWorking <- true + _saveStatsTimer.Start() + _globalSessionTimer.Reset() + _globalSessionTimer.Start() + + member _.Stop() = backgroundTask { + _isWorking <- false + _saveStatsTimer.Stop() + _globalSessionTimer.Stop() + } + + member _.GetCurrentSessionTime() = _globalSessionTimer.Elapsed + member _.GetSessionResult(nodeInfo) = getSessionResult nodeInfo + + interface IDisposable with + member _.Dispose() = + _saveStatsTimer.Dispose() diff --git a/src/NBomber/DomainServices/TestHost/TestHostReportingSinks.fs b/src/NBomber/DomainServices/TestHost/ReportingSinks.fs similarity index 94% rename from src/NBomber/DomainServices/TestHost/TestHostReportingSinks.fs rename to src/NBomber/DomainServices/TestHost/ReportingSinks.fs index 6272c818..2302f274 100644 --- a/src/NBomber/DomainServices/TestHost/TestHostReportingSinks.fs +++ b/src/NBomber/DomainServices/TestHost/ReportingSinks.fs @@ -1,4 +1,4 @@ -module internal NBomber.DomainServices.TestHost.TestHostReportingSinks +module internal NBomber.DomainServices.TestHost.ReportingSinks open Serilog open FsToolkit.ErrorHandling diff --git a/src/NBomber/DomainServices/TestHost/TestHost.fs b/src/NBomber/DomainServices/TestHost/TestHost.fs index 96e7566a..b8b8c27f 100644 --- a/src/NBomber/DomainServices/TestHost/TestHost.fs +++ b/src/NBomber/DomainServices/TestHost/TestHost.fs @@ -23,14 +23,14 @@ open NBomber.Infra open NBomber.Infra.Dependency open NBomber.DomainServices open NBomber.DomainServices.NBomberContext -open NBomber.DomainServices.TestHost.TestHostReportingActor +open NBomber.DomainServices.TestHost.ReportingManager type internal TestHost(dep: IGlobalDependency, regScenarios: Scenario list, getStepOrder: Scenario -> int[], execSteps: StepDep -> RunningStep[] -> int[] -> Task) as this = - let log = dep.Logger.ForContext() + let _log = dep.Logger.ForContext() let mutable _stopped = false let mutable _disposed = false let mutable _targetScenarios = List.empty @@ -50,7 +50,7 @@ type internal TestHost(dep: IGlobalDependency, |> List.tryFind(fun sch -> sch.Scenario.ScenarioName = scenarioName) |> Option.iter(fun sch -> sch.Stop() - log.Warning("Stopping scenario early: {ScenarioName}, reason: {StopReason}", sch.Scenario.ScenarioName, reason) + _log.Warning("Stopping scenario early: {ScenarioName}, reason: {StopReason}", sch.Scenario.ScenarioName, reason) ) | StopTest reason -> this.StopScenarios(reason) |> ignore @@ -63,11 +63,11 @@ type internal TestHost(dep: IGlobalDependency, let createScheduler (cancelToken: CancellationToken) (scn: Scenario) = let actorDep = { - Logger = log + Logger = _log CancellationToken = cancelToken ScenarioGlobalTimer = Stopwatch() Scenario = scn - ScenarioStatsActor = createStatsActor log scn _sessionArgs.ReportingInterval + ScenarioStatsActor = createStatsActor _log scn _sessionArgs.ReportingInterval ExecStopCommand = execStopCommand GetStepOrder = getStepOrder ExecSteps = execSteps @@ -91,14 +91,14 @@ type internal TestHost(dep: IGlobalDependency, cancelToken: CancellationToken, scenarios: Scenario list) = taskResult { - let baseContext = NBomberContext.createBaseContext(sessionArgs.TestInfo, getCurrentNodeInfo(), cancelToken, log) + let baseContext = NBomberContext.createBaseContext(sessionArgs.TestInfo, getCurrentNodeInfo(), cancelToken, _log) let defaultScnContext = Scenario.ScenarioContext.create baseContext let enabledScenarios = scenarios |> List.filter(fun x -> x.IsEnabled) let disabledScenarios = scenarios |> List.filter(fun x -> not x.IsEnabled) - do! dep.ReportingSinks |> TestHostReportingSinks.init dep baseContext - do! dep.WorkerPlugins |> TestHostPlugins.init dep baseContext + do! dep.ReportingSinks |> ReportingSinks.init dep baseContext + do! dep.WorkerPlugins |> WorkerPlugins.init dep baseContext let! initializedScenarios = TestHostScenario.initScenarios(dep, baseContext, defaultScnContext, sessionArgs, enabledScenarios) return initializedScenarios @ disabledScenarios @@ -106,24 +106,23 @@ type internal TestHost(dep: IGlobalDependency, let startWarmUp (schedulers: ScenarioScheduler list) = backgroundTask { let isWarmUp = true + _currentSchedulers <- schedulers + TestHostConsole.displayBombingProgress(dep.ApplicationType, schedulers, isWarmUp) do! schedulers |> List.map(fun x -> x.Start isWarmUp) |> Task.WhenAll } - let startBombing (schedulers: ScenarioScheduler list, - flushStatsTimer: Timers.Timer option, - reportingTimer: Timers.Timer, - currentOperationTimer: Stopwatch) = backgroundTask { + let startBombing (schedulers: ScenarioScheduler list) + (reportingManager: IReportingManager) = backgroundTask { let isWarmUp = false + _currentSchedulers <- schedulers TestHostConsole.displayBombingProgress(dep.ApplicationType, schedulers, isWarmUp) - do! dep.ReportingSinks |> TestHostReportingSinks.start log - do! dep.WorkerPlugins |> TestHostPlugins.start log + do! dep.ReportingSinks |> ReportingSinks.start _log + do! dep.WorkerPlugins |> WorkerPlugins.start _log - flushStatsTimer |> Option.iter(fun x -> x.Start()) - reportingTimer.Start() - currentOperationTimer.Start() + reportingManager.Start() // waiting on all scenarios to finish do! schedulers |> List.map(fun x -> x.Start isWarmUp) |> Task.WhenAll @@ -131,32 +130,22 @@ type internal TestHost(dep: IGlobalDependency, // wait on final metrics and reporting tick do! Task.Delay Constants.ReportingTimerCompleteMs - flushStatsTimer |> Option.iter(fun x -> x.Stop()) - reportingTimer.Stop() - currentOperationTimer.Stop() + // waiting (in case of cluster) on all raw stats + do! reportingManager.Stop() - do! dep.ReportingSinks |> TestHostReportingSinks.stop log - do! dep.WorkerPlugins |> TestHostPlugins.stop log + do! dep.ReportingSinks |> ReportingSinks.stop _log + do! dep.WorkerPlugins |> WorkerPlugins.stop _log } let cleanScenarios (sessionArgs: SessionArgs, cancelToken: CancellationToken, scenarios: Scenario list) = - let baseContext = NBomberContext.createBaseContext(sessionArgs.TestInfo, getCurrentNodeInfo(), cancelToken, log) + let baseContext = NBomberContext.createBaseContext(sessionArgs.TestInfo, getCurrentNodeInfo(), cancelToken, _log) let defaultScnContext = Scenario.ScenarioContext.create baseContext let enabledScenarios = scenarios |> List.filter(fun x -> x.IsEnabled) TestHostScenario.cleanScenarios dep baseContext defaultScnContext enabledScenarios - let getHints (finalStats: NodeStats) (targetScenarios: Scenario list) = - if _sessionArgs.UseHintsAnalyzer then - dep.WorkerPlugins - |> TestHostPlugins.getHints - |> List.append(HintsAnalyzer.analyzeNodeStats finalStats) - |> List.toArray - else - Array.empty - member _.SessionArgs = _sessionArgs member _.CurrentOperation = _currentOperation member _.CurrentNodeInfo = getCurrentNodeInfo() @@ -168,20 +157,20 @@ type internal TestHost(dep: IGlobalDependency, _currentOperation <- OperationType.Init TestHostConsole.printContextInfo(dep) - log.Information "Starting init..." + _log.Information "Starting init..." _cancelToken.Dispose() _cancelToken <- new CancellationTokenSource() match! initScenarios(sessionArgs, _cancelToken.Token, targetScenarios) with | Ok initializedScenarios -> - log.Information "Init finished" + _log.Information "Init finished" _targetScenarios <- initializedScenarios _sessionArgs <- sessionArgs _currentOperation <- OperationType.None return Ok() | Error appError -> - log.Error "Init failed" + _log.Error "Init failed" _currentOperation <- OperationType.Stop return AppError.createResult appError } @@ -190,35 +179,23 @@ type internal TestHost(dep: IGlobalDependency, _stopped <- false _currentOperation <- OperationType.WarmUp - log.Information "Starting warm up..." - - let schedulers = this.CreateScenarioSchedulers(Scenario.defaultClusterCount, ScenarioStatsActor.create) - _currentSchedulers <- schedulers + _log.Information "Starting warm up..." + let schedulers = this.CreateScenarioSchedulers() do! startWarmUp schedulers stopSchedulers(_cancelToken, schedulers) _currentOperation <- OperationType.None } - member _.StartBombing(schedulers: ScenarioScheduler list, - flushStatsTimer: Timers.Timer option, - reportingTimer: Timers.Timer, - currentOperationTimer: Stopwatch, - ?cleanUp: unit -> Task) = backgroundTask { + member _.StartBombing(schedulers: ScenarioScheduler list, reportingManager: IReportingManager) = backgroundTask { _stopped <- false _currentOperation <- OperationType.Bombing - _currentSchedulers <- schedulers - - log.Information "Starting bombing..." - do! startBombing(schedulers, flushStatsTimer, reportingTimer, currentOperationTimer) - if cleanUp.IsSome then - log.Debug "NBomber is executing internal cleanup..." - do! cleanUp.Value() + _log.Information "Starting bombing..." + do! startBombing schedulers reportingManager do! this.StopScenarios() - _currentOperation <- OperationType.Complete } @@ -227,9 +204,9 @@ type internal TestHost(dep: IGlobalDependency, _currentOperation <- OperationType.Stop if not(String.IsNullOrEmpty reason) then - log.Warning("Stopping test early: {StopReason}", reason) + _log.Warning("Stopping test early: {StopReason}", reason) else - log.Information "Stopping scenarios..." + _log.Information "Stopping scenarios..." stopSchedulers(_cancelToken, _currentSchedulers) do! cleanScenarios(_sessionArgs, _cancelToken.Token, _targetScenarios) @@ -238,52 +215,49 @@ type internal TestHost(dep: IGlobalDependency, _currentOperation <- OperationType.None } - member _.GetHints(finalStats) = _targetScenarios |> getHints finalStats + member _.CreateScenarioSchedulers() = + createScenarioSchedulers _targetScenarios Scenario.defaultClusterCount ScenarioStatsActor.create getStepOrder execSteps member _.CreateScenarioSchedulers(getScenarioClusterCount: ScenarioName -> int, createStatsActor: ILogger -> Scenario -> TimeSpan -> IScenarioStatsActor) = createScenarioSchedulers _targetScenarios getScenarioClusterCount createStatsActor getStepOrder execSteps + member _.GetRawStats(timestamp) = backgroundTask { + let! stats = _currentSchedulers |> List.map(fun x -> x.GetRawStats timestamp) |> Task.WhenAll + return List.ofArray stats + } + member _.RunSession(sessionArgs: SessionArgs) = taskResult { let targetScenarios = regScenarios |> TestHostScenario.getTargetScenarios sessionArgs do! this.StartInit(sessionArgs, targetScenarios) do! this.StartWarmUp() - let schedulers = this.CreateScenarioSchedulers(Scenario.defaultClusterCount, ScenarioStatsActor.create) - - // create timers - let currentOperationTimer = Stopwatch() - let reportingActor = TestHostReportingActor(dep, schedulers, sessionArgs.TestInfo) - use reportingTimer = new Timers.Timer(sessionArgs.ReportingInterval.TotalMilliseconds) - reportingTimer.Elapsed.Add(fun _ -> - reportingActor.Publish(SaveRealtimeStats currentOperationTimer.Elapsed) - ) + let schedulers = this.CreateScenarioSchedulers() + use reportingManager = new ReportingManager(dep, schedulers, sessionArgs) :> IReportingManager // start bombing - do! this.StartBombing(schedulers, None, reportingTimer, currentOperationTimer) + do! this.StartBombing(schedulers, reportingManager) // gets final stats - log.Information "Calculating final statistics..." - let! finalStats = reportingActor.GetFinalStats(getCurrentNodeInfo()) - let! timeLineHistory = reportingActor.GetTimeLineHistory() - let hints = _targetScenarios |> getHints finalStats - let result = { FinalStats = finalStats; TimeLineHistory = Array.ofList timeLineHistory; Hints = hints } - return result + _log.Information "Calculating final statistics..." + return! reportingManager.GetSessionResult(getCurrentNodeInfo()) } - interface IDisposable with - member _.Dispose() = - if not _disposed then - _disposed <- true - this.StopScenarios().Wait() + member _.Dispose() = + if not _disposed then + _disposed <- true + this.StopScenarios().Wait() + + for sink in dep.ReportingSinks do + use _ = sink + () - for sink in dep.ReportingSinks do - use x = sink - () + for plugin in dep.WorkerPlugins do + use _ = plugin + () - for plugin in dep.WorkerPlugins do - use x = plugin - () + _log.Verbose $"{nameof TestHost} disposed" - log.Verbose $"{nameof TestHost} disposed" + interface IDisposable with + member _.Dispose() = this.Dispose() diff --git a/src/NBomber/DomainServices/TestHost/TestHostReportingActor.fs b/src/NBomber/DomainServices/TestHost/TestHostReportingActor.fs deleted file mode 100644 index 1e60bf6c..00000000 --- a/src/NBomber/DomainServices/TestHost/TestHostReportingActor.fs +++ /dev/null @@ -1,123 +0,0 @@ -module internal NBomber.DomainServices.TestHost.TestHostReportingActor - -open System -open System.Threading.Tasks -open System.Threading.Tasks.Dataflow - -open FsToolkit.ErrorHandling -open FsToolkit.ErrorHandling.Operator.Task - -open NBomber -open NBomber.Contracts.Stats -open NBomber.Extensions.Internal -open NBomber.Domain.LoadTimeLine -open NBomber.Domain.Stats.Statistics -open NBomber.Domain.Concurrency.Scheduler.ScenarioScheduler -open NBomber.Infra.Dependency - -let saveRealtimeScenarioStats (dep: IGlobalDependency) (stats: ScenarioStats[]) = backgroundTask { - for sink in dep.ReportingSinks do - try - do! sink.SaveRealtimeStats(stats) - with - | ex -> dep.Logger.Warning(ex, "Reporting sink: {SinkName} failed to save scenario stats", sink.SinkName) -} - -let getRealtimeScenarioStats (schedulers: ScenarioScheduler list) (executionTime: TimeSpan) = - schedulers - |> List.filter(fun x -> x.Working = true) - |> List.map(fun x -> x.GetRealtimeStats executionTime) - |> Task.WhenAll - -let getFinalScenarioStats (schedulers: ScenarioScheduler list) = - schedulers - |> List.map(fun x -> x.GetFinalStats()) - |> Task.WhenAll - -let getPluginStats (dep: IGlobalDependency) (stats: NodeStats) = backgroundTask { - try - let pluginStatusesTask = - dep.WorkerPlugins - |> List.map(fun plugin -> plugin.GetStats stats) - |> Task.WhenAll - - let! finishedTask = Task.WhenAny(pluginStatusesTask, Task.Delay Constants.GetPluginStatsTimeout) - if finishedTask.Id = pluginStatusesTask.Id then return pluginStatusesTask.Result - else - dep.Logger.Error("Getting plugin stats failed with the timeout error") - return Array.empty - with - | ex -> - dep.Logger.Error(ex, "Getting plugin stats failed with the following error") - return Array.empty -} - -let getFinalStats (dep: IGlobalDependency) - (schedulers: ScenarioScheduler list) - (testInfo: TestInfo) - (nodeInfo: NodeInfo) = backgroundTask { - - let! scenarioStats = getFinalScenarioStats schedulers - - if Array.isEmpty scenarioStats then return None - else - let nodeStats = NodeStats.create testInfo nodeInfo scenarioStats - let! pluginStats = getPluginStats dep nodeStats - return Some { nodeStats with PluginStats = pluginStats } -} - -type ActorMessage = - | SaveRealtimeStats of executionTime:TimeSpan - | GetTimeLineHistory of TaskCompletionSource - | GetFinalStats of TaskCompletionSource * NodeInfo - -type TestHostReportingActor(dep: IGlobalDependency, schedulers: ScenarioScheduler list, testInfo: TestInfo) = - - let saveRealtimeStats = saveRealtimeScenarioStats dep - let getRealtimeStats = getRealtimeScenarioStats schedulers - let getFinalStats = getFinalStats dep schedulers testInfo - - let mutable _currentHistory = List.empty - - let getAndSaveRealtimeStats (executionTime, history) = backgroundTask { - let! scnStats = getRealtimeStats executionTime - if Array.isEmpty scnStats then return history - else - do! scnStats |> Array.map(ScenarioStats.round) |> saveRealtimeStats - let historyRecord = TimeLineHistoryRecord.create scnStats - return historyRecord :: history - } - - let _actor = ActionBlock(fun msg -> - backgroundTask { - try - match msg with - | SaveRealtimeStats executionTime -> - let! newHistory = getAndSaveRealtimeStats(executionTime, _currentHistory) - _currentHistory <- newHistory - - | GetTimeLineHistory reply -> - _currentHistory |> TimeLineHistory.filterRealtime |> reply.TrySetResult |> ignore - - | GetFinalStats (reply, nodeInfo) -> - nodeInfo - |> getFinalStats - |> TaskOption.map(NodeStats.round >> reply.TrySetResult) - |> Task.WaitAll - with - | ex -> dep.Logger.Error("TestHostReporting actor failed: {0}", ex.ToString()) - } - :> Task - ) - - member _.Publish(msg) = _actor.Post(msg) |> ignore - - member _.GetTimeLineHistory() = - let tcs = TaskCompletionSource() - GetTimeLineHistory(tcs) |> _actor.Post |> ignore - tcs.Task - - member _.GetFinalStats(nodeInfo) = - let tcs = TaskCompletionSource() - GetFinalStats(tcs, nodeInfo) |> _actor.Post |> ignore - tcs.Task diff --git a/src/NBomber/DomainServices/TestHost/TestHostPlugins.fs b/src/NBomber/DomainServices/TestHost/WorkerPlugins.fs similarity index 97% rename from src/NBomber/DomainServices/TestHost/TestHostPlugins.fs rename to src/NBomber/DomainServices/TestHost/WorkerPlugins.fs index ae4523df..66dfa807 100644 --- a/src/NBomber/DomainServices/TestHost/TestHostPlugins.fs +++ b/src/NBomber/DomainServices/TestHost/WorkerPlugins.fs @@ -1,4 +1,4 @@ -module internal NBomber.DomainServices.TestHost.TestHostPlugins +module internal NBomber.DomainServices.TestHost.WorkerPlugins open System.Threading.Tasks diff --git a/src/NBomber/Extensions/Internal.fs b/src/NBomber/Extensions/Internal.fs index bdd2fcb2..15fd449b 100644 --- a/src/NBomber/Extensions/Internal.fs +++ b/src/NBomber/Extensions/Internal.fs @@ -4,6 +4,7 @@ open System open System.Collections.Generic open System.IO open System.Text +open System.Threading.Tasks open Json.Net.DataSetConverters open FsToolkit.ErrorHandling @@ -11,8 +12,21 @@ open Newtonsoft.Json module internal Internal = - let inline isNotNull (value) = - not(isNull value) + module Operation = + + let waitOnComplete (retryCount: int) (isComplete: unit -> bool) = backgroundTask { + let mutable counter = 0 + let mutable completed = false + completed <- isComplete() + while not completed || counter < retryCount do + counter <- counter + 1 + do! Task.Delay 1_000 + completed <- isComplete() + + return + if completed then Ok() + else Error() + } module Json = @@ -56,7 +70,7 @@ module internal Internal = let ofRecord (value: 'T) = let boxed = box(value) - if isNotNull(boxed) then Some value + if not(isNull boxed) then Some value else None module String = diff --git a/src/NBomber/NBomber.fsproj b/src/NBomber/NBomber.fsproj index 89a2d08a..16a68ae6 100644 --- a/src/NBomber/NBomber.fsproj +++ b/src/NBomber/NBomber.fsproj @@ -63,9 +63,9 @@ - - - + + + @@ -90,7 +90,7 @@ - + @@ -109,7 +109,7 @@ - - - + + + diff --git a/tests/NBomber.IntegrationTests/Concurrency/ConstantActorSchedulerTests.fs b/tests/NBomber.IntegrationTests/Concurrency/ConstantActorSchedulerTests.fs index a55b009d..1a3e65b5 100644 --- a/tests/NBomber.IntegrationTests/Concurrency/ConstantActorSchedulerTests.fs +++ b/tests/NBomber.IntegrationTests/Concurrency/ConstantActorSchedulerTests.fs @@ -36,7 +36,7 @@ let internal baseDep = { CancellationToken = CancellationToken.None ScenarioGlobalTimer = Stopwatch() Scenario = baseScenario - ScenarioStatsActor = ScenarioStatsActor(logger, baseScenario, Constants.DefaultReportingInterval) + ScenarioStatsActor = ScenarioStatsActor(logger, baseScenario, Constants.DefaultReportingInterval, keepRawStats = false) ExecStopCommand = fun _ -> () GetStepOrder = Scenario.getStepOrder ExecSteps = RunningStep.execSteps diff --git a/tests/NBomber.IntegrationTests/Concurrency/OneTimeActorSchedulerTests.fs b/tests/NBomber.IntegrationTests/Concurrency/OneTimeActorSchedulerTests.fs index 58272e76..945c6190 100644 --- a/tests/NBomber.IntegrationTests/Concurrency/OneTimeActorSchedulerTests.fs +++ b/tests/NBomber.IntegrationTests/Concurrency/OneTimeActorSchedulerTests.fs @@ -35,7 +35,7 @@ let internal baseDep = { CancellationToken = CancellationToken.None ScenarioGlobalTimer = Stopwatch() Scenario = baseScenario - ScenarioStatsActor = ScenarioStatsActor(logger, baseScenario, Constants.DefaultReportingInterval) + ScenarioStatsActor = ScenarioStatsActor(logger, baseScenario, Constants.DefaultReportingInterval, keepRawStats = false) ExecStopCommand = fun _ -> () GetStepOrder = Scenario.getStepOrder ExecSteps = RunningStep.execSteps diff --git a/tests/NBomber.IntegrationTests/Plugins/PluginTests.fs b/tests/NBomber.IntegrationTests/Plugins/PluginTests.fs index 0b9c4121..37e8c572 100644 --- a/tests/NBomber.IntegrationTests/Plugins/PluginTests.fs +++ b/tests/NBomber.IntegrationTests/Plugins/PluginTests.fs @@ -258,4 +258,4 @@ let ``PluginStats should return empty data set in case of internal exception`` ( |> Result.getOk |> fun nodeStats -> test <@ Array.isEmpty nodeStats.PluginStats @> - inMemorySink.Should().HaveMessage("Getting plugin stats failed with the following error", "because exception was thrown") |> ignore + inMemorySink.Should().HaveMessage("Getting plugin stats failed: {0}", "because exception was thrown") |> ignore