Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1081][FOLLOWUP] Remove UNKNOWN_DISK and allocate all slots to disk #2098

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public int getValue() {
}
}

@Deprecated public static String UNKNOWN_DISK = "UNKNOWN_DISK";
public static Map<Integer, Type> typesMap = new HashMap<>();
public static Set<String> typeNames = new HashSet<>();

Expand All @@ -57,7 +56,7 @@ public int getValue() {

// Default storage Type is MEMORY.
private Type type = Type.MEMORY;
private String mountPoint = UNKNOWN_DISK;
private String mountPoint = "";
// if a file is committed, field "finalResult" will be true
private boolean finalResult = false;
private String filePath;
Expand All @@ -72,27 +71,10 @@ public StorageInfo(Type type, boolean isFinal, String filePath) {
this.filePath = filePath;
}

public StorageInfo(String mountPoint, int availableStorageTypes) {
public StorageInfo(String mountPoint, StorageInfo.Type type, int availableStorageTypes) {
this.mountPoint = mountPoint;
this.availableStorageTypes = availableStorageTypes;
}

public StorageInfo(Type type, String mountPoint) {
this.type = type;
this.mountPoint = mountPoint;
}

public StorageInfo(Type type, String mountPoint, boolean finalResult) {
this.type = type;
this.mountPoint = mountPoint;
this.finalResult = finalResult;
}

public StorageInfo(Type type, String mountPoint, boolean finalResult, String filePath) {
this.type = type;
this.mountPoint = mountPoint;
this.finalResult = finalResult;
this.filePath = filePath;
this.availableStorageTypes = availableStorageTypes;
}

public StorageInfo(
Expand Down Expand Up @@ -147,21 +129,41 @@ public String toString() {
+ '}';
}

public boolean localDiskAvailable() {
public static boolean localDiskAvailable(int availableStorageTypes) {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & LOCAL_DISK_MASK) > 0;
}

public boolean HDFSAvailable() {
public boolean localDiskAvailable() {
return StorageInfo.localDiskAvailable(availableStorageTypes);
}

public static boolean HDFSAvailable(int availableStorageTypes) {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & HDFS_MASK) > 0;
}

public boolean OSSAvailable() {
public boolean HDFSAvailable() {
return StorageInfo.HDFSAvailable(availableStorageTypes);
}

public static boolean HDFSOnly(int availableStorageTypes) {
return availableStorageTypes == HDFS_MASK;
}

public boolean HDFSOnly() {
return StorageInfo.HDFSOnly(availableStorageTypes);
}

public static boolean OSSAvailable(int availableStorageTypes) {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & OSS_MASK) > 0;
}

public boolean OSSAvailable() {
return StorageInfo.OSSAvailable(availableStorageTypes);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
1 change: 1 addition & 0 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ message PbDiskInfo {
int64 usedSlots = 4;
int32 status = 5;
int64 avgFetchTime = 6;
int32 storageType = 7;
}

message PbWorkerInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ class DiskInfo(
this(mountPoint, usableSpace, avgFlushTime, avgFetchTime, activeSlots, List.empty, null)
}

def this(
mountPoint: String,
usableSpace: Long,
avgFlushTime: Long,
avgFetchTime: Long,
activeSlots: Long,
storageType: StorageInfo.Type) = {
this(mountPoint, usableSpace, avgFlushTime, avgFetchTime, activeSlots, List.empty, null)
this.storageType = storageType
}

def this(
mountPoint: String,
dirs: List[File],
Expand All @@ -70,10 +81,14 @@ class DiskInfo(
var status: DiskStatus = DiskStatus.HEALTHY
var threadCount = 1
var configuredUsableSpace = 0L
var storageType: StorageInfo.Type = _
var storageType: StorageInfo.Type = StorageInfo.Type.SSD
var maxSlots: Long = 0
lazy val shuffleAllocations = new util.HashMap[String, Integer]()

def setStorageType(storageType: StorageInfo.Type) = {
this.storageType = storageType
}

def setStatus(status: DiskStatus): this.type = this.synchronized {
this.status = status
this
Expand Down Expand Up @@ -145,9 +160,11 @@ class DiskInfo(
s" usableSpace: ${Utils.bytesToString(actualUsableSpace)}," +
s" avgFlushTime: ${Utils.nanoDurationToString(avgFlushTime)}," +
s" avgFetchTime: ${Utils.nanoDurationToString(avgFetchTime)}," +
s" activeSlots: $activeSlots)" +
s" activeSlots: $activeSlots," +
s" storageType: ${storageType})" +
s" status: $status" +
s" dirs ${dirs.mkString("\t")}"

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._

import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.StorageInfo
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.rpc.RpcEndpointRef
import org.apache.celeborn.common.rpc.netty.NettyRpcEndpointRef
Expand Down Expand Up @@ -156,12 +157,12 @@ class WorkerInfo(
curDisk.activeSlots = newDisk.activeSlots
curDisk.avgFlushTime = newDisk.avgFlushTime
curDisk.avgFetchTime = newDisk.avgFetchTime
if (estimatedPartitionSize.nonEmpty) {
if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS) {
curDisk.maxSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get
}
curDisk.setStatus(newDisk.status)
} else {
if (estimatedPartitionSize.nonEmpty) {
if (estimatedPartitionSize.nonEmpty && newDisk.storageType != StorageInfo.Type.HDFS) {
newDisk.maxSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
}
diskInfos.put(mountPoint, newDisk)
Expand Down Expand Up @@ -239,6 +240,11 @@ class WorkerInfo(
result = 31 * result + replicatePort.hashCode()
result
}

def haveDisk(): Boolean = {
diskInfos.values().asScala.exists(p =>
p.storageType == StorageInfo.Type.SSD || p.storageType == StorageInfo.Type.HDD)
}
}

object WorkerInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,17 @@ object PbSerDeUtils {
.setMinor(minor)
.build.toByteArray

def fromPbDiskInfo(pbDiskInfo: PbDiskInfo): DiskInfo =
new DiskInfo(
def fromPbDiskInfo(pbDiskInfo: PbDiskInfo): DiskInfo = {
val diskInfo = new DiskInfo(
pbDiskInfo.getMountPoint,
pbDiskInfo.getUsableSpace,
pbDiskInfo.getAvgFlushTime,
pbDiskInfo.getAvgFetchTime,
pbDiskInfo.getUsedSlots)
.setStatus(Utils.toDiskStatus(pbDiskInfo.getStatus))
diskInfo.setStorageType(StorageInfo.typesMap.get(pbDiskInfo.getStorageType))
diskInfo
}

def toPbDiskInfo(diskInfo: DiskInfo): PbDiskInfo =
PbDiskInfo.newBuilder
Expand All @@ -80,6 +83,7 @@ object PbSerDeUtils {
.setAvgFetchTime(diskInfo.avgFetchTime)
.setUsedSlots(diskInfo.activeSlots)
.setStatus(diskInfo.status.getValue)
.setStorageType(diskInfo.storageType.getValue)
.build

def fromPbFileInfo(pbFileInfo: PbFileInfo): FileInfo =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ public void testToStringOutput() {
PartitionLocation location2 =
new PartitionLocation(
partitionId, epoch, host, rpcPort, pushPort, fetchPort, replicatePort, mode, peer);
StorageInfo storageInfo = new StorageInfo(StorageInfo.Type.MEMORY, "/mnt/disk/0");
StorageInfo storageInfo =
new StorageInfo(
"/mnt/disk/0", StorageInfo.Type.MEMORY, StorageInfo.ALL_TYPES_AVAILABLE_MASK);
RoaringBitmap bitmap = new RoaringBitmap();
bitmap.add(1);
bitmap.add(2);
Expand All @@ -205,15 +207,15 @@ public void testToStringOutput() {
+ " host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n"
+ " mode:PRIMARY\n"
+ " peer:(empty)\n"
+ " storage hint:StorageInfo{type=MEMORY, mountPoint='UNKNOWN_DISK', finalResult=false, filePath=null}\n"
+ " storage hint:StorageInfo{type=MEMORY, mountPoint='', finalResult=false, filePath=null}\n"
+ " mapIdBitMap:{}]";
String exp2 =
"PartitionLocation[\n"
+ " id-epoch:0-0\n"
+ " host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n"
+ " mode:PRIMARY\n"
+ " peer:(host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4)\n"
+ " storage hint:StorageInfo{type=MEMORY, mountPoint='UNKNOWN_DISK', finalResult=false, filePath=null}\n"
+ " storage hint:StorageInfo{type=MEMORY, mountPoint='', finalResult=false, filePath=null}\n"
+ " mapIdBitMap:{}]";
String exp3 =
"PartitionLocation[\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,9 @@ class WorkerInfoSuite extends CelebornFunSuite {
|SlotsUsed: 60
|LastHeartbeat: 0
|Disks: $placeholder
| DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30) status: HEALTHY dirs $placeholder
| DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10) status: HEALTHY dirs $placeholder
| DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20) status: HEALTHY dirs $placeholder
| DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30, storageType: SSD) status: HEALTHY dirs $placeholder
| DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10, storageType: SSD) status: HEALTHY dirs $placeholder
| DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20, storageType: SSD) status: HEALTHY dirs $placeholder
|UserResourceConsumption: $placeholder
| UserIdentifier: `tenant1`.`name1`, ResourceConsumption: ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1, hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1)
|WorkerRef: null
Expand Down
75 changes: 75 additions & 0 deletions docs/developers/slotsallocation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
---
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

# Slots allocation

This article describes the detailed design of Celeborn workers' slots allocation.
Slots allocation is the core components about how Celeborn distribute workload amount workers.
We have achieved two approaches of slots allocation.

## Principle
Allocate slots to local disks unless explicit assigned to HDFS.

## LoadAware
### Related configs
```properties
celeborn.master.slot.assign.policy LOADAWARE
celeborn.master.slot.assign.loadAware.numDiskGroups 5
celeborn.master.slot.assign.loadAware.diskGroupGradient 0.1
celeborn.master.slot.assign.loadAware.flushTimeWeight 0
celeborn.master.slot.assign.loadAware.fetchTimeWeight 0
[spark.client.]celeborn.storage.availableTypes HDD,SSD
```
### Detail
Load-aware slots allocation will take following elements into consideration.
- disk's fetch time
- disk's flush time
- disk's usable space
- disk's used slot

Slots allocator will find out all worker involved in this allocation and sort their disks by
`disk's average flushtime * flush time weight + disk's average fetch time * fetch time weight`.
After getting the sorted disks list, Celeborn will split the disks into
`celeborn.master.slot.assign.loadAware.numDiskGroups` groups. The slots number to be placed into a disk group
is controlled by the `celeborn.master.slot.assign.loadAware.diskGroupGradient` which means that a group's
allocated slots number will be (1+`celeborn.master.slot.assign.loadAware.diskGroupGradient`)
times to the group's slower than it.
For example, there is 5 groups, G1 , G2, G3, G4 and G5. If the G5 is allocated 100 slots.
Other groups will be G4:110, G3:121, G2:133, G1:146.

After Celeborn has decided the slots number of a disk group, slots will be distributed in disks of a disk group.
Each disk has a usableSlots which is calculated by `(disk's usable space)/(average partition size)-usedSlots`.
The slots number to allocate in a disk is calculated by ` slots of this disk group * ( current disk's usableSlots / the sum of all disks' usableSlots in this group)`.
For example, G5 need to allocate 100 slots and have 3 disks D1 with usable slots 100, D2 with usable slots 50, D3 with usable slots 20.
The distribution will be D1:59, D2: 29, D3: 12.

If all slots can be place in disk groups, the slots allocation process is done.

requested slots are more than all usable slots, slots can not be placed into disks.
Worker will need to allocate these slots to workers with local disks one by one.

## RoundRobin
### Detail
Roundrobin slots allocation will distribute all slots into all registered workers with disks. Celeborn will treat
all workers as an array and place 1 slots in a worker until all slots are allocated.
If a worker has multiple disks, the chosen disk index is `(monotone increasing disk index +1) % disk count`.

## Celeborn Worker's Behavior
1. When reserve slots Celeborn worker will decide a slot be placed in local disks or HDFS when reserve slots.
2. If a partition is evicted from memory, the partition might be placed in HDFS.
3. If a slot is explicitly assigned to HDFS, worker will put the slot in HDFS.
Loading
Loading