diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index ebc89281535..4c30b92a596 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -75,6 +75,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends val latestPartitionLocation = JavaUtils.newConcurrentHashMap[Int, ConcurrentHashMap[Int, PartitionLocation]]() private val userIdentifier: UserIdentifier = IdentityProvider.instantiate(conf).provide() + private val availableStorageTypes = conf.availableStorageTypes @VisibleForTesting def workerSnapshots(shuffleId: Int): util.Map[WorkerInfo, ShufflePartitionLocationInfo] = @@ -1025,7 +1026,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends pushReplicateEnabled, pushRackAwareEnabled, userIdentifier, - slotsAssignMaxWorkers) + slotsAssignMaxWorkers, + availableStorageTypes) val res = requestMasterRequestSlots(req) if (res.status != StatusCode.SUCCESS) { requestMasterRequestSlots(req) diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java b/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java index 08e97c0ffdc..8e76f348864 100644 --- a/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java +++ b/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java @@ -64,7 +64,6 @@ public static String getFileName(String uniqueId, Mode mode) { private StorageInfo storageInfo; private RoaringBitmap mapIdBitMap; private transient String _hostPushPort; - private transient String _hostFetchPort; public PartitionLocation(PartitionLocation loc) { diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java index b7bc0e87812..d3e1bf95b5f 100644 --- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java @@ -38,7 +38,7 @@ public int getValue() { } } - public static String UNKNOWN_DISK = "UNKNOWN_DISK"; + @Deprecated public static String UNKNOWN_DISK = "UNKNOWN_DISK"; public static Map typesMap = new HashMap<>(); public static Set typeNames = new HashSet<>(); @@ -49,6 +49,12 @@ public int getValue() { } } + public static final int MEMORY_MASK = 0b1; + public static final int LOCAL_DISK_MASK = 0b10; + public static final int HDFS_MASK = 0b100; + public static final int OSS_MASK = 0b1000; + public static final int ALL_TYPES_AVAILABLE_MASK = 0; + // Default storage Type is MEMORY. 
private Type type = Type.MEMORY; private String mountPoint = UNKNOWN_DISK; @@ -56,6 +62,8 @@ public int getValue() { private boolean finalResult = false; private String filePath; + public int availableStorageTypes = 0; + public StorageInfo() {} public StorageInfo(Type type, boolean isFinal, String filePath) { @@ -64,8 +72,9 @@ public StorageInfo(Type type, boolean isFinal, String filePath) { this.filePath = filePath; } - public StorageInfo(String mountPoint) { + public StorageInfo(String mountPoint, int availableStorageTypes) { this.mountPoint = mountPoint; + this.availableStorageTypes = availableStorageTypes; } public StorageInfo(Type type, String mountPoint) { @@ -86,6 +95,19 @@ public StorageInfo(Type type, String mountPoint, boolean finalResult, String fil this.filePath = filePath; } + public StorageInfo( + Type type, + String mountPoint, + boolean finalResult, + String filePath, + int availableStorageTypes) { + this.type = type; + this.mountPoint = mountPoint; + this.finalResult = finalResult; + this.filePath = filePath; + this.availableStorageTypes = availableStorageTypes; + } + public boolean isFinalResult() { return finalResult; } @@ -125,13 +147,50 @@ public String toString() { + '}'; } + public boolean localDiskAvailable() { + return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK + || (availableStorageTypes & LOCAL_DISK_MASK) > 0; + } + + public boolean HDFSAvailable() { + return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK + || (availableStorageTypes & HDFS_MASK) > 0; + } + + public boolean OSSAvailable() { + return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK + || (availableStorageTypes & OSS_MASK) > 0; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StorageInfo that = (StorageInfo) o; + return finalResult == that.finalResult + && availableStorageTypes == that.availableStorageTypes + && type == that.type + && Objects.equals(mountPoint, that.mountPoint) + && Objects.equals(filePath, that.filePath); + } + + @Override + public int hashCode() { + return Objects.hash(type, mountPoint, finalResult, filePath, availableStorageTypes); + } + + public static final boolean validate(String typeStr) { + return typeNames.contains(typeStr); + } + public static PbStorageInfo toPb(StorageInfo storageInfo) { String filePath = storageInfo.getFilePath(); PbStorageInfo.Builder builder = PbStorageInfo.newBuilder(); builder .setType(storageInfo.type.value) .setFinalResult(storageInfo.finalResult) - .setMountPoint(storageInfo.mountPoint); + .setMountPoint(storageInfo.mountPoint) + .setAvailableStorageTypes(storageInfo.availableStorageTypes); if (filePath != null) { builder.setFilePath(filePath); } @@ -143,10 +202,29 @@ public static StorageInfo fromPb(PbStorageInfo pbStorageInfo) { typesMap.get(pbStorageInfo.getType()), pbStorageInfo.getMountPoint(), pbStorageInfo.getFinalResult(), - pbStorageInfo.getFilePath()); - } - - public static boolean validateStorageType(String str) { - return typeNames.contains(str); + pbStorageInfo.getFilePath(), + pbStorageInfo.getAvailableStorageTypes()); + } + + public static int getAvailableTypes(List types) { + int ava = 0; + for (Type type : types) { + switch (type) { + case MEMORY: + ava = ava | MEMORY_MASK; + break; + case HDD: + case SSD: + ava = ava | LOCAL_DISK_MASK; + break; + case HDFS: + ava = ava | HDFS_MASK; + break; + case OSS: + ava = ava | OSS_MASK; + break; + } + } + return ava; } } diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto index 3e2e7a54cec..e05615e5b54 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -97,6 +97,7 @@ message PbStorageInfo { string mountPoint = 2; bool finalResult = 3; string filePath = 4; + int32 availableStorageTypes = 5; } message PbPartitionLocation { @@ -206,6 +207,7 @@ message PbRequestSlots { PbUserIdentifier userIdentifier = 8; bool shouldRackAware = 9; int32 maxWorkers = 10; + int32 availableStorageTypes = 11; } message PbSlotInfo { @@ -365,6 +367,7 @@ message PbReserveSlots { PbUserIdentifier userIdentifier = 9; int64 pushDataTimeout = 10; bool partitionSplitEnabled = 11; + int32 availableStorageTypes = 12; } message PbReserveSlotsResponse { @@ -494,7 +497,7 @@ message PbSnapshotMetaInfo { int64 partitionTotalWritten = 8; int64 partitionTotalFileCount = 9; repeated PbAppDiskUsageSnapshot appDiskUsageMetricSnapshots = 10; - optional PbAppDiskUsageSnapshot currentAppDiskUsageMetricsSnapshot = 11; + PbAppDiskUsageSnapshot currentAppDiskUsageMetricsSnapshot = 11; map lostWorkers = 12; repeated PbWorkerInfo shutdownWorkers = 13; } diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 194c7e03901..e024f6f1ecf 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -514,7 +514,10 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se // ////////////////////////////////////////////////////// def masterSlotAssignPolicy: SlotsAssignPolicy = SlotsAssignPolicy.valueOf(get(MASTER_SLOT_ASSIGN_POLICY)) - + def availableStorageTypes: Int = { + val types = get(ACTIVE_STORAGE_TYPES).split(",").map(StorageInfo.Type.valueOf(_)).toList + StorageInfo.getAvailableTypes(types.asJava) + } def hasHDFSStorage: Boolean = get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name()) && get(HDFS_DIR).isDefined def masterSlotAssignLoadAwareDiskGroupNum: Int = get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM) @@ -3955,13 +3958,16 @@ object CelebornConf extends Logging { .createWithDefaultString("32m") val ACTIVE_STORAGE_TYPES: ConfigEntry[String] = - buildConf("celeborn.storage.activeTypes") - .categories("master", "worker") + buildConf("celeborn.storage.availableTypes") + .withAlternative("celeborn.storage.activeTypes") + .categories("master", "worker", "client") .version("0.3.0") - .doc("Enabled storage levels. Available options: HDD,SSD,HDFS. ") + .doc( + "Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. 
Note: HDD and SSD would be treated as identical.") .stringConf .transform(_.toUpperCase(Locale.ROOT)) - .createWithDefault("HDD,SSD") + .checkValue(p => p.split(",").map(StorageInfo.validate(_)).reduce(_ && _), "") + .createWithDefault("HDD") val READ_LOCAL_SHUFFLE_FILE: ConfigEntry[Boolean] = buildConf("celeborn.client.readLocalShuffleFile.enabled") diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 749170a2683..578824cc1c2 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -170,6 +170,7 @@ object ControlMessages extends Logging { shouldRackAware: Boolean, userIdentifier: UserIdentifier, maxWorkers: Int, + availableStorageTypes: Int, override var requestId: String = ZERO_UUID) extends MasterRequestMessage @@ -519,6 +520,7 @@ object ControlMessages extends Logging { shouldRackAware, userIdentifier, maxWorkers, + availableStorageTypes, requestId) => val payload = PbRequestSlots.newBuilder() .setApplicationId(applicationId) @@ -529,6 +531,7 @@ object ControlMessages extends Logging { .setShouldRackAware(shouldRackAware) .setMaxWorkers(maxWorkers) .setRequestId(requestId) + .setAvailableStorageTypes(availableStorageTypes) .setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier)) .build().toByteArray new TransportMessage(MessageType.REQUEST_SLOTS, payload) @@ -896,6 +899,7 @@ object ControlMessages extends Logging { pbRequestSlots.getShouldRackAware, userIdentifier, pbRequestSlots.getMaxWorkers, + pbRequestSlots.getAvailableStorageTypes, pbRequestSlots.getRequestId) case REQUEST_SLOTS_RESPONSE_VALUE => diff --git a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala index f3e01c5ab43..736a44619eb 100644 --- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala @@ -19,6 +19,7 @@ package org.apache.celeborn.common import org.apache.celeborn.CelebornFunSuite import org.apache.celeborn.common.CelebornConf._ +import org.apache.celeborn.common.protocol.StorageInfo import org.apache.celeborn.common.util.Utils class CelebornConfSuite extends CelebornFunSuite { @@ -209,11 +210,11 @@ class CelebornConfSuite extends CelebornFunSuite { conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx") assert(conf.workerBaseDirs.isEmpty) - conf.set("celeborn.storage.activeTypes", "SDD,HDD,HDFS") + conf.set("celeborn.storage.activeTypes", "SSD,HDD,HDFS") conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx") assert(conf.workerBaseDirs.isEmpty) - conf.set("celeborn.storage.activeTypes", "SDD,HDD") + conf.set("celeborn.storage.activeTypes", "SSD,HDD") assert(!conf.workerBaseDirs.isEmpty) } @@ -223,10 +224,25 @@ class CelebornConfSuite extends CelebornFunSuite { conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx") assert(conf.workerCommitThreads === 128) - conf.set("celeborn.storage.activeTypes", "SDD,HDD") + conf.set("celeborn.storage.activeTypes", "SSD,HDD") assert(conf.workerCommitThreads === 32) } + test("Test available storage types") { + val conf = new CelebornConf() + + assert(conf.availableStorageTypes == StorageInfo.LOCAL_DISK_MASK) + + conf.set("celeborn.storage.availableTypes", "HDD,MEMORY") + 
assert(conf.availableStorageTypes == Integer.parseInt("11", 2)) + + conf.set("celeborn.storage.availableTypes", "HDD,HDFS") + assert(conf.availableStorageTypes == (StorageInfo.HDFS_MASK | StorageInfo.LOCAL_DISK_MASK)) + + conf.set("celeborn.storage.availableTypes", "HDFS") + assert(conf.availableStorageTypes == StorageInfo.HDFS_MASK) + } + test("Test role rpcDispatcherNumThreads") { val availableCores = 5 val conf = new CelebornConf() diff --git a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala index 903af02c953..64b6f7e86e1 100644 --- a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala +++ b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala @@ -23,7 +23,7 @@ import java.util import org.apache.celeborn.CelebornFunSuite import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, FileInfo, WorkerInfo} -import org.apache.celeborn.common.protocol.PartitionLocation +import org.apache.celeborn.common.protocol.{PartitionLocation, StorageInfo} import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource import org.apache.celeborn.common.quota.ResourceConsumption @@ -51,8 +51,8 @@ class PbSerDeUtilsTest extends CelebornFunSuite { val userIdentifier1 = UserIdentifier("tenant-a", "user-a") val userIdentifier2 = UserIdentifier("tenant-b", "user-b") - val chunkOffsets1 = util.Arrays.asList[java.lang.Long](1000, 2000, 3000) - val chunkOffsets2 = util.Arrays.asList[java.lang.Long](2000, 4000, 6000) + val chunkOffsets1 = util.Arrays.asList[java.lang.Long](1000L, 2000L, 3000L) + val chunkOffsets2 = util.Arrays.asList[java.lang.Long](2000L, 4000L, 6000L) val fileInfo1 = new FileInfo("/tmp/1", chunkOffsets1, userIdentifier1) val fileInfo2 = new FileInfo("/tmp/2", chunkOffsets2, userIdentifier2) @@ -77,6 +77,27 @@ class PbSerDeUtilsTest extends CelebornFunSuite { val partitionLocation2 = new PartitionLocation(1, 1, "host2", 20, 19, 18, 24, PartitionLocation.Mode.REPLICA) + val partitionLocation3 = + new PartitionLocation(2, 2, "host3", 30, 29, 28, 27, PartitionLocation.Mode.PRIMARY) + val partitionLocation4 = + new PartitionLocation( + 3, + 3, + "host4", + 40, + 39, + 38, + 37, + PartitionLocation.Mode.REPLICA, + partitionLocation3, + new StorageInfo( + StorageInfo.Type.HDD, + "mountPoint", + false, + "filePath", + StorageInfo.LOCAL_DISK_MASK), + null) + val workerResource = new WorkerResource() workerResource.put( workerInfo1, @@ -187,4 +208,17 @@ class PbSerDeUtilsTest extends CelebornFunSuite { assert(restoredWorkerResource.equals(workerResource)) } + + test("testPbStorageInfo") { + val pbPartitionLocation3 = PbSerDeUtils.toPbPartitionLocation(partitionLocation3) + val pbPartitionLocation4 = PbSerDeUtils.toPbPartitionLocation(partitionLocation4) + + val restoredPartitionLocation3 = PbSerDeUtils.fromPbPartitionLocation(pbPartitionLocation3) + val restoredPartitionLocation4 = PbSerDeUtils.fromPbPartitionLocation(pbPartitionLocation4) + + assert(restoredPartitionLocation3.equals(partitionLocation3)) + assert(restoredPartitionLocation4.equals(partitionLocation4)) + assert(restoredPartitionLocation4.getStorageInfo.equals(partitionLocation4.getStorageInfo)) + } + } diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 1ff362660c9..f16e43193da 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -104,5 +104,6 @@ license: | 
| celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold | 2147483647 | Celeborn will only accept shuffle of partition number lower than this configuration value. | 0.3.0 | | celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. | 0.3.0 | | celeborn.master.endpoints | <localhost>:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `:[,:]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 | +| celeborn.storage.availableTypes | HDD | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | | celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | diff --git a/docs/configuration/master.md b/docs/configuration/master.md index 30741d85f0f..b83b4b20cad 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -38,6 +38,6 @@ license: | | celeborn.master.slot.assign.policy | ROUNDROBIN | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.activeTypes` | 0.3.0 | | celeborn.master.userResourceConsumption.update.interval | 30s | Time length for a window about compute user resource consumption. | 0.3.0 | | celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | Worker unavailable info would be cleared when the retention period is expired | 0.3.1 | -| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 | +| celeborn.storage.availableTypes | HDD | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | | celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 177a25833f6..a356844cb2b 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -22,7 +22,7 @@ license: | | celeborn.master.endpoints | <localhost>:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `:[,:]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 | | celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 | | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 | -| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 | +| celeborn.storage.availableTypes | HDD | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | | celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. 
| 0.2.0 | | celeborn.worker.activeConnection.max | <undefined> | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 | | celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for read buffer per mount point. | 0.3.0 | diff --git a/docs/migration.md b/docs/migration.md index 62320b99786..15937d3e975 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -42,6 +42,8 @@ license: | - Since 0.4.0, Celeborn deprecate `celeborn.worker.storage.baseDir.prefix` and `celeborn.worker.storage.baseDir.number`. Please use `celeborn.worker.storage.dirs` instead. +- Since 0.4.0, Celeborn deprecate `celeborn.storage.activeTypes`. Please use `celeborn.storage.availableTypes` instead. + ## Upgrading from 0.3.1 to 0.3.2 - Since 0.3.1, Celeborn changed the default value of `raft.client.rpc.request.timeout` from `3s` to `10s`. diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java index ce2e4db9bca..acfc2261031 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java @@ -53,7 +53,8 @@ static class UsableDiskInfo { List workers, List partitionIds, boolean shouldReplicate, - boolean shouldRackAware) { + boolean shouldRackAware, + int availableStorageTypes) { if (partitionIds.isEmpty()) { return new HashMap<>(); } @@ -72,7 +73,13 @@ static class UsableDiskInfo { } } } - return locateSlots(partitionIds, workers, restrictions, shouldReplicate, shouldRackAware); + return locateSlots( + partitionIds, + workers, + restrictions, + shouldReplicate, + shouldRackAware, + availableStorageTypes); } /** @@ -90,7 +97,8 @@ static class UsableDiskInfo { int diskGroupCount, double diskGroupGradient, double flushTimeWeight, - double fetchTimeWeight) { + double fetchTimeWeight, + int availableStorageTypes) { if (partitionIds.isEmpty()) { return new HashMap<>(); } @@ -123,7 +131,8 @@ static class UsableDiskInfo { logger.warn( "offer slots for {} fallback to roundrobin because there is no usable disks", StringUtils.join(partitionIds, ',')); - return offerSlotsRoundRobin(workers, partitionIds, shouldReplicate, shouldRackAware); + return offerSlotsRoundRobin( + workers, partitionIds, shouldReplicate, shouldRackAware, availableStorageTypes); } if (!initialized) { @@ -135,14 +144,21 @@ static class UsableDiskInfo { placeDisksToGroups(usableDisks, diskGroupCount, flushTimeWeight, fetchTimeWeight), diskToWorkerMap, shouldReplicate ? 
partitionIds.size() * 2 : partitionIds.size()); - return locateSlots(partitionIds, workers, restrictions, shouldReplicate, shouldRackAware); + return locateSlots( + partitionIds, + workers, + restrictions, + shouldReplicate, + shouldRackAware, + availableStorageTypes); } private static StorageInfo getStorageInfo( List workers, int workerIndex, Map> restrictions, - Map workerDiskIndex) { + Map workerDiskIndex, + int availableStorageTypes) { WorkerInfo selectedWorker = workers.get(workerIndex); List usableDiskInfos = restrictions.get(selectedWorker); int diskIndex = workerDiskIndex.computeIfAbsent(selectedWorker, v -> 0); @@ -150,7 +166,9 @@ private static StorageInfo getStorageInfo( diskIndex = (diskIndex + 1) % usableDiskInfos.size(); } usableDiskInfos.get(diskIndex).usableSlots--; - StorageInfo storageInfo = new StorageInfo(usableDiskInfos.get(diskIndex).diskInfo.mountPoint()); + StorageInfo storageInfo = + new StorageInfo( + usableDiskInfos.get(diskIndex).diskInfo.mountPoint(), availableStorageTypes); workerDiskIndex.put(selectedWorker, (diskIndex + 1) % usableDiskInfos.size()); return storageInfo; } @@ -167,7 +185,8 @@ private static StorageInfo getStorageInfo( List workers, Map> restrictions, boolean shouldReplicate, - boolean shouldRackAware) { + boolean shouldRackAware, + int activeStorageTypes) { Map, List>> slots = new HashMap<>(); @@ -178,12 +197,15 @@ private static StorageInfo getStorageInfo( new LinkedList<>(restrictions.keySet()), restrictions, shouldReplicate, - shouldRackAware); + shouldRackAware, + activeStorageTypes); if (!remain.isEmpty()) { - remain = roundRobin(slots, remain, workers, null, shouldReplicate, shouldRackAware); + remain = + roundRobin( + slots, remain, workers, null, shouldReplicate, shouldRackAware, activeStorageTypes); } if (!remain.isEmpty()) { - roundRobin(slots, remain, workers, null, shouldReplicate, false); + roundRobin(slots, remain, workers, null, shouldReplicate, false, activeStorageTypes); } return slots; } @@ -194,7 +216,8 @@ private static List roundRobin( List workers, Map> restrictions, boolean shouldReplicate, - boolean shouldRackAware) { + boolean shouldRackAware, + int availableStorageTypes) { // workerInfo -> (diskIndexForPrimary, diskIndexForReplica) Map workerDiskIndexForPrimary = new HashMap<>(); Map workerDiskIndexForReplica = new HashMap<>(); @@ -215,7 +238,12 @@ private static List roundRobin( } } storageInfo = - getStorageInfo(workers, nextPrimaryInd, restrictions, workerDiskIndexForPrimary); + getStorageInfo( + workers, + nextPrimaryInd, + restrictions, + workerDiskIndexForPrimary, + availableStorageTypes); } PartitionLocation primaryPartition = createLocation(partitionId, workers.get(nextPrimaryInd), null, storageInfo, true); @@ -231,7 +259,12 @@ private static List roundRobin( } } storageInfo = - getStorageInfo(workers, nextReplicaInd, restrictions, workerDiskIndexForReplica); + getStorageInfo( + workers, + nextReplicaInd, + restrictions, + workerDiskIndexForReplica, + availableStorageTypes); } else if (shouldRackAware) { while (!satisfyRackAware(true, workers, nextPrimaryInd, nextReplicaInd)) { nextReplicaInd = (nextReplicaInd + 1) % workers.size(); @@ -460,10 +493,13 @@ public static Map> slotsToDiskAllocations( jointLocations.addAll(slots.get(worker)._2); for (PartitionLocation location : jointLocations) { String mountPoint = location.getStorageInfo().getMountPoint(); - if (slotsPerDisk.containsKey(mountPoint)) { - slotsPerDisk.put(mountPoint, slotsPerDisk.get(mountPoint) + 1); - } else { - slotsPerDisk.put(mountPoint, 1); + 
// ignore slots for UNKNOWN_DISK + if (!mountPoint.equals(StorageInfo.UNKNOWN_DISK)) { + if (slotsPerDisk.containsKey(mountPoint)) { + slotsPerDisk.put(mountPoint, slotsPerDisk.get(mountPoint) + 1); + } else { + slotsPerDisk.put(mountPoint, 1); + } } } } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index ead05b37524..c1102418a7e 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -357,7 +357,7 @@ private[celeborn] class Master( // keep it for compatible reason context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS)) - case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _) => + case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _, _) => logTrace(s"Received RequestSlots request $requestSlots.") executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots)) @@ -668,13 +668,15 @@ private[celeborn] class Master( slotsAssignLoadAwareDiskGroupNum, slotsAssignLoadAwareDiskGroupGradient, loadAwareFlushTimeWeight, - loadAwareFetchTimeWeight) + loadAwareFetchTimeWeight, + requestSlots.availableStorageTypes) } else { SlotsAllocator.offerSlotsRoundRobin( selectedWorkers, requestSlots.partitionIdList, requestSlots.shouldReplicate, - requestSlots.shouldRackAware) + requestSlots.shouldRackAware, + requestSlots.availableStorageTypes) } } } diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java index 8ca6d31006b..60707ceb566 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java @@ -36,6 +36,7 @@ import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.protocol.PartitionLocation; +import org.apache.celeborn.common.protocol.StorageInfo; import org.apache.celeborn.service.deploy.master.network.CelebornRackResolver; public class SlotsAllocatorRackAwareSuiteJ { @@ -68,7 +69,8 @@ public void offerSlotsRoundRobinWithRackAware() throws IOException { List workers = prepareWorkers(resolver); Map, List>> slots = - SlotsAllocator.offerSlotsRoundRobin(workers, partitionIds, true, true); + SlotsAllocator.offerSlotsRoundRobin( + workers, partitionIds, true, true, StorageInfo.ALL_TYPES_AVAILABLE_MASK); Consumer assertCustomer = new Consumer() { @@ -103,7 +105,8 @@ public void offerSlotsRoundRobinWithRackAwareWithoutMappingFile() throws IOExcep List workers = prepareWorkers(resolver); Map, List>> slots = - SlotsAllocator.offerSlotsRoundRobin(workers, partitionIds, true, true); + SlotsAllocator.offerSlotsRoundRobin( + workers, partitionIds, true, true, StorageInfo.ALL_TYPES_AVAILABLE_MASK); Consumer assertConsumer = new Consumer() { diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java index ccdeb5b2d7b..e82742bab0d 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java @@ -36,6 +36,7 @@ import 
org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.protocol.PartitionLocation; +import org.apache.celeborn.common.protocol.StorageInfo; public class SlotsAllocatorSuiteJ { private List prepareWorkers(boolean hasDisks) { @@ -239,7 +240,8 @@ private void check( conf.masterSlotAssignLoadAwareDiskGroupNum(), conf.masterSlotAssignLoadAwareDiskGroupGradient(), conf.masterSlotAssignLoadAwareFlushTimeWeight(), - conf.masterSlotAssignLoadAwareFetchTimeWeight()); + conf.masterSlotAssignLoadAwareFetchTimeWeight(), + StorageInfo.ALL_TYPES_AVAILABLE_MASK); if (expectSuccess) { if (shouldReplicate) { slots.forEach( @@ -275,10 +277,11 @@ private void check( allocateToDiskSlots += worker.usedSlots(); } if (shouldReplicate) { - Assert.assertEquals(partitionIds.size() * 2, unknownDiskSlots + allocateToDiskSlots); + Assert.assertTrue(partitionIds.size() * 2 >= unknownDiskSlots + allocateToDiskSlots); } else { - Assert.assertEquals(partitionIds.size(), unknownDiskSlots + allocateToDiskSlots); + Assert.assertTrue(partitionIds.size() >= unknownDiskSlots + allocateToDiskSlots); } + Assert.assertEquals(0, unknownDiskSlots); } else { assert slots.isEmpty() : "Expect to fail to offer slots, but return " + slots.size() + " slots."; @@ -294,7 +297,8 @@ private void checkSlotsOnHDFS( CelebornConf conf = new CelebornConf(); conf.set("celeborn.active.storage.levels", "HDFS"); Map, List>> slots = - SlotsAllocator.offerSlotsRoundRobin(workers, partitionIds, shouldReplicate, false); + SlotsAllocator.offerSlotsRoundRobin( + workers, partitionIds, shouldReplicate, false, StorageInfo.ALL_TYPES_AVAILABLE_MASK); int allocatedPartitionCount = 0; diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index e059ae4aff4..b1239b49798 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -362,7 +362,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint") } val shuffleKey = Utils.makeShuffleKey(appId, shuffleId) - if (dirs.isEmpty) { + if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) { val shuffleDir = new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId") val fileInfo = @@ -400,7 +400,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } hdfsWriters.put(fileInfo.getFilePath, hdfsWriter) return hdfsWriter - } else { + } else if (dirs.nonEmpty && location.getStorageInfo.localDiskAvailable()) { val dir = dirs(getNextIndex() % dirs.size) val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, mountPoints) val shuffleDir = new File(dir, s"$appId/$shuffleId") @@ -471,6 +471,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs exception, DiskStatus.READ_OR_WRITE_FAILURE) } + } else { + exception = new IOException("No storage available for location:" + location.toString) } retryCount += 1 }
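
Editor's note: the core of this patch is the bitmask that `StorageInfo` now carries in `availableStorageTypes`. Each storage kind gets one bit (HDD and SSD deliberately share the local-disk bit), `getAvailableTypes` folds a list of configured types into the mask, and `0` (`ALL_TYPES_AVAILABLE_MASK`) is special-cased to mean "no restriction", which also keeps old senders compatible since an unset proto3 `int32` decodes as `0`. The following standalone Java sketch mirrors that logic for illustration only; the class and `main` method are not part of the patch.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, self-contained sketch of the bitmask scheme added to StorageInfo.
public class StorageTypeMaskSketch {
  public static final int MEMORY_MASK = 0b1;
  public static final int LOCAL_DISK_MASK = 0b10;   // HDD and SSD share this bit
  public static final int HDFS_MASK = 0b100;
  public static final int OSS_MASK = 0b1000;
  // 0 means "no restriction": every storage type is considered available.
  public static final int ALL_TYPES_AVAILABLE_MASK = 0;

  enum Type { MEMORY, HDD, SSD, HDFS, OSS }

  // Mirrors StorageInfo.getAvailableTypes: fold the configured types into one int.
  static int getAvailableTypes(List<Type> types) {
    int mask = 0;
    for (Type type : types) {
      switch (type) {
        case MEMORY: mask |= MEMORY_MASK; break;
        case HDD:
        case SSD:    mask |= LOCAL_DISK_MASK; break;  // treated as identical
        case HDFS:   mask |= HDFS_MASK; break;
        case OSS:    mask |= OSS_MASK; break;
      }
    }
    return mask;
  }

  // Mirror the availability checks added to StorageInfo.
  static boolean localDiskAvailable(int mask) {
    return mask == ALL_TYPES_AVAILABLE_MASK || (mask & LOCAL_DISK_MASK) > 0;
  }

  static boolean hdfsAvailable(int mask) {
    return mask == ALL_TYPES_AVAILABLE_MASK || (mask & HDFS_MASK) > 0;
  }

  public static void main(String[] args) {
    int mask = getAvailableTypes(Arrays.asList(Type.HDD, Type.HDFS));
    System.out.println(Integer.toBinaryString(mask));   // 110 -> local disk + HDFS
    System.out.println(localDiskAvailable(mask));        // true
    System.out.println(hdfsAvailable(mask));             // true
    System.out.println(localDiskAvailable(HDFS_MASK));   // false: HDFS-only mask
  }
}
```

This also explains the expectations in the new `CelebornConfSuite` test: `HDD,MEMORY` folds to `0b11`, `HDD,HDFS` to `LOCAL_DISK_MASK | HDFS_MASK`, and `HDFS` alone to `HDFS_MASK`.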
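
The `StorageManager` hunk is where the mask finally takes effect on the worker: a writer is created on HDFS only when no local dirs are usable and the location's `StorageInfo` permits HDFS, on a local disk only when dirs exist and local disk is permitted, and otherwise the attempt fails. The sketch below is a condensed, hypothetical view of that branching; `localDirs`, `StorageFlags`, and the returned writer labels are illustrative placeholders, not Celeborn APIs, and the real code records the `IOException` and retries instead of throwing immediately.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical condensation of the writer-selection branching added in StorageManager.
class WriterSelectionSketch {

  // Stands in for PartitionLocation.getStorageInfo() availability checks.
  interface StorageFlags {
    boolean hdfsAvailable();
    boolean localDiskAvailable();
  }

  String selectWriter(List<String> localDirs, StorageFlags flags) throws IOException {
    if (localDirs.isEmpty() && flags.hdfsAvailable()) {
      // No usable local disks, but the requested mask allows HDFS for this location.
      return "hdfs-writer";
    } else if (!localDirs.isEmpty() && flags.localDiskAvailable()) {
      // Local disks exist and the mask permits them.
      return "local-disk-writer";
    } else {
      // Neither storage is both present and permitted for this location.
      throw new IOException("No storage available for location");
    }
  }
}
```

The same mask travels the whole control path shown earlier in the patch: `CelebornConf.availableStorageTypes` on the client, `PbRequestSlots`/`RequestSlots` to the master, `SlotsAllocator.getStorageInfo` when a slot is tagged with a mount point, and finally the `StorageInfo` of each `PartitionLocation` the worker serves.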
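
On the configuration side, `celeborn.storage.availableTypes` (replacing the deprecated `celeborn.storage.activeTypes`) is upper-cased and then validated token by token via the new `StorageInfo.validate` and the `checkValue` added in `CelebornConf`. A minimal sketch of that validation, with the type-name set hard-coded here purely for illustration (the real set is derived from `StorageInfo.Type`):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Sketch of the per-token validation applied to celeborn.storage.availableTypes.
class StorageTypeConfigCheck {
  private static final Set<String> TYPE_NAMES =
      new HashSet<>(Arrays.asList("MEMORY", "HDD", "SSD", "HDFS", "OSS"));

  // Mirrors transform(_.toUpperCase) followed by StorageInfo.validate per token.
  static boolean validate(String configValue) {
    return Arrays.stream(configValue.toUpperCase(Locale.ROOT).split(","))
        .allMatch(TYPE_NAMES::contains);
  }

  public static void main(String[] args) {
    System.out.println(validate("HDD,MEMORY")); // true
    System.out.println(validate("SDD,HDD"));    // false: "SDD" is rejected as unknown
  }
}
```

Rejecting unknown names is why the pre-existing `CelebornConfSuite` cases that spelled `SDD` were corrected to `SSD` in this patch: under the new `checkValue` such a value would no longer be silently accepted.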