Skip to content

Commit

Permalink
[CELEBORN-1601] Support revise lost shuffles
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Support revising lost shuffle IDs in long-running jobs, such as Flink batch jobs.

### Why are the changes needed?
1. To support revising lost shuffles.
2. To add an HTTP endpoint to revise lost shuffles manually.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
Cluster tests.

Closes apache#2746 from FMX/b1600.

Lead-authored-by: mingji <[email protected]>
Co-authored-by: Ethan Feng <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
  • Loading branch information
FMX authored and SteNicholas committed Oct 21, 2024
1 parent bcb4318 commit df01fad
Show file tree
Hide file tree
Showing 32 changed files with 647 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,11 @@ class CommonOptions {
paramLabel = "username",
description = Array("The username of the TENANT_USER level."))
private[cli] var configName: String = _

// Application id list passed via `--apps`; stays null when the option is not given.
// Commands that use it receive the raw string and decide how many ids they accept.
@Option(
  names = Array("--apps"),
  paramLabel = "appId",
  description = Array("The application Id list separated by comma."))
private[cli] var apps: String = _

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,14 @@ final class MasterOptions {
names = Array("--remove-workers-unavailable-info"),
description = Array("Remove the workers unavailable info from the master."))
private[master] var removeWorkersUnavailableInfo: Boolean = _

// Boolean flag selecting the "revise lost shuffles" master command.
// MasterSubcommandImpl checks it and then runs `reviseLostShuffles`.
@Option(
names = Array("--revise-lost-shuffles"),
description = Array("Revise lost shuffles or remove shuffles for an application."))
private[master] var reviseLostShuffles: Boolean = _

