Skip to content

Commit

Permalink
add ut for error condition
Browse files Browse the repository at this point in the history
  • Loading branch information
zaynt4606 committed Jan 16, 2025
1 parent 05ec339 commit 9eddcad
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -760,25 +760,23 @@ private[deploy] class Controller(
val (commitStartWaitTime, context) = epochWaitTimeEntry.getValue
try {
val commitInfo = shuffleCommitInfos.get(shuffleKey).get(epoch)
if (commitInfo != null) {
commitInfo.synchronized {
if (commitInfo.status == CommitInfo.COMMIT_FINISHED) {
context.reply(commitInfo.response)
commitInfo.synchronized {
if (commitInfo.status == CommitInfo.COMMIT_FINISHED) {
context.reply(commitInfo.response)
epochIterator.remove()
} else {
if (currentTime - commitStartWaitTime >= shuffleCommitTimeout) {
val replyResponse = CommitFilesResponse(
StatusCode.COMMIT_FILE_EXCEPTION,
List.empty.asJava,
List.empty.asJava,
commitInfo.response.failedPrimaryIds,
commitInfo.response.failedReplicaIds)
shuffleCommitInfos.get(shuffleKey).put(
epoch,
new CommitInfo(replyResponse, CommitInfo.COMMIT_FINISHED))
context.reply(replyResponse)
epochIterator.remove()
} else {
if (currentTime - commitStartWaitTime >= shuffleCommitTimeout) {
val replyResponse = CommitFilesResponse(
StatusCode.COMMIT_FILE_EXCEPTION,
List.empty.asJava,
List.empty.asJava,
commitInfo.response.failedPrimaryIds,
commitInfo.response.failedReplicaIds)
shuffleCommitInfos.get(shuffleKey).put(
epoch,
new CommitInfo(replyResponse, CommitInfo.COMMIT_FINISHED))
context.reply(replyResponse)
epochIterator.remove()
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach {
val epoch0: Long = 0
val epoch1: Long = 1
val epoch2: Long = 2
val epoch3: Long = 3
val startWaitTime = System.currentTimeMillis()

// update an INPROCESS commitInfo
Expand All @@ -207,9 +208,12 @@ class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach {
shuffleKey,
JavaUtils.newConcurrentHashMap[Long, (Long, RpcCallContext)]())
val epochWaitTimeMap = shuffleCommitTime.get(shuffleKey)
epochWaitTimeMap.putIfAbsent(epoch0, (startWaitTime, context))
epochWaitTimeMap.put(epoch0, (startWaitTime, context))
}

assert(shuffleCommitTime.get(shuffleKey).get(epoch0)._1 == startWaitTime)
assert(epochCommitMap.get(epoch0).status == CommitInfo.COMMIT_INPROCESS)

// update an INPROCESS commitInfo
val response1 = CommitFilesResponse(
null,
Expand All @@ -225,17 +229,20 @@ class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach {
shuffleKey,
JavaUtils.newConcurrentHashMap[Long, (Long, RpcCallContext)]())
val epochWaitTimeMap = shuffleCommitTime.get(shuffleKey)
epochWaitTimeMap.putIfAbsent(epoch1, (startWaitTime, context))
epochWaitTimeMap.put(epoch1, (startWaitTime, context))
}

assert(shuffleCommitTime.get(shuffleKey).get(epoch1)._1 == startWaitTime)
assert(epochCommitMap.get(epoch1).status == CommitInfo.COMMIT_INPROCESS)

// update a FINISHED commitInfo
val response2 = CommitFilesResponse(
StatusCode.SUCCESS,
primaryIds.asJava,
replicaIds.asJava,
List.empty.asJava,
List.empty.asJava)
epochCommitMap.putIfAbsent(epoch2, new CommitInfo(response2, CommitInfo.COMMIT_FINISHED))
epochCommitMap.put(epoch2, new CommitInfo(response2, CommitInfo.COMMIT_FINISHED))

val commitInfo2 = epochCommitMap.get(epoch2)
commitInfo2.synchronized {
Expand All @@ -244,16 +251,26 @@ class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach {
JavaUtils.newConcurrentHashMap[Long, (Long, RpcCallContext)]())
val epochWaitTimeMap = shuffleCommitTime.get(shuffleKey)
// epoch2 has already timed out
epochWaitTimeMap.putIfAbsent(epoch2, (startWaitTime, context))
epochWaitTimeMap.put(epoch2, (startWaitTime, context))
}

assert(shuffleCommitTime.get(shuffleKey).get(epoch0)._1 == startWaitTime)
assert(epochCommitMap.get(epoch0).status == CommitInfo.COMMIT_INPROCESS)
assert(shuffleCommitTime.get(shuffleKey).get(epoch1)._1 == startWaitTime)
assert(epochCommitMap.get(epoch1).status == CommitInfo.COMMIT_INPROCESS)
assert(shuffleCommitTime.get(shuffleKey).get(epoch2)._1 == startWaitTime)
assert(epochCommitMap.get(epoch2).status == CommitInfo.COMMIT_FINISHED)

// add a new shuffleKey2 to shuffleCommitTime but not to shuffleCommitInfos
val shuffleKey2 = "2"
shuffleCommitTime.putIfAbsent(
shuffleKey2,
JavaUtils.newConcurrentHashMap[Long, (Long, RpcCallContext)]())
shuffleCommitTime.get(shuffleKey2).put(epoch0, (startWaitTime, context))
assert(shuffleCommitTime.containsKey(shuffleKey2))
assert(!shuffleCommitInfos.containsKey(shuffleKey2))

// add an epoch to shuffleCommitTime but not to shuffleCommitInfos
shuffleCommitTime.get(shuffleKey).put(epoch3, (startWaitTime, context))
assert(shuffleCommitTime.get(shuffleKey).get(epoch3)._1 == startWaitTime)
assert(!shuffleCommitInfos.get(shuffleKey).containsKey(epoch3))

// update status of epoch1 to FINISHED
epochCommitMap.get(epoch1).status = CommitInfo.COMMIT_FINISHED
assert(epochCommitMap.get(epoch1).status == CommitInfo.COMMIT_FINISHED)
Expand All @@ -262,6 +279,13 @@ class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach {
controller.checkCommitTimeout(shuffleCommitTime)
assert(epochCommitMap.get(epoch0).status == CommitInfo.COMMIT_INPROCESS)

// a shuffleKey entry is removed from shuffleCommitTime when shuffleCommitInfos does not contain that shuffleKey
assert(!shuffleCommitTime.containsKey(shuffleKey2))
assert(!shuffleCommitInfos.containsKey(shuffleKey2))

// an epoch entry is removed from shuffleCommitTime when shuffleCommitInfos has no entry for that epoch
assert(!shuffleCommitTime.get(shuffleKey).containsKey(epoch3))

// epoch1, whose status is FINISHED, is removed from shuffleCommitTime
assert(shuffleCommitTime.get(shuffleKey).get(epoch1) == null)

Expand Down

0 comments on commit 9eddcad

Please sign in to comment.