Skip to content

Commit

Permalink
[CELEBORN-1081] Client support celeborn.storage.activeTypes config
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
1.To support `celeborn.storage.activeTypes` in Client.
2.Master will ignore slots for "UNKNOWN_DISK".

### Why are the changes needed?
Enable client application to select storage types to use.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
GA and cluster.

Closes apache#2045 from FMX/B1081.

Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
FMX authored and RexXiong committed Nov 3, 2023
1 parent 0e5008d commit 5e77b85
Show file tree
Hide file tree
Showing 17 changed files with 245 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
val latestPartitionLocation =
JavaUtils.newConcurrentHashMap[Int, ConcurrentHashMap[Int, PartitionLocation]]()
private val userIdentifier: UserIdentifier = IdentityProvider.instantiate(conf).provide()
private val availableStorageTypes = conf.availableStorageTypes

@VisibleForTesting
def workerSnapshots(shuffleId: Int): util.Map[WorkerInfo, ShufflePartitionLocationInfo] =
Expand Down Expand Up @@ -1025,7 +1026,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
pushReplicateEnabled,
pushRackAwareEnabled,
userIdentifier,
slotsAssignMaxWorkers)
slotsAssignMaxWorkers,
availableStorageTypes)
val res = requestMasterRequestSlots(req)
if (res.status != StatusCode.SUCCESS) {
requestMasterRequestSlots(req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public static String getFileName(String uniqueId, Mode mode) {
private StorageInfo storageInfo;
private RoaringBitmap mapIdBitMap;
private transient String _hostPushPort;

private transient String _hostFetchPort;

public PartitionLocation(PartitionLocation loc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public int getValue() {
}
}

public static String UNKNOWN_DISK = "UNKNOWN_DISK";
@Deprecated public static String UNKNOWN_DISK = "UNKNOWN_DISK";
public static Map<Integer, Type> typesMap = new HashMap<>();
public static Set<String> typeNames = new HashSet<>();

Expand All @@ -49,13 +49,21 @@ public int getValue() {
}
}

public static final int MEMORY_MASK = 0b1;
public static final int LOCAL_DISK_MASK = 0b10;
public static final int HDFS_MASK = 0b100;
public static final int OSS_MASK = 0b1000;
public static final int ALL_TYPES_AVAILABLE_MASK = 0;

// Default storage Type is MEMORY.
private Type type = Type.MEMORY;
private String mountPoint = UNKNOWN_DISK;
// if a file is committed, field "finalResult" will be true
private boolean finalResult = false;
private String filePath;

public int availableStorageTypes = 0;

public StorageInfo() {}

public StorageInfo(Type type, boolean isFinal, String filePath) {
Expand All @@ -64,8 +72,9 @@ public StorageInfo(Type type, boolean isFinal, String filePath) {
this.filePath = filePath;
}

public StorageInfo(String mountPoint) {
public StorageInfo(String mountPoint, int availableStorageTypes) {
this.mountPoint = mountPoint;
this.availableStorageTypes = availableStorageTypes;
}

public StorageInfo(Type type, String mountPoint) {
Expand All @@ -86,6 +95,19 @@ public StorageInfo(Type type, String mountPoint, boolean finalResult, String fil
this.filePath = filePath;
}

public StorageInfo(
Type type,
String mountPoint,
boolean finalResult,
String filePath,
int availableStorageTypes) {
this.type = type;
this.mountPoint = mountPoint;
this.finalResult = finalResult;
this.filePath = filePath;
this.availableStorageTypes = availableStorageTypes;
}

public boolean isFinalResult() {
return finalResult;
}
Expand Down Expand Up @@ -125,13 +147,50 @@ public String toString() {
+ '}';
}

public boolean localDiskAvailable() {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & LOCAL_DISK_MASK) > 0;
}

public boolean HDFSAvailable() {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & HDFS_MASK) > 0;
}

public boolean OSSAvailable() {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & OSS_MASK) > 0;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StorageInfo that = (StorageInfo) o;
return finalResult == that.finalResult
&& availableStorageTypes == that.availableStorageTypes
&& type == that.type
&& Objects.equals(mountPoint, that.mountPoint)
&& Objects.equals(filePath, that.filePath);
}

@Override
public int hashCode() {
return Objects.hash(type, mountPoint, finalResult, filePath, availableStorageTypes);
}

public static final boolean validate(String typeStr) {
return typeNames.contains(typeStr);
}

public static PbStorageInfo toPb(StorageInfo storageInfo) {
String filePath = storageInfo.getFilePath();
PbStorageInfo.Builder builder = PbStorageInfo.newBuilder();
builder
.setType(storageInfo.type.value)
.setFinalResult(storageInfo.finalResult)
.setMountPoint(storageInfo.mountPoint);
.setMountPoint(storageInfo.mountPoint)
.setAvailableStorageTypes(storageInfo.availableStorageTypes);
if (filePath != null) {
builder.setFilePath(filePath);
}
Expand All @@ -143,10 +202,29 @@ public static StorageInfo fromPb(PbStorageInfo pbStorageInfo) {
typesMap.get(pbStorageInfo.getType()),
pbStorageInfo.getMountPoint(),
pbStorageInfo.getFinalResult(),
pbStorageInfo.getFilePath());
}

public static boolean validateStorageType(String str) {
return typeNames.contains(str);
pbStorageInfo.getFilePath(),
pbStorageInfo.getAvailableStorageTypes());
}

public static int getAvailableTypes(List<Type> types) {
int ava = 0;
for (Type type : types) {
switch (type) {
case MEMORY:
ava = ava | MEMORY_MASK;
break;
case HDD:
case SSD:
ava = ava | LOCAL_DISK_MASK;
break;
case HDFS:
ava = ava | HDFS_MASK;
break;
case OSS:
ava = ava | OSS_MASK;
break;
}
}
return ava;
}
}
5 changes: 4 additions & 1 deletion common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ message PbStorageInfo {
string mountPoint = 2;
bool finalResult = 3;
string filePath = 4;
int32 availableStorageTypes = 5;
}

