From 786314a4d54da81320aeb27c04e7d4446de1af37 Mon Sep 17 00:00:00 2001 From: "Ma,Gang" Date: Wed, 22 Jun 2016 11:07:19 +0800 Subject: [PATCH] KYLIN-1811 Error step may be skipped sometimes when resume a cube job --- .../kylin/job/execution/DefaultChainedExecutable.java | 14 ++++++++++++++ .../org/apache/kylin/job/manager/ExecutableManager.java | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 7403715..b130f5b 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -46,6 +46,13 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai final int size = executables.size(); for (int i = 0; i < size; ++i) { Executable subTask = executables.get(i); + ExecutableState state = subTask.getStatus(); + if (state == ExecutableState.RUNNING){ + // there is already running subtask, no need to start a new subtask + break; + } else if (state == ExecutableState.ERROR){ + throw new IllegalStateException("invalid subtask state, subtask:" + subTask.getName() + ", state:" + subTask.getStatus()); + } if (subTask.isRunnable()) { return subTask.execute(context); } @@ -53,6 +60,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai return new ExecuteResult(ExecuteResult.State.SUCCEED, null); } + @Override protected void onExecuteStart(ExecutableContext executableContext) { Map info = Maps.newHashMap(); @@ -74,6 +82,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai List jobs = getTasks(); boolean allSucceed = true; boolean hasError = false; + boolean hasRunning = false; for (Executable task : jobs) { final ExecutableState status = task.getStatus(); if (status == ExecutableState.ERROR) { @@ -82,6 +91,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai if (status != ExecutableState.SUCCEED) { allSucceed = false; } + if (status == ExecutableState.RUNNING) { + hasRunning = true; + } } if (allSucceed) { setEndTime(System.currentTimeMillis()); @@ -91,6 +103,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai setEndTime(System.currentTimeMillis()); jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null); notifyUserStatusChange(executableContext, ExecutableState.ERROR); + } else if (hasRunning){ + jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null); } else { jobService.updateJobOutput(getId(), ExecutableState.READY, null, null); } diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java index 5138fb4..2cd5d4f 100644 --- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java @@ -243,7 +243,6 @@ public class ExecutableManager { if (job == null) { return; } - updateJobOutput(jobId, ExecutableState.READY, null, null); if (job instanceof DefaultChainedExecutable) { List tasks = ((DefaultChainedExecutable) job).getTasks(); for (AbstractExecutable task : tasks) { @@ -253,6 +252,7 @@ public class ExecutableManager { } } } + updateJobOutput(jobId, ExecutableState.READY, null, null); } public void discardJob(String jobId) { -- 2.6.4