From 977e23d63aac721ee280d5a8cfcb66dc82330630 Mon Sep 17 00:00:00 2001 From: zhengdong Date: Mon, 11 Sep 2017 17:02:40 +0800 Subject: [PATCH] KYLIN-2857 MR configuration should be overwritten by user specified parameters when resuming MR jobs --- .../kylin/engine/mr/common/AbstractHadoopJob.java | 11 +--- .../kylin/engine/mr/common/BatchConstants.java | 1 + .../engine/mr/common/MapReduceExecutable.java | 70 ++++++++++++++++++++-- 3 files changed, 68 insertions(+), 14 deletions(-) mode change 100644 => 100755 engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java mode change 100644 => 100755 engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java old mode 100644 new mode 100755 index 292c57d5e..203b2dfae --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -31,7 +31,6 @@ import java.io.InputStream; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.Map; -import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.regex.Matcher; @@ -113,6 +112,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false) .withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT); + protected static final Option OPTION_JOB_CONF = OptionBuilder.withArgName(BatchConstants.ARG_CONF).hasArg() + .isRequired(true).withDescription("MapReduce job config file").create(BatchConstants.ARG_CONF); private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath"; @@ -264,15 +265,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { StringUtil.appendWithSeparator(kylinDependency, mrLibDir); setJobTmpJarsAndFiles(job, kylinDependency.toString()); - - overrideJobConfig(job.getConfiguration(), kylinConf.getMRConfigOverride()); } - private void overrideJobConfig(Configuration jobConf, Map override) { - for (Entry entry : override.entrySet()) { - jobConf.set(entry.getKey(), entry.getValue()); - } - } + private String filterKylinHiveDependency(String kylinHiveDependency, KylinConfig config) { if (StringUtils.isBlank(kylinHiveDependency)) diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index bbf38e5fe..52b6af583 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -86,6 +86,7 @@ public interface BatchConstants { String ARG_HTABLE_NAME = "htablename"; String ARG_INPUT_FORMAT = "inputformat"; String ARG_LEVEL = "level"; + String ARG_CONF = "conf"; /** * logger and counter diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java old mode 100644 new mode 100755 index 07efb34d1..4e6458f99 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java @@ -22,17 +22,24 @@ import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.lang.reflect.Constructor; +import java.util.List; +import java.util.ListIterator; import java.util.Map; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.engine.mr.MRUtil; +import org.apache.kylin.cube.CubeManager; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.constant.JobStepStatusEnum; import org.apache.kylin.job.exception.ExecuteException; @@ -46,6 +53,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; /** */ @@ -72,7 +80,10 @@ public class MapReduceExecutable extends AbstractExecutable { return; } try { + String params = getMapReduceParams(); + String[] args = params.trim().split("\\s+"); Configuration conf = HadoopUtil.getCurrentConfiguration(); + overwriteJobConf(conf, executableContext.getConfig(), args); Job job = new Cluster(conf).getJob(JobID.forName(mrJobId)); if (job == null || job.getJobState() == JobStatus.State.FAILED) { //remove previous mr job info @@ -80,7 +91,7 @@ public class MapReduceExecutable extends AbstractExecutable { } else { getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null); } - } catch (IOException e) { + } catch (IOException | ParseException e) { logger.warn("error get hadoop status"); super.onExecuteStart(executableContext); } catch (InterruptedException e) { @@ -102,24 +113,25 @@ public class MapReduceExecutable extends AbstractExecutable { try { Job job; ExecutableManager mgr = getManager(); + String[] args = params.trim().split("\\s+"); + Configuration conf = HadoopUtil.getCurrentConfiguration(); + String[] jobArgs = overwriteJobConf(conf, context.getConfig(), args); final Map extra = mgr.getOutput(getId()).getExtra(); if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) { - Configuration conf = HadoopUtil.getCurrentConfiguration(); job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID))); logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID) + " resumed"); } else { final Constructor constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor(); final AbstractHadoopJob hadoopJob = constructor.newInstance(); - hadoopJob.setConf(HadoopUtil.getCurrentConfiguration()); + hadoopJob.setConf(conf); hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away logger.info("parameters of the MapReduceExecutable: {}", params); - String[] args = params.trim().split("\\s+"); try { //for async mr job, ToolRunner just return 0; // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-sale // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe - MRUtil.runMRJob(hadoopJob, args); + hadoopJob.run(jobArgs); if (hadoopJob.isSkipped()) { return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped"); @@ -246,4 +258,50 @@ public class MapReduceExecutable extends AbstractExecutable { public void setCounterSaveAs(String value) { setParam(KEY_COUNTER_SAVEAS, value); } + + private String[] overwriteJobConf(Configuration conf, KylinConfig config, String[] jobParams) + throws ParseException { + Options options = new Options(); + options.addOption(AbstractHadoopJob.OPTION_JOB_CONF); + options.addOption(AbstractHadoopJob.OPTION_CUBE_NAME); + CustomParser parser = new CustomParser(); + CommandLine commandLine = parser.parse(options, jobParams); + String fileName = commandLine.getOptionValue(BatchConstants.ARG_CONF); + String cubeName = commandLine.getOptionValue(BatchConstants.ARG_CUBE_NAME); + Preconditions.checkArgument(cubeName != null && fileName != null, "Can't get job config"); + conf.addResource(fileName); + for (Map.Entry entry : CubeManager.getInstance(config).getCube(cubeName).getConfig() + .getMRConfigOverride().entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + List remainingArgs = parser.getRemainingArgs(); + remainingArgs.add("-" + BatchConstants.ARG_CUBE_NAME); + remainingArgs.add(cubeName); + String[] result = new String[remainingArgs.size()]; + return remainingArgs.toArray(result); + } + + private static class CustomParser extends GnuParser { + private List remainingArgs; + + public CustomParser() { + this.remainingArgs = Lists.newArrayList(); + } + + @Override + protected void processOption(final String arg, final ListIterator iter) throws ParseException { + boolean hasOption = getOptions().hasOption(arg); + + if (hasOption) { + super.processOption(arg, iter); + } else { + remainingArgs.add(arg); + remainingArgs.add(iter.next().toString()); + } + } + + public List getRemainingArgs() { + return remainingArgs; + } + } } -- 2.14.1