diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 86f1a71..bd27d99 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -499,6 +499,7 @@
HIVEMERGEMAPFILES("hive.merge.mapfiles", true),
HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false),
+ HIVEMERGETEZFILES("hive.merge.tezfiles", false),
HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long) (256 * 1000 * 1000)),
HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long) (16 * 1000 * 1000)),
HIVEMERGERCFILEBLOCKLEVEL("hive.merge.rcfile.block.level", true),
@@ -560,6 +561,10 @@
HIVEDEBUGLOCALTASK("hive.debug.localtask",false),
HIVEINPUTFORMAT("hive.input.format", "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat"),
+ HIVETEZINPUTFORMAT("hive.tez.input.format", "org.apache.hadoop.hive.ql.io.HiveInputFormat"),
+
+ HIVETEZCONTAINERSIZE("hive.tez.container.size", -1),
+ HIVETEZJAVAOPTS("hive.tez.java.opts", null),
HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
HIVEENFORCESORTING("hive.enforce.sorting", false),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 8c64633..065c397 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -794,6 +794,12 @@
+ hive.merge.tezfiles
+ false
+ Merge small files at the end of a Tez DAG
+
+
+
hive.heartbeat.interval
1000
Send a heartbeat after this interval - used by mapjoin and filter operators
@@ -960,6 +966,12 @@
+ hive.tez.input.format
+ org.apache.hadoop.hive.ql.io.HiveInputFormat
+ The default input format for tez. Tez groups splits in the AM.
+
+
+
hive.udtf.auto.progress
false
Whether Hive should automatically send progress information to TaskTracker when using UDTF's to prevent the task getting killed because of inactivity. Users should be cautious because this may prevent TaskTracker from killing tasks with infinite loops.
@@ -2360,4 +2372,16 @@
+
+ hive.tez.container.size
+ -1
+ By default Tez will spawn containers of the size of a mapper. This can be used to override the container size.
+
+
+
+ hive.tez.java.opts
+
+ By default Tez will use the Java opts from map tasks. This can be used to override them.
+
+
diff --git data/conf/tez/hive-site.xml data/conf/tez/hive-site.xml
index d240056..5ff5b4c 100644
--- data/conf/tez/hive-site.xml
+++ data/conf/tez/hive-site.xml
@@ -33,8 +33,26 @@
- mapred.child.java.opts
- -Xmx200m
+ mapred.tez.java.opts
+ -Xmx128m
+
+
+
+ hive.tez.container.size
+ 128
+
+
+
+
+ hive.merge.tezfiles
+ false
+ Merge small files at the end of a Tez DAG
+
+
+
+ hive.tez.input.format
+ org.apache.hadoop.hive.ql.io.HiveInputFormat
+ The default input format for tez. Tez groups splits in the AM.
@@ -172,7 +190,7 @@
hive.input.format
- org.apache.hadoop.hive.ql.io.HiveInputFormat
+ org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
The default input format, if it is not specified, the system assigns it. It is set to HiveInputFormat for hadoop versions 17, 18 and 19, whereas it is set to CombineHiveInputFormat for hadoop 20. The user can always overwrite it - if there is a bug in CombineHiveInputFormat, it can always be manually set to HiveInputFormat.
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 0418d18..23ef69b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -3144,8 +3144,10 @@ public static void setInputPaths(JobConf job, List pathsToAdd) {
* Set hive input format, and input format file if necessary.
*/
public static void setInputAttributes(Configuration conf, MapWork mWork) {
+ HiveConf.ConfVars var = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ?
+ HiveConf.ConfVars.HIVETEZINPUTFORMAT : HiveConf.ConfVars.HIVEINPUTFORMAT;
if (mWork.getInputformat() != null) {
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat());
+ HiveConf.setVar(conf, var, mWork.getInputformat());
}
if (mWork.getIndexIntermediateFile() != null) {
conf.set("hive.index.compact.file", mWork.getIndexIntermediateFile());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 608c1ed..1fbc57d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -74,6 +74,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -178,7 +179,7 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
Utilities.setInputAttributes(conf, mapWork);
- String inpFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT);
+ String inpFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZINPUTFORMAT);
if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat))) {
inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
}
@@ -294,6 +295,35 @@ private EdgeProperty createEdgeProperty(EdgeType edgeType) {
}
/*
+ * Helper to determine the size of the container requested
+ * from YARN. Falls back to MapReduce's map size if the Tez
+ * container size isn't set.
+ */
+ private Resource getContainerResource(Configuration conf) {
+ Resource containerResource;
+ int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
+ HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
+ conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+ int cpus = conf.getInt(MRJobConfig.MAP_CPU_VCORES,
+ MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+ return Resource.newInstance(memory, cpus);
+ }
+
+ /*
+ * Helper to determine what Java options to use for the containers.
+ * Falls back to MapReduce's map Java opts if no Tez-specific options
+ * are set.
+ */
+ private String getContainerJavaOpts(Configuration conf) {
+ String javaOpts = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZJAVAOPTS);
+ if (javaOpts != null && !javaOpts.isEmpty()) {
+ return javaOpts;
+ }
+ return MRHelpers.getMapJavaOpts(conf);
+ }
+
+
+ /*
* Helper function to create Vertex from MapWork.
*/
private Vertex createVertex(JobConf conf, MapWork mapWork,
@@ -344,12 +374,11 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
map = new Vertex(mapWork.getName(),
new ProcessorDescriptor(MapTezProcessor.class.getName()).
- setUserPayload(serializedConf), numTasks,
- MRHelpers.getMapResource(conf));
+ setUserPayload(serializedConf), numTasks, getContainerResource(conf));
Map environment = new HashMap();
MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
map.setTaskEnvironment(environment);
- map.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
+ map.setJavaOpts(getContainerJavaOpts(conf));
assert mapWork.getAliasToWork().keySet().size() == 1;
@@ -419,14 +448,14 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
Vertex reducer = new Vertex(reduceWork.getName(),
new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
- reduceWork.getNumReduceTasks(), MRHelpers.getReduceResource(conf));
+ reduceWork.getNumReduceTasks(), getContainerResource(conf));
Map environment = new HashMap();
MRHelpers.updateEnvironmentForMRTasks(conf, environment, false);
reducer.setTaskEnvironment(environment);
- reducer.setJavaOpts(MRHelpers.getReduceJavaOpts(conf));
+ reducer.setJavaOpts(getContainerJavaOpts(conf));
Map localResources = new HashMap();
localResources.put(getBaseName(appJarLr), appJarLr);
@@ -479,7 +508,7 @@ public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig
ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName());
prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
- PreWarmContext context = new PreWarmContext(prewarmProcDescriptor, MRHelpers.getMapResource(conf),
+ PreWarmContext context = new PreWarmContext(prewarmProcDescriptor, getContainerResource(conf),
numContainers, new VertexLocationHint(null));
Map combinedResources = new HashMap();
@@ -504,7 +533,7 @@ public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig
Map environment = new HashMap();
MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
context.setEnvironment(environment);
- context.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
+ context.setJavaOpts(getContainerJavaOpts(conf));
return context;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
index 427ea12..c39e823 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
@@ -152,6 +152,7 @@ private void createAggregationFunction(List indexTblCols, String pr
HiveConf builderConf = new HiveConf(getConf(), AggregateIndexHandler.class);
builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES, false);
builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false);
+ builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false);
Task> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
command, (LinkedHashMap) partSpec, indexTableName, dbName);
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
index 11ddcae..0135a71 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
@@ -144,6 +144,7 @@ public void analyzeIndexDefinition(Table baseTable, Index index,
HiveConf builderConf = new HiveConf(getConf(), CompactIndexHandler.class);
builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES, false);
builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false);
+ builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false);
Task> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
command, partSpec, indexTableName, dbName);
return rootTask;
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 9a2182a..c0ca404 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1619,6 +1619,13 @@ public static boolean isMergeRequired(List> mvTasks, HiveConf hco
}
if ((mvTask != null) && !mvTask.isLocal() && fsOp.getConf().canBeMerged()) {
+
+ if (currTask.getWork() instanceof TezWork) {
+ // tez blurs the boundary between map and reduce, thus it has its own
+ // config
+ return hconf.getBoolVar(ConfVars.HIVEMERGETEZFILES);
+ }
+
if (fsOp.getConf().isLinkedFileSink()) {
// If the user has HIVEMERGEMAPREDFILES set to false, the idea was the
// number of reducers are few, so the number of files anyway are small.
@@ -1632,16 +1639,13 @@ public static boolean isMergeRequired(List> mvTasks, HiveConf hco
// There are separate configuration parameters to control whether to
// merge for a map-only job
// or for a map-reduce job
- if (currTask.getWork() instanceof TezWork) {
- return hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) ||
- hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES);
- } else if (currTask.getWork() instanceof MapredWork) {
+ if (currTask.getWork() instanceof MapredWork) {
ReduceWork reduceWork = ((MapredWork) currTask.getWork()).getReduceWork();
boolean mergeMapOnly =
- hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null;
+ hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null;
boolean mergeMapRed =
- hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
- reduceWork != null;
+ hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
+ reduceWork != null;
if (mergeMapOnly || mergeMapRed) {
return true;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 77388dd..1acd29f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -643,7 +643,9 @@ private String processTable(QB qb, ASTNode tabref) throws SemanticException {
}
private void assertCombineInputFormat(Tree numerator, String message) throws SemanticException {
- String inputFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT);
+ String inputFormat = conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ?
+ HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZINPUTFORMAT):
+ HiveConf.getVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT);
if (!inputFormat.equals(CombineHiveInputFormat.class.getName())) {
throw new SemanticException(generateErrorMessage((ASTNode) numerator,
message + " sampling is not supported in " + inputFormat));
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index fa16d0c..a5e6cbf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -88,10 +88,6 @@ public void init(HiveConf conf, LogHelper console, Hive db) {
// We require the use of recursive input dirs for union processing
conf.setBoolean("mapred.input.dir.recursive", true);
HiveConf.setBoolVar(conf, ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
-
- // Don't auto-merge files in tez
- HiveConf.setBoolVar(conf, ConfVars.HIVEMERGEMAPFILES, false);
- HiveConf.setBoolVar(conf, ConfVars.HIVEMERGEMAPREDFILES, false);
}
@Override
diff --git ql/src/test/results/clientpositive/tez/insert1.q.out ql/src/test/results/clientpositive/tez/insert1.q.out
index 3027079..0009d71 100644
--- ql/src/test/results/clientpositive/tez/insert1.q.out
+++ ql/src/test/results/clientpositive/tez/insert1.q.out
@@ -264,10 +264,10 @@ POSTHOOK: Lineage: insert1.value SIMPLE [(insert2)a.FieldSchema(name:value, type
STAGE DEPENDENCIES:
Stage-2 is a root stage
Stage-3 depends on stages: Stage-2
- Stage-0 depends on stages: Stage-3
- Stage-4 depends on stages: Stage-0
Stage-1 depends on stages: Stage-3
- Stage-5 depends on stages: Stage-1
+ Stage-4 depends on stages: Stage-1
+ Stage-0 depends on stages: Stage-3
+ Stage-5 depends on stages: Stage-0
STAGE PLANS:
Stage: Stage-2
@@ -313,28 +313,28 @@ STAGE PLANS:
Stage: Stage-3
Dependency Collection
- Stage: Stage-0
+ Stage: Stage-1
Move Operator
tables:
- replace: false
+ replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.insert1
+ name: x.insert1
Stage: Stage-4
Stats-Aggr Operator
- Stage: Stage-1
+ Stage: Stage-0
Move Operator
tables:
- replace: true
+ replace: false
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: x.insert1
+ name: default.insert1
Stage: Stage-5
Stats-Aggr Operator
diff --git ql/src/test/results/clientpositive/tez/load_dyn_part1.q.out ql/src/test/results/clientpositive/tez/load_dyn_part1.q.out
index 898091a..0a36484 100644
--- ql/src/test/results/clientpositive/tez/load_dyn_part1.q.out
+++ ql/src/test/results/clientpositive/tez/load_dyn_part1.q.out
@@ -45,10 +45,10 @@ POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-2 is a root stage
Stage-3 depends on stages: Stage-2
- Stage-0 depends on stages: Stage-3
- Stage-4 depends on stages: Stage-0
Stage-1 depends on stages: Stage-3
- Stage-5 depends on stages: Stage-1
+ Stage-4 depends on stages: Stage-1
+ Stage-0 depends on stages: Stage-3
+ Stage-5 depends on stages: Stage-0
STAGE PLANS:
Stage: Stage-2
@@ -94,34 +94,34 @@ STAGE PLANS:
Stage: Stage-3
Dependency Collection
- Stage: Stage-0
+ Stage: Stage-1
Move Operator
tables:
partition:
- ds
+ ds 2008-12-31
hr
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.nzhang_part1
+ name: default.nzhang_part2
Stage: Stage-4
Stats-Aggr Operator
- Stage: Stage-1
+ Stage: Stage-0
Move Operator
tables:
partition:
- ds 2008-12-31
+ ds
hr
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.nzhang_part2
+ name: default.nzhang_part1
Stage: Stage-5
Stats-Aggr Operator