message PbPartitionLocation {
Expand Down Expand Up @@ -206,6 +207,7 @@ message PbRequestSlots {
PbUserIdentifier userIdentifier = 8;
bool shouldRackAware = 9;
int32 maxWorkers = 10;
int32 availableStorageTypes = 11;
}

message PbSlotInfo {
Expand Down Expand Up @@ -365,6 +367,7 @@ message PbReserveSlots {
PbUserIdentifier userIdentifier = 9;
int64 pushDataTimeout = 10;
bool partitionSplitEnabled = 11;
int32 availableStorageTypes = 12;
}

message PbReserveSlotsResponse {
Expand Down Expand Up @@ -494,7 +497,7 @@ message PbSnapshotMetaInfo {
int64 partitionTotalWritten = 8;
int64 partitionTotalFileCount = 9;
repeated PbAppDiskUsageSnapshot appDiskUsageMetricSnapshots = 10;
optional PbAppDiskUsageSnapshot currentAppDiskUsageMetricsSnapshot = 11;
PbAppDiskUsageSnapshot currentAppDiskUsageMetricsSnapshot = 11;
map<string, int64> lostWorkers = 12;
repeated PbWorkerInfo shutdownWorkers = 13;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,10 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// //////////////////////////////////////////////////////
def masterSlotAssignPolicy: SlotsAssignPolicy =
SlotsAssignPolicy.valueOf(get(MASTER_SLOT_ASSIGN_POLICY))

def availableStorageTypes: Int = {
val types = get(ACTIVE_STORAGE_TYPES).split(",").map(StorageInfo.Type.valueOf(_)).toList
StorageInfo.getAvailableTypes(types.asJava)
}
def hasHDFSStorage: Boolean =
get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name()) && get(HDFS_DIR).isDefined
def masterSlotAssignLoadAwareDiskGroupNum: Int = get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM)
Expand Down Expand Up @@ -3955,13 +3958,16 @@ object CelebornConf extends Logging {
.createWithDefaultString("32m")

val ACTIVE_STORAGE_TYPES: ConfigEntry[String] =
buildConf("celeborn.storage.activeTypes")
.categories("master", "worker")
buildConf("celeborn.storage.availableTypes")
.withAlternative("celeborn.storage.activeTypes")
.categories("master", "worker", "client")
.version("0.3.0")
.doc("Enabled storage levels. Available options: HDD,SSD,HDFS. ")
.doc(
"Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical.")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.createWithDefault("HDD,SSD")
.checkValue(p => p.split(",").map(StorageInfo.validate(_)).reduce(_ && _), "")
.createWithDefault("HDD")

val READ_LOCAL_SHUFFLE_FILE: ConfigEntry[Boolean] =
buildConf("celeborn.client.readLocalShuffleFile.enabled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ object ControlMessages extends Logging {
shouldRackAware: Boolean,
userIdentifier: UserIdentifier,
maxWorkers: Int,
availableStorageTypes: Int,
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage

Expand Down Expand Up @@ -519,6 +520,7 @@ object ControlMessages extends Logging {
shouldRackAware,
userIdentifier,
maxWorkers,
availableStorageTypes,
requestId) =>
val payload = PbRequestSlots.newBuilder()
.setApplicationId(applicationId)
Expand All @@ -529,6 +531,7 @@ object ControlMessages extends Logging {
.setShouldRackAware(shouldRackAware)
.setMaxWorkers(maxWorkers)
.setRequestId(requestId)
.setAvailableStorageTypes(availableStorageTypes)
.setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier))
.build().toByteArray
new TransportMessage(MessageType.REQUEST_SLOTS, payload)
Expand Down Expand Up @@ -896,6 +899,7 @@ object ControlMessages extends Logging {
pbRequestSlots.getShouldRackAware,
userIdentifier,
pbRequestSlots.getMaxWorkers,
pbRequestSlots.getAvailableStorageTypes,
pbRequestSlots.getRequestId)

case REQUEST_SLOTS_RESPONSE_VALUE =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.celeborn.common

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf._
import org.apache.celeborn.common.protocol.StorageInfo
import org.apache.celeborn.common.util.Utils

class CelebornConfSuite extends CelebornFunSuite {
Expand Down Expand Up @@ -209,11 +210,11 @@ class CelebornConfSuite extends CelebornFunSuite {
conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
assert(conf.workerBaseDirs.isEmpty)

conf.set("celeborn.storage.activeTypes", "SDD,HDD,HDFS")
conf.set("celeborn.storage.activeTypes", "SSD,HDD,HDFS")
conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
assert(conf.workerBaseDirs.isEmpty)

conf.set("celeborn.storage.activeTypes", "SDD,HDD")
conf.set("celeborn.storage.activeTypes", "SSD,HDD")
assert(!conf.workerBaseDirs.isEmpty)
}

Expand All @@ -223,10 +224,25 @@ class CelebornConfSuite extends CelebornFunSuite {
conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
assert(conf.workerCommitThreads === 128)

conf.set("celeborn.storage.activeTypes", "SDD,HDD")
conf.set("celeborn.storage.activeTypes", "SSD,HDD")
assert(conf.workerCommitThreads === 32)
}

test("Test available storage types") {
val conf = new CelebornConf()

assert(conf.availableStorageTypes == StorageInfo.LOCAL_DISK_MASK)

conf.set("celeborn.storage.availableTypes", "HDD,MEMORY")
assert(conf.availableStorageTypes == Integer.parseInt("11", 2))

conf.set("celeborn.storage.availableTypes", "HDD,HDFS")
assert(conf.availableStorageTypes == (StorageInfo.HDFS_MASK | StorageInfo.LOCAL_DISK_MASK))

conf.set("celeborn.storage.availableTypes", "HDFS")
assert(conf.availableStorageTypes == StorageInfo.HDFS_MASK)
}

test("Test role rpcDispatcherNumThreads") {
val availableCores = 5
val conf = new CelebornConf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, FileInfo, WorkerInfo}
import org.apache.celeborn.common.protocol.PartitionLocation
import org.apache.celeborn.common.protocol.{PartitionLocation, StorageInfo}
import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
import org.apache.celeborn.common.quota.ResourceConsumption

Expand Down Expand Up @@ -51,8 +51,8 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
val userIdentifier1 = UserIdentifier("tenant-a", "user-a")
val userIdentifier2 = UserIdentifier("tenant-b", "user-b")

val chunkOffsets1 = util.Arrays.asList[java.lang.Long](1000, 2000, 3000)
val chunkOffsets2 = util.Arrays.asList[java.lang.Long](2000, 4000, 6000)
val chunkOffsets1 = util.Arrays.asList[java.lang.Long](1000L, 2000L, 3000L)
val chunkOffsets2 = util.Arrays.asList[java.lang.Long](2000L, 4000L, 6000L)

val fileInfo1 = new FileInfo("/tmp/1", chunkOffsets1, userIdentifier1)
val fileInfo2 = new FileInfo("/tmp/2", chunkOffsets2, userIdentifier2)
Expand All @@ -77,6 +77,27 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
val partitionLocation2 =
new PartitionLocation(1, 1, "host2", 20, 19, 18, 24, PartitionLocation.Mode.REPLICA)

val partitionLocation3 =
new PartitionLocation(2, 2, "host3", 30, 29, 28, 27, PartitionLocation.Mode.PRIMARY)
val partitionLocation4 =
new PartitionLocation(
3,
3,
"host4",
40,
39,
38,
37,
PartitionLocation.Mode.REPLICA,
partitionLocation3,
new StorageInfo(
StorageInfo.Type.HDD,
"mountPoint",
false,
"filePath",
StorageInfo.LOCAL_DISK_MASK),
null)

val workerResource = new WorkerResource()
workerResource.put(
workerInfo1,
Expand Down Expand Up @@ -187,4 +208,17 @@ class PbSerDeUtilsTest extends CelebornFunSuite {

assert(restoredWorkerResource.equals(workerResource))
}

test("testPbStorageInfo") {
val pbPartitionLocation3 = PbSerDeUtils.toPbPartitionLocation(partitionLocation3)
val pbPartitionLocation4 = PbSerDeUtils.toPbPartitionLocation(partitionLocation4)

val restoredPartitionLocation3 = PbSerDeUtils.fromPbPartitionLocation(pbPartitionLocation3)
val restoredPartitionLocation4 = PbSerDeUtils.fromPbPartitionLocation(pbPartitionLocation4)

assert(restoredPartitionLocation3.equals(partitionLocation3))
assert(restoredPartitionLocation4.equals(partitionLocation4))
assert(restoredPartitionLocation4.getStorageInfo.equals(partitionLocation4.getStorageInfo))
}

}
1 change: 1 addition & 0 deletions docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,6 @@ license: |
| celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold | 2147483647 | Celeborn will only accept shuffle of partition number lower than this configuration value. | 0.3.0 |
| celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. | 0.3.0 |
| celeborn.master.endpoints | &lt;localhost&gt;:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 |
| celeborn.storage.availableTypes | HDD | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 |
| celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
<!--end-include-->
Loading

0 comments on commit 5e77b85

Please sign in to comment.