// Boolean flag selecting the "delete apps" master command.
// MasterSubcommandImpl checks it and then runs `deleteApps`.
@Option(
names = Array("--delete-apps"),
description = Array("Delete resource of an application."))
private[master] var deleteApps: Boolean = _
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ trait MasterSubcommand extends CliLogging {
@ArgGroup(exclusive = true, multiplicity = "1")
private[master] var masterOptions: MasterOptions = _

// Extra options consumed by the revise-lost-shuffles command (currently `--shuffleIds`).
// NOTE(review): picocli presumably leaves this field null when none of the group's
// options appear on the command line - confirm before dereferencing.
@ArgGroup(exclusive = false)
private[master] var reviseLostShuffleOptions: ReviseLostShuffleOptions = _

@Mixin
private[master] var commonOptions: CommonOptions = _

Expand Down Expand Up @@ -110,4 +113,8 @@ trait MasterSubcommand extends CliLogging {

private[master] def runShowThreadDump: ThreadStackResponse

/** Runs the `--revise-lost-shuffles` command and returns the response to log. */
private[master] def reviseLostShuffles: HandleResponse

/** Runs the `--delete-apps` command and returns the response to log. */
private[master] def deleteApps: HandleResponse

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class MasterSubcommandImpl extends Runnable with MasterSubcommand {
if (masterOptions.showContainerInfo) log(runShowContainerInfo)
if (masterOptions.showDynamicConf) log(runShowDynamicConf)
if (masterOptions.showThreadDump) log(runShowThreadDump)
if (masterOptions.reviseLostShuffles) log(reviseLostShuffles)
if (masterOptions.deleteApps) log(deleteApps)
if (masterOptions.addClusterAlias != null && masterOptions.addClusterAlias.nonEmpty)
runAddClusterAlias
if (masterOptions.removeClusterAlias != null && masterOptions.removeClusterAlias.nonEmpty)
Expand Down Expand Up @@ -220,4 +222,20 @@ class MasterSubcommandImpl extends Runnable with MasterSubcommand {
}

private[master] def runShowContainerInfo: ContainerInfo = defaultApi.getContainerInfo

/**
 * Asks the master to revise the lost shuffles of a single application.
 *
 * The application id is read from `--apps` and the shuffle ids from `--shuffleIds`;
 * both values are forwarded unchanged to `applicationApi.reviseLostShuffles`.
 *
 * @return the response returned by the application API
 * @throws ParameterException if `--apps` or `--shuffleIds` is missing, or if `--apps`
 *                            contains more than one application id
 */
override private[master] def reviseLostShuffles: HandleResponse = {
  val app = commonOptions.apps
  // `apps` keeps its initial null value when --apps is not given; fail with a usage
  // error instead of a NullPointerException on the `contains` check below.
  if (app == null) {
    throw new ParameterException(
      spec.commandLine(),
      "An application id must be provided via --apps for this command.")
  }
  if (app.contains(",")) {
    throw new ParameterException(
      spec.commandLine(),
      "Only one application id can be provided for this command.")
  }
  // The argument group is only populated when one of its options was supplied.
  if (reviseLostShuffleOptions == null) {
    throw new ParameterException(
      spec.commandLine(),
      "Shuffle ids must be provided via --shuffleIds for this command.")
  }
  val shuffleIds = reviseLostShuffleOptions.shuffleIds
  applicationApi.reviseLostShuffles(app, shuffleIds)
}

/**
 * Asks the master to delete the resources of the applications given via `--apps`.
 *
 * The raw `--apps` value is forwarded unchanged to `applicationApi.deleteApps`.
 *
 * @return the response returned by the application API
 * @throws ParameterException if `--apps` was not supplied
 */
override private[master] def deleteApps: HandleResponse = {
  val apps = commonOptions.apps
  // `apps` keeps its initial null value when --apps is not given; report a usage
  // error here rather than passing null on to the API client.
  if (apps == null) {
    throw new ParameterException(
      spec.commandLine(),
      "At least one application id must be provided via --apps for this command.")
  }
  applicationApi.deleteApps(apps)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.cli.master

import picocli.CommandLine.Option

/**
 * Command line options that accompany the master `--revise-lost-shuffles` command.
 *
 * Used as a picocli argument group by `MasterSubcommand`.
 */
final class ReviseLostShuffleOptions {

// Raw value of `--shuffleIds`, passed on as-is by the command implementation.
// NOTE(review): presumably a comma separated list of integers (the CLI test passes
// "1,2,3,4,5,6") - confirm against the master endpoint.
@Option(
names = Array("--shuffleIds"),
description = Array("The shuffle ids to manipulate."))
private[master] var shuffleIds: String = _

}
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,32 @@ class TestCelebornCliCommands extends CelebornFunSuite with MiniClusterFeature {
captureOutputAndValidateResponse(args, "success: true")
}

// Deletes a single application through the master CLI and validates the captured
// output against "success: true".
test("master --delete-apps case1") {
  val deleteArgs = Array("--delete-apps", "--apps", "app1")
  captureOutputAndValidateResponse(prepareMasterArgs() ++ deleteArgs, "success: true")
}

// Deletes two applications given as one comma separated `--apps` value and validates
// the captured output against "success: true".
test("master --delete-apps case2") {
  val deleteArgs = Array("--delete-apps", "--apps", "app1,app2")
  captureOutputAndValidateResponse(prepareMasterArgs() ++ deleteArgs, "success: true")
}

// Revises shuffles 1-6 of one application through the master CLI and validates the
// captured output against "success: true".
test("master --revise-lost-shuffles case1") {
  val reviseArgs = Array(
    "--revise-lost-shuffles",
    "--apps",
    "app1",
    "--shuffleIds",
    "1,2,3,4,5,6")
  captureOutputAndValidateResponse(prepareMasterArgs() ++ reviseArgs, "success: true")
}

private def prepareMasterArgs(): Array[String] = {
Array(
"master",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@

package org.apache.celeborn.client

import java.util.concurrent.{ScheduledFuture, TimeUnit}
import java.util
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import java.util.function.Consumer

import scala.collection.JavaConverters._

import org.apache.commons.lang3.StringUtils

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost, ApplicationLostResponse, HeartbeatFromApplication, HeartbeatFromApplicationResponse, ZERO_UUID}
import org.apache.celeborn.common.protocol.PbReviseLostShufflesResponse
import org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost, ApplicationLostResponse, HeartbeatFromApplication, HeartbeatFromApplicationResponse, ReviseLostShuffles, ZERO_UUID}
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.util.{ThreadUtils, Utils}

Expand All @@ -33,9 +38,11 @@ class ApplicationHeartbeater(
conf: CelebornConf,
masterClient: MasterClient,
shuffleMetrics: () => (Long, Long),
workerStatusTracker: WorkerStatusTracker) extends Logging {
workerStatusTracker: WorkerStatusTracker,
registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean]) extends Logging {

private var stopped = false
private val reviseLostShuffles = conf.reviseLostShufflesEnabled

// Use independent app heartbeat threads to avoid being blocked by other operations.
private val appHeartbeatIntervalMs = conf.appHeartbeatIntervalMs
Expand Down Expand Up @@ -68,6 +75,30 @@ class ApplicationHeartbeater(
if (response.statusCode == StatusCode.SUCCESS) {
logDebug("Successfully send app heartbeat.")
workerStatusTracker.handleHeartbeatResponse(response)
// revise shuffle id if there are lost shuffles
if (reviseLostShuffles) {
val masterRecordedShuffleIds = response.registeredShuffles
val localOnlyShuffles = new util.ArrayList[Integer]()
registeredShuffles.forEach(new Consumer[Int] {
override def accept(key: Int): Unit = {
localOnlyShuffles.add(key)
}
})
localOnlyShuffles.removeAll(masterRecordedShuffleIds)
if (!localOnlyShuffles.isEmpty) {
logWarning(
s"There are lost shuffle found ${StringUtils.join(localOnlyShuffles, ",")}, revise lost shuffles.")
val reviseLostShufflesResponse = masterClient.askSync(
ReviseLostShuffles.apply(appId, localOnlyShuffles, MasterClient.genRequestId()),
classOf[PbReviseLostShufflesResponse])
if (!reviseLostShufflesResponse.getSuccess) {
logWarning(
s"Revise lost shuffles failed. Error message :${reviseLostShufflesResponse.getMessage}")
} else {
logInfo("Revise lost shuffles succeed.")
}
}
}
}
} catch {
case it: InterruptedException =>
Expand Down Expand Up @@ -97,6 +128,7 @@ class ApplicationHeartbeater(
StatusCode.REQUEST_FAILED,
List.empty.asJava,
List.empty.asJava,
List.empty.asJava,
List.empty.asJava)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
conf,
masterClient,
() => commitManager.commitMetrics(),
workerStatusTracker)
workerStatusTracker,
registeredShuffle)
private val changePartitionManager = new ChangePartitionManager(conf, this)
private val releasePartitionManager = new ReleasePartitionManager(conf, this)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
StatusCode.SUCCESS,
excludedWorkers,
unknownWorkers,
shuttingWorkers)
shuttingWorkers,
new util.ArrayList[Integer]())
}

