Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes Vert.x issue 5371 #5372

Merged
merged 4 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ private Http2ServerStream createStream(Http2Headers headers, boolean streamEnded
}

private void initStream(int streamId, Http2ServerStream vertxStream) {
String contentEncoding = options.isCompressionSupported() ? determineContentEncoding(vertxStream.headers) : null;
Http2ServerRequest request = new Http2ServerRequest(vertxStream, serverOrigin, vertxStream.headers, contentEncoding);
Http2ServerRequest request = new Http2ServerRequest(vertxStream, serverOrigin, vertxStream.headers);
vertxStream.request = request;
vertxStream.isConnect = request.method() == HttpMethod.CONNECT;
Http2Stream stream = handler.connection().stream(streamId);
Expand Down Expand Up @@ -218,10 +217,9 @@ private synchronized void doSendPush(int streamId, HostAndPort authority, HttpMe
if (future.isSuccess()) {
synchronized (Http2ServerConnection.this) {
int promisedStreamId = future.getNow();
String contentEncoding = determineContentEncoding(headers_);
Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
Http2ServerStream vertxStream = new Http2ServerStream(this, context, method, path, options.getTracingPolicy(), true);
Push push = new Push(vertxStream, contentEncoding, promise);
Http2ServerStream vertxStream = new Http2ServerStream(this, context, headers_, method, path, options.getTracingPolicy(), true);
Push push = new Push(vertxStream, promise);
vertxStream.request = push;
push.stream.priority(streamPriority);
push.stream.init(promisedStream);
Expand Down Expand Up @@ -252,11 +250,10 @@ private class Push implements Http2ServerStreamHandler {
private final Promise<HttpServerResponse> promise;

public Push(Http2ServerStream stream,
String contentEncoding,
Promise<HttpServerResponse> promise) {
this.context = stream.context;
this.stream = stream;
this.response = new Http2ServerResponse(stream.conn, stream, true, contentEncoding);
this.response = new Http2ServerResponse(stream.conn, stream, true);
this.promise = promise;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http2.Http2Headers;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
Expand Down Expand Up @@ -71,11 +70,10 @@ public class Http2ServerRequest extends HttpServerRequestInternal implements Htt

Http2ServerRequest(Http2ServerStream stream,
String serverOrigin,
Http2Headers headers,
String contentEncoding) {
Http2Headers headers) {
this.context = stream.context;
this.stream = stream;
this.response = new Http2ServerResponse(stream.conn, stream, false, contentEncoding);
this.response = new Http2ServerResponse(stream.conn, stream, false);
this.serverOrigin = serverOrigin;
this.headersMap = new Http2HeadersAdaptor(headers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public class Http2ServerResponse implements HttpServerResponse, HttpResponse {
private final ChannelHandlerContext ctx;
private final Http2ServerConnection conn;
private final boolean push;
private final String contentEncoding;
private final Http2Headers headers = new DefaultHttp2Headers();
private Http2HeadersAdaptor headersMap;
private Http2Headers trailers;
Expand All @@ -70,13 +69,11 @@ public class Http2ServerResponse implements HttpServerResponse, HttpResponse {

public Http2ServerResponse(Http2ServerConnection conn,
Http2ServerStream stream,
boolean push,
String contentEncoding) {
boolean push) {
this.stream = stream;
this.ctx = conn.handlerContext;
this.conn = conn;
this.push = push;
this.contentEncoding = contentEncoding;
}

boolean isPush() {
Expand Down Expand Up @@ -454,9 +451,6 @@ private boolean checkSendHeaders(boolean end, boolean checkFlush) {

private void prepareHeaders() {
headers.status(status.codeAsText()); // Could be optimized for usual case ?
if (contentEncoding != null && headers.get(HttpHeaderNames.CONTENT_ENCODING) == null) {
headers.set(HttpHeaderNames.CONTENT_ENCODING, contentEncoding);
}
// Sanitize
if (stream.method == HttpMethod.HEAD || status == HttpResponseStatus.NOT_MODIFIED) {
headers.remove(HttpHeaders.TRANSFER_ENCODING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ class Http2ServerStream extends VertxHttp2Stream<Http2ServerConnection> {

Http2ServerStream(Http2ServerConnection conn,
ContextInternal context,
Http2Headers headers,
HttpMethod method,
String uri,
TracingPolicy tracingPolicy,
boolean halfClosedRemote) {
super(conn, context);

this.headers = null;
this.headers = headers;
this.method = method;
this.uri = uri;
this.scheme = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package io.vertx.core.http.impl;
vietj marked this conversation as resolved.
Show resolved Hide resolved

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.compression.CompressionOptions;
import io.netty.handler.codec.http2.CompressorHttp2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Flags;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2LifecycleManager;
import io.netty.handler.codec.http2.Http2RemoteFlowController;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2SettingsReceivedConsumer;
import java.util.function.Function;

import static io.vertx.core.http.HttpHeaders.CONTENT_ENCODING;
import static io.vertx.core.http.HttpHeaders.IDENTITY;

public class VertxCompressorHttp2ConnectionEncoder implements Http2FrameWriter, Http2ConnectionEncoder, Http2SettingsReceivedConsumer {

private Http2ConnectionEncoder delegate;
private final Http2ConnectionEncoder plainEncoder;

public VertxCompressorHttp2ConnectionEncoder(Http2ConnectionEncoder plainEncoder, CompressionOptions[] compressionOptions) {
this.delegate = new CompressorHttp2ConnectionEncoder(plainEncoder, compressionOptions);
this.plainEncoder = plainEncoder;
}

private void beforeWritingHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers responseHeaders) {
String contentEncodingToApply = determineContentEncodingToApply(ctx, streamId, responseHeaders);
if (contentEncodingToApply == null || contentEncodingToApply.equalsIgnoreCase(IDENTITY.toString())) {
if (responseHeaders.contains(CONTENT_ENCODING, IDENTITY)) {
responseHeaders.remove(CONTENT_ENCODING);
}
delegate = plainEncoder;
} else {
responseHeaders.set(CONTENT_ENCODING, contentEncodingToApply);
}
}

private String determineContentEncodingToApply(ChannelHandlerContext ctx, int streamId, Http2Headers responseHeaders) {
if (responseHeaders.contains(CONTENT_ENCODING)) {
return null;
}
return ifType(ctx.handler(), VertxHttp2ConnectionHandler.class, connectionHandler ->
ifType(connectionHandler.connectFuture().getNow(), Http2ServerConnection.class, connection ->
ifType(connection.stream(streamId), Http2ServerStream.class, stream ->
stream.headers == null ? null : connection.determineContentEncoding(stream.headers))));
}

private <T, R> R ifType(Object obj, Class<T> type, Function<T, R> then) {
if (type.isAssignableFrom(obj.getClass())) {
return then.apply(type.cast(obj));
}
return null;
}

@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) {
beforeWritingHeaders(ctx, streamId, headers);
return delegate.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
}

@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, ChannelPromise promise) {
beforeWritingHeaders(ctx, streamId, headers);
return delegate.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream, promise);
}

@Override
public void lifecycleManager(Http2LifecycleManager http2LifecycleManager) {
delegate.lifecycleManager(http2LifecycleManager);
}

@Override
public Http2Connection connection() {
return delegate.connection();
}

@Override
public Http2RemoteFlowController flowController() {
return delegate.flowController();
}

@Override
public Http2FrameWriter frameWriter() {
return delegate.frameWriter();
}

@Override
public Http2Settings pollSentSettings() {
return delegate.pollSentSettings();
}

@Override
public void remoteSettings(Http2Settings http2Settings) throws Http2Exception {
delegate.remoteSettings(http2Settings);
}

@Override
public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive, ChannelPromise promise) {
return delegate.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
}

@Override
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise) {
return delegate.writeRstStream(ctx, streamId, errorCode, promise);
}

@Override
public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, ChannelPromise promise) {
return delegate.writeSettings(ctx, settings, promise);
}

@Override
public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
return delegate.writeSettingsAck(ctx, promise);
}

@Override
public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
return delegate.writePing(ctx, ack, data, promise);
}

@Override
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) {
return delegate.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise);
}

@Override
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData, ChannelPromise promise) {
return delegate.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
}

@Override
public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
return delegate.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
}

@Override
public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload, ChannelPromise promise) {
return delegate.writeFrame(ctx, frameType, streamId, flags, payload, promise);
}

@Override
public Configuration configuration() {
return delegate.configuration();
}

@Override
public void close() {
delegate.close();
}

@Override
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise) {
return delegate.writeData(ctx, streamId, data, padding, endStream, promise);
}

@Override
public void consumeReceivedSettings(Http2Settings settings) {
if (delegate instanceof Http2SettingsReceivedConsumer) {
((Http2SettingsReceivedConsumer) delegate).consumeReceivedSettings(settings);
} else {
throw new IllegalStateException("delegate " + delegate + " is not an instance of " +
Http2SettingsReceivedConsumer.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private void configureStreamByteDistributor() {
protected VertxHttp2ConnectionHandler<C> build(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) throws Exception {
if (server) {
if (compressionOptions != null) {
encoder = new CompressorHttp2ConnectionEncoder(encoder, compressionOptions);
encoder = new VertxCompressorHttp2ConnectionEncoder(encoder, compressionOptions);
}
VertxHttp2ConnectionHandler<C> handler = new VertxHttp2ConnectionHandler<>(connectionFactory, useDecompression, decoder, encoder, initialSettings);
decoder.frameListener(handler);
Expand Down
Loading