diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java index d3e1bf95b5f..a46d4cb7b35 100644 --- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java @@ -38,7 +38,6 @@ public int getValue() { } } - @Deprecated public static String UNKNOWN_DISK = "UNKNOWN_DISK"; public static Map typesMap = new HashMap<>(); public static Set typeNames = new HashSet<>(); @@ -57,7 +56,7 @@ public int getValue() { // Default storage Type is MEMORY. private Type type = Type.MEMORY; - private String mountPoint = UNKNOWN_DISK; + private String mountPoint = ""; // if a file is committed, field "finalResult" will be true private boolean finalResult = false; private String filePath; @@ -77,6 +76,11 @@ public StorageInfo(String mountPoint, int availableStorageTypes) { this.availableStorageTypes = availableStorageTypes; } + public StorageInfo(Type type, int availableStorageTypes) { + this.type = type; + this.availableStorageTypes = availableStorageTypes; + } + public StorageInfo(Type type, String mountPoint) { this.type = type; this.mountPoint = mountPoint; @@ -147,21 +151,41 @@ public String toString() { + '}'; } - public boolean localDiskAvailable() { + public static boolean localDiskAvailable(int availableStorageTypes) { return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK || (availableStorageTypes & LOCAL_DISK_MASK) > 0; } - public boolean HDFSAvailable() { + public boolean localDiskAvailable() { + return StorageInfo.localDiskAvailable(availableStorageTypes); + } + + public static boolean HDFSAvailable(int availableStorageTypes) { return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK || (availableStorageTypes & HDFS_MASK) > 0; } - public boolean OSSAvailable() { + public boolean HDFSAvailable() { + return StorageInfo.HDFSAvailable(availableStorageTypes); + } + + public static 
boolean HDFSOnly(int availableStorageTypes) { + return availableStorageTypes == HDFS_MASK; + } + + public boolean HDFSOnly() { + return StorageInfo.HDFSOnly(availableStorageTypes); + } + + public static boolean OSSAvailable(int availableStorageTypes) { return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK || (availableStorageTypes & OSS_MASK) > 0; } + public boolean OSSAvailable() { + return StorageInfo.OSSAvailable(availableStorageTypes); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java b/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java index 927b3e9c137..29ecd5a5ca1 100644 --- a/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java +++ b/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java @@ -205,7 +205,7 @@ public void testToStringOutput() { + " host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n" + " mode:PRIMARY\n" + " peer:(empty)\n" - + " storage hint:StorageInfo{type=MEMORY, mountPoint='UNKNOWN_DISK', finalResult=false, filePath=null}\n" + + " storage hint:StorageInfo{type=MEMORY, mountPoint='', finalResult=false, filePath=null}\n" + " mapIdBitMap:{}]"; String exp2 = "PartitionLocation[\n" @@ -213,7 +213,7 @@ public void testToStringOutput() { + " host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n" + " mode:PRIMARY\n" + " peer:(host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4)\n" - + " storage hint:StorageInfo{type=MEMORY, mountPoint='UNKNOWN_DISK', finalResult=false, filePath=null}\n" + + " storage hint:StorageInfo{type=MEMORY, mountPoint='', finalResult=false, filePath=null}\n" + " mapIdBitMap:{}]"; String exp3 = "PartitionLocation[\n" diff --git a/docs/developers/slotsallocation.md b/docs/developers/slotsallocation.md index e69de29bb2d..78644b16cc6 100644 --- 
a/docs/developers/slotsallocation.md +++ b/docs/developers/slotsallocation.md @@ -0,0 +1,75 @@ +--- +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +# Slots allocation + +This article describes the detailed design of Celeborn workers' slots allocation. +Slots allocation is the core component of how Celeborn distributes workload among workers. +We have implemented two approaches to slots allocation. + +## Principle +Allocate slots to local disks unless explicitly assigned to HDFS. + +## LoadAware +### Related configs +```properties +celeborn.master.slot.assign.policy LOADAWARE +celeborn.master.slot.assign.loadAware.numDiskGroups 5 +celeborn.master.slot.assign.loadAware.diskGroupGradient 0.1 +celeborn.master.slot.assign.loadAware.flushTimeWeight 0 +celeborn.master.slot.assign.loadAware.fetchTimeWeight 0 +[spark.client.]celeborn.storage.availableTypes HDD,SSD +``` +### Detail +Load-aware slots allocation will take the following elements into consideration. +- disk's fetch time +- disk's flush time +- disk's usable space +- disk's used slot + +Slots allocator will find out all workers involved in this allocation and sort their disks by +`disk's average flush time * flush time weight + disk's average fetch time * fetch time weight`. 
+After getting the sorted disks list, Celeborn will split the disks into +`celeborn.master.slot.assign.loadAware.numDiskGroups` groups. The slots number to be placed into a disk group +is controlled by the `celeborn.master.slot.assign.loadAware.diskGroupGradient` which means that a group's +allocated slots number will be (1+`celeborn.master.slot.assign.loadAware.diskGroupGradient`) +times that of the group slower than it. +For example, there are 5 groups, G1, G2, G3, G4 and G5. If G5 is allocated 100 slots, +other groups will be G4:110, G3:121, G2:133, G1:146. + +After Celeborn has decided the slots number of a disk group, slots will be distributed in disks of a disk group. +Each disk has a usableSlots which is calculated by `(disk's usable space)/(average partition size)-usedSlots`. +The slots number to allocate in a disk is calculated by ` slots of this disk group * ( current disk's usableSlots / the sum of all disks' usableSlots in this group)`. +For example, G5 needs to allocate 100 slots and has 3 disks D1 with usable slots 100, D2 with usable slots 50, D3 with usable slots 20. +The distribution will be D1:59, D2: 29, D3: 12. + +If all slots can be placed in disk groups, the slots allocation process is done. + +If requested slots are more than all usable slots, the remaining slots can not be placed into disks. +Celeborn will need to allocate these slots to workers with local disks one by one. + +## RoundRobin +### Detail +Roundrobin slots allocation will distribute all slots into all registered workers with disks. Celeborn will treat +all workers as an array and place 1 slot in a worker until all slots are allocated. +If a worker has multiple disks, the chosen disk index is `(monotone increasing disk index +1) % disk count`. + +## Celeborn Worker's Behavior +1. When reserving slots, a Celeborn worker will decide whether a slot should be placed on local disks or in HDFS. +2. If a partition is evicted from memory, the partition might be placed in HDFS. +3. 
If a slot is explicitly assigned to HDFS, worker will put the slot in HDFS. \ No newline at end of file diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java index acfc2261031..bc4d6d656ec 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java @@ -47,6 +47,8 @@ static class UsableDiskInfo { private static final Random rand = new Random(); private static boolean initialized = false; private static double[] taskAllocationRatio = null; + private static final DiskInfo HDFSDiskInfo = + new DiskInfo("HDFS", Integer.MAX_VALUE, 0, 0, Integer.MAX_VALUE); public static Map, List>> offerSlotsRoundRobin( @@ -61,25 +63,16 @@ static class UsableDiskInfo { if (workers.size() < 2 && shouldReplicate) { return new HashMap<>(); } - Map> restrictions = new HashMap<>(); - for (WorkerInfo worker : workers) { - List usableDisks = - restrictions.computeIfAbsent(worker, v -> new ArrayList<>()); - for (Map.Entry diskInfoEntry : worker.diskInfos().entrySet()) { - if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) { - usableDisks.add( - new UsableDiskInfo( - diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots())); - } - } - } + Map> restrictions = + usableSlotsAsRestriction(workers, availableStorageTypes); return locateSlots( partitionIds, workers, restrictions, shouldReplicate, shouldRackAware, - availableStorageTypes); + availableStorageTypes, + false); } /** @@ -105,6 +98,10 @@ static class UsableDiskInfo { if (workers.size() < 2 && shouldReplicate) { return new HashMap<>(); } + if (StorageInfo.HDFSOnly(availableStorageTypes)) { + return offerSlotsRoundRobin( + workers, partitionIds, shouldReplicate, shouldRackAware, availableStorageTypes); + } List usableDisks = new ArrayList<>(); Map diskToWorkerMap = new 
HashMap<>(); @@ -150,7 +147,31 @@ static class UsableDiskInfo { restrictions, shouldReplicate, shouldRackAware, - availableStorageTypes); + availableStorageTypes, + true); + } + + private static Map> usableSlotsAsRestriction( + List workers, int availableStorageTypes) { + Map> restrictions = new HashMap<>(); + for (WorkerInfo worker : workers) { + List usableDisks = + restrictions.computeIfAbsent(worker, v -> new ArrayList<>()); + for (Map.Entry diskInfoEntry : worker.diskInfos().entrySet()) { + if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) { + // if (StorageInfo.HDFSAvailable(availableStorageTypes)) { + usableDisks.add(new UsableDiskInfo(diskInfoEntry.getValue(), Integer.MAX_VALUE)); + // } else { + // usableDisks.add( + // new UsableDiskInfo( + // diskInfoEntry.getValue(), + // diskInfoEntry.getValue().availableSlots())); + // } + } + } + } + + return restrictions; } private static StorageInfo getStorageInfo( @@ -160,16 +181,30 @@ private static StorageInfo getStorageInfo( Map workerDiskIndex, int availableStorageTypes) { WorkerInfo selectedWorker = workers.get(workerIndex); - List usableDiskInfos = restrictions.get(selectedWorker); + StorageInfo storageInfo; int diskIndex = workerDiskIndex.computeIfAbsent(selectedWorker, v -> 0); - while (usableDiskInfos.get(diskIndex).usableSlots <= 0) { - diskIndex = (diskIndex + 1) % usableDiskInfos.size(); + if (restrictions != null) { + List usableDiskInfos = restrictions.get(selectedWorker); + while (usableDiskInfos.get(diskIndex).usableSlots <= 0) { + diskIndex = (diskIndex + 1) % usableDiskInfos.size(); + } + usableDiskInfos.get(diskIndex).usableSlots--; + DiskInfo selectedDiskInfo = usableDiskInfos.get(diskIndex).diskInfo; + if (selectedDiskInfo.equals(HDFSDiskInfo)) { + storageInfo = new StorageInfo(StorageInfo.Type.HDFS, availableStorageTypes); + } else { + storageInfo = new StorageInfo(selectedDiskInfo.mountPoint(), availableStorageTypes); + } + workerDiskIndex.put(selectedWorker, (diskIndex + 1) 
% usableDiskInfos.size()); + } else { + if (StorageInfo.localDiskAvailable(availableStorageTypes)) { + DiskInfo[] diskInfos = selectedWorker.diskInfos().values().toArray(new DiskInfo[0]); + storageInfo = new StorageInfo(diskInfos[diskIndex].mountPoint(), availableStorageTypes); + diskIndex = (diskIndex + 1) % diskInfos.length; + } else { + storageInfo = new StorageInfo(StorageInfo.Type.HDFS, availableStorageTypes); + } } - usableDiskInfos.get(diskIndex).usableSlots--; - StorageInfo storageInfo = - new StorageInfo( - usableDiskInfos.get(diskIndex).diskInfo.mountPoint(), availableStorageTypes); - workerDiskIndex.put(selectedWorker, (diskIndex + 1) % usableDiskInfos.size()); return storageInfo; } @@ -186,7 +221,8 @@ private static StorageInfo getStorageInfo( Map> restrictions, boolean shouldReplicate, boolean shouldRackAware, - int activeStorageTypes) { + int availableStorageTypes, + boolean loadAware) { Map, List>> slots = new HashMap<>(); @@ -198,14 +234,20 @@ private static StorageInfo getStorageInfo( restrictions, shouldReplicate, shouldRackAware, - activeStorageTypes); + availableStorageTypes); if (!remain.isEmpty()) { remain = roundRobin( - slots, remain, workers, null, shouldReplicate, shouldRackAware, activeStorageTypes); + slots, + remain, + workers, + null, + shouldReplicate, + shouldRackAware, + availableStorageTypes); } if (!remain.isEmpty()) { - roundRobin(slots, remain, workers, null, shouldReplicate, false, activeStorageTypes); + roundRobin(slots, remain, workers, null, shouldReplicate, false, availableStorageTypes); } return slots; } @@ -218,7 +260,8 @@ private static List roundRobin( boolean shouldReplicate, boolean shouldRackAware, int availableStorageTypes) { - // workerInfo -> (diskIndexForPrimary, diskIndexForReplica) + // workerInfo -> (diskIndexForPrimary, diskIndexForRe + // plica) Map workerDiskIndexForPrimary = new HashMap<>(); Map workerDiskIndexForReplica = new HashMap<>(); List partitionIdList = new ArrayList<>(partitionIds); @@ -231,6 
+274,7 @@ private static List roundRobin( int partitionId = iter.next(); StorageInfo storageInfo = new StorageInfo(); if (restrictions != null) { + // this means that we'll select a mount point while (!haveUsableSlots(restrictions, workers, nextPrimaryInd)) { nextPrimaryInd = (nextPrimaryInd + 1) % workers.size(); if (nextPrimaryInd == primaryIndex) { @@ -244,6 +288,20 @@ private static List roundRobin( restrictions, workerDiskIndexForPrimary, availableStorageTypes); + } else { + if (StorageInfo.localDiskAvailable(availableStorageTypes)) { + while (!haveDisk(workers, nextPrimaryInd)) { + nextPrimaryInd = (nextPrimaryInd + 1) % workers.size(); + if (nextPrimaryInd == primaryIndex) { + break outer; + } + } + } else { + nextPrimaryInd = (nextPrimaryInd + 1) % workers.size(); + } + storageInfo = + getStorageInfo( + workers, nextPrimaryInd, null, workerDiskIndexForPrimary, availableStorageTypes); } PartitionLocation primaryPartition = createLocation(partitionId, workers.get(nextPrimaryInd), null, storageInfo, true); @@ -272,6 +330,20 @@ private static List roundRobin( break outer; } } + } else { + if (StorageInfo.localDiskAvailable(availableStorageTypes)) { + while (!haveDisk(workers, nextPrimaryInd)) { + nextPrimaryInd = (nextPrimaryInd + 1) % workers.size(); + if (nextPrimaryInd == primaryIndex) { + break outer; + } + } + } else { + nextPrimaryInd = (nextPrimaryInd + 1) % workers.size(); + } + storageInfo = + getStorageInfo( + workers, nextReplicaInd, null, workerDiskIndexForReplica, availableStorageTypes); } PartitionLocation replicaPartition = createLocation( @@ -299,6 +371,10 @@ private static boolean haveUsableSlots( return restrictions.get(workers.get(index)).stream().mapToLong(i -> i.usableSlots).sum() > 0; } + private static boolean haveDisk(List workers, int index) { + return !workers.get(index).diskInfos().isEmpty(); + } + private static boolean satisfyRackAware( boolean shouldRackAware, List workers, int primaryIndex, int nextReplicaInd) { return 
!shouldRackAware @@ -493,8 +569,8 @@ public static Map> slotsToDiskAllocations( jointLocations.addAll(slots.get(worker)._2); for (PartitionLocation location : jointLocations) { String mountPoint = location.getStorageInfo().getMountPoint(); - // ignore slots for UNKNOWN_DISK - if (!mountPoint.equals(StorageInfo.UNKNOWN_DISK)) { + // skip non local disks slots + if (!mountPoint.isEmpty()) { if (slotsPerDisk.containsKey(mountPoint)) { slotsPerDisk.put(mountPoint, slotsPerDisk.get(mountPoint) + 1); } else { diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index c1102418a7e..ae5859f5bc1 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -658,7 +658,7 @@ private[celeborn] class Master( val slots = masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") { statusSystem.workers.synchronized { - if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && !hasHDFSStorage) { + if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE) { SlotsAllocator.offerSlotsLoadAware( selectedWorkers, requestSlots.partitionIdList, diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java index e82742bab0d..bfb3494c898 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java @@ -298,7 +298,7 @@ private void checkSlotsOnHDFS( conf.set("celeborn.active.storage.levels", "HDFS"); Map, List>> slots = SlotsAllocator.offerSlotsRoundRobin( - workers, partitionIds, shouldReplicate, false, StorageInfo.ALL_TYPES_AVAILABLE_MASK); + workers, partitionIds, shouldReplicate, 
false, StorageInfo.HDFS_MASK); int allocatedPartitionCount = 0; diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index b1239b49798..52a207887a8 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -362,7 +362,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint") } val shuffleKey = Utils.makeShuffleKey(appId, shuffleId) - if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) { + if ((dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) || location.getStorageInfo.HDFSOnly()) { val shuffleDir = new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId") val fileInfo =