Skip to content

Commit

Permalink
[#96] Close upstream receivers on downstream connection loss.
Browse files Browse the repository at this point in the history
When the connection to the downstream container is lost, all links with
upstream clients for receiving telemetry or events are closed.
This way the clients will trigger the opening of new downstream senders
when they re-establish their links with Hono.
  • Loading branch information
Kai Hudalla committed Feb 21, 2017
1 parent 630cff6 commit a778798
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,17 @@
@Component
public abstract class ForwardingDownstreamAdapter implements DownstreamAdapter {

protected Logger logger = LoggerFactory.getLogger(getClass());
protected HonoConfigProperties honoConfig = new HonoConfigProperties();
private final Map<String, ProtonSender> activeSenders = new HashMap<>();
private final Map<String, List<String>> sendersPerConnection = new HashMap<>();
private boolean running = false;
/**
* A logger to be shared with subclasses.
*/
protected Logger logger = LoggerFactory.getLogger(getClass());
/**
* The Hono configuration.
*/
protected HonoConfigProperties honoConfig = new HonoConfigProperties();
private final Map<UpstreamReceiver, ProtonSender> activeSenders = new HashMap<>();
private final Map<String, List<UpstreamReceiver>> sendersPerConnection = new HashMap<>();
private boolean running = false;
private final Vertx vertx;
private ProtonConnection downstreamConnection;
private SenderFactory senderFactory;
Expand Down Expand Up @@ -149,6 +155,11 @@ public final void stop(final Future<Void> stopFuture) {
stopFuture.complete();
}

/**
* Gets the name of the downstream AMQP 1.0 container this adapter is forwarding messages to.
*
* @return The name or {@code null} if this adapter is currently not connected.
*/
protected final String getDownstreamContainer() {
if (downstreamConnection != null) {
return downstreamConnection.getRemoteContainer();
Expand All @@ -157,7 +168,7 @@ protected final String getDownstreamContainer() {
}
}

protected ProtonClientOptions createClientOptions() {
private ProtonClientOptions createClientOptions() {
return new ProtonClientOptions()
.setConnectTimeout(100)
.setReconnectAttempts(-1).setReconnectInterval(200); // reconnect forever, every 200 millisecs
Expand Down Expand Up @@ -196,7 +207,11 @@ private void onRemoteClose(final AsyncResult<ProtonConnection> remoteClose) {
*/
private void onDisconnectFromDownstreamContainer(final ProtonConnection con) {
// all links to downstream host will now be stale and unusable
logger.warn("lost connection to downstream container [{}]", con.getRemoteContainer());
logger.warn("lost connection to downstream container [{}], closing upstream receivers ...", con.getRemoteContainer());

for (UpstreamReceiver client : activeSenders.keySet()) {
client.close(ErrorConditions.ERROR_NO_DOWNSTREAM_CONSUMER);
}
activeSenders.clear();
con.disconnectHandler(null);
con.disconnect();
Expand All @@ -221,18 +236,19 @@ public final void onClientAttach(final UpstreamReceiver client, final Handler<As

if (activeSenders.containsKey(client.getLinkId())) {
logger.info("reusing existing downstream sender [con: {}, link: {}]", client.getConnectionId(), client.getLinkId());
resultHandler.handle(Future.succeededFuture());
} else {
createSender(
client.getTargetAddress(),
replenishedSender -> handleFlow(replenishedSender, client),
creationAttempt -> {
if (creationAttempt.succeeded()) {
logger.info("created downstream sender [con: {}, link: {}]", client.getConnectionId(), client.getLinkId());
addSender(client.getConnectionId(), client.getLinkId(), creationAttempt.result());
addSender(client, creationAttempt.result());
resultHandler.handle(Future.succeededFuture());
} else {
resultHandler.handle(Future.failedFuture(creationAttempt.cause()));
logger.warn("can't create downstream sender [con: {}, link: {}]", client.getConnectionId(), client.getLinkId(), creationAttempt.cause());
resultHandler.handle(Future.failedFuture(creationAttempt.cause()));
}
});
}
Expand Down Expand Up @@ -278,16 +294,16 @@ private static String getTenantOnlyTargetAddress(final String address) {
return String.format("%s/%s", targetAddress.getEndpoint(), targetAddress.getTenantId());
}

public final void addSender(final String connectionId, final String linkId, final ProtonSender sender) {
sender.attachments().set(Constants.KEY_CONNECTION_ID, String.class, connectionId);
public final void addSender(final UpstreamReceiver link, final ProtonSender sender) {
sender.attachments().set(Constants.KEY_CONNECTION_ID, String.class, link.getConnectionId());
sender.setAutoDrained(false); // we need to propagate drain requests upstream and wait for the result
activeSenders.put(linkId, sender);
List<String> senders = sendersPerConnection.get(connectionId);
activeSenders.put(link, sender);
List<UpstreamReceiver> senders = sendersPerConnection.get(link.getConnectionId());
if (senders == null) {
senders = new ArrayList<>();
sendersPerConnection.put(connectionId, senders);
sendersPerConnection.put(link.getConnectionId(), senders);
}
senders.add(linkId);
senders.add(link);
}

private static int getAvailableCredit(final ProtonSender sender) {
Expand All @@ -304,24 +320,18 @@ public final void onClientDetach(final UpstreamReceiver client) {

Objects.requireNonNull(client);

String connectionId = closeSender(client.getLinkId());
if (connectionId != null) {
List<String> senders = sendersPerConnection.get(connectionId);
if (senders != null) {
senders.remove(client.getLinkId());
}
closeSender(client);
List<UpstreamReceiver> senders = sendersPerConnection.get(client.getConnectionId());
if (senders != null) {
senders.remove(client);
}
}

private String closeSender(final String linkId) {
ProtonSender sender = activeSenders.remove(linkId);
private void closeSender(final UpstreamReceiver link) {
ProtonSender sender = activeSenders.remove(link);
if (sender != null && sender.isOpen()) {
String connectionId = Constants.getConnectionId(sender);
logger.info("closing downstream sender [con: {}, link: {}]", connectionId, linkId);
logger.info("closing downstream sender [con: {}, link: {}]", link.getConnectionId(), link.getLinkId());
sender.close();
return connectionId;
} else {
return null;
}
}

Expand All @@ -332,11 +342,11 @@ public final void onClientDisconnect(final String connectionId) {
throw new IllegalStateException("adapter must be started first");
}

List<String> senders = sendersPerConnection.remove(Objects.requireNonNull(connectionId));
List<UpstreamReceiver> senders = sendersPerConnection.remove(Objects.requireNonNull(connectionId));
if (senders != null && !senders.isEmpty()) {
logger.info("closing {} downstream senders for connection [id: {}]", senders.size(), connectionId);
for (String linkId : senders) {
closeSender(linkId);
for (UpstreamReceiver link : senders) {
closeSender(link);
}
}
}
Expand All @@ -351,7 +361,7 @@ public final void processMessage(final UpstreamReceiver client, final ProtonDeli
Objects.requireNonNull(client);
Objects.requireNonNull(msg);
Objects.requireNonNull(delivery);
ProtonSender sender = activeSenders.get(client.getLinkId());
ProtonSender sender = activeSenders.get(client);
if (sender == null) {
logger.info("no downstream sender for link [{}] available, discarding message and closing link with client", client.getLinkId());
client.close(ErrorConditions.ERROR_NO_DOWNSTREAM_CONSUMER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

/**
* A decorator for a {@code ProtonReceiver} representing a client uploading data to a Hono endpoint.
* <p>
* Subclasses are strongly encouraged to implement {@link Object#hashCode()} and {@link Object#equals(Object)}
* based on the <em>linkId</em> because instances of this interface are used as keys in maps in other classes of Hono.
*
*/
public interface UpstreamReceiver {
Expand Down Expand Up @@ -114,5 +117,4 @@ static UpstreamReceiver atLeastOnceReceiver(final String linkId, final ProtonRec
* @return The address.
*/
String getTargetAddress();

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
/**
* A decorator for a {@code ProtonReceiver} that represents a Hono client sending data downstream.
* <p>
* The main purpose of this class to <em>attach</em> a (surrogate) identifier to the receiver.
* The main purpose of this class is to <em>attach</em> a (surrogate) {@linkplain #getLinkId() identifier}
* to the receiver.
*/
public class UpstreamReceiverImpl implements UpstreamReceiver {

Expand Down Expand Up @@ -75,4 +76,34 @@ public String getConnectionId() {
public String getTargetAddress() {
return link.getRemoteTarget().getAddress();
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
UpstreamReceiverImpl other = (UpstreamReceiverImpl) obj;
if (id == null) {
if (other.id != null) {
return false;
}
} else if (!id.equals(other.id)) {
return false;
}
return true;
}
}
33 changes: 32 additions & 1 deletion server/src/test/java/org/eclipse/hono/TestSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
public final class TestSupport {

public static final String CLIENT_ID = "protocol_adapter";
public static final String CON_ID = "connection-1";
public static final int DEFAULT_CREDITS = 20;

public static ProtonConnection openConnection(final TestContext ctx, final Vertx vertx, final String host, final int port) {
Expand Down Expand Up @@ -145,9 +146,39 @@ public static SenderFactory newMockSenderFactory(final ProtonSender senderToCrea
};
}

/**
* Creates a new mock upstream client for the {@linkplain #CLIENT_ID default link ID}
* and {@linkplain #CON_ID default connection ID}.
*
* @return The new client.
*/
public static UpstreamReceiver newClient() {
return newClient(CLIENT_ID);
}

/**
* Creates a new mock upstream client for a link ID and the {@linkplain #CON_ID default connection ID}.
*
* @param linkId The client's link ID.
* @return The new client.
*/
public static UpstreamReceiver newClient(final String linkId) {

return newClient(linkId, CON_ID);
}

/**
* Creates a new mock upstream client for a link and connection ID.
*
* @param linkId The client's link ID.
* @param connectionId The client's conenction ID.
* @return The new client.
*/
public static UpstreamReceiver newClient(final String linkId, final String connectionId) {

UpstreamReceiver client = mock(UpstreamReceiver.class);
when(client.getLinkId()).thenReturn(CLIENT_ID);
when(client.getLinkId()).thenReturn(linkId);
when(client.getConnectionId()).thenReturn(connectionId);
return client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;

/**
Expand Down Expand Up @@ -67,7 +68,7 @@ public void testProcessMessageForwardsEventMessageToDownstreamSender() throws In
ForwardingEventDownstreamAdapter adapter = new ForwardingEventDownstreamAdapter(vertx, newMockSenderFactory(sender));
adapter.setDownstreamConnectionFactory(newMockConnectionFactory(false));
adapter.start(Future.future());
adapter.addSender("CON_ID", CLIENT_ID, sender);
adapter.addSender(client, sender);

// WHEN processing an event
Message msg = ProtonHelper.message(EVENT_MSG_CONTENT);
Expand Down
Loading

0 comments on commit a778798

Please sign in to comment.