diff --git a/server/src/main/java/org/eclipse/hono/server/ForwardingDownstreamAdapter.java b/server/src/main/java/org/eclipse/hono/server/ForwardingDownstreamAdapter.java index f75dae82cc..2f0036f3e0 100644 --- a/server/src/main/java/org/eclipse/hono/server/ForwardingDownstreamAdapter.java +++ b/server/src/main/java/org/eclipse/hono/server/ForwardingDownstreamAdapter.java @@ -44,11 +44,17 @@ @Component public abstract class ForwardingDownstreamAdapter implements DownstreamAdapter { - protected Logger logger = LoggerFactory.getLogger(getClass()); - protected HonoConfigProperties honoConfig = new HonoConfigProperties(); - private final Map activeSenders = new HashMap<>(); - private final Map> sendersPerConnection = new HashMap<>(); - private boolean running = false; + /** + * A logger to be shared with subclasses. + */ + protected Logger logger = LoggerFactory.getLogger(getClass()); + /** + * The Hono configuration. + */ + protected HonoConfigProperties honoConfig = new HonoConfigProperties(); + private final Map activeSenders = new HashMap<>(); + private final Map> sendersPerConnection = new HashMap<>(); + private boolean running = false; private final Vertx vertx; private ProtonConnection downstreamConnection; private SenderFactory senderFactory; @@ -149,6 +155,11 @@ public final void stop(final Future stopFuture) { stopFuture.complete(); } + /** + * Gets the name of the downstream AMQP 1.0 container this adapter is forwarding messages to. + * + * @return The name or {@code null} if this adapter is currently not connected. + */ protected final String getDownstreamContainer() { if (downstreamConnection != null) { return downstreamConnection.getRemoteContainer(); @@ -157,7 +168,7 @@ protected final String getDownstreamContainer() { } } - protected ProtonClientOptions createClientOptions() { + private ProtonClientOptions createClientOptions() { return new ProtonClientOptions() .setConnectTimeout(100) .setReconnectAttempts(-1).setReconnectInterval(200); // reconnect forever, every 200 millisecs @@ -196,7 +207,11 @@ private void onRemoteClose(final AsyncResult remoteClose) { */ private void onDisconnectFromDownstreamContainer(final ProtonConnection con) { // all links to downstream host will now be stale and unusable - logger.warn("lost connection to downstream container [{}]", con.getRemoteContainer()); + logger.warn("lost connection to downstream container [{}], closing upstream receivers ...", con.getRemoteContainer()); + + for (UpstreamReceiver client : activeSenders.keySet()) { + client.close(ErrorConditions.ERROR_NO_DOWNSTREAM_CONSUMER); + } activeSenders.clear(); con.disconnectHandler(null); con.disconnect(); @@ -221,6 +236,7 @@ public final void onClientAttach(final UpstreamReceiver client, final Handler { if (creationAttempt.succeeded()) { logger.info("created downstream sender [con: {}, link: {}]", client.getConnectionId(), client.getLinkId()); - addSender(client.getConnectionId(), client.getLinkId(), creationAttempt.result()); + addSender(client, creationAttempt.result()); resultHandler.handle(Future.succeededFuture()); } else { - resultHandler.handle(Future.failedFuture(creationAttempt.cause())); logger.warn("can't create downstream sender [con: {}, link: {}]", client.getConnectionId(), client.getLinkId(), creationAttempt.cause()); + resultHandler.handle(Future.failedFuture(creationAttempt.cause())); } }); } @@ -278,16 +294,16 @@ private static String getTenantOnlyTargetAddress(final String address) { return String.format("%s/%s", targetAddress.getEndpoint(), targetAddress.getTenantId()); } - public final void addSender(final String connectionId, final String linkId, final ProtonSender sender) { - sender.attachments().set(Constants.KEY_CONNECTION_ID, String.class, connectionId); + public final void addSender(final UpstreamReceiver link, final ProtonSender sender) { + sender.attachments().set(Constants.KEY_CONNECTION_ID, String.class, link.getConnectionId()); sender.setAutoDrained(false); // we need to propagate drain requests upstream and wait for the result - activeSenders.put(linkId, sender); - List senders = sendersPerConnection.get(connectionId); + activeSenders.put(link, sender); + List senders = sendersPerConnection.get(link.getConnectionId()); if (senders == null) { senders = new ArrayList<>(); - sendersPerConnection.put(connectionId, senders); + sendersPerConnection.put(link.getConnectionId(), senders); } - senders.add(linkId); + senders.add(link); } private static int getAvailableCredit(final ProtonSender sender) { @@ -304,24 +320,18 @@ public final void onClientDetach(final UpstreamReceiver client) { Objects.requireNonNull(client); - String connectionId = closeSender(client.getLinkId()); - if (connectionId != null) { - List senders = sendersPerConnection.get(connectionId); - if (senders != null) { - senders.remove(client.getLinkId()); - } + closeSender(client); + List senders = sendersPerConnection.get(client.getConnectionId()); + if (senders != null) { + senders.remove(client); } } - private String closeSender(final String linkId) { - ProtonSender sender = activeSenders.remove(linkId); + private void closeSender(final UpstreamReceiver link) { + ProtonSender sender = activeSenders.remove(link); if (sender != null && sender.isOpen()) { - String connectionId = Constants.getConnectionId(sender); - logger.info("closing downstream sender [con: {}, link: {}]", connectionId, linkId); + logger.info("closing downstream sender [con: {}, link: {}]", link.getConnectionId(), link.getLinkId()); sender.close(); - return connectionId; - } else { - return null; } } @@ -332,11 +342,11 @@ public final void onClientDisconnect(final String connectionId) { throw new IllegalStateException("adapter must be started first"); } - List senders = sendersPerConnection.remove(Objects.requireNonNull(connectionId)); + List senders = sendersPerConnection.remove(Objects.requireNonNull(connectionId)); if (senders != null && !senders.isEmpty()) { logger.info("closing {} downstream senders for connection [id: {}]", senders.size(), connectionId); - for (String linkId : senders) { - closeSender(linkId); + for (UpstreamReceiver link : senders) { + closeSender(link); } } } @@ -351,7 +361,7 @@ public final void processMessage(final UpstreamReceiver client, final ProtonDeli Objects.requireNonNull(client); Objects.requireNonNull(msg); Objects.requireNonNull(delivery); - ProtonSender sender = activeSenders.get(client.getLinkId()); + ProtonSender sender = activeSenders.get(client); if (sender == null) { logger.info("no downstream sender for link [{}] available, discarding message and closing link with client", client.getLinkId()); client.close(ErrorConditions.ERROR_NO_DOWNSTREAM_CONSUMER); diff --git a/server/src/main/java/org/eclipse/hono/server/UpstreamReceiver.java b/server/src/main/java/org/eclipse/hono/server/UpstreamReceiver.java index 9e4b76d3f3..280a547238 100644 --- a/server/src/main/java/org/eclipse/hono/server/UpstreamReceiver.java +++ b/server/src/main/java/org/eclipse/hono/server/UpstreamReceiver.java @@ -21,6 +21,9 @@ /** * A decorator for a {@code ProtonReceiver} representing a client uploading data to a Hono endpoint. + *

+ * Subclasses are strongly encouraged to implement {@link Object#hashCode()} and {@link Object#equals(Object)} + * based on the linkId because instances of this interface are used as keys in maps in other classes of Hono. * */ public interface UpstreamReceiver { @@ -114,5 +117,4 @@ static UpstreamReceiver atLeastOnceReceiver(final String linkId, final ProtonRec * @return The address. */ String getTargetAddress(); - } \ No newline at end of file diff --git a/server/src/main/java/org/eclipse/hono/server/UpstreamReceiverImpl.java b/server/src/main/java/org/eclipse/hono/server/UpstreamReceiverImpl.java index d11c68ff5a..f7f366ee21 100644 --- a/server/src/main/java/org/eclipse/hono/server/UpstreamReceiverImpl.java +++ b/server/src/main/java/org/eclipse/hono/server/UpstreamReceiverImpl.java @@ -27,7 +27,8 @@ /** * A decorator for a {@code ProtonReceiver} that represents a Hono client sending data downstream. *

- * The main purpose of this class to attach a (surrogate) identifier to the receiver. + * The main purpose of this class is to attach a (surrogate) {@linkplain #getLinkId() identifier} + * to the receiver. */ public class UpstreamReceiverImpl implements UpstreamReceiver { @@ -75,4 +76,34 @@ public String getConnectionId() { public String getTargetAddress() { return link.getRemoteTarget().getAddress(); } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((id == null) ? 0 : id.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + UpstreamReceiverImpl other = (UpstreamReceiverImpl) obj; + if (id == null) { + if (other.id != null) { + return false; + } + } else if (!id.equals(other.id)) { + return false; + } + return true; + } } \ No newline at end of file diff --git a/server/src/test/java/org/eclipse/hono/TestSupport.java b/server/src/test/java/org/eclipse/hono/TestSupport.java index 7938b9a139..9afe656d27 100644 --- a/server/src/test/java/org/eclipse/hono/TestSupport.java +++ b/server/src/test/java/org/eclipse/hono/TestSupport.java @@ -51,6 +51,7 @@ public final class TestSupport { public static final String CLIENT_ID = "protocol_adapter"; + public static final String CON_ID = "connection-1"; public static final int DEFAULT_CREDITS = 20; public static ProtonConnection openConnection(final TestContext ctx, final Vertx vertx, final String host, final int port) { @@ -145,9 +146,39 @@ public static SenderFactory newMockSenderFactory(final ProtonSender senderToCrea }; } + /** + * Creates a new mock upstream client for the {@linkplain #CLIENT_ID default link ID} + * and {@linkplain #CON_ID default connection ID}. + * + * @return The new client. + */ public static UpstreamReceiver newClient() { + return newClient(CLIENT_ID); + } + + /** + * Creates a new mock upstream client for a link ID and the {@linkplain #CON_ID default connection ID}. + * + * @param linkId The client's link ID. + * @return The new client. + */ + public static UpstreamReceiver newClient(final String linkId) { + + return newClient(linkId, CON_ID); + } + + /** + * Creates a new mock upstream client for a link and connection ID. + * + * @param linkId The client's link ID. + * @param connectionId The client's conenction ID. + * @return The new client. + */ + public static UpstreamReceiver newClient(final String linkId, final String connectionId) { + UpstreamReceiver client = mock(UpstreamReceiver.class); - when(client.getLinkId()).thenReturn(CLIENT_ID); + when(client.getLinkId()).thenReturn(linkId); + when(client.getConnectionId()).thenReturn(connectionId); return client; } diff --git a/server/src/test/java/org/eclipse/hono/event/impl/ForwardingEventDownstreamAdapterTest.java b/server/src/test/java/org/eclipse/hono/event/impl/ForwardingEventDownstreamAdapterTest.java index 3453fd85c5..f0069d1a76 100644 --- a/server/src/test/java/org/eclipse/hono/event/impl/ForwardingEventDownstreamAdapterTest.java +++ b/server/src/test/java/org/eclipse/hono/event/impl/ForwardingEventDownstreamAdapterTest.java @@ -35,6 +35,7 @@ import io.vertx.core.Vertx; import io.vertx.proton.ProtonDelivery; import io.vertx.proton.ProtonHelper; +import io.vertx.proton.ProtonReceiver; import io.vertx.proton.ProtonSender; /** @@ -67,7 +68,7 @@ public void testProcessMessageForwardsEventMessageToDownstreamSender() throws In ForwardingEventDownstreamAdapter adapter = new ForwardingEventDownstreamAdapter(vertx, newMockSenderFactory(sender)); adapter.setDownstreamConnectionFactory(newMockConnectionFactory(false)); adapter.start(Future.future()); - adapter.addSender("CON_ID", CLIENT_ID, sender); + adapter.addSender(client, sender); // WHEN processing an event Message msg = ProtonHelper.message(EVENT_MSG_CONTENT); diff --git a/server/src/test/java/org/eclipse/hono/server/ForwardingDownstreamAdapterTest.java b/server/src/test/java/org/eclipse/hono/server/ForwardingDownstreamAdapterTest.java index a46fa332fe..b2d13972e6 100644 --- a/server/src/test/java/org/eclipse/hono/server/ForwardingDownstreamAdapterTest.java +++ b/server/src/test/java/org/eclipse/hono/server/ForwardingDownstreamAdapterTest.java @@ -14,14 +14,13 @@ import static org.eclipse.hono.TestSupport.*; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.message.Message; import org.eclipse.hono.connection.ConnectionFactory; import org.eclipse.hono.telemetry.TelemetryConstants; @@ -49,6 +48,10 @@ public class ForwardingDownstreamAdapterTest { private ConnectionFactory connectionFactory; private Vertx vertx; + /** + * Initializes mocks etc. + */ + @SuppressWarnings("unchecked") @Before public void setup() { vertx = mock(Vertx.class); @@ -61,8 +64,12 @@ public void setup() { connectionFactory = newMockConnectionFactory(false); } + /** + * Verifies that an upstream client is replenished with credits from the downstream container + * when a link is successfully established. + */ @Test - public void testClientAttachedReplenishesClientOnSuccess() throws InterruptedException { + public void testClientAttachedReplenishesClientOnSuccess() { final ResourceIdentifier targetAddress = ResourceIdentifier.from(TelemetryConstants.NODE_ADDRESS_TELEMETRY_PREFIX, "myTenant", null); final UpstreamReceiver client = newClient(); @@ -81,6 +88,11 @@ public void testClientAttachedReplenishesClientOnSuccess() throws InterruptedExc verify(client).replenish(DEFAULT_CREDITS); } + /** + * Verifies that drain requests received from the downstream container are forwarded + * to upstream clients. + * @throws InterruptedException + */ @SuppressWarnings("unchecked") @Test public void testHandleFlowForwardsDrainRequestUpstream() throws InterruptedException { @@ -88,23 +100,25 @@ public void testHandleFlowForwardsDrainRequestUpstream() throws InterruptedExcep final ResourceIdentifier targetAddress = ResourceIdentifier.from(TelemetryConstants.NODE_ADDRESS_TELEMETRY_PREFIX, "myTenant", null); final UpstreamReceiver client = newClient(); when(client.getTargetAddress()).thenReturn(targetAddress.toString()); + final ProtonSender drainingSender = newMockSender(true); // GIVEN an adapter with a connection to the downstream container and a client attached givenADownstreamAdapter(); adapter.setDownstreamConnectionFactory(connectionFactory); adapter.start(Future.future()); - adapter.onClientAttach(client, s -> { - assertTrue(s.succeeded()); - }); + adapter.addSender(client, drainingSender); // WHEN the downstream sender drains the adapter - ProtonSender drainingSender = newMockSender(true); adapter.handleFlow(drainingSender, client); // THEN assert that the upstream client has been drained verify(client).drain(anyInt(), any(Handler.class)); } + /** + * Verifies that the adapter refuses to accept a link from an upstream client + * when there is no connection to the downstream container. + */ @Test public void testGetDownstreamSenderClosesLinkIfDownstreamConnectionIsBroken() { @@ -126,17 +140,22 @@ public void testGetDownstreamSenderClosesLinkIfDownstreamConnectionIsBroken() { }); } + /** + * Verifies that corresponding sender links to the downstream container are closed when + * a connection to an upstream client is lost/closed. + */ @Test public void testOnClientDisconnectClosesDownstreamSenders() { final String upstreamConnection = "upstream-connection-id"; final String linkId = "link-id"; + final UpstreamReceiver client = newClient(linkId, upstreamConnection); final ProtonSender downstreamSender = newMockSender(false); givenADownstreamAdapter(downstreamSender); adapter.setDownstreamConnectionFactory(connectionFactory); adapter.start(Future.future()); - adapter.addSender(upstreamConnection, linkId, downstreamSender); + adapter.addSender(client, downstreamSender); // WHEN the upstream client disconnects adapter.onClientDisconnect(upstreamConnection); @@ -145,61 +164,59 @@ public void testOnClientDisconnectClosesDownstreamSenders() { verify(downstreamSender).close(); } + /** + * Verifies that the adapter tries to re-establish a lost connection to a downstream container. + */ @Test - public void testDownstreamDisconnectTriggersReconnect() throws InterruptedException { + public void testDownstreamDisconnectTriggersReconnect() { final ProtonConnection connectionToCreate = mock(ProtonConnection.class); when(connectionToCreate.getRemoteContainer()).thenReturn("downstream"); // expect the connection factory to be invoked twice // first on initial connection // second on re-connect attempt - CountDownLatch latch = new CountDownLatch(2); - final AtomicReference disconnectHandlerRef = new AtomicReference<>(); - ConnectionFactory factory = new ConnectionFactory() { + DisconnectHandlerProvidingConnectionFactory factory = new DisconnectHandlerProvidingConnectionFactory(connectionToCreate, 2); - @Override - public void connect( - final ProtonClientOptions options, - final Handler> closeHandler, - final Handler disconnectHandler, - final Handler> connectionResultHandler) { - - latch.countDown(); - disconnectHandlerRef.set(disconnectHandler); - connectionResultHandler.handle(Future.succeededFuture(connectionToCreate)); - } + // GIVEN an adapter connected to a downstream container + givenADownstreamAdapter(); + adapter.setDownstreamConnectionFactory(factory); + adapter.start(Future.future()); - @Override - public String getName() { - return "client"; - } + // WHEN the downstream connection fails + factory.getDisconnectHandler().handle(connectionToCreate); - @Override - public String getHost() { - return "server"; - } + // THEN the adapter tries to reconnect to the downstream container + factory.await(1, TimeUnit.SECONDS); + } - @Override - public int getPort() { - return 5672; - } + /** + * Verifies that all links to upstream clients are closed when the connection to the + * downstream container is lost. + */ + @Test + public void testDownstreamDisconnectClosesUpstreamReceivers() { - @Override - public String getPathSeparator() { - return Constants.DEFAULT_PATH_SEPARATOR; - } - }; + final ProtonConnection connectionToCreate = mock(ProtonConnection.class); + when(connectionToCreate.getRemoteContainer()).thenReturn("downstream"); + final UpstreamReceiver client = newClient(); + final ProtonSender downstreamSender = newMockSender(false); + // expect the connection factory to be invoked twice + // first on initial connection + // second on re-connect attempt + DisconnectHandlerProvidingConnectionFactory factory = new DisconnectHandlerProvidingConnectionFactory(connectionToCreate, 2); // GIVEN an adapter connected to a downstream container - givenADownstreamAdapter(); + givenADownstreamAdapter(downstreamSender); adapter.setDownstreamConnectionFactory(factory); adapter.start(Future.future()); + adapter.addSender(client, downstreamSender); // WHEN the downstream connection fails - disconnectHandlerRef.get().handle(connectionToCreate); + factory.getDisconnectHandler().handle(connectionToCreate); - // THEN the adapter tries to reconnect to the downstream container - assertTrue(latch.await(1, TimeUnit.SECONDS)); + // THEN the adapter tries to reconnect to the downstream container and has closed all upstream receivers + factory.await(1, TimeUnit.SECONDS); + verify(client).close(any(ErrorCondition.class)); } private void givenADownstreamAdapter() { @@ -222,4 +239,61 @@ protected void forwardMessage(ProtonSender sender, Message msg, ProtonDelivery d } }; } + + private class DisconnectHandlerProvidingConnectionFactory implements ConnectionFactory { + + private Handler disconnectHandler; + private CountDownLatch expectedConnectionAttemps; + private ProtonConnection connectionToCreate; + + public DisconnectHandlerProvidingConnectionFactory(final ProtonConnection conToCreate, final int expectedConnectionAttempts) { + this.connectionToCreate = conToCreate; + this.expectedConnectionAttemps = new CountDownLatch(expectedConnectionAttempts); + } + + @Override + public void connect( + final ProtonClientOptions options, + final Handler> closeHandler, + final Handler disconnectHandler, + final Handler> connectionResultHandler) { + + expectedConnectionAttemps.countDown(); + this.disconnectHandler = disconnectHandler; + connectionResultHandler.handle(Future.succeededFuture(connectionToCreate)); + } + + @Override + public String getName() { + return "client"; + } + + @Override + public String getHost() { + return "server"; + } + + @Override + public int getPort() { + return 5672; + } + + @Override + public String getPathSeparator() { + return Constants.DEFAULT_PATH_SEPARATOR; + } + + public Handler getDisconnectHandler() { + return disconnectHandler; + } + + public boolean await(long timeout, TimeUnit unit) { + try { + return expectedConnectionAttemps.await(timeout, unit); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + } } diff --git a/server/src/test/java/org/eclipse/hono/telemetry/impl/ForwardingTelemetryDownstreamAdapterTest.java b/server/src/test/java/org/eclipse/hono/telemetry/impl/ForwardingTelemetryDownstreamAdapterTest.java index 15b8ccb3fa..6bb59f0a40 100644 --- a/server/src/test/java/org/eclipse/hono/telemetry/impl/ForwardingTelemetryDownstreamAdapterTest.java +++ b/server/src/test/java/org/eclipse/hono/telemetry/impl/ForwardingTelemetryDownstreamAdapterTest.java @@ -43,11 +43,20 @@ public class ForwardingTelemetryDownstreamAdapterTest { private static final String DEVICE_ID = "myDevice"; private ConnectionFactory connectionFactory; + /** + * Initializes mocks etc. + */ @Before public void setup() { connectionFactory = newMockConnectionFactory(false); } + /** + * Verifies that telemetry data uploaded by an upstream client is forwarded to the + * downstream container. + * + * @throws InterruptedException if test execution gets interrupted. + */ @Test public void testProcessTelemetryDataForwardsMessageToDownstreamSender() throws InterruptedException { @@ -65,7 +74,7 @@ public void testProcessTelemetryDataForwardsMessageToDownstreamSender() throws I ForwardingTelemetryDownstreamAdapter adapter = new ForwardingTelemetryDownstreamAdapter(vertx, newMockSenderFactory(sender)); adapter.setDownstreamConnectionFactory(connectionFactory); adapter.start(Future.future()); - adapter.addSender("CON_ID", CLIENT_ID, sender); + adapter.addSender(client, sender); // WHEN processing a telemetry message Message msg = ProtonHelper.message(TELEMETRY_MSG_CONTENT);