diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
index d3e1bf95b5f..f4cfaf1aef7 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
@@ -38,7 +38,6 @@ public int getValue() {
     }
   }
 
-  @Deprecated
   public static String UNKNOWN_DISK = "UNKNOWN_DISK";
   public static Map<Integer, Type> typesMap = new HashMap<>();
   public static Set<String> typeNames = new HashSet<>();
@@ -57,7 +56,7 @@ public int getValue() {
   // Default storage Type is MEMORY.
   private Type type = Type.MEMORY;
-  private String mountPoint = UNKNOWN_DISK;
+  private String mountPoint;
   // if a file is committed, field "finalResult" will be true
   private boolean finalResult = false;
   private String filePath;
@@ -77,6 +76,11 @@ public StorageInfo(String mountPoint, int availableStorageTypes) {
     this.availableStorageTypes = availableStorageTypes;
   }
 
+  public StorageInfo(Type type, int availableStorageTypes) {
+    this.type = type;
+    this.availableStorageTypes = availableStorageTypes;
+  }
+
   public StorageInfo(Type type, String mountPoint) {
     this.type = type;
     this.mountPoint = mountPoint;
@@ -147,21 +151,33 @@ public String toString() {
         + '}';
   }
 
-  public boolean localDiskAvailable() {
+  public static boolean localDiskAvailable(int availableStorageTypes) {
     return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
         || (availableStorageTypes & LOCAL_DISK_MASK) > 0;
   }
 
-  public boolean HDFSAvailable() {
+  public boolean localDiskAvailable() {
+    return StorageInfo.localDiskAvailable(availableStorageTypes);
+  }
+
+  public static boolean HDFSAvailable(int availableStorageTypes) {
     return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
         || (availableStorageTypes & HDFS_MASK) > 0;
   }
 
-  public boolean OSSAvailable() {
+  public boolean HDFSAvailable() {
+    return StorageInfo.HDFSAvailable(availableStorageTypes);
+  }
+
+  public static boolean OSSAvailable(int availableStorageTypes) {
     return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
         || (availableStorageTypes & OSS_MASK) > 0;
   }
 
+  public boolean OSSAvailable() {
+    return StorageInfo.OSSAvailable(availableStorageTypes);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index acfc2261031..e1546d05bf8 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -62,14 +62,16 @@ static class UsableDiskInfo {
       return new HashMap<>();
     }
     Map<WorkerInfo, List<UsableDiskInfo>> restrictions = new HashMap<>();
-    for (WorkerInfo worker : workers) {
-      List<UsableDiskInfo> usableDisks =
-          restrictions.computeIfAbsent(worker, v -> new ArrayList<>());
-      for (Map.Entry<String, DiskInfo> diskInfoEntry : worker.diskInfos().entrySet()) {
-        if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
-          usableDisks.add(
-              new UsableDiskInfo(
-                  diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
+    if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
+      for (WorkerInfo worker : workers) {
+        List<UsableDiskInfo> usableDisks =
+            restrictions.computeIfAbsent(worker, v -> new ArrayList<>());
+        for (Map.Entry<String, DiskInfo> diskInfoEntry : worker.diskInfos().entrySet()) {
+          if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
+            usableDisks.add(
+                new UsableDiskInfo(
+                    diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
+          }
         }
       }
     }
@@ -160,16 +162,28 @@ private static StorageInfo getStorageInfo(
       Map<WorkerInfo, Integer> workerDiskIndex,
       int availableStorageTypes) {
     WorkerInfo selectedWorker = workers.get(workerIndex);
-    List<UsableDiskInfo> usableDiskInfos = restrictions.get(selectedWorker);
+    StorageInfo storageInfo;
     int diskIndex = workerDiskIndex.computeIfAbsent(selectedWorker, v -> 0);
-    while (usableDiskInfos.get(diskIndex).usableSlots <= 0) {
-      diskIndex = (diskIndex + 1) % usableDiskInfos.size();
+    if (restrictions != null) {
+      List<UsableDiskInfo> usableDiskInfos = restrictions.get(selectedWorker);
+      while (usableDiskInfos.get(diskIndex).usableSlots <= 0) {
+        diskIndex = (diskIndex + 1) % usableDiskInfos.size();
+      }
+      usableDiskInfos.get(diskIndex).usableSlots--;
+      storageInfo =
+          new StorageInfo(
+              usableDiskInfos.get(diskIndex).diskInfo.mountPoint(), availableStorageTypes);
+      workerDiskIndex.put(selectedWorker, (diskIndex + 1) % usableDiskInfos.size());
+    } else {
+      DiskInfo[] diskInfos = selectedWorker.diskInfos().values().toArray(new DiskInfo[0]);
+      if (diskInfos.length == 0) {
+        // this means that this worker has no disks, use HDFS
+        storageInfo = new StorageInfo(StorageInfo.Type.HDFS, availableStorageTypes);
+      } else {
+        storageInfo = new StorageInfo(diskInfos[diskIndex].mountPoint(), availableStorageTypes);
+        diskIndex = (diskIndex + 1) % diskInfos.length;
+      }
     }
-    usableDiskInfos.get(diskIndex).usableSlots--;
-    StorageInfo storageInfo =
-        new StorageInfo(
-            usableDiskInfos.get(diskIndex).diskInfo.mountPoint(), availableStorageTypes);
-    workerDiskIndex.put(selectedWorker, (diskIndex + 1) % usableDiskInfos.size());
     return storageInfo;
   }
@@ -190,15 +204,18 @@ private static StorageInfo getStorageInfo(
     Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots =
         new HashMap<>();
-    List<Integer> remain =
-        roundRobin(
-            slots,
-            partitionIds,
-            new LinkedList<>(restrictions.keySet()),
-            restrictions,
-            shouldReplicate,
-            shouldRackAware,
-            activeStorageTypes);
+    List<Integer> remain = null;
+    if (!restrictions.isEmpty()) {
+      remain =
+          roundRobin(
+              slots,
+              partitionIds,
+              new LinkedList<>(restrictions.keySet()),
+              restrictions,
+              shouldReplicate,
+              shouldRackAware,
+              activeStorageTypes);
+    }
     if (!remain.isEmpty()) {
       remain =
           roundRobin(
@@ -218,7 +235,8 @@ private static List<Integer> roundRobin(
       boolean shouldReplicate,
       boolean shouldRackAware,
       int availableStorageTypes) {
     // workerInfo -> (diskIndexForPrimary, diskIndexForReplica)
     Map<WorkerInfo, Integer> workerDiskIndexForPrimary = new HashMap<>();
     Map<WorkerInfo, Integer> workerDiskIndexForReplica = new HashMap<>();
     List<Integer> partitionIdList = new ArrayList<>(partitionIds);
@@ -231,6 +249,7 @@ private static List<Integer> roundRobin(
       int partitionId = iter.next();
       StorageInfo storageInfo = new StorageInfo();
       if (restrictions != null) {
+        // this means that we'll select a mount point
         while (!haveUsableSlots(restrictions, workers, nextPrimaryInd)) {
           nextPrimaryInd = (nextPrimaryInd + 1) % workers.size();
           if (nextPrimaryInd == primaryIndex) {
@@ -244,6 +263,10 @@ private static List<Integer> roundRobin(
                 restrictions,
                 workerDiskIndexForPrimary,
                 availableStorageTypes);
+      } else {
+        storageInfo =
+            getStorageInfo(
+                workers, nextPrimaryInd, null, workerDiskIndexForPrimary, availableStorageTypes);
       }
       PartitionLocation primaryPartition =
           createLocation(partitionId, workers.get(nextPrimaryInd), null, storageInfo, true);
@@ -272,6 +295,10 @@ private static List<Integer> roundRobin(
               break outer;
             }
           }
+        } else {
+          storageInfo =
+              getStorageInfo(
+                  workers, nextReplicaInd, null, workerDiskIndexForReplica, availableStorageTypes);
        }
        PartitionLocation replicaPartition =
            createLocation(
@@ -493,8 +520,8 @@ public static Map<String, Integer> slotsToDiskAllocations(
      jointLocations.addAll(slots.get(worker)._2);
      for (PartitionLocation location : jointLocations) {
        String mountPoint = location.getStorageInfo().getMountPoint();
-        // ignore slots for UNKNOWN_DISK
-        if (!mountPoint.equals(StorageInfo.UNKNOWN_DISK)) {
+        // skip slots on non-local disks
+        if (mountPoint != null) {
          if (slotsPerDisk.containsKey(mountPoint)) {
            slotsPerDisk.put(mountPoint, slotsPerDisk.get(mountPoint) + 1);
          } else {