-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ADAPT-1794: Replace cutoff-dates with enum based additional validatio…
…ns on new topics
- Loading branch information
Showing
24 changed files
with
338 additions
and
170 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,20 +3,18 @@ package hydra.ingest.utils | |
import cats.data.NonEmptyList | ||
import cats.effect.IO | ||
import cats.implicits._ | ||
import hydra.avro.registry.SchemaRegistry | ||
import hydra.avro.registry.SchemaRegistry.SchemaId | ||
import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer | ||
import hydra.kafka.algebras.TestMetadataAlgebra | ||
import hydra.kafka.model.ContactMethod.Email | ||
import hydra.kafka.model.TopicMetadataV2Request.Subject | ||
import hydra.kafka.model._ | ||
import org.apache.avro.{Schema, SchemaBuilder} | ||
import org.apache.avro.SchemaBuilder | ||
|
||
import java.time.Instant | ||
|
||
object TopicUtils { | ||
|
||
def updateTopicMetadata(topics: List[String], metadataAlgebra: TestMetadataAlgebra[IO], createdDate: Instant): IO[List[Unit]] = { | ||
def updateTopicMetadata(topics: List[String], metadataAlgebra: TestMetadataAlgebra[IO]): IO[List[Unit]] = { | ||
topics.traverse(topic => { | ||
val keySchema = SchemaBuilder.record(topic + "Key").fields.requiredInt("test").endRecord() | ||
val valueSchema = SchemaBuilder.record(topic + "Value").fields.requiredInt("test").endRecord() | ||
|
@@ -28,13 +26,14 @@ object TopicUtils { | |
deprecatedDate = None, | ||
Public, | ||
NonEmptyList.of(Email.create("[email protected]").get), | ||
createdDate, | ||
Instant.now(), | ||
List.empty, | ||
None, | ||
Some("dvs-teamName"), | ||
None, | ||
List.empty, | ||
Some("notificationUrl") | ||
Some("notificationUrl"), | ||
additionalValidations = None | ||
) | ||
val topicMetadataContainer = TopicMetadataContainer( | ||
topicMetadataKey, | ||
|
46 changes: 46 additions & 0 deletions
46
ingestors/kafka/src/main/scala/hydra/kafka/model/AdditionalValidation.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package hydra.kafka.model | ||
|
||
import enumeratum.{Enum, EnumEntry} | ||
import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer | ||
|
||
import scala.collection.immutable | ||
|
||
sealed trait AdditionalValidation extends EnumEntry | ||
|
||
sealed trait SchemaAdditionalValidation extends AdditionalValidation | ||
|
||
object SchemaAdditionalValidation extends Enum[SchemaAdditionalValidation] { | ||
|
||
case object defaultInRequiredField extends SchemaAdditionalValidation | ||
case object timestampMillis extends SchemaAdditionalValidation | ||
|
||
override val values: immutable.IndexedSeq[SchemaAdditionalValidation] = findValues | ||
} | ||
|
||
object AdditionalValidation { | ||
lazy val allValidations: Option[List[AdditionalValidation]] = | ||
Some(SchemaAdditionalValidation.values.toList) | ||
|
||
/** | ||
* An OLD topic will have its metadata populated. | ||
* Therefore, additionalValidations=None will be picked from the metadata. | ||
* And no new additionalValidations will be applied on older topics. | ||
* | ||
* A NEW topic will not have a metadata object. | ||
* Therefore, all existing additionalValidations will be assigned. | ||
* Thus, additionalValidations on corresponding fields will be applied. | ||
* | ||
* Corner case: After this feature has been on STAGE/PROD for sometime and some new additionalValidations are required. | ||
* We need not worry about old topics as the value of additionalValidations will remain the same since the topic creation. | ||
* New additionalValidations should be applied only on new topics. | ||
* Therefore, assigning all the values under AdditionalValidation enum is reasonable. | ||
* | ||
* @param metadata a metadata object of current topic | ||
* @return value of additionalValidations if the topic is already existing(OLD topic) otherwise all enum values under AdditionalValidation(NEW topic) | ||
*/ | ||
def validations(metadata: Option[TopicMetadataContainer]): Option[List[AdditionalValidation]] = | ||
metadata.map(_.value.additionalValidations).getOrElse(AdditionalValidation.allValidations) | ||
|
||
def isPresent(metadata: Option[TopicMetadataContainer], additionalValidation: AdditionalValidation): Boolean = | ||
validations(metadata).exists(_.contains(additionalValidation)) | ||
} |
Oops, something went wrong.