Skip to content

Commit

Permalink
[improvement](balance) partition rebalance chose disk by rr #36826 (#…
Browse files Browse the repository at this point in the history
…36901)

cherry pick from #36826
  • Loading branch information
yujun777 authored Jun 27, 2024
1 parent 17873d0 commit f5c0585
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -266,9 +265,9 @@ protected void completeSchedCtx(TabletSchedCtx tabletCtx)
Preconditions.checkNotNull(slot, "unable to get slot of toBe " + move.toBe);

List<RootPathLoadStatistic> paths = beStat.getPathStatistics();
Set<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium()
List<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium()
&& path.isFit(tabletCtx.getTabletSize(), false) == BalanceStatus.OK)
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet());
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList());
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath);
if (pathHash == -1) {
throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SchedException.SubCode.WAITING_SLOT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1933,9 +1933,12 @@ public static class PathSlot {
// path hash -> slot num
private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
private long beId;
// only use in takeAnAvailBalanceSlotFrom, make pick RR
private long lastPickPathHash;

public PathSlot(Map<Long, TStorageMedium> paths, long beId) {
this.beId = beId;
this.lastPickPathHash = -1;
for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
}
Expand Down Expand Up @@ -2046,19 +2049,6 @@ public synchronized int getTotalAvailBalanceSlotNum() {
return num;
}

/**
* get path whose balance slot num is larger than 0
*/
public synchronized Set<Long> getAvailPathsForBalance() {
Set<Long> pathHashs = Sets.newHashSet();
for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
if (entry.getValue().getAvailableBalance() > 0) {
pathHashs.add(entry.getKey());
}
}
return pathHashs;
}

public synchronized List<List<String>> getSlotInfo(long beId) {
List<List<String>> results = Lists.newArrayList();
pathSlots.forEach((key, value) -> {
Expand Down Expand Up @@ -2091,15 +2081,31 @@ public synchronized long takeBalanceSlot(long pathHash) {
return -1;
}

public synchronized long takeAnAvailBalanceSlotFrom(Set<Long> pathHashs) {
for (Long pathHash : pathHashs) {
Slot slot = pathSlots.get(pathHash);
if (slot == null) {
continue;
public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs) {
if (pathHashs.isEmpty()) {
return -1;
}

Collections.sort(pathHashs);
synchronized (this) {
int preferSlotIndex = pathHashs.indexOf(lastPickPathHash) + 1;
if (preferSlotIndex < 0 || preferSlotIndex >= pathHashs.size()) {
preferSlotIndex = 0;
}
if (slot.balanceUsed < slot.getBalanceTotal()) {
slot.balanceUsed++;
return pathHash;

for (int i = preferSlotIndex; i < pathHashs.size(); i++) {
long pathHash = pathHashs.get(i);
if (takeBalanceSlot(pathHash) != -1) {
lastPickPathHash = pathHash;
return pathHash;
}
}
for (int i = 0; i < preferSlotIndex; i++) {
long pathHash = pathHashs.get(i);
if (takeBalanceSlot(pathHash) != -1) {
lastPickPathHash = pathHash;
return pathHash;
}
}
}
return -1;
Expand Down
64 changes: 64 additions & 0 deletions fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.clone;

import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.thrift.TStorageMedium;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.Map;

class PathSlotTest {

@Test
public void test() {
Config.balance_slot_num_per_path = 2;
Map<Long, TStorageMedium> paths = Maps.newHashMap();
List<Long> availPathHashs = Lists.newArrayList();
List<Long> expectPathHashs = Lists.newArrayList();
List<Long> gotPathHashs = Lists.newArrayList();
long startPath = 10001L;
long endPath = 10006L;
for (long pathHash = startPath; pathHash < endPath; pathHash++) {
paths.put(pathHash, TStorageMedium.HDD);
availPathHashs.add(pathHash);
expectPathHashs.add(pathHash);
}
for (long pathHash = startPath; pathHash < endPath; pathHash++) {
expectPathHashs.add(pathHash);
}
for (long pathHash = startPath; pathHash < endPath; pathHash++) {
expectPathHashs.add(-1L);
}

PathSlot ps = new PathSlot(paths, 1L);
for (int i = 0; i < expectPathHashs.size(); i++) {
Collections.shuffle(availPathHashs);
gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs));
}
Assert.assertEquals(expectPathHashs, gotPathHashs);
}

}

0 comments on commit f5c0585

Please sign in to comment.