private def mockWorkers(workerHosts: Array[String]): util.ArrayList[WorkerInfo] = {
Expand Down
14 changes: 14 additions & 0 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ enum MessageType {
NOTIFY_REQUIRED_SEGMENT = 86;
BATCH_UNREGISTER_SHUFFLES = 87;
BATCH_UNREGISTER_SHUFFLE_RESPONSE= 88;
REVISE_LOST_SHUFFLES = 89;
REVISE_LOST_SHUFFLES_RESPONSE = 90;
}

enum StreamType {
Expand Down Expand Up @@ -447,6 +449,7 @@ message PbHeartbeatFromApplicationResponse {
repeated PbWorkerInfo excludedWorkers = 2;
repeated PbWorkerInfo unknownWorkers = 3;
repeated PbWorkerInfo shuttingWorkers = 4;
repeated int32 registeredShuffles = 5;
}

message PbCheckQuota {
Expand Down Expand Up @@ -856,3 +859,14 @@ message PbReportWorkerDecommission {
repeated PbWorkerInfo workers = 1;
string requestId = 2;
}

// Request carrying the shuffle ids of one application that should be revised on the master.
// Built by ControlMessages.ReviseLostShuffles and sent as MessageType REVISE_LOST_SHUFFLES.
message PbReviseLostShuffles{
// Id of the application that owns the shuffles.
string appId = 1;
// Shuffle ids to revise; the client fills in ids it has registered locally but that
// are missing from the master's heartbeat response.
repeated int32 lostShuffles = 2;
// Request id generated by the sender.
string requestId = 3;
}

// Reply to PbReviseLostShuffles, sent as MessageType REVISE_LOST_SHUFFLES_RESPONSE.
message PbReviseLostShufflesResponse{
// Whether the revise request succeeded.
bool success = 1;
// Accompanying message; the client logs it when `success` is false.
string message = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se

def registerShuffleFilterExcludedWorkerEnabled: Boolean =
get(REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED)
// Value of `celeborn.client.shuffle.reviseLostShuffles.enabled` (REVISE_LOST_SHUFFLES_ENABLED).
def reviseLostShufflesEnabled: Boolean = get(REVISE_LOST_SHUFFLES_ENABLED)

// //////////////////////////////////////////////////////
// Worker //
Expand Down Expand Up @@ -5636,6 +5637,14 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)

// Client-side switch, off by default, exposed through `reviseLostShufflesEnabled`.
// When enabled, the application heartbeater asks the master to revise shuffles that are
// registered locally but absent from the master's heartbeat response.
val REVISE_LOST_SHUFFLES_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.shuffle.reviseLostShuffles.enabled")
.categories("client")
.version("0.6.0")
.doc("Whether to revise lost shuffles.")
.booleanConf
.createWithDefault(false)

val NETWORK_IO_SASL_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.<module>.io.saslTimeout")
.categories("network")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,26 @@ object ControlMessages extends Logging {
.build()
}

/** Factory for the protobuf request that asks the master to revise lost shuffles. */
object ReviseLostShuffles {

  /**
   * Builds a [[PbReviseLostShuffles]] message.
   *
   * @param appId        id of the application owning the shuffles
   * @param lostShuffles shuffle ids to revise
   * @param requestId    id identifying this request
   * @return the populated protobuf message
   */
  def apply(
      appId: String,
      lostShuffles: java.util.List[Integer],
      requestId: String): PbReviseLostShuffles = {
    val request = PbReviseLostShuffles.newBuilder()
    request.setAppId(appId)
    request.addAllLostShuffles(lostShuffles)
    request.setRequestId(requestId)
    request.build()
  }
}

/** Factory for the protobuf reply to a revise-lost-shuffles request. */
object ReviseLostShufflesResponse {

  /**
   * Builds a [[PbReviseLostShufflesResponse]] message.
   *
   * @param success whether the revise request succeeded
   * @param message text to return alongside the result
   * @return the populated protobuf message
   */
  def apply(success: Boolean, message: String): PbReviseLostShufflesResponse = {
    val reply = PbReviseLostShufflesResponse.newBuilder()
    reply.setSuccess(success)
    reply.setMessage(message)
    reply.build()
  }
}

case class StageEnd(shuffleId: Int) extends MasterMessage

case class StageEndResponse(status: StatusCode)
Expand Down Expand Up @@ -400,7 +420,8 @@ object ControlMessages extends Logging {
statusCode: StatusCode,
excludedWorkers: util.List[WorkerInfo],
unknownWorkers: util.List[WorkerInfo],
shuttingWorkers: util.List[WorkerInfo]) extends Message
shuttingWorkers: util.List[WorkerInfo],
registeredShuffles: util.List[Integer]) extends Message

case class CheckQuota(userIdentifier: UserIdentifier) extends Message

Expand Down Expand Up @@ -565,6 +586,12 @@ object ControlMessages extends Logging {
case pb: PbReportShuffleFetchFailureResponse =>
new TransportMessage(MessageType.REPORT_SHUFFLE_FETCH_FAILURE_RESPONSE, pb.toByteArray)

case pb: PbReviseLostShuffles =>
new TransportMessage(MessageType.REVISE_LOST_SHUFFLES, pb.toByteArray)

case pb: PbReviseLostShufflesResponse =>
new TransportMessage(MessageType.REVISE_LOST_SHUFFLES_RESPONSE, pb.toByteArray)

case pb: PbReportBarrierStageAttemptFailure =>
new TransportMessage(MessageType.REPORT_BARRIER_STAGE_ATTEMPT_FAILURE, pb.toByteArray)

Expand Down Expand Up @@ -799,7 +826,8 @@ object ControlMessages extends Logging {
statusCode,
excludedWorkers,
unknownWorkers,
shuttingWorkers) =>
shuttingWorkers,
registeredShuffles) =>
val payload = PbHeartbeatFromApplicationResponse.newBuilder()
.setStatus(statusCode.getValue)
.addAllExcludedWorkers(
Expand All @@ -808,6 +836,7 @@ object ControlMessages extends Logging {
unknownWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
.addAllShuttingWorkers(
shuttingWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
.addAllRegisteredShuffles(registeredShuffles)
.build().toByteArray
new TransportMessage(MessageType.HEARTBEAT_FROM_APPLICATION_RESPONSE, payload)

Expand Down Expand Up @@ -1191,7 +1220,8 @@ object ControlMessages extends Logging {
pbHeartbeatFromApplicationResponse.getUnknownWorkersList.asScala
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
pbHeartbeatFromApplicationResponse.getShuttingWorkersList.asScala
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
pbHeartbeatFromApplicationResponse.getRegisteredShufflesList)

case CHECK_QUOTA_VALUE =>
val pbCheckAvailable = PbCheckQuota.parseFrom(message.getPayload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ object PbSerDeUtils {

def toPbSnapshotMetaInfo(
estimatedPartitionSize: java.lang.Long,
registeredShuffle: java.util.Set[String],
registeredShuffle: java.util.Map[String, java.util.Set[Integer]],
hostnameSet: java.util.Set[String],
excludedWorkers: java.util.Set[WorkerInfo],
manuallyExcludedWorkers: java.util.Set[WorkerInfo],
Expand All @@ -468,7 +468,9 @@ object PbSerDeUtils {
decommissionWorkers: java.util.Set[WorkerInfo]): PbSnapshotMetaInfo = {
val builder = PbSnapshotMetaInfo.newBuilder()
.setEstimatedPartitionSize(estimatedPartitionSize)
.addAllRegisteredShuffle(registeredShuffle)
.addAllRegisteredShuffle(registeredShuffle.asScala.flatMap { appIdAndShuffleId =>
appIdAndShuffleId._2.asScala.map(i => Utils.makeShuffleKey(appIdAndShuffleId._1, i))
}.asJava)
.addAllHostnameSet(hostnameSet)
.addAllExcludedWorkers(excludedWorkers.asScala.map(toPbWorkerInfo(_, true, false)).asJava)
.addAllManuallyExcludedWorkers(manuallyExcludedWorkers.asScala
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ license: |
| celeborn.client.shuffle.partitionSplit.threshold | 1G | false | Shuffle file size threshold, if file size exceeds this, trigger split. | 0.3.0 | celeborn.shuffle.partitionSplit.threshold |
| celeborn.client.shuffle.rangeReadFilter.enabled | false | false | If a spark application have skewed partition, this value can set to true to improve performance. | 0.2.0 | celeborn.shuffle.rangeReadFilter.enabled |
| celeborn.client.shuffle.register.filterExcludedWorker.enabled | false | false | Whether to filter excluded worker when register shuffle. | 0.4.0 | |
| celeborn.client.shuffle.reviseLostShuffles.enabled | false | false | Whether to revise lost shuffles. | 0.6.0 | |
| celeborn.client.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. | 0.3.1 | |
| celeborn.client.spark.fetch.throwsFetchFailure | false | false | client throws FetchFailedException instead of CelebornIOException | 0.4.0 | |
| celeborn.client.spark.push.dynamicWriteMode.enabled | false | false | Whether to dynamically switch push write mode based on conditions.If true, shuffle mode will be only determined by partition count | 0.5.0 | |
Expand Down
Loading

0 comments on commit df01fad

Please sign in to comment.