Skip to content

Commit

Permalink
Merge pull request #507 from AbsaOSS/feature/merge-support-branch
Browse files Browse the repository at this point in the history
Feature/merge support branch
  • Loading branch information
yruslan authored Oct 29, 2024
2 parents 23171be + e0da69f commit a8e7a64
Show file tree
Hide file tree
Showing 18 changed files with 215 additions and 97 deletions.
36 changes: 18 additions & 18 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,42 +28,42 @@ jobs:
strategy:
matrix:
python-version: [ "3.10" ]
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
name: Test Pramen-Py
steps:
- uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.branch }}

- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: setup poetry
uses: abatilo/[email protected]
with:
poetry-version: 1.4.2

- name: install dependencies
working-directory: "./pramen-py"
run: make --silent install

- name: test
working-directory: "./pramen-py"
env:
ENV: ci
run: make --silent test

release-python:
needs: [ "test-python" ]
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
name: Release Python artifact
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.branch }}

- name: Prepare the release branch
id: release_branch1
working-directory: "./pramen"
Expand All @@ -74,7 +74,7 @@ jobs:
git config --global user.name "CI/CD bot"
git checkout -b release/$VERSION
git push --set-upstream origin release/$VERSION
- name: Update version number
id: release_branch
working-directory: "./pramen-py"
Expand All @@ -88,35 +88,35 @@ jobs:
git commit -m "Update version number to $VERSION"
git push
fi
- name: install project dependencies
run: |
sudo apt install -y --no-install-recommends \
libssl-dev \
make
- uses: actions/setup-python@v4
with:
python-version: "3.10"

- uses: abatilo/[email protected]
with:
poetry-version: 1.4.2

- name: Install dependencies
working-directory: "./pramen-py"
run: poetry install --no-interaction --no-root

- name: build and publish the wheel to jfrog
working-directory: "./pramen-py"
env:
ENV: pypi
PRAMENPY_PYPI_TOKEN: ${{ secrets.PRAMENPY_PYPI_TOKEN }}
run: make --silent publish

release-sbt:
needs: [ "release-python" ]
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
name: Release Scala artifacts
steps:
- name: Checkout code
Expand Down Expand Up @@ -158,7 +158,7 @@ jobs:

create-pr:
needs: [ "release-sbt" ]
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
name: Create Pull Request

steps:
Expand All @@ -181,6 +181,6 @@ jobs:
- name: Create Pull Request
run: gh pr create -B "$BASE" -H "release/$VERSION" --title "Release Pramen v$VERSION" --body 'Created by Github action'
env:
BASE: ${{ github.event.inputs.branch }}
BASE: ${{ github.head_ref || github.ref_name }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
VERSION: ${{ steps.release_branch3.outputs.VERSION }}
12 changes: 1 addition & 11 deletions .github/workflows/scala.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ on:

jobs:
build-sbt:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -46,16 +46,6 @@ jobs:
distribution: temurin
java-version: 8
cache: sbt
- name: Install sbt
run: |
sudo apt-get update
sudo apt-get install apt-transport-https curl gnupg -yqq
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list
echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import
sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg
sudo apt-get update
sudo apt-get install sbt
- name: Build and run unit tests
working-directory: ./pramen
run: sbt ++${{matrix.scala}} unit:test -DSPARK_VERSION=${{matrix.spark}}
Expand Down
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2504,6 +2504,27 @@ You can use any source/sink combination in transfer jobs.

Here we describe more complicated use cases.

### Dynamically changing Spark Application description
You can set up a template for the Spark Application description. The description is then set dynamically each time a new job starts executing.

Example configuration:
```hocon
pramen.job.description.template = "Pramen - running @pipeline, job @jobName for @infoDate"
```

These variables are available:

| Variable | Description |
|--------------|-------------------------------------------------------------------------------|
| @pipeline | The name of the pipeline (if defined at `pramen.pipeline.name`). |
| @tenant | The name of the tenant (if defined at `pramen.tenant`). |
| @environment | The environment (if defined at `pramen.environment.name`). |
| @jobName | The name of the job as defined in the operation definition. |
| @infoDate | The information date the job is running for. |
| @outputTable | The output metastore table of the job. |
| @dryRun | Adds `(DRY RUN)` when running in the dry run mode, an empty string otherwise. |


### Startup and shutdown hooks

Startup and shutdown hooks allow running custom code before and after the pipeline runs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ case class RuntimeConfig(
parallelTasks: Int,
stopSparkSession: Boolean,
allowEmptyPipeline: Boolean,
historicalRunMode: RunMode
historicalRunMode: RunMode,
sparkAppDescriptionTemplate: Option[String]
)

object RuntimeConfig {
Expand All @@ -66,6 +67,7 @@ object RuntimeConfig {
val STOP_SPARK_SESSION = "pramen.stop.spark.session"
val VERBOSE = "pramen.verbose"
val ALLOW_EMPTY_PIPELINE = "pramen.allow.empty.pipeline"
val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template"

def fromConfig(conf: Config): RuntimeConfig = {
val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP)
Expand Down Expand Up @@ -128,6 +130,7 @@ object RuntimeConfig {
}

val allowEmptyPipeline = ConfigUtils.getOptionBoolean(conf, ALLOW_EMPTY_PIPELINE).getOrElse(false)
val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE)

RuntimeConfig(
isDryRun = isDryRun,
Expand All @@ -144,7 +147,8 @@ object RuntimeConfig {
parallelTasks = parallelTasks,
stopSparkSession = conf.getBoolean(STOP_SPARK_SESSION),
allowEmptyPipeline,
runMode
runMode,
sparkAppDescriptionTemplate
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
val errorMessage = ex.getMessage

val errorMessageTruncated = maxReasonLength match {
case Some(maxLength) if errorMessage.length > maxLength => errorMessage.substring(0, maxLength) + "..."
case _ => errorMessage
case Some(maxLength) if errorMessage.length > maxLength => StringUtils.escapeHTML(errorMessage.substring(0, maxLength)) + "..."
case _ => StringUtils.escapeHTML(errorMessage)
}

paragraphBuilder
Expand All @@ -333,7 +333,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
val stderrMsg = if (stderr.isEmpty) "" else s"""Last <b>stderr</b> lines:\n${stderr.mkString("", EOL, EOL)}"""
s"$msg\n$stdoutMsg\n$stderrMsg"
case ex: Throwable =>
renderThrowable(ex, maximumLength = maxExceptionLength)
renderThrowable(ex, maximumLength = maxExceptionLength, escapeHTML = true)
}

builder.withUnformattedText(text)
Expand Down Expand Up @@ -648,10 +648,10 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
}
}

maxReasonLength match {
StringUtils.escapeHTML(maxReasonLength match {
case Some(maxLength) if reason.length > maxLength => reason.substring(0, maxLength) + "..."
case _ => reason
}
})
}

