Skip to content

Commit

Permalink
[#2152] Add integration test
Browse files Browse the repository at this point in the history
Signed-off-by: Kai Hudalla <[email protected]>
  • Loading branch information
sophokles73 committed Oct 27, 2020
1 parent 91bd4e7 commit 388c791
Showing 1 changed file with 58 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@
import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.LinkError;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.ServerErrorException;
Expand Down Expand Up @@ -172,6 +177,59 @@ public void testAdapterRejectsBadInboundMessage(final VertxTestContext context)

}

/**
* Verifies that the adapter closes the link when a device sends a message containing a payload which
* exceeds the configured max payload size.
*
* @param ctx The Vert.x test context.
*/
@Test
@Timeout(timeUnit = TimeUnit.SECONDS, value = 10)
public void testAdapterClosesLinkOnMessageExceedingMaxPayloadSize(final VertxTestContext ctx) {

final String tenantId = helper.getRandomTenantId();
final String deviceId = helper.getRandomDeviceId(tenantId);

createConsumer(tenantId, msg -> {
ctx.failNow(new AssertionError("should not have received message"));
})
.compose(consumer -> setupProtocolAdapter(tenantId, deviceId, false))
.onComplete(ctx.succeeding(s -> {

s.detachHandler(remoteDetach -> {
ctx.verify(() -> {
final ErrorCondition errorCondition = s.getRemoteCondition();
assertThat(remoteDetach.succeeded()).isFalse();
assertThat(errorCondition).isNotNull();
assertThat((Comparable<Symbol>) errorCondition.getCondition()).isEqualTo(LinkError.MESSAGE_SIZE_EXCEEDED);
});
log.info("AMQP adapter detached link as expected");
s.close();
ctx.completeNow();
});

final UnsignedLong maxMessageSize = s.getRemoteMaxMessageSize();
log.info("AMQP adapter uses max-message-size {}", maxMessageSize);

ctx.verify(() -> {
assertThat(maxMessageSize).as("check adapter's attach frame includes max-message-size").isNotNull();
assertThat(maxMessageSize.longValue()).as("check message size is limited").isGreaterThan(0);
});

final Message msg = ProtonHelper.message();
msg.setContentType("opaque/binary");
msg.setAddress(getEndpointName());
msg.setBody(new Data(new Binary(IntegrationTestSupport.getPayload(maxMessageSize.intValue()))));

context.runOnContext(go -> {
log.debug("sending message");
s.send(msg);
});
}));


}

/**
* Verifies that the AMQP Adapter rejects (closes) AMQP links that contain a target address.
*
Expand Down

0 comments on commit 388c791

Please sign in to comment.