Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Interval schedule should take start time from the request, should not… #1047

Merged
merged 1 commit into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ data class Rollup(
// TODO: Make startTime public in Job Scheduler so we can just directly check the value
if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
if (schedule is IntervalSchedule) {
schedule = IntervalSchedule(Instant.now(), schedule.interval, schedule.unit, schedule.delay ?: 0)
schedule = IntervalSchedule(schedule.startTime, schedule.interval, schedule.unit, schedule.delay ?: 0)
}
}
return Rollup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ class RollupRunnerIT : RollupRestTestCase() {
// Tests that a continuous rollup will not be processed until the end of the interval plus delay passes
fun `test delaying continuous execution`() {
val indexName = "test_index_runner_eighth"
val delay: Long = 15000
val delay: Long = 7_500
// Define rollup
var rollup = randomRollup().copy(
id = "$testName-4",
Expand All @@ -663,40 +663,29 @@ class RollupRunnerIT : RollupRestTestCase() {
putDateDocumentInSourceIndex(rollup)

// Create rollup job
rollup = createRollup(rollup = rollup, rollupId = rollup.id)
val jobStartTime = Instant.now()
val rollupNow = rollup.copy(
jobSchedule = IntervalSchedule(jobStartTime, 1, ChronoUnit.MINUTES),
jobEnabledTime = jobStartTime
)
rollup = createRollup(rollup = rollupNow, rollupId = rollupNow.id)

var nextExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli()
val expectedExecutionTime = rollup.jobEnabledTime!!.plusMillis(delay).toEpochMilli()
val delayIsCorrect = ((expectedExecutionTime - nextExecutionTime) > -500) && ((expectedExecutionTime - nextExecutionTime) < 500)
assertTrue("Delay was not correctly applied", delayIsCorrect)
val expectedFirstExecutionTime = rollup.jobSchedule.getNextExecutionTime(null).toEpochMilli()
assertTrue("The first job execution time should be equal [job start time] + [delay].", expectedFirstExecutionTime == jobStartTime.toEpochMilli() + delay)

waitFor {
// Wait until half a second before the intended execution time
assertTrue(Instant.now().toEpochMilli() >= nextExecutionTime - 500)
// Still should not have run at this point
assertFalse("Target rollup index was created before the delay should allow", indexExists(rollup.targetIndex))
}
val rollupMetadata = waitFor {
waitFor() {
assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex))
val rollupJob = getRollup(rollupId = rollup.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertNotNull("Rollup metadata not found", rollupMetadata)
rollupMetadata
}
nextExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli()
val nextExecutionOffset = (nextExecutionTime - Instant.now().toEpochMilli()) - 60000
val nextExecutionIsCorrect = nextExecutionOffset < 5000 && nextExecutionOffset > -5000
assertTrue("Next execution time not updated correctly", nextExecutionIsCorrect)
val nextWindowStartTime: Instant = rollupMetadata.continuous!!.nextWindowStartTime
val nextWindowEndTime: Instant = rollupMetadata.continuous!!.nextWindowEndTime
// Assert that after the window was updated, it falls approximately around 'now'
assertTrue("Rollup window start time is incorrect", nextWindowStartTime.plusMillis(delay).minusMillis(1000) < Instant.now())
assertTrue("Rollup window end time is incorrect", nextWindowEndTime.plusMillis(delay).plusMillis(1000) > Instant.now())

// window length should be 5 seconds
val expectedWindowEnd = nextWindowStartTime.plusMillis(5000)
assertEquals("Rollup window length applied incorrectly", expectedWindowEnd, nextWindowEndTime)

val now = Instant.now().toEpochMilli()
assertTrue("The first job execution must happen after [job start time] + [delay]", now > jobStartTime.toEpochMilli() + delay)

val secondExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli()
assertTrue("The second job execution time should be not earlier than a minute after the first execution.", secondExecutionTime - expectedFirstExecutionTime == 60_000L)
}

fun `test non continuous delay does nothing`() {
Expand Down
Loading