Skip to content

Commit

Permalink
[CELEBORN-1081][FOLLOWUP] Remove UNKNOWN_DISK and allocate all slots …
Browse files Browse the repository at this point in the history
…to disk

Signed-off-by: mingji <[email protected]>
  • Loading branch information
FMX committed Nov 15, 2023
1 parent efa22a4 commit d400c46
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public int getValue() {
}
}

@Deprecated public static String UNKNOWN_DISK = "UNKNOWN_DISK";
public static Map<Integer, Type> typesMap = new HashMap<>();
public static Set<String> typeNames = new HashSet<>();

Expand All @@ -57,7 +56,7 @@ public int getValue() {

// Default storage Type is MEMORY.
private Type type = Type.MEMORY;
private String mountPoint = UNKNOWN_DISK;
private String mountPoint = "";
// if a file is committed, field "finalResult" will be true
private boolean finalResult = false;
private String filePath;
Expand All @@ -77,6 +76,11 @@ public StorageInfo(String mountPoint, int availableStorageTypes) {
this.availableStorageTypes = availableStorageTypes;
}

public StorageInfo(Type type, int availableStorageTypes) {
  // Builds a StorageInfo with an explicit storage type and the bitmask of
  // storage types this partition may use; mountPoint keeps its default value.
  this.availableStorageTypes = availableStorageTypes;
  this.type = type;
}

public StorageInfo(Type type, String mountPoint) {
this.type = type;
this.mountPoint = mountPoint;
Expand Down Expand Up @@ -147,21 +151,41 @@ public String toString() {
+ '}';
}

public boolean localDiskAvailable() {
public static boolean localDiskAvailable(int availableStorageTypes) {
  // Local disk is usable when every storage type is enabled, or when the
  // LOCAL_DISK bit is set in the availability bitmask.
  boolean allTypesEnabled = availableStorageTypes == ALL_TYPES_AVAILABLE_MASK;
  boolean localDiskBitSet = (availableStorageTypes & LOCAL_DISK_MASK) > 0;
  return allTypesEnabled || localDiskBitSet;
}

public boolean HDFSAvailable() {
public boolean localDiskAvailable() {
  // Instance-level convenience wrapper around the static bitmask check.
  return localDiskAvailable(this.availableStorageTypes);
}

public static boolean HDFSAvailable(int availableStorageTypes) {
  // HDFS is usable when all storage types are enabled or the HDFS bit is set.
  boolean everythingEnabled = availableStorageTypes == ALL_TYPES_AVAILABLE_MASK;
  return everythingEnabled || (availableStorageTypes & HDFS_MASK) > 0;
}

public boolean OSSAvailable() {
public boolean HDFSAvailable() {
  // Delegates to the static form using this instance's availability bitmask.
  return HDFSAvailable(this.availableStorageTypes);
}

public static boolean HDFSOnly(int availableStorageTypes) {
  // HDFS-only means the bitmask contains exactly the HDFS bit and nothing else.
  return HDFS_MASK == availableStorageTypes;
}

public boolean HDFSOnly() {
  // Instance wrapper over the static HDFS-only check.
  return HDFSOnly(this.availableStorageTypes);
}

public static boolean OSSAvailable(int availableStorageTypes) {
  // OSS is usable when all storage types are enabled or the OSS bit is set.
  if (availableStorageTypes == ALL_TYPES_AVAILABLE_MASK) {
    return true;
  }
  return (availableStorageTypes & OSS_MASK) > 0;
}

public boolean OSSAvailable() {
  // Instance wrapper over the static OSS availability check.
  return OSSAvailable(this.availableStorageTypes);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,15 @@ public void testToStringOutput() {
+ " host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n"
+ " mode:PRIMARY\n"
+ " peer:(empty)\n"
+ " storage hint:StorageInfo{type=MEMORY, mountPoint='UNKNOWN_DISK', finalResult=false, filePath=null}\n"
+ " storage hint:StorageInfo{type=MEMORY, mountPoint='', finalResult=false, filePath=null}\n"
+ " mapIdBitMap:{}]";
String exp2 =
"PartitionLocation[\n"
+ " id-epoch:0-0\n"
+ " host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n"
+ " mode:PRIMARY\n"
+ " peer:(host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4)\n"
+ " storage hint:StorageInfo{type=MEMORY, mountPoint='UNKNOWN_DISK', finalResult=false, filePath=null}\n"
+ " storage hint:StorageInfo{type=MEMORY, mountPoint='', finalResult=false, filePath=null}\n"
+ " mapIdBitMap:{}]";
String exp3 =
"PartitionLocation[\n"
Expand Down
75 changes: 75 additions & 0 deletions docs/developers/slotsallocation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
---
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

# Slots allocation

This article describes the detailed design of Celeborn workers' slots allocation.
Slots allocation is the core component that determines how Celeborn distributes workload among workers.
We have implemented two approaches to slots allocation.

## Principle
Allocate slots to local disks unless explicitly assigned to HDFS.

## LoadAware
### Related configs
```properties
celeborn.master.slot.assign.policy LOADAWARE
celeborn.master.slot.assign.loadAware.numDiskGroups 5
celeborn.master.slot.assign.loadAware.diskGroupGradient 0.1
celeborn.master.slot.assign.loadAware.flushTimeWeight 0
celeborn.master.slot.assign.loadAware.fetchTimeWeight 0
[spark.client.]celeborn.storage.availableTypes HDD,SSD
```
### Detail
Load-aware slots allocation will take following elements into consideration.
- disk's fetch time
- disk's flush time
- disk's usable space
- disk's used slot

The slots allocator will find all workers involved in this allocation and sort their disks by
`disk's average flush time * flush time weight + disk's average fetch time * fetch time weight`.
After getting the sorted disks list, Celeborn will split the disks into
`celeborn.master.slot.assign.loadAware.numDiskGroups` groups. The slots number to be placed into a disk group
is controlled by `celeborn.master.slot.assign.loadAware.diskGroupGradient`, which means that a group's
allocated slots number will be (1 + `celeborn.master.slot.assign.loadAware.diskGroupGradient`)
times that of the group one level slower than it.
For example, suppose there are 5 groups: G1, G2, G3, G4 and G5. If G5 is allocated 100 slots,
Other groups will be G4:110, G3:121, G2:133, G1:146.

After Celeborn has decided the slots number of a disk group, slots will be distributed in disks of a disk group.
Each disk has a usableSlots which is calculated by `(disk's usable space)/(average partition size)-usedSlots`.
The slots number to allocate in a disk is calculated by ` slots of this disk group * ( current disk's usableSlots / the sum of all disks' usableSlots in this group)`.
For example, G5 need to allocate 100 slots and have 3 disks D1 with usable slots 100, D2 with usable slots 50, D3 with usable slots 20.
The distribution will be D1:59, D2: 29, D3: 12.

If all slots can be placed in disk groups, the slots allocation process is done.

If the requested slots exceed all usable slots, the excess slots cannot be placed into disks.
Celeborn will then need to allocate these slots to workers with local disks one by one.

## RoundRobin
### Detail
Round-robin slots allocation will distribute all slots across all registered workers with disks. Celeborn will treat
all workers as an array and place 1 slot in a worker at a time until all slots are allocated.
If a worker has multiple disks, the chosen disk index is `(monotonically increasing disk index + 1) % disk count`.

## Celeborn Worker's Behavior
1. When reserving slots, a Celeborn worker will decide whether a slot is placed on local disks or HDFS.
2. If a partition is evicted from memory, the partition might be placed in HDFS.
3. If a slot is explicitly assigned to HDFS, worker will put the slot in HDFS.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ static class UsableDiskInfo {
private static final Random rand = new Random();
private static boolean initialized = false;
private static double[] taskAllocationRatio = null;
private static final DiskInfo HDFSDiskInfo =
new DiskInfo("HDFS", Integer.MAX_VALUE, 0, 0, Integer.MAX_VALUE);

public static Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
offerSlotsRoundRobin(
Expand All @@ -67,9 +69,7 @@ static class UsableDiskInfo {
restrictions.computeIfAbsent(worker, v -> new ArrayList<>());
for (Map.Entry<String, DiskInfo> diskInfoEntry : worker.diskInfos().entrySet()) {
if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
usableDisks.add(
new UsableDiskInfo(
diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
usableDisks.add(new UsableDiskInfo(diskInfoEntry.getValue(), Integer.MAX_VALUE));
}
}
}
Expand Down Expand Up @@ -105,6 +105,10 @@ static class UsableDiskInfo {
if (workers.size() < 2 && shouldReplicate) {
return new HashMap<>();
}
if (StorageInfo.HDFSOnly(availableStorageTypes)) {
return offerSlotsRoundRobin(
workers, partitionIds, shouldReplicate, shouldRackAware, availableStorageTypes);
}

List<DiskInfo> usableDisks = new ArrayList<>();
Map<DiskInfo, WorkerInfo> diskToWorkerMap = new HashMap<>();
Expand Down Expand Up @@ -160,16 +164,30 @@ private static StorageInfo getStorageInfo(
Map<WorkerInfo, Integer> workerDiskIndex,
int availableStorageTypes) {
WorkerInfo selectedWorker = workers.get(workerIndex);
List<UsableDiskInfo> usableDiskInfos = restrictions.get(selectedWorker);
StorageInfo storageInfo;
int diskIndex = workerDiskIndex.computeIfAbsent(selectedWorker, v -> 0);
while (usableDiskInfos.get(diskIndex).usableSlots <= 0) {
diskIndex = (diskIndex + 1) % usableDiskInfos.size();
if (restrictions != null) {
List<UsableDiskInfo> usableDiskInfos = restrictions.get(selectedWorker);
while (usableDiskInfos.get(diskIndex).usableSlots <= 0) {
diskIndex = (diskIndex + 1) % usableDiskInfos.size();
}
usableDiskInfos.get(diskIndex).usableSlots--;
DiskInfo selectedDiskInfo = usableDiskInfos.get(diskIndex).diskInfo;
if (selectedDiskInfo.equals(HDFSDiskInfo)) {
storageInfo = new StorageInfo(StorageInfo.Type.HDFS, availableStorageTypes);
} else {
storageInfo = new StorageInfo(selectedDiskInfo.mountPoint(), availableStorageTypes);
}
workerDiskIndex.put(selectedWorker, (diskIndex + 1) % usableDiskInfos.size());
} else {
if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
DiskInfo[] diskInfos = selectedWorker.diskInfos().values().toArray(new DiskInfo[0]);
storageInfo = new StorageInfo(diskInfos[diskIndex].mountPoint(), availableStorageTypes);
diskIndex = (diskIndex + 1) % diskInfos.length;
} else {
storageInfo = new StorageInfo(StorageInfo.Type.HDFS, availableStorageTypes);
}
}
usableDiskInfos.get(diskIndex).usableSlots--;
StorageInfo storageInfo =
new StorageInfo(
usableDiskInfos.get(diskIndex).diskInfo.mountPoint(), availableStorageTypes);
workerDiskIndex.put(selectedWorker, (diskIndex + 1) % usableDiskInfos.size());
return storageInfo;
}

Expand All @@ -186,7 +204,7 @@ private static StorageInfo getStorageInfo(
Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
boolean shouldReplicate,
boolean shouldRackAware,
int activeStorageTypes) {
int availableStorageTypes) {
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots =
new HashMap<>();

Expand All @@ -198,14 +216,20 @@ private static StorageInfo getStorageInfo(
restrictions,
shouldReplicate,
shouldRackAware,
activeStorageTypes);
availableStorageTypes);
if (!remain.isEmpty()) {
remain =
roundRobin(
slots, remain, workers, null, shouldReplicate, shouldRackAware, activeStorageTypes);
slots,
remain,
workers,
null,
shouldReplicate,
shouldRackAware,
availableStorageTypes);
}
if (!remain.isEmpty()) {
roundRobin(slots, remain, workers, null, shouldReplicate, false, activeStorageTypes);
roundRobin(slots, remain, workers, null, shouldReplicate, false, availableStorageTypes);
}
return slots;
}
Expand All @@ -218,7 +242,8 @@ private static List<Integer> roundRobin(
boolean shouldReplicate,
boolean shouldRackAware,
int availableStorageTypes) {
// workerInfo -> (diskIndexForPrimary, diskIndexForReplica)
// workerInfo -> (diskIndexForPrimary, diskIndexForRe
// plica)
Map<WorkerInfo, Integer> workerDiskIndexForPrimary = new HashMap<>();
Map<WorkerInfo, Integer> workerDiskIndexForReplica = new HashMap<>();
List<Integer> partitionIdList = new ArrayList<>(partitionIds);
Expand All @@ -231,6 +256,7 @@ private static List<Integer> roundRobin(
int partitionId = iter.next();
StorageInfo storageInfo = new StorageInfo();
if (restrictions != null) {
// this means that we'll select a mount point
while (!haveUsableSlots(restrictions, workers, nextPrimaryInd)) {
nextPrimaryInd = (nextPrimaryInd + 1) % workers.size();
if (nextPrimaryInd == primaryIndex) {
Expand All @@ -244,6 +270,20 @@ private static List<Integer> roundRobin(
restrictions,
workerDiskIndexForPrimary,
availableStorageTypes);
} else {
if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
while (!haveDisk(workers, nextPrimaryInd)) {
nextPrimaryInd = (nextPrimaryInd + 1) % workers.size();
if (nextPrimaryInd == primaryIndex) {
break outer;
}
}
} else {
nextPrimaryInd = (nextPrimaryInd + 1) % workers.size();
}
storageInfo =
getStorageInfo(
workers, nextPrimaryInd, null, workerDiskIndexForPrimary, availableStorageTypes);
}
PartitionLocation primaryPartition =
createLocation(partitionId, workers.get(nextPrimaryInd), null, storageInfo, true);
Expand Down Expand Up @@ -272,6 +312,20 @@ private static List<Integer> roundRobin(
break outer;
}
}
} else {
if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
while (!haveDisk(workers, nextPrimaryInd)) {
nextPrimaryInd = (nextPrimaryInd + 1) % workers.size();
if (nextPrimaryInd == primaryIndex) {
break outer;
}
}
} else {
nextPrimaryInd = (nextPrimaryInd + 1) % workers.size();
}
storageInfo =
getStorageInfo(
workers, nextReplicaInd, null, workerDiskIndexForReplica, availableStorageTypes);
}
PartitionLocation replicaPartition =
createLocation(
Expand Down Expand Up @@ -299,6 +353,10 @@ private static boolean haveUsableSlots(
return restrictions.get(workers.get(index)).stream().mapToLong(i -> i.usableSlots).sum() > 0;
}

private static boolean haveDisk(List<WorkerInfo> workers, int index) {
  // A worker can take disk slots only when it has registered at least one disk.
  WorkerInfo candidate = workers.get(index);
  return !candidate.diskInfos().isEmpty();
}

private static boolean satisfyRackAware(
boolean shouldRackAware, List<WorkerInfo> workers, int primaryIndex, int nextReplicaInd) {
return !shouldRackAware
Expand Down Expand Up @@ -493,8 +551,8 @@ public static Map<WorkerInfo, Map<String, Integer>> slotsToDiskAllocations(
jointLocations.addAll(slots.get(worker)._2);
for (PartitionLocation location : jointLocations) {
String mountPoint = location.getStorageInfo().getMountPoint();
// ignore slots for UNKNOWN_DISK
if (!mountPoint.equals(StorageInfo.UNKNOWN_DISK)) {
// skip non local disks slots
if (!mountPoint.isEmpty()) {
if (slotsPerDisk.containsKey(mountPoint)) {
slotsPerDisk.put(mountPoint, slotsPerDisk.get(mountPoint) + 1);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ private[celeborn] class Master(
val slots =
masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") {
statusSystem.workers.synchronized {
if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && !hasHDFSStorage) {
if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE) {
SlotsAllocator.offerSlotsLoadAware(
selectedWorkers,
requestSlots.partitionIdList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ private void checkSlotsOnHDFS(
conf.set("celeborn.active.storage.levels", "HDFS");
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots =
SlotsAllocator.offerSlotsRoundRobin(
workers, partitionIds, shouldReplicate, false, StorageInfo.ALL_TYPES_AVAILABLE_MASK);
workers, partitionIds, shouldReplicate, false, StorageInfo.HDFS_MASK);

int allocatedPartitionCount = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")
}
val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
if ((dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) || location.getStorageInfo.HDFSOnly()) {
val shuffleDir =
new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
val fileInfo =
Expand Down

0 comments on commit d400c46

Please sign in to comment.