From 218c0a33df4970e8ec0084b48e7344768bab4f12 Mon Sep 17 00:00:00 2001 From: kyotoYaho Date: Wed, 4 May 2016 17:17:36 +0800 Subject: [PATCH] KYLIN-1319: Find a better way to check hadoop job status by YarnClient --- .../engine/mr/common/HadoopAppStatusGetter.java | 63 +++++++++++++++ .../engine/mr/common/HadoopStatusAppChecker.java | 93 ++++++++++++++++++++++ .../engine/mr/common/MapReduceExecutable.java | 17 ++-- 3 files changed, 165 insertions(+), 8 deletions(-) create mode 100644 engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopAppStatusGetter.java create mode 100644 engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusAppChecker.java diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopAppStatusGetter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopAppStatusGetter.java new file mode 100644 index 0000000..a5137ad --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopAppStatusGetter.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.common; + +import java.io.IOException; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class HadoopAppStatusGetter { + + protected static final Logger logger = LoggerFactory.getLogger(HadoopAppStatusGetter.class); + + private static HadoopAppStatusGetter SINGLE_INSTANCE; + private YarnClient yarnClient; + + public static HadoopAppStatusGetter getInstance(){ + if(SINGLE_INSTANCE == null){ + SINGLE_INSTANCE = new HadoopAppStatusGetter(); + } + return SINGLE_INSTANCE; + } + + private HadoopAppStatusGetter() { + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(HadoopUtil.getCurrentConfiguration()); + yarnClient.start(); + } + + public Pair get(String mrJobId) throws IOException, YarnException { + String appId = mrJobId.replace("job", "application"); + ApplicationReport appReport = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId)); + RMAppState state = RMAppState.valueOf(appReport.getYarnApplicationState().toString()); + FinalApplicationStatus finalStatus = FinalApplicationStatus.valueOf(appReport.getFinalApplicationStatus().toString()); + return Pair.of(state, finalStatus); + } + +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusAppChecker.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusAppChecker.java new file mode 100644 index 0000000..22f4185 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusAppChecker.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.common; + +import java.text.SimpleDateFormat; +import java.util.Date; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.kylin.job.constant.JobStepStatusEnum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HadoopStatusAppChecker { + + protected static final Logger logger = LoggerFactory.getLogger(HadoopStatusAppChecker.class); + + private final String mrJobID; + private final StringBuilder output; + + public HadoopStatusAppChecker(String mrJobID, StringBuilder output) { + this.mrJobID = mrJobID; + this.output = output; + } + + public JobStepStatusEnum checkStatus() { + if (null == mrJobID) { + this.output.append("Skip status check with empty job id..\n"); + return JobStepStatusEnum.WAITING; + } + JobStepStatusEnum status = null; + try { + final Pair result = HadoopAppStatusGetter.getInstance().get(mrJobID); + logger.debug("State of Hadoop job: " + mrJobID + ":" + result.getLeft() + "-" + result.getRight()); + output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " + mrJobID + ":" + result.getLeft() + " - " + result.getRight() + "\n"); + + switch (result.getRight()) { + case SUCCEEDED: + status = JobStepStatusEnum.FINISHED; + break; + case FAILED: + status = JobStepStatusEnum.ERROR; + break; + case KILLED: + status = JobStepStatusEnum.KILLED; + break; + case UNDEFINED: + switch (result.getLeft()) { + case NEW: + case NEW_SAVING: + case SUBMITTED: + case ACCEPTED: + status = JobStepStatusEnum.WAITING; + break; + case RUNNING: + status = JobStepStatusEnum.RUNNING; + break; + case FINAL_SAVING: + case FINISHING: + case FINISHED: + case FAILED: + case KILLING: + case KILLED: + } + break; + } + } catch (Exception e) { + logger.error("error check status", e); + output.append("Exception: " + e.getLocalizedMessage() + "\n"); + status = JobStepStatusEnum.ERROR; + } + + return status; + } + +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java index 54459d7..22f1ffa 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java @@ -134,15 +134,16 @@ public class MapReduceExecutable extends AbstractExecutable { final StringBuilder output = new StringBuilder(); final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output); - final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig()); - if (restStatusCheckUrl == null) { - logger.error("restStatusCheckUrl is null"); - return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null"); - } +// final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig()); +// if (restStatusCheckUrl == null) { +// logger.error("restStatusCheckUrl is null"); +// return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null"); +// } String mrJobId = hadoopCmdOutput.getMrJobId(); - boolean useKerberosAuth = context.getConfig().isGetJobStatusWithKerberos(); - HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth); - JobStepStatusEnum status = JobStepStatusEnum.NEW; +// boolean useKerberosAuth = context.getConfig().isGetJobStatusWithKerberos(); +// HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth); + HadoopStatusAppChecker statusChecker = new HadoopStatusAppChecker(mrJobId, output); + JobStepStatusEnum status = JobStepStatusEnum.NEW; while (!isDiscarded()) { JobStepStatusEnum newStatus = statusChecker.checkStatus(); if (status == JobStepStatusEnum.KILLED) { -- 2.5.4 (Apple Git-61)