From f5fca445c6d8ac935130e3ec64ea34e2f279c3b9 Mon Sep 17 00:00:00 2001 From: zhengdong Date: Thu, 7 Sep 2017 17:19:29 +0800 Subject: [PATCH] KYLIN-2857 MR configuration should be overwritten by user specified parameters when resuming MR jobs --- .../org/apache/kylin/common/util/HadoopUtil.java | 27 ++++++++++ .../kylin/engine/mr/common/AbstractHadoopJob.java | 2 + .../kylin/engine/mr/common/BatchConstants.java | 1 + .../engine/mr/common/MapReduceExecutable.java | 62 ++++++++++++++++++++-- 4 files changed, 88 insertions(+), 4 deletions(-) mode change 100644 => 100755 core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java mode change 100644 => 100755 engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java mode change 100644 => 100755 engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java mode change 100644 => 100755 engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java old mode 100644 new mode 100755 index f242515b0..acf00d193 --- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java @@ -20,10 +20,14 @@ package org.apache.kylin.common.util; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Map; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -34,6 +38,13 @@ import org.apache.hadoop.io.Writable; import org.apache.kylin.common.KylinConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; public class HadoopUtil { @SuppressWarnings("unused") @@ -160,4 +171,20 @@ public class HadoopUtil { return null; } } + + public static Map parseConfigFile(File file) + throws ParserConfigurationException, IOException, SAXException { + Preconditions.checkArgument(file != null && file.exists()); + Map result = Maps.newHashMap(); + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + DocumentBuilder builder = factory.newDocumentBuilder(); + Document doc = builder.parse(file); + NodeList nl = doc.getElementsByTagName("property"); + for (int i = 0; i < nl.getLength(); i++) { + String key = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue(); + String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue(); + result.put(key, value); + } + return result; + } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java old mode 100644 new mode 100755 index 081ac934e..8c123d727 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -111,6 +111,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false) .withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT); + protected static final Option OPTION_JOB_CONF = OptionBuilder.withArgName(BatchConstants.ARG_CONF).hasArg() + .isRequired(true).withDescription("MapReduce job config file").create(BatchConstants.ARG_CONF); private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath"; diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java old mode 100644 new mode 100755 index 84ca0063f..389b8b419 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -85,6 +85,7 @@ public interface BatchConstants { String ARG_HTABLE_NAME = "htablename"; String ARG_INPUT_FORMAT = "inputformat"; String ARG_LEVEL = "level"; + String ARG_CONF = "conf"; /** * logger and counter diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java old mode 100644 new mode 100755 index 07efb34d1..556fb9c45 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java @@ -18,20 +18,31 @@ package org.apache.kylin.engine.mr.common; +import java.io.File; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.lang.reflect.Constructor; +import java.util.ListIterator; import java.util.Map; +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.constant.JobStepStatusEnum; @@ -44,6 +55,7 @@ import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.job.execution.Output; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.xml.sax.SAXException; import com.google.common.base.Preconditions; @@ -72,7 +84,9 @@ public class MapReduceExecutable extends AbstractExecutable { return; } try { - Configuration conf = HadoopUtil.getCurrentConfiguration(); + String params = getMapReduceParams(); + String[] args = params.trim().split("\\s+"); + Configuration conf = getJobConf(executableContext.getConfig(), args); Job job = new Cluster(conf).getJob(JobID.forName(mrJobId)); if (job == null || job.getJobState() == JobStatus.State.FAILED) { //remove previous mr job info @@ -80,7 +94,7 @@ public class MapReduceExecutable extends AbstractExecutable { } else { getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null); } - } catch (IOException e) { + } catch (IOException | ParseException | ParserConfigurationException | SAXException e) { logger.warn("error get hadoop status"); super.onExecuteStart(executableContext); } catch (InterruptedException e) { @@ -102,9 +116,10 @@ public class MapReduceExecutable extends AbstractExecutable { try { Job job; ExecutableManager mgr = getManager(); + String[] args = params.trim().split("\\s+"); final Map extra = mgr.getOutput(getId()).getExtra(); if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) { - Configuration conf = HadoopUtil.getCurrentConfiguration(); + Configuration conf = getJobConf(context.getConfig(), args); job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID))); logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID) + " resumed"); } else { @@ -113,7 +128,6 @@ public class MapReduceExecutable extends AbstractExecutable { hadoopJob.setConf(HadoopUtil.getCurrentConfiguration()); hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away logger.info("parameters of the MapReduceExecutable: {}", params); - String[] args = params.trim().split("\\s+"); try { //for async mr job, ToolRunner just return 0; @@ -246,4 +260,44 @@ public class MapReduceExecutable extends AbstractExecutable { public void setCounterSaveAs(String value) { setParam(KEY_COUNTER_SAVEAS, value); } + + public Configuration getJobConf(KylinConfig config, String[] jobParams) + throws ParseException, IOException, SAXException, ParserConfigurationException { + Configuration conf = HadoopUtil.getCurrentConfiguration(); + Options options = new Options(); + options.addOption(AbstractHadoopJob.OPTION_CUBE_NAME); + options.addOption(AbstractHadoopJob.OPTION_JOB_CONF); + CommandLineParser parser = new CustomParser(true); + CommandLine commandLine = parser.parse(options, jobParams); + String cubeName = commandLine.getOptionValue(BatchConstants.ARG_CUBE_NAME); + String fileName = commandLine.getOptionValue(BatchConstants.ARG_CONF); + Preconditions.checkArgument(cubeName != null && fileName != null, "Can't get job config"); + File configFile = new File(fileName); + for (Map.Entry entry : HadoopUtil.parseConfigFile(configFile).entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + + for (Map.Entry entry : CubeManager.getInstance(config).getCube(cubeName).getConfig() + .getMRConfigOverride().entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + return conf; + } + + private static class CustomParser extends GnuParser { + private boolean ignoreUnrecognizedOption; + + public CustomParser(final boolean ignoreUnrecognizedOption) { + this.ignoreUnrecognizedOption = ignoreUnrecognizedOption; + } + + @Override + protected void processOption(final String arg, final ListIterator iter) throws ParseException { + boolean hasOption = getOptions().hasOption(arg); + + if (hasOption || !ignoreUnrecognizedOption) { + super.processOption(arg, iter); + } + } + } } -- 2.14.1