Skip to content

Commit

Permalink
Fixes Vert.x issue 5371
Browse files Browse the repository at this point in the history
  • Loading branch information
bjornhusberg committed Oct 24, 2024
1 parent 78e2176 commit 4c0638d
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ private Http2ServerStream createStream(Http2Headers headers, boolean streamEnded
}

private void initStream(int streamId, Http2ServerStream vertxStream) {
String contentEncoding = options.isCompressionSupported() ? determineContentEncoding(vertxStream.headers) : null;
Http2ServerRequest request = new Http2ServerRequest(vertxStream, serverOrigin, vertxStream.headers, contentEncoding);
Http2ServerRequest request = new Http2ServerRequest(vertxStream, serverOrigin, vertxStream.headers);
vertxStream.request = request;
vertxStream.isConnect = request.method() == HttpMethod.CONNECT;
Http2Stream stream = handler.connection().stream(streamId);
Expand Down Expand Up @@ -218,10 +217,9 @@ private synchronized void doSendPush(int streamId, HostAndPort authority, HttpMe
if (future.isSuccess()) {
synchronized (Http2ServerConnection.this) {
int promisedStreamId = future.getNow();
String contentEncoding = determineContentEncoding(headers_);
Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
Http2ServerStream vertxStream = new Http2ServerStream(this, context, method, path, options.getTracingPolicy(), true);
Push push = new Push(vertxStream, contentEncoding, promise);
Http2ServerStream vertxStream = new Http2ServerStream(this, context, headers_, method, path, options.getTracingPolicy(), true);
Push push = new Push(vertxStream, promise);
vertxStream.request = push;
push.stream.priority(streamPriority);
push.stream.init(promisedStream);
Expand Down Expand Up @@ -252,11 +250,10 @@ private class Push implements Http2ServerStreamHandler {
private final Promise<HttpServerResponse> promise;

public Push(Http2ServerStream stream,
String contentEncoding,
Promise<HttpServerResponse> promise) {
this.context = stream.context;
this.stream = stream;
this.response = new Http2ServerResponse(stream.conn, stream, true, contentEncoding);
this.response = new Http2ServerResponse(stream.conn, stream, true);
this.promise = promise;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http2.Http2Headers;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
Expand Down Expand Up @@ -71,11 +70,10 @@ public class Http2ServerRequest extends HttpServerRequestInternal implements Htt

Http2ServerRequest(Http2ServerStream stream,
String serverOrigin,
Http2Headers headers,
String contentEncoding) {
Http2Headers headers) {
this.context = stream.context;
this.stream = stream;
this.response = new Http2ServerResponse(stream.conn, stream, false, contentEncoding);
this.response = new Http2ServerResponse(stream.conn, stream, false);
this.serverOrigin = serverOrigin;
this.headersMap = new Http2HeadersAdaptor(headers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public class Http2ServerResponse implements HttpServerResponse, HttpResponse {
private final ChannelHandlerContext ctx;
private final Http2ServerConnection conn;
private final boolean push;
private final String contentEncoding;
private final Http2Headers headers = new DefaultHttp2Headers();
private Http2HeadersAdaptor headersMap;
private Http2Headers trailers;
Expand All @@ -70,13 +69,11 @@ public class Http2ServerResponse implements HttpServerResponse, HttpResponse {

public Http2ServerResponse(Http2ServerConnection conn,
Http2ServerStream stream,
boolean push,
String contentEncoding) {
boolean push) {
this.stream = stream;
this.ctx = conn.handlerContext;
this.conn = conn;
this.push = push;
this.contentEncoding = contentEncoding;
}

boolean isPush() {
Expand Down Expand Up @@ -454,9 +451,6 @@ private boolean checkSendHeaders(boolean end, boolean checkFlush) {

private void prepareHeaders() {
headers.status(status.codeAsText()); // Could be optimized for usual case ?
if (contentEncoding != null && headers.get(HttpHeaderNames.CONTENT_ENCODING) == null) {
headers.set(HttpHeaderNames.CONTENT_ENCODING, contentEncoding);
}
// Sanitize
if (stream.method == HttpMethod.HEAD || status == HttpResponseStatus.NOT_MODIFIED) {
headers.remove(HttpHeaders.TRANSFER_ENCODING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ class Http2ServerStream extends VertxHttp2Stream<Http2ServerConnection> {

Http2ServerStream(Http2ServerConnection conn,
ContextInternal context,
Http2Headers headers,
HttpMethod method,
String uri,
TracingPolicy tracingPolicy,
boolean halfClosedRemote) {
super(conn, context);

this.headers = null;
this.headers = headers;
this.method = method;
this.uri = uri;
this.scheme = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package io.vertx.core.http.impl;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.compression.CompressionOptions;
import io.netty.handler.codec.http2.CompressorHttp2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Flags;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2LifecycleManager;
import io.netty.handler.codec.http2.Http2RemoteFlowController;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2SettingsReceivedConsumer;
import java.util.function.Function;

import static io.vertx.core.http.HttpHeaders.CONTENT_ENCODING;
import static io.vertx.core.http.HttpHeaders.IDENTITY;

public class VertxCompressorHttp2ConnectionEncoder implements Http2FrameWriter, Http2ConnectionEncoder, Http2SettingsReceivedConsumer {

private Http2ConnectionEncoder delegate;
private final Http2ConnectionEncoder plainEncoder;

public VertxCompressorHttp2ConnectionEncoder(Http2ConnectionEncoder plainEncoder, CompressionOptions[] compressionOptions) {
this.delegate = new CompressorHttp2ConnectionEncoder(plainEncoder, compressionOptions);
this.plainEncoder = plainEncoder;
}

private void beforeWritingHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers responseHeaders) {
String contentEncodingToApply = determineContentEncodingToApply(ctx, streamId, responseHeaders);
if (contentEncodingToApply == null || contentEncodingToApply.equalsIgnoreCase(IDENTITY.toString())) {
if (responseHeaders.contains(CONTENT_ENCODING, IDENTITY)) {
responseHeaders.remove(CONTENT_ENCODING);
}
delegate = plainEncoder;
} else {
responseHeaders.set(CONTENT_ENCODING, contentEncodingToApply);
}
}

private String determineContentEncodingToApply(ChannelHandlerContext ctx, int streamId, Http2Headers responseHeaders) {
if (responseHeaders.contains(CONTENT_ENCODING)) {
return null;
}
return ifType(ctx.handler(), VertxHttp2ConnectionHandler.class, connectionHandler ->
ifType(connectionHandler.connectFuture().getNow(), Http2ServerConnection.class, connection ->
ifType(connection.stream(streamId), Http2ServerStream.class, stream ->
stream.headers == null ? null : connection.determineContentEncoding(stream.headers))));
}

private <T, R> R ifType(Object obj, Class<T> type, Function<T, R> then) {
if (type.isAssignableFrom(obj.getClass())) {
return then.apply(type.cast(obj));
}
return null;
}

@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) {
beforeWritingHeaders(ctx, streamId, headers);
return delegate.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
}

@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, ChannelPromise promise) {
beforeWritingHeaders(ctx, streamId, headers);
return delegate.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream, promise);
}

@Override
public void lifecycleManager(Http2LifecycleManager http2LifecycleManager) {
delegate.lifecycleManager(http2LifecycleManager);
}

@Override
public Http2Connection connection() {
return delegate.connection();
}

@Override
public Http2RemoteFlowController flowController() {
return delegate.flowController();
}

@Override
public Http2FrameWriter frameWriter() {
return delegate.frameWriter();
}

@Override
public Http2Settings pollSentSettings() {
return delegate.pollSentSettings();
}

@Override
public void remoteSettings(Http2Settings http2Settings) throws Http2Exception {
delegate.remoteSettings(http2Settings);
}

@Override
public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive, ChannelPromise promise) {
return delegate.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
}

@Override
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise) {
return delegate.writeRstStream(ctx, streamId, errorCode, promise);
}

@Override
public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, ChannelPromise promise) {
return delegate.writeSettings(ctx, settings, promise);
}

@Override
public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
return delegate.writeSettingsAck(ctx, promise);
}

@Override
public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
return delegate.writePing(ctx, ack, data, promise);
}

@Override
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) {
return delegate.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise);
}

@Override
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData, ChannelPromise promise) {
return delegate.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
}

@Override
public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
return delegate.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
}

@Override
public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload, ChannelPromise promise) {
return delegate.writeFrame(ctx, frameType, streamId, flags, payload, promise);
}

@Override
public Configuration configuration() {
return delegate.configuration();
}

@Override
public void close() {
delegate.close();
}

@Override
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise) {
return delegate.writeData(ctx, streamId, data, padding, endStream, promise);
}

@Override
public void consumeReceivedSettings(Http2Settings settings) {
if (delegate instanceof Http2SettingsReceivedConsumer) {
((Http2SettingsReceivedConsumer) delegate).consumeReceivedSettings(settings);
} else {
throw new IllegalStateException("delegate " + delegate + " is not an instance of " +
Http2SettingsReceivedConsumer.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private void configureStreamByteDistributor() {
protected VertxHttp2ConnectionHandler<C> build(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) throws Exception {
if (server) {
if (compressionOptions != null) {
encoder = new CompressorHttp2ConnectionEncoder(encoder, compressionOptions);
encoder = new VertxCompressorHttp2ConnectionEncoder(encoder, compressionOptions);
}
VertxHttp2ConnectionHandler<C> handler = new VertxHttp2ConnectionHandler<>(connectionFactory, useDecompression, decoder, encoder, initialSettings);
decoder.frameListener(handler);
Expand Down
Loading

0 comments on commit 4c0638d

Please sign in to comment.