Skip to content

Commit

Permalink
Merge pull request #40 from WeDataSphere/dev-0.2.0-version-compatible
Browse files Browse the repository at this point in the history
Version compatible before launching task to Linkis server
  • Loading branch information
wushengyeyouya authored Jul 4, 2022
2 parents 97b9e59 + e6af42a commit 929f96c
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ object JobLauncherConfiguration {
*/
val FLINK_CHECKPOINT_PATH: CommonVars[String] = CommonVars("wds.streamis.launch.flink.checkpoint.dir", "/flink/flink-checkpoints")

/**
* Linkis release version
*/
val FLINK_LINKIS_RELEASE_VERSION: CommonVars[String] = CommonVars("wds.streamis.launch.flink.linkis.release.version", "")
/**
* Variable: savepoint path
*/
Expand All @@ -44,4 +48,5 @@ object JobLauncherConfiguration {
* Variable: flink app
*/
val VAR_FLINK_APP_NAME: CommonVars[String] = CommonVars("wds.streamis.launch.variable.flink.app.name", "flink.app.name")

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ trait FlinkJobLaunchManager extends LinkisJobLaunchManager with Logging {
* @param jobState job state used to launch
* @return the job id.
*/
override def launch(job: LaunchJob, jobState: JobState): JobClient[LinkisJobInfo] = {
override def innerLaunch(job: LaunchJob, jobState: JobState): JobClient[LinkisJobInfo] = {
// Transform the JobState into the params in LaunchJob
Option(jobState).foreach(state => {
val startUpParams = TaskUtils.getStartupMap(job.getParams)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,97 @@
package com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.manager

import com.webank.wedatasphere.streamis.jobmanager.launcher.job.manager.JobLaunchManager
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.state.JobState
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.{JobClient, LaunchJob}
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.conf.JobLauncherConfiguration
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.LinkisJobInfo
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.manager.LinkisJobLaunchManager.LINKIS_JAR_VERSION_PATTERN
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.computation.client.LinkisJob
import org.apache.linkis.protocol.utils.TaskUtils

trait LinkisJobLaunchManager extends JobLaunchManager[LinkisJobInfo] {
import java.util
import scala.collection.JavaConverters._
import scala.util.matching.Regex

trait LinkisJobLaunchManager extends JobLaunchManager[LinkisJobInfo] with Logging{
/**
* This method is used to launch a new job.
*
* @param job a StreamisJob wanted to be launched.
* @param jobState job state used to launch
* @return the job id.
*/
override def launch(job: LaunchJob, jobState: JobState): JobClient[LinkisJobInfo] = {
// Support different version of Linkis
var linkisVersion = JobLauncherConfiguration.FLINK_LINKIS_RELEASE_VERSION.getValue
if (StringUtils.isBlank(linkisVersion)) {
val linkisJarPath = classOf[LinkisJob].getProtectionDomain.getCodeSource.getLocation.getPath;
val lastSplit = linkisJarPath.lastIndexOf(IOUtils.DIR_SEPARATOR);
if (lastSplit >= 0) {
linkisVersion = linkisJarPath.substring(lastSplit + 1)
}
}
if (StringUtils.isNotBlank(linkisVersion)) {
Utils.tryAndWarn {
val LINKIS_JAR_VERSION_PATTERN(version) = linkisVersion
linkisVersion = version
}
}
if (StringUtils.isNotBlank(linkisVersion)){
val versionSplitter: Array[String] = linkisVersion.split("\\.")
val major = Integer.valueOf(versionSplitter(0))
val sub = Integer.valueOf(versionSplitter(1))
val fix = Integer.valueOf(versionSplitter(2))
val versionNum = major * 10000 + sub * 100 + fix
info(s"Recognized the linkis release version: [${linkisVersion}, version number: [${versionNum}]")
if (versionNum <= 10101){
warn("Linkis version number is less than [10101], should compatible the startup params in launcher.")
val startupParams = TaskUtils.getStartupMap(job.getParams)
// Change the unit of memory params for linkis older version
changeUnitOfMemoryToG(startupParams, "flink.taskmanager.memory")
changeUnitOfMemoryToG(startupParams, "flink.jobmanager.memory")
// Avoid the _FLINK_CONFIG_. prefix for linkis older version
val newParams = avoidParamsPrefix(startupParams, "_FLINK_CONFIG_.")
startupParams.clear();
startupParams.putAll(newParams)
}
}
innerLaunch(job, jobState)
}

private def changeUnitOfMemoryToG(params: util.Map[String, Any], name: String): Unit = {
params.get(name) match {
case memory: String =>
var actualMem = Integer.valueOf(memory) / 1024
actualMem = if (actualMem <= 0) 1 else actualMem
info(s"Change the unit of startup param: [${name}], value [${memory}] => [${actualMem}]")
params.put(name, actualMem)
case _ => // Ignores
}
}

/**
* Avoid params prefix
* @param params params
* @param prefix prefix
*/
private def avoidParamsPrefix(params: util.Map[String, Any], prefix: String): util.Map[String, Any] = {
params.asScala.map{
case (key, value) =>
if (key.startsWith(prefix)){
info(s"Avoid the prefix of startup param: [${key}] => [${key.substring(prefix.length)}]")
(key.substring(prefix.length), value)
} else {
(key, value)
}
}.toMap.asJava
}
def innerLaunch(job: LaunchJob, jobState: JobState): JobClient[LinkisJobInfo]
}

object LinkisJobLaunchManager{
val LINKIS_JAR_VERSION_PATTERN: Regex = "^[\\s\\S]*([\\d]+\\.[\\d]+\\.[\\d]+)[\\s\\S]*$".r
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,4 @@ class SimpleFlinkJobLaunchManager extends FlinkJobLaunchManager {
object SimpleFlinkJobLaunchManager{

val INSTANCE_NAME = "simpleFlink";

}
127 changes: 65 additions & 62 deletions streamis-server/src/main/resources/linkis.properties
Original file line number Diff line number Diff line change
@@ -1,62 +1,65 @@
#
# Copyright 2021 WeBank
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#wds.linkis.test.mode=true
wds.linkis.server.mybatis.datasource.url=jdbc:mysql://localhost:3306/streamis?characterEncoding=UTF-8
wds.linkis.server.mybatis.datasource.username=user1

wds.linkis.server.mybatis.datasource.password=pwd1
wds.linkis.gateway.ip=
wds.linkis.gateway.port=
wds.linkis.gateway.url=http://localhost:9001

wds.linkis.mysql.is.encrypt=false
##restful
wds.linkis.log.clear=true
wds.linkis.server.version=v1
#wds.linkis.test.user=user1



##restful
wds.linkis.server.restful.scan.packages=com.webank.wedatasphere.streamis.datasource.server.restful.api,\
com.webank.wedatasphere.streamis.project.server.restful,\
com.webank.wedatasphere.streamis.jobmanager.restful.api,\
com.webank.wedatasphere.streamis.datasource.execute.rest,\
com.webank.wedatasphere.streamis.projectmanager.restful.api
##mybatis
wds.linkis.server.mybatis.mapperLocations=\
classpath*:com/webank/wedatasphere/streamis/datasource/manager/dao/impl/*.xml,\
classpath*:com/webank/wedatasphere/streamis/project/server/dao/impl/*.xml,\
classpath*:com/webank/wedatasphere/streamis/jobmanager/launcher/dao/impl/*.xml,\
classpath*:com/webank/wedatasphere/streamis/jobmanager/manager/dao/impl/*.xml,\
classpath*:com/webank/wedatasphere/streamis/projectmanager/dao/impl/*.xml

wds.linkis.server.mybatis.typeAliasesPackage=com.webank.wedatasphere.streamis.datasource.manager.domain,\
com.webank.wedatasphere.streamis.jobmanager.launcher.entity,\
com.webank.wedatasphere.streamis.jobmanager.manager.entity,\
com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo,\
com.webank.wedatasphere.streamis.jobmanager.launcher.entity.vo,\
com.webank.wedatasphere.streamis.projectmanager.entity


wds.linkis.server.mybatis.BasePackage=com.webank.wedatasphere.streamis.datasource.manager.dao,\
org.apache.linkis.bml.dao,\
com.webank.wedatasphere.streamis.project.server.dao,\
com.webank.wedatasphere.streamis.jobmanager.launcher.dao,\
com.webank.wedatasphere.streamis.jobmanager.manager.dao,\
com.webank.wedatasphere.streamis.projectmanager.dao

# Make sure that can fetch the application info finally
wds.streamis.application.info.fetch.max=20
#
# Copyright 2021 WeBank
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#wds.linkis.test.mode=true
wds.linkis.server.mybatis.datasource.url=jdbc:mysql://localhost:3306/streamis?characterEncoding=UTF-8
wds.linkis.server.mybatis.datasource.username=user1

wds.linkis.server.mybatis.datasource.password=pwd1
wds.linkis.gateway.ip=
wds.linkis.gateway.port=
wds.linkis.gateway.url=http://localhost:9001

wds.linkis.mysql.is.encrypt=false
##restful
wds.linkis.log.clear=true
wds.linkis.server.version=v1
#wds.linkis.test.user=user1



##restful
wds.linkis.server.restful.scan.packages=com.webank.wedatasphere.streamis.datasource.server.restful.api,\
com.webank.wedatasphere.streamis.project.server.restful,\
com.webank.wedatasphere.streamis.jobmanager.restful.api,\
com.webank.wedatasphere.streamis.datasource.execute.rest,\
com.webank.wedatasphere.streamis.projectmanager.restful.api
##mybatis
wds.linkis.server.mybatis.mapperLocations=\
classpath*:com/webank/wedatasphere/streamis/datasource/manager/dao/impl/*.xml,\
classpath*:com/webank/wedatasphere/streamis/project/server/dao/impl/*.xml,\
classpath*:com/webank/wedatasphere/streamis/jobmanager/launcher/dao/impl/*.xml,\
classpath*:com/webank/wedatasphere/streamis/jobmanager/manager/dao/impl/*.xml,\
classpath*:com/webank/wedatasphere/streamis/projectmanager/dao/impl/*.xml

wds.linkis.server.mybatis.typeAliasesPackage=com.webank.wedatasphere.streamis.datasource.manager.domain,\
com.webank.wedatasphere.streamis.jobmanager.launcher.entity,\
com.webank.wedatasphere.streamis.jobmanager.manager.entity,\
com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo,\
com.webank.wedatasphere.streamis.jobmanager.launcher.entity.vo,\
com.webank.wedatasphere.streamis.projectmanager.entity


wds.linkis.server.mybatis.BasePackage=com.webank.wedatasphere.streamis.datasource.manager.dao,\
org.apache.linkis.bml.dao,\
com.webank.wedatasphere.streamis.project.server.dao,\
com.webank.wedatasphere.streamis.jobmanager.launcher.dao,\
com.webank.wedatasphere.streamis.jobmanager.manager.dao,\
com.webank.wedatasphere.streamis.projectmanager.dao

# Make sure that can fetch the application info finally
wds.streamis.application.info.fetch.max=20

# To use the complete features of streamis in linkis 1.1.2
#wds.streamis.launch.flink.linkis.release.version=1.1.2

0 comments on commit 929f96c

Please sign in to comment.