From 34b7b2636334deb9f150ee6cefe6b50072fbc456 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Echo=F0=9F=8C=9F?= <70749713+liiuzq-xiaobai@users.noreply.github.com> Date: Mon, 8 Jul 2024 13:51:00 +0800 Subject: [PATCH] Fix job-master leak memory when submitting distributed jobs fix:fix job-master leak memory when submitting a large number of distributed jobs(DIST_LOAD/DIST_CP/Persist jobs) ### What changes are proposed in this pull request? Start a periodic thread to clear expired jobs information that cannot be trace by the client in CmdJobTracker.The default retention time is 1day,which is the same configuration as LoadV2. ### Why are the changes needed? When many jobs are submitted,the job master finally will have an oom problem, we can find that the cmdJobTracker retains the residual job information and not cleaned regularly, resulting in memory leaks. ### Does this PR introduce any user facing changes? Please list the user-facing changes introduced by your change, including 1.add Configuration: alluxio.job.master.job.trace.retention.time=xx,the default value is 1d. Related issue: #18635 pr-link: Alluxio/alluxio#18639 change-id: cid-d4e5853a1818a22c8a0411a27bfe1141c6f24ebd --- .../main/java/alluxio/conf/PropertyKey.java | 8 +++ .../main/java/alluxio/job/wire/Status.java | 7 ++ .../java/alluxio/master/job/JobMaster.java | 2 +- .../alluxio/master/job/plan/PlanTracker.java | 20 +++++- .../master/job/tracker/CmdJobTracker.java | 70 +++++++++++++++++-- .../master/job/plan/PlanTrackerTest.java | 13 ++++ .../master/job/tracker/CmdJobTrackerTest.java | 48 +++++++++++-- 7 files changed, 155 insertions(+), 13 deletions(-) diff --git a/core/common/src/main/java/alluxio/conf/PropertyKey.java b/core/common/src/main/java/alluxio/conf/PropertyKey.java index ea6be69160da..396f4d566fda 100755 --- a/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -7357,6 +7357,12 @@ public String toString() { .setDefaultValue("60sec") .setScope(Scope.MASTER) .build(); + public static final PropertyKey JOB_MASTER_JOB_TRACE_RETENTION_TIME = + durationBuilder(Name.JOB_MASTER_JOB_TRACE_RETENTION_TIME) + .setDescription("The length of time the client can trace the submitted job.") + .setDefaultValue("1d") + .setScope(Scope.MASTER) + .build(); public static final PropertyKey JOB_MASTER_JOB_CAPACITY = longBuilder(Name.JOB_MASTER_JOB_CAPACITY) .setDescription("The total possible number of available job statuses in the job master. " @@ -9191,6 +9197,8 @@ public static final class Name { "alluxio.job.master.finished.job.purge.count"; public static final String JOB_MASTER_FINISHED_JOB_RETENTION_TIME = "alluxio.job.master.finished.job.retention.time"; + public static final String JOB_MASTER_JOB_TRACE_RETENTION_TIME = + "alluxio.job.master.job.trace.retention.time"; public static final String JOB_MASTER_JOB_CAPACITY = "alluxio.job.master.job.capacity"; public static final String JOB_MASTER_MASTER_HEARTBEAT_INTERVAL = "alluxio.job.master.master.heartbeat.interval"; diff --git a/job/common/src/main/java/alluxio/job/wire/Status.java b/job/common/src/main/java/alluxio/job/wire/Status.java index 1041b482ac6c..74826b99c56b 100644 --- a/job/common/src/main/java/alluxio/job/wire/Status.java +++ b/job/common/src/main/java/alluxio/job/wire/Status.java @@ -27,6 +27,13 @@ public boolean isFinished() { return this.equals(CANCELED) || this.equals(FAILED) || this.equals(COMPLETED); } + /** + * @return whether this status represents a Completed state + */ + public boolean isCompleted() { + return this.equals(COMPLETED); + } + /** * @return proto representation of the status */ diff --git a/job/server/src/main/java/alluxio/master/job/JobMaster.java b/job/server/src/main/java/alluxio/master/job/JobMaster.java index 8dc21170b26b..d8562fa76549 100644 --- a/job/server/src/main/java/alluxio/master/job/JobMaster.java +++ b/job/server/src/main/java/alluxio/master/job/JobMaster.java @@ -211,7 +211,7 @@ public JobMaster(MasterContext masterContext, FileSystem filesystem, mWorkerHealth = new ConcurrentHashMap<>(); mCmdJobTracker = new CmdJobTracker( - fsContext, this); + fsContext, this, mPlanTracker); MetricsSystem.registerGaugeIfAbsent( MetricKey.MASTER_JOB_COUNT.getName(), diff --git a/job/server/src/main/java/alluxio/master/job/plan/PlanTracker.java b/job/server/src/main/java/alluxio/master/job/plan/PlanTracker.java index 7b939a3093e3..7b3cd5cdd777 100644 --- a/job/server/src/main/java/alluxio/master/job/plan/PlanTracker.java +++ b/job/server/src/main/java/alluxio/master/job/plan/PlanTracker.java @@ -35,13 +35,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -86,7 +86,7 @@ public class PlanTracker { private final SortedSet mFailed; /** A FIFO queue used to track jobs which have status {@link Status#isFinished()} as true. */ - private final LinkedBlockingQueue mFinished; + private final LinkedList mFinished; private final WorkflowTracker mWorkflowTracker; @@ -114,7 +114,7 @@ public PlanTracker(long capacity, long retentionMs, } return Long.signum(right.getId() - left.getId()); })); - mFinished = new LinkedBlockingQueue<>(); + mFinished = new LinkedList<>(); mWorkflowTracker = workflowTracker; } @@ -300,6 +300,20 @@ public Set findJobs(String name, List statusList) { .map(Map.Entry::getKey).collect(Collectors.toSet()); } + /** + * Remove expired jobs in PlanTracker. + * @param jobIds the list of removed jobId + */ + public void removeJobs(List jobIds) { + mWorkflowTracker.cleanup(jobIds); + for (Long jobId : jobIds) { + PlanInfo removedPlanInfo = mCoordinators.get(jobId).getPlanInfo(); + mCoordinators.remove(jobId); + mFailed.remove(removedPlanInfo); + mFinished.remove(removedPlanInfo); + } + } + private void checkActiveSetReplicaJobs(JobConfig jobConfig) throws JobDoesNotExistException { if (jobConfig instanceof SetReplicaConfig) { Set> activeJobs = mCoordinators.values().stream() diff --git a/job/server/src/main/java/alluxio/master/job/tracker/CmdJobTracker.java b/job/server/src/main/java/alluxio/master/job/tracker/CmdJobTracker.java index 2b7e6246d47f..bd7d65a6203d 100644 --- a/job/server/src/main/java/alluxio/master/job/tracker/CmdJobTracker.java +++ b/job/server/src/main/java/alluxio/master/job/tracker/CmdJobTracker.java @@ -13,8 +13,11 @@ import alluxio.AlluxioURI; import alluxio.client.file.FileSystemContext; +import alluxio.conf.Configuration; +import alluxio.conf.PropertyKey; import alluxio.exception.ExceptionMessage; import alluxio.exception.JobDoesNotExistException; +import alluxio.grpc.OperationType; import alluxio.job.CmdConfig; import alluxio.job.cmd.load.LoadCliConfig; import alluxio.job.cmd.migrate.MigrateCliConfig; @@ -24,11 +27,13 @@ import alluxio.job.wire.Status; import alluxio.master.job.JobMaster; import alluxio.master.job.common.CmdInfo; +import alluxio.master.job.plan.PlanTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -36,6 +41,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.concurrent.ThreadSafe; @@ -43,7 +51,7 @@ * CmdJobTracker to schedule a Cmd job to run. */ @ThreadSafe -public class CmdJobTracker { +public class CmdJobTracker implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(CmdJobTracker.class); private final Map mInfoMap = new ConcurrentHashMap<>(0, 0.95f, Math.max(8, 2 * Runtime.getRuntime().availableProcessors())); @@ -52,18 +60,29 @@ public class CmdJobTracker { private final PersistRunner mPersistRunner; protected FileSystemContext mFsContext; public static final String DELIMITER = ","; + private final ScheduledExecutorService mScheduleCleanExecutor; + private final Long mTraceRetentionTime; + + private final PlanTracker mPlanTracker; /** * Create a new instance of {@link CmdJobTracker}. * @param fsContext filesystem context * @param jobMaster the job master + * @param planTracker the planTracker */ public CmdJobTracker(FileSystemContext fsContext, - JobMaster jobMaster) { + JobMaster jobMaster, PlanTracker planTracker) { mFsContext = fsContext; mDistLoadCliRunner = new DistLoadCliRunner(mFsContext, jobMaster); mMigrateCliRunner = new MigrateCliRunner(mFsContext, jobMaster); mPersistRunner = new PersistRunner(mFsContext, jobMaster); + mScheduleCleanExecutor = Executors.newSingleThreadScheduledExecutor(); + mScheduleCleanExecutor.scheduleAtFixedRate(this:: + cleanExpiredJobInfos, 60, 600, TimeUnit.SECONDS); + mTraceRetentionTime = Configuration.getMs( + PropertyKey.JOB_MASTER_JOB_TRACE_RETENTION_TIME); + mPlanTracker = planTracker; } /** @@ -72,15 +91,25 @@ public CmdJobTracker(FileSystemContext fsContext, * @param distLoadCliRunner DistributedLoad runner * @param migrateCliRunner DistributedCopy runner * @param persistRunner Persist runner + * @param retentionTime job retention time + * @param planTracker the planTracker */ public CmdJobTracker(FileSystemContext fsContext, DistLoadCliRunner distLoadCliRunner, MigrateCliRunner migrateCliRunner, - PersistRunner persistRunner) { + PersistRunner persistRunner, + Long retentionTime, + PlanTracker planTracker + ) { mFsContext = fsContext; mDistLoadCliRunner = distLoadCliRunner; mMigrateCliRunner = migrateCliRunner; mPersistRunner = persistRunner; + mScheduleCleanExecutor = Executors.newSingleThreadScheduledExecutor(); + mScheduleCleanExecutor.scheduleAtFixedRate(this:: + cleanExpiredJobInfos, 60, 600, TimeUnit.SECONDS); + mTraceRetentionTime = retentionTime; + mPlanTracker = planTracker; } /** @@ -134,7 +163,7 @@ private void runDistributedCommand(CmdConfig cmdConfig, long jobControlId) /** * Get status information for a CMD. - * @param jobControlId + * @param jobControlId jobControlId to trace a CMD * @return the Command level status */ public Status getCmdStatus(long jobControlId) throws JobDoesNotExistException { @@ -270,4 +299,37 @@ public CmdStatusBlock getCmdStatusBlock(long jobControlId) .collect(Collectors.toList()); return new CmdStatusBlock(cmdInfo.getJobControlId(), blockList, cmdInfo.getOperationType()); } + + private void cleanExpiredJobInfos() { + long currentTime = System.currentTimeMillis(); + for (Map.Entry x : mInfoMap.entrySet()) { + CmdInfo cmdInfo = x.getValue(); + List cleanedJobsId = new ArrayList<>(); + if (OperationType.DIST_LOAD.equals(cmdInfo.getOperationType()) + && currentTime - cmdInfo.getJobSubmissionTime() > mTraceRetentionTime) { + try { + Status jobStatus = getCmdStatus(cmdInfo.getJobControlId()); + if (jobStatus.isFinished()) { + for (CmdRunAttempt runAttempt : cmdInfo.getCmdRunAttempt()) { + cleanedJobsId.add(runAttempt.getJobId()); + } + mPlanTracker.removeJobs(cleanedJobsId); + mInfoMap.remove(cmdInfo.getJobControlId()); + LOG.info("JobControlId:{} has been cleaned in CmdJobTracker," + + " client will not trace the job anymore.The filePaths in CmdInfo are:{}", + cmdInfo.getJobControlId(), String.join(", ", cmdInfo.getFilePath())); + } + } catch (JobDoesNotExistException e) { + LOG.warn("JobControlId:{} can not find in CmdJobTracker when clean expired Job" + + "with unexpected exception.The filePaths in CmdInfo are:{}", + cmdInfo.getJobControlId(), String.join(", ", cmdInfo.getFilePath())); + } + } + } + } + + @Override + public void close() throws Exception { + mScheduleCleanExecutor.shutdown(); + } } diff --git a/job/server/src/test/java/alluxio/master/job/plan/PlanTrackerTest.java b/job/server/src/test/java/alluxio/master/job/plan/PlanTrackerTest.java index 09a7d20b43c4..3e54efa75ce7 100644 --- a/job/server/src/test/java/alluxio/master/job/plan/PlanTrackerTest.java +++ b/job/server/src/test/java/alluxio/master/job/plan/PlanTrackerTest.java @@ -40,6 +40,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.ArrayList; import java.util.List; import java.util.Queue; @@ -145,6 +146,18 @@ public void testGetCoordinator() throws Exception { ((Queue) AlluxioMockUtil.getInternalState(mTracker, "mFinished")).size()); } + @Test + public void removeExpiredJobsInfo() throws Exception { + List jobIdList = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + jobIdList.add(addJob(100)); + } + mTracker.removeJobs(jobIdList); + for (Long jobId : jobIdList) { + assertNull("job id should not exist", mTracker.getCoordinator(jobId)); + } + } + @Test public void testDuplicateSetReplicaJobs() throws Exception { long jobId = mJobIdGenerator.getNewJobId(); diff --git a/job/server/src/test/java/alluxio/master/job/tracker/CmdJobTrackerTest.java b/job/server/src/test/java/alluxio/master/job/tracker/CmdJobTrackerTest.java index dfc0575fa685..ed0a4cabe5c0 100644 --- a/job/server/src/test/java/alluxio/master/job/tracker/CmdJobTrackerTest.java +++ b/job/server/src/test/java/alluxio/master/job/tracker/CmdJobTrackerTest.java @@ -26,7 +26,10 @@ import alluxio.job.wire.JobSource; import alluxio.job.wire.SimpleJobStatusBlock; import alluxio.job.wire.Status; +import alluxio.master.job.JobMaster; import alluxio.master.job.common.CmdInfo; +import alluxio.master.job.plan.PlanTracker; +import alluxio.master.job.workflow.WorkflowTracker; import com.beust.jcommander.internal.Lists; import org.junit.Assert; @@ -46,6 +49,7 @@ public final class CmdJobTrackerTest { private static final int REPEATED_ATTEMPT_COUNT = 5; private static final int ONE_ATTEMPT = 1; + private static final long CAPACITY = 100; private CmdJobTracker mCmdJobTracker; private FileSystem mFs; @@ -55,10 +59,14 @@ public final class CmdJobTrackerTest { private MigrateCliRunner mMigrateCliRunner; private DistLoadCliRunner mDistLoadRunner; private PersistRunner mPersistRunner; - + private PlanTracker mPlanTracker; private LoadCliConfig mLoad; private MigrateCliConfig mMigrate; private List mSearchingCriteria = Lists.newArrayList(); + private WorkflowTracker mWorkflowTracker; + private JobMaster mMockJobMaster; + + private Long mRetentionTime; @Rule public ExpectedException mException = ExpectedException.none(); @@ -66,14 +74,17 @@ public final class CmdJobTrackerTest { @Before public void before() throws Exception { mFs = mock(FileSystem.class); + mRetentionTime = 1000L; FileSystemContext fsCtx = mock(FileSystemContext.class); mMigrateCliRunner = mock(MigrateCliRunner.class); mDistLoadRunner = mock(DistLoadCliRunner.class); mPersistRunner = mock(PersistRunner.class); - + mPlanTracker = mock(PlanTracker.class); + mMockJobMaster = mock(JobMaster.class); + mWorkflowTracker = new WorkflowTracker(mMockJobMaster); mCmdJobTracker = new CmdJobTracker(fsCtx, - mDistLoadRunner, mMigrateCliRunner, mPersistRunner); + mDistLoadRunner, mMigrateCliRunner, mPersistRunner, mRetentionTime, mPlanTracker); mLoad = new LoadCliConfig("/path/to/load", 3, 1, Collections.EMPTY_SET, Collections.EMPTY_SET, Collections.EMPTY_SET, Collections.EMPTY_SET, true); @@ -97,6 +108,33 @@ public void runDistLoadBatchCompleteTest() throws Exception { Assert.assertEquals(s, Status.COMPLETED); } + @Test + public void runCleanExpiredJobsTest() throws Exception { + generateLoadCommandForStatus(Status.CANCELED); + generateLoadCommandForStatus(Status.RUNNING); + generateLoadCommandForStatus(Status.FAILED); + generateLoadCommandForStatus(Status.COMPLETED); + generateLoadCommandForStatus(Status.CREATED); + Thread.sleep(70000L); + // the expired job has been cleaned in mInfoMap + mSearchingCriteria.clear(); + mSearchingCriteria.add(Status.CANCELED); + Set cancelCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria); + Assert.assertEquals(0, cancelCmdIds.size()); + mSearchingCriteria.clear(); + mSearchingCriteria.add(Status.COMPLETED); + Set completedCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria); + Assert.assertEquals(0, completedCmdIds.size()); + mSearchingCriteria.clear(); + mSearchingCriteria.add(Status.FAILED); + Set failedCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria); + Assert.assertEquals(0, failedCmdIds.size()); + mSearchingCriteria.clear(); + mSearchingCriteria.add(Status.RUNNING); + Set runningCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria); + Assert.assertEquals(2, runningCmdIds.size()); + } + @Test public void runDistLoadBatchFailTest() throws Exception { CmdInfo cmdInfo = new CmdInfo(mLoadJobId, OperationType.DIST_LOAD, @@ -316,7 +354,7 @@ public void testGetCmdStatusBlock() throws Exception { // Below are all help functions. private void prepareDistLoadTest( - CmdInfo cmdInfo, LoadCliConfig loadCliConfig, long loadId) throws Exception { + CmdInfo cmdInfo, LoadCliConfig loadCliConfig, long loadId) throws Exception { AlluxioURI filePath = new AlluxioURI(loadCliConfig.getFilePath()); int replication = loadCliConfig.getReplication(); Set workerSet = loadCliConfig.getWorkerSet(); @@ -325,7 +363,7 @@ private void prepareDistLoadTest( Set excludedLocalityIds = loadCliConfig.getExcludedLocalityIds(); boolean directCache = loadCliConfig.getDirectCache(); int batch = loadCliConfig.getBatchSize(); - + // Mock the behavior of runDistLoad when(mDistLoadRunner.runDistLoad(batch, filePath, replication, workerSet, excludedWorkerSet, localityIds, excludedLocalityIds, directCache, loadId)) .thenReturn(cmdInfo);