Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing noisy metric tags and adding stress test for telemetry #249

Merged
merged 2 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ open class Analytics protected constructor(
Telemetry.INVOKE_ERROR_METRIC, t.stackTraceToString()) {
it["error"] = t.toString()
it["message"] = "Exception in Analytics Scope"
it["caller"] = t.stackTrace[0].toString()
}
}
override val analyticsScope = CoroutineScope(SupervisorJob() + exceptionHandler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class HTTPClient(
it["error"] = e.toString()
it["writekey"] = writeKey
it["message"] = "Malformed url"
it["caller"] = e.stackTrace[0].toString()
}
throw error
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ internal fun Analytics.fetchSettings(
it["error"] = ex.toString()
it["writekey"] = writeKey
it["message"] = "Error retrieving settings"
it["caller"] = ex.stackTrace[0].toString()
}
configuration.defaultSettings
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ object Telemetry: Subscriber {

private val queue = ConcurrentLinkedQueue<RemoteMetric>()
private var queueBytes = 0
private var queueSizeExceeded = false
private val seenErrors = mutableMapOf<String, Int>()
private var started = false
private var rateLimitEndTime: Long = 0
private var flushFirstError = true
Expand Down Expand Up @@ -150,7 +148,6 @@ object Telemetry: Subscriber {
fun reset() {
telemetryJob?.cancel()
resetQueue()
seenErrors.clear()
started = false
rateLimitEndTime = 0
}
Expand All @@ -169,7 +166,6 @@ object Telemetry: Subscriber {
if (!metric.startsWith(METRICS_BASE_TAG)) return
if (tags.isEmpty()) return
if (Math.random() > sampleRate) return
if (queue.size >= maxQueueSize) return

addRemoteMetric(metric, tags)
}
Expand All @@ -188,7 +184,6 @@ object Telemetry: Subscriber {
if (!enable || sampleRate == 0.0) return
if (!metric.startsWith(METRICS_BASE_TAG)) return
if (tags.isEmpty()) return
if (queue.size >= maxQueueSize) return
if (Math.random() > sampleRate) return

var filteredTags = if(sendWriteKeyOnError) {
Expand Down Expand Up @@ -235,7 +230,6 @@ object Telemetry: Subscriber {
var queueCount = queue.size
// Reset queue data size counter since all current queue items will be removed
queueBytes = 0
queueSizeExceeded = false
val sendQueue = mutableListOf<RemoteMetric>()
while (queueCount-- > 0 && !queue.isEmpty()) {
val m = queue.poll()
Expand Down Expand Up @@ -303,6 +297,9 @@ object Telemetry: Subscriber {
found.value += value
return
}
if (queue.size >= maxQueueSize) {
return
}

val newMetric = RemoteMetric(
type = METRIC_TYPE,
Expand All @@ -315,8 +312,6 @@ object Telemetry: Subscriber {
if (queueBytes + newMetricSize <= maxQueueBytes) {
queue.add(newMetric)
queueBytes += newMetricSize
} else {
queueSizeExceeded = true
}
}

Expand Down Expand Up @@ -345,6 +340,5 @@ object Telemetry: Subscriber {
private fun resetQueue() {
queue.clear()
queueBytes = 0
queueSizeExceeded = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList<Plugin> = Cop
}
it["writekey"] = plugin.analytics.configuration.writeKey
it["message"] = "Exception executing plugin"
it["caller"] = t.stackTrace[0].toString()
}
}
}
Expand All @@ -88,7 +87,6 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList<Plugin> = Cop
}
it["writekey"] = plugin.analytics.configuration.writeKey
it["message"] = "Exception executing plugin"
it["caller"] = t.stackTrace[0].toString()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import org.junit.jupiter.api.Test
import java.lang.reflect.Field
import java.net.HttpURLConnection
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.random.Random

class TelemetryTest {
fun TelemetryResetFlushFirstError() {
Expand Down Expand Up @@ -182,7 +186,7 @@ class TelemetryTest {
Telemetry.start()
for (i in 1..Telemetry.maxQueueSize + 1) {
Telemetry.increment(Telemetry.INVOKE_METRIC) { it["test"] = "test" + i }
Telemetry.error(Telemetry.INVOKE_ERROR_METRIC, "error") { it["test"] = "test" + i }
Telemetry.error(Telemetry.INVOKE_ERROR_METRIC, "error") { it["error"] = "test" + i }
}
assertEquals(Telemetry.maxQueueSize, TelemetryQueueSize())
}
Expand All @@ -195,6 +199,44 @@ class TelemetryTest {
Telemetry.sendWriteKeyOnError = false
Telemetry.sendErrorLogData = false
Telemetry.error(Telemetry.INVOKE_ERROR_METRIC, longString) { it["writekey"] = longString }
assertTrue(TelemetryQueueSize() < 1000)
assertTrue(TelemetryQueueBytes() < 1000)
}

@Test
fun testConcurrentErrorReportingWithQueuePressure() {
    // Stress test: hammer Telemetry.error() from several threads at once to
    // surface race conditions in the queue-size accounting, then verify the
    // queue is capped at exactly maxQueueSize.
    val operationCount = 200
    val latch = CountDownLatch(operationCount)
    val executor = Executors.newFixedThreadPool(3)

    try {
        // Launch operations across multiple threads
        repeat(operationCount) { i ->
            executor.submit {
                try {
                    Telemetry.error(
                        metric = Telemetry.INVOKE_ERROR_METRIC,
                        log = "High pressure test $i"
                    ) {
                        it["error"] = "pressure_test_key"
                        it["iteration"] = "$i"
                    }

                    // Add random delays to increase race condition probability
                    if (i % 5 == 0) {
                        Thread.sleep(Random.nextLong(1, 3))
                    }
                } finally {
                    latch.countDown()
                }
            }
        }

        // Fail loudly on timeout: previously the boolean result of await()
        // was discarded, so a hung worker let the test fall through and the
        // final assertion raced against still-running submissions.
        assertTrue(latch.await(15, TimeUnit.SECONDS))
    } finally {
        executor.shutdown()
        // Ensure no straggler tasks keep mutating Telemetry state after the
        // test method returns (they would poison subsequent tests).
        executor.awaitTermination(5, TimeUnit.SECONDS)
    }
    // assertEquals (already used elsewhere in this class) reports
    // expected-vs-actual on failure, unlike assertTrue(a == b).
    assertEquals(Telemetry.maxQueueSize, TelemetryQueueSize())
}
}
Loading