[CELEBORN-1081][FOLLOWUP] Remove UNKNOWN_DISK and allocate all slots to disk

Signed-off-by: mingji <[email protected]>
FMX committed Nov 14, 2023
1 parent efa22a4 commit 9397bf4
Showing 2 changed files with 76 additions and 33 deletions.
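
For context, the StorageInfo changes below turn the per-type availability checks into static helpers that test bits of availableStorageTypes. A minimal, self-contained sketch of that bitmask pattern follows; the class name and mask values here are illustrative stand-ins, not Celeborn's actual constants.

public class AvailabilitySketch {
  // Illustrative masks; the real constants are defined in StorageInfo.
  static final int ALL_TYPES_AVAILABLE_MASK = 0; // assumed "no restriction" sentinel
  static final int LOCAL_DISK_MASK = 0b001;
  static final int HDFS_MASK = 0b010;
  static final int OSS_MASK = 0b100;

  // Same shape as the new static helpers: a type is usable when everything is
  // allowed or when its bit is set in availableStorageTypes.
  static boolean localDiskAvailable(int availableStorageTypes) {
    return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
        || (availableStorageTypes & LOCAL_DISK_MASK) > 0;
  }

  static boolean hdfsAvailable(int availableStorageTypes) {
    return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
        || (availableStorageTypes & HDFS_MASK) > 0;
  }

  public static void main(String[] args) {
    System.out.println(localDiskAvailable(LOCAL_DISK_MASK | OSS_MASK)); // true
    System.out.println(hdfsAvailable(LOCAL_DISK_MASK));                 // false
    System.out.println(hdfsAvailable(ALL_TYPES_AVAILABLE_MASK));        // true
  }
}

Making these checks static matters because the slots allocator changed below needs to test an availableStorageTypes value before any StorageInfo instance exists.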
@@ -38,7 +38,6 @@ public int getValue() {
}
}

@Deprecated public static String UNKNOWN_DISK = "UNKNOWN_DISK";
public static Map<Integer, Type> typesMap = new HashMap<>();
public static Set<String> typeNames = new HashSet<>();

@@ -57,7 +56,7 @@ public int getValue() {

// Default storage Type is MEMORY.
private Type type = Type.MEMORY;
private String mountPoint = UNKNOWN_DISK;
private String mountPoint;
// if a file is committed, field "finalResult" will be true
private boolean finalResult = false;
private String filePath;
@@ -77,6 +76,11 @@ public StorageInfo(String mountPoint, int availableStorageTypes) {
this.availableStorageTypes = availableStorageTypes;
}

public StorageInfo(Type type, int availableStorageTypes) {
this.type = type;
this.availableStorageTypes = availableStorageTypes;
}

public StorageInfo(Type type, String mountPoint) {
this.type = type;
this.mountPoint = mountPoint;
@@ -147,21 +151,33 @@ public String toString() {
+ '}';
}

public boolean localDiskAvailable() {
public static boolean localDiskAvailable(int availableStorageTypes) {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & LOCAL_DISK_MASK) > 0;
}

public boolean HDFSAvailable() {
public boolean localDiskAvailable() {
return StorageInfo.localDiskAvailable(availableStorageTypes);
}

public static boolean HDFSAvailable(int availableStorageTypes) {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & HDFS_MASK) > 0;
}

public boolean OSSAvailable() {
public boolean HDFSAvailable() {
return StorageInfo.HDFSAvailable(availableStorageTypes);
}

public static boolean OSSAvailable(int availableStorageTypes) {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & OSS_MASK) > 0;
}

public boolean OSSAvailable() {
return StorageInfo.OSSAvailable(availableStorageTypes);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -62,14 +62,16 @@ static class UsableDiskInfo {
return new HashMap<>();
}
Map<WorkerInfo, List<UsableDiskInfo>> restrictions = new HashMap<>();
for (WorkerInfo worker : workers) {
List<UsableDiskInfo> usableDisks =
restrictions.computeIfAbsent(worker, v -> new ArrayList<>());
for (Map.Entry<String, DiskInfo> diskInfoEntry : worker.diskInfos().entrySet()) {
if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
usableDisks.add(
new UsableDiskInfo(
diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
for (WorkerInfo worker : workers) {
List<UsableDiskInfo> usableDisks =
restrictions.computeIfAbsent(worker, v -> new ArrayList<>());
for (Map.Entry<String, DiskInfo> diskInfoEntry : worker.diskInfos().entrySet()) {
if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
usableDisks.add(
new UsableDiskInfo(
diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
}
}
}
}
@@ -160,16 +162,28 @@ private static StorageInfo getStorageInfo(
Map<WorkerInfo, Integer> workerDiskIndex,
int availableStorageTypes) {
WorkerInfo selectedWorker = workers.get(workerIndex);
List<UsableDiskInfo> usableDiskInfos = restrictions.get(selectedWorker);
StorageInfo storageInfo;
int diskIndex = workerDiskIndex.computeIfAbsent(selectedWorker, v -> 0);
while (usableDiskInfos.get(diskIndex).usableSlots <= 0) {
diskIndex = (diskIndex + 1) % usableDiskInfos.size();
if (restrictions != null) {
List<UsableDiskInfo> usableDiskInfos = restrictions.get(selectedWorker);
while (usableDiskInfos.get(diskIndex).usableSlots <= 0) {
diskIndex = (diskIndex + 1) % usableDiskInfos.size();
}
usableDiskInfos.get(diskIndex).usableSlots--;
storageInfo =
new StorageInfo(
usableDiskInfos.get(diskIndex).diskInfo.mountPoint(), availableStorageTypes);
workerDiskIndex.put(selectedWorker, (diskIndex + 1) % usableDiskInfos.size());
} else {
DiskInfo[] diskInfos = selectedWorker.diskInfos().values().toArray(new DiskInfo[0]);
if (diskInfos.length == 0) {
// this means that this worker has no disks, so fall back to HDFS
storageInfo = new StorageInfo(StorageInfo.Type.HDFS, availableStorageTypes);
} else {
storageInfo = new StorageInfo(diskInfos[diskIndex].mountPoint(), availableStorageTypes);
diskIndex = (diskIndex + 1) % diskInfos.length;
}
}
usableDiskInfos.get(diskIndex).usableSlots--;
StorageInfo storageInfo =
new StorageInfo(
usableDiskInfos.get(diskIndex).diskInfo.mountPoint(), availableStorageTypes);
workerDiskIndex.put(selectedWorker, (diskIndex + 1) % usableDiskInfos.size());
return storageInfo;
}
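
The hunk above adds a path for workers without per-disk restrictions: rotate through whatever disks the worker reports, and fall back to HDFS when it reports none. A compact sketch of just that fallback decision, with simplified types standing in for WorkerInfo and DiskInfo (names here are hypothetical):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DiskFallbackSketch {
  // Mirrors the restrictions == null branch above: rotate through the worker's
  // mount points, or return null to signal "build an HDFS StorageInfo" when
  // the worker reports no local disks.
  static String pickMountPoint(List<String> mountPoints, int diskIndex) {
    if (mountPoints.isEmpty()) {
      return null;
    }
    return mountPoints.get(diskIndex % mountPoints.size());
  }

  public static void main(String[] args) {
    List<String> disks = Arrays.asList("/mnt/disk1", "/mnt/disk2");
    System.out.println(pickMountPoint(disks, 3));                   // /mnt/disk2
    System.out.println(pickMountPoint(Collections.emptyList(), 0)); // null -> HDFS
  }
}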

@@ -190,15 +204,18 @@ private static StorageInfo getStorageInfo(
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots =
new HashMap<>();

List<Integer> remain =
roundRobin(
slots,
partitionIds,
new LinkedList<>(restrictions.keySet()),
restrictions,
shouldReplicate,
shouldRackAware,
activeStorageTypes);
List<Integer> remain = null;
if (!restrictions.isEmpty()) {
remain =
roundRobin(
slots,
partitionIds,
new LinkedList<>(restrictions.keySet()),
restrictions,
shouldReplicate,
shouldRackAware,
activeStorageTypes);
}
if (!remain.isEmpty()) {
remain =
roundRobin(
@@ -218,7 +235,8 @@ private static List<Integer> roundRobin(
boolean shouldReplicate,
boolean shouldRackAware,
int availableStorageTypes) {
// workerInfo -> (diskIndexForPrimary, diskIndexForReplica)
Map<WorkerInfo, Integer> workerDiskIndexForPrimary = new HashMap<>();
Map<WorkerInfo, Integer> workerDiskIndexForReplica = new HashMap<>();
List<Integer> partitionIdList = new ArrayList<>(partitionIds);
@@ -231,6 +249,7 @@ private static List<Integer> roundRobin(
int partitionId = iter.next();
StorageInfo storageInfo = new StorageInfo();
if (restrictions != null) {
// this means that we'll select a mount point
while (!haveUsableSlots(restrictions, workers, nextPrimaryInd)) {
nextPrimaryInd = (nextPrimaryInd + 1) % workers.size();
if (nextPrimaryInd == primaryIndex) {
@@ -244,6 +263,10 @@ private static List<Integer> roundRobin(
restrictions,
workerDiskIndexForPrimary,
availableStorageTypes);
} else {
storageInfo =
getStorageInfo(
workers, nextPrimaryInd, null, workerDiskIndexForPrimary, availableStorageTypes);
}
PartitionLocation primaryPartition =
createLocation(partitionId, workers.get(nextPrimaryInd), null, storageInfo, true);
@@ -272,6 +295,10 @@ private static List<Integer> roundRobin(
break outer;
}
}
} else {
storageInfo =
getStorageInfo(
workers, nextReplicaInd, null, workerDiskIndexForReplica, availableStorageTypes);
}
PartitionLocation replicaPartition =
createLocation(
@@ -493,8 +520,8 @@ public static Map<WorkerInfo, Map<String, Integer>> slotsToDiskAllocations(
jointLocations.addAll(slots.get(worker)._2);
for (PartitionLocation location : jointLocations) {
String mountPoint = location.getStorageInfo().getMountPoint();
// ignore slots for UNKNOWN_DISK
if (!mountPoint.equals(StorageInfo.UNKNOWN_DISK)) {
// skip slots that are not on local disks
if (mountPoint != null) {
if (slotsPerDisk.containsKey(mountPoint)) {
slotsPerDisk.put(mountPoint, slotsPerDisk.get(mountPoint) + 1);
} else {

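For reference, the last hunk's accounting now keys slots by mount point and skips locations whose mount point is null, i.e. slots that went to HDFS or OSS rather than a local disk. A standalone sketch of that counting, with plain strings standing in for PartitionLocation and hypothetical mount-point names:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlotsPerDiskSketch {
  public static void main(String[] args) {
    // Mount points of allocated slots; null represents a non-local-disk slot.
    List<String> mountPoints = Arrays.asList("/mnt/disk1", "/mnt/disk2", null, "/mnt/disk1");

    Map<String, Integer> slotsPerDisk = new HashMap<>();
    for (String mountPoint : mountPoints) {
      // Skip slots that were not allocated to a local disk.
      if (mountPoint != null) {
        slotsPerDisk.merge(mountPoint, 1, Integer::sum);
      }
    }
    System.out.println(slotsPerDisk); // e.g. {/mnt/disk1=2, /mnt/disk2=1} (map order may vary)
  }
}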