From a77879897802eff7b944add9a223aa902bcb249b Mon Sep 17 00:00:00 2001 From: Kai Hudalla Date: Tue, 21 Feb 2017 15:26:44 +0100 Subject: [PATCH] [#96] Close upstream receivers on downstream connection loss. When the connection to the downstream container is lost, all links with upstream clients for receiving telemetry or events are closed. This way the clients will trigger the opening of new downstream senders when they re-establish their links with Hono. --- .../server/ForwardingDownstreamAdapter.java | 74 ++++---- .../eclipse/hono/server/UpstreamReceiver.java | 4 +- .../hono/server/UpstreamReceiverImpl.java | 33 +++- .../java/org/eclipse/hono/TestSupport.java | 33 +++- .../ForwardingEventDownstreamAdapterTest.java | 3 +- .../ForwardingDownstreamAdapterTest.java | 162 +++++++++++++----- ...wardingTelemetryDownstreamAdapterTest.java | 11 +- 7 files changed, 239 insertions(+), 81 deletions(-) diff --git a/server/src/main/java/org/eclipse/hono/server/ForwardingDownstreamAdapter.java b/server/src/main/java/org/eclipse/hono/server/ForwardingDownstreamAdapter.java index f75dae82cc..2f0036f3e0 100644 --- a/server/src/main/java/org/eclipse/hono/server/ForwardingDownstreamAdapter.java +++ b/server/src/main/java/org/eclipse/hono/server/ForwardingDownstreamAdapter.java @@ -44,11 +44,17 @@ @Component public abstract class ForwardingDownstreamAdapter implements DownstreamAdapter { - protected Logger logger = LoggerFactory.getLogger(getClass()); - protected HonoConfigProperties honoConfig = new HonoConfigProperties(); - private final Map activeSenders = new HashMap<>(); - private final Map> sendersPerConnection = new HashMap<>(); - private boolean running = false; + /** + * A logger to be shared with subclasses. + */ + protected Logger logger = LoggerFactory.getLogger(getClass()); + /** + * The Hono configuration. + */ + protected HonoConfigProperties honoConfig = new HonoConfigProperties(); + private final Map activeSenders = new HashMap<>(); + private final Map> sendersPerConnection = new HashMap<>(); + private boolean running = false; private final Vertx vertx; private ProtonConnection downstreamConnection; private SenderFactory senderFactory; @@ -149,6 +155,11 @@ public final void stop(final Future stopFuture) { stopFuture.complete(); } + /** + * Gets the name of the downstream AMQP 1.0 container this adapter is forwarding messages to. + * + * @return The name or {@code null} if this adapter is currently not connected. + */ protected final String getDownstreamContainer() { if (downstreamConnection != null) { return downstreamConnection.getRemoteContainer(); @@ -157,7 +168,7 @@ protected final String getDownstreamContainer() { } } - protected ProtonClientOptions createClientOptions() { + private ProtonClientOptions createClientOptions() { return new ProtonClientOptions() .setConnectTimeout(100) .setReconnectAttempts(-1).setReconnectInterval(200); // reconnect forever, every 200 millisecs @@ -196,7 +207,11 @@ private void onRemoteClose(final AsyncResult remoteClose) { */ private void onDisconnectFromDownstreamContainer(final ProtonConnection con) { // all links to downstream host will now be stale and unusable - logger.warn("lost connection to downstream container [{}]", con.getRemoteContainer()); + logger.warn("lost connection to downstream container [{}], closing upstream receivers ...", con.getRemoteContainer()); + + for (UpstreamReceiver client : activeSenders.keySet()) { + client.close(ErrorConditions.ERROR_NO_DOWNSTREAM_CONSUMER); + } activeSenders.clear(); con.disconnectHandler(null); con.disconnect(); @@ -221,6 +236,7 @@ public final void onClientAttach(final UpstreamReceiver client, final Handler { if (creationAttempt.succeeded()) { logger.info("created downstream sender [con: {}, link: {}]", client.getConnectionId(), client.getLinkId()); - addSender(client.getConnectionId(), client.getLinkId(), creationAttempt.result()); + addSender(client, creationAttempt.result()); resultHandler.handle(Future.succeededFuture()); } else { - resultHandler.handle(Future.failedFuture(creationAttempt.cause())); logger.warn("can't create downstream sender [con: {}, link: {}]", client.getConnectionId(), client.getLinkId(), creationAttempt.cause()); + resultHandler.handle(Future.failedFuture(creationAttempt.cause())); } }); } @@ -278,16 +294,16 @@ private static String getTenantOnlyTargetAddress(final String address) { return String.format("%s/%s", targetAddress.getEndpoint(), targetAddress.getTenantId()); } - public final void addSender(final String connectionId, final String linkId, final ProtonSender sender) { - sender.attachments().set(Constants.KEY_CONNECTION_ID, String.class, connectionId); + public final void addSender(final UpstreamReceiver link, final ProtonSender sender) { + sender.attachments().set(Constants.KEY_CONNECTION_ID, String.class, link.getConnectionId()); sender.setAutoDrained(false); // we need to propagate drain requests upstream and wait for the result - activeSenders.put(linkId, sender); - List senders = sendersPerConnection.get(connectionId); + activeSenders.put(link, sender); + List senders = sendersPerConnection.get(link.getConnectionId()); if (senders == null) { senders = new ArrayList<>(); - sendersPerConnection.put(connectionId, senders); + sendersPerConnection.put(link.getConnectionId(), senders); } - senders.add(linkId); + senders.add(link); } private static int getAvailableCredit(final ProtonSender sender) { @@ -304,24 +320,18 @@ public final void onClientDetach(final UpstreamReceiver client) { Objects.requireNonNull(client); - String connectionId = closeSender(client.getLinkId()); - if (connectionId != null) { - List senders = sendersPerConnection.get(connectionId); - if (senders != null) { - senders.remove(client.getLinkId()); - } + closeSender(client); + List senders = sendersPerConnection.get(client.getConnectionId()); + if (senders != null) { + senders.remove(client); } } - private String closeSender(final String linkId) { - ProtonSender sender = activeSenders.remove(linkId); + private void closeSender(final UpstreamReceiver link) { + ProtonSender sender = activeSenders.remove(link); if (sender != null && sender.isOpen()) { - String connectionId = Constants.getConnectionId(sender); - logger.info("closing downstream sender [con: {}, link: {}]", connectionId, linkId); + logger.info("closing downstream sender [con: {}, link: {}]", link.getConnectionId(), link.getLinkId()); sender.close(); - return connectionId; - } else { - return null; } } @@ -332,11 +342,11 @@ public final void onClientDisconnect(final String connectionId) { throw new IllegalStateException("adapter must be started first"); } - List senders = sendersPerConnection.remove(Objects.requireNonNull(connectionId)); + List senders = sendersPerConnection.remove(Objects.requireNonNull(connectionId)); if (senders != null && !senders.isEmpty()) { logger.info("closing {} downstream senders for connection [id: {}]", senders.size(), connectionId); - for (String linkId : senders) { - closeSender(linkId); + for (UpstreamReceiver link : senders) { + closeSender(link); } } } @@ -351,7 +361,7 @@ public final void processMessage(final UpstreamReceiver client, final ProtonDeli Objects.requireNonNull(client); Objects.requireNonNull(msg); Objects.requireNonNull(delivery); - ProtonSender sender = activeSenders.get(client.getLinkId()); + ProtonSender sender = activeSenders.get(client); if (sender == null) { logger.info("no downstream sender for link [{}] available, discarding message and closing link with client", client.getLinkId()); client.close(ErrorConditions.ERROR_NO_DOWNSTREAM_CONSUMER); diff --git a/server/src/main/java/org/eclipse/hono/server/UpstreamReceiver.java b/server/src/main/java/org/eclipse/hono/server/UpstreamReceiver.java index 9e4b76d3f3..280a547238 100644 --- a/server/src/main/java/org/eclipse/hono/server/UpstreamReceiver.java +++ b/server/src/main/java/org/eclipse/hono/server/UpstreamReceiver.java @@ -21,6 +21,9 @@ /** * A decorator for a {@code ProtonReceiver} representing a client uploading data to a Hono endpoint. + *

+ * Subclasses are strongly encouraged to implement {@link Object#hashCode()} and {@link Object#equals(Object)} + * based on the linkId because instances of this interface are used as keys in maps in other classes of Hono. * */ public interface UpstreamReceiver { @@ -114,5 +117,4 @@ static UpstreamReceiver atLeastOnceReceiver(final String linkId, final ProtonRec * @return The address. */ String getTargetAddress(); - } \ No newline at end of file diff --git a/server/src/main/java/org/eclipse/hono/server/UpstreamReceiverImpl.java b/server/src/main/java/org/eclipse/hono/server/UpstreamReceiverImpl.java index d11c68ff5a..f7f366ee21 100644 --- a/server/src/main/java/org/eclipse/hono/server/UpstreamReceiverImpl.java +++ b/server/src/main/java/org/eclipse/hono/server/UpstreamReceiverImpl.java @@ -27,7 +27,8 @@ /** * A decorator for a {@code ProtonReceiver} that represents a Hono client sending data downstream. *

- * The main purpose of this class to attach a (surrogate) identifier to the receiver. + * The main purpose of this class is to attach a (surrogate) {@linkplain #getLinkId() identifier} + * to the receiver. */ public class UpstreamReceiverImpl implements UpstreamReceiver { @@ -75,4 +76,34 @@ public String getConnectionId() { public String getTargetAddress() { return link.getRemoteTarget().getAddress(); } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((id == null) ? 0 : id.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + UpstreamReceiverImpl other = (UpstreamReceiverImpl) obj; + if (id == null) { + if (other.id != null) { + return false; + } + } else if (!id.equals(other.id)) { + return false; + } + return true; + } } \ No newline at end of file diff --git a/server/src/test/java/org/eclipse/hono/TestSupport.java b/server/src/test/java/org/eclipse/hono/TestSupport.java index 7938b9a139..9afe656d27 100644 --- a/server/src/test/java/org/eclipse/hono/TestSupport.java +++ b/server/src/test/java/org/eclipse/hono/TestSupport.java @@ -51,6 +51,7 @@ public final class TestSupport { public static final String CLIENT_ID = "protocol_adapter"; + public static final String CON_ID = "connection-1"; public static final int DEFAULT_CREDITS = 20; public static ProtonConnection openConnection(final TestContext ctx, final Vertx vertx, final String host, final int port) { @@ -145,9 +146,39 @@ public static SenderFactory newMockSenderFactory(final ProtonSender senderToCrea }; } + /** + * Creates a new mock upstream client for the {@linkplain #CLIENT_ID default link ID} + * and {@linkplain #CON_ID default connection ID}. + * + * @return The new client. + */ public static UpstreamReceiver newClient() { + return newClient(CLIENT_ID); + } + + /** + * Creates a new mock upstream client for a link ID and the {@linkplain #CON_ID default connection ID}. + * + * @param linkId The client's link ID. + * @return The new client. + */ + public static UpstreamReceiver newClient(final String linkId) { + + return newClient(linkId, CON_ID); + } + + /** + * Creates a new mock upstream client for a link and connection ID. + * + * @param linkId The client's link ID. + * @param connectionId The client's conenction ID. + * @return The new client. + */ + public static UpstreamReceiver newClient(final String linkId, final String connectionId) { + UpstreamReceiver client = mock(UpstreamReceiver.class); - when(client.getLinkId()).thenReturn(CLIENT_ID); + when(client.getLinkId()).thenReturn(linkId); + when(client.getConnectionId()).thenReturn(connectionId); return client; } diff --git a/server/src/test/java/org/eclipse/hono/event/impl/ForwardingEventDownstreamAdapterTest.java b/server/src/test/java/org/eclipse/hono/event/impl/ForwardingEventDownstreamAdapterTest.java index 3453fd85c5..f0069d1a76 100644 --- a/server/src/test/java/org/eclipse/hono/event/impl/ForwardingEventDownstreamAdapterTest.java +++ b/server/src/test/java/org/eclipse/hono/event/impl/ForwardingEventDownstreamAdapterTest.java @@ -35,6 +35,7 @@ import io.vertx.core.Vertx; import io.vertx.proton.ProtonDelivery; import io.vertx.proton.ProtonHelper; +import io.vertx.proton.ProtonReceiver; import io.vertx.proton.ProtonSender; /** @@ -67,7 +68,7 @@ public void testProcessMessageForwardsEventMessageToDownstreamSender() throws In ForwardingEventDownstreamAdapter adapter = new ForwardingEventDownstreamAdapter(vertx, newMockSenderFactory(sender)); adapter.setDownstreamConnectionFactory(newMockConnectionFactory(false)); adapter.start(Future.future()); - adapter.addSender("CON_ID", CLIENT_ID, sender); + adapter.addSender(client, sender); // WHEN processing an event Message msg = ProtonHelper.message(EVENT_MSG_CONTENT); diff --git a/server/src/test/java/org/eclipse/hono/server/ForwardingDownstreamAdapterTest.java b/server/src/test/java/org/eclipse/hono/server/ForwardingDownstreamAdapterTest.java index a46fa332fe..b2d13972e6 100644 --- a/server/src/test/java/org/eclipse/hono/server/ForwardingDownstreamAdapterTest.java +++ b/server/src/test/java/org/eclipse/hono/server/ForwardingDownstreamAdapterTest.java @@ -14,14 +14,13 @@ import static org.eclipse.hono.TestSupport.*; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.message.Message; import org.eclipse.hono.connection.ConnectionFactory; import org.eclipse.hono.telemetry.TelemetryConstants; @@ -49,6 +48,10 @@ public class ForwardingDownstreamAdapterTest { private ConnectionFactory connectionFactory; private Vertx vertx; + /** + * Initializes mocks etc. + */ + @SuppressWarnings("unchecked") @Before public void setup() { vertx = mock(Vertx.class); @@ -61,8 +64,12 @@ public void setup() { connectionFactory = newMockConnectionFactory(false); } + /** + * Verifies that an upstream client is replenished with credits from the downstream container + * when a link is successfully established. + */ @Test - public void testClientAttachedReplenishesClientOnSuccess() throws InterruptedException { + public void testClientAttachedReplenishesClientOnSuccess() { final ResourceIdentifier targetAddress = ResourceIdentifier.from(TelemetryConstants.NODE_ADDRESS_TELEMETRY_PREFIX, "myTenant", null); final UpstreamReceiver client = newClient(); @@ -81,6 +88,11 @@ public void testClientAttachedReplenishesClientOnSuccess() throws InterruptedExc verify(client).replenish(DEFAULT_CREDITS); } + /** + * Verifies that drain requests received from the downstream container are forwarded + * to upstream clients. + * @throws InterruptedException + */ @SuppressWarnings("unchecked") @Test public void testHandleFlowForwardsDrainRequestUpstream() throws InterruptedException { @@ -88,23 +100,25 @@ public void testHandleFlowForwardsDrainRequestUpstream() throws InterruptedExcep final ResourceIdentifier targetAddress = ResourceIdentifier.from(TelemetryConstants.NODE_ADDRESS_TELEMETRY_PREFIX, "myTenant", null); final UpstreamReceiver client = newClient(); when(client.getTargetAddress()).thenReturn(targetAddress.toString()); + final ProtonSender drainingSender = newMockSender(true); // GIVEN an adapter with a connection to the downstream container and a client attached givenADownstreamAdapter(); adapter.setDownstreamConnectionFactory(connectionFactory); adapter.start(Future.future()); - adapter.onClientAttach(client, s -> { - assertTrue(s.succeeded()); - }); + adapter.addSender(client, drainingSender); // WHEN the downstream sender drains the adapter - ProtonSender drainingSender = newMockSender(true); adapter.handleFlow(drainingSender, client); // THEN assert that the upstream client has been drained verify(client).drain(anyInt(), any(Handler.class)); } + /** + * Verifies that the adapter refuses to accept a link from an upstream client + * when there is no connection to the downstream container. + */ @Test public void testGetDownstreamSenderClosesLinkIfDownstreamConnectionIsBroken() { @@ -126,17 +140,22 @@ public void testGetDownstreamSenderClosesLinkIfDownstreamConnectionIsBroken() { }); } + /** + * Verifies that corresponding sender links to the downstream container are closed when + * a connection to an upstream client is lost/closed. + */ @Test public void testOnClientDisconnectClosesDownstreamSenders() { final String upstreamConnection = "upstream-connection-id"; final String linkId = "link-id"; + final UpstreamReceiver client = newClient(linkId, upstreamConnection); final ProtonSender downstreamSender = newMockSender(false); givenADownstreamAdapter(downstreamSender); adapter.setDownstreamConnectionFactory(connectionFactory); adapter.start(Future.future()); - adapter.addSender(upstreamConnection, linkId, downstreamSender); + adapter.addSender(client, downstreamSender); // WHEN the upstream client disconnects adapter.onClientDisconnect(upstreamConnection); @@ -145,61 +164,59 @@ public void testOnClientDisconnectClosesDownstreamSenders() { verify(downstreamSender).close(); } + /** + * Verifies that the adapter tries to re-establish a lost connection to a downstream container. + */ @Test - public void testDownstreamDisconnectTriggersReconnect() throws InterruptedException { + public void testDownstreamDisconnectTriggersReconnect() { final ProtonConnection connectionToCreate = mock(ProtonConnection.class); when(connectionToCreate.getRemoteContainer()).thenReturn("downstream"); // expect the connection factory to be invoked twice // first on initial connection // second on re-connect attempt - CountDownLatch latch = new CountDownLatch(2); - final AtomicReference disconnectHandlerRef = new AtomicReference<>(); - ConnectionFactory factory = new ConnectionFactory() { + DisconnectHandlerProvidingConnectionFactory factory = new DisconnectHandlerProvidingConnectionFactory(connectionToCreate, 2); - @Override - public void connect( - final ProtonClientOptions options, - final Handler> closeHandler, - final Handler disconnectHandler, - final Handler> connectionResultHandler) { - - latch.countDown(); - disconnectHandlerRef.set(disconnectHandler); - connectionResultHandler.handle(Future.succeededFuture(connectionToCreate)); - } + // GIVEN an adapter connected to a downstream container + givenADownstreamAdapter(); + adapter.setDownstreamConnectionFactory(factory); + adapter.start(Future.future()); - @Override - public String getName() { - return "client"; - } + // WHEN the downstream connection fails + factory.getDisconnectHandler().handle(connectionToCreate); - @Override - public String getHost() { - return "server"; - } + // THEN the adapter tries to reconnect to the downstream container + factory.await(1, TimeUnit.SECONDS); + } - @Override - public int getPort() { - return 5672; - } + /** + * Verifies that all links to upstream clients are closed when the connection to the + * downstream container is lost. + */ + @Test + public void testDownstreamDisconnectClosesUpstreamReceivers() { - @Override - public String getPathSeparator() { - return Constants.DEFAULT_PATH_SEPARATOR; - } - }; + final ProtonConnection connectionToCreate = mock(ProtonConnection.class); + when(connectionToCreate.getRemoteContainer()).thenReturn("downstream"); + final UpstreamReceiver client = newClient(); + final ProtonSender downstreamSender = newMockSender(false); + // expect the connection factory to be invoked twice + // first on initial connection + // second on re-connect attempt + DisconnectHandlerProvidingConnectionFactory factory = new DisconnectHandlerProvidingConnectionFactory(connectionToCreate, 2); // GIVEN an adapter connected to a downstream container - givenADownstreamAdapter(); + givenADownstreamAdapter(downstreamSender); adapter.setDownstreamConnectionFactory(factory); adapter.start(Future.future()); + adapter.addSender(client, downstreamSender); // WHEN the downstream connection fails - disconnectHandlerRef.get().handle(connectionToCreate); + factory.getDisconnectHandler().handle(connectionToCreate); - // THEN the adapter tries to reconnect to the downstream container - assertTrue(latch.await(1, TimeUnit.SECONDS)); + // THEN the adapter tries to reconnect to the downstream container and has closed all upstream receivers + factory.await(1, TimeUnit.SECONDS); + verify(client).close(any(ErrorCondition.class)); } private void givenADownstreamAdapter() { @@ -222,4 +239,61 @@ protected void forwardMessage(ProtonSender sender, Message msg, ProtonDelivery d } }; } + + private class DisconnectHandlerProvidingConnectionFactory implements ConnectionFactory { + + private Handler disconnectHandler; + private CountDownLatch expectedConnectionAttemps; + private ProtonConnection connectionToCreate; + + public DisconnectHandlerProvidingConnectionFactory(final ProtonConnection conToCreate, final int expectedConnectionAttempts) { + this.connectionToCreate = conToCreate; + this.expectedConnectionAttemps = new CountDownLatch(expectedConnectionAttempts); + } + + @Override + public void connect( + final ProtonClientOptions options, + final Handler> closeHandler, + final Handler disconnectHandler, + final Handler> connectionResultHandler) { + + expectedConnectionAttemps.countDown(); + this.disconnectHandler = disconnectHandler; + connectionResultHandler.handle(Future.succeededFuture(connectionToCreate)); + } + + @Override + public String getName() { + return "client"; + } + + @Override + public String getHost() { + return "server"; + } + + @Override + public int getPort() { + return 5672; + } + + @Override + public String getPathSeparator() { + return Constants.DEFAULT_PATH_SEPARATOR; + } + + public Handler getDisconnectHandler() { + return disconnectHandler; + } + + public boolean await(long timeout, TimeUnit unit) { + try { + return expectedConnectionAttemps.await(timeout, unit); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + } } diff --git a/server/src/test/java/org/eclipse/hono/telemetry/impl/ForwardingTelemetryDownstreamAdapterTest.java b/server/src/test/java/org/eclipse/hono/telemetry/impl/ForwardingTelemetryDownstreamAdapterTest.java index 15b8ccb3fa..6bb59f0a40 100644 --- a/server/src/test/java/org/eclipse/hono/telemetry/impl/ForwardingTelemetryDownstreamAdapterTest.java +++ b/server/src/test/java/org/eclipse/hono/telemetry/impl/ForwardingTelemetryDownstreamAdapterTest.java @@ -43,11 +43,20 @@ public class ForwardingTelemetryDownstreamAdapterTest { private static final String DEVICE_ID = "myDevice"; private ConnectionFactory connectionFactory; + /** + * Initializes mocks etc. + */ @Before public void setup() { connectionFactory = newMockConnectionFactory(false); } + /** + * Verifies that telemetry data uploaded by an upstream client is forwarded to the + * downstream container. + * + * @throws InterruptedException if test execution gets interrupted. + */ @Test public void testProcessTelemetryDataForwardsMessageToDownstreamSender() throws InterruptedException { @@ -65,7 +74,7 @@ public void testProcessTelemetryDataForwardsMessageToDownstreamSender() throws I ForwardingTelemetryDownstreamAdapter adapter = new ForwardingTelemetryDownstreamAdapter(vertx, newMockSenderFactory(sender)); adapter.setDownstreamConnectionFactory(connectionFactory); adapter.start(Future.future()); - adapter.addSender("CON_ID", CLIENT_ID, sender); + adapter.addSender(client, sender); // WHEN processing a telemetry message Message msg = ProtonHelper.message(TELEMETRY_MSG_CONTENT);