Skip to content

Commit

Permalink
Collect the detailed execution error log to Yarn diagnostics (#653)
Browse files Browse the repository at this point in the history
Signed-off-by: zhangjunfan <[email protected]>
  • Loading branch information
zuston authored Mar 10, 2022
1 parent 7cd4438 commit 1961403
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 27 deletions.
2 changes: 1 addition & 1 deletion tony-core/src/main/java/com/linkedin/tony/Framework.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public interface TaskExecutorAdapter {

default int executorPythonShell(TaskExecutor executor) throws IOException, InterruptedException {
return Utils.executeShell(executor.getTaskCommand(), executor.getExecutionTimeout(),
executor.getShellEnv(), executor.getPythonStdErrFile(), executor.getPythonStdOutFile());
executor.getShellEnv(), executor.getExecutionStdErrFile(), executor.getExecutionStdOutFile());
}
}
}
70 changes: 67 additions & 3 deletions tony-core/src/main/java/com/linkedin/tony/TaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
*/
package com.linkedin.tony;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -81,6 +85,7 @@ public class TaskExecutor implements AutoCloseable {
private volatile boolean markedAsLostConnectionWithAM = false;

private String containerLogDir;
private int executionErrorMsgOutputMaxDepth;

@VisibleForTesting
public TaskExecutor() { }
Expand Down Expand Up @@ -230,7 +235,23 @@ public static void main(String[] unused) throws Exception {
executor.registerExecutionResult(exitCode, executor.jobName, String.valueOf(executor.taskIndex));

LOG.info("Child process exited with exit code: " + exitCode);
System.exit(exitCode);

if (exitCode == 0) {
System.exit(0);
}

String errorMsg = executor.getExecutionErrorLog();

try {
Utils.shutdownThreadPool(executor.scheduledThreadPool);
if (!childProcessFuture.isDone()) {
childProcessFuture.cancel(true);
}
} catch (Exception e) {
System.exit(exitCode);
}

throw new Exception("Execution exit code: " + exitCode + ", error messages: \n" + errorMsg);
}
}

Expand Down Expand Up @@ -284,6 +305,8 @@ protected void initConfigs() {
TonyConfigurationKeys.DEFAULT_TASK_EXECUTOR_MAX_REGISTRY_SEC);

containerLogDir = System.getProperty(YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR);
executionErrorMsgOutputMaxDepth = tonyConf.getInt(TonyConfigurationKeys.TASK_EXECUTOR_EXECUTION_ERROR_MESSAGE_MAX_DEPTH,
TonyConfigurationKeys.DEFAULT_TASK_EXECUTOR_EXECUTION_ERROR_MESSAGE_MAX_DEPTH);

Utils.initYarnConf(yarnConf);
Utils.initHdfsConf(hdfsConf);
Expand Down Expand Up @@ -491,11 +514,52 @@ public void setTaskCommand(String taskCommand) {
this.taskCommand = taskCommand;
}

