Skip to content

Commit

Permalink
added ReportingManager
Browse files Browse the repository at this point in the history
  • Loading branch information
AntyaDev committed May 10, 2022
1 parent 82b0655 commit 612e45d
Show file tree
Hide file tree
Showing 18 changed files with 296 additions and 284 deletions.
4 changes: 2 additions & 2 deletions src/NBomber.Contracts/Contracts.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ open NBomber.Contracts.Stats
type Response = {
StatusCode: Nullable<int>
IsError: bool
Message: string
mutable Message: string
SizeBytes: int
LatencyMs: float
Payload: obj
mutable Payload: obj
}

type ScenarioInfo = {
Expand Down
7 changes: 0 additions & 7 deletions src/NBomber/Api/CSharp.fs
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,6 @@ type NBomberRunner =
static member WithReportingInterval(context: NBomberContext, interval: TimeSpan) =
context |> FSharp.NBomberRunner.withReportingInterval interval

/// Sets reporting sinks.
/// Reporting sink is used to save real-time metrics to correspond database.
[<Extension>]
static member WithReportingSinks(context: NBomberContext, [<ParamArray>]reportingSinks: IReportingSink[]) =
let sinks = reportingSinks |> Seq.toList
context |> FSharp.NBomberRunner.withReportingSinks sinks

/// Sets worker plugins.
/// Worker plugin is a plugin that starts at the test start and works as a background worker.
[<Extension>]
Expand Down
6 changes: 0 additions & 6 deletions src/NBomber/Api/FSharp.fs
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,6 @@ module NBomberRunner =
let report = { context.Reporting with ReportingInterval = interval }
{ context with Reporting = report }

/// Sets reporting sinks.
/// Reporting sink is used to save real-time metrics to correspond database
let withReportingSinks (reportingSinks: IReportingSink list) (context: NBomberContext) =
let report = { context.Reporting with Sinks = reportingSinks }
{ context with Reporting = report }

/// Sets worker plugins.
/// Worker plugin is a plugin that starts at the test start and works as a background worker.
let withWorkerPlugins (plugins: IWorkerPlugin list) (context: NBomberContext) =
Expand Down
7 changes: 7 additions & 0 deletions src/NBomber/Contracts.fs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type NBomberContext = {

namespace NBomber.Contracts.Internal

open System
open CommandLine
open NBomber.Contracts

Expand All @@ -67,3 +68,9 @@ type StepResponse = {
EndTimeMs: float
LatencyMs: float
}

type ScenarioRawStats = {
ScenarioName: string
Data: StepResponse list
Timestamp: TimeSpan
}
23 changes: 17 additions & 6 deletions src/NBomber/Domain/Concurrency/Scheduler/ScenarioScheduler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ open FSharp.Control.Reactive

open NBomber
open NBomber.Contracts
open NBomber.Contracts.Internal
open NBomber.Contracts.Stats
open NBomber.Domain
open NBomber.Domain.DomainTypes
Expand Down Expand Up @@ -80,7 +81,7 @@ let emptyExec (dep: ActorDep) (actorPool: ScenarioActor list) (scheduledActorCou

type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =

let log = dep.Logger.ForContext<ScenarioScheduler>()
let _log = dep.Logger.ForContext<ScenarioScheduler>()
let mutable _warmUp = false
let mutable _scenario = dep.Scenario
let mutable _currentSimulation = dep.Scenario.LoadTimeLine.Head.LoadSimulation
Expand Down Expand Up @@ -110,8 +111,15 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
getOneTimeActorCount()
)

let getRealtimeStats (executionTime) =
let executedDuration = _scenario |> Scenario.getDuration |> correctExecutedDuration executionTime
let addRawStats (rawStats: ScenarioRawStats) =
dep.ScenarioStatsActor.Publish(AddResponses rawStats.Data)

let getRawStats (timestamp) =
let executedDuration = _scenario |> Scenario.getDuration |> correctExecutedDuration timestamp
dep.ScenarioStatsActor.GetRawStats executedDuration

let getRealtimeStats (timestamp) =
let executedDuration = _scenario |> Scenario.getDuration |> correctExecutedDuration timestamp
let simulationStats = getCurrentSimulationStats()
dep.ScenarioStatsActor.GetRealtimeStats(simulationStats, executedDuration)

Expand Down Expand Up @@ -198,13 +206,16 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =

member _.EventStream = _eventStream :> IObservable<_>
member _.Scenario = dep.Scenario
member _.PublishStatsToCoordinator() = dep.ScenarioStatsActor.Publish(ActorMessage.PublishStatsToCoordinator)
member _.GetRealtimeStats(executionTime) = getRealtimeStats executionTime

member _.AddRawStats(rawStats) = addRawStats rawStats
member _.GetRawStats(timestamp) = getRawStats timestamp
member _.GetRealtimeStats(timestamp) = getRealtimeStats timestamp
member _.GetFinalStats() = getFinalStats()
member _.GetStatusMessages() = ()

interface IDisposable with
member _.Dispose() =
stop()
_eventStream.Dispose()
log.Verbose $"{nameof ScenarioScheduler} disposed"
_log.Verbose $"{nameof ScenarioScheduler} disposed"

66 changes: 40 additions & 26 deletions src/NBomber/Domain/Stats/ScenarioStatsActor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,48 @@ open NBomber.Domain.DomainTypes
open NBomber.Domain.Stats.Statistics

type ActorMessage =
| AddResponse of StepResponse
| AddResponses of StepResponse[]
| PublishStatsToCoordinator
| GetRealtimeStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan
| GetFinalStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan
| AddResponse of StepResponse
| AddResponses of StepResponse list
| GetRawStats of reply:TaskCompletionSource<ScenarioRawStats> * timestamp:TimeSpan
| GetRealtimeStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan
| GetFinalStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan

type IScenarioStatsActor =
abstract Publish: ActorMessage -> unit
abstract GetRawStats: timestamp:TimeSpan -> Task<ScenarioRawStats>
abstract GetRealtimeStats: LoadSimulationStats * duration:TimeSpan -> Task<ScenarioStats>
abstract GetFinalStats: LoadSimulationStats * duration:TimeSpan -> Task<ScenarioStats>

type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval: TimeSpan) =
type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval: TimeSpan, keepRawStats: bool) =

