Skip to content

Commit

Permalink
Add DefaultMinionTaskProgressManager that stores progress status in m…
Browse files Browse the repository at this point in the history
…emory
  • Loading branch information
shounakmk219 committed Dec 26, 2024
1 parent c56ede3 commit 073919d
Show file tree
Hide file tree
Showing 12 changed files with 374 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pinot.minion;

import org.apache.pinot.minion.event.DefaultMinionTaskProgressManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.tasks.MinionTaskProgressManager;
import org.apache.pinot.spi.utils.CommonConstants;


Expand All @@ -45,6 +47,16 @@ private static PinotConfiguration applyMinionConfigs(PinotConfiguration minionCo
return minionConfig;
}

@Override
public MinionTaskProgressManager getMinionTaskProgressManager() {
String maxNumStatusToTrackValue = _config.getProperty(DefaultMinionTaskProgressManager.MAX_NUM_STATUS_TO_TRACK);
if (maxNumStatusToTrackValue == null) {
return new DefaultMinionTaskProgressManager(DefaultMinionTaskProgressManager.DEFAULT_MAX_NUM_STATUS_TO_TRACK);
} else {
return new DefaultMinionTaskProgressManager(Integer.parseInt(maxNumStatusToTrackValue));
}
}

@Deprecated
public MinionStarter(PinotConfiguration config)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
*/
public abstract class BaseMinionProgressObserverFactory implements MinionEventObserverFactory {

protected MinionTaskProgressManager _taskProgressManager;

/**
* Initializes the task executor factory.
*/
Expand All @@ -35,6 +37,7 @@ public void init(MinionTaskZkMetadataManager zkMetadataManager) {

@Override
public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionTaskProgressManager taskProgressManager) {
_taskProgressManager = taskProgressManager;
}

/**
Expand All @@ -46,6 +49,6 @@ public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionTaskProgre
* Creates a new task event observer.
*/
public MinionEventObserver create() {
return new MinionProgressObserver();
return new MinionProgressObserver(_taskProgressManager);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.minion.event;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.spi.tasks.MinionTaskProgressManager;
import org.apache.pinot.spi.tasks.MinionTaskProgressStats;


public class DefaultMinionTaskProgressManager implements MinionTaskProgressManager {
public static final String MAX_NUM_STATUS_TO_TRACK = "pinot.minion.task.maxNumStatusToTrack";
public static final int DEFAULT_MAX_NUM_STATUS_TO_TRACK = 128;

private final int _maxNumStatusToTrack;
private final Map<String, MinionTaskProgressStats> _minionTaskProgressStatsMap = new HashMap<>();

public DefaultMinionTaskProgressManager(int maxNumStatusToTrack) {
_maxNumStatusToTrack = maxNumStatusToTrack;
}

@Override
public MinionTaskProgressStats getTaskProgress(String taskId) {
return _minionTaskProgressStatsMap.get(taskId);
}

@Override
public synchronized void setTaskProgress(String taskId, MinionTaskProgressStats progress) {
_minionTaskProgressStatsMap.put(taskId, progress);
List<MinionTaskProgressStats.StatusEntry> progressLogs = progress.getProgressLogs();
int logSize = progressLogs.size();
int startIndex = Math.max(logSize - _maxNumStatusToTrack, 0);
_minionTaskProgressStatsMap.get(taskId).setProgressLogs(progressLogs.subList(startIndex, logSize));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@
package org.apache.pinot.minion.event;

import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.tasks.MinionTaskProgressManager;
import org.apache.pinot.spi.tasks.MinionTaskProgressStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,71 +40,76 @@
@ThreadSafe
public class MinionProgressObserver extends DefaultMinionEventObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(MinionProgressObserver.class);
// TODO: make this configurable
private static final int DEFAULT_MAX_NUM_STATUS_TO_TRACK = 128;

private final int _maxNumStatusToTrack;
private final Deque<StatusEntry> _lastStatus = new LinkedList<>();
private MinionTaskState _taskState;
private long _startTs;

public MinionProgressObserver() {
this(DEFAULT_MAX_NUM_STATUS_TO_TRACK);
}

public MinionProgressObserver(int maxNumStatusToTrack) {
_maxNumStatusToTrack = maxNumStatusToTrack;
_taskState = MinionTaskState.UNKNOWN;
protected MinionTaskState _taskState;
protected final Map<String, MinionTaskProgressStats.Timer> _stageTimes = new HashMap<>();
protected Set<String> _stages = new HashSet<>();
protected String _stage;
protected long _startTs;
protected long _endTs;
protected int _segmentsGenerated;
protected List<MinionTaskProgressStats.StatusEntry> _progressBuffer = new ArrayList<>();
protected MinionTaskProgressManager _progressManager;
protected String _taskId;

public MinionProgressObserver(MinionTaskProgressManager progressManager) {
_progressManager = progressManager;
}

@Override
public synchronized void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
_startTs = System.currentTimeMillis();
addStatus(_startTs, "Task started", MinionTaskState.IN_PROGRESS);
super.notifyTaskStart(pinotTaskConfig);
_taskState = MinionTaskState.IN_PROGRESS;
_taskId = pinotTaskConfig.getTaskId();
setStageStats(new MinionTaskProgressStats.StatusEntry(_startTs, _taskState.name(), "Task started"));
}

/**
* Invoked to update a minion task progress status.
*
* @param pinotTaskConfig Pinot task config
* @param progress progress status and its toString() returns sth meaningful.
*/
@Override
public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig, @Nullable Object progress) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Update progress: {} for task: {}", progress, pinotTaskConfig.getTaskId());
public synchronized void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, @Nullable Object executionResult) {
_endTs = System.currentTimeMillis();
if (executionResult instanceof List) {
List<Object> results = (List<Object>) executionResult;
_segmentsGenerated = results.size();
}
addStatus(System.currentTimeMillis(), (progress == null) ? "" : progress.toString(), MinionTaskState.IN_PROGRESS);
super.notifyProgress(pinotTaskConfig, progress);
_taskState = MinionTaskState.SUCCEEDED;
setStageStats(new MinionTaskProgressStats.StatusEntry(_endTs, _taskState.name(), "Task succeeded in "
+ (_endTs - _startTs) + "ms"));
flush();
}

@Override
@Nullable
public synchronized List<StatusEntry> getProgress() {
return new ArrayList<>(_lastStatus);
public synchronized void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) {
_endTs = System.currentTimeMillis();
_taskState = MinionTaskState.CANCELLED;
setStageStats(new MinionTaskProgressStats.StatusEntry(_endTs, _taskState.name(),
"Task got cancelled after " + (_endTs - _startTs) + "ms"));
flush();
}

@Override
public synchronized void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, @Nullable Object executionResult) {
long endTs = System.currentTimeMillis();
addStatus(endTs, "Task succeeded in " + (endTs - _startTs) + "ms", MinionTaskState.SUCCEEDED);
super.notifyTaskSuccess(pinotTaskConfig, executionResult);
public synchronized void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception e) {
_endTs = System.currentTimeMillis();
_taskState = MinionTaskState.ERROR;
setStageStats(new MinionTaskProgressStats.StatusEntry(_endTs, _taskState.name(),
"Task failed in " + (_endTs - _startTs) + "ms with error: " + ExceptionUtils.getStackTrace(e)));
flush();
}

@Override
public synchronized void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) {
long endTs = System.currentTimeMillis();
addStatus(endTs, "Task got cancelled after " + (endTs - _startTs) + "ms", MinionTaskState.CANCELLED);
super.notifyTaskCancelled(pinotTaskConfig);
public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig, @Nullable Object progress) {
_taskState = MinionTaskState.IN_PROGRESS;
if (progress instanceof MinionTaskProgressStats.StatusEntry) {
setStageStats((MinionTaskProgressStats.StatusEntry) progress);
} else {
String progressMessage = progress == null ? "" : progress.toString();
setStageStats(new MinionTaskProgressStats.StatusEntry(_stage, progressMessage));
}
}

