diff --git a/.github/workflows/ci-integration-tests.yml b/.github/workflows/ci-integration-tests.yml index 534393fe5..9862f8a80 100644 --- a/.github/workflows/ci-integration-tests.yml +++ b/.github/workflows/ci-integration-tests.yml @@ -27,5 +27,7 @@ jobs: mysql version: ${{ matrix.mysql-version }} mysql database: r2dbc mysql root password: r2dbc-password!@ + - name: Enable LOCAL INFILE + run: mysql --protocol=tcp --password='r2dbc-password!@' "SET GLOBAL local_infile=on;" - name: Integration test with MySQL ${{ matrix.mysql-version }} run: ./mvnw -B verify -Dmaven.javadoc.skip=true -Dmaven.surefire.skip=true -Dtest.mysql.password=r2dbc-password!@ -Dtest.mysql.version=${{ matrix.mysql-version }} -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=WARN diff --git a/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java b/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java index 51c0930b8..46d93174b 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java @@ -119,6 +119,10 @@ public ZeroDateOption getZeroDateOption() { return zeroDateOption; } + public int getLocalInfileBufferSize() { + return 64 * 1024; + } + /** * Get the bitmap of server statuses. * diff --git a/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java b/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java index 2cc7f1142..107277d2d 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java @@ -22,12 +22,12 @@ import io.asyncer.r2dbc.mysql.client.FluxExchangeable; import io.asyncer.r2dbc.mysql.constant.ServerStatuses; import io.asyncer.r2dbc.mysql.constant.SslMode; -import io.asyncer.r2dbc.mysql.internal.util.InternalArrays; import io.asyncer.r2dbc.mysql.internal.util.StringUtils; import io.asyncer.r2dbc.mysql.message.client.AuthResponse; import io.asyncer.r2dbc.mysql.message.client.ClientMessage; import io.asyncer.r2dbc.mysql.message.client.HandshakeResponse; -import io.asyncer.r2dbc.mysql.message.client.LoginClientMessage; +import io.asyncer.r2dbc.mysql.message.client.LocalInfileResponse; +import io.asyncer.r2dbc.mysql.message.client.SubsequenceClientMessage; import io.asyncer.r2dbc.mysql.message.client.PingMessage; import io.asyncer.r2dbc.mysql.message.client.PrepareQueryMessage; import io.asyncer.r2dbc.mysql.message.client.PreparedCloseMessage; @@ -44,6 +44,7 @@ import io.asyncer.r2dbc.mysql.message.server.ErrorMessage; import io.asyncer.r2dbc.mysql.message.server.HandshakeHeader; import io.asyncer.r2dbc.mysql.message.server.HandshakeRequest; +import io.asyncer.r2dbc.mysql.message.server.LocalInfileRequest; import io.asyncer.r2dbc.mysql.message.server.OkMessage; import io.asyncer.r2dbc.mysql.message.server.PreparedOkMessage; import io.asyncer.r2dbc.mysql.message.server.ServerMessage; @@ -74,6 +75,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Predicate; @@ -209,36 +211,26 @@ static Mono login(Client client, SslMode sslMode, String database, Strin * terminates with the last {@link CompleteMessage} or a {@link ErrorMessage}. The {@link ErrorMessage} * will emit an exception. The exchange will be completed by {@link CompleteMessage} after receive the * last result for the last binding. + *

+ * Note: this method does not support {@code LOCAL INFILE} due to it should be used for excepted queries. * * @param client the {@link Client} to exchange messages with. * @param sql the query to execute, can be contains multi-statements. * @return receives complete signal. */ static Mono executeVoid(Client client, String sql) { - return Mono.defer(() -> execute0(client, sql).doOnNext(EXECUTE_VOID).then()); - } + return Mono.defer(() -> client.exchange(new TextQueryMessage(sql), (message, sink) -> { + if (message instanceof ErrorMessage) { + sink.next(((ErrorMessage) message).offendedBy(sql)); + sink.complete(); + } else { + sink.next(message); - /** - * Execute multiple simple queries with one-by-one and return a {@link Mono} for the complete signal or - * error. Query execution terminates with the last {@link CompleteMessage} or a {@link ErrorMessage}. The - * {@link ErrorMessage} will emit an exception and cancel subsequent statements execution. The exchange - * will be completed by {@link CompleteMessage} after receive the last result for the last binding. - * - * @param client the {@link Client} to exchange messages with. - * @param statements the queries to execute, each element can be contains multi-statements. - * @return receives complete signal. - */ - static Mono executeVoid(Client client, String... statements) { - switch (statements.length) { - case 0: - return Mono.empty(); - case 1: - return executeVoid(client, statements[0]); - default: - return client.exchange(new MultiQueryExchangeable(InternalArrays.asIterator(statements))) - .doOnNext(EXECUTE_VOID) - .then(); - } + if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) { + sink.complete(); + } + } + }).doOnSubscribe(ignored -> QueryLogger.log(sql)).doOnNext(EXECUTE_VOID).then()); } /** @@ -303,18 +295,7 @@ static Mono createSavepoint(Client client, ConnectionState state, String n * @return the messages received in response to this exchange. */ private static Flux execute0(Client client, String sql) { - return client.exchange(new TextQueryMessage(sql), (message, sink) -> { - if (message instanceof ErrorMessage) { - sink.next(((ErrorMessage) message).offendedBy(sql)); - sink.complete(); - } else { - sink.next(message); - - if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) { - sink.complete(); - } - } - }).doOnSubscribe(ignored -> QueryLogger.log(sql)); + return client.exchange(new SimpleQueryExchangeable(sql)); } private QueryFlow() { } @@ -339,6 +320,16 @@ public final void accept(ServerMessage message, SynchronousSink s if (message instanceof ErrorMessage) { sink.next(((ErrorMessage) message).offendedBy(offendingSql())); sink.complete(); + } else if (message instanceof LocalInfileRequest) { + LocalInfileRequest request = (LocalInfileRequest) message; + String path = request.getPath(); + + QueryLogger.logLocalInfile(path); + + requests.emitNext( + new LocalInfileResponse(request.getEnvelopeId() + 1, path, sink), + Sinks.EmitFailureHandler.FAIL_FAST + ); } else { sink.next(message); @@ -353,6 +344,59 @@ public final void accept(ServerMessage message, SynchronousSink s abstract protected String offendingSql(); } +final class SimpleQueryExchangeable extends BaseFluxExchangeable { + + private static final int INIT = 0; + + private static final int EXECUTE = 1; + + private static final int DISPOSE = 2; + + private final AtomicInteger state = new AtomicInteger(INIT); + + private final String sql; + + SimpleQueryExchangeable(String sql) { + this.sql = sql; + } + + @Override + public void dispose() { + if (state.getAndSet(DISPOSE) != DISPOSE) { + requests.tryEmitComplete(); + } + } + + @Override + public boolean isDisposed() { + return state.get() == DISPOSE; + } + + @Override + protected void tryNextOrComplete(@Nullable SynchronousSink sink) { + if (state.compareAndSet(INIT, EXECUTE)) { + QueryLogger.log(sql); + + Sinks.EmitResult result = requests.tryEmitNext(new TextQueryMessage(sql)); + + if (result == Sinks.EmitResult.OK) { + return; + } + + QueryFlow.logger.error("Emit request failed due to {}", result); + } + + if (sink != null) { + sink.complete(); + } + } + + @Override + protected String offendingSql() { + return sql; + } +} + /** * An implementation of {@link FluxExchangeable} that considers client-preparing requests. */ @@ -770,8 +814,8 @@ final class LoginExchangeable extends FluxExchangeable { private static final int HANDSHAKE_VERSION = 10; - private final Sinks.Many requests = Sinks.many().unicast() - .onBackpressureBuffer(Queues.one().get()); + private final Sinks.Many requests = Sinks.many().unicast() + .onBackpressureBuffer(Queues.one().get()); private final Client client; @@ -879,7 +923,7 @@ public void dispose() { this.requests.tryEmitComplete(); } - private void emitNext(LoginClientMessage message, SynchronousSink sink) { + private void emitNext(SubsequenceClientMessage message, SynchronousSink sink) { Sinks.EmitResult result = requests.tryEmitNext(message); if (result != Sinks.EmitResult.OK) { @@ -903,8 +947,6 @@ private Capability clientCapability(Capability serverCapability) { builder.disableDatabasePinned(); builder.disableCompression(); - // TODO: support LOAD DATA LOCAL INFILE - builder.disableLoadDataInfile(); builder.disableIgnoreAmbiguitySpace(); builder.disableInteractiveTimeout(); diff --git a/src/main/java/io/asyncer/r2dbc/mysql/QueryLogger.java b/src/main/java/io/asyncer/r2dbc/mysql/QueryLogger.java index cb7d954cd..3894167ae 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/QueryLogger.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/QueryLogger.java @@ -47,5 +47,9 @@ static void log(int statementId, MySqlParameter[] values) { logger.debug("Executing prepared statement {} with {}", statementId, values); } + static void logLocalInfile(String path) { + logger.debug("Loading data from: {}", path); + } + private QueryLogger() { } } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/TextParametrizedStatement.java b/src/main/java/io/asyncer/r2dbc/mysql/TextParametrizedStatement.java index 9d2c2c55b..e0fd475c6 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/TextParametrizedStatement.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/TextParametrizedStatement.java @@ -33,7 +33,8 @@ final class TextParametrizedStatement extends ParametrizedStatementSupport { @Override protected Flux execute(List bindings) { - return Flux.defer(() -> QueryFlow.execute(client, query, returningIdentifiers(), bindings)) + return Flux.defer(() -> QueryFlow.execute(client, query, returningIdentifiers(), + bindings)) .map(messages -> MySqlResult.toResult(false, codecs, context, syntheticKeyName(), messages)); } } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/TextSimpleStatement.java b/src/main/java/io/asyncer/r2dbc/mysql/TextSimpleStatement.java index e23873c07..04fd90001 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/TextSimpleStatement.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/TextSimpleStatement.java @@ -34,7 +34,7 @@ final class TextSimpleStatement extends SimpleStatementSupport { public Flux execute() { return Flux.defer(() -> QueryFlow.execute( client, - StringUtils.extendReturning(sql, returningIdentifiers())) - ).map(messages -> MySqlResult.toResult(false, codecs, context, syntheticKeyName(), messages)); + StringUtils.extendReturning(sql, returningIdentifiers()) + ).map(messages -> MySqlResult.toResult(false, codecs, context, syntheticKeyName(), messages))); } } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java b/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java index a467ae2ec..1641e480c 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java @@ -19,7 +19,7 @@ import io.asyncer.r2dbc.mysql.ConnectionContext; import io.asyncer.r2dbc.mysql.internal.util.OperatorUtils; import io.asyncer.r2dbc.mysql.message.client.ClientMessage; -import io.asyncer.r2dbc.mysql.message.client.LoginClientMessage; +import io.asyncer.r2dbc.mysql.message.client.SubsequenceClientMessage; import io.asyncer.r2dbc.mysql.message.client.PrepareQueryMessage; import io.asyncer.r2dbc.mysql.message.client.PreparedFetchMessage; import io.asyncer.r2dbc.mysql.message.client.SslRequest; @@ -86,22 +86,22 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { if (msg instanceof ClientMessage) { ByteBufAllocator allocator = ctx.alloc(); - Flux encoded; - int envelopeId; - if (msg instanceof LoginClientMessage) { - LoginClientMessage message = (LoginClientMessage) msg; + if (msg instanceof SubsequenceClientMessage) { + SubsequenceClientMessage message = (SubsequenceClientMessage) msg; encoded = Flux.from(message.encode(allocator, this.context)); - envelopeId = message.getEnvelopeId(); + int envelopeId = message.getEnvelopeId(); + + OperatorUtils.envelope(encoded, allocator, envelopeId, false) + .subscribe(new WriteSubscriber(ctx, promise)); } else { encoded = Flux.from(((ClientMessage) msg).encode(allocator, this.context)); - envelopeId = 0; - } - OperatorUtils.cumulateEnvelope(encoded, allocator, envelopeId) - .subscribe(new WriteSubscriber(ctx, promise)); + OperatorUtils.envelope(encoded, allocator, 0, true) + .subscribe(new WriteSubscriber(ctx, promise)); + } if (msg instanceof PrepareQueryMessage) { setDecodeContext(DecodeContext.prepareQuery()); diff --git a/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java b/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java index e30191e8e..505ebf919 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java @@ -184,7 +184,8 @@ public Flux exchange(FluxExchangeable exchangeable) { .asFlux() .doOnSubscribe(ignored -> exchangeable.subscribe( this::emitNextRequest, - e -> requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST)) + e -> + requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST)) ) .handle(exchangeable) .doOnTerminate(() -> { diff --git a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/FluxCumulateEnvelope.java b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/FluxEnvelope.java similarity index 73% rename from src/main/java/io/asyncer/r2dbc/mysql/internal/util/FluxCumulateEnvelope.java rename to src/main/java/io/asyncer/r2dbc/mysql/internal/util/FluxEnvelope.java index 9bc085be8..27e786903 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/FluxCumulateEnvelope.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/FluxEnvelope.java @@ -32,7 +32,7 @@ * An implementation of {@link Flux}{@code <}{@link ByteBuf}{@code >} that considers cumulate buffers as * envelopes of the MySQL socket protocol. */ -final class FluxCumulateEnvelope extends FluxOperator { +final class FluxEnvelope extends FluxOperator { private final ByteBufAllocator alloc; @@ -40,17 +40,132 @@ final class FluxCumulateEnvelope extends FluxOperator { private final int start; - FluxCumulateEnvelope(Flux source, ByteBufAllocator alloc, int size, int start) { + private final boolean cumulate; + + FluxEnvelope(Flux source, ByteBufAllocator alloc, int size, int start, + boolean cumulate) { super(source); this.alloc = alloc; this.size = size; this.start = start; + this.cumulate = cumulate; } @Override public void subscribe(CoreSubscriber actual) { - this.source.subscribe(new CumulateEnvelopeSubscriber(actual, alloc, size, start)); + if (cumulate) { + this.source.subscribe(new CumulateEnvelopeSubscriber(actual, alloc, size, start)); + } else { + this.source.subscribe(new DirectEnvelopeSubscriber(actual, alloc, size, start)); + } + } +} + +final class DirectEnvelopeSubscriber implements CoreSubscriber, Scannable, Subscription { + + private final CoreSubscriber actual; + + private final ByteBufAllocator alloc; + + private final int size; + + private boolean done; + + private Subscription s; + + private int envelopeId; + + DirectEnvelopeSubscriber(CoreSubscriber actual, ByteBufAllocator alloc, int size, + int start) { + this.actual = actual; + this.alloc = alloc; + this.size = size; + this.envelopeId = start; + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(this.s, s)) { + this.s = s; + this.actual.onSubscribe(this); + } + } + + @Override + public void onNext(ByteBuf buf) { + if (done) { + // Do not release the buffer, it should be handled by OperatorUtils.discardOnCancel() or Context. + Operators.onNextDropped(buf, actual.currentContext()); + return; + } + + try { + ByteBuf header = this.alloc.buffer(Envelopes.PART_HEADER_SIZE) + .writeMediumLE(buf.readableBytes()) + .writeByte(this.envelopeId++); + + this.actual.onNext(header); + this.actual.onNext(buf); + } catch (Throwable e) { + Throwable t = Operators.onNextError(buf, e, this.actual.currentContext(), this.s); + + if (t == null) { + s.request(1); + } else { + onError(t); + } + } + } + + @Override + public void onError(Throwable t) { + if (this.done) { + Operators.onErrorDropped(t, this.actual.currentContext()); + return; + } + + this.done = true; + this.actual.onError(t); + } + + @Override + public void onComplete() { + if (this.done) { + return; + } + + this.done = true; + this.actual.onComplete(); + } + + @Override + public void request(long n) { + this.s.request(n); + } + + @Override + public void cancel() { + this.s.cancel(); + } + + @Override + public Context currentContext() { + return this.actual.currentContext(); + } + + @Override + @SuppressWarnings("rawtypes") + public Object scanUnsafe(Attr key) { + if (key == Attr.PARENT) { + return this.s; + } else if (key == Attr.ACTUAL) { + return this.actual; + } else if (key == Attr.TERMINATED) { + return this.done; + } else { + return null; + } } } @@ -95,7 +210,7 @@ public void onNext(ByteBuf buf) { } if (!buf.isReadable()) { - // Ignore empty buffer, useless for MySQL protocol. + // Ignore empty buffer, useless for cumulated buffers. buf.release(); return; } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtils.java b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtils.java index bd97b440c..9c719988d 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtils.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtils.java @@ -18,12 +18,19 @@ import io.asyncer.r2dbc.mysql.message.FieldValue; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; +import reactor.core.publisher.Flux; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.List; +import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.require; +import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull; /** * An internal utility considers the use of safe release buffers (array or {@link List}). It uses standard @@ -31,6 +38,38 @@ */ public final class NettyBufferUtils { + /** + * Reads all bytes from a file asynchronously. + * + * @param path The path of the file want to be read. + * @param allocator The {@link ByteBufAllocator} used to allocate {@link ByteBuf}s. + * @param bufferSize The size of the buffer used to read the file. + * @return A {@link Flux} emits {@link ByteBuf}s read from the file. + */ + public static Flux readFile(Path path, ByteBufAllocator allocator, int bufferSize) { + requireNonNull(path, "path must not be null"); + requireNonNull(allocator, "allocator must not be null"); + require(bufferSize > 0, "bufferSize must be positive"); + + return Flux.create(sink -> { + ReadCompletionHandler handler; + + try { + // AsynchronousFileChannel can only be opened in blocking mode :( + @SuppressWarnings("BlockingMethodInNonBlockingContext") + AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ); + + handler = new ReadCompletionHandler(channel, allocator, bufferSize, sink); + } catch (Throwable e) { + sink.error(e); + return; + } + + sink.onCancel(handler::cancel); + sink.onRequest(handler::request); + }).doOnDiscard(ByteBuf.class, ReferenceCountUtil::safeRelease); + } + /** * Combine {@link ByteBuf}s through composite buffer. *

diff --git a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/OperatorUtils.java b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/OperatorUtils.java index 4306ff20f..21375c419 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/OperatorUtils.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/OperatorUtils.java @@ -55,13 +55,13 @@ public static Flux discardOnCancel(Flux source) { return new FluxDiscardOnCancel<>(source); } - public static Flux cumulateEnvelope(Flux source, ByteBufAllocator allocator, - int envelopeIdStart) { + public static Flux envelope(Flux source, ByteBufAllocator allocator, + int envelopeIdStart, boolean cumulate) { requireNonNull(source, "source must not be null"); requireNonNull(allocator, "allocator must not be null"); - return new FluxCumulateEnvelope(source, allocator, Envelopes.MAX_ENVELOPE_SIZE, - envelopeIdStart & 0xFF); + return new FluxEnvelope(source, allocator, Envelopes.MAX_ENVELOPE_SIZE, + envelopeIdStart & 0xFF, cumulate); } private OperatorUtils() { } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/ReadCompletionHandler.java b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/ReadCompletionHandler.java new file mode 100644 index 000000000..a75ca94a9 --- /dev/null +++ b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/ReadCompletionHandler.java @@ -0,0 +1,141 @@ +/* + * Copyright 2024 asyncer.io projects + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.asyncer.r2dbc.mysql.internal.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import reactor.core.publisher.FluxSink; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * TODO: add javadoc here + */ +final class ReadCompletionHandler implements CompletionHandler { + + private final AsynchronousFileChannel channel; + + private final ByteBufAllocator allocator; + + private final int bufferSize; + + private final FluxSink sink; + + private final AtomicLong position; + + private final AtomicReference state = new AtomicReference<>(State.IDLE); + + ReadCompletionHandler( + AsynchronousFileChannel channel, + ByteBufAllocator allocator, + int bufferSize, + FluxSink sink + ) { + this.channel = channel; + this.allocator = allocator; + this.bufferSize = bufferSize; + this.sink = sink; + this.position = new AtomicLong(0); + } + + public void request(long ignored) { + tryRead(); + } + + public void cancel() { + this.state.getAndSet(State.DISPOSED); + + // According java.nio.channels.AsynchronousChannel "if an I/O operation is outstanding + // on the channel and the channel's close method is invoked, then the I/O operation + // fails with the exception AsynchronousCloseException". That should invoke the failed + // callback below and the current ByteBuf should be released. + + tryCloseChannel(); + } + + private void tryRead() { + if (this.sink.requestedFromDownstream() > 0 && this.state.compareAndSet(State.IDLE, State.READING)) { + read(); + } + } + + private void read() { + ByteBuf buf = this.allocator.buffer(this.bufferSize); + ByteBuffer byteBuffer = buf.nioBuffer(buf.writerIndex(), buf.writableBytes()); + + this.channel.read(byteBuffer, this.position.get(), buf, this); + } + + @Override + public void completed(Integer read, ByteBuf buf) { + if (State.DISPOSED.equals(this.state.get())) { + buf.release(); + tryCloseChannel(); + return; + } + + if (read == -1) { + buf.release(); + tryCloseChannel(); + this.state.set(State.DISPOSED); + this.sink.complete(); + return; + } + + this.position.addAndGet(read); + buf.writerIndex(read); + this.sink.next(buf); + + // Stay in READING mode if there is demand + if (this.sink.requestedFromDownstream() > 0) { + read(); + return; + } + + // Release READING mode and then try again in case of concurrent "request" + if (this.state.compareAndSet(State.READING, State.IDLE)) { + tryRead(); + } + } + + @Override + public void failed(Throwable exc, ByteBuf buf) { + buf.release(); + + tryCloseChannel(); + this.state.set(State.DISPOSED); + this.sink.error(exc); + } + + private enum State { + IDLE, READING, DISPOSED + } + + void tryCloseChannel() { + if (channel.isOpen()) { + try { + channel.close(); + } catch (IOException ignored) { + } + } + } +} diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/client/AuthResponse.java b/src/main/java/io/asyncer/r2dbc/mysql/message/client/AuthResponse.java index 0837febc3..4c6d2b1de 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/client/AuthResponse.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/client/AuthResponse.java @@ -26,7 +26,7 @@ * A message that contains only an authentication, used by full authentication or change authentication * response. */ -public final class AuthResponse extends SizedClientMessage implements LoginClientMessage { +public final class AuthResponse extends SizedClientMessage implements SubsequenceClientMessage { private final int envelopeId; diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/client/HandshakeResponse.java b/src/main/java/io/asyncer/r2dbc/mysql/message/client/HandshakeResponse.java index a1b0de9bd..ca05f9831 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/client/HandshakeResponse.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/client/HandshakeResponse.java @@ -25,9 +25,9 @@ import static io.asyncer.r2dbc.mysql.constant.Envelopes.TERMINAL; /** - * An abstraction of {@link LoginClientMessage} considers handshake response. + * An abstraction of {@link SubsequenceClientMessage} considers handshake response. */ -public interface HandshakeResponse extends LoginClientMessage { +public interface HandshakeResponse extends SubsequenceClientMessage { /** * Construct an instance of {@link HandshakeResponse}, it is implemented by the protocol version that is diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/client/LocalInfileResponse.java b/src/main/java/io/asyncer/r2dbc/mysql/message/client/LocalInfileResponse.java new file mode 100644 index 000000000..048e8ebd6 --- /dev/null +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/client/LocalInfileResponse.java @@ -0,0 +1,103 @@ +/* + * Copyright 2024 asyncer.io projects + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.asyncer.r2dbc.mysql.message.client; + +import io.asyncer.r2dbc.mysql.ConnectionContext; +import io.asyncer.r2dbc.mysql.internal.util.NettyBufferUtils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import reactor.core.publisher.Flux; +import reactor.core.publisher.SynchronousSink; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.atomic.AtomicReference; + +import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull; + +/** + * A message considers as a chunk of a local in-file data. + */ +public final class LocalInfileResponse implements SubsequenceClientMessage { + + private final int envelopeId; + + private final String path; + + private final SynchronousSink errorSink; + + public LocalInfileResponse(int envelopeId, String path, SynchronousSink errorSink) { + requireNonNull(path, "path must not be null"); + + this.envelopeId = envelopeId; + this.path = path; + this.errorSink = errorSink; + } + + @Override + public Flux encode(ByteBufAllocator allocator, ConnectionContext context) { + return Flux.defer(() -> { + AtomicReference error = new AtomicReference<>(); + Path path = Paths.get(this.path); + + return NettyBufferUtils.readFile(path, allocator, context.getLocalInfileBufferSize()) + .onErrorComplete(e -> { + error.set(e); + return true; + }) + .concatWith(Flux.just(allocator.buffer(0, 0))) + .doAfterTerminate(() -> { + Throwable e = error.getAndSet(null); + + if (e != null) { + errorSink.error(e); + } + }); + }); + } + + @Override + public int getEnvelopeId() { + return envelopeId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof LocalInfileResponse)) { + return false; + } + + LocalInfileResponse that = (LocalInfileResponse) o; + + return envelopeId == that.envelopeId && path.equals(that.path); + } + + @Override + public int hashCode() { + return 31 * envelopeId + path.hashCode(); + } + + @Override + public String toString() { + return "LocalInfileResponse{envelopeId=" + envelopeId + + ", path='" + path + "'}"; + } +} diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/client/SslRequest.java b/src/main/java/io/asyncer/r2dbc/mysql/message/client/SslRequest.java index c1e1356cf..16420a412 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/client/SslRequest.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/client/SslRequest.java @@ -23,7 +23,7 @@ /** * An abstraction of {@link ClientMessage} that considers SSL request for handshake. */ -public interface SslRequest extends LoginClientMessage { +public interface SslRequest extends SubsequenceClientMessage { /** * Get current {@link Capability} of the connection. diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/client/LoginClientMessage.java b/src/main/java/io/asyncer/r2dbc/mysql/message/client/SubsequenceClientMessage.java similarity index 71% rename from src/main/java/io/asyncer/r2dbc/mysql/message/client/LoginClientMessage.java rename to src/main/java/io/asyncer/r2dbc/mysql/message/client/SubsequenceClientMessage.java index 3bbca7e0b..e9f78a5d0 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/client/LoginClientMessage.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/client/SubsequenceClientMessage.java @@ -17,12 +17,14 @@ package io.asyncer.r2dbc.mysql.message.client; /** - * An abstraction of {@link ClientMessage} considers login phase messages. + * An abstraction of {@link ClientMessage} considers as a subsequence of a request message. + *

+ * All encoded buffers will not be cumulated. */ -public interface LoginClientMessage extends ClientMessage { +public interface SubsequenceClientMessage extends ClientMessage { /** - * Get the current envelope ID used to serialize subsequent request messages. + * Gets the current envelope ID used to serialize subsequent request messages. * * @return the current envelope ID. */ diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/LocalInfileRequest.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/LocalInfileRequest.java new file mode 100644 index 000000000..e493e2bf8 --- /dev/null +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/server/LocalInfileRequest.java @@ -0,0 +1,74 @@ +/* + * Copyright 2024 asyncer.io projects + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.asyncer.r2dbc.mysql.message.server; + +import io.asyncer.r2dbc.mysql.ConnectionContext; +import io.netty.buffer.ByteBuf; + +/** + * A message sent by the server to indicate that the client should send a file to the server using the + * {@code LOAD DATA LOCAL INFILE} command. + */ +public final class LocalInfileRequest implements ServerMessage { + + private final int envelopeId; + + private final String path; + + private LocalInfileRequest(int envelopeId, String path) { + this.envelopeId = envelopeId; + this.path = path; + } + + public int getEnvelopeId() { + return envelopeId; + } + + public String getPath() { + return path; + } + + static LocalInfileRequest decode(int envelopeId, ByteBuf buf, ConnectionContext context) { + buf.skipBytes(1); // Constant 0xFB + return new LocalInfileRequest(envelopeId, buf.toString(context.getClientCollation().getCharset())); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof LocalInfileRequest)) { + return false; + } + + LocalInfileRequest that = (LocalInfileRequest) o; + + return envelopeId == that.envelopeId && path.equals(that.path); + } + + @Override + public int hashCode() { + return 31 * envelopeId + path.hashCode(); + } + + @Override + public String toString() { + return "LocalInfileRequest{envelopeId=" + envelopeId + + ", path='" + path + "'}"; + } +} diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ServerMessageDecoder.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ServerMessageDecoder.java index 1f7408e7b..f81a06200 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ServerMessageDecoder.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ServerMessageDecoder.java @@ -48,6 +48,11 @@ public final class ServerMessageDecoder { private static final short ERROR = 0xFF; + /** + * Note: it can be a column count message, so the packet size should be checked. + */ + private static final short LOCAL_INFILE = 0xFB; + private final List parts = new ArrayList<>(); /** @@ -99,7 +104,7 @@ private static ServerMessage decodeMessage(List buffers, int envelopeId try { if (decodeContext instanceof CommandDecodeContext) { - return decodeCommandMessage(combined, context); + return decodeCommandMessage(envelopeId, combined, context); } else if (decodeContext instanceof PreparedMetadataDecodeContext) { return decodePreparedMetadata(combined, context, (PreparedMetadataDecodeContext) decodeContext); @@ -189,7 +194,8 @@ private static ServerMessage decodePrepareQuery(ByteBuf buf) { " on prepare query phase"); } - private static ServerMessage decodeCommandMessage(ByteBuf buf, ConnectionContext context) { + private static ServerMessage decodeCommandMessage(int envelopeId, ByteBuf buf, + ConnectionContext context) { short header = buf.getUnsignedByte(buf.readerIndex()); switch (header) { case ERROR: @@ -213,6 +219,10 @@ private static ServerMessage decodeCommandMessage(ByteBuf buf, ConnectionContext } else if (EofMessage.isValidSize(byteSize)) { return EofMessage.decode(buf); } + case LOCAL_INFILE: + if (buf.readableBytes() > 1) { + return LocalInfileRequest.decode(envelopeId, buf, context); + } } if (VarIntUtils.checkNextVarInt(buf) == 0) { diff --git a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java index fcd5b03a7..6894d3346 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java @@ -16,15 +16,29 @@ package io.asyncer.r2dbc.mysql; +import io.r2dbc.spi.ColumnMetadata; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.testcontainers.shaded.com.fasterxml.jackson.core.JsonProcessingException; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.node.ArrayNode; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.node.ObjectNode; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; import java.time.Duration; +import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAccessor; import java.util.Arrays; import java.util.Collections; +import java.util.Objects; import static io.r2dbc.spi.IsolationLevel.READ_COMMITTED; import static io.r2dbc.spi.IsolationLevel.READ_UNCOMMITTED; @@ -401,7 +415,70 @@ void beginTransactionShouldRespectQueuedMessages() { .flatMap(result -> Mono.from(result.map((row, metadata) -> row.get(0, Long.class))) .doOnNext(count -> assertThat(count).isEqualTo(1L))) ); + } + @ParameterizedTest + @ValueSource(strings = { "stations", "users" }) + void loadDataLocalInfile(String name) throws URISyntaxException, IOException { + URL tdlUrl = Objects.requireNonNull(getClass().getResource(String.format("/local/%s.sql", name))); + URL csvUrl = Objects.requireNonNull(getClass().getResource(String.format("/local/%s.csv", name))); + URL jsonUrl = Objects.requireNonNull(getClass().getResource(String.format("/local/%s.json", name))); + String tdl = new String(Files.readAllBytes(Paths.get(tdlUrl.toURI())), StandardCharsets.UTF_8); + String path = Paths.get(csvUrl.toURI()).toString(); + String loadData = String.format("LOAD DATA LOCAL INFILE '%s' INTO TABLE `%s` " + + "FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"'", path, name); + String select = String.format("SELECT * FROM `%s` ORDER BY `id`", name); + ObjectMapper mapper = new ObjectMapper(); + ArrayNode arrayNode = (ArrayNode) mapper.readTree(jsonUrl); + String json = mapper.writeValueAsString(arrayNode); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + complete(conn -> conn.createStatement(tdl) + .execute() + .flatMap(IntegrationTestSupport::extractRowsUpdated) + .thenMany(conn.createStatement(loadData).execute()) + .flatMap(IntegrationTestSupport::extractRowsUpdated) + .doOnNext(it -> assertThat(it).isEqualTo(arrayNode.size())) + .thenMany(conn.createStatement(select).execute()) + .flatMap(result -> result.map((row, metadata) -> { + ObjectNode node = mapper.createObjectNode(); + + for (ColumnMetadata column : metadata.getColumnMetadatas()) { + String columnName = column.getName(); + Object value = row.get(columnName); + + if (value instanceof TemporalAccessor) { + node.set(columnName, node.textNode(formatter.format((TemporalAccessor) value))); + } else if (value instanceof Long) { + node.set(columnName, node.numberNode(((Long) value))); + } else if (value instanceof Integer) { + node.set(columnName, node.numberNode(((Integer) value))); + } else if (value instanceof String) { + node.set(columnName, node.textNode(((String) value))); + } else if (value == null) { + node.set(columnName, node.nullNode()); + } else { + throw new IllegalArgumentException("Unsupported type: " + value.getClass()); + } + } + + return node; + })) + .collectList() + .handle((list, sink) -> { + ArrayNode array = mapper.createArrayNode(); + + for (ObjectNode node : list) { + array.add(node); + } + + try { + sink.next(mapper.writeValueAsString(array)); + } catch (JsonProcessingException e) { + sink.error(e); + } + }) + .doOnNext(it -> assertThat(it).isEqualTo(json))); } @Test diff --git a/src/test/java/io/asyncer/r2dbc/mysql/internal/util/FluxCumulateEnvelopeTest.java b/src/test/java/io/asyncer/r2dbc/mysql/internal/util/FluxEnvelopeTest.java similarity index 98% rename from src/test/java/io/asyncer/r2dbc/mysql/internal/util/FluxCumulateEnvelopeTest.java rename to src/test/java/io/asyncer/r2dbc/mysql/internal/util/FluxEnvelopeTest.java index a9fa64817..02e076a03 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/internal/util/FluxCumulateEnvelopeTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/internal/util/FluxEnvelopeTest.java @@ -36,9 +36,9 @@ import static org.assertj.core.api.Assertions.assertThat; /** - * Unit tests for {@link FluxCumulateEnvelope}. + * Unit tests for {@link FluxEnvelope}. */ -class FluxCumulateEnvelopeTest { +class FluxEnvelopeTest { private static final byte[] RD_PATTERN = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz" .getBytes(StandardCharsets.US_ASCII); @@ -262,7 +262,7 @@ void mergeIntegralWithLargeCrossIntegral() { } private Flux envelopes(Flux source, int envelopeSize) { - return new FluxCumulateEnvelope(source, allocator, envelopeSize, 0); + return new FluxEnvelope(source, allocator, envelopeSize, 0, true); } private Consumer> assertBuffers(String origin, int envelopeSize, int lastSize, diff --git a/src/test/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtilsTest.java b/src/test/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtilsTest.java new file mode 100644 index 000000000..a5f06f35c --- /dev/null +++ b/src/test/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtilsTest.java @@ -0,0 +1,58 @@ +/* + * Copyright 2024 asyncer.io projects + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.asyncer.r2dbc.mysql.internal.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Objects; + +/** + * Unit tests for {@link NettyBufferUtils}. + */ +class NettyBufferUtilsTest { + + @ParameterizedTest + @ValueSource(strings = { "stations.csv", "users.csv" }) + void readFile(String name) throws IOException, URISyntaxException { + URL url = Objects.requireNonNull(getClass().getResource("/local/" + name)); + Path path = Paths.get(Paths.get(url.toURI()).toString()); + String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8); + + NettyBufferUtils.readFile(path, PooledByteBufAllocator.DEFAULT, 8192) + .map(NettyBufferUtilsTest::toStringAndRelease) + .as(it -> StepVerifier.create(it, Long.MAX_VALUE)) + .expectNext(content) + .verifyComplete(); + } + + private static String toStringAndRelease(ByteBuf buf) { + String s = buf.toString(StandardCharsets.UTF_8); + buf.release(); + return s; + } +} diff --git a/src/test/resources/local/stations.csv b/src/test/resources/local/stations.csv new file mode 100644 index 000000000..750030b95 --- /dev/null +++ b/src/test/resources/local/stations.csv @@ -0,0 +1,6 @@ +10000,Atlantic,2024-01-01 00:00:00 +10001,Baychester,2024-01-02 00:00:00 +10002,Cortelyou,2024-01-05 00:00:00 +10003,Dyckman,2024-01-08 00:00:00 +10004,Elmhurst,2024-01-06 00:00:00 +10005,Fordham,2024-01-03 00:00:00 diff --git a/src/test/resources/local/stations.json b/src/test/resources/local/stations.json new file mode 100644 index 000000000..92d19f2d6 --- /dev/null +++ b/src/test/resources/local/stations.json @@ -0,0 +1,32 @@ +[ + { + "id": 10000, + "name": "Atlantic", + "created_at": "2024-01-01 00:00:00" + }, + { + "id": 10001, + "name": "Baychester", + "created_at": "2024-01-02 00:00:00" + }, + { + "id": 10002, + "name": "Cortelyou", + "created_at": "2024-01-05 00:00:00" + }, + { + "id": 10003, + "name": "Dyckman", + "created_at": "2024-01-08 00:00:00" + }, + { + "id": 10004, + "name": "Elmhurst", + "created_at": "2024-01-06 00:00:00" + }, + { + "id": 10005, + "name": "Fordham", + "created_at": "2024-01-03 00:00:00" + } +] diff --git a/src/test/resources/local/stations.sql b/src/test/resources/local/stations.sql new file mode 100644 index 000000000..7ef113a93 --- /dev/null +++ b/src/test/resources/local/stations.sql @@ -0,0 +1,6 @@ +CREATE TEMPORARY TABLE `stations` +( + `id` INT NOT NULL PRIMARY KEY, + `name` VARCHAR(120) NOT NULL, + `created_at` DATETIME NOT NULL +) diff --git a/src/test/resources/local/users.csv b/src/test/resources/local/users.csv new file mode 100644 index 000000000..c63c4a5c3 --- /dev/null +++ b/src/test/resources/local/users.csv @@ -0,0 +1,5 @@ +mirromutth,Mirro Mutth,3000, +superman,Superman,\N,I'm a superhero +earth,earth,4543000000,"I'm a planet. +I have ""Humans"" on me" +goku,Goku,40,"I'm a saiyan, I'm hungry" diff --git a/src/test/resources/local/users.json b/src/test/resources/local/users.json new file mode 100644 index 000000000..33facc0f6 --- /dev/null +++ b/src/test/resources/local/users.json @@ -0,0 +1,26 @@ +[ + { + "id": "earth", + "name": "earth", + "age": 4543000000, + "bio": "I'm a planet.\nI have \"Humans\" on me" + }, + { + "id": "goku", + "name": "Goku", + "age": 40, + "bio": "I'm a saiyan, I'm hungry" + }, + { + "id": "mirromutth", + "name": "Mirro Mutth", + "age": 3000, + "bio": "" + }, + { + "id": "superman", + "name": "Superman", + "age": null, + "bio": "I'm a superhero" + } +] diff --git a/src/test/resources/local/users.sql b/src/test/resources/local/users.sql new file mode 100644 index 000000000..4afcfd30b --- /dev/null +++ b/src/test/resources/local/users.sql @@ -0,0 +1,7 @@ +CREATE TEMPORARY TABLE `users` +( + `id` VARCHAR(120) NOT NULL PRIMARY KEY, + `name` VARCHAR(120) NOT NULL, + `age` BIGINT NULL, + `bio` TEXT NULL +)