public String getPythonStdErrFile() {
public String getExecutionStdErrFile() {
return String.format("%s%s%s", containerLogDir, File.separatorChar, Constants.TASK_EXECUTOR_EXECUTION_STDERR_FILENAME);
}

public String getPythonStdOutFile() {
public String getExecutionStdOutFile() {
return String.format("%s%s%s", containerLogDir, File.separatorChar, Constants.TASK_EXECUTOR_EXECUTION_STDOUT_FILENAME);
}

/** Only for test case. */
@VisibleForTesting
protected void setContainerLogDir(String containerLogDir) {
this.containerLogDir = containerLogDir;
}

/** Only for test case. */
@VisibleForTesting
protected void setExecutionErrorMsgOutputMaxDepth(int executionErrorMsgOutputMaxDepth) {
this.executionErrorMsgOutputMaxDepth = executionErrorMsgOutputMaxDepth;
}

@VisibleForTesting
protected String getExecutionErrorLog() throws IOException {
String executionErrorFile = getExecutionStdErrFile();
File file = new File(executionErrorFile);
if (!file.exists()) {
return StringUtils.EMPTY;
}

try (FileInputStream inputStream = new FileInputStream(executionErrorFile);
BufferedReader bufferedReader =
new BufferedReader(new InputStreamReader(inputStream, Charset.forName("UTF-8")))) {
String line;
int lineNumber = 0;
StringBuilder errorMsgBuilder = new StringBuilder();
while (true) {
line = bufferedReader.readLine();
if (line == null) {
break;
}
if (lineNumber++ >= executionErrorMsgOutputMaxDepth) {
break;
}
errorMsgBuilder.append(line);
errorMsgBuilder.append("\n");
}
return errorMsgBuilder.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ private TonyConfigurationKeys() {
public static final String TASK_EXECUTOR_JVM_OPTS = TONY_TASK_PREFIX + "executor.jvm.opts";
public static final String DEFAULT_TASK_EXECUTOR_JVM_OPTS = "-Xmx1536m";

public static final String TASK_EXECUTOR_EXECUTION_ERROR_MESSAGE_MAX_DEPTH = TONY_TASK_PREFIX + "executor.execution.error-message-max-depth";
public static final int DEFAULT_TASK_EXECUTOR_EXECUTION_ERROR_MESSAGE_MAX_DEPTH = 20;

public static final String TASK_DEFAULT_JVM_OPTS = TONY_TASK_PREFIX + "default.jvm.opts";

public static final String TASK_HEARTBEAT_INTERVAL_MS = TONY_TASK_PREFIX + "heartbeat-interval-ms";
Expand Down
20 changes: 20 additions & 0 deletions tony-core/src/main/java/com/linkedin/tony/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -873,5 +874,24 @@ public static boolean existRunningTasksWithJobtype(List<TonySession.TonyTask> ru
return runningTasks.stream().anyMatch(x -> x.getJobName().equals(jobtype));
}

public static void shutdownThreadPool(ExecutorService executor) {
executor.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.error("Thread pool did not terminate");
}
}
} catch (InterruptedException e) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}

private Utils() { }
}
5 changes: 5 additions & 0 deletions tony-core/src/main/resources/tony-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -431,4 +431,9 @@
<name>tony.horovod.driver.mode.debug</name>
<value>false</value>
</property>

<property>
<name>tony.task.executor.execution.error-message-max-depth</name>
<value>20</value>
</property>
</configuration>
38 changes: 38 additions & 0 deletions tony-core/src/test/java/com/linkedin/tony/TestTaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
*/
package com.linkedin.tony;

import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class TestTaskExecutor {
Expand All @@ -28,4 +32,38 @@ public void testTaskExecutorConfShouldThrowException() throws Exception {
// Should throw exception since we didn't set up Task Command.
taskExecutor.initConfigs();
}

@Test
public void testGetExecutionErrorLog() throws IOException {
TaskExecutor executor = new TaskExecutor();

File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();

executor.setContainerLogDir(tmpDir.getAbsolutePath());

String errorFile = executor.getExecutionStdErrFile();

executor.setExecutionErrorMsgOutputMaxDepth(20);

String errorContent = "hello world-1\n"
+ "hello world-2\n"
+ "hello world-3\n"
+ "hello world-4\n"
+ "hello world-5\n"
+ "hello world-6\n"
+ "hello world-7\n"
+ "hello world-8\n"
+ "hello world-9\n"
+ "hello world-10\n";

FileUtils.writeStringToFile(new File(errorFile), errorContent);

String captureout = executor.getExecutionErrorLog();
Assert.assertEquals(captureout, errorContent);

executor.setExecutionErrorMsgOutputMaxDepth(5);
captureout = executor.getExecutionErrorLog();
Assert.assertEquals(5, captureout.split("\n").length);
}
}
27 changes: 4 additions & 23 deletions tony-core/src/test/java/com/linkedin/tony/TestTonyE2E.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.util.HashSet;

import com.linkedin.tony.util.Utils;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -36,7 +37,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static com.linkedin.tony.TaskExecutor.MARK_LOST_CONNECTION_ENV_KEY;
Expand Down Expand Up @@ -637,30 +637,11 @@ public void testTonyAMStartupTimeoutShouldFail() throws ParseException, IOExcept
int exitCode = client.start();
Assert.assertEquals(exitCode, -1);

shutdownExecutor(executorService);
}

private void shutdownExecutor(ExecutorService executor) {
executor.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.error("Thread pool did not terminate");
}
}
} catch (InterruptedException e) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
Utils.shutdownThreadPool(executorService);
}

private Future<?> mockedTask(ExecutorService service) {
return service.submit((Runnable) () -> {
return service.submit(() -> {
TonyClient client = new TonyClient(conf);
try {
client.init(new String[]{
Expand Down Expand Up @@ -806,7 +787,7 @@ public void testTaskWithDependencyTimeAndIgnoredButFailedShouldPass() throws Exc
Assert.assertEquals(exitCode, -1);
}

@Test
@Test(enabled = false)
public void testLostConnectionWithAMJobShouldFail() throws Exception {
client.init(new String[]{
"--src_dir", "tony-core/src/test/resources/scripts",
Expand Down

0 comments on commit 1961403

Please sign in to comment.