From b25c4aa934af09634e7b7ab65782e801e8987b04 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Sun, 24 Nov 2024 21:06:25 -0500 Subject: [PATCH 1/2] Removing noisy metric tags and adding stress test for telemetry --- .../analytics/kotlin/core/Analytics.kt | 1 - .../analytics/kotlin/core/HTTPClient.kt | 1 - .../segment/analytics/kotlin/core/Settings.kt | 1 - .../kotlin/core/platform/Mediator.kt | 2 - .../analytics/kotlin/core/TelemetryTest.kt | 46 ++++++++++++++++++- 5 files changed, 44 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt index a1ffb124..d9812078 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt @@ -98,7 +98,6 @@ open class Analytics protected constructor( Telemetry.INVOKE_ERROR_METRIC, t.stackTraceToString()) { it["error"] = t.toString() it["message"] = "Exception in Analytics Scope" - it["caller"] = t.stackTrace[0].toString() } } override val analyticsScope = CoroutineScope(SupervisorJob() + exceptionHandler) diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/HTTPClient.kt b/core/src/main/java/com/segment/analytics/kotlin/core/HTTPClient.kt index 84b99b92..753a5672 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/HTTPClient.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/HTTPClient.kt @@ -39,7 +39,6 @@ class HTTPClient( it["error"] = e.toString() it["writekey"] = writeKey it["message"] = "Malformed url" - it["caller"] = e.stackTrace[0].toString() } throw error } diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt index 2cd2ddfc..197dafa4 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt @@ -124,7 +124,6 @@ internal fun Analytics.fetchSettings( it["error"] = ex.toString() it["writekey"] = writeKey it["message"] = "Error retrieving settings" - it["caller"] = ex.stackTrace[0].toString() } configuration.defaultSettings } \ No newline at end of file diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt index eaf4d6ec..5542e36b 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt @@ -63,7 +63,6 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList = Cop } it["writekey"] = plugin.analytics.configuration.writeKey it["message"] = "Exception executing plugin" - it["caller"] = t.stackTrace[0].toString() } } } @@ -88,7 +87,6 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList = Cop } it["writekey"] = plugin.analytics.configuration.writeKey it["message"] = "Exception executing plugin" - it["caller"] = t.stackTrace[0].toString() } } } diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt index e92ea6d9..df1ff354 100644 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt @@ -7,6 +7,10 @@ import org.junit.jupiter.api.Test import java.lang.reflect.Field import java.net.HttpURLConnection import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import kotlin.random.Random class TelemetryTest { fun TelemetryResetFlushFirstError() { @@ -182,7 +186,7 @@ class TelemetryTest { Telemetry.start() for (i in 1..Telemetry.maxQueueSize + 1) { Telemetry.increment(Telemetry.INVOKE_METRIC) { it["test"] = "test" + i } - Telemetry.error(Telemetry.INVOKE_ERROR_METRIC, "error") { it["test"] = "test" + i } + Telemetry.error(Telemetry.INVOKE_ERROR_METRIC, "error") { it["error"] = "test" + i } } assertEquals(Telemetry.maxQueueSize, TelemetryQueueSize()) } @@ -195,6 +199,44 @@ class TelemetryTest { Telemetry.sendWriteKeyOnError = false Telemetry.sendErrorLogData = false Telemetry.error(Telemetry.INVOKE_ERROR_METRIC, longString) { it["writekey"] = longString } - assertTrue(TelemetryQueueSize() < 1000) + assertTrue(TelemetryQueueBytes() < 1000) + } + + @Test + fun testConcurrentErrorReportingWithQueuePressure() { + val operationCount = 200 + val latch = CountDownLatch(operationCount) + val executor = Executors.newFixedThreadPool(3) + + try { + // Launch operations across multiple threads + repeat(operationCount) { i -> + executor.submit { + try { + Telemetry.error( + metric = Telemetry.INVOKE_ERROR_METRIC, + log = "High pressure test $i" + ) { + it["error"] = "pressure_test_key" + it["iteration"] = "$i" + } + + // Add random delays to increase race condition probability + if (i % 5 == 0) { + Thread.sleep(Random.nextLong(1, 3)) + } + } finally { + latch.countDown() + } + } + } + + // Wait for all operations to complete + latch.await(15, TimeUnit.SECONDS) + + } finally { + executor.shutdown() + } + assertTrue(TelemetryQueueSize() == Telemetry.maxQueueSize) } } From 939a09e4304a4aa7cd955b18f2b05ed0864465af Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Mon, 25 Nov 2024 08:55:34 -0500 Subject: [PATCH 2/2] Refactoring unhelpful queue size checks --- .../com/segment/analytics/kotlin/core/Telemetry.kt | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt index 3c70842b..7b7210c4 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt @@ -93,8 +93,6 @@ object Telemetry: Subscriber { private val queue = ConcurrentLinkedQueue() private var queueBytes = 0 - private var queueSizeExceeded = false - private val seenErrors = mutableMapOf() private var started = false private var rateLimitEndTime: Long = 0 private var flushFirstError = true @@ -150,7 +148,6 @@ object Telemetry: Subscriber { fun reset() { telemetryJob?.cancel() resetQueue() - seenErrors.clear() started = false rateLimitEndTime = 0 } @@ -169,7 +166,6 @@ object Telemetry: Subscriber { if (!metric.startsWith(METRICS_BASE_TAG)) return if (tags.isEmpty()) return if (Math.random() > sampleRate) return - if (queue.size >= maxQueueSize) return addRemoteMetric(metric, tags) } @@ -188,7 +184,6 @@ object Telemetry: Subscriber { if (!enable || sampleRate == 0.0) return if (!metric.startsWith(METRICS_BASE_TAG)) return if (tags.isEmpty()) return - if (queue.size >= maxQueueSize) return if (Math.random() > sampleRate) return var filteredTags = if(sendWriteKeyOnError) { @@ -235,7 +230,6 @@ object Telemetry: Subscriber { var queueCount = queue.size // Reset queue data size counter since all current queue items will be removed queueBytes = 0 - queueSizeExceeded = false val sendQueue = mutableListOf() while (queueCount-- > 0 && !queue.isEmpty()) { val m = queue.poll() @@ -303,6 +297,9 @@ object Telemetry: Subscriber { found.value += value return } + if (queue.size >= maxQueueSize) { + return + } val newMetric = RemoteMetric( type = METRIC_TYPE, @@ -315,8 +312,6 @@ object Telemetry: Subscriber { if (queueBytes + newMetricSize <= maxQueueBytes) { queue.add(newMetric) queueBytes += newMetricSize - } else { - queueSizeExceeded = true } } @@ -345,6 +340,5 @@ object Telemetry: Subscriber { private fun resetQueue() { queue.clear() queueBytes = 0 - queueSizeExceeded = false } } \ No newline at end of file