Skip to content

Commit

Permalink
#537 Update 'kafka-clients' version and move it to 'extras', includin…
Browse files Browse the repository at this point in the history
…g HyperdriveNotificationTarget.
  • Loading branch information
yruslan committed Jan 13, 2025
1 parent d4a2e3d commit 2c2eeca
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pramen/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ lazy val assemblySettingsExtras = assemblySettingsCommon ++ Seq(assembly / assem
ShadeRule.zap("org.apache.avro.**").inAll,
ShadeRule.zap("org.apache.commons.**").inAll,
ShadeRule.zap("org.apache.jute.**").inAll,
ShadeRule.zap("org.apache.kafka.**").inAll,
ShadeRule.zap("org.apache.spark.annotation.**").inAll,
ShadeRule.zap("org.apache.yetus.**").inAll,
ShadeRule.zap("org.apache.zookeeper.**").inAll,
Expand Down Expand Up @@ -282,7 +283,6 @@ lazy val assemblySettingsRunner = assemblySettingsCommon ++ Seq(assembly / assem
ShadeRule.zap("com.ibm.icu.**").inAll,
ShadeRule.zap("net.jpountz.**").inAll,
ShadeRule.zap("org.abego.**").inAll,
ShadeRule.zap("org.apache.kafka.**").inAll,
ShadeRule.zap("org.glassfish.**").inAll,
ShadeRule.zap("org.lz4.**").inAll,
ShadeRule.zap("org.slf4j.**").inAll,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ package za.co.absa.pramen.core.tests.notify
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.core.base.SparkTestBase
import za.co.absa.pramen.core.notify.{HyperdriveNotificationTarget, NotificationTargetManager}
import za.co.absa.pramen.core.mocks.notify.NotificationTargetMock
import za.co.absa.pramen.core.notify.NotificationTargetManager

class NotificationTargetManagerSuite extends AnyWordSpec with SparkTestBase {
private val conf: Config = ConfigFactory.parseString(
s"""
| pramen.notification.targets = [
| {
| name = "hyperdrive1"
| factory.class = "za.co.absa.pramen.core.notify.HyperdriveNotificationTarget"
| factory.class = "za.co.absa.pramen.core.mocks.notify.NotificationTargetMock"
|
| kafka.topic = "mytopic"
|
Expand All @@ -48,7 +49,7 @@ class NotificationTargetManagerSuite extends AnyWordSpec with SparkTestBase {
"return a notification target" in {
val nt = NotificationTargetManager.getByName("hyperdrive1", conf, None)

assert(nt.isInstanceOf[HyperdriveNotificationTarget])
assert(nt.isInstanceOf[NotificationTargetMock])
}

"throw an exception if the notification target does not exist" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@
* limitations under the License.
*/

package za.co.absa.pramen.core.notify
package za.co.absa.pramen.extras.notification

import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.status.{RunStatus, TaskResult}
import za.co.absa.pramen.api.{ExternalChannelFactory, NotificationTarget, PipelineInfo}
import za.co.absa.pramen.core.notify.mq.{SingleMessageProducer, SingleMessageProducerKafka}
import za.co.absa.pramen.core.utils.ConfigUtils
import za.co.absa.pramen.core.utils.Emoji._
import za.co.absa.pramen.extras.notification.mq.{SingleMessageProducer, SingleMessageProducerKafka}

class HyperdriveNotificationTarget(conf: Config,
producer: SingleMessageProducer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.pramen.core.notify.mq
package za.co.absa.pramen.extras.notification.mq

trait SingleMessageProducer {
def send(topic: String, message: String, numberOrRetries: Int = 3): Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.pramen.core.notify.mq
package za.co.absa.pramen.extras.notification.mq

import com.typesafe.config.Config
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.extras

import za.co.absa.pramen.api.SchemaDifference
import za.co.absa.pramen.api.status._

import java.time.{Instant, LocalDate}

object TaskNotificationFactory {
def getDummyTaskNotification(taskDef: TaskDef = TaskDefFactory.getDummyTaskNotification(),
runInfo: Option[RunInfo] = Some(RunInfo(
LocalDate.of(2022, 2, 18),
Instant.ofEpochMilli(1613600000000L),
Instant.ofEpochMilli(1672759508000L)
)),
status: RunStatus = RunStatus.Succeeded(None, Some(100), None, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty),
applicationId: String = "app_12345",
isTransient: Boolean = false,
isRawFilesJob: Boolean = false,
schemaChanges: Seq[SchemaDifference] = Seq.empty,
dependencyWarnings: Seq[DependencyWarning] = Seq.empty,
notificationTargetErrors: Seq[NotificationFailure] = Seq.empty,
options: Map[String, String] = Map.empty[String, String]): TaskResult = {
TaskResult(taskDef,
status,
runInfo,
applicationId,
isTransient,
isRawFilesJob,
schemaChanges,
dependencyWarnings,
notificationTargetErrors,
options)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
* limitations under the License.
*/

package za.co.absa.pramen.core.tests.notify
package za.co.absa.pramen.extras.notification

import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.api.status.{RunStatus, TaskStatus}
import za.co.absa.pramen.core.TaskNotificationFactory
import za.co.absa.pramen.core.base.SparkTestBase
import za.co.absa.pramen.core.mocks.notify.SingleMessageProducerSpy
import za.co.absa.pramen.core.notify.HyperdriveNotificationTarget
import za.co.absa.pramen.api.status.RunStatus
import za.co.absa.pramen.extras.TaskNotificationFactory
import za.co.absa.pramen.extras.base.SparkTestBase

class HyperdriveNotificationTargetSuite extends AnyWordSpec with SparkTestBase {
"apply()" should {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* limitations under the License.
*/

package za.co.absa.pramen.core.mocks.notify
package za.co.absa.pramen.extras.notification

import za.co.absa.pramen.core.notify.mq.SingleMessageProducer
import za.co.absa.pramen.extras.notification.mq.SingleMessageProducer

class SingleMessageProducerSpy extends SingleMessageProducer {
var connectInvoked = 0
Expand Down
2 changes: 1 addition & 1 deletion pramen/project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ object Dependencies {
"org.postgresql" % "postgresql" % postgreSqlDriverVersion,
"com.github.scopt" %% "scopt" % scoptVersion,
"com.github.yruslan" %% "channel_scala" % channelVersion,
"org.apache.kafka" % "kafka-clients" % kafkaClientVersion,
"com.sun.mail" % "javax.mail" % javaXMailVersion,
"com.lihaoyi" %% "requests" % requestsVersion,
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
Expand All @@ -51,6 +50,7 @@ object Dependencies {
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
"org.mockito" % "mockito-core" % mockitoVersion % Test
) ++ Seq(
getKafkaClientsDependency(sparkVersion(scalaVersion)),
getAbrisDependency(sparkVersion(scalaVersion)),
getDeltaDependency(sparkVersion(scalaVersion), isCompile = false, isTest = true)
)
Expand Down
11 changes: 11 additions & 0 deletions pramen/project/Versions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ object Versions {
}
}

def getKafkaClientsDependency(sparkVersion: String): ModuleID = {
val kafkaClientsVersion = sparkVersion match {
case version if version.startsWith("2.4.") => "2.5.1"
case _ => "3.9.0"
}

println(s"Using 'kafla-clients' version $kafkaClientsVersion")

"org.apache.kafka" % "kafka-clients" % kafkaClientsVersion
}

def getAbrisDependency(sparkVersion: String): ModuleID = {
// According to this: https://github.com/AbsaOSS/ABRiS?tab=readme-ov-file#supported-versions
val abrisVersion = sparkVersion match {
Expand Down

0 comments on commit 2c2eeca

Please sign in to comment.