diff --git a/pramen/build.sbt b/pramen/build.sbt index 4b7231984..ee6ccd19a 100644 --- a/pramen/build.sbt +++ b/pramen/build.sbt @@ -243,6 +243,7 @@ lazy val assemblySettingsExtras = assemblySettingsCommon ++ Seq(assembly / assem ShadeRule.zap("org.apache.avro.**").inAll, ShadeRule.zap("org.apache.commons.**").inAll, ShadeRule.zap("org.apache.jute.**").inAll, + ShadeRule.zap("org.apache.kafka.**").inAll, ShadeRule.zap("org.apache.spark.annotation.**").inAll, ShadeRule.zap("org.apache.yetus.**").inAll, ShadeRule.zap("org.apache.zookeeper.**").inAll, @@ -282,7 +283,6 @@ lazy val assemblySettingsRunner = assemblySettingsCommon ++ Seq(assembly / assem ShadeRule.zap("com.ibm.icu.**").inAll, ShadeRule.zap("net.jpountz.**").inAll, ShadeRule.zap("org.abego.**").inAll, - ShadeRule.zap("org.apache.kafka.**").inAll, ShadeRule.zap("org.glassfish.**").inAll, ShadeRule.zap("org.lz4.**").inAll, ShadeRule.zap("org.slf4j.**").inAll, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/NotificationTargetManagerSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/NotificationTargetManagerSuite.scala index 7387a6176..d300f7b56 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/NotificationTargetManagerSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/NotificationTargetManagerSuite.scala @@ -19,7 +19,8 @@ package za.co.absa.pramen.core.tests.notify import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.core.base.SparkTestBase -import za.co.absa.pramen.core.notify.{HyperdriveNotificationTarget, NotificationTargetManager} +import za.co.absa.pramen.core.mocks.notify.NotificationTargetMock +import za.co.absa.pramen.core.notify.NotificationTargetManager class NotificationTargetManagerSuite extends AnyWordSpec with SparkTestBase { private val conf: Config = ConfigFactory.parseString( @@ -27,7 +28,7 @@ class NotificationTargetManagerSuite extends AnyWordSpec with SparkTestBase { | pramen.notification.targets = [ | { | name = "hyperdrive1" - | factory.class = "za.co.absa.pramen.core.notify.HyperdriveNotificationTarget" + | factory.class = "za.co.absa.pramen.core.mocks.notify.NotificationTargetMock" | | kafka.topic = "mytopic" | @@ -48,7 +49,7 @@ class NotificationTargetManagerSuite extends AnyWordSpec with SparkTestBase { "return a notification target" in { val nt = NotificationTargetManager.getByName("hyperdrive1", conf, None) - assert(nt.isInstanceOf[HyperdriveNotificationTarget]) + assert(nt.isInstanceOf[NotificationTargetMock]) } "throw an exception if the notification target does not exist" in { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/HyperdriveNotificationTarget.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/HyperdriveNotificationTarget.scala similarity index 95% rename from pramen/core/src/main/scala/za/co/absa/pramen/core/notify/HyperdriveNotificationTarget.scala rename to pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/HyperdriveNotificationTarget.scala index d14bc8482..5eb832241 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/HyperdriveNotificationTarget.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/HyperdriveNotificationTarget.scala @@ -14,16 +14,16 @@ * limitations under the License. */ -package za.co.absa.pramen.core.notify +package za.co.absa.pramen.extras.notification import com.typesafe.config.Config import org.apache.spark.sql.SparkSession import org.slf4j.LoggerFactory import za.co.absa.pramen.api.status.{RunStatus, TaskResult} import za.co.absa.pramen.api.{ExternalChannelFactory, NotificationTarget, PipelineInfo} -import za.co.absa.pramen.core.notify.mq.{SingleMessageProducer, SingleMessageProducerKafka} import za.co.absa.pramen.core.utils.ConfigUtils import za.co.absa.pramen.core.utils.Emoji._ +import za.co.absa.pramen.extras.notification.mq.{SingleMessageProducer, SingleMessageProducerKafka} class HyperdriveNotificationTarget(conf: Config, producer: SingleMessageProducer, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/mq/SingleMessageProducer.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/mq/SingleMessageProducer.scala similarity index 93% rename from pramen/core/src/main/scala/za/co/absa/pramen/core/notify/mq/SingleMessageProducer.scala rename to pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/mq/SingleMessageProducer.scala index c456bdaaf..95c92d0af 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/mq/SingleMessageProducer.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/mq/SingleMessageProducer.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.pramen.core.notify.mq +package za.co.absa.pramen.extras.notification.mq trait SingleMessageProducer { def send(topic: String, message: String, numberOrRetries: Int = 3): Unit diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/mq/SingleMessageProducerKafka.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/mq/SingleMessageProducerKafka.scala similarity index 98% rename from pramen/core/src/main/scala/za/co/absa/pramen/core/notify/mq/SingleMessageProducerKafka.scala rename to pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/mq/SingleMessageProducerKafka.scala index c959a6fee..789231484 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/mq/SingleMessageProducerKafka.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/mq/SingleMessageProducerKafka.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.pramen.core.notify.mq +package za.co.absa.pramen.extras.notification.mq import com.typesafe.config.Config import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/TaskNotificationFactory.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/TaskNotificationFactory.scala new file mode 100644 index 000000000..359bdbc14 --- /dev/null +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/TaskNotificationFactory.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.extras + +import za.co.absa.pramen.api.SchemaDifference +import za.co.absa.pramen.api.status._ + +import java.time.{Instant, LocalDate} + +object TaskNotificationFactory { + def getDummyTaskNotification(taskDef: TaskDef = TaskDefFactory.getDummyTaskNotification(), + runInfo: Option[RunInfo] = Some(RunInfo( + LocalDate.of(2022, 2, 18), + Instant.ofEpochMilli(1613600000000L), + Instant.ofEpochMilli(1672759508000L) + )), + status: RunStatus = RunStatus.Succeeded(None, Some(100), None, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty), + applicationId: String = "app_12345", + isTransient: Boolean = false, + isRawFilesJob: Boolean = false, + schemaChanges: Seq[SchemaDifference] = Seq.empty, + dependencyWarnings: Seq[DependencyWarning] = Seq.empty, + notificationTargetErrors: Seq[NotificationFailure] = Seq.empty, + options: Map[String, String] = Map.empty[String, String]): TaskResult = { + TaskResult(taskDef, + status, + runInfo, + applicationId, + isTransient, + isRawFilesJob, + schemaChanges, + dependencyWarnings, + notificationTargetErrors, + options) + } +} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/HyperdriveNotificationTargetSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/HyperdriveNotificationTargetSuite.scala similarity index 92% rename from pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/HyperdriveNotificationTargetSuite.scala rename to pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/HyperdriveNotificationTargetSuite.scala index 1dced187e..ec1b0543f 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/HyperdriveNotificationTargetSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/HyperdriveNotificationTargetSuite.scala @@ -14,15 +14,13 @@ * limitations under the License. */ -package za.co.absa.pramen.core.tests.notify +package za.co.absa.pramen.extras.notification import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpec -import za.co.absa.pramen.api.status.{RunStatus, TaskStatus} -import za.co.absa.pramen.core.TaskNotificationFactory -import za.co.absa.pramen.core.base.SparkTestBase -import za.co.absa.pramen.core.mocks.notify.SingleMessageProducerSpy -import za.co.absa.pramen.core.notify.HyperdriveNotificationTarget +import za.co.absa.pramen.api.status.RunStatus +import za.co.absa.pramen.extras.TaskNotificationFactory +import za.co.absa.pramen.extras.base.SparkTestBase class HyperdriveNotificationTargetSuite extends AnyWordSpec with SparkTestBase { "apply()" should { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/SingleMessageProducerSpy.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/SingleMessageProducerSpy.scala similarity index 90% rename from pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/SingleMessageProducerSpy.scala rename to pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/SingleMessageProducerSpy.scala index 7c5527e89..d3827347d 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/SingleMessageProducerSpy.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/SingleMessageProducerSpy.scala @@ -14,9 +14,9 @@ * limitations under the License. */ -package za.co.absa.pramen.core.mocks.notify +package za.co.absa.pramen.extras.notification -import za.co.absa.pramen.core.notify.mq.SingleMessageProducer +import za.co.absa.pramen.extras.notification.mq.SingleMessageProducer class SingleMessageProducerSpy extends SingleMessageProducer { var connectInvoked = 0 diff --git a/pramen/project/Dependencies.scala b/pramen/project/Dependencies.scala index 45f557492..fbcb305f0 100644 --- a/pramen/project/Dependencies.scala +++ b/pramen/project/Dependencies.scala @@ -35,7 +35,6 @@ object Dependencies { "org.postgresql" % "postgresql" % postgreSqlDriverVersion, "com.github.scopt" %% "scopt" % scoptVersion, "com.github.yruslan" %% "channel_scala" % channelVersion, - "org.apache.kafka" % "kafka-clients" % kafkaClientVersion, "com.sun.mail" % "javax.mail" % javaXMailVersion, "com.lihaoyi" %% "requests" % requestsVersion, "org.scalatest" %% "scalatest" % scalatestVersion % Test, @@ -51,6 +50,7 @@ object Dependencies { "org.scalatest" %% "scalatest" % scalatestVersion % Test, "org.mockito" % "mockito-core" % mockitoVersion % Test ) ++ Seq( + getKafkaClientsDependency(sparkVersion(scalaVersion)), getAbrisDependency(sparkVersion(scalaVersion)), getDeltaDependency(sparkVersion(scalaVersion), isCompile = false, isTest = true) ) diff --git a/pramen/project/Versions.scala b/pramen/project/Versions.scala index 626c3d9ba..1c0312d96 100644 --- a/pramen/project/Versions.scala +++ b/pramen/project/Versions.scala @@ -93,6 +93,17 @@ object Versions { } } + def getKafkaClientsDependency(sparkVersion: String): ModuleID = { + val kafkaClientsVersion = sparkVersion match { + case version if version.startsWith("2.4.") => "2.5.1" + case _ => "3.9.0" + } + + println(s"Using 'kafla-clients' version $kafkaClientsVersion") + + "org.apache.kafka" % "kafka-clients" % kafkaClientsVersion + } + def getAbrisDependency(sparkVersion: String): ModuleID = { // According to this: https://github.com/AbsaOSS/ABRiS?tab=readme-ov-file#supported-versions val abrisVersion = sparkVersion match {