From eac971338f6a8438fe476dd2cda1a7e87acce570 Mon Sep 17 00:00:00 2001 From: kyotoYaho Date: Wed, 4 May 2016 14:48:44 +0800 Subject: [PATCH] Find a better way to check hadoop job status via job API --- .../engine/mr/common/HadoopJobStatusChecker.java | 64 ++++++++++++++++++++++ .../engine/mr/common/MapReduceExecutable.java | 18 +++--- 2 files changed, 73 insertions(+), 9 deletions(-) create mode 100644 engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopJobStatusChecker.java diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopJobStatusChecker.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopJobStatusChecker.java new file mode 100644 index 0000000..f25d41f --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopJobStatusChecker.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.common; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.kylin.job.constant.JobStepStatusEnum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HadoopJobStatusChecker { + + protected static final Logger logger = LoggerFactory.getLogger(HadoopJobStatusChecker.class); + + public static JobStepStatusEnum checkStatus(Job job, StringBuilder output) { + if (job == null || job.getJobID() == null) { + output.append("Skip status check with empty job id..\n"); + return JobStepStatusEnum.WAITING; + } + + JobStepStatusEnum status = null; + try { + switch (job.getStatus().getState()) { + case SUCCEEDED: + status = JobStepStatusEnum.FINISHED; + break; + case FAILED: + status = JobStepStatusEnum.ERROR; + break; + case KILLED: + status = JobStepStatusEnum.KILLED; + break; + case RUNNING: + status = JobStepStatusEnum.RUNNING; + break; + case PREP: + status = JobStepStatusEnum.WAITING; + break; + } + } catch (Exception e) { + logger.error("error check status", e); + output.append("Exception: " + e.getLocalizedMessage() + "\n"); + status = JobStepStatusEnum.ERROR; + } + + return status; + } + +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java index 54459d7..69fd03a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java @@ -134,17 +134,17 @@ public class MapReduceExecutable extends AbstractExecutable { final StringBuilder output = new StringBuilder(); final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output); - final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig()); - if (restStatusCheckUrl == null) { - logger.error("restStatusCheckUrl is null"); - return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null"); - } - String mrJobId = hadoopCmdOutput.getMrJobId(); - boolean useKerberosAuth = context.getConfig().isGetJobStatusWithKerberos(); - HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth); +// final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig()); +// if (restStatusCheckUrl == null) { +// logger.error("restStatusCheckUrl is null"); +// return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null"); +// } +// String mrJobId = hadoopCmdOutput.getMrJobId(); +// boolean useKerberosAuth = context.getConfig().isGetJobStatusWithKerberos(); +// HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth); JobStepStatusEnum status = JobStepStatusEnum.NEW; while (!isDiscarded()) { - JobStepStatusEnum newStatus = statusChecker.checkStatus(); + JobStepStatusEnum newStatus = HadoopJobStatusChecker.checkStatus(job, output); if (status == JobStepStatusEnum.KILLED) { executableManager.updateJobOutput(getId(), ExecutableState.ERROR, Collections. emptyMap(), "killed by admin"); return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin"); -- 2.5.4 (Apple Git-61)