diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java index d3e1bf95b5f..5b59a10ed4b 100644 --- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java @@ -38,7 +38,6 @@ public int getValue() { } } - @Deprecated public static String UNKNOWN_DISK = "UNKNOWN_DISK"; public static Map typesMap = new HashMap<>(); public static Set typeNames = new HashSet<>(); @@ -57,7 +56,7 @@ public int getValue() { // Default storage Type is MEMORY. private Type type = Type.MEMORY; - private String mountPoint = UNKNOWN_DISK; + private String mountPoint = ""; // if a file is committed, field "finalResult" will be true private boolean finalResult = false; private String filePath; @@ -72,27 +71,10 @@ public StorageInfo(Type type, boolean isFinal, String filePath) { this.filePath = filePath; } - public StorageInfo(String mountPoint, int availableStorageTypes) { + public StorageInfo(String mountPoint, StorageInfo.Type type, int availableStorageTypes) { this.mountPoint = mountPoint; - this.availableStorageTypes = availableStorageTypes; - } - - public StorageInfo(Type type, String mountPoint) { - this.type = type; - this.mountPoint = mountPoint; - } - - public StorageInfo(Type type, String mountPoint, boolean finalResult) { - this.type = type; - this.mountPoint = mountPoint; - this.finalResult = finalResult; - } - - public StorageInfo(Type type, String mountPoint, boolean finalResult, String filePath) { this.type = type; - this.mountPoint = mountPoint; - this.finalResult = finalResult; - this.filePath = filePath; + this.availableStorageTypes = availableStorageTypes; } public StorageInfo( @@ -147,21 +129,41 @@ public String toString() { + '}'; } - public boolean localDiskAvailable() { + public static boolean localDiskAvailable(int availableStorageTypes) { return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK || (availableStorageTypes & LOCAL_DISK_MASK) > 0; } - public boolean HDFSAvailable() { + public boolean localDiskAvailable() { + return StorageInfo.localDiskAvailable(availableStorageTypes); + } + + public static boolean HDFSAvailable(int availableStorageTypes) { return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK || (availableStorageTypes & HDFS_MASK) > 0; } - public boolean OSSAvailable() { + public boolean HDFSAvailable() { + return StorageInfo.HDFSAvailable(availableStorageTypes); + } + + public static boolean HDFSOnly(int availableStorageTypes) { + return availableStorageTypes == HDFS_MASK; + } + + public boolean HDFSOnly() { + return StorageInfo.HDFSOnly(availableStorageTypes); + } + + public static boolean OSSAvailable(int availableStorageTypes) { return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK || (availableStorageTypes & OSS_MASK) > 0; } + public boolean OSSAvailable() { + return StorageInfo.OSSAvailable(availableStorageTypes); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 56f0e7561c9..3454782f4d3 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -133,6 +133,7 @@ message PbDiskInfo { int64 usedSlots = 4; int32 status = 5; int64 avgFetchTime = 6; + int32 storageType = 7; } message PbWorkerInfo { diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala index 56933a2cf04..990a2b12734 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala @@ -49,6 +49,17 @@ class DiskInfo( this(mountPoint, usableSpace, avgFlushTime, avgFetchTime, activeSlots, List.empty, null) } + def this( + mountPoint: String, + usableSpace: Long, + avgFlushTime: Long, + avgFetchTime: Long, + activeSlots: Long, + storageType: StorageInfo.Type) = { + this(mountPoint, usableSpace, avgFlushTime, avgFetchTime, activeSlots, List.empty, null) + this.storageType = storageType + } + def this( mountPoint: String, dirs: List[File], @@ -70,10 +81,14 @@ class DiskInfo( var status: DiskStatus = DiskStatus.HEALTHY var threadCount = 1 var configuredUsableSpace = 0L - var storageType: StorageInfo.Type = _ + var storageType: StorageInfo.Type = StorageInfo.Type.SSD var maxSlots: Long = 0 lazy val shuffleAllocations = new util.HashMap[String, Integer]() + def setStorageType(storageType: StorageInfo.Type) = { + this.storageType = storageType + } + def setStatus(status: DiskStatus): this.type = this.synchronized { this.status = status this @@ -145,9 +160,11 @@ class DiskInfo( s" usableSpace: ${Utils.bytesToString(actualUsableSpace)}," + s" avgFlushTime: ${Utils.nanoDurationToString(avgFlushTime)}," + s" avgFetchTime: ${Utils.nanoDurationToString(avgFetchTime)}," + - s" activeSlots: $activeSlots)" + + s" activeSlots: $activeSlots," + + s" storageType: ${storageType})" + s" status: $status" + s" dirs ${dirs.mkString("\t")}" + } } diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala index 65dc22918a2..05ea35d63ec 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.internal.Logging +import org.apache.celeborn.common.protocol.StorageInfo import org.apache.celeborn.common.quota.ResourceConsumption import org.apache.celeborn.common.rpc.RpcEndpointRef import org.apache.celeborn.common.rpc.netty.NettyRpcEndpointRef @@ -156,12 +157,12 @@ class WorkerInfo( curDisk.activeSlots = newDisk.activeSlots curDisk.avgFlushTime = newDisk.avgFlushTime curDisk.avgFetchTime = newDisk.avgFetchTime - if (estimatedPartitionSize.nonEmpty) { + if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS) { curDisk.maxSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get } curDisk.setStatus(newDisk.status) } else { - if (estimatedPartitionSize.nonEmpty) { + if (estimatedPartitionSize.nonEmpty && newDisk.storageType != StorageInfo.Type.HDFS) { newDisk.maxSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get } diskInfos.put(mountPoint, newDisk) @@ -239,6 +240,11 @@ class WorkerInfo( result = 31 * result + replicatePort.hashCode() result } + + def haveDisk(): Boolean = { + diskInfos.values().asScala.exists(p => + p.storageType == StorageInfo.Type.SSD || p.storageType == StorageInfo.Type.HDD) + } } object WorkerInfo { diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala index 2748d6cf38d..d681152f7aa 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala @@ -63,14 +63,17 @@ object PbSerDeUtils { .setMinor(minor) .build.toByteArray - def fromPbDiskInfo(pbDiskInfo: PbDiskInfo): DiskInfo = - new DiskInfo( + def fromPbDiskInfo(pbDiskInfo: PbDiskInfo): DiskInfo = { + val diskInfo = new DiskInfo( pbDiskInfo.getMountPoint, pbDiskInfo.getUsableSpace, pbDiskInfo.getAvgFlushTime, pbDiskInfo.getAvgFetchTime, pbDiskInfo.getUsedSlots) .setStatus(Utils.toDiskStatus(pbDiskInfo.getStatus)) + diskInfo.setStorageType(StorageInfo.typesMap.get(pbDiskInfo.getStorageType)) + diskInfo + } def toPbDiskInfo(diskInfo: DiskInfo): PbDiskInfo = PbDiskInfo.newBuilder @@ -80,6 +83,7 @@ object PbSerDeUtils { .setAvgFetchTime(diskInfo.avgFetchTime) .setUsedSlots(diskInfo.activeSlots) .setStatus(diskInfo.status.getValue) + .setStorageType(diskInfo.storageType.getValue) .build def fromPbFileInfo(pbFileInfo: PbFileInfo): FileInfo = diff --git a/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java b/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java index 927b3e9c137..e4200c9134a 100644 --- a/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java +++ b/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java @@ -178,7 +178,9 @@ public void testToStringOutput() { PartitionLocation location2 = new PartitionLocation( partitionId, epoch, host, rpcPort, pushPort, fetchPort, replicatePort, mode, peer); - StorageInfo storageInfo = new StorageInfo(StorageInfo.Type.MEMORY, "/mnt/disk/0"); + StorageInfo storageInfo = + new StorageInfo( + "/mnt/disk/0", StorageInfo.Type.MEMORY, StorageInfo.ALL_TYPES_AVAILABLE_MASK); RoaringBitmap bitmap = new RoaringBitmap(); bitmap.add(1); bitmap.add(2); @@ -205,7 +207,7 @@ public void testToStringOutput() { + " host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n" + " mode:PRIMARY\n" + " peer:(empty)\n" - + " storage hint:StorageInfo{type=MEMORY, mountPoint='UNKNOWN_DISK', finalResult=false, filePath=null}\n" + + " storage hint:StorageInfo{type=MEMORY, mountPoint='', finalResult=false, filePath=null}\n" + " mapIdBitMap:{}]"; String exp2 = "PartitionLocation[\n" @@ -213,7 +215,7 @@ public void testToStringOutput() { + " host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n" + " mode:PRIMARY\n" + " peer:(host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4)\n" - + " storage hint:StorageInfo{type=MEMORY, mountPoint='UNKNOWN_DISK', finalResult=false, filePath=null}\n" + + " storage hint:StorageInfo{type=MEMORY, mountPoint='', finalResult=false, filePath=null}\n" + " mapIdBitMap:{}]"; String exp3 = "PartitionLocation[\n" diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala index b4286292488..ebd83682ee2 100644 --- a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala @@ -284,9 +284,9 @@ class WorkerInfoSuite extends CelebornFunSuite { |SlotsUsed: 60 |LastHeartbeat: 0 |Disks: $placeholder - | DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30) status: HEALTHY dirs $placeholder - | DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10) status: HEALTHY dirs $placeholder - | DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20) status: HEALTHY dirs $placeholder + | DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30, storageType: SSD) status: HEALTHY dirs $placeholder + | DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10, storageType: SSD) status: HEALTHY dirs $placeholder + | DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20, storageType: SSD) status: HEALTHY dirs $placeholder |UserResourceConsumption: $placeholder | UserIdentifier: `tenant1`.`name1`, ResourceConsumption: ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1, hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1) |WorkerRef: null diff --git a/docs/developers/slotsallocation.md b/docs/developers/slotsallocation.md index e69de29bb2d..78644b16cc6 100644 --- a/docs/developers/slotsallocation.md +++ b/docs/developers/slotsallocation.md @@ -0,0 +1,75 @@ +--- +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +# Slots allocation + +This article describes the detailed design of Celeborn workers' slots allocation. +Slots allocation is the core components about how Celeborn distribute workload amount workers. +We have achieved two approaches of slots allocation. + +## Principle +Allocate slots to local disks unless explicit assigned to HDFS. + +## LoadAware +### Related configs +```properties +celeborn.master.slot.assign.policy LOADAWARE +celeborn.master.slot.assign.loadAware.numDiskGroups 5 +celeborn.master.slot.assign.loadAware.diskGroupGradient 0.1 +celeborn.master.slot.assign.loadAware.flushTimeWeight 0 +celeborn.master.slot.assign.loadAware.fetchTimeWeight 0 +[spark.client.]celeborn.storage.availableTypes HDD,SSD +``` +### Detail +Load-aware slots allocation will take following elements into consideration. +- disk's fetch time +- disk's flush time +- disk's usable space +- disk's used slot + +Slots allocator will find out all worker involved in this allocation and sort their disks by +`disk's average flushtime * flush time weight + disk's average fetch time * fetch time weight`. +After getting the sorted disks list, Celeborn will split the disks into +`celeborn.master.slot.assign.loadAware.numDiskGroups` groups. The slots number to be placed into a disk group +is controlled by the `celeborn.master.slot.assign.loadAware.diskGroupGradient` which means that a group's +allocated slots number will be (1+`celeborn.master.slot.assign.loadAware.diskGroupGradient`) +times to the group's slower than it. +For example, there is 5 groups, G1 , G2, G3, G4 and G5. If the G5 is allocated 100 slots. +Other groups will be G4:110, G3:121, G2:133, G1:146. + +After Celeborn has decided the slots number of a disk group, slots will be distributed in disks of a disk group. +Each disk has a usableSlots which is calculated by `(disk's usable space)/(average partition size)-usedSlots`. +The slots number to allocate in a disk is calculated by ` slots of this disk group * ( current disk's usableSlots / the sum of all disks' usableSlots in this group)`. +For example, G5 need to allocate 100 slots and have 3 disks D1 with usable slots 100, D2 with usable slots 50, D3 with usable slots 20. +The distribution will be D1:59, D2: 29, D3: 12. + +If all slots can be place in disk groups, the slots allocation process is done. + +requested slots are more than all usable slots, slots can not be placed into disks. +Worker will need to allocate these slots to workers with local disks one by one. + +## RoundRobin +### Detail +Roundrobin slots allocation will distribute all slots into all registered workers with disks. Celeborn will treat +all workers as an array and place 1 slots in a worker until all slots are allocated. +If a worker has multiple disks, the chosen disk index is `(monotone increasing disk index +1) % disk count`. + +## Celeborn Worker's Behavior +1. When reserve slots Celeborn worker will decide a slot be placed in local disks or HDFS when reserve slots. +2. If a partition is evicted from memory, the partition might be placed in HDFS. +3. If a slot is explicitly assigned to HDFS, worker will put the slot in HDFS. \ No newline at end of file diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java index e964e57e556..e54ba8763e6 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java @@ -18,6 +18,7 @@ package org.apache.celeborn.service.deploy.master; import java.util.*; +import java.util.stream.Collectors; import scala.Double; import scala.Option; @@ -64,22 +65,30 @@ static class UsableDiskInfo { if (workers.size() < 2 && shouldReplicate) { return new HashMap<>(); } - Map> restrictions = new HashMap<>(); + Map> slotsRestrictions = new HashMap<>(); for (WorkerInfo worker : workers) { List usableDisks = - restrictions.computeIfAbsent(worker, v -> new ArrayList<>()); + slotsRestrictions.computeIfAbsent(worker, v -> new ArrayList<>()); for (Map.Entry diskInfoEntry : worker.diskInfos().entrySet()) { if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) { - usableDisks.add( - new UsableDiskInfo( - diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots())); + if (StorageInfo.localDiskAvailable(availableStorageTypes) + && diskInfoEntry.getValue().storageType() != StorageInfo.Type.HDFS) { + usableDisks.add( + new UsableDiskInfo( + diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots())); + } else if (StorageInfo.HDFSAvailable(availableStorageTypes) + && diskInfoEntry.getValue().storageType() == StorageInfo.Type.HDFS) { + usableDisks.add( + new UsableDiskInfo( + diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots())); + } } } } return locateSlots( partitionIds, workers, - restrictions, + slotsRestrictions, shouldReplicate, shouldRackAware, availableStorageTypes); @@ -109,6 +118,10 @@ static class UsableDiskInfo { if (workers.size() < 2 && shouldReplicate) { return new HashMap<>(); } + if (StorageInfo.HDFSOnly(availableStorageTypes)) { + return offerSlotsRoundRobin( + workers, partitionIds, shouldReplicate, shouldRackAware, availableStorageTypes); + } List usableDisks = new ArrayList<>(); Map diskToWorkerMap = new HashMap<>(); @@ -126,7 +139,8 @@ static class UsableDiskInfo { diskReserveRatio.isEmpty() ? Option.empty() : Option.apply(diskReserveRatio.get())) - && diskInfo.status().equals(DiskStatus.HEALTHY)) { + && diskInfo.status().equals(DiskStatus.HEALTHY) + && diskInfo.storageType() != StorageInfo.Type.HDFS) { usableDisks.add(diskInfo); } })); @@ -151,15 +165,15 @@ static class UsableDiskInfo { initLoadAwareAlgorithm(diskGroupCount, diskGroupGradient); } - Map> restrictions = - getRestriction( + Map> slotsRestrictions = + getSlotsRestrictionsByLoadAwareAlgorithm( placeDisksToGroups(usableDisks, diskGroupCount, flushTimeWeight, fetchTimeWeight), diskToWorkerMap, shouldReplicate ? partitionIds.size() * 2 : partitionIds.size()); return locateSlots( partitionIds, workers, - restrictions, + slotsRestrictions, shouldReplicate, shouldRackAware, availableStorageTypes); @@ -172,16 +186,43 @@ private static StorageInfo getStorageInfo( Map workerDiskIndex, int availableStorageTypes) { WorkerInfo selectedWorker = workers.get(workerIndex); - List usableDiskInfos = restrictions.get(selectedWorker); + StorageInfo storageInfo; int diskIndex = workerDiskIndex.computeIfAbsent(selectedWorker, v -> 0); - while (usableDiskInfos.get(diskIndex).usableSlots <= 0) { - diskIndex = (diskIndex + 1) % usableDiskInfos.size(); + if (restrictions != null) { + List usableDiskInfos = restrictions.get(selectedWorker); + while (usableDiskInfos.get(diskIndex).usableSlots <= 0) { + diskIndex = (diskIndex + 1) % usableDiskInfos.size(); + } + usableDiskInfos.get(diskIndex).usableSlots--; + DiskInfo selectedDiskInfo = usableDiskInfos.get(diskIndex).diskInfo; + if (selectedDiskInfo.storageType() == StorageInfo.Type.HDFS) { + storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes); + } else { + storageInfo = + new StorageInfo( + selectedDiskInfo.mountPoint(), + selectedDiskInfo.storageType(), + availableStorageTypes); + workerDiskIndex.put(selectedWorker, (diskIndex + 1) % usableDiskInfos.size()); + } + } else { + if (StorageInfo.localDiskAvailable(availableStorageTypes)) { + DiskInfo[] diskInfos = + selectedWorker.diskInfos().values().stream() + .filter(p -> p.storageType() != StorageInfo.Type.HDFS) + .collect(Collectors.toList()) + .toArray(new DiskInfo[0]); + storageInfo = + new StorageInfo( + diskInfos[diskIndex].mountPoint(), + diskInfos[diskIndex].storageType(), + availableStorageTypes); + diskIndex = (diskIndex + 1) % diskInfos.length; + workerDiskIndex.put(selectedWorker, (diskIndex + 1) % diskInfos.length); + } else { + storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes); + } } - usableDiskInfos.get(diskIndex).usableSlots--; - StorageInfo storageInfo = - new StorageInfo( - usableDiskInfos.get(diskIndex).diskInfo.mountPoint(), availableStorageTypes); - workerDiskIndex.put(selectedWorker, (diskIndex + 1) % usableDiskInfos.size()); return storageInfo; } @@ -195,10 +236,10 @@ private static StorageInfo getStorageInfo( locateSlots( List partitionIds, List workers, - Map> restrictions, + Map> slotRestrictions, boolean shouldReplicate, boolean shouldRackAware, - int activeStorageTypes) { + int availableStorageTypes) { Map, List>> slots = new HashMap<>(); @@ -206,18 +247,24 @@ private static StorageInfo getStorageInfo( roundRobin( slots, partitionIds, - new LinkedList<>(restrictions.keySet()), - restrictions, + new LinkedList<>(slotRestrictions.keySet()), + slotRestrictions, shouldReplicate, shouldRackAware, - activeStorageTypes); + availableStorageTypes); if (!remain.isEmpty()) { remain = roundRobin( - slots, remain, workers, null, shouldReplicate, shouldRackAware, activeStorageTypes); + slots, + remain, + workers, + null, + shouldReplicate, + shouldRackAware, + availableStorageTypes); } if (!remain.isEmpty()) { - roundRobin(slots, remain, workers, null, shouldReplicate, false, activeStorageTypes); + roundRobin(slots, remain, workers, null, shouldReplicate, false, availableStorageTypes); } return slots; } @@ -226,7 +273,7 @@ private static List roundRobin( Map, List>> slots, List partitionIds, List workers, - Map> restrictions, + Map> slotsRestrictions, boolean shouldReplicate, boolean shouldRackAware, int availableStorageTypes) { @@ -241,9 +288,10 @@ private static List roundRobin( int nextPrimaryInd = primaryIndex; int partitionId = iter.next(); - StorageInfo storageInfo = new StorageInfo(); - if (restrictions != null) { - while (!haveUsableSlots(restrictions, workers, nextPrimaryInd)) { + StorageInfo storageInfo; + if (slotsRestrictions != null && !slotsRestrictions.isEmpty()) { + // this means that we'll select a mount point + while (!haveUsableSlots(slotsRestrictions, workers, nextPrimaryInd)) { nextPrimaryInd = (nextPrimaryInd + 1) % workers.size(); if (nextPrimaryInd == primaryIndex) { break outer; @@ -253,17 +301,29 @@ private static List roundRobin( getStorageInfo( workers, nextPrimaryInd, - restrictions, + slotsRestrictions, workerDiskIndexForPrimary, availableStorageTypes); + } else { + if (StorageInfo.localDiskAvailable(availableStorageTypes)) { + while (!workers.get(nextPrimaryInd).haveDisk()) { + nextPrimaryInd = (nextPrimaryInd + 1) % workers.size(); + if (nextPrimaryInd == primaryIndex) { + break outer; + } + } + } + storageInfo = + getStorageInfo( + workers, nextPrimaryInd, null, workerDiskIndexForPrimary, availableStorageTypes); } PartitionLocation primaryPartition = createLocation(partitionId, workers.get(nextPrimaryInd), null, storageInfo, true); if (shouldReplicate) { int nextReplicaInd = (nextPrimaryInd + 1) % workers.size(); - if (restrictions != null) { - while (!haveUsableSlots(restrictions, workers, nextReplicaInd) + if (slotsRestrictions != null) { + while (!haveUsableSlots(slotsRestrictions, workers, nextReplicaInd) || !satisfyRackAware(shouldRackAware, workers, nextPrimaryInd, nextReplicaInd)) { nextReplicaInd = (nextReplicaInd + 1) % workers.size(); if (nextReplicaInd == nextPrimaryInd) { @@ -274,7 +334,7 @@ private static List roundRobin( getStorageInfo( workers, nextReplicaInd, - restrictions, + slotsRestrictions, workerDiskIndexForReplica, availableStorageTypes); } else if (shouldRackAware) { @@ -284,6 +344,18 @@ private static List roundRobin( break outer; } } + } else { + if (StorageInfo.localDiskAvailable(availableStorageTypes)) { + while (!workers.get(nextPrimaryInd).haveDisk()) { + nextPrimaryInd = (nextPrimaryInd + 1) % workers.size(); + if (nextPrimaryInd == primaryIndex) { + break outer; + } + } + } + storageInfo = + getStorageInfo( + workers, nextReplicaInd, null, workerDiskIndexForReplica, availableStorageTypes); } PartitionLocation replicaPartition = createLocation( @@ -369,7 +441,11 @@ private static List> placeDisksToGroups( return diskGroups; } - private static Map> getRestriction( + /** + * This method implement the load aware slots allocation algorithm. See details at + * /docs/developers/slotsallocation.md + */ + private static Map> getSlotsRestrictionsByLoadAwareAlgorithm( List> groups, Map diskWorkerMap, int partitionCnt) { int groupSize = groups.size(); long[] groupAllocations = new long[groupSize]; @@ -505,8 +581,8 @@ public static Map> slotsToDiskAllocations( jointLocations.addAll(slots.get(worker)._2); for (PartitionLocation location : jointLocations) { String mountPoint = location.getStorageInfo().getMountPoint(); - // ignore slots for UNKNOWN_DISK - if (!mountPoint.equals(StorageInfo.UNKNOWN_DISK)) { + // skip non local disks slots + if (!mountPoint.isEmpty()) { if (slotsPerDisk.containsKey(mountPoint)) { slotsPerDisk.put(mountPoint, slotsPerDisk.get(mountPoint) + 1); } else { diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java index ec3094fc276..03e1d21e361 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java @@ -24,6 +24,7 @@ import org.apache.celeborn.common.identity.UserIdentifier$; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; +import org.apache.celeborn.common.protocol.StorageInfo; import org.apache.celeborn.common.quota.ResourceConsumption; import org.apache.celeborn.common.util.Utils; @@ -61,7 +62,8 @@ public static Map fromPbDiskInfos( v.getUsableSpace(), v.getAvgFlushTime(), v.getAvgFetchTime(), - v.getUsedSlots()); + v.getUsedSlots(), + StorageInfo.typesMap.get(v.getStorageType())); diskInfo.setStatus(Utils.toDiskStatus(v.getStatus())); map.put(k, diskInfo); }); @@ -81,6 +83,7 @@ public static Map toPbDiskInfos( .setAvgFlushTime(v.avgFlushTime()) .setAvgFetchTime(v.avgFetchTime()) .setUsedSlots(v.activeSlots()) + .setStorageType(v.storageType().getValue()) .setStatus(v.status().getValue()) .build())); return map; diff --git a/master/src/main/proto/Resource.proto b/master/src/main/proto/Resource.proto index c7cde94dce4..78dc477bd22 100644 --- a/master/src/main/proto/Resource.proto +++ b/master/src/main/proto/Resource.proto @@ -66,6 +66,7 @@ message DiskInfo { required int64 usedSlots = 4; required int32 status = 5; required int64 avgFetchTime = 6; + required int32 storageType =7; } message RequestSlotsRequest { diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 1c03903a748..e17bb46bfc9 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -687,7 +687,7 @@ private[celeborn] class Master( val slots = masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") { statusSystem.workers.synchronized { - if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && !hasHDFSStorage) { + if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE) { SlotsAllocator.offerSlotsLoadAware( selectedWorkers, requestSlots.partitionIdList, diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java index 92f2ecfbd14..fd724cd7032 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java @@ -17,15 +17,7 @@ package org.apache.celeborn.service.deploy.master; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; +import java.util.*; import scala.Option; import scala.Tuple2; @@ -294,16 +286,34 @@ private void checkSlotsOnHDFS( List workers, List partitionIds, boolean shouldReplicate, - boolean expectSuccess) { + boolean expectSuccess, + boolean roundrobin) { String shuffleKey = "appId-1"; CelebornConf conf = new CelebornConf(); conf.set("celeborn.active.storage.levels", "HDFS"); - Map, List>> slots = - SlotsAllocator.offerSlotsRoundRobin( - workers, partitionIds, shouldReplicate, false, StorageInfo.ALL_TYPES_AVAILABLE_MASK); - + Map, List>> slots; + if (roundrobin) { + slots = + SlotsAllocator.offerSlotsRoundRobin( + workers, partitionIds, shouldReplicate, false, StorageInfo.HDFS_MASK); + } else { + slots = + SlotsAllocator.offerSlotsLoadAware( + workers, + partitionIds, + shouldReplicate, + false, + 1000_000_000, + Option.empty(), + 3, + 0.1, + 0, + 1, + StorageInfo.LOCAL_DISK_MASK | StorageInfo.HDFS_MASK); + } int allocatedPartitionCount = 0; - + Map> slotsDistribution = + SlotsAllocator.slotsToDiskAllocations(slots); for (Map.Entry, List>> workerToPartitions : slots.entrySet()) { WorkerInfo workerInfo = workerToPartitions.getKey(); @@ -332,7 +342,86 @@ public void testHDFSOnly() { partitionIds.add(i); } final boolean shouldReplicate = true; - checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true); + checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, true); + } + + @Test + public void testLocalDisksAndHDFSOnRoundRobin() { + final List workers = prepareWorkers(true); + DiskInfo hdfs1 = + new DiskInfo( + "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, StorageInfo.Type.HDFS); + DiskInfo hdfs2 = + new DiskInfo( + "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, StorageInfo.Type.HDFS); + DiskInfo hdfs3 = + new DiskInfo( + "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, StorageInfo.Type.HDFS); + hdfs1.maxSlots_$eq(Long.MAX_VALUE); + hdfs2.maxSlots_$eq(Long.MAX_VALUE); + hdfs3.maxSlots_$eq(Long.MAX_VALUE); + workers.get(0).diskInfos().put("HDFS", hdfs1); + workers.get(1).diskInfos().put("HDFS", hdfs2); + workers.get(2).diskInfos().put("HDFS", hdfs3); + final List partitionIds = new ArrayList<>(); + for (int i = 0; i < 3000; i++) { + partitionIds.add(i); + } + final boolean shouldReplicate = true; + checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, true); + } + + @Test + public void testLocalDisksAndHDFSOnLoadAware() { + final List workers = prepareWorkers(true); + DiskInfo hdfs1 = + new DiskInfo( + "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, StorageInfo.Type.HDFS); + DiskInfo hdfs2 = + new DiskInfo( + "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, StorageInfo.Type.HDFS); + // DiskInfo hdfs3 = new DiskInfo("HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, + // StorageInfo.Type.HDFS); + hdfs1.maxSlots_$eq(Long.MAX_VALUE); + hdfs2.maxSlots_$eq(Long.MAX_VALUE); + // hdfs3.maxSlots_$eq(Long.MAX_VALUE); + workers.get(0).diskInfos().put("HDFS", hdfs1); + workers.get(1).diskInfos().put("HDFS", hdfs2); + // workers.get(2).diskInfos().put("HDFS", hdfs3); + final List partitionIds = new ArrayList<>(); + for (int i = 0; i < 3000; i++) { + partitionIds.add(i); + } + final boolean shouldReplicate = true; + checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, false); + } + + @Test + public void testLocalDisksAndHDFSOnLoadAwareWithInsufficientSlots() { + final List workers = prepareWorkers(true); + DiskInfo hdfs1 = + new DiskInfo( + "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, StorageInfo.Type.HDFS); + DiskInfo hdfs2 = + new DiskInfo( + "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, StorageInfo.Type.HDFS); + // DiskInfo hdfs3 = new DiskInfo("HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, + // StorageInfo.Type.HDFS); + hdfs1.maxSlots_$eq(Long.MAX_VALUE); + hdfs2.maxSlots_$eq(Long.MAX_VALUE); + // hdfs3.maxSlots_$eq(Long.MAX_VALUE); + workers.get(0).diskInfos().put("HDFS", hdfs1); + workers.get(1).diskInfos().put("HDFS", hdfs2); + for (Map.Entry diskEntry : workers.get(2).diskInfos().entrySet()) { + diskEntry.getValue().maxSlots_$eq(100); + } + // workers.get(2).diskInfos().put("HDFS", hdfs3); + final List partitionIds = new ArrayList<>(); + for (int i = 0; i < 3000; i++) { + partitionIds.add(i); + } + final boolean shouldReplicate = true; + checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, false); } @Test diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java index 8ec6497b4dc..ca38a49c46c 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java @@ -234,7 +234,8 @@ public RoaringBitmap getMapIdBitMap() { public StorageInfo getStorageInfo() { if (flusher instanceof LocalFlusher) { LocalFlusher localFlusher = (LocalFlusher) flusher; - return new StorageInfo(localFlusher.diskType(), localFlusher.mountPoint(), true); + // do not write file path to reduce rpc size + return new StorageInfo(localFlusher.diskType(), true, ""); } else { if (deleted) { return null; diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 80e2073098c..d8c7049e586 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -343,7 +343,7 @@ private[celeborn] class Worker( val diskInfos = workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map { disk => disk.mountPoint -> disk - }.toMap.asJava).values().asScala.toSeq + }.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo val response = masterClient.askSync[HeartbeatFromWorkerResponse]( HeartbeatFromWorker( host, diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index b1239b49798..f3c4e99e00c 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -40,7 +40,7 @@ import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus, FileInfo, TimeWindow} import org.apache.celeborn.common.metrics.source.AbstractSource import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf} -import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType} +import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, StorageInfo} import org.apache.celeborn.common.quota.ResourceConsumption import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils} import org.apache.celeborn.service.deploy.worker._ @@ -72,6 +72,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs DeviceInfo.getDeviceAndDiskInfos(workingDirInfos, conf) } val mountPoints = new util.HashSet[String](diskInfos.keySet()) + val hdfsDiskInfo = + if (conf.hasHDFSStorage) + Option(new DiskInfo("HDFS", Long.MaxValue, 999999, 999999, 0, StorageInfo.Type.HDFS)) + else None def disksSnapshot(): List[DiskInfo] = { diskInfos.synchronized { @@ -362,7 +366,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint") } val shuffleKey = Utils.makeShuffleKey(appId, shuffleId) - if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) { + if ((dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) || location.getStorageInfo.HDFSOnly()) { val shuffleDir = new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId") val fileInfo =