Skip to content

Commit

Permalink
[CELEBORN-1804] Shuffle environment metrics of RemoteShuffleEnvironme…
Browse files Browse the repository at this point in the history
…nt should use Shuffle.Remote metric group

### What changes were proposed in this pull request?

Shuffle environment metrics of `RemoteShuffleEnvironment` should use `Shuffle.Remote` metric group.

### Why are the changes needed?

Shuffle environment metrics of `RemoteShuffleEnvironment` uses incorrect netty metric group defined as `Shuffle.Netty`. Therefore, `RemoteShuffleEnvironment` should use remote metric group like `Shuffle.Remote` for shuffle environment metrics.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3032 from SteNicholas/CELEBORN-1804.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: Weijie Guo <[email protected]>
  • Loading branch information
SteNicholas authored and reswqa committed Dec 31, 2024
1 parent a572380 commit 56019c7
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.celeborn.plugin.flink;

import static org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.createShuffleIOOwnerMetricGroup;
import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.*;
Expand Down Expand Up @@ -119,13 +120,13 @@ public boolean updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo

public ShuffleIOOwnerContext createShuffleIOOwnerContext(
String ownerName, ExecutionAttemptID executionAttemptID, MetricGroup parentGroup) {
MetricGroup nettyGroup = createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup));
MetricGroup remoteGroup = createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup));
return new ShuffleIOOwnerContext(
checkNotNull(ownerName),
checkNotNull(executionAttemptID),
parentGroup,
nettyGroup.addGroup(METRIC_GROUP_OUTPUT),
nettyGroup.addGroup(METRIC_GROUP_INPUT));
remoteGroup.addGroup(METRIC_GROUP_OUTPUT),
remoteGroup.addGroup(METRIC_GROUP_INPUT));
}

public Collection<ResultPartitionID> getPartitionsOccupyingLocalResources() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.celeborn.plugin.flink;

import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics;
import static org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.registerShuffleMetrics;

import java.time.Duration;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.plugin.flink.metric;

import static org.apache.flink.util.Preconditions.checkNotNull;

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;

/** Factory for remote shuffle service metrics. */
public class RemoteShuffleMetricFactory {

// shuffle environment level metrics: Shuffle.Remote.*

private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments";
private static final String METRIC_TOTAL_MEMORY = "TotalMemory";

private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = "AvailableMemorySegments";
private static final String METRIC_AVAILABLE_MEMORY = "AvailableMemory";

private static final String METRIC_USED_MEMORY_SEGMENT = "UsedMemorySegments";
private static final String METRIC_USED_MEMORY = "UsedMemory";

public static final String METRIC_REQUESTED_MEMORY_USAGE = "RequestedMemoryUsage";

// task level metric group structure: Shuffle.Remote.<Input|Output>.Buffers

public static final String METRIC_GROUP_SHUFFLE = "Shuffle";
public static final String METRIC_GROUP_REMOTE = "Remote";

private RemoteShuffleMetricFactory() {}

public static void registerShuffleMetrics(
MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
checkNotNull(metricGroup);
checkNotNull(networkBufferPool);
internalRegisterShuffleMetrics(metricGroup, networkBufferPool);
}

private static void internalRegisterShuffleMetrics(
MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
MetricGroup networkGroup =
metricGroup.addGroup(METRIC_GROUP_SHUFFLE).addGroup(METRIC_GROUP_REMOTE);
networkGroup.gauge(
METRIC_TOTAL_MEMORY_SEGMENT, networkBufferPool::getTotalNumberOfMemorySegments);
networkGroup.gauge(METRIC_TOTAL_MEMORY, networkBufferPool::getTotalMemory);
networkGroup.gauge(
METRIC_AVAILABLE_MEMORY_SEGMENT, networkBufferPool::getNumberOfAvailableMemorySegments);
networkGroup.gauge(METRIC_AVAILABLE_MEMORY, networkBufferPool::getAvailableMemory);
networkGroup.gauge(
METRIC_USED_MEMORY_SEGMENT, networkBufferPool::getNumberOfUsedMemorySegments);
networkGroup.gauge(METRIC_USED_MEMORY, networkBufferPool::getUsedMemory);
networkGroup.gauge(
METRIC_REQUESTED_MEMORY_USAGE, new RequestedMemoryUsageMetric(networkBufferPool));
}

public static MetricGroup createShuffleIOOwnerMetricGroup(MetricGroup parentGroup) {
return parentGroup.addGroup(METRIC_GROUP_SHUFFLE).addGroup(METRIC_GROUP_REMOTE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.plugin.flink.metric;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.View;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;

/**
* This is a small hack. Instead of spawning a custom thread to monitor {@link NetworkBufferPool}
* usage, we are re-using {@link View#update()} method for this purpose.
*/
public class RequestedMemoryUsageMetric implements Gauge<Integer>, View {

private final NetworkBufferPool networkBufferPool;

public RequestedMemoryUsageMetric(NetworkBufferPool networkBufferPool) {
this.networkBufferPool = networkBufferPool;
}

@Override
public Integer getValue() {
return networkBufferPool.getEstimatedRequestedSegmentsUsage();
}

@Override
public void update() {
networkBufferPool.maybeLogUsageWarning();
}
}

0 comments on commit 56019c7

Please sign in to comment.