Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement custom notification signatures for email notifications. #277

Merged
merged 2 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package za.co.absa.pramen.api

import org.apache.spark.sql.DataFrame
import za.co.absa.pramen.api.notification.{NotificationEntry, Style}
import za.co.absa.pramen.api.notification.{NotificationEntry, Style, TextElement}

/**
* Pramen provides an instance of notification builder to custom sources, transformers and sinks so that
Expand Down Expand Up @@ -56,4 +56,7 @@ trait NotificationBuilder {
descriptionStyle: Style = Style.Normal,
maxRecords: Int = 200,
align: Option[Seq[Char]] = None): Unit

/** Sets a custom notification signature at runtime. Can be used from the custom startup/shutdown hook. */
def setSignature(text: TextElement*): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package za.co.absa.pramen.core.app.config

import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import za.co.absa.pramen.core.utils.ClassLoaderUtils

case class HookConfig(
Expand All @@ -28,16 +29,24 @@ object HookConfig {
val STARTUP_HOOK_CLASS_KEY = "pramen.hook.startup.class"
val SHUTDOWN_HOOK_CLASS_KEY = "pramen.hook.shutdown.class"

private val log = LoggerFactory.getLogger(this.getClass)

def fromConfig(conf: Config): HookConfig = {
val startupHook = if (conf.hasPath(STARTUP_HOOK_CLASS_KEY) && conf.getString(STARTUP_HOOK_CLASS_KEY).nonEmpty) {
Option(ClassLoaderUtils.loadConfigurableClass[Runnable](conf.getString(STARTUP_HOOK_CLASS_KEY), conf))
val hookClass = conf.getString(STARTUP_HOOK_CLASS_KEY)
log.info(s"Loading the startup hook class '$hookClass'")
Option(ClassLoaderUtils.loadConfigurableClass[Runnable](hookClass, conf))
} else {
log.info(s"Startup hook is not defined.")
None
}

val shutdownHook = if (conf.hasPath(SHUTDOWN_HOOK_CLASS_KEY) && conf.getString(SHUTDOWN_HOOK_CLASS_KEY).nonEmpty) {
Option(ClassLoaderUtils.loadConfigurableClass[Runnable](conf.getString(SHUTDOWN_HOOK_CLASS_KEY), conf))
val hookClass = conf.getString(SHUTDOWN_HOOK_CLASS_KEY)
log.info(s"Loading the shutdown hook class '$hookClass'")
Option(ClassLoaderUtils.loadConfigurableClass[Runnable](hookClass, conf))
} else {
log.info(s"Shutdown hook is not defined.")
None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.pramen.core.notify.pipeline

import za.co.absa.pramen.api.notification.NotificationEntry
import za.co.absa.pramen.api.notification.{NotificationEntry, TextElement}
import za.co.absa.pramen.core.runner.task.TaskResult

import java.time.Instant
Expand All @@ -28,5 +28,6 @@ case class PipelineNotification(
started: Instant,
finished: Instant,
tasksCompleted: List[TaskResult],
customEntries: List[NotificationEntry]
customEntries: List[NotificationEntry],
customSignature: List[TextElement]
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.pramen.core.notify.pipeline

import za.co.absa.pramen.api.notification.NotificationEntry
import za.co.absa.pramen.api.notification.{NotificationEntry, TextElement}
import za.co.absa.pramen.core.runner.task.TaskResult

import java.time.Instant
Expand All @@ -39,4 +39,6 @@ trait PipelineNotificationBuilder {
def addCompletedTask(completedTask: TaskResult): Unit

def addCustomEntries(entries: Seq[NotificationEntry]): Unit

def addSignature(signature: TextElement*): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
var appFinished: Instant = Instant.now()
var isDryRun = false
var isUndercover = false
var customSignature = Seq.empty[TextElement]

val completedTasks = new ListBuffer[TaskResult]
val customEntries = new ListBuffer[NotificationEntry]
Expand Down Expand Up @@ -91,10 +92,14 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
this.goodRps = goodRps
}

def addCompletedTask(completedTask: TaskResult): Unit = {
override def addCompletedTask(completedTask: TaskResult): Unit = {
completedTasks += completedTask
}

override def addCustomEntries(entries: Seq[NotificationEntry]): Unit = customEntries ++= entries

override def addSignature(signature: TextElement*): Unit = customSignature = signature

def renderSubject(): String = {
val timeCreatedStr = ZonedDateTime.now(zoneId).format(timestampFmt)
val (someTasksSucceeded, someTasksFailed) = getSuccessFlags
Expand Down Expand Up @@ -140,17 +145,12 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
renderNotificationTargetErrors(builder, notificationTargetErrors)
}

builder.withRawParagraph(
s"""Regards,<br>
|Pramen<br>
|version ${BuildPropertyUtils.instance.getFullVersion}
|""".stripMargin
)
renderSignature(builder)

builder.renderBody
}

private def renderHeader(builder: MessageBuilder): MessageBuilder = {
private[core] def renderHeader(builder: MessageBuilder): MessageBuilder = {
val introParagraph = ParagraphBuilder()

if (isDryRun)
Expand Down Expand Up @@ -210,21 +210,21 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
builder
}

private def getSuccessFlags: (Boolean, Boolean) = {
private[core] def getSuccessFlags: (Boolean, Boolean) = {
val hasNotificationFailures = completedTasks.exists(t => t.notificationTargetErrors.nonEmpty)
val someTasksSucceeded = completedTasks.exists(_.runStatus.isInstanceOf[Succeeded]) && appException.isEmpty
val someTasksFailed = completedTasks.exists(t => t.runStatus.isFailure) || hasNotificationFailures || appException.nonEmpty
(someTasksSucceeded, someTasksFailed)
}

private def getZoneId: ZoneId = {
private[core] def getZoneId: ZoneId = {
ConfigUtils.getOptionString(conf, TIMEZONE) match {
case Some(tz) => ZoneId.of(tz)
case None => ZoneId.systemDefault()
}
}

private def renderJobException(builder: MessageBuilder, taskResult: TaskResult, ex: Throwable): MessageBuilder = {
private[core] def renderJobException(builder: MessageBuilder, taskResult: TaskResult, ex: Throwable): MessageBuilder = {
val paragraphBuilder = ParagraphBuilder()
.withText("Job ", Style.Exception)
.withText(taskResult.job.name, Style.Error)
Expand All @@ -245,7 +245,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
renderException(builder, ex)
}

private def renderException(builder: MessageBuilder, ex: Throwable): MessageBuilder = {
private[core] def renderException(builder: MessageBuilder, ex: Throwable): MessageBuilder = {
val text = ex match {
case CmdFailedException(msg, logLines) =>
if (logLines.isEmpty) {
Expand Down Expand Up @@ -287,7 +287,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
builder
}

private def renderTaskTable(builder: MessageBuilder, tasks: Seq[TaskResult]): MessageBuilder = {
private[core] def renderTaskTable(builder: MessageBuilder, tasks: Seq[TaskResult]): MessageBuilder = {
val outputRecordsKnown = tasks.exists(t => t.runStatus match {
case _: Succeeded => true
case _ => false
Expand Down Expand Up @@ -363,7 +363,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
builder.withTable(tableBuilder)
}

def renderNotificationTargetErrors(builder: MessageBuilderHtml, notificationTargetErrors: ListBuffer[NotificationFailure]): MessageBuilder = {
private[core] def renderNotificationTargetErrors(builder: MessageBuilderHtml, notificationTargetErrors: ListBuffer[NotificationFailure]): MessageBuilder = {
val tableBuilder = new TableBuilderHtml

val tableHeaders = new ListBuffer[TableHeader]
Expand Down Expand Up @@ -392,7 +392,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
builder.withTable(tableBuilder)
}

private def renderFilesRead(builder: MessageBuilder, task: TaskResult, runStatus: RunStatus.Succeeded): MessageBuilder = {
private[core] def renderFilesRead(builder: MessageBuilder, task: TaskResult, runStatus: RunStatus.Succeeded): MessageBuilder = {
val tableBuilder = new TableBuilderHtml

val tableHeaders = new ListBuffer[TableHeader]
Expand All @@ -414,7 +414,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
builder.withTable(tableBuilder)
}

private def getThroughputRps(task: TaskResult): TextElement = {
private[core] def getThroughputRps(task: TaskResult): TextElement = {
val recordCount = task.runStatus match {
case s: Succeeded => s.recordCount
case _ => 0
Expand All @@ -437,7 +437,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
}
}

private def getRecordCountText(task: TaskResult): String = {
private[core] def getRecordCountText(task: TaskResult): String = {
def renderDifference(numRecords: Long, numRecordsOld: Option[Long]): String = {
numRecordsOld match {
case Some(old) if old > 0 =>
Expand All @@ -460,14 +460,14 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
}
}

private def getElapsedTime(task: TaskResult): String = {
private[core] def getElapsedTime(task: TaskResult): String = {
task.runInfo match {
case Some(runInfo) => TimeUtils.prettyPrintElapsedTimeShort((runInfo.finished.getEpochSecond - runInfo.started.getEpochSecond) * 1000L)
case _ => ""
}
}

private def getOutputSize(task: TaskResult): String = {
private[core] def getOutputSize(task: TaskResult): String = {
task.runStatus match {
case s: Succeeded =>
s.sizeBytes match {
Expand All @@ -478,7 +478,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
}
}

private def getFailureReason(task: TaskResult): String = {
private[core] def getFailureReason(task: TaskResult): String = {
task.runStatus.getReason() match {
case Some(reason) => reason
case None =>
Expand All @@ -491,14 +491,14 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
}
}

private def getFinishTime(task: TaskResult): String = {
private[core] def getFinishTime(task: TaskResult): String = {
task.runInfo match {
case Some(runInfo) => ZonedDateTime.ofInstant(runInfo.finished, zoneId).format(timestampFmt)
case None => ""
}
}

private def getStatus(task: TaskResult): TextElement = {
private[core] def getStatus(task: TaskResult): TextElement = {
val successStyle = if (task.dependencyWarnings.nonEmpty) Style.Warning else Style.Success

task.runStatus match {
Expand All @@ -514,7 +514,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
}
}

private def getSuccessTextElement(status: RunStatus.Succeeded, hasDependencyWarnings: Boolean): TextElement = {
private[core] def getSuccessTextElement(status: RunStatus.Succeeded, hasDependencyWarnings: Boolean): TextElement = {
val successStyle = if (hasDependencyWarnings) Style.Warning else Style.Success

val style = if (status.warnings.nonEmpty)
Expand All @@ -539,7 +539,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
}
}

private def renderSchemaDifference(builder: MessageBuilder, schemaDifferences: Seq[SchemaDifference]): MessageBuilder = {
private[core] def renderSchemaDifference(builder: MessageBuilder, schemaDifferences: Seq[SchemaDifference]): MessageBuilder = {
if (schemaDifferences.isEmpty) {
return builder
}
Expand Down Expand Up @@ -607,5 +607,16 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
}
}

override def addCustomEntries(entries: Seq[NotificationEntry]): Unit = customEntries ++= entries
def renderSignature(builder: MessageBuilder): MessageBuilder = {
if (customSignature.isEmpty) {
builder.withRawParagraph(
s"""Regards,<br>
|Pramen<br>
|version ${BuildPropertyUtils.instance.getFullVersion}
|""".stripMargin
)
} else {
builder.withParagraph(customSignature)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ object PipelineNotificationDirector {
.foreach(notificationBuilder.addCompletedTask)

notificationBuilder.addCustomEntries(notification.customEntries)
notificationBuilder.addSignature(notification.customSignature: _*)

notificationBuilder
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ object AppRunner {
private[core] def runStartupHook(state: PipelineState,
appContext: AppContext): Try[Unit] = {
handleFailure(Try {
log.info(s"Running the startup hook: ${appContext.appConfig.hookConfig.startupHook.isDefined} ")
appContext.appConfig.hookConfig.startupHook.foreach(_.run())
}, state, "running the init hook")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class NotificationBuilderImpl extends NotificationBuilder {

private val notificationEntries = new ListBuffer[NotificationEntry]

private var notificationSignature: Seq[TextElement] = Seq.empty

override def addEntries(entries: NotificationEntry*): Unit = synchronized {
entries.foreach(entry => if (isEntryValid(entry)) notificationEntries += entry)
}
Expand Down Expand Up @@ -58,10 +60,18 @@ class NotificationBuilderImpl extends NotificationBuilder {
}
}

def setSignature(text: TextElement*): Unit = {
notificationSignature = text
}

def entries: Seq[NotificationEntry] = synchronized {
notificationEntries.toSeq
}

def signature: Seq[TextElement] = synchronized {
notificationSignature
}

def isEntryValid(entry: NotificationEntry): Boolean = entry match {
case NotificationEntry.Table(headers, cells) =>
if (headers.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,18 @@ class PipelineStateImpl(implicit conf: Config) extends PipelineState {
taskResults.filterNot(_.isTransient)
}
val finishedInstant = Instant.now
val customEntries = Pramen.instance.notificationBuilder.asInstanceOf[NotificationBuilderImpl].entries
val notificationBuilder = Pramen.instance.notificationBuilder.asInstanceOf[NotificationBuilderImpl]
val customEntries = notificationBuilder.entries
val customSignature = notificationBuilder.signature

val notification = PipelineNotification(failureException,
pipelineName,
environmentName,
startedInstant,
finishedInstant,
realTaskResults.toList,
customEntries.toList)
customEntries.toList,
customSignature.toList)
if (realTaskResults.nonEmpty || sendEmailIfNoNewData || failureException.nonEmpty) {
val email = new PipelineNotificationEmail(notification)
email.send()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,5 +187,7 @@

<p><b>This is a test HTML block</b></p>

<p>Regards,<br>
Pramen<br>
<p>Test signature</p>

</body>
</html>
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.pramen.core.mocks

import za.co.absa.pramen.api.notification.NotificationEntry
import za.co.absa.pramen.api.notification.{NotificationEntry, TextElement}
import za.co.absa.pramen.core.notify.pipeline
import za.co.absa.pramen.core.notify.pipeline.PipelineNotification
import za.co.absa.pramen.core.runner.task.TaskResult
Expand All @@ -30,7 +30,8 @@ object PipelineNotificationFactory {
started: Instant = Instant.ofEpochSecond(1234567L),
finished: Instant = Instant.ofEpochSecond(1234568L),
tasksCompleted: List[TaskResult] = List(TaskResultFactory.getDummyTaskResult()),
customEntries: List[NotificationEntry] = List.empty[NotificationEntry]
customEntries: List[NotificationEntry] = List.empty[NotificationEntry],
customSignature: List[TextElement] = List.empty[TextElement]
): PipelineNotification = {
pipeline.PipelineNotification(
exception,
Expand All @@ -39,7 +40,8 @@ object PipelineNotificationFactory {
started,
finished,
tasksCompleted,
customEntries
customEntries,
customSignature
)
}

Expand Down
Loading
Loading