@Nullable
@Override
public synchronized void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception e) {
long endTs = System.currentTimeMillis();
addStatus(endTs, "Task failed in " + (endTs - _startTs) + "ms with error: " + ExceptionUtils.getStackTrace(e),
MinionTaskState.ERROR);
super.notifyTaskError(pinotTaskConfig, e);
public synchronized List<MinionTaskProgressStats.StatusEntry> getProgress() {
return getProgressStats().getProgressLogs();
}

@Override
Expand All @@ -113,34 +122,52 @@ public long getStartTs() {
return _startTs;
}

private void addStatus(long ts, String progress, MinionTaskState taskState) {
_taskState = taskState;
_lastStatus.addLast(new StatusEntry(ts, progress));
if (_lastStatus.size() > _maxNumStatusToTrack) {
_lastStatus.pollFirst();
public MinionTaskProgressStats getProgressStats() {
MinionTaskProgressStats minionTaskProgressStats = _progressManager.getTaskProgress(_taskId);
List<MinionTaskProgressStats.StatusEntry> progressLog = new ArrayList<>();
if (minionTaskProgressStats != null) {
progressLog.addAll(minionTaskProgressStats.getProgressLogs());
}
return buildProgressStats(progressLog);
}

public static class StatusEntry {
private final long _ts;
private final String _status;
private MinionTaskProgressStats buildProgressStats(List<MinionTaskProgressStats.StatusEntry> progressLog) {
progressLog.addAll(_progressBuffer);
return new MinionTaskProgressStats()
.setTaskId(_taskId)
.setCurrentStage(_stage)
.setCurrentState(_taskState.name())
.setStageTimes(_stageTimes)
.setStartTimestamp(_startTs)
.setEndTimestamp(_endTs)
.setSegmentsGenerated(_segmentsGenerated)
.setProgressLogs(progressLog);
}

public StatusEntry(long ts, String status) {
_ts = ts;
_status = status;
protected void setStageStats(MinionTaskProgressStats.StatusEntry progress) {
String incomingStage = progress.getStage();
if (_stage != null && !_stage.equals(incomingStage)) {
_stageTimes.get(_stage).stop();
}

public long getTs() {
return _ts;
if (_endTs != 0) {
_stage = incomingStage;
} else if (_stages.contains(incomingStage)) {
_stage = incomingStage;
_stageTimes.get(_stage).start();
}

public String getStatus() {
return _status;
_progressBuffer.add(progress);
if (_progressBuffer.size() >= _progressManager.getProgressBufferSize()) {
flush();
}
}

@Override
public String toString() {
return "StatusEntry{" + "_ts=" + _ts + ", _status=" + _status + '}';
}
public void flush() {
_progressManager.setTaskProgress(_taskId, getProgressStats());
_progressBuffer.clear();
}

public void setStages(Set<String> stages) {
_stages = stages;
stages.forEach(stage -> _stageTimes.put(stage, new MinionTaskProgressStats.Timer()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Map;
import javax.ws.rs.WebApplicationException;
import org.apache.pinot.minion.event.DefaultMinionTaskProgressManager;
import org.apache.pinot.minion.event.MinionEventObserver;
import org.apache.pinot.minion.event.MinionEventObservers;
import org.apache.pinot.minion.event.MinionProgressObserver;
Expand All @@ -37,15 +38,16 @@ public class PinotTaskProgressResourceTest {
@Test
public void testGetGivenSubtaskOrStateProgress()
throws IOException {
MinionEventObserver observer1 = new MinionProgressObserver();
DefaultMinionTaskProgressManager taskProgressManager = new DefaultMinionTaskProgressManager(128);
MinionEventObserver observer1 = new MinionProgressObserver(taskProgressManager);
observer1.notifyTaskStart(null);
MinionEventObservers.getInstance().addMinionEventObserver("t01", observer1);

MinionEventObserver observer2 = new MinionProgressObserver();
MinionEventObserver observer2 = new MinionProgressObserver(taskProgressManager);
observer2.notifyProgress(null, "");
MinionEventObservers.getInstance().addMinionEventObserver("t02", observer2);

MinionEventObserver observer3 = new MinionProgressObserver();
MinionEventObserver observer3 = new MinionProgressObserver(taskProgressManager);
observer3.notifyTaskSuccess(null, "");
MinionEventObservers.getInstance().addMinionEventObserver("t03", observer3);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void testCleanupImmediately() {
MinionConf config = new MinionConf();
MinionEventObservers.init(config, null);
for (String taskId : new String[]{"t01", "t02", "t03"}) {
MinionEventObserver observer = new MinionProgressObserver();
MinionEventObserver observer = new MinionProgressObserver(new DefaultMinionTaskProgressManager(128));
MinionEventObservers.getInstance().addMinionEventObserver(taskId, observer);
assertSame(MinionEventObservers.getInstance().getMinionEventObserver(taskId), observer);
MinionEventObservers.getInstance().removeMinionEventObserver(taskId);
Expand All @@ -54,7 +54,7 @@ public void testCleanupWithDelay() {
MinionEventObservers.init(config, executor);
String[] taskIds = new String[]{"t01", "t02", "t03"};
for (String taskId : taskIds) {
MinionEventObserver observer = new MinionProgressObserver();
MinionEventObserver observer = new MinionProgressObserver(new DefaultMinionTaskProgressManager(128));
MinionEventObservers.getInstance().addMinionEventObserver(taskId, observer);
assertSame(MinionEventObservers.getInstance().getMinionEventObserver(taskId), observer);
MinionEventObservers.getInstance().removeMinionEventObserver(taskId);
Expand All @@ -69,15 +69,15 @@ public void testCleanupWithDelay() {

@Test
public void testGetMinionEventObserverWithGivenState() {
MinionEventObserver observer1 = new MinionProgressObserver();
MinionEventObserver observer1 = new MinionProgressObserver(new DefaultMinionTaskProgressManager(128));
observer1.notifyTaskStart(null);
MinionEventObservers.getInstance().addMinionEventObserver("t01", observer1);

MinionEventObserver observer2 = new MinionProgressObserver();
MinionEventObserver observer2 = new MinionProgressObserver(new DefaultMinionTaskProgressManager(128));
observer2.notifyProgress(null, "");
MinionEventObservers.getInstance().addMinionEventObserver("t02", observer2);

MinionEventObserver observer3 = new MinionProgressObserver();
MinionEventObserver observer3 = new MinionProgressObserver(new DefaultMinionTaskProgressManager(128));
observer3.notifyTaskSuccess(null, "");
MinionEventObservers.getInstance().addMinionEventObserver("t03", observer3);

Expand Down
Loading

0 comments on commit 073919d

Please sign in to comment.