let _allStepsData = Array.init scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty())
let mutable _intervalStepsData = Array.init scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty())
let mutable _intervalRawStats = List.empty

let addResponse (allData: StepStatsRawData[]) (intervalData: StepStatsRawData[]) (resp: StepResponse) =
let allStData = allData.[resp.StepIndex]
let intervalStData = intervalData.[resp.StepIndex]
allData.[resp.StepIndex] <- StepStatsRawData.addResponse allStData resp
intervalData.[resp.StepIndex] <- StepStatsRawData.addResponse intervalStData resp
let addResponse (resp: StepResponse) =
let allStData = _allStepsData.[resp.StepIndex]
let intervalStData = _intervalStepsData.[resp.StepIndex]
_allStepsData.[resp.StepIndex] <- StepStatsRawData.addResponse allStData resp
_intervalStepsData.[resp.StepIndex] <- StepStatsRawData.addResponse intervalStData resp

if keepRawStats then
resp.ClientResponse.Payload <- null // to prevent sending in cluster mode
resp.ClientResponse.Message <- null
_intervalRawStats <- resp :: _intervalRawStats

let createScenarioStats (stepsData, simulationStats, operation, duration, interval) =
ScenarioStats.create scenario stepsData simulationStats operation duration interval

let _actor = ActionBlock(fun msg ->
try
match msg with
| AddResponse response ->
addResponse _allStepsData _intervalStepsData response

| AddResponses responses ->
responses |> Array.iter(addResponse _allStepsData _intervalStepsData)
| AddResponse response -> addResponse response
| AddResponses responses -> responses |> List.iter addResponse

| PublishStatsToCoordinator ->
failwith "invalid operation" // it's only needed for cluster
| GetRawStats (reply, timestamp) ->
let stats = { ScenarioName = scenario.ScenarioName; Data = _intervalRawStats; Timestamp = timestamp }
reply.TrySetResult(stats) |> ignore
_intervalRawStats <- List.empty

| GetRealtimeStats (reply, simulationStats, duration) ->
let scnStats = createScenarioStats(_intervalStepsData, simulationStats, OperationType.Bombing, duration, reportingInterval)
Expand All @@ -60,23 +66,31 @@ type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval:
let scnStats = createScenarioStats(_allStepsData, simulationStats, OperationType.Complete, duration, duration)
reply.TrySetResult(scnStats) |> ignore
with
| ex -> logger.Error(ex, "GlobalScenarioStatsActor failed")
| ex -> logger.Error $"{nameof ScenarioStatsActor} failed: {ex.ToString()}"
)

interface IScenarioStatsActor with

[<MethodImpl(MethodImplOptions.AggressiveInlining)>]
member _.Publish(msg) = _actor.Post(msg) |> ignore

member _.GetRawStats(timestamp) =
let reply = TaskCompletionSource<ScenarioRawStats>()
GetRawStats(reply, timestamp) |> _actor.Post |> ignore
reply.Task

