From 4c0638df458621819c51a739895de3e7c3ce7f5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Husberg?= Date: Thu, 24 Oct 2024 14:48:32 +0200 Subject: [PATCH] Fixes Vert.x issue 5371 https://github.com/eclipse-vertx/vert.x/issues/5371 --- .../core/http/impl/Http2ServerConnection.java | 11 +- .../core/http/impl/Http2ServerRequest.java | 6 +- .../core/http/impl/Http2ServerResponse.java | 8 +- .../core/http/impl/Http2ServerStream.java | 3 +- ...VertxCompressorHttp2ConnectionEncoder.java | 174 ++++++++++++++++++ .../VertxHttp2ConnectionHandlerBuilder.java | 2 +- .../io/vertx/tests/http/Http2ServerTest.java | 107 ++++++++++- 7 files changed, 290 insertions(+), 21 deletions(-) create mode 100644 vertx-core/src/main/java/io/vertx/core/http/impl/VertxCompressorHttp2ConnectionEncoder.java diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java b/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java index 1d42df5b445..335c1db7540 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java @@ -151,8 +151,7 @@ private Http2ServerStream createStream(Http2Headers headers, boolean streamEnded } private void initStream(int streamId, Http2ServerStream vertxStream) { - String contentEncoding = options.isCompressionSupported() ? determineContentEncoding(vertxStream.headers) : null; - Http2ServerRequest request = new Http2ServerRequest(vertxStream, serverOrigin, vertxStream.headers, contentEncoding); + Http2ServerRequest request = new Http2ServerRequest(vertxStream, serverOrigin, vertxStream.headers); vertxStream.request = request; vertxStream.isConnect = request.method() == HttpMethod.CONNECT; Http2Stream stream = handler.connection().stream(streamId); @@ -218,10 +217,9 @@ private synchronized void doSendPush(int streamId, HostAndPort authority, HttpMe if (future.isSuccess()) { synchronized (Http2ServerConnection.this) { int promisedStreamId = future.getNow(); - String contentEncoding = determineContentEncoding(headers_); Http2Stream promisedStream = handler.connection().stream(promisedStreamId); - Http2ServerStream vertxStream = new Http2ServerStream(this, context, method, path, options.getTracingPolicy(), true); - Push push = new Push(vertxStream, contentEncoding, promise); + Http2ServerStream vertxStream = new Http2ServerStream(this, context, headers_, method, path, options.getTracingPolicy(), true); + Push push = new Push(vertxStream, promise); vertxStream.request = push; push.stream.priority(streamPriority); push.stream.init(promisedStream); @@ -252,11 +250,10 @@ private class Push implements Http2ServerStreamHandler { private final Promise promise; public Push(Http2ServerStream stream, - String contentEncoding, Promise promise) { this.context = stream.context; this.stream = stream; - this.response = new Http2ServerResponse(stream.conn, stream, true, contentEncoding); + this.response = new Http2ServerResponse(stream.conn, stream, true); this.promise = promise; } diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerRequest.java b/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerRequest.java index 3ced0088da0..075c0b90994 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerRequest.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerRequest.java @@ -18,7 +18,6 @@ import io.netty.handler.codec.http.multipart.InterfaceHttpData; import io.netty.handler.codec.http2.Http2Headers; import io.vertx.codegen.annotations.Nullable; -import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; @@ -71,11 +70,10 @@ public class Http2ServerRequest extends HttpServerRequestInternal implements Htt Http2ServerRequest(Http2ServerStream stream, String serverOrigin, - Http2Headers headers, - String contentEncoding) { + Http2Headers headers) { this.context = stream.context; this.stream = stream; - this.response = new Http2ServerResponse(stream.conn, stream, false, contentEncoding); + this.response = new Http2ServerResponse(stream.conn, stream, false); this.serverOrigin = serverOrigin; this.headersMap = new Http2HeadersAdaptor(headers); } diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerResponse.java b/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerResponse.java index e1f90d91306..f2cc228470e 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerResponse.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerResponse.java @@ -48,7 +48,6 @@ public class Http2ServerResponse implements HttpServerResponse, HttpResponse { private final ChannelHandlerContext ctx; private final Http2ServerConnection conn; private final boolean push; - private final String contentEncoding; private final Http2Headers headers = new DefaultHttp2Headers(); private Http2HeadersAdaptor headersMap; private Http2Headers trailers; @@ -70,13 +69,11 @@ public class Http2ServerResponse implements HttpServerResponse, HttpResponse { public Http2ServerResponse(Http2ServerConnection conn, Http2ServerStream stream, - boolean push, - String contentEncoding) { + boolean push) { this.stream = stream; this.ctx = conn.handlerContext; this.conn = conn; this.push = push; - this.contentEncoding = contentEncoding; } boolean isPush() { @@ -454,9 +451,6 @@ private boolean checkSendHeaders(boolean end, boolean checkFlush) { private void prepareHeaders() { headers.status(status.codeAsText()); // Could be optimized for usual case ? - if (contentEncoding != null && headers.get(HttpHeaderNames.CONTENT_ENCODING) == null) { - headers.set(HttpHeaderNames.CONTENT_ENCODING, contentEncoding); - } // Sanitize if (stream.method == HttpMethod.HEAD || status == HttpResponseStatus.NOT_MODIFIED) { headers.remove(HttpHeaders.TRANSFER_ENCODING); diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java b/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java index 288f928afc0..f303862f369 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java @@ -50,13 +50,14 @@ class Http2ServerStream extends VertxHttp2Stream { Http2ServerStream(Http2ServerConnection conn, ContextInternal context, + Http2Headers headers, HttpMethod method, String uri, TracingPolicy tracingPolicy, boolean halfClosedRemote) { super(conn, context); - this.headers = null; + this.headers = headers; this.method = method; this.uri = uri; this.scheme = null; diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/VertxCompressorHttp2ConnectionEncoder.java b/vertx-core/src/main/java/io/vertx/core/http/impl/VertxCompressorHttp2ConnectionEncoder.java new file mode 100644 index 00000000000..f33242b7eb2 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/VertxCompressorHttp2ConnectionEncoder.java @@ -0,0 +1,174 @@ +package io.vertx.core.http.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.compression.CompressionOptions; +import io.netty.handler.codec.http2.CompressorHttp2ConnectionEncoder; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; +import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2Flags; +import io.netty.handler.codec.http2.Http2FrameWriter; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2LifecycleManager; +import io.netty.handler.codec.http2.Http2RemoteFlowController; +import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.codec.http2.Http2SettingsReceivedConsumer; +import java.util.function.Function; + +import static io.vertx.core.http.HttpHeaders.CONTENT_ENCODING; +import static io.vertx.core.http.HttpHeaders.IDENTITY; + +public class VertxCompressorHttp2ConnectionEncoder implements Http2FrameWriter, Http2ConnectionEncoder, Http2SettingsReceivedConsumer { + + private Http2ConnectionEncoder delegate; + private final Http2ConnectionEncoder plainEncoder; + + public VertxCompressorHttp2ConnectionEncoder(Http2ConnectionEncoder plainEncoder, CompressionOptions[] compressionOptions) { + this.delegate = new CompressorHttp2ConnectionEncoder(plainEncoder, compressionOptions); + this.plainEncoder = plainEncoder; + } + + private void beforeWritingHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers responseHeaders) { + String contentEncodingToApply = determineContentEncodingToApply(ctx, streamId, responseHeaders); + if (contentEncodingToApply == null || contentEncodingToApply.equalsIgnoreCase(IDENTITY.toString())) { + if (responseHeaders.contains(CONTENT_ENCODING, IDENTITY)) { + responseHeaders.remove(CONTENT_ENCODING); + } + delegate = plainEncoder; + } else { + responseHeaders.set(CONTENT_ENCODING, contentEncodingToApply); + } + } + + private String determineContentEncodingToApply(ChannelHandlerContext ctx, int streamId, Http2Headers responseHeaders) { + if (responseHeaders.contains(CONTENT_ENCODING)) { + return null; + } + return ifType(ctx.handler(), VertxHttp2ConnectionHandler.class, connectionHandler -> + ifType(connectionHandler.connectFuture().getNow(), Http2ServerConnection.class, connection -> + ifType(connection.stream(streamId), Http2ServerStream.class, stream -> + stream.headers == null ? null : connection.determineContentEncoding(stream.headers)))); + } + + private R ifType(Object obj, Class type, Function then) { + if (type.isAssignableFrom(obj.getClass())) { + return then.apply(type.cast(obj)); + } + return null; + } + + @Override + public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) { + beforeWritingHeaders(ctx, streamId, headers); + return delegate.writeHeaders(ctx, streamId, headers, padding, endStream, promise); + } + + @Override + public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, ChannelPromise promise) { + beforeWritingHeaders(ctx, streamId, headers); + return delegate.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream, promise); + } + + @Override + public void lifecycleManager(Http2LifecycleManager http2LifecycleManager) { + delegate.lifecycleManager(http2LifecycleManager); + } + + @Override + public Http2Connection connection() { + return delegate.connection(); + } + + @Override + public Http2RemoteFlowController flowController() { + return delegate.flowController(); + } + + @Override + public Http2FrameWriter frameWriter() { + return delegate.frameWriter(); + } + + @Override + public Http2Settings pollSentSettings() { + return delegate.pollSentSettings(); + } + + @Override + public void remoteSettings(Http2Settings http2Settings) throws Http2Exception { + delegate.remoteSettings(http2Settings); + } + + @Override + public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive, ChannelPromise promise) { + return delegate.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise); + } + + @Override + public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise) { + return delegate.writeRstStream(ctx, streamId, errorCode, promise); + } + + @Override + public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, ChannelPromise promise) { + return delegate.writeSettings(ctx, settings, promise); + } + + @Override + public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) { + return delegate.writeSettingsAck(ctx, promise); + } + + @Override + public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) { + return delegate.writePing(ctx, ack, data, promise); + } + + @Override + public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) { + return delegate.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise); + } + + @Override + public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData, ChannelPromise promise) { + return delegate.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise); + } + + @Override + public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) { + return delegate.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise); + } + + @Override + public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload, ChannelPromise promise) { + return delegate.writeFrame(ctx, frameType, streamId, flags, payload, promise); + } + + @Override + public Configuration configuration() { + return delegate.configuration(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise) { + return delegate.writeData(ctx, streamId, data, padding, endStream, promise); + } + + @Override + public void consumeReceivedSettings(Http2Settings settings) { + if (delegate instanceof Http2SettingsReceivedConsumer) { + ((Http2SettingsReceivedConsumer) delegate).consumeReceivedSettings(settings); + } else { + throw new IllegalStateException("delegate " + delegate + " is not an instance of " + + Http2SettingsReceivedConsumer.class); + } + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandlerBuilder.java b/vertx-core/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandlerBuilder.java index f36a6d3dcfe..bd40865acfd 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandlerBuilder.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandlerBuilder.java @@ -147,7 +147,7 @@ private void configureStreamByteDistributor() { protected VertxHttp2ConnectionHandler build(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) throws Exception { if (server) { if (compressionOptions != null) { - encoder = new CompressorHttp2ConnectionEncoder(encoder, compressionOptions); + encoder = new VertxCompressorHttp2ConnectionEncoder(encoder, compressionOptions); } VertxHttp2ConnectionHandler handler = new VertxHttp2ConnectionHandler<>(connectionFactory, useDecompression, decoder, encoder, initialSettings); decoder.frameListener(handler); diff --git a/vertx-core/src/test/java/io/vertx/tests/http/Http2ServerTest.java b/vertx-core/src/test/java/io/vertx/tests/http/Http2ServerTest.java index 74645b21df6..b9a863dea91 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/Http2ServerTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/Http2ServerTest.java @@ -2123,7 +2123,7 @@ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int if (i == -1) { break; } - baos.write(i);; + baos.write(i); } decoded = baos.toString(); } catch (IOException e) { @@ -2144,6 +2144,111 @@ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int await(); } + @Test + public void testResponseCompressionEnabledButResponseAlreadyCompressed() throws Exception { + waitFor(2); + String expected = TestUtils.randomAlphaString(1000); + server.close(); + server = vertx.createHttpServer(serverOptions.setCompressionSupported(true)); + server.requestHandler(req -> { + req.response().headers().set(HttpHeaderNames.CONTENT_ENCODING, "gzip"); + try { + req.response().end(Buffer.buffer(TestUtils.compressGzip(expected))); + } catch (Exception e) { + fail(e); + } + }); + startServer(); + TestClient client = new TestClient(); + ChannelFuture fut = client.connect(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, request -> { + request.decoder.frameListener(new Http2EventAdapter() { + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception { + vertx.runOnContext(v -> { + assertEquals("gzip", headers.get(HttpHeaderNames.CONTENT_ENCODING).toString()); + complete(); + }); + } + @Override + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { + byte[] bytes = new byte[data.readableBytes()]; + data.readBytes(bytes); + vertx.runOnContext(v -> { + String decoded; + try { + GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(bytes)); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + while (true) { + int i = in.read(); + if (i == -1) { + break; + } + baos.write(i); + } + decoded = baos.toString(); + } catch (IOException e) { + fail(e); + return; + } + assertEquals(expected, decoded); + complete(); + }); + return super.onDataRead(ctx, streamId, data, padding, endOfStream); + } + }); + int id = request.nextStreamId(); + request.encoder.writeHeaders(request.context, id, GET("/").add("accept-encoding", "gzip"), 0, true, request.context.newPromise()); + request.context.flush(); + }); + fut.sync(); + await(); + } + + @Test + public void testResponseCompressionEnabledButExplicitlyDisabled() throws Exception { + waitFor(2); + String expected = TestUtils.randomAlphaString(1000); + server.close(); + server = vertx.createHttpServer(serverOptions.setCompressionSupported(true)); + server.requestHandler(req -> { + req.response().headers().set(HttpHeaderNames.CONTENT_ENCODING, "identity"); + try { + req.response().end(expected); + } catch (Exception e) { + fail(e); + } + }); + startServer(); + TestClient client = new TestClient(); + ChannelFuture fut = client.connect(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, request -> { + request.decoder.frameListener(new Http2EventAdapter() { + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception { + vertx.runOnContext(v -> { + assertFalse(headers.contains(HttpHeaderNames.CONTENT_ENCODING)); + complete(); + }); + } + @Override + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { + byte[] bytes = new byte[data.readableBytes()]; + data.readBytes(bytes); + vertx.runOnContext(v -> { + String decoded = new String(bytes, StandardCharsets.UTF_8); + assertEquals(expected, decoded); + complete(); + }); + return super.onDataRead(ctx, streamId, data, padding, endOfStream); + } + }); + int id = request.nextStreamId(); + request.encoder.writeHeaders(request.context, id, GET("/").add("accept-encoding", "gzip"), 0, true, request.context.newPromise()); + request.context.flush(); + }); + fut.sync(); + await(); + } + @Test public void testRequestCompressionEnabled() throws Exception { String expected = TestUtils.randomAlphaString(1000);