Skip to content

Commit

Permalink
Replaced db exists check with internal flag in FixedDurationSegmenter
Browse files Browse the repository at this point in the history
  • Loading branch information
lucaro committed Nov 22, 2023
1 parent 2b0642c commit 464417f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class FixedDurationSegmenter : SegmenterFactory {
/** Reference to the last [Source] encountered by this [FixedDurationSegmenter]. */
private var lastSource: Source? = null

/** Tracks if the current source retrievable has already been persisted. */
private var sourceWritten = false


/** [KLogger] instance. */
private val logger: KLogger = KotlinLogging.logger {}

Expand All @@ -105,6 +109,7 @@ class FixedDurationSegmenter : SegmenterFactory {
}
this.lastSource = content.source
this.lastStartTime = 0
this.sourceWritten = false
logger.info { "Starting to segment new source ${lastSource?.name} (${lastSource?.sourceId})" }
}
this.cache.add(content)
Expand Down Expand Up @@ -144,12 +149,14 @@ class FixedDurationSegmenter : SegmenterFactory {
override val source: Source = source
}

/* Persist source retrievable and send it downstream, if it doesn't exist. */
if (!this.reader.exists(source.sourceId)) {
/* Persist source retrievable and send it downstream */
if (!this.sourceWritten) {
this.writer.add(sourceRetrievable)
downstream.send(sourceRetrievable)
this.sourceWritten = true
}


/* Drain cache. */
val content = LinkedList<ContentElement<*>>()
this.cache.removeIf {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ class PassThroughSegmenter : SegmenterFactory {
}

/* Persist retrievable. */
if (!this.reader.exists(source.sourceId)) {
this.writer.add(result)
}
this.writer.add(result)


/* Return result. */
return result
Expand Down

0 comments on commit 464417f

Please sign in to comment.