#279 When 'trackDays=0', Pramen never checks the source data record count if the table is already loaded for the day (#280)

Merged · 5 commits · Nov 13, 2023
8 changes: 1 addition & 7 deletions README.md
@@ -1758,10 +1758,6 @@ Here is an example configuration for a JDBC source:
# Optionally you can specify an expression for the information date.
info.date.expr = "@runDate"

- # Data is 1 day late (T+1). When run at 2022-01-15, say, the expectation is that
- # the input table has data up until 2022-01-14 (inclusive).
- expected.delay.days = 1

tables = [
{
input.db.table = "table1"
@@ -2098,8 +2094,6 @@ pramen.operations = [
source = "my_postgre_rds"
sink = "my_data_lake"

- expected.delay.days = 1

tables = [
{
# Minimal configuration example
@@ -2764,7 +2758,7 @@ together their own runner (like the one above).
When started without additional command line arguments, Pramen will run the normal daily pipeline checks and will execute
the jobs scheduled for the day.

- Here is how it works. Suppose you run a pipeline at `2020-07-19` and `expected.delay.days = 1`. This means that
+ Here is how it works. Suppose you run a pipeline at `2020-07-19` and `info.date.expr = "@runDate - 1"`. This means that
the pipeline should process data for the `information date = 2020-07-18`, as usual for T+1 jobs. During a normal
execution Pramen will do the following:

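The removed `expected.delay.days` setting is superseded by the information date expression. A minimal sketch of the equivalent T+1 configuration, using only keys that appear in the README excerpt above:

```hocon
# Deprecated way of declaring data one day late:
# expected.delay.days = 1

# Current way: derive the information date from the run date directly.
# Running on 2020-07-19 produces information date 2020-07-18.
info.date.expr = "@runDate - 1"
```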
@@ -34,6 +34,14 @@ trait Source extends ExternalChannel {
*/
def isDataAlwaysAvailable: Boolean = false

+ /**
+  * If true, the source + query combination is configured for ingesting events - tables have an information date column,
+  * and data is filtered by that date when ingested.
+  *
+  * If false, the source + query combination is configured for snapshots - tables are loaded fully each day according to the schedule.
+  */
+ def hasInfoDateColumn(query: Query): Boolean = true

/**
* Validates if the source is okay and the ingestion can proceed.
*/
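A self-contained sketch of the new hook's contract. `MiniQuery` and `MiniSource` are illustrative stand-ins for the pramen-api types, so the snippet compiles on its own:

```scala
// Stand-ins for the pramen-api Query and Source types.
sealed trait MiniQuery
object MiniQuery {
  final case class Table(dbTable: String) extends MiniQuery
  final case class Sql(query: String) extends MiniQuery
}

trait MiniSource {
  // Mirrors the default above: existing sources keep event-style behavior.
  def hasInfoDateColumn(query: MiniQuery): Boolean = true
}

// A snapshot-only source opts out, so its tables default to trackDays = 0.
class MySnapshotSource extends MiniSource {
  override def hasInfoDateColumn(query: MiniQuery): Boolean = false
}
```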
8 changes: 7 additions & 1 deletion pramen/core/src/main/resources/reference.conf
@@ -57,9 +57,15 @@ pramen {
wait.for.output.table.seconds = 600

# How many days to check back for late data
- track.days = 4
+ # 0 - never check for updates after the data is loaded
+ # 1 - check only the current info date if you run the job more than once per day
+ # 2 - check the latest info date and the date before
+ # etc...
+ # You can also set this parameter for individual tables in the metastore.
+ track.days = 5

# Do not expect data to arrive a specified number of days from now
+ # (This is a DEPRECATED parameter, please do not change the default)
expected.delay.days = 0

# 1 - Mon, 7 - Sun
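As the new comment notes, `track.days` can also be set per table. A hedged sketch of a per-table override (table name and path are illustrative):

```hocon
pramen.metastore {
  tables = [
    {
      name = "my_snapshot_table"   # illustrative
      format = "parquet"
      path = "/data/metastore/my_snapshot_table"
      # Explicit 0: never re-check this table after it is loaded for the day.
      track.days = 0
    }
  ]
}
```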
@@ -31,19 +31,20 @@ import scala.util.{Failure, Success, Try}
/**
 * These are the metatable details available when reading from the metastore.
 *
 * @param name The name of the table.
 * @param description The description of the table.
 * @param format The format of the table.
 * @param infoDateColumn The name of the column that contains the information date (partitioned by).
 * @param infoDateFormat The format of the information date.
 * @param hiveConfig The effective Hive configuration to use for Hive operations.
 * @param hiveTable The name of the Hive table.
 * @param hivePath The path of the Hive table (if it differs from the path in the underlying format).
 * @param infoDateExpression The expression used to calculate the information date.
 * @param infoDateStart The start date of the information date.
 * @param trackDays The number of days to look back for retrospective changes if this table is used as a dependency.
+ * @param trackDaysExplicitlySet If true, trackDays was set explicitly; if false, trackDays is taken from the workflow defaults.
 * @param readOptions The read options for the table.
 * @param writeOptions The write options for the table.
*/
case class MetaTable(
name: String,
Expand All @@ -57,6 +58,7 @@ case class MetaTable(
infoDateExpression: Option[String],
infoDateStart: LocalDate,
trackDays: Int,
+ trackDaysExplicitlySet: Boolean,
readOptions: Map[String, String],
writeOptions: Map[String, String]
)
@@ -113,6 +115,7 @@ object MetaTable {
val infoDateExpressionOpt = infoDateOverride.expression
val startDate = infoDateOverride.startDate.getOrElse(defaultStartDate)
val trackDays = ConfigUtils.getOptionInt(conf, TRACK_DAYS_KEY).getOrElse(defaultTrackDays)
+ val trackDaysExplicitlySet = conf.hasPath(TRACK_DAYS_KEY)

val format = Try {
DataFormatParser.fromConfig(conf, appConf)
@@ -144,6 +147,7 @@
infoDateExpressionOpt,
startDate,
trackDays,
+ trackDaysExplicitlySet,
readOptions,
writeOptions)
}
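The `hasPath` call is what separates an explicit user setting from a workflow default. A small sketch against the Typesafe Config API directly:

```scala
import com.typesafe.config.ConfigFactory

object HasPathDemo {
  def main(args: Array[String]): Unit = {
    val withKey = ConfigFactory.parseString("track.days = 0")
    val withoutKey = ConfigFactory.parseString("input.db.table = table1")

    // true: the user wrote track.days = 0, so the 0 must be honored.
    println(withKey.hasPath("track.days"))
    // false: trackDays came from defaults and may be suppressed for snapshot tables.
    println(withoutKey.hasPath("track.days"))
  }
}
```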
@@ -19,7 +19,7 @@ package za.co.absa.pramen.core.pipeline
import com.typesafe.config.Config
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory
- import za.co.absa.pramen.api.{NotificationBuilder, Reason, Source, SourceResult}
+ import za.co.absa.pramen.api.{NotificationBuilder, Query, Reason, Source, SourceResult}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.MetaTable
@@ -44,6 +44,21 @@ class IngestionJob(operationDef: OperationDef,

override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategySourcing

+ override def trackDays: Int = {
+   val hasInfoDate = try {
+     source.hasInfoDateColumn(sourceTable.query)
+   } catch {
+     case _: AbstractMethodError =>
+       log.warn(s"Sources were built using an older version of Pramen that does not support track days handling for snapshot tables. Ignoring...")
+       true
+   }
+
+   if (hasInfoDate || outputTable.trackDaysExplicitlySet)
+     outputTable.trackDays
+   else
+     0
+ }

override def preRunCheckJob(infoDate: LocalDate, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = {
source.connect()

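The override above reduces to a small decision rule. A standalone sketch (names illustrative) covering its three interesting cases:

```scala
object TrackDaysRule {
  // Snapshot tables (no info date column) get trackDays = 0 unless the user
  // set track.days explicitly for the table.
  def effectiveTrackDays(hasInfoDateColumn: Boolean,
                         configuredTrackDays: Int,
                         explicitlySet: Boolean): Int =
    if (hasInfoDateColumn || explicitlySet) configuredTrackDays else 0

  def main(args: Array[String]): Unit = {
    println(effectiveTrackDays(hasInfoDateColumn = true, configuredTrackDays = 5, explicitlySet = false))  // 5: event table
    println(effectiveTrackDays(hasInfoDateColumn = false, configuredTrackDays = 5, explicitlySet = false)) // 0: snapshot, default suppressed
    println(effectiveTrackDays(hasInfoDateColumn = false, configuredTrackDays = 5, explicitlySet = true))  // 5: snapshot, user opted in
  }
}
```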
@@ -19,7 +19,7 @@ package za.co.absa.pramen.core.pipeline
import com.typesafe.config.Config
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
- import za.co.absa.pramen.api.Reason
+ import za.co.absa.pramen.api.{Query, Reason}
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.runner.splitter.ScheduleStrategy

@@ -38,6 +38,8 @@ trait Job {

def notificationTargets: Seq[JobNotificationTarget]

+ def trackDays: Int

/**
* Checks pre-conditions for the job, such as data availability.
*/
@@ -47,6 +47,8 @@ abstract class JobBase(operationDef: OperationDef,

override def notificationTargets: Seq[JobNotificationTarget] = jobNotificationTargets

+ override def trackDays: Int = outputTable.trackDays

def preRunCheckJob(infoDate: LocalDate, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult

final override def preRunCheck(infoDate: LocalDate,
@@ -38,6 +38,7 @@ case class TransferTable(
rangeToExpr: Option[String],
infoDateStart: LocalDate,
trackDays: Int,
+ trackDaysExplicitlySet: Boolean,
transformations: Seq[TransformExpression],
filters: Seq[String],
columns: Seq[String],
@@ -55,7 +56,7 @@ }
}

def getMetaTable: MetaTable = {
- MetaTable(jobMetaTableName, "", DataFormat.Null(), "", "", HiveConfig.getNullConfig, None, None, None, infoDateStart, trackDays, readOptions, writeOptions)
+ MetaTable(jobMetaTableName, "", DataFormat.Null(), "", "", HiveConfig.getNullConfig, None, None, None, infoDateStart, trackDays, trackDaysExplicitlySet, readOptions, writeOptions)
}
}

@@ -78,6 +79,7 @@ object TransferTable {
val dateFromExpr = ConfigUtils.getOptionString(conf, DATE_FROM_KEY)
val dateToExpr = ConfigUtils.getOptionString(conf, DATE_TO_KEY)
val trackDays = ConfigUtils.getOptionInt(conf, TRACK_DAYS_KEY).getOrElse(defaultTrackDays)
+ val trackDaysExplicitlySet = conf.hasPath(TRACK_DAYS_KEY)
val columns = ConfigUtils.getOptListStrings(conf, COLUMNS_KEY)
val transformations = TransformExpression.fromConfig(conf, TRANSFORMATIONS_KEY, parentPath)
val filters = ConfigUtils.getOptListStrings(conf, FILTERS_KEY)
@@ -104,7 +106,7 @@
val startDate = infoDateOverride.startDate.getOrElse(defaultStartDate)
val jobMetaTable = getOutputTableName(jobMetaTableOpt, query, sinkName)

- TransferTable(query, jobMetaTable, conf, dateFromExpr, dateToExpr, startDate, trackDays, transformations, filters, columns, readOptions, writeOptions, sourceOverrideConf, sinkOverrideConf)
+ TransferTable(query, jobMetaTable, conf, dateFromExpr, dateToExpr, startDate, trackDays, trackDaysExplicitlySet, transformations, filters, columns, readOptions, writeOptions, sourceOverrideConf, sinkOverrideConf)
}

def fromConfig(conf: Config, appConfig: Config, arrayPath: String, sinkName: String): Seq[TransferTable] = {
@@ -128,17 +130,17 @@ object TransferTable {
private[core] def getInputTableName(query: Query): Option[String] = {
query match {
case t: Query.Table => Option(t.dbTable)
case _ => None
}
}

private[core] def getOutputTableName(jobMetaTableOpt: Option[String], query: Query, sinkName: String): String = {
jobMetaTableOpt match {
case Some(name) => name
case None =>
getInputTableName(query) match {
case Some(name) => s"$name->$sinkName"
case None => throw new IllegalArgumentException(s"Cannot determine metastore table name for '$query -> $sinkName'." +
s"Please specify it explicitly via '$JOB_METASTORE_OUTPUT_TABLE_KEY'.")
}
}
@@ -104,7 +104,7 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig,
}

private[core] def runJob(job: Job): Boolean = {
- val scheduleParams = ScheduleParams.fromRuntimeConfig(runtimeConfig, job.outputTable.trackDays, job.operation.expectedDelayDays)
+ val scheduleParams = ScheduleParams.fromRuntimeConfig(runtimeConfig, job.trackDays, job.operation.expectedDelayDays)

val taskDefs = job.scheduleStrategy.getDaysToRun(
job.outputTable.name,
@@ -42,24 +42,36 @@ class ScheduleStrategySourcing extends ScheduleStrategy {
case ScheduleParams.Normal(runDate, trackDays, delayDays, newOnly, lateOnly) =>
log.info(s"Normal run strategy: runDate=$runDate, trackDays=$trackDays, delayDays=$delayDays, newOnly=$newOnly, lateOnly=$lateOnly")
val trackedDays = if (!lateOnly && !newOnly) {
- getInfoDateRange(runDate.minusDays(delayDays + trackDays), runDate.minusDays(delayDays + 1), infoDateExpression, schedule)
+ getInfoDateRange(runDate.minusDays(delayDays + trackDays - 1), runDate.minusDays(delayDays + 1), infoDateExpression, schedule)
.map(d => pipeline.TaskPreDef(d, TaskRunReason.Late))
} else {
Nil
}

+ val lastProcessedDate = bookkeeper.getLatestProcessedDate(outputTable)

val lateDays = if (!newOnly) {
- getLate(outputTable, runDate.minusDays(delayDays), schedule, infoDateExpression, initialSourcingDateExpr, bookkeeper)
+ getLate(outputTable, runDate.minusDays(delayDays), schedule, infoDateExpression, initialSourcingDateExpr, lastProcessedDate)
} else {
Nil
}

- val newDays = if (!lateOnly) {
+ val newDaysOrig = if (!lateOnly) {
getNew(outputTable, runDate.minusDays(delayDays), schedule, infoDateExpression).toList
} else {
Nil
}

+ val newDays = lastProcessedDate match {
+   case Some(date) if trackDays <= 0 => newDaysOrig.filter(task => task.infoDate.isAfter(date))
+   case _ => newDaysOrig
+ }

log.info(s"Tracked days: ${trackedDays.map(_.infoDate).mkString(", ")}")
log.info(s"Late days: ${lateDays.map(_.infoDate).mkString(", ")}")
log.info(s"New days: ${newDaysOrig.map(_.infoDate).mkString(", ")}")
log.info(s"New days not ran already: ${newDays.map(_.infoDate).mkString(", ")}")

(trackedDays ++ lateDays ++ newDays).groupBy(_.infoDate).map(d => d._2.head).toList.sortBy(a => a.infoDate.toEpochDay)
case ScheduleParams.Rerun(runDate) =>
log.info(s"Rerun strategy for a single day: $runDate")
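The `- 1` in the lower bound fixes an off-by-one in the retrospective window: the current info date is produced by `getNew`, so only `trackDays - 1` earlier days need re-checking. A runnable sketch, assuming a daily schedule and an identity info date expression:

```scala
import java.time.LocalDate

object TrackWindow {
  // Corrected window: spans trackDays - 1 days before the "new" day.
  def trackedRange(runDate: LocalDate, trackDays: Int, delayDays: Int): Seq[LocalDate] = {
    val from = runDate.minusDays(delayDays + trackDays - 1)
    val to = runDate.minusDays(delayDays + 1)
    Iterator.iterate(from)(_.plusDays(1)).takeWhile(!_.isAfter(to)).toSeq
  }

  def main(args: Array[String]): Unit = {
    // track.days = 2: re-check only the day before the current info date.
    println(trackedRange(LocalDate.of(2023, 11, 13), trackDays = 2, delayDays = 0))
    // List(2023-11-12); the old bound (delayDays + trackDays) also pulled in 2023-11-11
  }
}
```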
@@ -41,22 +41,34 @@ class ScheduleStrategyTransformation extends ScheduleStrategy {
val dates = params match {
case ScheduleParams.Normal(runDate, trackDays, delayDays, newOnly, lateOnly) =>
log.info(s"Normal run strategy: runDate=$runDate, trackDays=$trackDays, delayDays=$delayDays, newOnly=$newOnly, lateOnly=$lateOnly")
- val retrospective = getInfoDateRange(runDate.minusDays(trackDays + delayDays), runDate.minusDays(delayDays + 1), infoDateExpression, schedule)
+ val retrospective = getInfoDateRange(runDate.minusDays(trackDays + delayDays), runDate.minusDays(delayDays), infoDateExpression, schedule)
.filter(date => anyDependencyUpdatedRetrospectively(outputTable, date, dependencies, bookkeeper))
.map(d => pipeline.TaskPreDef(d, TaskRunReason.Update))

+ val lastProcessedDate = bookkeeper.getLatestProcessedDate(outputTable)

val lateDays = if (!newOnly) {
- getLate(outputTable, runDate.minusDays(delayDays), schedule, infoDateExpression, initialSourcingDateExpr, bookkeeper)
+ getLate(outputTable, runDate.minusDays(delayDays), schedule, infoDateExpression, initialSourcingDateExpr, lastProcessedDate)
} else {
Nil
}

- val newDays = if (!lateOnly) {
+ val newDaysOrig = if (!lateOnly) {
getNew(outputTable, runDate.minusDays(delayDays), schedule, infoDateExpression).toList
} else {
Nil
}

+ val newDays = lastProcessedDate match {
+   case Some(date) => newDaysOrig.filter(task => task.infoDate.isAfter(date))
+   case _ => newDaysOrig
+ }

log.info(s"Retrospective: ${retrospective.map(_.infoDate).mkString(", ")}")
log.info(s"Late days: ${lateDays.map(_.infoDate).mkString(", ")}")
log.info(s"New days: ${newDaysOrig.map(_.infoDate).mkString(", ")}")
log.info(s"New days not ran already: ${newDays.map(_.infoDate).mkString(", ")}")

(retrospective ++ lateDays ++ newDays).groupBy(_.infoDate).map(d => d._2.head).toList.sortBy(a => a.infoDate.toEpochDay)
case ScheduleParams.Rerun(runDate) =>
log.info(s"Rerun strategy for a single day: $runDate")
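Both strategies now filter the "new" days against the last processed date, which is the headline fix: an already-loaded day is no longer re-checked (and its record count no longer re-queried) on a second run. A self-contained sketch of the sourcing variant, where the filter applies only when `trackDays <= 0` (the transformation strategy above applies it unconditionally); `TaskPreDef` stands in for the real pipeline class:

```scala
import java.time.LocalDate

final case class TaskPreDef(infoDate: LocalDate)

object NewDayFilter {
  // With trackDays <= 0, drop info dates at or before the last processed date.
  def filterNewDays(newDays: List[TaskPreDef],
                    lastProcessedDate: Option[LocalDate],
                    trackDays: Int): List[TaskPreDef] =
    lastProcessedDate match {
      case Some(date) if trackDays <= 0 => newDays.filter(_.infoDate.isAfter(date))
      case _ => newDays
    }

  def main(args: Array[String]): Unit = {
    val today = LocalDate.of(2023, 11, 13)
    val candidates = List(TaskPreDef(today))
    // Second run on the same day: the table is already loaded for `today`.
    println(filterNewDays(candidates, Some(today), trackDays = 0)) // List(): skipped
    println(filterNewDays(candidates, Some(today), trackDays = 5)) // kept for re-checking
  }
}
```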
@@ -93,11 +93,11 @@ object ScheduleStrategyUtils {
schedule: Schedule,
infoDateExpression: String,
initialDateExpr: String,
- bookkeeper: Bookkeeper
+ lastProcessedDate: Option[LocalDate]
): List[TaskPreDef] = {
val lastInfoDate = evaluateRunDate(runDate.minusDays(1), infoDateExpression)

- bookkeeper.getLatestProcessedDate(outputTable) match {
+ lastProcessedDate match {
case Some(lastUpdatedInfoDate) =>
val nextExpected = lastUpdatedInfoDate.plusDays(1)

@@ -32,6 +32,8 @@ class JdbcSource(sourceConfig: Config,

override val config: Config = sourceConfig

+ override def hasInfoDateColumn(query: Query): Boolean = jdbcReaderConfig.hasInfoDate

override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
val reader = getReader(query)

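For the JDBC source the flag simply reflects whether an information date column is configured. A hedged sketch of the relevant source options (key names as used in the README's JDBC source section; treat them as illustrative):

```hocon
# Present and true: event-style ingestion, filtered by the info date column.
has.information.date.column = true
information.date.column = "info_date"

# Set it to false (or omit the column) for snapshot tables loaded fully each day.
```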
@@ -38,6 +38,8 @@ class LocalSparkSource(sparkSource: SparkSource,
private var fsUtils: FsUtils = _
private val traverser: FsTraverser = new FsTraverserLocal()

+ override def hasInfoDateColumn(query: Query): Boolean = false

@throws[Exception]
override def connect(): Unit = {
isConnected = true
@@ -89,6 +89,14 @@ class RawFileSource(val sourceConfig: Config,

override val config: Config = sourceConfig

+ override def hasInfoDateColumn(query: Query): Boolean = {
+   query match {
+     case Query.Path(pathPattern) => pathPattern.contains("{{")
+     case _: Query.Custom => false
+     case _ => throw new IllegalArgumentException("RawFileSource only supports 'path' or 'file.1,...' as an input, 'sql' and 'table' are not supported.")
+   }
+ }

override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
getPaths(query, infoDateBegin, infoDateEnd).length
}
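A sketch of the path check above: the source is treated as event-style only when the path pattern contains a date token (the `{{yyyy-MM-dd}}` form is assumed from the `contains("{{")` test):

```scala
object PathPatternCheck {
  // A {{...}} token in the path marks one directory per information date.
  def pathHasInfoDate(pathPattern: String): Boolean = pathPattern.contains("{{")

  def main(args: Array[String]): Unit = {
    println(pathHasInfoDate("/landing/raw/{{yyyy-MM-dd}}")) // true: daily folders
    println(pathHasInfoDate("/landing/raw/full_dump")) // false: snapshot
  }
}
```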
@@ -38,6 +38,8 @@ class SparkSource(val format: String,

override val config: Config = sourceConfig

+ override def hasInfoDateColumn(query: Query): Boolean = hasInfoDateCol

override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
val reader = getReader(query)

@@ -33,6 +33,7 @@ object MetaTableFactory {
infoDateExpression: Option[String] = None,
infoDateStart: LocalDate = LocalDate.of(2020, 1, 31),
trackDays: Int = 0,
+ trackDaysExplicitlySet: Boolean = false,
readOptions: Map[String, String] = Map.empty[String, String],
writeOptions: Map[String, String] = Map.empty[String, String]
): MetaTable = {
@@ -47,6 +48,7 @@
infoDateExpression,
infoDateStart,
trackDays,
+ trackDaysExplicitlySet,
readOptions,
writeOptions)
}