member _.GetRealtimeStats(simulationStats, duration) =
let tcs = TaskCompletionSource<ScenarioStats>()
GetRealtimeStats(tcs, simulationStats, duration) |> _actor.Post |> ignore
tcs.Task
let reply = TaskCompletionSource<ScenarioStats>()
GetRealtimeStats(reply, simulationStats, duration) |> _actor.Post |> ignore
reply.Task

member _.GetFinalStats(simulationStats, duration) =
let tcs = TaskCompletionSource<ScenarioStats>()
GetFinalStats(tcs, simulationStats, duration) |> _actor.Post |> ignore
tcs.Task
let reply = TaskCompletionSource<ScenarioStats>()
GetFinalStats(reply, simulationStats, duration) |> _actor.Post |> ignore
reply.Task

let create (logger: ILogger) (scenario: Scenario) (reportingInterval: TimeSpan) =
ScenarioStatsActor(logger, scenario, reportingInterval) :> IScenarioStatsActor
ScenarioStatsActor(logger, scenario, reportingInterval, keepRawStats = false) :> IScenarioStatsActor

let createWithRawStats (logger: ILogger) (scenario: Scenario) (reportingInterval: TimeSpan) =
ScenarioStatsActor(logger, scenario, reportingInterval, keepRawStats = true) :> IScenarioStatsActor
28 changes: 14 additions & 14 deletions src/NBomber/Domain/Stats/Statistics.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ module internal NBomber.Domain.Stats.Statistics

open System
open System.Collections.Generic
open System.Data

open HdrHistogram

Expand Down Expand Up @@ -206,19 +205,20 @@ module ScenarioStats =
module NodeStats =

let create (testInfo: TestInfo) (nodeInfo: NodeInfo) (scnStats: ScenarioStats[]) =

let maxDuration = scnStats |> Array.maxBy(fun x -> x.Duration) |> fun scn -> scn.Duration

{ RequestCount = scnStats |> Array.sumBy(fun x -> x.RequestCount)
OkCount = scnStats |> Array.sumBy(fun x -> x.OkCount)
FailCount = scnStats |> Array.sumBy(fun x -> x.FailCount)
AllBytes = scnStats |> Array.sumBy(fun x -> x.AllBytes)
ScenarioStats = scnStats
PluginStats = Array.empty
NodeInfo = nodeInfo
TestInfo = testInfo
ReportFiles = Array.empty
Duration = maxDuration }
if Array.isEmpty scnStats then
NodeStats.empty
else
let maxDuration = scnStats |> Array.maxBy(fun x -> x.Duration) |> fun scn -> scn.Duration
{ RequestCount = scnStats |> Array.sumBy(fun x -> x.RequestCount)
OkCount = scnStats |> Array.sumBy(fun x -> x.OkCount)
FailCount = scnStats |> Array.sumBy(fun x -> x.FailCount)
AllBytes = scnStats |> Array.sumBy(fun x -> x.AllBytes)
ScenarioStats = scnStats
PluginStats = Array.empty
NodeInfo = nodeInfo
TestInfo = testInfo
ReportFiles = Array.empty
Duration = maxDuration }

let round (stats: NodeStats) =
{ stats with ScenarioStats = stats.ScenarioStats |> Array.map(ScenarioStats.round)
Expand Down
6 changes: 4 additions & 2 deletions src/NBomber/Domain/Step.fs
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,16 @@ module RunningStep =
let execStep (dep: StepDep) (step: RunningStep) = backgroundTask {

let! response = measureExec step dep.ScenarioGlobalTimer
let payload = response.ClientResponse.Payload
response.ClientResponse.Payload <- null // to prevent holding it for stats actor

if not step.Value.DoNotTrack then
dep.ScenarioStatsActor.Publish(AddResponse response)

if response.ClientResponse.IsError then
dep.Logger.Fatal($"Step '{step.Value.StepName}' from Scenario: '{dep.ScenarioInfo.ScenarioName}' has failed. Error: {response.ClientResponse.Message}")
else
dep.Data[Constants.StepResponseKey] <- response.ClientResponse.Payload
dep.Data[Constants.StepResponseKey] <- payload

return response.ClientResponse

Expand All @@ -194,7 +196,7 @@ module RunningStep =
if response.ClientResponse.IsError then
dep.Logger.Fatal($"Step '{step.Value.StepName}' from Scenario: '{dep.ScenarioInfo.ScenarioName}' has failed. Error: {response.ClientResponse.Message}")
else
dep.Data[Constants.StepResponseKey] <- response.ClientResponse.Payload
dep.Data[Constants.StepResponseKey] <- payload

return response.ClientResponse
}
Expand Down
Loading

0 comments on commit 612e45d

Please sign in to comment.