diff --git a/.github/workflows/ci-integration-tests.yml b/.github/workflows/ci-integration-tests.yml index b697ff955..f3bb20bf1 100644 --- a/.github/workflows/ci-integration-tests.yml +++ b/.github/workflows/ci-integration-tests.yml @@ -9,7 +9,7 @@ jobs: runs-on: ubuntu-20.04 strategy: matrix: - mysql-version: [ 5.5, 5.6, 5.7, 8.0 ] + mysql-version: [ 5.5, 5.6, 5.7, 8.0, 8.1, 8.2 ] name: Integration test with MySQL ${{ matrix.mysql-version }} steps: - uses: actions/checkout@v3 diff --git a/.github/workflows/ci-unit-tests.yml b/.github/workflows/ci-unit-tests.yml index b0571a8ff..dd0f87d0e 100644 --- a/.github/workflows/ci-unit-tests.yml +++ b/.github/workflows/ci-unit-tests.yml @@ -9,14 +9,14 @@ jobs: runs-on: ubuntu-20.04 strategy: matrix: - java-version: [ 8, 11, 17 ] + java-version: [ 8, 11, 17 , 21] name: linux-java-${{ matrix.java-version }} steps: - uses: actions/checkout@v3 - name: Set up Java ${{ matrix.java-version }} uses: actions/setup-java@v3 with: - distribution: temurin + distribution: zulu java-version: ${{ matrix.java-version }} cache: maven - name: Unit test with Maven diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties index 29a507889..110dad7d8 100644 --- a/.mvn/wrapper/maven-wrapper.properties +++ b/.mvn/wrapper/maven-wrapper.properties @@ -14,5 +14,5 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.1/apache-maven-3.9.1-bin.zip -wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.1/maven-wrapper-3.1.1.jar +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.3/apache-maven-3.9.3-bin.zip +wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar \ No newline at end of file diff --git a/README.md b/README.md index 142c1d136..bc13b2b41 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,8 @@ Refer to the table below to determine the appropriate version of r2dbc-mysql for | spring-boot-starter-data-r2dbc | spring-data-r2dbc | r2dbc-spi | r2dbc-mysql(recommended) | |--------------------------------|-------------------|---------------|------------------------------| -| 3.0.* | 3.0.* | 1.0.0.RELEASE | io.asyncer:r2dbc-mysql:1.0.1 | -| 2.7.* | 1.5.* | 0.9.1.RELEASE | io.asyncer:r2dbc-mysql:0.9.2 | +| 3.0.* and above | 3.0.* and above | 1.0.0.RELEASE | io.asyncer:r2dbc-mysql:1.0.4 | +| 2.7.* | 1.5.* | 0.9.1.RELEASE | io.asyncer:r2dbc-mysql:0.9.5 | | 2.6.* and below | 1.4.* and below | 0.8.6.RELEASE | dev.miku:r2dbc-mysql:0.8.2 | ## Supported Features @@ -41,6 +41,10 @@ This project is currently being maintained by [@jchrys](https://github.com/jchry ![MySQL 5.6 status](https://img.shields.io/badge/MySQL%205.6-pass-blue) ![MySQL 5.7 status](https://img.shields.io/badge/MySQL%205.7-pass-blue) ![MySQL 8.0 status](https://img.shields.io/badge/MySQL%208.0-pass-blue) +![MySQL 8.1 status](https://img.shields.io/badge/MySQL%208.1-pass-blue) +![MySQL 8.2 status](https://img.shields.io/badge/MySQL%208.2-pass-blue) + + In fact, it supports lower versions, in the theory, such as 4.1, 4.0, etc. @@ -53,7 +57,7 @@ However, Docker-certified images do not have these versions lower than 5.5.0, so io.asyncer r2dbc-mysql - 1.0.1 + 1.0.4 ``` @@ -63,7 +67,7 @@ However, Docker-certified images do not have these versions lower than 5.5.0, so ```groovy dependencies { - implementation 'io.asyncer:r2dbc-mysql:1.0.1' + implementation 'io.asyncer:r2dbc-mysql:1.0.4' } ``` @@ -72,7 +76,7 @@ dependencies { ```kotlin dependencies { // Maybe should to use `compile` instead of `implementation` on the lower version of Gradle. - implementation("io.asyncer:r2dbc-mysql:1.0.1") + implementation("io.asyncer:r2dbc-mysql:1.0.4") } ``` @@ -121,6 +125,7 @@ ConnectionFactoryOptions options = ConnectionFactoryOptions.builder() .option(PORT, 3306) // optional, default 3306 .option(PASSWORD, "database-password-in-here") // optional, default null, null means has no password .option(DATABASE, "r2dbc") // optional, default null, null means not specifying the database + .option(Option.valueOf("createDatabaseIfNotExist"), true) // optional, default false, create database if not exist (since 1.0.6 / 0.9.7) .option(CONNECT_TIMEOUT, Duration.ofSeconds(3)) // optional, default null, null means no timeout .option(Option.valueOf("socketTimeout"), Duration.ofSeconds(4)) // deprecated since 1.0.1, because it has no effect and serves no purpose. .option(SSL, true) // optional, default sslMode is "preferred", it will be ignore if sslMode is set @@ -137,6 +142,7 @@ ConnectionFactoryOptions options = ConnectionFactoryOptions.builder() .option(Option.valueOf("tcpKeepAlive"), true) // optional, default false .option(Option.valueOf("tcpNoDelay"), true) // optional, default false .option(Option.valueOf("autodetectExtensions"), false) // optional, default false + .option(Option.valueOf("passwordPublisher"), Mono.just("password")) // optional, default null, null means has no passwordPublisher (since 1.0.5 / 0.9.6) .build(); ConnectionFactory connectionFactory = ConnectionFactories.get(options); @@ -167,9 +173,10 @@ MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builde .port(3306) // optional, default 3306 .password("database-password-in-here") // optional, default null, null means has no password .database("r2dbc") // optional, default null, null means not specifying the database + .createDatabaseIfNotExist(true) // optional, default false, create database if not exist (since 1.0.6 / 0.9.7) .serverZoneId(ZoneId.of("Continent/City")) // optional, default null, null means query server time zone when connection init .connectTimeout(Duration.ofSeconds(3)) // optional, default null, null means no timeout - .socketTimeout(Duration.ofSeconds(4)) // optional, default null, null means no timeout + .socketTimeout(Duration.ofSeconds(4)) // deprecated since 1.0.1, because it has no effect and serves no purpose. .sslMode(SslMode.VERIFY_IDENTITY) // optional, default SslMode.PREFERRED .sslCa("/path/to/mysql/ca.pem") // required when sslMode is VERIFY_CA or VERIFY_IDENTITY, default null, null means has no server CA cert .sslCert("/path/to/mysql/client-cert.pem") // optional, default has no client SSL certificate @@ -184,6 +191,7 @@ MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builde .tcpNoDelay(true) // optional, controls TCP No Delay, default is false .autodetectExtensions(false) // optional, controls extension auto-detect, default is true .extendWith(MyExtension.INSTANCE) // optional, manual extend an extension into extensions, default using auto-detect + .passwordPublisher(Mono.just("password")) // optional, default null, null means has no password publisher (since 1.0.5 / 0.9.6) .build(); ConnectionFactory connectionFactory = MySqlConnectionFactory.from(configuration); @@ -215,8 +223,9 @@ Mono connectionMono = Mono.from(connectionFactory.create()); | user | A valid MySQL username and not be empty | Required | Who wants to connect to the MySQL database | | password | Any printable string | Optional, default no password | The password of the MySQL database user | | database | A valid MySQL database name | Optional, default does not initialize database | Database used by the MySQL connection | +| createDatabaseIfNotExist | `true` or `false` | Optional, default `false` | Create database if not exist | | connectTimeout | A `Duration` which must be positive duration | Optional, default has no timeout | TCP connect timeout | -| socketTimeout | A `Duration` which must be positive duration | Optional, default has no timeout | TCP socket timeout | +| socketTimeout | A `Duration` which must be positive duration | Deprecated since 1.0.1 | TCP socket timeout | | serverZoneId | An id of `ZoneId` | Optional, default query time zone when connection init | Server time zone id | | tcpKeepAlive | `true` or `false` | Optional, default disabled | Controls TCP KeepAlive | | tcpNoDelay | `true` or `false` | Optional, default disabled | Controls TCP NoDelay | @@ -231,6 +240,7 @@ Mono connectionMono = Mono.from(connectionFactory.create()); | zeroDateOption | Any value of `ZeroDateOption` | Optional, default `USE_NULL` | The option indicates "zero date" handling, see following notice | | autodetectExtensions | `true` or `false` | Optional, default is `true` | Controls auto-detect `Extension`s | | useServerPrepareStatement | `true`, `false` or `Predicate` | Optional, default is `false` | See following notice | +| passwordPublisher | A `Publisher` | Optional, default is `null` | The password publisher, see following notice | - `SslMode` Considers security level and verification for SSL, make sure the database server supports SSL before you want change SSL mode to `REQUIRED` or higher. **The Unix Domain Socket only offers "DISABLED" available** - `DISABLED` I don't care about security and don't want to pay the overhead for encryption @@ -256,6 +266,7 @@ Mono connectionMono = Mono.from(connectionFactory.create()); - It is **NOT** RECOMMENDED, enable the `autodetectExtensions` is the best way for extensions - The `Extensions` will not remove duplicates, make sure it would be not extended twice or more - The auto-detected `Extension`s will not affect manual extends and will not remove duplicates +- `passwordPublisher` Every time the client attempts to authenticate, it will use the password provided by the `passwordPublisher`.(Since `1.0.5` / `0.9.6`) e.g., You can employ this method for IAM-based authentication when connecting to an AWS Aurora RDS database. Should use `enum` in [Programmatic](#programmatic-configuration) configuration that not like discovery configurations, except `TlsVersions` (All elements of `TlsVersions` will be always `String` which is case-sensitive). diff --git a/mvnw b/mvnw index a16b5431b..66df28542 100755 --- a/mvnw +++ b/mvnw @@ -19,7 +19,7 @@ # ---------------------------------------------------------------------------- # ---------------------------------------------------------------------------- -# Maven Start Up Batch script +# Apache Maven Wrapper startup batch script, version 3.2.0 # # Required ENV vars: # ------------------ @@ -27,7 +27,6 @@ # # Optional ENV vars # ----------------- -# M2_HOME - location of maven2's installed home dir # MAVEN_OPTS - parameters passed to the Java VM when running Maven # e.g. to debug Maven itself, use # set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 @@ -36,6 +35,10 @@ if [ -z "$MAVEN_SKIP_RC" ] ; then + if [ -f /usr/local/etc/mavenrc ] ; then + . /usr/local/etc/mavenrc + fi + if [ -f /etc/mavenrc ] ; then . /etc/mavenrc fi @@ -50,7 +53,7 @@ fi cygwin=false; darwin=false; mingw=false -case "`uname`" in +case "$(uname)" in CYGWIN*) cygwin=true ;; MINGW*) mingw=true;; Darwin*) darwin=true @@ -58,9 +61,9 @@ case "`uname`" in # See https://developer.apple.com/library/mac/qa/qa1170/_index.html if [ -z "$JAVA_HOME" ]; then if [ -x "/usr/libexec/java_home" ]; then - export JAVA_HOME="`/usr/libexec/java_home`" + JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME else - export JAVA_HOME="/Library/Java/Home" + JAVA_HOME="/Library/Java/Home"; export JAVA_HOME fi fi ;; @@ -68,68 +71,38 @@ esac if [ -z "$JAVA_HOME" ] ; then if [ -r /etc/gentoo-release ] ; then - JAVA_HOME=`java-config --jre-home` + JAVA_HOME=$(java-config --jre-home) fi fi -if [ -z "$M2_HOME" ] ; then - ## resolve links - $0 may be a link to maven's home - PRG="$0" - - # need this for relative symlinks - while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG="`dirname "$PRG"`/$link" - fi - done - - saveddir=`pwd` - - M2_HOME=`dirname "$PRG"`/.. - - # make it fully qualified - M2_HOME=`cd "$M2_HOME" && pwd` - - cd "$saveddir" - # echo Using m2 at $M2_HOME -fi - # For Cygwin, ensure paths are in UNIX format before anything is touched if $cygwin ; then - [ -n "$M2_HOME" ] && - M2_HOME=`cygpath --unix "$M2_HOME"` [ -n "$JAVA_HOME" ] && - JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + JAVA_HOME=$(cygpath --unix "$JAVA_HOME") [ -n "$CLASSPATH" ] && - CLASSPATH=`cygpath --path --unix "$CLASSPATH"` + CLASSPATH=$(cygpath --path --unix "$CLASSPATH") fi # For Mingw, ensure paths are in UNIX format before anything is touched if $mingw ; then - [ -n "$M2_HOME" ] && - M2_HOME="`(cd "$M2_HOME"; pwd)`" - [ -n "$JAVA_HOME" ] && - JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && [ -d "$JAVA_HOME" ] && + JAVA_HOME="$(cd "$JAVA_HOME" || (echo "cannot cd into $JAVA_HOME."; exit 1); pwd)" fi if [ -z "$JAVA_HOME" ]; then - javaExecutable="`which javac`" - if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + javaExecutable="$(which javac)" + if [ -n "$javaExecutable" ] && ! [ "$(expr "\"$javaExecutable\"" : '\([^ ]*\)')" = "no" ]; then # readlink(1) is not available as standard on Solaris 10. - readLink=`which readlink` - if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + readLink=$(which readlink) + if [ ! "$(expr "$readLink" : '\([^ ]*\)')" = "no" ]; then if $darwin ; then - javaHome="`dirname \"$javaExecutable\"`" - javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + javaHome="$(dirname "\"$javaExecutable\"")" + javaExecutable="$(cd "\"$javaHome\"" && pwd -P)/javac" else - javaExecutable="`readlink -f \"$javaExecutable\"`" + javaExecutable="$(readlink -f "\"$javaExecutable\"")" fi - javaHome="`dirname \"$javaExecutable\"`" - javaHome=`expr "$javaHome" : '\(.*\)/bin'` + javaHome="$(dirname "\"$javaExecutable\"")" + javaHome=$(expr "$javaHome" : '\(.*\)/bin') JAVA_HOME="$javaHome" export JAVA_HOME fi @@ -145,7 +118,7 @@ if [ -z "$JAVACMD" ] ; then JAVACMD="$JAVA_HOME/bin/java" fi else - JAVACMD="`which java`" + JAVACMD="$(\unset -f command 2>/dev/null; \command -v java)" fi fi @@ -159,12 +132,9 @@ if [ -z "$JAVA_HOME" ] ; then echo "Warning: JAVA_HOME environment variable is not set." fi -CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher - # traverses directory structure from process work directory to filesystem root # first directory with .mvn subdirectory is considered project base directory find_maven_basedir() { - if [ -z "$1" ] then echo "Path not specified to find_maven_basedir" @@ -180,96 +150,99 @@ find_maven_basedir() { fi # workaround for JBEAP-8937 (on Solaris 10/Sparc) if [ -d "${wdir}" ]; then - wdir=`cd "$wdir/.."; pwd` + wdir=$(cd "$wdir/.." || exit 1; pwd) fi # end of workaround done - echo "${basedir}" + printf '%s' "$(cd "$basedir" || exit 1; pwd)" } # concatenates all lines of a file concat_lines() { if [ -f "$1" ]; then - echo "$(tr -s '\n' ' ' < "$1")" + # Remove \r in case we run on Windows within Git Bash + # and check out the repository with auto CRLF management + # enabled. Otherwise, we may read lines that are delimited with + # \r\n and produce $'-Xarg\r' rather than -Xarg due to word + # splitting rules. + tr -s '\r\n' ' ' < "$1" + fi +} + +log() { + if [ "$MVNW_VERBOSE" = true ]; then + printf '%s\n' "$1" fi } -BASE_DIR=`find_maven_basedir "$(pwd)"` +BASE_DIR=$(find_maven_basedir "$(dirname "$0")") if [ -z "$BASE_DIR" ]; then exit 1; fi +MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}; export MAVEN_PROJECTBASEDIR +log "$MAVEN_PROJECTBASEDIR" + ########################################################################################## # Extension to allow automatically downloading the maven-wrapper.jar from Maven-central # This allows using the maven wrapper in projects that prohibit checking in binary data. ########################################################################################## -if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then - if [ "$MVNW_VERBOSE" = true ]; then - echo "Found .mvn/wrapper/maven-wrapper.jar" - fi +wrapperJarPath="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" +if [ -r "$wrapperJarPath" ]; then + log "Found $wrapperJarPath" else - if [ "$MVNW_VERBOSE" = true ]; then - echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..." - fi + log "Couldn't find $wrapperJarPath, downloading it ..." + if [ -n "$MVNW_REPOURL" ]; then - jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + wrapperUrl="$MVNW_REPOURL/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" else - jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + wrapperUrl="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" fi - while IFS="=" read key value; do - case "$key" in (wrapperUrl) jarUrl="$value"; break ;; + while IFS="=" read -r key value; do + # Remove '\r' from value to allow usage on windows as IFS does not consider '\r' as a separator ( considers space, tab, new line ('\n'), and custom '=' ) + safeValue=$(echo "$value" | tr -d '\r') + case "$key" in (wrapperUrl) wrapperUrl="$safeValue"; break ;; esac - done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties" - if [ "$MVNW_VERBOSE" = true ]; then - echo "Downloading from: $jarUrl" - fi - wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" + done < "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.properties" + log "Downloading from: $wrapperUrl" + if $cygwin; then - wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"` + wrapperJarPath=$(cygpath --path --windows "$wrapperJarPath") fi if command -v wget > /dev/null; then - if [ "$MVNW_VERBOSE" = true ]; then - echo "Found wget ... using wget" - fi + log "Found wget ... using wget" + [ "$MVNW_VERBOSE" = true ] && QUIET="" || QUIET="--quiet" if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then - wget "$jarUrl" -O "$wrapperJarPath" + wget $QUIET "$wrapperUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath" else - wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" + wget $QUIET --http-user="$MVNW_USERNAME" --http-password="$MVNW_PASSWORD" "$wrapperUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath" fi elif command -v curl > /dev/null; then - if [ "$MVNW_VERBOSE" = true ]; then - echo "Found curl ... using curl" - fi + log "Found curl ... using curl" + [ "$MVNW_VERBOSE" = true ] && QUIET="" || QUIET="--silent" if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then - curl -o "$wrapperJarPath" "$jarUrl" -f + curl $QUIET -o "$wrapperJarPath" "$wrapperUrl" -f -L || rm -f "$wrapperJarPath" else - curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f + curl $QUIET --user "$MVNW_USERNAME:$MVNW_PASSWORD" -o "$wrapperJarPath" "$wrapperUrl" -f -L || rm -f "$wrapperJarPath" fi - else - if [ "$MVNW_VERBOSE" = true ]; then - echo "Falling back to using Java to download" - fi - javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java" + log "Falling back to using Java to download" + javaSource="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/MavenWrapperDownloader.java" + javaClass="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/MavenWrapperDownloader.class" # For Cygwin, switch paths to Windows format before running javac if $cygwin; then - javaClass=`cygpath --path --windows "$javaClass"` + javaSource=$(cygpath --path --windows "$javaSource") + javaClass=$(cygpath --path --windows "$javaClass") fi - if [ -e "$javaClass" ]; then - if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then - if [ "$MVNW_VERBOSE" = true ]; then - echo " - Compiling MavenWrapperDownloader.java ..." - fi - # Compiling the Java class - ("$JAVA_HOME/bin/javac" "$javaClass") + if [ -e "$javaSource" ]; then + if [ ! -e "$javaClass" ]; then + log " - Compiling MavenWrapperDownloader.java ..." + ("$JAVA_HOME/bin/javac" "$javaSource") fi - if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then - # Running the downloader - if [ "$MVNW_VERBOSE" = true ]; then - echo " - Running MavenWrapperDownloader.java ..." - fi - ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR") + if [ -e "$javaClass" ]; then + log " - Running MavenWrapperDownloader.java ..." + ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$wrapperUrl" "$wrapperJarPath") || rm -f "$wrapperJarPath" fi fi fi @@ -278,33 +251,58 @@ fi # End of extension ########################################################################################## -export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} -if [ "$MVNW_VERBOSE" = true ]; then - echo $MAVEN_PROJECTBASEDIR +# If specified, validate the SHA-256 sum of the Maven wrapper jar file +wrapperSha256Sum="" +while IFS="=" read -r key value; do + case "$key" in (wrapperSha256Sum) wrapperSha256Sum=$value; break ;; + esac +done < "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.properties" +if [ -n "$wrapperSha256Sum" ]; then + wrapperSha256Result=false + if command -v sha256sum > /dev/null; then + if echo "$wrapperSha256Sum $wrapperJarPath" | sha256sum -c > /dev/null 2>&1; then + wrapperSha256Result=true + fi + elif command -v shasum > /dev/null; then + if echo "$wrapperSha256Sum $wrapperJarPath" | shasum -a 256 -c > /dev/null 2>&1; then + wrapperSha256Result=true + fi + else + echo "Checksum validation was requested but neither 'sha256sum' or 'shasum' are available." + echo "Please install either command, or disable validation by removing 'wrapperSha256Sum' from your maven-wrapper.properties." + exit 1 + fi + if [ $wrapperSha256Result = false ]; then + echo "Error: Failed to validate Maven wrapper SHA-256, your Maven wrapper might be compromised." >&2 + echo "Investigate or delete $wrapperJarPath to attempt a clean download." >&2 + echo "If you updated your Maven version, you need to update the specified wrapperSha256Sum property." >&2 + exit 1 + fi fi + MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" # For Cygwin, switch paths to Windows format before running java if $cygwin; then - [ -n "$M2_HOME" ] && - M2_HOME=`cygpath --path --windows "$M2_HOME"` [ -n "$JAVA_HOME" ] && - JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + JAVA_HOME=$(cygpath --path --windows "$JAVA_HOME") [ -n "$CLASSPATH" ] && - CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + CLASSPATH=$(cygpath --path --windows "$CLASSPATH") [ -n "$MAVEN_PROJECTBASEDIR" ] && - MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` + MAVEN_PROJECTBASEDIR=$(cygpath --path --windows "$MAVEN_PROJECTBASEDIR") fi # Provide a "standardized" way to retrieve the CLI args that will # work with both Windows and non-Windows executions. -MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $*" export MAVEN_CMD_LINE_ARGS WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain +# shellcheck disable=SC2086 # safe args exec "$JAVACMD" \ $MAVEN_OPTS \ + $MAVEN_DEBUG_OPTS \ -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ - "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/mvnw.cmd b/mvnw.cmd index c8d43372c..93880f5d9 100644 --- a/mvnw.cmd +++ b/mvnw.cmd @@ -1,182 +1,205 @@ -@REM ---------------------------------------------------------------------------- -@REM Licensed to the Apache Software Foundation (ASF) under one -@REM or more contributor license agreements. See the NOTICE file -@REM distributed with this work for additional information -@REM regarding copyright ownership. The ASF licenses this file -@REM to you under the Apache License, Version 2.0 (the -@REM "License"); you may not use this file except in compliance -@REM with the License. You may obtain a copy of the License at -@REM -@REM https://www.apache.org/licenses/LICENSE-2.0 -@REM -@REM Unless required by applicable law or agreed to in writing, -@REM software distributed under the License is distributed on an -@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -@REM KIND, either express or implied. See the License for the -@REM specific language governing permissions and limitations -@REM under the License. -@REM ---------------------------------------------------------------------------- - -@REM ---------------------------------------------------------------------------- -@REM Maven Start Up Batch script -@REM -@REM Required ENV vars: -@REM JAVA_HOME - location of a JDK home dir -@REM -@REM Optional ENV vars -@REM M2_HOME - location of maven2's installed home dir -@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands -@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending -@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven -@REM e.g. to debug Maven itself, use -@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 -@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files -@REM ---------------------------------------------------------------------------- - -@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' -@echo off -@REM set title of command window -title %0 -@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on' -@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% - -@REM set %HOME% to equivalent of $HOME -if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") - -@REM Execute a user defined script before this one -if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre -@REM check for pre script, once with legacy .bat ending and once with .cmd ending -if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" -if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" -:skipRcPre - -@setlocal - -set ERROR_CODE=0 - -@REM To isolate internal variables from possible post scripts, we use another setlocal -@setlocal - -@REM ==== START VALIDATION ==== -if not "%JAVA_HOME%" == "" goto OkJHome - -echo. -echo Error: JAVA_HOME not found in your environment. >&2 -echo Please set the JAVA_HOME variable in your environment to match the >&2 -echo location of your Java installation. >&2 -echo. -goto error - -:OkJHome -if exist "%JAVA_HOME%\bin\java.exe" goto init - -echo. -echo Error: JAVA_HOME is set to an invalid directory. >&2 -echo JAVA_HOME = "%JAVA_HOME%" >&2 -echo Please set the JAVA_HOME variable in your environment to match the >&2 -echo location of your Java installation. >&2 -echo. -goto error - -@REM ==== END VALIDATION ==== - -:init - -@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". -@REM Fallback to current working directory if not found. - -set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% -IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir - -set EXEC_DIR=%CD% -set WDIR=%EXEC_DIR% -:findBaseDir -IF EXIST "%WDIR%"\.mvn goto baseDirFound -cd .. -IF "%WDIR%"=="%CD%" goto baseDirNotFound -set WDIR=%CD% -goto findBaseDir - -:baseDirFound -set MAVEN_PROJECTBASEDIR=%WDIR% -cd "%EXEC_DIR%" -goto endDetectBaseDir - -:baseDirNotFound -set MAVEN_PROJECTBASEDIR=%EXEC_DIR% -cd "%EXEC_DIR%" - -:endDetectBaseDir - -IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig - -@setlocal EnableExtensions EnableDelayedExpansion -for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a -@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% - -:endReadAdditionalConfig - -SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" -set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" -set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain - -set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" - -FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( - IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B -) - -@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central -@REM This allows using the maven wrapper in projects that prohibit checking in binary data. -if exist %WRAPPER_JAR% ( - if "%MVNW_VERBOSE%" == "true" ( - echo Found %WRAPPER_JAR% - ) -) else ( - if not "%MVNW_REPOURL%" == "" ( - SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" - ) - if "%MVNW_VERBOSE%" == "true" ( - echo Couldn't find %WRAPPER_JAR%, downloading it ... - echo Downloading from: %DOWNLOAD_URL% - ) - - powershell -Command "&{"^ - "$webclient = new-object System.Net.WebClient;"^ - "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^ - "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^ - "}"^ - "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^ - "}" - if "%MVNW_VERBOSE%" == "true" ( - echo Finished downloading %WRAPPER_JAR% - ) -) -@REM End of extension - -@REM Provide a "standardized" way to retrieve the CLI args that will -@REM work with both Windows and non-Windows executions. -set MAVEN_CMD_LINE_ARGS=%* - -%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* -if ERRORLEVEL 1 goto error -goto end - -:error -set ERROR_CODE=1 - -:end -@endlocal & set ERROR_CODE=%ERROR_CODE% - -if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost -@REM check for post script, once with legacy .bat ending and once with .cmd ending -if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" -if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" -:skipRcPost - -@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' -if "%MAVEN_BATCH_PAUSE%" == "on" pause - -if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% - -exit /B %ERROR_CODE% +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM https://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Apache Maven Wrapper startup batch script, version 3.2.0 +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM set title of command window +title %0 +@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%USERPROFILE%\mavenrc_pre.bat" call "%USERPROFILE%\mavenrc_pre.bat" %* +if exist "%USERPROFILE%\mavenrc_pre.cmd" call "%USERPROFILE%\mavenrc_pre.cmd" %* +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" +set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +set WRAPPER_URL="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" + +FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( + IF "%%A"=="wrapperUrl" SET WRAPPER_URL=%%B +) + +@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +@REM This allows using the maven wrapper in projects that prohibit checking in binary data. +if exist %WRAPPER_JAR% ( + if "%MVNW_VERBOSE%" == "true" ( + echo Found %WRAPPER_JAR% + ) +) else ( + if not "%MVNW_REPOURL%" == "" ( + SET WRAPPER_URL="%MVNW_REPOURL%/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" + ) + if "%MVNW_VERBOSE%" == "true" ( + echo Couldn't find %WRAPPER_JAR%, downloading it ... + echo Downloading from: %WRAPPER_URL% + ) + + powershell -Command "&{"^ + "$webclient = new-object System.Net.WebClient;"^ + "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^ + "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^ + "}"^ + "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%WRAPPER_URL%', '%WRAPPER_JAR%')"^ + "}" + if "%MVNW_VERBOSE%" == "true" ( + echo Finished downloading %WRAPPER_JAR% + ) +) +@REM End of extension + +@REM If specified, validate the SHA-256 sum of the Maven wrapper jar file +SET WRAPPER_SHA_256_SUM="" +FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( + IF "%%A"=="wrapperSha256Sum" SET WRAPPER_SHA_256_SUM=%%B +) +IF NOT %WRAPPER_SHA_256_SUM%=="" ( + powershell -Command "&{"^ + "$hash = (Get-FileHash \"%WRAPPER_JAR%\" -Algorithm SHA256).Hash.ToLower();"^ + "If('%WRAPPER_SHA_256_SUM%' -ne $hash){"^ + " Write-Output 'Error: Failed to validate Maven wrapper SHA-256, your Maven wrapper might be compromised.';"^ + " Write-Output 'Investigate or delete %WRAPPER_JAR% to attempt a clean download.';"^ + " Write-Output 'If you updated your Maven version, you need to update the specified wrapperSha256Sum property.';"^ + " exit 1;"^ + "}"^ + "}" + if ERRORLEVEL 1 goto error +) + +@REM Provide a "standardized" way to retrieve the CLI args that will +@REM work with both Windows and non-Windows executions. +set MAVEN_CMD_LINE_ARGS=%* + +%MAVEN_JAVA_EXE% ^ + %JVM_CONFIG_MAVEN_PROPS% ^ + %MAVEN_OPTS% ^ + %MAVEN_DEBUG_OPTS% ^ + -classpath %WRAPPER_JAR% ^ + "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" ^ + %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%"=="" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%USERPROFILE%\mavenrc_post.bat" call "%USERPROFILE%\mavenrc_post.bat" +if exist "%USERPROFILE%\mavenrc_post.cmd" call "%USERPROFILE%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%"=="on" pause + +if "%MAVEN_TERMINATE_CMD%"=="on" exit %ERROR_CODE% + +cmd /C exit /B %ERROR_CODE% diff --git a/pom.xml b/pom.xml index d2d532bc4..d8262d16f 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ io.asyncer r2dbc-mysql - 1.0.2-SNAPSHOT + 1.0.6-SNAPSHOT jar Reactive Relational Database Connectivity - MySQL @@ -65,20 +65,20 @@ false 1.0.0.RELEASE - 2022.0.0 - 3.21.0 - 1.36 - 5.9.2 - 1.3.6 - 4.0.0 - 8.0.32 - 1.17.6 + 2022.0.9 + 3.24.2 + 1.37 + 5.10.1 + 1.4.14 + 4.11.0 + 8.2.0 + 1.19.3 4.0.3 - 5.3.25 - 2.14.2 + 5.3.31 + 2.16.0 0.3.0.RELEASE 3.0.2 - 24.0.1 + 24.1.0 @@ -186,8 +186,8 @@ test - mysql - mysql-connector-java + com.mysql + mysql-connector-j ${mysql.version} test @@ -242,7 +242,7 @@ org.apache.maven.plugins maven-compiler-plugin - 3.8.1 + 3.11.0 -Xlint:all @@ -271,12 +271,12 @@ org.apache.maven.plugins maven-deploy-plugin - 3.1.0 + 3.1.1 org.apache.maven.plugins maven-javadoc-plugin - 3.5.0 + 3.6.3 io.asyncer.r2dbc.mysql.authentication,io.asyncer.r2dbc.mysql.client,io.asyncer.r2dbc.mysql.util,io.asyncer.r2dbc.mysql.codec.lob,io.asyncer.r2dbc.mysql.message @@ -300,7 +300,7 @@ org.apache.maven.plugins maven-source-plugin - 3.2.1 + 3.3.0 attach-javadocs @@ -313,7 +313,7 @@ org.apache.maven.plugins maven-surefire-plugin - 2.22.2 + 3.2.3 random @@ -329,7 +329,7 @@ org.apache.maven.plugins maven-failsafe-plugin - 2.22.2 + 3.2.3 @@ -389,7 +389,7 @@ org.codehaus.mojo build-helper-maven-plugin - 3.3.0 + 3.5.0 add-source @@ -422,7 +422,7 @@ org.codehaus.mojo exec-maven-plugin - 3.1.0 + 3.1.1 run-benchmarks diff --git a/src/main/java/io/asyncer/r2dbc/mysql/Capability.java b/src/main/java/io/asyncer/r2dbc/mysql/Capability.java index e57b6e170..fe1da0f68 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/Capability.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/Capability.java @@ -26,6 +26,8 @@ public final class Capability { /** * Can use long password. + *

