From bfca1b9c67c28ec13f909d193b1f2a36e160a3e8 Mon Sep 17 00:00:00 2001 From: Zhong Date: Wed, 20 Jun 2018 17:46:10 +0800 Subject: [PATCH 1/2] KYLIN-3421 improve the fetcher runner in job scheduler --- .../job/impl/threadpool/DefaultFetcherRunner.java | 104 ++++++++++ .../job/impl/threadpool/DefaultScheduler.java | 210 ++------------------- .../job/impl/threadpool/DistributedScheduler.java | 116 ++++-------- .../kylin/job/impl/threadpool/FetcherRunner.java | 77 ++++++++ .../kylin/job/impl/threadpool/JobExecutor.java | 25 +++ .../job/impl/threadpool/PriorityFetcherRunner.java | 146 ++++++++++++++ .../job/impl/threadpool/BaseSchedulerTest.java | 2 +- 7 files changed, 401 insertions(+), 279 deletions(-) create mode 100644 core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java create mode 100644 core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java create mode 100644 core-job/src/main/java/org/apache/kylin/job/impl/threadpool/JobExecutor.java create mode 100644 core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java new file mode 100644 index 0000000..e5f15fe --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.impl.threadpool; + +import java.util.Map; + +import org.apache.kylin.common.util.SetThreadName; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.Executable; +import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.execution.Output; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultFetcherRunner extends FetcherRunner { + + private static final Logger logger = LoggerFactory.getLogger(DefaultFetcherRunner.class); + + public DefaultFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, + ExecutableManager executableManager, JobExecutor jobExecutor) { + super(jobEngineConfig, context, executableManager, jobExecutor); + } + + @Override + synchronized public void run() { + try (SetThreadName ignored = new SetThreadName(// + "FetcherRunner %s", System.identityHashCode(this))) {// + // logger.debug("Job Fetcher is running..."); + Map runningJobs = context.getRunningJobs(); + if (isJobPoolFull()) { + return; + } + + int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; + for (final String id : executableManager.getAllJobIds()) { + if (isJobPoolFull()) { + return; + } + if (runningJobs.containsKey(id)) { + // logger.debug("Job id:" + id + " is already running"); + nRunning++; + continue; + } + + final Output output = executableManager.getOutput(id); + if ((output.getState() != ExecutableState.READY)) { + // logger.debug("Job id:" + id + " not runnable"); + if (output.getState() == ExecutableState.SUCCEED) { + nSUCCEED++; + } else if (output.getState() == ExecutableState.ERROR) { + nError++; + } else if (output.getState() == ExecutableState.DISCARDED) { + nDiscarded++; + } else if (output.getState() == ExecutableState.STOPPED) { + nStopped++; + } else { + if (fetchFailed) { + executableManager.forceKillJob(id); + nError++; + } else { + nOthers++; + } + } + continue; + } + + final AbstractExecutable executable = executableManager.getJob(id); + if (!executable.isReady()) { + nOthers++; + continue; + } + + nReady++; + addToJobPool(executable, executable.getDefaultPriority()); + } + + fetchFailed = false; + logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, " + + nStopped + " stopped, " + nReady + " ready, " + nSUCCEED + " already succeed, " + nError + + " error, " + nDiscarded + " discarded, " + nOthers + " others"); + } catch (Throwable th) { + fetchFailed = true; // this could happen when resource store is unavailable + logger.warn("Job Fetcher caught a exception ", th); + } + } +} diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 920601d..c566408 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -18,9 +18,6 @@ package org.apache.kylin.job.impl.threadpool; -import java.util.Comparator; -import java.util.Map; -import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -31,7 +28,6 @@ import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.job.Scheduler; import org.apache.kylin.job.engine.JobEngineConfig; @@ -40,8 +36,6 @@ import org.apache.kylin.job.exception.SchedulerException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.Executable; import org.apache.kylin.job.execution.ExecutableManager; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.execution.Output; import org.apache.kylin.job.lock.JobLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +78,7 @@ public class DefaultScheduler implements Scheduler, Connecti private JobLock jobLock; private ExecutableManager executableManager; - private Runnable fetcher; + private FetcherRunner fetcher; private ScheduledExecutorService fetcherPool; private ExecutorService jobPool; private DefaultContext context; @@ -92,7 +86,6 @@ public class DefaultScheduler implements Scheduler, Connecti private static final Logger logger = LoggerFactory.getLogger(DefaultScheduler.class); private volatile boolean initialized = false; private volatile boolean hasStarted = false; - volatile boolean fetchFailed = false; private JobEngineConfig jobEngineConfig; public DefaultScheduler() { @@ -101,195 +94,8 @@ public class DefaultScheduler implements Scheduler, Connecti } } - private class FetcherRunnerWithPriority implements Runnable { - volatile PriorityQueue> jobPriorityQueue = new PriorityQueue<>(1, - new Comparator>() { - @Override - public int compare(Pair o1, Pair o2) { - return o1.getSecond() > o2.getSecond() ? -1 : 1; - } - }); - - private void addToJobPool(AbstractExecutable executable, int priority) { - String jobDesc = executable.toString(); - logger.info(jobDesc + " prepare to schedule and its priority is " + priority); - try { - context.addRunningJob(executable); - jobPool.execute(new JobRunner(executable)); - logger.info(jobDesc + " scheduled"); - } catch (Exception ex) { - context.removeRunningJob(executable); - logger.warn(jobDesc + " fail to schedule", ex); - } - } - - @Override - synchronized public void run() { - try (SetThreadName ignored = new SetThreadName(// - "Scheduler %s PriorityFetcherRunner %s"// - , System.identityHashCode(DefaultScheduler.this)// - , System.identityHashCode(this)// - )) {// - // logger.debug("Job Fetcher is running..."); - Map runningJobs = context.getRunningJobs(); - - // fetch job from jobPriorityQueue first to reduce chance to scan job list - Map leftJobPriorities = Maps.newHashMap(); - Pair executableWithPriority; - while ((executableWithPriority = jobPriorityQueue.peek()) != null - // the priority of jobs in pendingJobPriorities should be above a threshold - && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) { - executableWithPriority = jobPriorityQueue.poll(); - AbstractExecutable executable = executableWithPriority.getFirst(); - int curPriority = executableWithPriority.getSecond(); - // the job should wait more than one time - if (curPriority > executable.getDefaultPriority() + 1) { - addToJobPool(executable, curPriority); - } else { - leftJobPriorities.put(executable.getId(), curPriority + 1); - } - } - - if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { - logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); - return; - } - - while ((executableWithPriority = jobPriorityQueue.poll()) != null) { - leftJobPriorities.put(executableWithPriority.getFirst().getId(), - executableWithPriority.getSecond() + 1); - } - - int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; - for (final String id : executableManager.getAllJobIds()) { - if (runningJobs.containsKey(id)) { - // logger.debug("Job id:" + id + " is already running"); - nRunning++; - continue; - } - - AbstractExecutable executable = executableManager.getJob(id); - if (!executable.isReady()) { - final Output output = executableManager.getOutput(id); - // logger.debug("Job id:" + id + " not runnable"); - if (output.getState() == ExecutableState.DISCARDED) { - nDiscarded++; - } else if (output.getState() == ExecutableState.ERROR) { - nError++; - } else if (output.getState() == ExecutableState.SUCCEED) { - nSUCCEED++; - } else if (output.getState() == ExecutableState.STOPPED) { - nStopped++; - } else { - nOthers++; - } - continue; - } - - nReady++; - Integer priority = leftJobPriorities.get(id); - if (priority == null) { - priority = executable.getDefaultPriority(); - } - jobPriorityQueue.add(new Pair<>(executable, priority)); - } - - while (runningJobs.size() < jobEngineConfig.getMaxConcurrentJobLimit() - && (executableWithPriority = jobPriorityQueue.poll()) != null) { - addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond()); - } - - logger.info("Priority Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, " - + nStopped + " stopped, " + nReady + " ready, " + jobPriorityQueue.size() + " waiting, " // - + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers - + " others"); - } catch (Throwable th) { - logger.warn("Priority Job Fetcher caught a exception " + th); - } - } - } - - private class FetcherRunner implements Runnable { - - @Override - synchronized public void run() { - try (SetThreadName ignored = new SetThreadName(// - "Scheduler %s FetcherRunner %s"// - , System.identityHashCode(DefaultScheduler.this)// - , System.identityHashCode(this)// - )) {// - // logger.debug("Job Fetcher is running..."); - Map runningJobs = context.getRunningJobs(); - if (isJobPoolFull()) { - return; - } - - int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; - for (final String id : executableManager.getAllJobIds()) { - if (isJobPoolFull()) { - return; - } - if (runningJobs.containsKey(id)) { - // logger.debug("Job id:" + id + " is already running"); - nRunning++; - continue; - } - final AbstractExecutable executable = executableManager.getJob(id); - if (!executable.isReady()) { - final Output output = executableManager.getOutput(id); - // logger.debug("Job id:" + id + " not runnable"); - if (output.getState() == ExecutableState.DISCARDED) { - nDiscarded++; - } else if (output.getState() == ExecutableState.ERROR) { - nError++; - } else if (output.getState() == ExecutableState.SUCCEED) { - nSUCCEED++; - } else if (output.getState() == ExecutableState.STOPPED) { - nStopped++; - } else { - if (fetchFailed) { - executableManager.forceKillJob(id); - nError++; - } else { - nOthers++; - } - } - continue; - } - nReady++; - String jobDesc = null; - try { - jobDesc = executable.toString(); - logger.info(jobDesc + " prepare to schedule"); - context.addRunningJob(executable); - jobPool.execute(new JobRunner(executable)); - logger.info(jobDesc + " scheduled"); - } catch (Exception ex) { - if (executable != null) - context.removeRunningJob(executable); - logger.warn(jobDesc + " fail to schedule", ex); - } - } - - fetchFailed = false; - logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, " - + nStopped + " stopped, " + nReady + " ready, " + nSUCCEED + " already succeed, " + nError - + " error, " + nDiscarded + " discarded, " + nOthers + " others"); - } catch (Throwable th) { - fetchFailed = true; // this could happen when resource store is unavailable - logger.warn("Job Fetcher caught a exception ", th); - } - } - } - - private boolean isJobPoolFull() { - Map runningJobs = context.getRunningJobs(); - if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { - logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); - return true; - } - - return false; + public FetcherRunner getFetcherRunner() { + return fetcher; } private class JobRunner implements Runnable { @@ -367,7 +173,15 @@ public class DefaultScheduler implements Scheduler, Connecti int pollSecond = jobEngineConfig.getPollIntervalSecond(); logger.info("Fetching jobs every {} seconds", pollSecond); - fetcher = jobEngineConfig.getJobPriorityConsidered() ? new FetcherRunnerWithPriority() : new FetcherRunner(); + JobExecutor jobExecutor = new JobExecutor() { + @Override + public void execute(AbstractExecutable executable) { + jobPool.execute(new JobRunner(executable)); + } + }; + fetcher = jobEngineConfig.getJobPriorityConsidered() + ? new PriorityFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor) + : new DefaultFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor); logger.info("Creating fetcher pool instance:" + System.identityHashCode(fetcher)); fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS); hasStarted = true; diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 055de4d..cb4d815 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -20,7 +20,6 @@ package org.apache.kylin.job.impl.threadpool; import java.io.Closeable; import java.io.IOException; -import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; @@ -63,7 +62,6 @@ import com.google.common.collect.Maps; public class DistributedScheduler implements Scheduler, ConnectionStateListener { private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class); - private final static String SEGMENT_ID = "segmentId"; public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock"; // note ZookeeperDistributedLock will ensure zk path prefix: /${kylin.env.zookeeper-base-path}/metadata public static DistributedScheduler getInstance(KylinConfig config) { @@ -86,57 +84,13 @@ public class DistributedScheduler implements Scheduler, Conn private DistributedLock jobLock; private Closeable lockWatch; - //keep all segments having running job - private final Set segmentWithLocks = new CopyOnWriteArraySet<>(); + //keep all running job + private final Set jobWithLocks = new CopyOnWriteArraySet<>(); private volatile boolean initialized = false; private volatile boolean hasStarted = false; private JobEngineConfig jobEngineConfig; private String serverName; - private class FetcherRunner implements Runnable { - @Override - synchronized public void run() { - try { - Map runningJobs = context.getRunningJobs(); - if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { - logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); - return; - } - - int nRunning = 0, nOtherRunning = 0, nReady = 0, nOthers = 0; - for (final String id : executableManager.getAllJobIds()) { - if (runningJobs.containsKey(id)) { - nRunning++; - continue; - } - - final Output output = executableManager.getOutput(id); - - if ((output.getState() != ExecutableState.READY)) { - if (output.getState() == ExecutableState.RUNNING) { - nOtherRunning++; - } else { - nOthers++; - } - continue; - } - - nReady++; - final AbstractExecutable executable = executableManager.getJob(id); - try { - jobPool.execute(new JobRunner(executable)); - } catch (Exception ex) { - logger.warn(executable.toString() + " fail to schedule in server: " + serverName, ex); - } - } - logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, " - + nOtherRunning + " running in other server, " + nReady + " ready, " + nOthers + " others"); - } catch (Exception e) { - logger.warn("Job Fetcher caught a exception " + e); - } - } - } - private class JobRunner implements Runnable { private final AbstractExecutable executable; @@ -149,12 +103,11 @@ public class DistributedScheduler implements Scheduler, Conn public void run() { try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s", System.identityHashCode(DistributedScheduler.this), executable.getId())) { - String segmentId = executable.getParam(SEGMENT_ID); - if (jobLock.lock(getLockPath(segmentId))) { + if (jobLock.lock(getLockPath(executable.getId()))) { logger.info(executable.toString() + " scheduled in server: " + serverName); context.addRunningJob(executable); - segmentWithLocks.add(segmentId); + jobWithLocks.add(executable.getId()); executable.execute(context); } } catch (ExecuteException e) { @@ -172,21 +125,21 @@ public class DistributedScheduler implements Scheduler, Conn //release job lock when job state is ready or running and the job server keep the cube lock. private void releaseJobLock(AbstractExecutable executable) { if (executable instanceof DefaultChainedExecutable) { - String segmentId = executable.getParam(SEGMENT_ID); ExecutableState state = executable.getStatus(); if (state != ExecutableState.READY && state != ExecutableState.RUNNING) { - if (segmentWithLocks.contains(segmentId)) { - logger.info(executable.toString() + " will release the lock for the segment: " + segmentId); - jobLock.unlock(getLockPath(segmentId)); - segmentWithLocks.remove(segmentId); + if (jobWithLocks.contains(executable.getId())) { + logger.info( + executable.toString() + " will release the lock for the job: " + executable.getId()); + jobLock.unlock(getLockPath(executable.getId())); + jobWithLocks.remove(executable.getId()); } } } } } - //when the segment lock released but the segment related job still running, resume the job. + //when the job lock released but the related job still running, resume the job. private class WatcherProcessImpl implements DistributedLock.Watcher { private String serverName; @@ -197,26 +150,21 @@ public class DistributedScheduler implements Scheduler, Conn @Override public void onUnlock(String path, String nodeData) { String[] paths = path.split("/"); - String segmentId = paths[paths.length - 1]; - - for (final String id : executableManager.getAllJobIds()) { - final Output output = executableManager.getOutput(id); - if (output.getState() == ExecutableState.RUNNING) { - AbstractExecutable executable = executableManager.getJob(id); - if (executable instanceof DefaultChainedExecutable - && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) - && !nodeData.equalsIgnoreCase(serverName)) { - try { - logger.warn(nodeData + " has released the lock for: " + segmentId - + " but the job still running. so " + serverName + " resume the job"); - if (!jobLock.isLocked(getLockPath(segmentId))) { - executableManager.resumeRunningJobForce(executable.getId()); - fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); - break; - } - } catch (Exception e) { - logger.error("resume the job but fail in server: " + serverName, e); + String jobId = paths[paths.length - 1]; + + final Output output = executableManager.getOutput(jobId); + if (output.getState() == ExecutableState.RUNNING) { + AbstractExecutable executable = executableManager.getJob(jobId); + if (executable instanceof DefaultChainedExecutable && !nodeData.equalsIgnoreCase(serverName)) { + try { + logger.warn(nodeData + " has released the lock for: " + jobId + + " but the job still running. so " + serverName + " resume the job"); + if (!jobLock.isLocked(getLockPath(jobId))) { + executableManager.resumeRunningJobForce(executable.getId()); + fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); } + } catch (Exception e) { + logger.error("resume the job but fail in server: " + serverName, e); } } } @@ -273,7 +221,15 @@ public class DistributedScheduler implements Scheduler, Conn int pollSecond = jobEngineConfig.getPollIntervalSecond(); logger.info("Fetching jobs every {} seconds", pollSecond); - fetcher = new FetcherRunner(); + JobExecutor jobExecutor = new JobExecutor() { + @Override + public void execute(AbstractExecutable executable) { + jobPool.execute(new JobRunner(executable)); + } + }; + fetcher = jobEngineConfig.getJobPriorityConsidered() + ? new PriorityFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor) + : new DefaultFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor); fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS); hasStarted = true; @@ -286,7 +242,7 @@ public class DistributedScheduler implements Scheduler, Conn AbstractExecutable executable = executableManager.getJob(id); if (output.getState() == ExecutableState.RUNNING && executable instanceof DefaultChainedExecutable) { try { - if (!jobLock.isLocked(getLockPath(executable.getParam(SEGMENT_ID)))) { + if (!jobLock.isLocked(getLockPath(executable.getId()))) { executableManager.resumeRunningJobForce(executable.getId()); fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); } @@ -334,8 +290,8 @@ public class DistributedScheduler implements Scheduler, Conn } private void releaseAllLocks() { - for (String segmentId : segmentWithLocks) { - jobLock.unlock(getLockPath(segmentId)); + for (String jobId : jobWithLocks) { + jobLock.unlock(getLockPath(jobId)); } } diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java new file mode 100644 index 0000000..d98ca33 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job.impl.threadpool; + +import java.util.Map; + +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.Executable; +import org.apache.kylin.job.execution.ExecutableManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +public abstract class FetcherRunner implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(FetcherRunner.class); + + protected JobEngineConfig jobEngineConfig; + protected DefaultContext context; + protected ExecutableManager executableManager; + protected JobExecutor jobExecutor; + protected volatile boolean fetchFailed = false; + + public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, ExecutableManager executableManager, + JobExecutor jobExecutor) { + this.jobEngineConfig = jobEngineConfig; + this.context = context; + this.executableManager = executableManager; + this.jobExecutor = jobExecutor; + } + + protected boolean isJobPoolFull() { + Map runningJobs = context.getRunningJobs(); + if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { + logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); + return true; + } + + return false; + } + + protected void addToJobPool(AbstractExecutable executable, int priority) { + String jobDesc = executable.toString(); + logger.info(jobDesc + " prepare to schedule and its priority is " + priority); + try { + context.addRunningJob(executable); + jobExecutor.execute(executable); + logger.info(jobDesc + " scheduled"); + } catch (Exception ex) { + context.removeRunningJob(executable); + logger.warn(jobDesc + " fail to schedule", ex); + } + } + + @VisibleForTesting + void setFetchFailed(boolean fetchFailed) { + this.fetchFailed = fetchFailed; + } +} diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/JobExecutor.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/JobExecutor.java new file mode 100644 index 0000000..d2efd22 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/JobExecutor.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.impl.threadpool; + +import org.apache.kylin.job.execution.AbstractExecutable; + +public interface JobExecutor { + void execute(AbstractExecutable executable); +} diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java new file mode 100644 index 0000000..b562fac --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job.impl.threadpool; + +import java.util.Comparator; +import java.util.Map; +import java.util.PriorityQueue; + +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.SetThreadName; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.Executable; +import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.execution.Output; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +public class PriorityFetcherRunner extends FetcherRunner { + + private static final Logger logger = LoggerFactory.getLogger(PriorityFetcherRunner.class); + + private volatile PriorityQueue> jobPriorityQueue = new PriorityQueue<>(1, + new Comparator>() { + @Override + public int compare(Pair o1, Pair o2) { + return o1.getSecond() > o2.getSecond() ? -1 : 1; + } + }); + + public PriorityFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, + ExecutableManager executableManager, JobExecutor jobExecutor) { + super(jobEngineConfig, context, executableManager, jobExecutor); + } + + @Override + synchronized public void run() { + try (SetThreadName ignored = new SetThreadName(// + "PriorityFetcherRunner %s", System.identityHashCode(this))) {// + // logger.debug("Job Fetcher is running..."); + + // fetch job from jobPriorityQueue first to reduce chance to scan job list + Map leftJobPriorities = Maps.newHashMap(); + Pair executableWithPriority; + while ((executableWithPriority = jobPriorityQueue.peek()) != null + // the priority of jobs in pendingJobPriorities should be above a threshold + && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) { + executableWithPriority = jobPriorityQueue.poll(); + AbstractExecutable executable = executableWithPriority.getFirst(); + int curPriority = executableWithPriority.getSecond(); + // the job should wait more than one time + if (curPriority > executable.getDefaultPriority() + 1) { + addToJobPool(executable, curPriority); + } else { + leftJobPriorities.put(executable.getId(), curPriority + 1); + } + } + + Map runningJobs = context.getRunningJobs(); + if (isJobPoolFull()) { + return; + } + + while ((executableWithPriority = jobPriorityQueue.poll()) != null) { + leftJobPriorities.put(executableWithPriority.getFirst().getId(), + executableWithPriority.getSecond() + 1); + } + + int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; + for (final String id : executableManager.getAllJobIds()) { + if (runningJobs.containsKey(id)) { + // logger.debug("Job id:" + id + " is already running"); + nRunning++; + continue; + } + + final Output output = executableManager.getOutput(id); + if ((output.getState() != ExecutableState.READY)) { + // logger.debug("Job id:" + id + " not runnable"); + if (output.getState() == ExecutableState.SUCCEED) { + nSUCCEED++; + } else if (output.getState() == ExecutableState.ERROR) { + nError++; + } else if (output.getState() == ExecutableState.DISCARDED) { + nDiscarded++; + } else if (output.getState() == ExecutableState.STOPPED) { + nStopped++; + } else { + if (fetchFailed) { + executableManager.forceKillJob(id); + nError++; + } else { + nOthers++; + } + } + continue; + } + + AbstractExecutable executable = executableManager.getJob(id); + if (!executable.isReady()) { + nOthers++; + continue; + } + + nReady++; + Integer priority = leftJobPriorities.get(id); + if (priority == null) { + priority = executable.getDefaultPriority(); + } + jobPriorityQueue.add(new Pair<>(executable, priority)); + } + + while ((executableWithPriority = jobPriorityQueue.poll()) != null && !isJobPoolFull()) { + addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond()); + } + + fetchFailed = false; + logger.info("Priority Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, " + + nStopped + " stopped, " + nReady + " ready, " + jobPriorityQueue.size() + " waiting, " // + + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers + + " others"); + } catch (Throwable th) { + fetchFailed = true; // this could happen when resource store is unavailable + logger.warn("Priority Job Fetcher caught a exception " + th); + } + } +} diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java index d7201f2..7c66f2c 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java @@ -129,7 +129,7 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { AbstractExecutable job = execMgr.getJob(jobId); ExecutableState status = job.getStatus(); if (status == ExecutableState.RUNNING) { - scheduler.fetchFailed = true; + scheduler.getFetcherRunner().setFetchFailed(true); break; } Thread.sleep(1000); -- 2.5.4 (Apple Git-61) From 54df9e6942e23f894cecbe6b305fe6e3e153a4db Mon Sep 17 00:00:00 2001 From: Zhong Date: Fri, 22 Jun 2018 11:04:10 +0800 Subject: [PATCH 2/2] KYLIN-3421 fix IT --- .../kylin/job/BaseTestDistributedScheduler.java | 5 +-- .../apache/kylin/job/ContextTestExecutable.java | 44 ---------------------- .../kylin/job/ITDistributedSchedulerBaseTest.java | 43 ++------------------- .../job/ITDistributedSchedulerTakeOverTest.java | 7 +--- 4 files changed, 8 insertions(+), 91 deletions(-) delete mode 100644 kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java index 644b2b3..ec6faf2 100644 --- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java +++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java @@ -57,9 +57,8 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase { static File localMetaDir; static String backup; - static final String SEGMENT_ID = "segmentId"; - static final String segmentId1 = "seg1" + UUID.randomUUID(); - static final String segmentId2 = "seg2" + UUID.randomUUID(); + static final String jobId1 = "job1" + UUID.randomUUID(); + static final String jobId2 = "job2" + UUID.randomUUID(); static final String serverName1 = "serverName1"; static final String serverName2 = "serverName2"; static final String confDstPath1 = "target/kylin_metadata_dist_lock_test1/kylin.properties"; diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java deleted file mode 100644 index 9b4f299..0000000 --- a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.job; - -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; - -public class ContextTestExecutable extends AbstractExecutable { - public ContextTestExecutable() { - super(); - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - } - if (context.getConfig() == BaseTestDistributedScheduler.kylinConfig1) { - return new ExecuteResult(); - } else { - return new ExecuteResult(ExecuteResult.State.ERROR, "error"); - } - } -} diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java index 483d8f7..0b0a40f 100644 --- a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java @@ -28,23 +28,20 @@ import org.junit.Test; public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler { @Test public void testSchedulerLock() throws Exception { - if (!lock(jobLock1, segmentId1)) { + if (!lock(jobLock1, jobId1)) { throw new JobException("fail to get the lock"); } DefaultChainedExecutable job = new DefaultChainedExecutable(); - job.setParam(SEGMENT_ID, segmentId1); + job.setId(jobId1); AbstractExecutable task1 = new SucceedTestExecutable(); - task1.setParam(SEGMENT_ID, segmentId1); AbstractExecutable task2 = new SucceedTestExecutable(); - task2.setParam(SEGMENT_ID, segmentId1); AbstractExecutable task3 = new SucceedTestExecutable(); - task3.setParam(SEGMENT_ID, segmentId1); job.addTask(task1); job.addTask(task2); job.addTask(task3); execMgr.addJob(job); - Assert.assertEquals(serverName1, getServerName(segmentId1)); + Assert.assertEquals(serverName1, getServerName(jobId1)); waitForJobFinish(job.getId()); @@ -55,38 +52,6 @@ public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler Thread.sleep(5000); - Assert.assertEquals(null, getServerName(segmentId1)); - } - - @Test - public void testSchedulerConsistent() throws Exception { - if (!lock(jobLock1, segmentId2)) { - throw new JobException("fail to get the lock"); - } - DefaultChainedExecutable job = new DefaultChainedExecutable(); - job.setParam(SEGMENT_ID, segmentId2); - ContextTestExecutable task1 = new ContextTestExecutable(); - task1.setParam(SEGMENT_ID, segmentId2); - job.addTask(task1); - execMgr.addJob(job); - - waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState()); - - if (!lock(jobLock2, segmentId2)) { - throw new JobException("fail to get the lock"); - } - - DefaultChainedExecutable job2 = new DefaultChainedExecutable(); - job2.setParam(SEGMENT_ID, segmentId2); - ContextTestExecutable task2 = new ContextTestExecutable(); - task2.setParam(SEGMENT_ID, segmentId2); - job2.addTask(task2); - execMgr.addJob(job2); - - waitForJobFinish(job2.getId()); - Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(task2.getId()).getState()); - Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job2.getId()).getState()); + Assert.assertEquals(null, getServerName(jobId1)); } } diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java index d9e0d9a..c2256e3 100644 --- a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java @@ -28,18 +28,15 @@ import org.junit.Test; public class ITDistributedSchedulerTakeOverTest extends BaseTestDistributedScheduler { @Test public void testSchedulerTakeOver() throws Exception { - if (!lock(jobLock1, segmentId2)) { + if (!lock(jobLock1, jobId2)) { throw new JobException("fail to get the lock"); } DefaultChainedExecutable job = new DefaultChainedExecutable(); - job.setParam(SEGMENT_ID, segmentId2); + job.setId(jobId2); AbstractExecutable task1 = new SucceedTestExecutable(); - task1.setParam(SEGMENT_ID, segmentId2); AbstractExecutable task2 = new SucceedTestExecutable(); - task2.setParam(SEGMENT_ID, segmentId2); AbstractExecutable task3 = new SucceedTestExecutable(); - task3.setParam(SEGMENT_ID, segmentId2); job.addTask(task1); job.addTask(task2); job.addTask(task3); -- 2.5.4 (Apple Git-61)