From 45bfc8a1ea3d2c234ea3bed709d38cdadb8499b4 Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Thu, 8 Jan 2015 20:29:01 +0800 Subject: [PATCH] refactor --- job/src/main/java/com/kylinolap/job2/dao/JobDao.java | 18 ++++++++++++++++++ .../job2/impl/threadpool/DefaultScheduler.java | 14 ++++++++------ .../com/kylinolap/job2/service/ExecutableManager.java | 15 ++++++++++++--- .../com/kylinolap/job/BuildCubeWithEngineTest.java | 4 ++-- .../kylinolap/job2/service/ExecutableManagerTest.java | 6 +++--- 5 files changed, 43 insertions(+), 14 deletions(-) diff --git a/job/src/main/java/com/kylinolap/job2/dao/JobDao.java b/job/src/main/java/com/kylinolap/job2/dao/JobDao.java index f8b546d..17a4c6e 100644 --- a/job/src/main/java/com/kylinolap/job2/dao/JobDao.java +++ b/job/src/main/java/com/kylinolap/job2/dao/JobDao.java @@ -1,6 +1,7 @@ package com.kylinolap.job2.dao; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.kylinolap.common.KylinConfig; import com.kylinolap.common.persistence.JsonSerializer; import com.kylinolap.common.persistence.ResourceStore; @@ -110,6 +111,23 @@ private long writeJobOutputResource(String path, JobOutputPO output) throws IOEx } } + public List getJobIds() throws PersistentException { + try { + ArrayList resources = store.listResources(JOB_PATH_ROOT); + if (resources == null) { + return Collections.emptyList(); + } + ArrayList result = Lists.newArrayListWithExpectedSize(resources.size()); + for (String path : resources) { + result.add(path.substring(path.lastIndexOf("/") + 1)); + } + return result; + } catch (IOException e) { + logger.error("error get all Jobs:", e); + throw new PersistentException(e); + } + } + public JobPO getJob(String uuid) throws PersistentException { try { return readJobResource(pathOfJob(uuid)); diff --git a/job/src/main/java/com/kylinolap/job2/impl/threadpool/DefaultScheduler.java b/job/src/main/java/com/kylinolap/job2/impl/threadpool/DefaultScheduler.java index 7538acc..e687403 100644 --- a/job/src/main/java/com/kylinolap/job2/impl/threadpool/DefaultScheduler.java +++ b/job/src/main/java/com/kylinolap/job2/impl/threadpool/DefaultScheduler.java @@ -9,6 +9,7 @@ import com.kylinolap.job2.exception.SchedulerException; import com.kylinolap.job2.execution.Executable; import com.kylinolap.job2.execution.ExecutableState; +import com.kylinolap.job2.execution.Output; import com.kylinolap.job2.service.ExecutableManager; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -60,17 +61,18 @@ public void run() { logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); return; } - for (final AbstractExecutable executable : executableManager.getAllExecutables()) { - final String id = executable.getId(); - String jobDesc = executable.toString(); + for (final String id : executableManager.getAllJobIds()) { if (runningJobs.containsKey(id)) { - logger.info(jobDesc + " is already running"); + logger.info("Job id:" + id + " is already running"); continue; } - if (!executable.isRunnable()) { - logger.info(jobDesc + " not runnable"); + final Output output = executableManager.getOutput(id); + if ((output.getState() != ExecutableState.READY)) { + logger.info("Job id:" + id + " not runnable"); continue; } + AbstractExecutable executable = executableManager.getJob(id); + String jobDesc = executable.toString(); logger.info(jobDesc + " prepare to schedule"); try { context.addRunningJob(executable); diff --git a/job/src/main/java/com/kylinolap/job2/service/ExecutableManager.java b/job/src/main/java/com/kylinolap/job2/service/ExecutableManager.java index 877f88a..8d88224 100644 --- a/job/src/main/java/com/kylinolap/job2/service/ExecutableManager.java +++ b/job/src/main/java/com/kylinolap/job2/service/ExecutableManager.java @@ -73,11 +73,11 @@ private void addJobOutput(AbstractExecutable executable) throws PersistentExcept } //for ut - public void deleteJob(AbstractExecutable executable) { + public void deleteJob(String jobId) { try { - jobDao.deleteJob(executable.getId()); + jobDao.deleteJob(jobId); } catch (PersistentException e) { - logger.error("fail to delete job:" + executable.getId(), e); + logger.error("fail to delete job:" + jobId, e); throw new RuntimeException(e); } } @@ -121,6 +121,15 @@ public AbstractExecutable apply(JobPO input) { } } + public List getAllJobIds() { + try { + return jobDao.getJobIds(); + } catch (PersistentException e) { + logger.error("error get All Job Ids", e); + throw new RuntimeException(e); + } + } + public void updateAllRunningJobsToError() { try { final List jobOutputs = jobDao.getJobOutputs(); diff --git a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java index d124280..ce19a62 100644 --- a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java +++ b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java @@ -77,8 +77,8 @@ public void before() throws Exception { } cubeManager = CubeManager.getInstance(kylinConfig); jobEngineConfig = new JobEngineConfig(kylinConfig); - for (AbstractExecutable job: jobService.getAllExecutables()) { - jobService.deleteJob(job); + for (String jobId: jobService.getAllJobIds()) { + jobService.deleteJob(jobId); } } diff --git a/job/src/test/java/com/kylinolap/job2/service/ExecutableManagerTest.java b/job/src/test/java/com/kylinolap/job2/service/ExecutableManagerTest.java index d2aa46d..44588b0 100644 --- a/job/src/test/java/com/kylinolap/job2/service/ExecutableManagerTest.java +++ b/job/src/test/java/com/kylinolap/job2/service/ExecutableManagerTest.java @@ -30,9 +30,9 @@ public void setup() throws Exception { createTestMetadata(); service = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); - for (AbstractExecutable executable: service.getAllExecutables()) { - System.out.println("deleting " + executable.getId()); - service.deleteJob(executable); + for (String jobId: service.getAllJobIds()) { + System.out.println("deleting " + jobId); + service.deleteJob(jobId); } }