From eea012c6af165e323e8ff1946c3684b1eaf4ef6c Mon Sep 17 00:00:00 2001 From: sunyerui Date: Tue, 4 Aug 2015 20:34:16 +0800 Subject: [PATCH] KYLIN-923 FetcherRunner will never run again if encountered exception during running --- .../job/impl/threadpool/DefaultScheduler.java | 66 ++++++++++++---------- .../job/impl/threadpool/DefaultSchedulerTest.java | 46 ++++++++++++++- 2 files changed, 80 insertions(+), 32 deletions(-) diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 8a83870..0d198bb 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -65,40 +65,44 @@ public class DefaultScheduler implements Scheduler, Connecti @Override synchronized public void run() { - // logger.debug("Job Fetcher is running..."); - Map runningJobs = context.getRunningJobs(); - if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { - logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); - return; - } - - int nRunning = 0, nReady = 0, nOthers = 0; - for (final String id : executableManager.getAllJobIds()) { - if (runningJobs.containsKey(id)) { - // logger.debug("Job id:" + id + " is already running"); - nRunning++; - continue; - } - final Output output = executableManager.getOutput(id); - if ((output.getState() != ExecutableState.READY)) { - // logger.debug("Job id:" + id + " not runnable"); - nOthers++; - continue; + try { + // logger.debug("Job Fetcher is running..."); + Map runningJobs = context.getRunningJobs(); + if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { + logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); + return; } - nReady++; - AbstractExecutable executable = executableManager.getJob(id); - String jobDesc = executable.toString(); - logger.info(jobDesc + " prepare to schedule"); - try { - context.addRunningJob(executable); - jobPool.execute(new JobRunner(executable)); - logger.info(jobDesc + " scheduled"); - } catch (Exception ex) { - context.removeRunningJob(executable); - logger.warn(jobDesc + " fail to schedule", ex); + + int nRunning = 0, nReady = 0, nOthers = 0; + for (final String id : executableManager.getAllJobIds()) { + if (runningJobs.containsKey(id)) { + // logger.debug("Job id:" + id + " is already running"); + nRunning++; + continue; + } + final Output output = executableManager.getOutput(id); + if ((output.getState() != ExecutableState.READY)) { + // logger.debug("Job id:" + id + " not runnable"); + nOthers++; + continue; + } + nReady++; + AbstractExecutable executable = executableManager.getJob(id); + String jobDesc = executable.toString(); + logger.info(jobDesc + " prepare to schedule"); + try { + context.addRunningJob(executable); + jobPool.execute(new JobRunner(executable)); + logger.info(jobDesc + " scheduled"); + } catch (Exception ex) { + context.removeRunningJob(executable); + logger.warn(jobDesc + " fail to schedule", ex); + } } + logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, " + nReady + " ready, " + nOthers + " others"); + } catch (Exception e) { + logger.warn("Job Fetcher caught a exception " + e); } - logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, " + nReady + " ready, " + nOthers + " others"); } } diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java index debe9a4..a2d61de 100644 --- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java +++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java @@ -21,11 +21,17 @@ package org.apache.kylin.job.impl.threadpool; import org.apache.kylin.job.*; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.execution.Output; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** */ @@ -97,4 +103,42 @@ public class DefaultSchedulerTest extends BaseSchedulerTest { System.out.println(job); } + @Test + public void testSchedulerPool() throws InterruptedException { + ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1); + final CountDownLatch countDownLatch = new CountDownLatch(3); + ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + countDownLatch.countDown(); + } + }, 5, 5, TimeUnit.SECONDS); + assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS)); + assertTrue("future should still running", future.cancel(true)); + + final CountDownLatch countDownLatch2 = new CountDownLatch(3); + ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + countDownLatch2.countDown(); + throw new RuntimeException(); + } + }, 5, 5, TimeUnit.SECONDS); + assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS)); + assertFalse("future2 should has been stopped", future2.cancel(true)); + + final CountDownLatch countDownLatch3 = new CountDownLatch(3); + ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + countDownLatch3.countDown(); + throw new RuntimeException(); + } catch (Exception e) { + } + } + }, 5, 5, TimeUnit.SECONDS); + assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS)); + assertTrue("future3 should still running", future3.cancel(true)); + } } -- 2.3.2 (Apple Git-55)