diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java index bf344c3357f..ed665c27c28 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java @@ -17,6 +17,7 @@ package org.apache.celeborn.plugin.flink; +import static org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.createShuffleIOOwnerMetricGroup; import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull; import static org.apache.celeborn.plugin.flink.utils.Utils.checkState; import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.*; @@ -119,13 +120,13 @@ public boolean updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo public ShuffleIOOwnerContext createShuffleIOOwnerContext( String ownerName, ExecutionAttemptID executionAttemptID, MetricGroup parentGroup) { - MetricGroup nettyGroup = createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup)); + MetricGroup remoteGroup = createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup)); return new ShuffleIOOwnerContext( checkNotNull(ownerName), checkNotNull(executionAttemptID), parentGroup, - nettyGroup.addGroup(METRIC_GROUP_OUTPUT), - nettyGroup.addGroup(METRIC_GROUP_INPUT)); + remoteGroup.addGroup(METRIC_GROUP_OUTPUT), + remoteGroup.addGroup(METRIC_GROUP_INPUT)); } public Collection getPartitionsOccupyingLocalResources() { diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java index de1d7320080..8927de331b4 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java @@ -18,7 +18,7 @@ package org.apache.celeborn.plugin.flink; -import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics; +import static org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.registerShuffleMetrics; import java.time.Duration; diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RemoteShuffleMetricFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RemoteShuffleMetricFactory.java new file mode 100644 index 00000000000..f784a201fdf --- /dev/null +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RemoteShuffleMetricFactory.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.plugin.flink.metric; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; + +/** Factory for remote shuffle service metrics. */ +public class RemoteShuffleMetricFactory { + + // shuffle environment level metrics: Shuffle.Remote.* + + private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments"; + private static final String METRIC_TOTAL_MEMORY = "TotalMemory"; + + private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = "AvailableMemorySegments"; + private static final String METRIC_AVAILABLE_MEMORY = "AvailableMemory"; + + private static final String METRIC_USED_MEMORY_SEGMENT = "UsedMemorySegments"; + private static final String METRIC_USED_MEMORY = "UsedMemory"; + + public static final String METRIC_REQUESTED_MEMORY_USAGE = "RequestedMemoryUsage"; + + // task level metric group structure: Shuffle.Remote..Buffers + + public static final String METRIC_GROUP_SHUFFLE = "Shuffle"; + public static final String METRIC_GROUP_REMOTE = "Remote"; + + private RemoteShuffleMetricFactory() {} + + public static void registerShuffleMetrics( + MetricGroup metricGroup, NetworkBufferPool networkBufferPool) { + checkNotNull(metricGroup); + checkNotNull(networkBufferPool); + internalRegisterShuffleMetrics(metricGroup, networkBufferPool); + } + + private static void internalRegisterShuffleMetrics( + MetricGroup metricGroup, NetworkBufferPool networkBufferPool) { + MetricGroup networkGroup = + metricGroup.addGroup(METRIC_GROUP_SHUFFLE).addGroup(METRIC_GROUP_REMOTE); + networkGroup.gauge( + METRIC_TOTAL_MEMORY_SEGMENT, networkBufferPool::getTotalNumberOfMemorySegments); + networkGroup.gauge(METRIC_TOTAL_MEMORY, networkBufferPool::getTotalMemory); + networkGroup.gauge( + METRIC_AVAILABLE_MEMORY_SEGMENT, networkBufferPool::getNumberOfAvailableMemorySegments); + networkGroup.gauge(METRIC_AVAILABLE_MEMORY, networkBufferPool::getAvailableMemory); + networkGroup.gauge( + METRIC_USED_MEMORY_SEGMENT, networkBufferPool::getNumberOfUsedMemorySegments); + networkGroup.gauge(METRIC_USED_MEMORY, networkBufferPool::getUsedMemory); + networkGroup.gauge( + METRIC_REQUESTED_MEMORY_USAGE, new RequestedMemoryUsageMetric(networkBufferPool)); + } + + public static MetricGroup createShuffleIOOwnerMetricGroup(MetricGroup parentGroup) { + return parentGroup.addGroup(METRIC_GROUP_SHUFFLE).addGroup(METRIC_GROUP_REMOTE); + } +} diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RequestedMemoryUsageMetric.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RequestedMemoryUsageMetric.java new file mode 100644 index 00000000000..0d3d6e8a620 --- /dev/null +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RequestedMemoryUsageMetric.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.plugin.flink.metric; + +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.View; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; + +/** + * This is a small hack. Instead of spawning a custom thread to monitor {@link NetworkBufferPool} + * usage, we are re-using {@link View#update()} method for this purpose. + */ +public class RequestedMemoryUsageMetric implements Gauge, View { + + private final NetworkBufferPool networkBufferPool; + + public RequestedMemoryUsageMetric(NetworkBufferPool networkBufferPool) { + this.networkBufferPool = networkBufferPool; + } + + @Override + public Integer getValue() { + return networkBufferPool.getEstimatedRequestedSegmentsUsage(); + } + + @Override + public void update() { + networkBufferPool.maybeLogUsageWarning(); + } +}