private[core] def getFinishTime(task: TaskResult): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package za.co.absa.pramen.core.runner.task

import com.typesafe.config.Config
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api._
Expand All @@ -31,12 +31,13 @@ import za.co.absa.pramen.core.lock.TokenLockFactory
import za.co.absa.pramen.core.metastore.MetaTableStats
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.pipeline.JobPreRunStatus._
import za.co.absa.pramen.core.pipeline.PipelineDef.{ENVIRONMENT_NAME, PIPELINE_NAME_KEY, TENANT_KEY}
import za.co.absa.pramen.core.pipeline._
import za.co.absa.pramen.core.state.PipelineState
import za.co.absa.pramen.core.utils.Emoji._
import za.co.absa.pramen.core.utils.SparkUtils._
import za.co.absa.pramen.core.utils.hive.HiveHelper
import za.co.absa.pramen.core.utils.{ThreadUtils, TimeUtils}
import za.co.absa.pramen.core.utils.{ConfigUtils, ThreadUtils, TimeUtils}

import java.sql.Date
import java.time.{Instant, LocalDate}
Expand All @@ -53,6 +54,8 @@ abstract class TaskRunnerBase(conf: Config,
runtimeConfig: RuntimeConfig,
pipelineState: PipelineState,
applicationId: String) extends TaskRunner {
import TaskRunnerBase._

implicit private val ecDefault: ExecutionContext = ExecutionContext.global
implicit val localDateOrdering: Ordering[LocalDate] = Ordering.by(_.toEpochDay)

Expand Down Expand Up @@ -118,6 +121,13 @@ abstract class TaskRunnerBase(conf: Config,
/** Runs a task in the single thread. Performs all task logging and notification sending activities. */
protected def runTask(task: Task): RunStatus = {
val started = Instant.now()

runtimeConfig.sparkAppDescriptionTemplate.foreach { template =>
val description = applyAppDescriptionTemplate(template, task, runtimeConfig, conf)
val spark = SparkSession.builder().getOrCreate()
spark.sparkContext.setJobDescription(description)
}

task.job.operation.killMaxExecutionTimeSeconds match {
case Some(timeout) if timeout > 0 =>
@volatile var runStatus: RunStatus = null
Expand Down Expand Up @@ -606,3 +616,23 @@ abstract class TaskRunnerBase(conf: Config,
}
}
}

object TaskRunnerBase {
  /**
    * Renders the Spark Application description for a task by substituting template variables.
    *
    * Supported variables: `@jobName`, `@infoDate`, `@metastoreTable`, `@outputTable`, `@table`
    * (the last three all resolve to the name of the job's output table), `@pipeline`, `@tenant`,
    * `@environment` and `@dryRun`.
    *
    * The environment and the tenant fall back to `UNKNOWN` when they are not set in the config.
    * The pipeline name is read with `getString`, so it is expected to be defined.
    *
    * @param template      The description template containing zero or more of the variables above.
    * @param task          The task the description is generated for.
    * @param runtimeConfig The runtime configuration (used to determine the dry run mode).
    * @param conf          The application configuration.
    * @return The template with all known variables substituted.
    */
  def applyAppDescriptionTemplate(template: String, task: Task, runtimeConfig: RuntimeConfig, conf: Config): String = {
    val job = task.job
    val pipelineName = conf.getString(PIPELINE_NAME_KEY)
    val environmentName = ConfigUtils.getOptionString(conf, ENVIRONMENT_NAME).getOrElse("UNKNOWN")
    val tenant = ConfigUtils.getOptionString(conf, TENANT_KEY).getOrElse("UNKNOWN")
    val dryRun = if (runtimeConfig.isDryRun) "(DRY RUN)" else ""

    // The order of substitutions is significant and is kept exactly as it was originally defined.
    val substitutions: Seq[(String, String)] = Seq(
      "@jobName" -> job.name,
      "@infoDate" -> task.infoDate.toString,
      "@metastoreTable" -> job.outputTable.name,
      "@outputTable" -> job.outputTable.name,
      "@table" -> job.outputTable.name,
      "@pipeline" -> pipelineName,
      "@tenant" -> tenant,
      "@environment" -> environmentName,
      "@dryRun" -> dryRun
    )

    // String.replace() substitutes literally. String.replaceAll() would interpret '$' and '\' in the
    // substituted values (e.g. a job name like "load $1") as regex group references or escapes,
    // which either throws an exception or corrupts the resulting description.
    substitutions.foldLeft(template) { case (description, (variable, value)) =>
      description.replace(variable, value)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.exceptions.FatalErrorWrapper
import za.co.absa.pramen.core.journal.Journal
import za.co.absa.pramen.core.lock.TokenLockFactory
import za.co.absa.pramen.core.pipeline.Task
import za.co.absa.pramen.core.pipeline.PipelineDef.{ENVIRONMENT_NAME, PIPELINE_NAME_KEY, TENANT_KEY}
import za.co.absa.pramen.core.pipeline.{Job, Task}
import za.co.absa.pramen.core.state.PipelineState
import za.co.absa.pramen.core.utils.Emoji
import za.co.absa.pramen.core.utils.{ConfigUtils, Emoji}

import java.util.concurrent.Executors.newFixedThreadPool
import java.util.concurrent.{ExecutorService, Semaphore}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,10 @@ object StringUtils {
}

/** Renders an exception as a string */
def renderThrowable(ex: Throwable, level: Int = 1, maximumLength: Option[Int] = None): String = {
def renderThrowable(ex: Throwable, level: Int = 1, maximumLength: Option[Int] = None, escapeHTML: Boolean = false): String = {
val prefix = " " * (level * 2)
val base = s"""${ex.toString}\n${ex.getStackTrace.map(s => s"$prefix$s").mkString("", EOL, EOL)}"""
val errMsg = if (escapeHTML) StringUtils.escapeHTML(ex.toString) else ex.toString
val base = s"""$errMsg\n${ex.getStackTrace.map(s => s"$prefix$s").mkString("", EOL, EOL)}"""
val cause = Option(ex.getCause) match {
case Some(c) if level < 6 => s"\n${prefix}Caused by " + renderThrowable(c, level + 1)
case _ => ""
Expand Down Expand Up @@ -346,4 +347,26 @@ object StringUtils {
output.toString()
}

/**
 * Escapes HTML tags and symbols from a string.
 * Based on https://stackoverflow.com/a/25228492/1038282
 *
 * The characters `<`, `>` and `&` are replaced with their numeric character references
 * (`&#60;`, `&#62;` and `&#38;`). All other characters are copied as is.
 *
 * A `null` input (for example, the message of an exception that has no message) is treated
 * as an empty string instead of causing a `NullPointerException`.
 *
 * @param s A string to escape HTML from.
 * @return An escaped string.
 */
def escapeHTML(s: String): String = {
  if (s == null) {
    ""
  } else {
    val out = new StringBuilder(Math.max(64, s.length))
    s.foreach {
      case c @ ('<' | '>' | '&') =>
        // Numeric character reference, e.g. '<' becomes "&#60;".
        out.append("&#").append(c.toInt).append(';')
      case c =>
        out.append(c)
    }
    out.toString
  }
}
}
Loading

0 comments on commit a8e7a64

Please sign in to comment.