From 4e6247181645747323c9f4108d9e7a5bd495412c Mon Sep 17 00:00:00 2001 From: Florian Kaltner Date: Thu, 25 Jun 2020 08:09:44 +0200 Subject: [PATCH] [#1856] Fix negative authenticated AMQP connection count Connection count was erroneously not incremented when a connection was rejected due to the connection limit of an adapter, but was decremented in that case as well which lead to negative connection counts. Fixes #1856 Signed-off-by: Florian Kaltner --- .../amqp/impl/VertxBasedAmqpProtocolAdapter.java | 12 +++++++----- .../amqp/impl/VertxBasedAmqpProtocolAdapterTest.java | 12 +++++++++++- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapter.java b/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapter.java index d1969cdd3e..ed74e95d28 100644 --- a/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapter.java +++ b/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapter.java @@ -332,6 +332,13 @@ protected void onConnectRequest(final ProtonConnection con) { handleRemoteSenderOpenForCommands(con, sender); }); con.openHandler(remoteOpen -> { + final Device authenticatedDevice = getAuthenticatedDevice(con); + if (authenticatedDevice == null) { + metrics.incrementUnauthenticatedConnections(); + } else { + metrics.incrementConnections(authenticatedDevice.getTenantId()); + } + if (remoteOpen.failed()) { log.debug("ignoring device's open frame containing error", remoteOpen.cause()); } else { @@ -417,11 +424,6 @@ private void processRemoteOpen(final ProtonConnection con) { con.open(); log.debug("connection with device [container: {}] established", con.getRemoteContainer()); span.log("connection established"); - if (authenticatedDevice == null) { - metrics.incrementUnauthenticatedConnections(); - } else { - metrics.incrementConnections(authenticatedDevice.getTenantId()); - } return null; }).otherwise(t -> { con.setCondition(getErrorCondition(t)); diff --git a/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapterTest.java b/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapterTest.java index 55f2feaeda..89bfecd186 100644 --- a/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapterTest.java +++ b/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/impl/VertxBasedAmqpProtocolAdapterTest.java @@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -88,6 +89,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; import io.opentracing.Span; import io.opentracing.SpanContext; @@ -1189,10 +1191,18 @@ public void testConnectionFailsIfAdapterLevelConnectionLimitIsExceeded() { .forClass(Handler.class); verify(deviceConnection).openHandler(openHandler.capture()); openHandler.getValue().handle(Future.succeededFuture(deviceConnection)); - // THEN the adapter does not accept the incoming connection request. + // THEN the connection count should be incremented when the connection is opened + final InOrder metricsInOrderVerifier = inOrder(metrics); + metricsInOrderVerifier.verify(metrics).incrementConnections(TEST_TENANT_ID); + // AND the adapter should close the connection right after it opened it + final ArgumentCaptor>> closeHandler = ArgumentCaptor.forClass(Handler.class); + verify(deviceConnection).closeHandler(closeHandler.capture()); + closeHandler.getValue().handle(Future.succeededFuture()); final ArgumentCaptor errorConditionCaptor = ArgumentCaptor.forClass(ErrorCondition.class); verify(deviceConnection).setCondition(errorConditionCaptor.capture()); assertEquals(AmqpError.UNAUTHORIZED_ACCESS, errorConditionCaptor.getValue().getCondition()); + // AND the connection count should be decremented accordingly when the connection is closed + metricsInOrderVerifier.verify(metrics).decrementConnections(TEST_TENANT_ID); verify(metrics).reportConnectionAttempt(ConnectionAttemptOutcome.ADAPTER_CONNECTION_LIMIT_EXCEEDED); }