From f95cbc01ce73ae1a9766ee9f2d1caff8b332c7c5 Mon Sep 17 00:00:00 2001 From: sunyerui Date: Tue, 4 Aug 2015 20:39:35 +0800 Subject: [PATCH] KYLIN-923 FetcherRunner will never run again if encountered exception during running --- .../job/impl/threadpool/DefaultScheduler.java | 66 ++++++++++++---------- .../job/impl/threadpool/DefaultSchedulerTest.java | 47 +++++++++++++++ 2 files changed, 82 insertions(+), 31 deletions(-) diff --git a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 5fceab3..6ae7d42 100644 --- a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -75,40 +75,44 @@ public class DefaultScheduler implements Scheduler, Connecti @Override synchronized public void run() { - // logger.debug("Job Fetcher is running..."); - Map runningJobs = context.getRunningJobs(); - if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { - logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); - return; - } - - int nRunning = 0, nReady = 0, nOthers = 0; - for (final String id : executableManager.getAllJobIds()) { - if (runningJobs.containsKey(id)) { - // logger.debug("Job id:" + id + " is already running"); - nRunning++; - continue; - } - final Output output = executableManager.getOutput(id); - if ((output.getState() != ExecutableState.READY)) { - // logger.debug("Job id:" + id + " not runnable"); - nOthers++; - continue; + try { + // logger.debug("Job Fetcher is running..."); + Map runningJobs = context.getRunningJobs(); + if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { + logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); + return; } - nReady++; - AbstractExecutable executable = executableManager.getJob(id); - String jobDesc = executable.toString(); - logger.info(jobDesc + " prepare to schedule"); - try { - context.addRunningJob(executable); - jobPool.execute(new JobRunner(executable)); - logger.info(jobDesc + " scheduled"); - } catch (Exception ex) { - context.removeRunningJob(executable); - logger.warn(jobDesc + " fail to schedule", ex); + + int nRunning = 0, nReady = 0, nOthers = 0; + for (final String id : executableManager.getAllJobIds()) { + if (runningJobs.containsKey(id)) { + // logger.debug("Job id:" + id + " is already running"); + nRunning++; + continue; + } + final Output output = executableManager.getOutput(id); + if ((output.getState() != ExecutableState.READY)) { + // logger.debug("Job id:" + id + " not runnable"); + nOthers++; + continue; + } + nReady++; + AbstractExecutable executable = executableManager.getJob(id); + String jobDesc = executable.toString(); + logger.info(jobDesc + " prepare to schedule"); + try { + context.addRunningJob(executable); + jobPool.execute(new JobRunner(executable)); + logger.info(jobDesc + " scheduled"); + } catch (Exception ex) { + context.removeRunningJob(executable); + logger.warn(jobDesc + " fail to schedule", ex); + } } + logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, " + nReady + " ready, " + nOthers + " others"); + } catch (Exception e) { + logger.warn("Job Fetcher caught a exception " + e); } - logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, " + nReady + " ready, " + nOthers + " others"); } } diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java index da99bb3..e704775 100644 --- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java +++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java @@ -19,6 +19,8 @@ package org.apache.kylin.job.impl.threadpool; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import org.apache.kylin.job.BaseTestExecutable; import org.apache.kylin.job.ErrorTestExecutable; @@ -29,6 +31,12 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.junit.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + /** * Created by qianzhou on 12/19/14. */ @@ -101,4 +109,43 @@ public class DefaultSchedulerTest extends BaseSchedulerTest { Thread.sleep(5000); System.out.println(job); } + + @Test + public void testSchedulerPool() throws InterruptedException { + ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1); + final CountDownLatch countDownLatch = new CountDownLatch(3); + ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + countDownLatch.countDown(); + } + }, 5, 5, TimeUnit.SECONDS); + assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS)); + assertTrue("future should still running", future.cancel(true)); + + final CountDownLatch countDownLatch2 = new CountDownLatch(3); + ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + countDownLatch2.countDown(); + throw new RuntimeException(); + } + }, 5, 5, TimeUnit.SECONDS); + assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS)); + assertFalse("future2 should has been stopped", future2.cancel(true)); + + final CountDownLatch countDownLatch3 = new CountDownLatch(3); + ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + countDownLatch3.countDown(); + throw new RuntimeException(); + } catch (Exception e) { + } + } + }, 5, 5, TimeUnit.SECONDS); + assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS)); + assertTrue("future3 should still running", future3.cancel(true)); + } } -- 2.3.2 (Apple Git-55)