+ * TODO: Reinterpret it as {@code CLIENT_MYSQL} to support MariaDB 10.2 and above. */ private static final int LONG_PASSWORD = 1; diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java index 5f132c0b4..a17517cc1 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java @@ -21,6 +21,7 @@ import io.asyncer.r2dbc.mysql.client.Client; import io.asyncer.r2dbc.mysql.codec.Codecs; import io.asyncer.r2dbc.mysql.constant.ServerStatuses; +import io.asyncer.r2dbc.mysql.message.client.InitDbMessage; import io.asyncer.r2dbc.mysql.message.client.PingMessage; import io.asyncer.r2dbc.mysql.message.server.CompleteMessage; import io.asyncer.r2dbc.mysql.message.server.ErrorMessage; @@ -76,40 +77,25 @@ public final class MySqlConnection implements Connection, ConnectionState { private static final ServerVersion TX_LEVEL_8X = ServerVersion.create(8, 0, 0); - /** - * Convert initialize result to {@link InitData}. - */ - private static final Function> INIT_HANDLER = r -> - r.map((row, meta) -> new InitData(convertIsolationLevel(row.get(0, String.class)), - convertLockWaitTimeout(row.get(1, Long.class)), - row.get(2, String.class), null)); - - private static final Function> FULL_INIT = r -> r.map((row, meta) -> { - IsolationLevel level = convertIsolationLevel(row.get(0, String.class)); - long lockWaitTimeout = convertLockWaitTimeout(row.get(1, Long.class)); - String product = row.get(2, String.class); - String systemTimeZone = row.get(3, String.class); - String timeZone = row.get(4, String.class); - ZoneId zoneId; - - if (timeZone == null || timeZone.isEmpty() || "SYSTEM".equalsIgnoreCase(timeZone)) { - if (systemTimeZone == null || systemTimeZone.isEmpty()) { - logger.warn("MySQL does not return any timezone, trying to use system default timezone"); - zoneId = ZoneId.systemDefault(); - } else { - zoneId = convertZoneId(systemTimeZone); - } + private static final BiConsumer> PING = (message, sink) -> { + if (message instanceof ErrorMessage) { + ErrorMessage msg = (ErrorMessage) message; + logger.debug("Remote validate failed: [{}] [{}] {}", msg.getCode(), msg.getSqlState(), + msg.getMessage()); + sink.next(false); + sink.complete(); + } else if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) { + sink.next(true); + sink.complete(); } else { - zoneId = convertZoneId(timeZone); + ReferenceCountUtil.safeRelease(message); } + }; - return new InitData(level, lockWaitTimeout, product, zoneId); - }); - - private static final BiConsumer> PING = (message, sink) -> { + private static final BiConsumer> INIT_DB = (message, sink) -> { if (message instanceof ErrorMessage) { ErrorMessage msg = (ErrorMessage) message; - logger.debug("Remote validate failed: [{}] [{}] {}", msg.getCode(), msg.getSqlState(), + logger.debug("Use database failed: [{}] [{}] {}", msg.getCode(), msg.getSqlState(), msg.getMessage()); sink.next(false); sink.complete(); @@ -121,6 +107,16 @@ public final class MySqlConnection implements Connection, ConnectionState { } }; + private static final BiConsumer> INIT_DB_AFTER = (message, sink) -> { + if (message instanceof ErrorMessage) { + sink.error(((ErrorMessage) message).toException()); + } else if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) { + sink.complete(); + } else { + ReferenceCountUtil.safeRelease(message); + } + }; + private final Client client; private final Codecs codecs; @@ -433,13 +429,17 @@ boolean isSessionAutoCommit() { * @param client must be logged-in. * @param codecs the {@link Codecs}. * @param context must be initialized. + * @param database the database that should be lazy init. * @param queryCache the cache of {@link Query}. * @param prepareCache the cache of server-preparing result. * @param prepare judging for prefer use prepare statement to execute simple query. * @return a {@link Mono} will emit an initialized {@link MySqlConnection}. */ - static Mono init(Client client, Codecs codecs, ConnectionContext context, - QueryCache queryCache, PrepareCache prepareCache, @Nullable Predicate prepare) { + static Mono init( + Client client, Codecs codecs, ConnectionContext context, String database, + QueryCache queryCache, PrepareCache prepareCache, + @Nullable Predicate prepare + ) { ServerVersion version = context.getServerVersion(); StringBuilder query = new StringBuilder(128); @@ -456,12 +456,12 @@ static Mono init(Client client, Codecs codecs, ConnectionContex if (context.shouldSetServerZoneId()) { query.append(",@@system_time_zone AS s,@@time_zone AS t"); - handler = FULL_INIT; + handler = MySqlConnection::fullInit; } else { - handler = INIT_HANDLER; + handler = MySqlConnection::init; } - return new TextSimpleStatement(client, codecs, context, query.toString()) + Mono connection = new TextSimpleStatement(client, codecs, context, query.toString()) .execute() .flatMap(handler) .last() @@ -475,6 +475,55 @@ static Mono init(Client client, Codecs codecs, ConnectionContex return new MySqlConnection(client, context, codecs, data.level, data.lockWaitTimeout, queryCache, prepareCache, data.product, prepare); }); + + if (database.isEmpty()) { + return connection; + } + + requireValidName(database, "database must not be empty and not contain backticks"); + + return connection.flatMap(conn -> client.exchange(new InitDbMessage(database), INIT_DB) + .last() + .flatMap(success -> { + if (success) { + return Mono.just(conn); + } + + String sql = String.format("CREATE DATABASE IF NOT EXISTS `%s`", database); + + return QueryFlow.executeVoid(client, sql) + .then(client.exchange(new InitDbMessage(database), INIT_DB_AFTER).then(Mono.just(conn))); + })); + } + + private static Publisher init(MySqlResult r) { + return r.map((row, meta) -> new InitData(convertIsolationLevel(row.get(0, String.class)), + convertLockWaitTimeout(row.get(1, Long.class)), + row.get(2, String.class), null)); + } + + private static Publisher fullInit(MySqlResult r) { + return r.map((row, meta) -> { + IsolationLevel level = convertIsolationLevel(row.get(0, String.class)); + long lockWaitTimeout = convertLockWaitTimeout(row.get(1, Long.class)); + String product = row.get(2, String.class); + String systemTimeZone = row.get(3, String.class); + String timeZone = row.get(4, String.class); + ZoneId zoneId; + + if (timeZone == null || timeZone.isEmpty() || "SYSTEM".equalsIgnoreCase(timeZone)) { + if (systemTimeZone == null || systemTimeZone.isEmpty()) { + logger.warn("MySQL does not return any timezone, trying to use system default timezone"); + zoneId = ZoneId.systemDefault(); + } else { + zoneId = convertZoneId(systemTimeZone); + } + } else { + zoneId = convertZoneId(timeZone); + } + + return new InitData(level, lockWaitTimeout, product, zoneId); + }); } /** diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java index e19bd1483..8b8807b54 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java @@ -21,6 +21,7 @@ import io.asyncer.r2dbc.mysql.extension.Extension; import io.netty.handler.ssl.SslContextBuilder; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; import javax.net.ssl.HostnameVerifier; import java.net.Socket; @@ -47,8 +48,6 @@ public final class MySqlConnectionConfiguration { */ private static final int DEFAULT_PORT = 3306; - private static final Predicate DEFAULT_SERVER_PREPARE = sql -> false; - /** * {@code true} if {@link #domain} is hostname, otherwise {@link #domain} is unix domain socket path. */ @@ -85,6 +84,8 @@ public final class MySqlConnectionConfiguration { private final String database; + private final boolean createDatabaseIfNotExist; + @Nullable private final Predicate preferPrepareStatement; @@ -94,12 +95,18 @@ public final class MySqlConnectionConfiguration { private final Extensions extensions; - private MySqlConnectionConfiguration(boolean isHost, String domain, int port, MySqlSslConfiguration ssl, + @Nullable + private final Publisher passwordPublisher; + + private MySqlConnectionConfiguration( + boolean isHost, String domain, int port, MySqlSslConfiguration ssl, boolean tcpKeepAlive, boolean tcpNoDelay, @Nullable Duration connectTimeout, @Nullable Duration socketTimeout, ZeroDateOption zeroDateOption, @Nullable ZoneId serverZoneId, String user, @Nullable CharSequence password, @Nullable String database, - @Nullable Predicate preferPrepareStatement, int queryCacheSize, int prepareCacheSize, - Extensions extensions) { + boolean createDatabaseIfNotExist, @Nullable Predicate preferPrepareStatement, + int queryCacheSize, int prepareCacheSize, Extensions extensions, + @Nullable Publisher passwordPublisher + ) { this.isHost = isHost; this.domain = domain; this.port = port; @@ -113,10 +120,12 @@ private MySqlConnectionConfiguration(boolean isHost, String domain, int port, My this.user = requireNonNull(user, "user must not be null"); this.password = password; this.database = database == null || database.isEmpty() ? "" : database; + this.createDatabaseIfNotExist = createDatabaseIfNotExist; this.preferPrepareStatement = preferPrepareStatement; this.queryCacheSize = queryCacheSize; this.prepareCacheSize = prepareCacheSize; this.extensions = extensions; + this.passwordPublisher = passwordPublisher; } /** @@ -189,6 +198,10 @@ String getDatabase() { return database; } + boolean isCreateDatabaseIfNotExist() { + return createDatabaseIfNotExist; + } + @Nullable Predicate getPreferPrepareStatement() { return preferPrepareStatement; @@ -206,6 +219,11 @@ Extensions getExtensions() { return extensions; } + @Nullable + Publisher getPasswordPublisher() { + return passwordPublisher; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -228,37 +246,41 @@ public boolean equals(Object o) { user.equals(that.user) && Objects.equals(password, that.password) && database.equals(that.database) && + createDatabaseIfNotExist == that.createDatabaseIfNotExist && Objects.equals(preferPrepareStatement, that.preferPrepareStatement) && queryCacheSize == that.queryCacheSize && prepareCacheSize == that.prepareCacheSize && - extensions.equals(that.extensions); + extensions.equals(that.extensions) && + Objects.equals(passwordPublisher, that.passwordPublisher); } @Override public int hashCode() { - return Objects.hash(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay, - connectTimeout, socketTimeout, serverZoneId, zeroDateOption, user, password, database, - preferPrepareStatement, queryCacheSize, prepareCacheSize, extensions); + return Objects.hash(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay, connectTimeout, + socketTimeout, serverZoneId, zeroDateOption, user, password, database, createDatabaseIfNotExist, + preferPrepareStatement, queryCacheSize, prepareCacheSize, extensions, passwordPublisher); } @Override public String toString() { if (isHost) { - return "MySqlConnectionConfiguration{, host='" + domain + "', port=" + port + ", ssl=" + ssl + + return "MySqlConnectionConfiguration{host='" + domain + "', port=" + port + ", ssl=" + ssl + ", tcpNoDelay=" + tcpNoDelay + ", tcpKeepAlive=" + tcpKeepAlive + ", connectTimeout=" + connectTimeout + ", socketTimeout=" + socketTimeout + ", serverZoneId=" + serverZoneId + - ", zeroDateOption=" + zeroDateOption + ", user='" + user + '\'' + ", password=" + password + - ", database='" + database + "', preferPrepareStatement=" + preferPrepareStatement + - ", queryCacheSize=" + queryCacheSize + ", prepareCacheSize=" + prepareCacheSize + - ", extensions=" + extensions + '}'; + ", zeroDateOption=" + zeroDateOption + ", user='" + user + "', password=" + password + + ", database='" + database + "', createDatabaseIfNotExist=" + createDatabaseIfNotExist + + ", preferPrepareStatement=" + preferPrepareStatement + ", queryCacheSize=" + queryCacheSize + + ", prepareCacheSize=" + prepareCacheSize + ", extensions=" + extensions + + ", passwordPublisher=" + passwordPublisher + '}'; } - return "MySqlConnectionConfiguration{, unixSocket='" + domain + "', connectTimeout=" + + return "MySqlConnectionConfiguration{unixSocket='" + domain + "', connectTimeout=" + connectTimeout + ", socketTimeout=" + socketTimeout + ", serverZoneId=" + serverZoneId + ", zeroDateOption=" + zeroDateOption + ", user='" + user + "', password=" + password + - ", database='" + database + "', preferPrepareStatement=" + preferPrepareStatement + - ", queryCacheSize=" + queryCacheSize + ", prepareCacheSize=" + prepareCacheSize + - ", extensions=" + extensions + '}'; + ", database='" + database + "', createDatabaseIfNotExist=" + createDatabaseIfNotExist + + ", preferPrepareStatement=" + preferPrepareStatement + ", queryCacheSize=" + queryCacheSize + + ", prepareCacheSize=" + prepareCacheSize + ", extensions=" + extensions + + ", passwordPublisher=" + passwordPublisher + '}'; } /** @@ -269,6 +291,8 @@ public static final class Builder { @Nullable private String database; + private boolean createDatabaseIfNotExist; + private boolean isHost = true; private String domain; @@ -329,6 +353,9 @@ public static final class Builder { private final List extensions = new ArrayList<>(); + @Nullable + private Publisher passwordPublisher; + /** * Builds an immutable {@link MySqlConnectionConfiguration} with current options. * @@ -352,8 +379,8 @@ public MySqlConnectionConfiguration build() { sslCa, sslKey, sslKeyPassword, sslCert, sslContextBuilderCustomizer); return new MySqlConnectionConfiguration(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay, connectTimeout, socketTimeout, zeroDateOption, serverZoneId, user, password, database, - preferPrepareStatement, queryCacheSize, prepareCacheSize, - Extensions.from(extensions, autodetectExtensions)); + createDatabaseIfNotExist, preferPrepareStatement, queryCacheSize, prepareCacheSize, + Extensions.from(extensions, autodetectExtensions), passwordPublisher); } /** @@ -368,6 +395,19 @@ public Builder database(@Nullable String database) { return this; } + /** + * Configure to create the database given in the configuration if it does not yet exist. Default to + * {@code false}. + * + * @param enabled to discover and register extensions. + * @return this {@link Builder}. + * @since 1.0.6 + */ + public Builder createDatabaseIfNotExist(boolean enabled) { + this.createDatabaseIfNotExist = enabled; + return this; + } + /** * Configure the Unix Domain Socket to connect to. * @@ -445,6 +485,8 @@ public Builder connectTimeout(@Nullable Duration connectTimeout) { * @param socketTimeout the socket timeout, or {@code null} if has no timeout. * @return this {@link Builder}. * @since 0.8.6 + * @deprecated This option has been deprecated as of version 1.0.1, because it has no effect and + * serves no purpose. */ public Builder socketTimeout(@Nullable Duration socketTimeout) { this.socketTimeout = socketTimeout; @@ -685,7 +727,7 @@ public Builder useClientPrepareStatement() { * @since 0.8.1 */ public Builder useServerPrepareStatement() { - return useServerPrepareStatement(DEFAULT_SERVER_PREPARE); + return useServerPrepareStatement((sql) -> false); } /** @@ -779,6 +821,16 @@ public Builder extendWith(Extension extension) { return this; } + /** + * Registers a password publisher function. + * @param passwordPublisher function to retrieve password before making connection. + * @return this {@link Builder}. + */ + public Builder passwordPublisher(Publisher passwordPublisher) { + this.passwordPublisher = passwordPublisher; + return this; + } + private SslMode requireSslMode() { SslMode sslMode = this.sslMode; diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java index b58b074f9..c47fd0c69 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java @@ -29,10 +29,12 @@ import io.r2dbc.spi.ConnectionFactory; import io.r2dbc.spi.ConnectionFactoryMetadata; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Objects; import java.util.function.Predicate; import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull; @@ -83,6 +85,7 @@ public static MySqlConnectionFactory from(MySqlConnectionConfiguration configura } String database = configuration.getDatabase(); + boolean createDbIfNotExist = configuration.isCreateDatabaseIfNotExist(); String user = configuration.getUser(); CharSequence password = configuration.getPassword(); SslMode sslMode = ssl.getSslMode(); @@ -91,24 +94,65 @@ public static MySqlConnectionFactory from(MySqlConnectionConfiguration configura Extensions extensions = configuration.getExtensions(); Predicate prepare = configuration.getPreferPrepareStatement(); int prepareCacheSize = configuration.getPrepareCacheSize(); + Publisher passwordPublisher = configuration.getPasswordPublisher(); + + if (Objects.nonNull(passwordPublisher)) { + return Mono.from(passwordPublisher).flatMap(token -> getMySqlConnection( + configuration, queryCache, + ssl, address, + database, createDbIfNotExist, + user, sslMode, context, + extensions, prepare, + prepareCacheSize, token + )); + } - return Client.connect(ssl, address, configuration.isTcpKeepAlive(), configuration.isTcpNoDelay(), - context, configuration.getConnectTimeout(), configuration.getSocketTimeout()) - .flatMap(client -> QueryFlow.login(client, sslMode, database, user, password, context)) - .flatMap(client -> { - ByteBufAllocator allocator = client.getByteBufAllocator(); - CodecsBuilder builder = Codecs.builder(allocator); - PrepareCache prepareCache = Caches.createPrepareCache(prepareCacheSize); - - extensions.forEach(CodecRegistrar.class, registrar -> - registrar.register(allocator, builder)); - - return MySqlConnection.init(client, builder.build(), context, queryCache.get(), - prepareCache, prepare); - }); + return getMySqlConnection( + configuration, queryCache, + ssl, address, + database, createDbIfNotExist, + user, sslMode, context, + extensions, prepare, + prepareCacheSize, password + ); })); } + private static Mono getMySqlConnection( + final MySqlConnectionConfiguration configuration, + final LazyQueryCache queryCache, + final MySqlSslConfiguration ssl, + final SocketAddress address, + final String database, + final boolean createDbIfNotExist, + final String user, + final SslMode sslMode, + final ConnectionContext context, + final Extensions extensions, + @Nullable final Predicate prepare, + final int prepareCacheSize, + @Nullable final CharSequence password) { + return Client.connect(ssl, address, configuration.isTcpKeepAlive(), configuration.isTcpNoDelay(), + context, configuration.getConnectTimeout(), configuration.getSocketTimeout()) + .flatMap(client -> { + // Lazy init database after handshake/login + String db = createDbIfNotExist ? "" : database; + return QueryFlow.login(client, sslMode, db, user, password, context); + }) + .flatMap(client -> { + ByteBufAllocator allocator = client.getByteBufAllocator(); + CodecsBuilder builder = Codecs.builder(allocator); + PrepareCache prepareCache = Caches.createPrepareCache(prepareCacheSize); + String db = createDbIfNotExist ? database : ""; + + extensions.forEach(CodecRegistrar.class, registrar -> + registrar.register(allocator, builder)); + + return MySqlConnection.init(client, builder.build(), context, db, queryCache.get(), + prepareCache, prepare); + }); + } + private static final class LazyQueryCache { private final int capacity; diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java index 7b31bfb9c..c56fc74b0 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java @@ -23,6 +23,7 @@ import io.r2dbc.spi.ConnectionFactoryOptions; import io.r2dbc.spi.ConnectionFactoryProvider; import io.r2dbc.spi.Option; +import org.reactivestreams.Publisher; import javax.net.ssl.HostnameVerifier; import java.time.Duration; @@ -161,6 +162,14 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr */ public static final Option TCP_NO_DELAY = Option.valueOf("tcpNoDelay"); + /** + * Enable/Disable database creation if not exist. + * + * @since 1.0.6 + */ + public static final Option CREATE_DATABASE_IF_NOT_EXIST = + Option.valueOf("createDatabaseIfNotExist"); + /** * Enable server preparing for parametrized statements and prefer server preparing simple statements. *

@@ -203,6 +212,14 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr */ public static final Option AUTODETECT_EXTENSIONS = Option.valueOf("autodetectExtensions"); + /** + * Password Publisher function can be used to retrieve password before creating a connection. + * This can be used with Amazon RDS Aurora IAM authentication, wherein it requires token to be generated. + * The token is valid for 15 minutes, and this token will be used as password. + * + */ + public static final Option> PASSWORD_PUBLISHER = Option.valueOf("passwordPublisher"); + @Override public ConnectionFactory create(ConnectionFactoryOptions options) { requireNonNull(options, "connectionFactoryOptions must not be null"); @@ -261,6 +278,10 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) { .to(builder::socketTimeout); mapper.optional(DATABASE).asString() .to(builder::database); + mapper.optional(CREATE_DATABASE_IF_NOT_EXIST).asBoolean() + .to(builder::createDatabaseIfNotExist); + mapper.optional(PASSWORD_PUBLISHER).as(Publisher.class) + .to(builder::passwordPublisher); return builder.build(); } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlResult.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlResult.java index 228585f56..2b09277f5 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlResult.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlResult.java @@ -42,7 +42,6 @@ import java.util.function.BiConsumer; import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -57,20 +56,6 @@ */ public final class MySqlResult implements Result { - private static final Consumer RELEASE = ReferenceCounted::release; - - private static final BiConsumer> ROWS_UPDATED = (segment, sink) -> { - if (segment instanceof UpdateCount) { - sink.next(((UpdateCount) segment).value()); - } else if (segment instanceof Message) { - sink.error(((Message) segment).exception()); - } else if (segment instanceof ReferenceCounted) { - ReferenceCountUtil.safeRelease(segment); - } - }; - - private static final BiFunction SUM = Long::sum; - private final Flux segments; private MySqlResult(Flux segments) { @@ -79,7 +64,15 @@ private MySqlResult(Flux segments) { @Override public Mono getRowsUpdated() { - return segments.handle(ROWS_UPDATED).reduce(SUM); + return segments.handle((segment, sink) -> { + if (segment instanceof UpdateCount) { + sink.next(((UpdateCount) segment).value()); + } else if (segment instanceof Message) { + sink.error(((Message) segment).exception()); + } else if (segment instanceof ReferenceCounted) { + ReferenceCountUtil.safeRelease(segment); + } + }).reduce(Long::sum); } @Override @@ -168,7 +161,7 @@ static MySqlResult toResult(boolean binary, Codecs codecs, ConnectionContext con requireNonNull(messages, "messages must not be null"); return new MySqlResult(OperatorUtils.discardOnCancel(messages) - .doOnDiscard(ReferenceCounted.class, RELEASE) + .doOnDiscard(ReferenceCounted.class, ReferenceCounted::release) .handle(new MySqlSegments(binary, codecs, context, generatedKeyName))); } @@ -220,10 +213,6 @@ public Row row() { @Override public ReferenceCounted touch(Object hint) { - if (this.fields.length == 0) { - return this; - } - for (FieldValue field : this.fields) { field.touch(hint); } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java b/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java index 9b8c5c814..f25365cdb 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java @@ -888,6 +888,7 @@ private Capability clientCapability(Capability serverCapability) { builder.disableDatabasePinned(); builder.disableCompression(); + // TODO: support LOAD DATA LOCAL INFILE builder.disableLoadDataInfile(); builder.disableIgnoreAmbiguitySpace(); builder.disableInteractiveTimeout(); diff --git a/src/main/java/io/asyncer/r2dbc/mysql/authentication/CachingSha2FastAuthProvider.java b/src/main/java/io/asyncer/r2dbc/mysql/authentication/CachingSha2FastAuthProvider.java index ef52bd811..bf6919701 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/authentication/CachingSha2FastAuthProvider.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/authentication/CachingSha2FastAuthProvider.java @@ -61,7 +61,7 @@ public byte[] authentication(@Nullable CharSequence password, byte[] salt, CharC @Override public MySqlAuthProvider next() { - return CachingSha2FullAuthProvider.INSTANCE; + return CachingSha2FullAuthProvider.getInstance(); } @Override diff --git a/src/main/java/io/asyncer/r2dbc/mysql/authentication/CachingSha2FullAuthProvider.java b/src/main/java/io/asyncer/r2dbc/mysql/authentication/CachingSha2FullAuthProvider.java index 5bc5debf8..e00cf8292 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/authentication/CachingSha2FullAuthProvider.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/authentication/CachingSha2FullAuthProvider.java @@ -30,7 +30,9 @@ */ final class CachingSha2FullAuthProvider implements MySqlAuthProvider { - static final CachingSha2FullAuthProvider INSTANCE = new CachingSha2FullAuthProvider(); + static CachingSha2FullAuthProvider getInstance() { + return LazyHolder.INSTANCE; + } @Override public boolean isSslNecessary() { @@ -60,4 +62,8 @@ public String getType() { } private CachingSha2FullAuthProvider() { } + + private static class LazyHolder { + private static final CachingSha2FullAuthProvider INSTANCE = new CachingSha2FullAuthProvider(); + } } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java b/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java index 145112896..a467ae2ec 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java @@ -42,8 +42,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import reactor.core.publisher.Flux; -import java.util.concurrent.atomic.AtomicBoolean; - import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull; /** @@ -59,16 +57,10 @@ final class MessageDuplexCodec extends ChannelDuplexHandler { private final ConnectionContext context; - private final AtomicBoolean closing; - - private final RequestQueue requestQueue; - private final ServerMessageDecoder decoder = new ServerMessageDecoder(); - MessageDuplexCodec(ConnectionContext context, AtomicBoolean closing, RequestQueue requestQueue) { + MessageDuplexCodec(ConnectionContext context) { this.context = requireNonNull(context, "context must not be null"); - this.closing = requireNonNull(closing, "closing must not be null"); - this.requestQueue = requireNonNull(requestQueue, "requestQueue must not be null"); } @Override @@ -129,14 +121,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) @Override public void channelInactive(ChannelHandlerContext ctx) { decoder.dispose(); - requestQueue.dispose(); - - // Server has closed the connection without us wanting to close it - // Typically happens if we send data asynchronously (i.e. previous command didn't complete). - if (closing.compareAndSet(false, true)) { - logger.warn("Connection has been closed by peer"); - } - ctx.fireChannelInactive(); } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java b/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java index ab3ed981d..e30191e8e 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java @@ -44,9 +44,8 @@ import reactor.util.context.Context; import reactor.util.context.ContextView; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.function.Function; import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.require; @@ -63,7 +62,16 @@ final class ReactorNettyClient implements Client { private static final boolean INFO_ENABLED = logger.isInfoEnabled(); - private static final Consumer RELEASE = ReferenceCounted::release; + private static final int ST_CONNECTED = 0; + + private static final int ST_CLOSING = 1; + + private static final int ST_CLOSED = 2; + + private static final AtomicIntegerFieldUpdater STATE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ReactorNettyClient.class, "state"); + + private volatile int state = ST_CONNECTED; private final Connection connection; @@ -76,8 +84,6 @@ final class ReactorNettyClient implements Client { private final RequestQueue requestQueue = new RequestQueue(); - private final AtomicBoolean closing = new AtomicBoolean(); - ReactorNettyClient(Connection connection, MySqlSslConfiguration ssl, ConnectionContext context) { requireNonNull(connection, "connection must not be null"); requireNonNull(context, "context must not be null"); @@ -91,7 +97,7 @@ final class ReactorNettyClient implements Client { // Note: encoder/decoder should before reactor bridge. connection.addHandlerLast(EnvelopeSlicer.NAME, new EnvelopeSlicer()) .addHandlerLast(MessageDuplexCodec.NAME, - new MessageDuplexCodec(context, this.closing, this.requestQueue)); + new MessageDuplexCodec(context)); if (ssl.getSslMode().startSsl()) { connection.addHandlerFirst(SslBridgeHandler.NAME, new SslBridgeHandler(context, ssl)); @@ -153,10 +159,10 @@ public Flux exchange(ClientMessage request, .doOnSubscribe(ignored -> emitNextRequest(request)) .handle(handler) .doOnTerminate(requestQueue)) - .doOnDiscard(ReferenceCounted.class, RELEASE); + .doOnDiscard(ReferenceCounted.class, ReferenceCounted::release); requestQueue.submit(RequestTask.wrap(request, sink, responses)); - }).flatMapMany(identity()); + }).flatMapMany(Function.identity()); } @Override @@ -187,31 +193,43 @@ public Flux exchange(FluxExchangeable exchangeable) { }); requestQueue.submit(RequestTask.wrap(exchangeable, sink, OperatorUtils.discardOnCancel(responses) - .doOnDiscard(ReferenceCounted.class, RELEASE) + .doOnDiscard(ReferenceCounted.class, ReferenceCounted::release) .doOnCancel(exchangeable::dispose))); - }).flatMapMany(identity()); + }).flatMapMany(Function.identity()); } @Override public Mono close() { - return Mono.>create(sink -> { - if (!closing.compareAndSet(false, true)) { - // client is closing or closed - sink.success(); - return; - } - - requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> { - Sinks.EmitResult result = requests.tryEmitNext(ExitMessage.INSTANCE); + return Mono + .>create(sink -> { + if (state == ST_CLOSED) { + logger.debug("Close request ignored (connection already closed)"); + sink.success(); + return; + } - if (result != Sinks.EmitResult.OK) { - logger.error("Exit message sending failed due to {}, force closing", result); - } - }))); - }).flatMap(identity()).onErrorResume(e -> { - logger.error("Exit message sending failed, force closing", e); - return Mono.empty(); - }).then(forceClose()); + logger.debug("Close request accepted"); + + requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> { + Sinks.EmitResult result = requests.tryEmitNext(ExitMessage.INSTANCE); + + if (result != Sinks.EmitResult.OK) { + logger.error("Exit message sending failed due to {}, force closing", result); + } else { + if (STATE_UPDATER.compareAndSet(this, ST_CONNECTED, ST_CLOSING)) { + logger.debug("Exit message sent"); + } else { + logger.debug("Exit message sent (duplicated / connection already closed)"); + } + } + }))); + }) + .flatMap(Function.identity()) + .onErrorResume(e -> { + logger.error("Exit message sending failed, force closing", e); + return Mono.empty(); + }) + .then(forceClose()); } @Override @@ -226,7 +244,7 @@ public ByteBufAllocator getByteBufAllocator() { @Override public boolean isConnected() { - return !closing.get() && connection.channel().isOpen(); + return state < ST_CLOSED && connection.channel().isOpen(); } @Override @@ -242,7 +260,7 @@ public void loginSuccess() { @Override public String toString() { return String.format("ReactorNettyClient(%s){connectionId=%d}", - this.closing.get() ? "closing or closed" : "activating", context.getConnectionId()); + isConnected() ? "activating" : "clsoing or closed", context.getConnectionId()); } private void emitNextRequest(ClientMessage request) { @@ -278,19 +296,23 @@ private void drainError(R2dbcException e) { } private void handleClose() { - if (this.closing.compareAndSet(false, true)) { - logger.warn("Connection has been closed by peer"); + final int oldState = state; + if (oldState == ST_CLOSED) { + logger.debug("Connection already closed"); + return; + } + + STATE_UPDATER.set(this, ST_CLOSED); + + if (oldState != ST_CLOSING) { + logger.warn("Connection unexpectedly closed"); drainError(ClientExceptions.unexpectedClosed()); } else { + logger.debug("Connection closed"); drainError(ClientExceptions.expectedClosed()); } } - @SuppressWarnings("unchecked") - private static Function identity() { - return (Function) Identity.INSTANCE; - } - private final class ResponseSubscriber implements CoreSubscriber { private final ResponseSink sink; @@ -361,14 +383,4 @@ public void next(ServerMessage message) { responseProcessor.emitNext(message, EmitFailureHandler.FAIL_FAST); } } - - private static final class Identity implements Function { - - private static final Identity INSTANCE = new Identity(); - - @Override - public Object apply(Object o) { - return o; - } - } } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/client/RequestQueue.java b/src/main/java/io/asyncer/r2dbc/mysql/client/RequestQueue.java index c60df947e..ff27d44a3 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/client/RequestQueue.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/client/RequestQueue.java @@ -65,22 +65,29 @@ final class RequestQueue extends ActiveStatus implements Runnable { */ @Override public void run() { - RequestTask task = queue.poll(); - - if (task == null) { - // Queue was empty, set it to idle if it is not disposed. - STATUS_UPDATER.compareAndSet(this, ACTIVE, IDLE); - } else { - int status = this.status; - - if (status == DISPOSE) { + for (;;) { + RequestTask task = queue.poll(); + final int status = this.status; + + if (task == null) { + // Queue was empty, set it to idle if it is not disposed. + if (status != ACTIVE || STATUS_UPDATER.compareAndSet(this, ACTIVE, IDLE) && queue.isEmpty()) { + return; + } + } else if (status == DISPOSE) { // Cancel and no need clear queue because it should be cleared by other one. task.cancel(requireDisposed()); + return; } else { task.run(); + // The execution of a canceled task would result in a stall of the request queue. + // refer: https://github.com/asyncer-io/r2dbc-mysql/issues/114 + if (!task.isCancelled()) { + return; } } } +} /** * Submit an exchange task. If the queue is inactive, it will execute directly instead of queuing. @@ -90,13 +97,6 @@ public void run() { * @param the type argument of {@link RequestTask}. */ void submit(RequestTask task) { - if (STATUS_UPDATER.compareAndSet(this, IDLE, ACTIVE)) { - // Fast path for general way. - task.run(); - return; - } - - // Check dispose after fast path failed. int status = this.status; if (status == DISPOSE) { diff --git a/src/main/java/io/asyncer/r2dbc/mysql/client/RequestTask.java b/src/main/java/io/asyncer/r2dbc/mysql/client/RequestTask.java index 09235e05f..e92e808c3 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/client/RequestTask.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/client/RequestTask.java @@ -38,6 +38,8 @@ final class RequestTask { private final T supplier; + private volatile boolean isCancelled; + private RequestTask(@Nullable Disposable disposable, MonoSink sink, T supplier) { this.disposable = disposable; this.sink = sink; @@ -54,26 +56,43 @@ void run() { * @param e cancelled by which error */ void cancel(Throwable e) { + cancel0(); + sink.error(e); + } + + boolean isCancelled() { + return isCancelled; + } + + private void cancel0() { if (disposable != null) { disposable.dispose(); } - sink.error(e); + isCancelled = true; } static RequestTask wrap(ClientMessage message, MonoSink sink, T supplier) { + final RequestTask task; if (message instanceof Disposable) { - return new RequestTask<>((Disposable) message, sink, supplier); - } + task = new RequestTask<>((Disposable) message, sink, supplier); + } else { + task = new RequestTask<>(null, sink, supplier); - return new RequestTask<>(null, sink, supplier); + } + sink.onCancel(() -> task.cancel0()); + return task; } static RequestTask wrap(Flux messages, MonoSink sink, T supplier) { - return new RequestTask<>(new DisposableFlux(messages), sink, supplier); + final RequestTask task = new RequestTask<>(new DisposableFlux(messages), sink, supplier); + sink.onCancel(() -> task.cancel0()); + return task; } static RequestTask wrap(MonoSink sink, T supplier) { - return new RequestTask<>(null, sink, supplier); + final RequestTask task = new RequestTask<>(null, sink, supplier); + sink.onCancel(() -> task.cancel0()); + return task; } private static final class DisposableFlux implements Disposable { diff --git a/src/main/java/io/asyncer/r2dbc/mysql/codec/DefaultCodecs.java b/src/main/java/io/asyncer/r2dbc/mysql/codec/DefaultCodecs.java index 7f1dd26a9..f1b4dabce 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/codec/DefaultCodecs.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/codec/DefaultCodecs.java @@ -26,6 +26,7 @@ import io.r2dbc.spi.Parameter; import org.jetbrains.annotations.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.math.BigInteger; @@ -314,10 +315,13 @@ private static Codec[] defaultCodecs(ByteBufAllocator allocator) { }; } - static final class Builder extends ArrayList> implements CodecsBuilder { + static final class Builder implements CodecsBuilder { private final ByteBufAllocator allocator; + @GuardedBy("this") + private final ArrayList> codecs = new ArrayList<>(); + Builder(ByteBufAllocator allocator) { this.allocator = allocator; } @@ -325,15 +329,15 @@ static final class Builder extends ArrayList> implements CodecsBuilder @Override public CodecsBuilder addFirst(Codec codec) { synchronized (this) { - if (isEmpty()) { - Codec[] codecs = defaultCodecs(allocator); + if (codecs.isEmpty()) { + Codec[] defaultCodecs = defaultCodecs(allocator); - ensureCapacity(codecs.length + 1); + codecs.ensureCapacity(defaultCodecs.length + 1); // Add first. - add(codec); - addAll(InternalArrays.asImmutableList(codecs)); + codecs.add(codec); + codecs.addAll(InternalArrays.asImmutableList(defaultCodecs)); } else { - add(0, codec); + codecs.add(0, codec); } } return this; @@ -342,10 +346,10 @@ public CodecsBuilder addFirst(Codec codec) { @Override public CodecsBuilder addLast(Codec codec) { synchronized (this) { - if (isEmpty()) { - addAll(InternalArrays.asImmutableList(defaultCodecs(allocator))); + if (codecs.isEmpty()) { + codecs.addAll(InternalArrays.asImmutableList(defaultCodecs(allocator))); } - add(codec); + codecs.add(codec); } return this; } @@ -354,13 +358,13 @@ public CodecsBuilder addLast(Codec codec) { public Codecs build() { synchronized (this) { try { - if (isEmpty()) { + if (codecs.isEmpty()) { return new DefaultCodecs(defaultCodecs(allocator)); } - return new DefaultCodecs(toArray(new Codec[0])); + return new DefaultCodecs(codecs.toArray(new Codec[0])); } finally { - clear(); - trimToSize(); + codecs.clear(); + codecs.trimToSize(); } } } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtils.java b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtils.java index 5d95256a6..bd97b440c 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtils.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/NettyBufferUtils.java @@ -17,17 +17,62 @@ package io.asyncer.r2dbc.mysql.internal.util; import io.asyncer.r2dbc.mysql.message.FieldValue; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import java.util.List; + /** * An internal utility considers the use of safe release buffers (array or {@link List}). It uses standard * netty {@link ReferenceCountUtil#safeRelease} to suppress release errors. */ public final class NettyBufferUtils { + /** + * Combine {@link ByteBuf}s through composite buffer. + *

+ * This method would release all {@link ByteBuf}s when any exception throws. + * + * @param parts The {@link ByteBuf}s want to be wrap, it can not be empty, and it will be cleared. + * @return A {@link ByteBuf} holds the all bytes of given {@code parts}, it may be a read-only buffer. + */ + public static ByteBuf composite(final List parts) { + final int size = parts.size(); + + switch (size) { + case 0: + throw new IllegalStateException("No buffer available"); + case 1: + try { + return parts.get(0); + } finally { + parts.clear(); + } + default: + CompositeByteBuf composite = null; + + try { + composite = parts.get(0).alloc().compositeBuffer(size); + // Auto-releasing failed parts + return composite.addComponents(true, parts); + } catch (Throwable e) { + if (composite == null) { + // Alloc failed, release parts. + releaseAll(parts); + } else { + // Also release success parts. + composite.release(); + } + throw e; + } finally { + parts.clear(); + } + } + } + public static void releaseAll(ReferenceCounted[] parts) { for (ReferenceCounted counted : parts) { if (counted != null) { diff --git a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/VarIntUtils.java b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/VarIntUtils.java index c5a4d530a..dba99fd72 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/internal/util/VarIntUtils.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/internal/util/VarIntUtils.java @@ -47,6 +47,17 @@ public final class VarIntUtils { private static final int MEDIUM_SIZE = MEDIUM_BYTES * Byte.SIZE; + /** + * Reads a length encoded integer from the given buffers. Notice that a length encoded integer can be + * greater than {@link Long#MAX_VALUE}. In this case it should be used as an unsigned long. If we need + * assume the result as a smaller integer, add code comment to explain it. + *

+ * Note: it will change {@code firstPart} and {@code secondPart} readerIndex if necessary. + * + * @param firstPart the first part of a readable buffer include a part of the var integer. + * @param secondPart the second part of a readable buffer include subsequent part of the var integer. + * @return A var integer read from buffer. + */ public static long crossReadVarInt(ByteBuf firstPart, ByteBuf secondPart) { requireNonNull(firstPart, "firstPart must not be null"); requireNonNull(secondPart, "secondPart must not be null"); @@ -87,6 +98,10 @@ public static long crossReadVarInt(ByteBuf firstPart, ByteBuf secondPart) { } /** + * Reads a length encoded integer from the given buffer. Notice that a length encoded integer can be + * greater than {@link Long#MAX_VALUE}. In this case it should be used as an unsigned long. If we need + * assume the result as a smaller integer, add code comment to explain it. + *

* Note: it will change {@code buf} readerIndex. * * @param buf a readable buffer include a var integer. diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/client/InitDbMessage.java b/src/main/java/io/asyncer/r2dbc/mysql/message/client/InitDbMessage.java new file mode 100644 index 000000000..eb926fe41 --- /dev/null +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/client/InitDbMessage.java @@ -0,0 +1,19 @@ +package io.asyncer.r2dbc.mysql.message.client; + +import io.asyncer.r2dbc.mysql.ConnectionContext; +import io.netty.buffer.ByteBuf; + +public final class InitDbMessage extends ScalarClientMessage { + + private static final byte FLAG = 0x02; + + private final String database; + + public InitDbMessage(String database) { this.database = database; } + + @Override + protected void writeTo(ByteBuf buf, ConnectionContext context) { + // RestOfPacketString, no need terminal or length + buf.writeByte(FLAG).writeCharSequence(database, context.getClientCollation().getCharset()); + } +} diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/client/PreparedExecuteMessage.java b/src/main/java/io/asyncer/r2dbc/mysql/message/client/PreparedExecuteMessage.java index c139a2904..33c312b2a 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/client/PreparedExecuteMessage.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/client/PreparedExecuteMessage.java @@ -27,7 +27,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.function.Consumer; import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull; @@ -59,8 +58,6 @@ public final class PreparedExecuteMessage implements ClientMessage, Disposable { private static final byte EXECUTE_FLAG = 0x17; - private static final Consumer DISPOSE = MySqlParameter::dispose; - private final int statementId; /** @@ -131,7 +128,7 @@ public Flux encode(ByteBufAllocator allocator, ConnectionContext contex writeTypes(buf, size); Flux parameters = OperatorUtils.discardOnCancel(Flux.fromArray(values)) - .doOnDiscard(MySqlParameter.class, DISPOSE) + .doOnDiscard(MySqlParameter.class, MySqlParameter::dispose) .concatMap(MySqlParameter::publishBinary); return Flux.just(buf).concatWith(parameters); diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ByteBufCombiner.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ByteBufCombiner.java deleted file mode 100644 index 4dfe5ed6c..000000000 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ByteBufCombiner.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2023 asyncer.io projects - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.asyncer.r2dbc.mysql.message.server; - -import io.asyncer.r2dbc.mysql.internal.util.NettyBufferUtils; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; - -import java.util.List; - -/** - * An utility considers combining {@link ByteBuf}s to a single {@link ByteBuf}. - */ -final class ByteBufCombiner { - - /** - * Combine {@link ByteBuf}s through composite buffer. - *

- * This method would release all {@link ByteBuf}s when any exception throws. - * - * @param parts The {@link ByteBuf}s want to be wrap, it can not be empty, and it will be cleared. - * @return A {@link ByteBuf} holds the all bytes of given {@code parts}, it may be a read-only buffer. - */ - static ByteBuf composite(List parts) { - int size = parts.size(); - - switch (size) { - case 0: - throw new IllegalStateException("No buffer available"); - case 1: - try { - return parts.get(0); - } finally { - parts.clear(); - } - default: - CompositeByteBuf composite = null; - - try { - composite = parts.get(0).alloc().compositeBuffer(size); - // Auto-releasing failed parts - return composite.addComponents(true, parts); - } catch (Throwable e) { - if (composite == null) { - // Alloc failed, release parts. - NettyBufferUtils.releaseAll(parts); - } else { - // Also release success parts. - composite.release(); - } - throw e; - } finally { - parts.clear(); - } - } - } - - private ByteBufCombiner() { } -} diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ColumnCountMessage.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ColumnCountMessage.java index a036e7a25..a23019a1f 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ColumnCountMessage.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ColumnCountMessage.java @@ -40,6 +40,7 @@ public int getTotalColumns() { } static ColumnCountMessage decode(ByteBuf buf) { + // JVM does NOT support arrays longer than Integer.MAX_VALUE return new ColumnCountMessage(Math.toIntExact(VarIntUtils.readVarInt(buf))); } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/DefinitionMetadataMessage.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/DefinitionMetadataMessage.java index 3fb4950ab..a2aac7a8b 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/server/DefinitionMetadataMessage.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/server/DefinitionMetadataMessage.java @@ -173,7 +173,8 @@ private static DefinitionMetadataMessage decode41(ByteBuf buf, ConnectionContext String column = readVarIntSizedString(buf, charset); String originColumn = readVarIntSizedString(buf, charset); - VarIntUtils.readVarInt(buf); // skip constant 0x0c encoded by var integer + // Skip constant 0x0c encoded by var integer + VarIntUtils.readVarInt(buf); int collationId = buf.readUnsignedShortLE(); long size = buf.readUnsignedIntLE(); @@ -185,7 +186,7 @@ private static DefinitionMetadataMessage decode41(ByteBuf buf, ConnectionContext } private static String readVarIntSizedString(ByteBuf buf, Charset charset) { - // JVM can NOT support string which length upper than maximum of int32 + // JVM does NOT support strings longer than Integer.MAX_VALUE int bytes = (int) VarIntUtils.readVarInt(buf); if (bytes == 0) { diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/FieldReader.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/FieldReader.java index 7c6f2199c..c73d4027d 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/server/FieldReader.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/server/FieldReader.java @@ -77,7 +77,7 @@ static FieldReader of(List buffers) { if (totalSize <= Integer.MAX_VALUE) { // The buffers will be cleared by ByteBufCombiner.composite(). - ByteBuf combined = ByteBufCombiner.composite(buffers); + ByteBuf combined = NettyBufferUtils.composite(buffers); try { return new NormalFieldReader(combined); } catch (Throwable e) { diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/LargeFieldReader.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/LargeFieldReader.java index 3eb133aac..9842687b1 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/server/LargeFieldReader.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/server/LargeFieldReader.java @@ -99,12 +99,12 @@ public FieldValue readVarIntSizedField() { fieldSize = VarIntUtils.readVarInt(currentBuf); } - // Refresh non empty buffer because current buffer has been read. + // Refresh non-empty buffer because current buffer has been read. currentBuf = nonEmptyBuffer(); List results = readSlice(currentBuf, fieldSize); - if (fieldSize > Integer.MAX_VALUE) { + if (Long.compareUnsigned(fieldSize, Integer.MAX_VALUE) > 0) { return retainedLargeField(results); } @@ -130,26 +130,30 @@ protected void deallocate() { * list instead of a single buffer. * * @param current the current {@link ByteBuf} in {@link #buffers}. - * @param length the length of read. + * @param length the length of read, it can be an unsigned long. * @return result buffer list, should NEVER retain any buffer. */ private List readSlice(ByteBuf current, long length) { ByteBuf buf = current; List results = new ArrayList<>(Math.max( - (int) Math.min((length / Envelopes.MAX_ENVELOPE_SIZE) + 2, Byte.MAX_VALUE), 10)); + (int) Math.min(Long.divideUnsigned(length, Envelopes.MAX_ENVELOPE_SIZE) + 2, Byte.MAX_VALUE), + 10 + )); long totalSize = 0; int bufReadable; // totalSize + bufReadable <= length - while (totalSize <= length - (bufReadable = buf.readableBytes())) { + while (Long.compareUnsigned(totalSize, length - (bufReadable = buf.readableBytes())) <= 0) { totalSize += bufReadable; // No need readSlice because currentBufIndex will be increment after List pushed. results.add(buf); buf = this.buffers[++this.currentBufIndex]; } - if (length > totalSize) { - // need bytes = length - `results` real length = length - (totalSize - `buf` length) + // totalSize < length + if (Long.compareUnsigned(length, totalSize) > 0) { + // need bytes = length - `results` length = length - totalSize + // length - totalSize should be an int due to while loop above. results.add(buf.readSlice((int) (length - totalSize))); } // else results has filled by prev buffer, and currentBufIndex is unread for now. diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/NormalFieldReader.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/NormalFieldReader.java index b65d30024..9379a0265 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/server/NormalFieldReader.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/server/NormalFieldReader.java @@ -105,6 +105,7 @@ public boolean release(int decrement) { } private static ByteBuf readVarIntSizedRetained(ByteBuf buf) { + // Normal field will NEVER be greater than Integer.MAX_VALUE. int size = (int) VarIntUtils.readVarInt(buf); if (size == 0) { // Use EmptyByteBuf, new buffer no need to be retained. diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/OkMessage.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/OkMessage.java index f5c91a60e..52e3bce2f 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/server/OkMessage.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/server/OkMessage.java @@ -160,10 +160,11 @@ static OkMessage decode(ByteBuf buf, ConnectionContext context) { if (size > sizeAfterVarInt) { information = buf.toString(readerIndex, buf.writerIndex() - readerIndex, charset); } else { + // JVM does NOT support strings longer than Integer.MAX_VALUE information = buf.toString(buf.readerIndex(), (int) size, charset); } - // Ignore session track, it is not human readable and useless for R2DBC client. + // Ignore session track, it is not human-readable and useless for R2DBC client. return new OkMessage(affectedRows, lastInsertId, serverStatuses, warnings, information); } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ServerMessageDecoder.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ServerMessageDecoder.java index f7c07f0bc..917d3485f 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ServerMessageDecoder.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ServerMessageDecoder.java @@ -90,11 +90,12 @@ private static ServerMessage decodeMessage(List buffers, int envelopeId ConnectionContext context, DecodeContext decodeContext) { if (decodeContext instanceof ResultDecodeContext) { return decodeResult(buffers, context, (ResultDecodeContext) decodeContext); - } else if (decodeContext instanceof FetchDecodeContext) { + } + if (decodeContext instanceof FetchDecodeContext) { return decodeFetch(buffers, context); } - ByteBuf combined = ByteBufCombiner.composite(buffers); + ByteBuf combined = NettyBufferUtils.composite(buffers); try { if (decodeContext instanceof CommandDecodeContext) { @@ -159,7 +160,7 @@ private static ServerMessage decodeResult(List buffers, ConnectionConte } if (decodeContext.isInMetadata()) { - ByteBuf combined = ByteBufCombiner.composite(buffers); + ByteBuf combined = NettyBufferUtils.composite(buffers); try { return decodeInMetadata(combined, header, context, decodeContext); } finally { @@ -309,7 +310,7 @@ private static ErrorMessage decodeCheckError(List buffers, short header // 0xFF is not header of var integer, // not header of text result null (0xFB) and // not header of column metadata (0x03 + "def") - ByteBuf combined = ByteBufCombiner.composite(buffers); + ByteBuf combined = NettyBufferUtils.composite(buffers); try { return ErrorMessage.decode(combined); } finally { @@ -329,7 +330,7 @@ private static ServerMessage decodeRow(List buffers, ByteBuf firstBuf, int byteSize = firstBuf.readableBytes(); if (OkMessage.isValidSize(byteSize)) { - ByteBuf combined = ByteBufCombiner.composite(buffers); + ByteBuf combined = NettyBufferUtils.composite(buffers); try { return OkMessage.decode(combined, context); @@ -337,7 +338,7 @@ private static ServerMessage decodeRow(List buffers, ByteBuf firstBuf, combined.release(); } } else if (EofMessage.isValidSize(byteSize)) { - ByteBuf combined = ByteBufCombiner.composite(buffers); + ByteBuf combined = NettyBufferUtils.composite(buffers); try { return EofMessage.decode(combined); diff --git a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java index ee6f93116..18eb12ff8 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java @@ -35,7 +35,7 @@ class ConnectionIntegrationTest extends IntegrationTestSupport { ConnectionIntegrationTest() { - super(configuration(false, null, null)); + super(configuration("r2dbc", false, false, null, null)); } @Test @@ -233,6 +233,16 @@ void setTransactionIsolationLevel() { .doOnNext(a -> a.isEqualTo(connection.getTransactionIsolationLevel())))); } + @Test + void errorPropagteRequestQueue() { + illegalArgument(connection -> Flux.merge( + connection.createStatement("SELECT 'Result 1', SLEEP(1)").execute(), + connection.createStatement("SELECT 'Result 2'").execute(), + connection.createStatement("SELECT 'Result 3'").execute() + ).flatMap(result -> result.map((row, meta) -> row.get(0, Integer.class))) + ); + } + @Test void batchCrud() { // TODO: spilt it to multiple test cases and move it to BatchIntegrationTest diff --git a/src/test/java/io/asyncer/r2dbc/mysql/InitDbIntegrationTest.java b/src/test/java/io/asyncer/r2dbc/mysql/InitDbIntegrationTest.java new file mode 100644 index 000000000..afece8130 --- /dev/null +++ b/src/test/java/io/asyncer/r2dbc/mysql/InitDbIntegrationTest.java @@ -0,0 +1,35 @@ +package io.asyncer.r2dbc.mysql; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration tests for {@code createDatabaseIfNotExist}. + */ +class InitDbIntegrationTest extends IntegrationTestSupport { + + private static final String DATABASE = "test-" + ThreadLocalRandom.current().nextInt(10000); + + InitDbIntegrationTest() { + super(configuration( + DATABASE, true, false, + null, null + )); + } + + @Test + void shouldCreateDatabase() { + complete(conn -> conn.createStatement("SHOW DATABASES") + .execute() + .flatMap(it -> it.map((row, rowMetadata) -> row.get(0, String.class))) + .collect(Collectors.toSet()) + .doOnNext(it -> assertThat(it).contains(DATABASE)) + .thenMany(conn.createStatement("DROP DATABASE `" + DATABASE + "`") + .execute() + .flatMap(MySqlResult::getRowsUpdated))); + } +} diff --git a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java index 52462bb33..26d4e7728 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java @@ -65,6 +65,10 @@ void badGrammar(Function> runner) { process(runner).verifyError(R2dbcBadGrammarException.class); } + void illegalArgument(Function> runner) { + process(runner).expectError(IllegalArgumentException.class).verify(Duration.ofSeconds(3)); + } + Mono create() { return connectionFactory.create(); } @@ -82,8 +86,10 @@ static Mono extractRowsUpdated(Result result) { return Mono.from(result.getRowsUpdated()); } - static MySqlConnectionConfiguration configuration(boolean autodetectExtensions, - @Nullable ZoneId serverZoneId, @Nullable Predicate preferPrepared) { + static MySqlConnectionConfiguration configuration( + String database, boolean createDatabaseIfNotExist, boolean autodetectExtensions, + @Nullable ZoneId serverZoneId, @Nullable Predicate preferPrepared + ) { String password = System.getProperty("test.mysql.password"); assertThat(password).withFailMessage("Property test.mysql.password must exists and not be empty") @@ -95,8 +101,8 @@ static MySqlConnectionConfiguration configuration(boolean autodetectExtensions, .connectTimeout(Duration.ofSeconds(3)) .user("root") .password(password) - .database("r2dbc") - .port(container.getMappedPort(MySQLContainer.MYSQL_PORT)) + .database(database) + .createDatabaseIfNotExist(createDatabaseIfNotExist) .autodetectExtensions(autodetectExtensions); if (serverZoneId != null) { diff --git a/src/test/java/io/asyncer/r2dbc/mysql/JacksonPrepareIntegrationTest.java b/src/test/java/io/asyncer/r2dbc/mysql/JacksonPrepareIntegrationTest.java index 1d65fe50a..7205a82bf 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/JacksonPrepareIntegrationTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/JacksonPrepareIntegrationTest.java @@ -22,7 +22,7 @@ class JacksonPrepareIntegrationTest extends JacksonIntegrationTestSupport { JacksonPrepareIntegrationTest() { - super(configuration(true, null, sql -> false)); + super(configuration("r2dbc", false, true, null, sql -> false)); } } diff --git a/src/test/java/io/asyncer/r2dbc/mysql/JacksonTextIntegrationTest.java b/src/test/java/io/asyncer/r2dbc/mysql/JacksonTextIntegrationTest.java index 7aca5eba3..6d666e520 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/JacksonTextIntegrationTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/JacksonTextIntegrationTest.java @@ -22,6 +22,6 @@ class JacksonTextIntegrationTest extends JacksonIntegrationTestSupport { JacksonTextIntegrationTest() { - super(configuration(true, null, null)); + super(configuration("r2dbc", false, true, null, null)); } } diff --git a/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfigurationTest.java b/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfigurationTest.java index e856e41f1..497f4bacf 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfigurationTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfigurationTest.java @@ -25,6 +25,8 @@ import org.assertj.core.api.ThrowableTypeAssert; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; import java.time.Duration; import java.time.ZoneId; @@ -189,6 +191,21 @@ void nonAutodetectExtensions() { assertThat(list).isEmpty(); } + @Test + void validPasswordSupplier() { + final Mono passwordSupplier = Mono.just("123456"); + Mono.from(MySqlConnectionConfiguration.builder() + .host(HOST) + .user(USER) + .passwordPublisher(passwordSupplier) + .autodetectExtensions(false) + .build() + .getPasswordPublisher()) + .as(StepVerifier::create) + .expectNext("123456") + .verifyComplete(); + } + private static MySqlConnectionConfiguration unixSocketSslMode(SslMode sslMode) { return MySqlConnectionConfiguration.builder() .unixSocket(UNIX_SOCKET) @@ -213,6 +230,7 @@ private static MySqlConnectionConfiguration filledUp() { .port(3306) .password("database-password-in-here") .database("r2dbc") + .createDatabaseIfNotExist(true) .tcpKeepAlive(true) .tcpNoDelay(true) .connectTimeout(Duration.ofSeconds(3)) diff --git a/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProviderTest.java b/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProviderTest.java index 704699ec3..8343782f3 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProviderTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProviderTest.java @@ -24,6 +24,8 @@ import io.r2dbc.spi.Option; import org.assertj.core.api.Assert; import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLSession; @@ -35,6 +37,7 @@ import java.util.function.Function; import java.util.function.Predicate; +import static io.asyncer.r2dbc.mysql.MySqlConnectionFactoryProvider.PASSWORD_PUBLISHER; import static io.asyncer.r2dbc.mysql.MySqlConnectionFactoryProvider.USE_SERVER_PREPARE_STATEMENT; import static io.r2dbc.spi.ConnectionFactoryOptions.CONNECT_TIMEOUT; import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE; @@ -274,6 +277,7 @@ void validProgrammaticUnixSocket() { .option(SSL, true) .option(Option.valueOf(CONNECT_TIMEOUT.name()), Duration.ofSeconds(3).toString()) .option(DATABASE, "r2dbc") + .option(Option.valueOf("createDatabaseIfNotExist"), true) .option(Option.valueOf("serverZoneId"), "Asia/Tokyo") .option(Option.valueOf("useServerPrepareStatement"), AllTruePredicate.class.getName()) .option(Option.valueOf("zeroDate"), "use_round") @@ -296,6 +300,7 @@ void validProgrammaticUnixSocket() { assertThat(configuration.getPassword()).isEqualTo("123456"); assertThat(configuration.getConnectTimeout()).isEqualTo(Duration.ofSeconds(3)); assertThat(configuration.getDatabase()).isEqualTo("r2dbc"); + assertThat(configuration.isCreateDatabaseIfNotExist()).isTrue(); assertThat(configuration.getZeroDateOption()).isEqualTo(ZeroDateOption.USE_ROUND); assertThat(configuration.isTcpKeepAlive()).isTrue(); assertThat(configuration.isTcpNoDelay()).isTrue(); @@ -390,6 +395,20 @@ void invalidServerPreparing() { .option(USE_SERVER_PREPARE_STATEMENT, NotPredicate.class.getPackage() + "NonePredicate") .build())); } + + @Test + void validPasswordSupplier() { + final Publisher passwordSupplier = Mono.just("123456"); + ConnectionFactoryOptions options = ConnectionFactoryOptions.builder() + .option(DRIVER, "mysql") + .option(HOST, "127.0.0.1") + .option(USER, "root") + .option(PASSWORD_PUBLISHER, passwordSupplier) + .build(); + + assertThat(ConnectionFactories.get(options)).isExactlyInstanceOf(MySqlConnectionFactory.class); + } + } final class MockException extends RuntimeException { diff --git a/src/test/java/io/asyncer/r2dbc/mysql/MySqlPrepareTestKit.java b/src/test/java/io/asyncer/r2dbc/mysql/MySqlPrepareTestKit.java index b9cc1f716..4a8878b92 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/MySqlPrepareTestKit.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/MySqlPrepareTestKit.java @@ -22,7 +22,7 @@ class MySqlPrepareTestKit extends MySqlTestKitSupport { MySqlPrepareTestKit() { - super(IntegrationTestSupport.configuration(false, null, sql -> true)); + super(IntegrationTestSupport.configuration("r2dbc", false, false, null, sql -> true)); } @Override diff --git a/src/test/java/io/asyncer/r2dbc/mysql/MySqlTextTestKit.java b/src/test/java/io/asyncer/r2dbc/mysql/MySqlTextTestKit.java index efd685557..04c32c719 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/MySqlTextTestKit.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/MySqlTextTestKit.java @@ -22,6 +22,6 @@ class MySqlTextTestKit extends MySqlTestKitSupport { MySqlTextTestKit() { - super(IntegrationTestSupport.configuration(false, null, null)); + super(IntegrationTestSupport.configuration("r2dbc", false, false, null, null)); } } diff --git a/src/test/java/io/asyncer/r2dbc/mysql/PrepareQueryIntegrationTest.java b/src/test/java/io/asyncer/r2dbc/mysql/PrepareQueryIntegrationTest.java index 33f4c2ee2..45d5a94d7 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/PrepareQueryIntegrationTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/PrepareQueryIntegrationTest.java @@ -29,7 +29,7 @@ class PrepareQueryIntegrationTest extends QueryIntegrationTestSupport { PrepareQueryIntegrationTest() { - super(configuration(false, null, sql -> true)); + super(configuration("r2dbc", false, false, null, sql -> true)); } @Test diff --git a/src/test/java/io/asyncer/r2dbc/mysql/TextQueryIntegrationTest.java b/src/test/java/io/asyncer/r2dbc/mysql/TextQueryIntegrationTest.java index c4032fab3..a4e7152b7 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/TextQueryIntegrationTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/TextQueryIntegrationTest.java @@ -22,6 +22,6 @@ class TextQueryIntegrationTest extends QueryIntegrationTestSupport { TextQueryIntegrationTest() { - super(configuration(false, null, null)); + super(configuration("r2dbc", false, false, null, null)); } } diff --git a/src/test/java/io/asyncer/r2dbc/mysql/TimeZoneIntegrationTestSupport.java b/src/test/java/io/asyncer/r2dbc/mysql/TimeZoneIntegrationTestSupport.java index 42e7dce1d..5bfb769f1 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/TimeZoneIntegrationTestSupport.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/TimeZoneIntegrationTestSupport.java @@ -66,7 +66,7 @@ abstract class TimeZoneIntegrationTestSupport extends IntegrationTestSupport { } TimeZoneIntegrationTestSupport(@Nullable Predicate preferPrepared) { - super(configuration(false, SERVER_ZONE, preferPrepared)); + super(configuration("r2dbc", false, false, SERVER_ZONE, preferPrepared)); } @Test