Index: ant/src/org/apache/hadoop/hive/ant/QTestGenTask.java
===================================================================
--- ant/src/org/apache/hadoop/hive/ant/QTestGenTask.java (revision 1556697)
+++ ant/src/org/apache/hadoop/hive/ant/QTestGenTask.java (working copy)
@@ -135,6 +135,8 @@
private String clusterMode;
+ private String hiveConfDir;
+
private String runDisabled;
private String hadoopVersion;
@@ -146,6 +148,14 @@
public String getHadoopVersion() {
return hadoopVersion;
}
+
+ public void setHiveConfDir(String hiveConfDir) {
+ this.hiveConfDir = hiveConfDir;
+ }
+
+ public String getHiveConfDir() {
+ return hiveConfDir;
+ }
public void setClusterMode(String clusterMode) {
this.clusterMode = clusterMode;
@@ -416,6 +426,9 @@
if (hadoopVersion == null) {
hadoopVersion = "";
}
+ if (hiveConfDir == null) {
+ hiveConfDir = "";
+ }
// For each of the qFiles generate the test
VelocityContext ctx = new VelocityContext();
@@ -429,6 +442,7 @@
}
ctx.put("logDir", relativePath(hiveRootDir, logDir));
ctx.put("clusterMode", clusterMode);
+ ctx.put("hiveConfDir", hiveConfDir);
ctx.put("hadoopVersion", hadoopVersion);
File outFile = new File(outDir, className + ".java");
Index: common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java (revision 1556697)
+++ common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java (working copy)
@@ -47,6 +47,9 @@
public String getPublisher(Configuration conf) {
return "org.apache.hadoop.hive.ql.stats.CounterStatsPublisher"; }
public String getAggregator(Configuration conf) {
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ return "org.apache.hadoop.hive.ql.stats.CounterStatsAggregatorTez";
+ }
return "org.apache.hadoop.hive.ql.stats.CounterStatsAggregator"; }
},
custom {
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1556697)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -863,7 +863,12 @@
// Whether to show the unquoted partition names in query results.
HIVE_DECODE_PARTITION_NAME("hive.decode.partition.name", false),
- //Vectorization enabled
+ HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr",
+ new StringsValidator("mr", "tez")),
+ HIVE_JAR_DIRECTORY("hive.jar.directory", "hdfs:///user/hive/"),
+ HIVE_USER_INSTALL_DIR("hive.user.install.directory", "hdfs:///user/"),
+
+ // Vectorization enabled
HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false),
HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL("hive.vectorized.groupby.checkinterval", 100000),
HIVE_VECTORIZATION_GROUPBY_MAXENTRIES("hive.vectorized.groupby.maxentries", 1000000),
@@ -872,6 +877,12 @@
HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true),
+ // Whether to send the query plan via local resource or RPC
+ HIVE_RPC_QUERY_PLAN("hive.rpc.query.plan", false),
+
+ // Whether to generate the splits locally or in the AM (tez only)
+ HIVE_AM_SPLIT_GENERATION("hive.compute.splits.in.am", true),
+
// none, idonly, traverse, execution
HIVESTAGEIDREARRANGE("hive.stageid.rearrange", "none"),
HIVEEXPLAINDEPENDENCYAPPENDTASKTYPES("hive.explain.dependency.append.tasktype", false),
@@ -1341,7 +1352,11 @@
return hiveDefaultURL;
}
- public URL getHiveSiteLocation() {
+ public static void setHiveSiteLocation(URL location) {
+ hiveSiteURL = location;
+ }
+
+ public static URL getHiveSiteLocation() {
return hiveSiteURL;
}
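
For reference, the new engine switch can be read and set through the same HiveConf accessors this patch uses elsewhere; a minimal sketch (the main() wrapper is illustrative, only the ConfVars name and the getVar/setVar calls come from the hunks above):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class EngineSwitchExample {
      public static void main(String[] args) {
        // Picks up hive-site.xml from the classpath, or from the URL passed to
        // HiveConf.setHiveSiteLocation() as QTestUtil now does.
        HiveConf conf = new HiveConf();

        // Defaults to "mr"; the StringsValidator above documents the legal values ("mr", "tez").
        String engine = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE);
        System.out.println("execution engine = " + engine);

        // Switch this configuration object to Tez.
        HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_ENGINE, "tez");
      }
    }
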
Index: conf/hive-default.xml.template
===================================================================
--- conf/hive-default.xml.template (revision 1556697)
+++ conf/hive-default.xml.template (working copy)
@@ -2037,6 +2037,14 @@
+<property>
+  <name>hive.execution.engine</name>
+  <value>mr</value>
+  <description>
+    Chooses execution engine. Options are: mr (Map reduce, default) or tez (hadoop 2 only)
+  </description>
+</property>
+
 <property>
   <name>hive.server2.table.type.mapping</name>
   <value>CLASSIC</value>
@@ -2128,4 +2136,49 @@
    is also irrelevant.
   </description>
 </property>
+
+<property>
+  <name>hive.orc.splits.include.file.footer</name>
+  <value>false</value>
+  <description>
+    If turned on splits generated by orc will include metadata about the stripes in the file. This
+    data is read remotely (from the client or HS2 machine) and sent to all the tasks.
+  </description>
+</property>
+
+<property>
+  <name>hive.orc.cache.stripe.details.size</name>
+  <value>10000</value>
+  <description>
+    Cache size for keeping meta info about orc splits cached in the client.
+  </description>
+</property>
+
+<property>
+  <name>hive.orc.compute.splits.num.threads</name>
+  <value>10</value>
+  <description>
+    How many threads orc should use to create splits in parallel.
+  </description>
+</property>
+
+<property>
+  <name>hive.jar.directory</name>
+  <value>hdfs:///user/hive/</value>
+  <description>
+    This is the location hive in tez mode will look for to find a site wide
+    installed hive instance.
+  </description>
+</property>
+
+<property>
+  <name>hive.user.install.directory</name>
+  <value>hdfs:///user/</value>
+  <description>
+    If hive (in tez mode only) cannot find a usable hive jar in "hive.jar.directory",
+    it will upload the hive jar to <hive.user.install.directory>/<user name>
+    and use it to run queries.
+  </description>
+</property>
+
 </configuration>
Index: data/conf/tez/hive-site.xml
===================================================================
--- data/conf/tez/hive-site.xml (revision 0)
+++ data/conf/tez/hive-site.xml (working copy)
@@ -0,0 +1,197 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+
+<property>
+  <name>hadoop.tmp.dir</name>
+  <value>${test.tmp.dir}/hadoop-tmp</value>
+  <description>A base for other temporary directories.</description>
+</property>
+
+<property>
+  <name>mapred.child.java.opts</name>
+  <value>-Xmx200m</value>
+</property>
+
+<property>
+  <name>hive.exec.scratchdir</name>
+  <value>${test.tmp.dir}/scratchdir</value>
+  <description>Scratch space for Hive jobs</description>
+</property>
+
+<property>
+  <name>hive.exec.local.scratchdir</name>
+  <value>${test.tmp.dir}/localscratchdir/</value>
+  <description>Local scratch space for Hive jobs</description>
+</property>
+
+<property>
+  <name>javax.jdo.option.ConnectionURL</name>
+  <value>jdbc:derby:;databaseName=${test.tmp.dir}/junit_metastore_db;create=true</value>
+</property>
+
+<property>
+  <name>hive.stats.dbconnectionstring</name>
+  <value>jdbc:derby:;databaseName=${test.tmp.dir}/TempStatsStore;create=true</value>
+</property>
+
+<property>
+  <name>javax.jdo.option.ConnectionDriverName</name>
+  <value>org.apache.derby.jdbc.EmbeddedDriver</value>
+</property>
+
+<property>
+  <name>javax.jdo.option.ConnectionUserName</name>
+  <value>APP</value>
+</property>
+
+<property>
+  <name>javax.jdo.option.ConnectionPassword</name>
+  <value>mine</value>
+</property>
+
+<property>
+  <name>hive.metastore.warehouse.dir</name>
+  <value>${test.warehouse.dir}</value>
+</property>
+
+<property>
+  <name>hive.metastore.metadb.dir</name>
+  <value>file://${test.tmp.dir}/metadb/</value>
+  <description>
+    Required by metastore server or if the uris argument below is not supplied
+  </description>
+</property>
+
+<property>
+  <name>test.log.dir</name>
+  <value>${test.tmp.dir}/log/</value>
+</property>
+
+<property>
+  <name>test.data.files</name>
+  <value>${hive.root}/data/files</value>
+</property>
+
+<property>
+  <name>hive.jar.path</name>
+  <value>${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar</value>
+</property>
+
+<property>
+  <name>hive.metastore.rawstore.impl</name>
+  <value>org.apache.hadoop.hive.metastore.ObjectStore</value>
+  <description>Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. This class is used to store and retrieval of raw metadata objects such as table, database</description>
+</property>
+
+<property>
+  <name>hive.querylog.location</name>
+  <value>${test.tmp.dir}/tmp</value>
+  <description>Location of the structured hive logs</description>
+</property>
+
+<property>
+  <name>hive.exec.pre.hooks</name>
+  <value>org.apache.hadoop.hive.ql.hooks.PreExecutePrinter, org.apache.hadoop.hive.ql.hooks.EnforceReadOnlyTables</value>
+  <description>Pre Execute Hook for Tests</description>
+</property>
+
+<property>
+  <name>hive.exec.post.hooks</name>
+  <value>org.apache.hadoop.hive.ql.hooks.PostExecutePrinter</value>
+  <description>Post Execute Hook for Tests</description>
+</property>
+
+<property>
+  <name>hive.support.concurrency</name>
+  <value>false</value>
+  <description>Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks.</description>
+</property>
+
+<property>
+  <name>fs.pfile.impl</name>
+  <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
+  <description>A proxy for local file system used for cross file system testing</description>
+</property>
+
+<property>
+  <name>hive.exec.mode.local.auto</name>
+  <value>false</value>
+  <description>
+    Let hive determine whether to run in local mode automatically
+    Disabling this for tests so that minimr is not affected
+  </description>
+</property>
+
+<property>
+  <name>hive.auto.convert.join</name>
+  <value>false</value>
+  <description>Whether Hive enable the optimization about converting common join into mapjoin based on the input file size</description>
+</property>
+
+<property>
+  <name>hive.ignore.mapjoin.hint</name>
+  <value>true</value>
+  <description>Whether Hive ignores the mapjoin hint</description>
+</property>
+
+<property>
+  <name>io.sort.mb</name>
+  <value>10</value>
+</property>
+
+<property>
+  <name>hive.input.format</name>
+  <value>org.apache.hadoop.hive.ql.io.HiveInputFormat</value>
+  <description>The default input format, if it is not specified, the system assigns it. It is set to HiveInputFormat for hadoop versions 17, 18 and 19, whereas it is set to CombineHiveInputFormat for hadoop 20. The user can always overwrite it - if there is a bug in CombineHiveInputFormat, it can always be manually set to HiveInputFormat.</description>
+</property>
+
+<property>
+  <name>hive.default.rcfile.serde</name>
+  <value>org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe</value>
+  <description>The default SerDe hive will use for the rcfile format</description>
+</property>
+
+<property>
+  <name>hive.stats.dbclass</name>
+  <value>counter</value>
+  <description>The default storage that stores temporary hive statistics. Currently, jdbc, hbase and counter type is supported</description>
+</property>
+
+<property>
+  <name>hive.execution.engine</name>
+  <value>tez</value>
+  <description>Whether to use MR or Tez</description>
+</property>
+
+</configuration>
Property changes on: data/conf/tez/hive-site.xml
___________________________________________________________________
Added: svn:mime-type
## -0,0 +1 ##
+text/xml
\ No newline at end of property
Index: hbase-handler/src/test/templates/TestHBaseCliDriver.vm
===================================================================
--- hbase-handler/src/test/templates/TestHBaseCliDriver.vm (revision 1556697)
+++ hbase-handler/src/test/templates/TestHBaseCliDriver.vm (working copy)
@@ -24,6 +24,7 @@
import java.io.*;
import java.util.*;
+import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
import org.apache.hadoop.hive.hbase.HBaseQTestUtil;
import org.apache.hadoop.hive.hbase.HBaseTestSetup;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -42,10 +43,11 @@
@Override
protected void setUp() {
+
+ MiniClusterType miniMR = MiniClusterType.valueForString("$clusterMode");
+
try {
- boolean miniMR = "$clusterMode".equals("miniMR");
qt = new HBaseQTestUtil((HIVE_ROOT + "$resultsDir"), (HIVE_ROOT + "$logDir"), miniMR, setup);
-
} catch (Exception e) {
System.err.println("Exception: " + e.getMessage());
e.printStackTrace();
Index: hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm
===================================================================
--- hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm (revision 1556697)
+++ hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm (working copy)
@@ -25,6 +25,7 @@
import java.io.*;
import java.util.*;
+import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
import org.apache.hadoop.hive.hbase.HBaseQTestUtil;
import org.apache.hadoop.hive.hbase.HBaseTestSetup;
@@ -42,11 +43,11 @@
@Override
protected void setUp() {
+
+ MiniClusterType miniMR = MiniClusterType.valueForString("$clusterMode");
+
try {
- boolean miniMR = "$clusterMode".equals("miniMR");
-
qt = new HBaseQTestUtil((HIVE_ROOT + "$resultsDir"), (HIVE_ROOT + "$logDir"), miniMR, setup);
-
} catch (Exception e) {
System.err.println("Exception: " + e.getMessage());
e.printStackTrace();
Index: hcatalog/webhcat/svr/src/test/data/status/hive/stderr
===================================================================
--- hcatalog/webhcat/svr/src/test/data/status/hive/stderr (revision 1556697)
+++ hcatalog/webhcat/svr/src/test/data/status/hive/stderr (working copy)
@@ -19,7 +19,7 @@
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
Logging initialized using configuration in jar:file:/Users/daijy/hadoop-1.0.3/tmp/mapred/local/taskTracker/distcache/7168149899505899073_637041239_1133292873/localhost/apps/templeton/hive-0.10.0.tar.gz/hive-0.10.0/lib/hive-common-0.10.0.jar!/hive-log4j.properties
Hive history file=/tmp/daijy/hive_job_log_daijy_201305091500_862342848.txt
-Total MapReduce jobs = 1
+Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201305091437_0012, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201305091437_0012
Index: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestLocationQueries.java
===================================================================
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestLocationQueries.java (revision 1556697)
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestLocationQueries.java (working copy)
@@ -24,6 +24,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
+
/**
* Suite for testing location. e.g. if "alter table alter partition
* location" is run, do the partitions end up in the correct location.
@@ -82,7 +84,7 @@
return failedCount;
}
- public CheckResults(String outDir, String logDir, boolean miniMr,
+ public CheckResults(String outDir, String logDir, MiniClusterType miniMr,
String hadoopVer, String locationSubdir)
throws Exception
{
@@ -102,8 +104,9 @@
File[] qfiles = setupQFiles(testNames);
QTestUtil[] qt = new QTestUtil[qfiles.length];
+
for (int i = 0; i < qfiles.length; i++) {
- qt[i] = new CheckResults(resDir, logDir, false, "0.20", "parta");
+ qt[i] = new CheckResults(resDir, logDir, MiniClusterType.none, "0.20", "parta");
qt[i].addFile(qfiles[i]);
qt[i].clearTestSideEffects();
}
Index: itests/qtest/pom.xml
===================================================================
--- itests/qtest/pom.xml (revision 1556697)
+++ itests/qtest/pom.xml (working copy)
@@ -38,6 +38,8 @@
false
stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q
cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q
+    <minitez.query.files>tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q</minitez.query.files>
+    <minitez.query.files.shared>join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q</minitez.query.files.shared>
add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q
@@ -107,7 +109,6 @@
test
-
@@ -260,6 +261,11 @@
test
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>${commons-logging.version}</version>
+    </dependency>
org.apache.hadoop
hadoop-yarn-server-tests
${hadoop-23.version}
@@ -318,6 +324,48 @@
tests
test
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <version>${tez.version}</version>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
@@ -334,6 +382,7 @@
+
+
+
+
+
+
+
+
+
+
qSortSet;
private static final String SORT_SUFFIX = ".sorted";
public static final HashSet srcTables = new HashSet();
+ private static MiniClusterType clusterType = MiniClusterType.none;
private ParseDriver pd;
private Hive db;
protected HiveConf conf;
@@ -215,7 +219,7 @@
}
public QTestUtil(String outDir, String logDir) throws Exception {
- this(outDir, logDir, false, "0.20");
+ this(outDir, logDir, MiniClusterType.none, null, "0.20");
}
public String getOutputDirectory() {
@@ -249,9 +253,8 @@
conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
"org.apache.hadoop.hive.metastore.VerifyingObjectStore");
- if (miniMr) {
+ if (mr != null) {
assert dfs != null;
- assert mr != null;
mr.setupConfiguration(conf);
@@ -310,21 +313,66 @@
return uriStr;
}
- public QTestUtil(String outDir, String logDir, boolean miniMr, String hadoopVer)
+ public enum MiniClusterType {
+ mr,
+ tez,
+ none;
+
+ public static MiniClusterType valueForString(String type) {
+ if (type.equals("miniMR")) {
+ return mr;
+ } else if (type.equals("tez")) {
+ return tez;
+ } else {
+ return none;
+ }
+ }
+ }
+
+ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType, String hadoopVer)
throws Exception {
+ this(outDir, logDir, clusterType, null, hadoopVer);
+ }
+
+ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType,
+ String confDir, String hadoopVer)
+ throws Exception {
this.outDir = outDir;
this.logDir = logDir;
+ if (confDir != null && !confDir.isEmpty()) {
+ HiveConf.setHiveSiteLocation(new URL("file://"+confDir+"/hive-site.xml"));
+ System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
+ }
conf = new HiveConf(Driver.class);
- this.miniMr = miniMr;
+ this.miniMr = (clusterType == MiniClusterType.mr);
this.hadoopVer = getHadoopMainVersion(hadoopVer);
qMap = new TreeMap();
qSkipSet = new HashSet();
qSortSet = new HashSet();
+ this.clusterType = clusterType;
- if (miniMr) {
- dfs = ShimLoader.getHadoopShims().getMiniDfs(conf, 4, true, null);
+ HadoopShims shims = ShimLoader.getHadoopShims();
+ int numberOfDataNodes = 4;
+
+ // can run tez tests only on hadoop 2
+ if (clusterType == MiniClusterType.tez) {
+ Assume.assumeTrue(ShimLoader.getMajorVersion().equals("0.23"));
+      // this is necessary temporarily - there's a problem with multiple datanodes on MiniTezCluster
+ // will be fixed in 0.3
+ numberOfDataNodes = 1;
+ }
+
+ if (clusterType != MiniClusterType.none) {
+ dfs = shims.getMiniDfs(conf, numberOfDataNodes, true, null);
FileSystem fs = dfs.getFileSystem();
- mr = ShimLoader.getHadoopShims().getMiniMrCluster(conf, 4, getHdfsUriString(fs.getUri().toString()), 1);
+ if (clusterType == MiniClusterType.tez) {
+ if (!(shims instanceof Hadoop23Shims)) {
+ throw new Exception("Cannot run tez on hadoop-1, Version: "+this.hadoopVer);
+ }
+ mr = ((Hadoop23Shims)shims).getMiniTezCluster(conf, 4, getHdfsUriString(fs.getUri().toString()), 1);
+ } else {
+ mr = shims.getMiniMrCluster(conf, 4, getHdfsUriString(fs.getUri().toString()), 1);
+ }
}
initConf();
@@ -790,6 +838,11 @@
ss.err = new CachingPrintStream(fo, true, "UTF-8");
ss.setIsSilent(true);
SessionState oldSs = SessionState.get();
+
+ if (oldSs != null && clusterType == MiniClusterType.tez) {
+ oldSs.close();
+ }
+
if (oldSs != null && oldSs.out != null && oldSs.out != System.out) {
oldSs.out.close();
}
@@ -1496,7 +1549,7 @@
{
QTestUtil[] qt = new QTestUtil[qfiles.length];
for (int i = 0; i < qfiles.length; i++) {
- qt[i] = new QTestUtil(resDir, logDir, false, "0.20");
+ qt[i] = new QTestUtil(resDir, logDir, MiniClusterType.none, null, "0.20");
qt[i].addFile(qfiles[i]);
qt[i].clearTestSideEffects();
}
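
The MiniClusterType enum above replaces the old boolean miniMr flag; valueForString() maps the $clusterMode strings emitted by QTestGenTask onto it. A small sketch of that mapping, assuming the itests util classes are on the classpath:

    import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;

    public class MiniClusterTypeExample {
      public static void main(String[] args) {
        // Strings come from the templates' $clusterMode parameter; anything other than
        // "miniMR" or "tez" falls back to MiniClusterType.none.
        System.out.println(MiniClusterType.valueForString("miniMR")); // mr
        System.out.println(MiniClusterType.valueForString("tez"));    // tez
        System.out.println(MiniClusterType.valueForString(""));       // none
      }
    }
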
Index: pom.xml
===================================================================
--- pom.xml (revision 1556697)
+++ pom.xml (working copy)
@@ -91,6 +91,7 @@
3.0.1
2.4
2.4
+    <commons-lang3.version>3.1</commons-lang3.version>
1.1.3
10.10.1.1
11.0.2
@@ -131,6 +132,7 @@
1.0.1
1.7.5
4.0.4
+    <tez.version>0.2.0</tez.version>
1.1
0.2
1.4
Index: ql/pom.xml
===================================================================
--- ql/pom.xml (revision 1556697)
+++ ql/pom.xml (working copy)
@@ -82,6 +82,11 @@
${commons-io.version}
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3.version}</version>
+    </dependency>
commons-lang
commons-lang
${commons-lang.version}
@@ -210,6 +215,102 @@
${mockito-all.version}
test
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
@@ -239,6 +340,29 @@
${hadoop-23.version}
true
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-hdfs</artifactId>
+        <version>${hadoop-23.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-api</artifactId>
+        <version>${hadoop-23.version}</version>
+        <optional>true</optional>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-common</artifactId>
+        <version>${hadoop-23.version}</version>
+        <optional>true</optional>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-client</artifactId>
+        <version>${hadoop-23.version}</version>
+        <optional>true</optional>
+      </dependency>
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy)
@@ -1212,9 +1212,10 @@
}
- int jobs = Utilities.getMRTasks(plan.getRootTasks()).size();
+ int jobs = Utilities.getMRTasks(plan.getRootTasks()).size()
+ + Utilities.getTezTasks(plan.getRootTasks()).size();
if (jobs > 0) {
- console.printInfo("Total MapReduce jobs = " + jobs);
+ console.printInfo("Total jobs = " + jobs);
}
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_NUM_TASKS,
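
With Tez tasks in the plan, the console line now reports MR launches plus Tez DAGs; a minimal sketch of that accounting, assuming a plan's root task list is available (the helper class is illustrative, the Utilities calls are the ones added in this patch):

    import java.io.Serializable;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.exec.Utilities;

    public class JobCountExample {
      // What Driver prints as "Total jobs": MR tasks plus Tez tasks found in the plan.
      static int totalJobs(List<Task<? extends Serializable>> rootTasks) {
        return Utilities.getMRTasks(rootTasks).size()
            + Utilities.getTezTasks(rootTasks).size();
      }
    }
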
Index: ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (working copy)
@@ -366,6 +366,11 @@
UNSUPPORTED_SUBQUERY_EXPRESSION(10249, "Unsupported SubQuery Expression"),
INVALID_SUBQUERY_EXPRESSION(10250, "Invalid SubQuery expression"),
+ INVALID_HDFS_URI(10251, "{0} is not a hdfs uri", true),
+ INVALID_DIR(10252, "{0} is not a directory", true),
+ NO_VALID_LOCATIONS(10253, "Could not find any valid location to place the jars. " +
+ "Please update hive.jar.directory or hive.user.install.directory with a valid location", false),
+
SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
+ "It may have crashed with an error."),
Index: ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java (working copy)
@@ -32,7 +32,11 @@
}
public static HashTableLoader getLoader(Configuration hconf) {
- return new org.apache.hadoop.hive.ql.exec.mr.HashTableLoader();
+ if (HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ return new org.apache.hadoop.hive.ql.exec.tez.HashTableLoader();
+ } else {
+ return new org.apache.hadoop.hive.ql.exec.mr.HashTableLoader();
+ }
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java (working copy)
@@ -26,6 +26,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
@@ -45,7 +48,9 @@
}
public static MapredContext init(boolean isMap, JobConf jobConf) {
- MapredContext context = new MapredContext(isMap, jobConf);
+ MapredContext context =
+ HiveConf.getVar(jobConf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ?
+ new TezContext(isMap, jobConf) : new MapredContext(isMap, jobConf);
contexts.set(context);
return context;
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java (working copy)
@@ -35,6 +35,10 @@
* Returns the appropriate cache
*/
public static ObjectCache getCache(Configuration conf) {
- return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ return new org.apache.hadoop.hive.ql.exec.tez.ObjectCache();
+ } else {
+ return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
+ }
}
}
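
The same engine check drives all three runtime factories touched here (HashTableLoader, MapredContext, ObjectCache); a minimal sketch of ending up on the Tez implementation (the Configuration setup is illustrative, and the ObjectCache import assumes the interface lives alongside the factory):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.ObjectCache;
    import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;

    public class EngineDispatchExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");

        // With the engine set to "tez" this returns the tez ObjectCache; with the
        // default "mr" it returns the mr implementation instead.
        ObjectCache cache = ObjectCacheFactory.getCache(conf);
        System.out.println(cache.getClass().getName());
      }
    }
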
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (working copy)
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask;
@@ -41,6 +42,7 @@
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.StatsWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
/**
* TaskFactory implementation.
@@ -89,6 +91,7 @@
DependencyCollectionTask.class));
taskvec.add(new TaskTuple(PartialScanWork.class,
PartialScanTask.class));
+    taskvec.add(new TaskTuple<TezWork>(TezWork.class, TezTask.class));
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -88,7 +88,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -98,6 +97,7 @@
import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
@@ -109,6 +109,7 @@
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -132,9 +133,12 @@
import org.apache.hadoop.hive.ql.metadata.InputEstimator;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
@@ -180,6 +184,8 @@
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import org.apache.commons.codec.binary.Base64;
+
/**
* Utilities.
*
@@ -301,8 +307,7 @@
try {
path = getPlanPath(conf, name);
assert path != null;
- gWork = gWorkMap.get(path);
- if (gWork == null) {
+ if (!gWorkMap.containsKey(path)) {
Path localPath;
if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
localPath = path;
@@ -309,7 +314,20 @@
} else {
localPath = new Path(name);
}
- in = new FileInputStream(localPath.toUri().getPath());
+
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
+ LOG.debug("Loading plan from string: "+path.toUri().getPath());
+ String planString = conf.get(path.toUri().getPath());
+ if (planString == null) {
+ LOG.debug("Could not find plan string in conf");
+ return null;
+ }
+ byte[] planBytes = Base64.decodeBase64(planString);
+ in = new ByteArrayInputStream(planBytes);
+ } else {
+ in = new FileInputStream(localPath.toUri().getPath());
+ }
+
if(MAP_PLAN_NAME.equals(name)){
if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
gWork = deserializePlan(in, MapWork.class, conf);
@@ -332,6 +350,9 @@
}
}
gWorkMap.put(path, gWork);
+ } else {
+ LOG.debug("Found plan in cache.");
+ gWork = gWorkMap.get(path);
}
return gWork;
} catch (FileNotFoundException fnf) {
@@ -554,26 +575,37 @@
Path planPath = getPlanPath(conf, name);
- // use the default file system of the conf
- FileSystem fs = planPath.getFileSystem(conf);
- FSDataOutputStream out = fs.create(planPath);
- serializePlan(w, out, conf);
+ OutputStream out;
- // Serialize the plan to the default hdfs instance
- // Except for hadoop local mode execution where we should be
- // able to get the plan directly from the cache
- if (useCache && !ShimLoader.getHadoopShims().isLocalMode(conf)) {
- // Set up distributed cache
- if (!DistributedCache.getSymlink(conf)) {
- DistributedCache.createSymlink(conf);
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
+ // add it to the conf
+ out = new ByteArrayOutputStream();
+ serializePlan(w, out, conf);
+ LOG.info("Setting plan: "+planPath.toUri().getPath());
+ conf.set(planPath.toUri().getPath(),
+ Base64.encodeBase64String(((ByteArrayOutputStream)out).toByteArray()));
+ } else {
+ // use the default file system of the conf
+ FileSystem fs = planPath.getFileSystem(conf);
+ out = fs.create(planPath);
+ serializePlan(w, out, conf);
+
+ // Serialize the plan to the default hdfs instance
+ // Except for hadoop local mode execution where we should be
+ // able to get the plan directly from the cache
+ if (useCache && !ShimLoader.getHadoopShims().isLocalMode(conf)) {
+ // Set up distributed cache
+ if (!DistributedCache.getSymlink(conf)) {
+ DistributedCache.createSymlink(conf);
+ }
+ String uriWithLink = planPath.toUri().toString() + "#" + name;
+ DistributedCache.addCacheFile(new URI(uriWithLink), conf);
+
+ // set replication of the plan file to a high number. we use the same
+ // replication factor as used by the hadoop jobclient for job.xml etc.
+ short replication = (short) conf.getInt("mapred.submit.replication", 10);
+ fs.setReplication(planPath, replication);
}
- String uriWithLink = planPath.toUri().toString() + "#" + name;
- DistributedCache.addCacheFile(new URI(uriWithLink), conf);
-
- // set replication of the plan file to a high number. we use the same
- // replication factor as used by the hadoop jobclient for job.xml etc.
- short replication = (short) conf.getInt("mapred.submit.replication", 10);
- fs.setReplication(planPath, replication);
}
// Cache the plan in this process
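
The HIVE_RPC_QUERY_PLAN branch above ships the serialized plan inside the job configuration rather than through a scratch-dir file; the transport is just Base64 over the plan bytes, keyed by the plan path. A standalone sketch of that round trip (payload and key are placeholders; the real code wraps this in serializePlan()/deserializePlan()):

    import java.nio.charset.StandardCharsets;

    import org.apache.commons.codec.binary.Base64;
    import org.apache.hadoop.conf.Configuration;

    public class PlanInConfExample {
      public static void main(String[] args) {
        // Placeholder payload; Utilities really puts the Kryo-serialized MapWork/ReduceWork here.
        byte[] planBytes = "serialized-plan-bytes".getBytes(StandardCharsets.UTF_8);
        // Placeholder key; Utilities uses planPath.toUri().getPath().
        String planKey = "/tmp/hive/scratch/plan/map.xml";

        // Writer side (hive.rpc.query.plan=true): stash the Base64 plan in the conf.
        Configuration conf = new Configuration(false);
        conf.set(planKey, Base64.encodeBase64String(planBytes));

        // Reader side (in the task): pull it back out and decode.
        byte[] roundTripped = Base64.decodeBase64(conf.get(planKey));
        System.out.println(new String(roundTripped, StandardCharsets.UTF_8));
      }
    }
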
@@ -2235,6 +2267,26 @@
return true;
}
+  public static List<TezTask> getTezTasks(List<Task<? extends Serializable>> tasks) {
+    List<TezTask> tezTasks = new ArrayList<TezTask>();
+    if (tasks != null) {
+      getTezTasks(tasks, tezTasks);
+    }
+    return tezTasks;
+  }
+
+  private static void getTezTasks(List<Task<? extends Serializable>> tasks, List<TezTask> tezTasks) {
+    for (Task<? extends Serializable> task : tasks) {
+      if (task instanceof TezTask && !tezTasks.contains((TezTask) task)) {
+        tezTasks.add((TezTask) task);
+      }
+
+      if (task.getDependentTasks() != null) {
+        getTezTasks(task.getDependentTasks(), tezTasks);
+      }
+    }
+  }
+
   public static List<ExecDriver> getMRTasks(List<Task<? extends Serializable>> tasks) {
     List<ExecDriver> mrTasks = new ArrayList<ExecDriver>();
     if (tasks != null) {
@@ -2885,7 +2937,8 @@
pathsProcessed.add(path);
LOG.info("Adding input file " + path);
- if (isEmptyPath(job, path, ctx)) {
+ if (!HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
+ && isEmptyPath(job, path, ctx)) {
path = createDummyFileForEmptyPartition(path, job, work,
hiveScratchDir, alias, sequenceNumber++);
@@ -2902,7 +2955,8 @@
// T2) x;
// If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
// rows)
- if (path == null) {
+ if (path == null
+ && !HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
path = createDummyFileForEmptyTable(job, work, hiveScratchDir,
alias, sequenceNumber++);
pathsToAdd.add(path);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (working copy)
@@ -134,6 +134,12 @@
this.console = console;
this.task = task;
this.callBackObj = hookCallBack;
+
+ if (job != null) {
+      // Even with tez enabled, some jobs are still run as MR. Disable the flag in
+      // the conf, so that the backend runs fully as MR.
+ HiveConf.setVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+ }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (working copy)
@@ -0,0 +1,693 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure.NullOutputCommitter;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+
+/**
+ * DagUtils. DagUtils is a collection of helper methods to convert
+ * map and reduce work to tez vertices and edges. It handles configuration
+ * objects, file localization and vertex/edge creation.
+ */
+public class DagUtils {
+
+ private static final String TEZ_DIR = "_tez_scratch_dir";
+ private static DagUtils instance;
+
+ /*
+ * Creates the configuration object necessary to run a specific vertex from
+ * map work. This includes input formats, input processor, etc.
+ */
+ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
+ JobConf conf = new JobConf(baseConf);
+
+ if (mapWork.getNumMapTasks() != null) {
+ conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
+ }
+
+ if (mapWork.getMaxSplitSize() != null) {
+ HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE,
+ mapWork.getMaxSplitSize().longValue());
+ }
+
+ if (mapWork.getMinSplitSize() != null) {
+ HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZE,
+ mapWork.getMinSplitSize().longValue());
+ }
+
+ if (mapWork.getMinSplitSizePerNode() != null) {
+ HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE,
+ mapWork.getMinSplitSizePerNode().longValue());
+ }
+
+ if (mapWork.getMinSplitSizePerRack() != null) {
+ HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK,
+ mapWork.getMinSplitSizePerRack().longValue());
+ }
+
+ Utilities.setInputAttributes(conf, mapWork);
+
+ String inpFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT);
+ if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat))) {
+ inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
+ }
+
+ if (mapWork.isUseBucketizedHiveInputFormat()) {
+ inpFormat = BucketizedHiveInputFormat.class.getName();
+ }
+
+ conf.set("mapred.mapper.class", ExecMapper.class.getName());
+ conf.set("mapred.input.format.class", inpFormat);
+
+ return conf;
+ }
+
+ /**
+ * Given two vertices and their respective configuration objects createEdge
+ * will create an Edge object that connects the two. Currently the edge will
+ * always be a stable bi-partite edge.
+ *
+ * @param vConf JobConf of the first vertex
+ * @param v The first vertex (source)
+ * @param wConf JobConf of the second vertex
+ * @param w The second vertex (sink)
+ * @return
+ */
+ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
+ EdgeType edgeType)
+ throws IOException {
+
+    // Tez needs to set up output/subsequent input pairs correctly
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
+
+ // update payloads (configuration for the vertices might have changed)
+ v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
+ w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+
+ DataMovementType dataMovementType;
+ Class logicalInputClass;
+ Class logicalOutputClass;
+
+ switch (edgeType) {
+ case BROADCAST_EDGE:
+ dataMovementType = DataMovementType.BROADCAST;
+ logicalOutputClass = OnFileUnorderedKVOutput.class;
+ logicalInputClass = ShuffledUnorderedKVInput.class;
+ break;
+
+ case SIMPLE_EDGE:
+ default:
+ dataMovementType = DataMovementType.SCATTER_GATHER;
+ logicalOutputClass = OnFileSortedOutput.class;
+ logicalInputClass = ShuffledMergedInputLegacy.class;
+ break;
+ }
+
+ EdgeProperty edgeProperty =
+ new EdgeProperty(dataMovementType,
+ DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ new OutputDescriptor(logicalOutputClass.getName()),
+ new InputDescriptor(logicalInputClass.getName()));
+ return new Edge(v, w, edgeProperty);
+ }
+
+ /*
+ * Helper function to create Vertex from MapWork.
+ */
+ private Vertex createVertex(JobConf conf, MapWork mapWork,
+      LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
+ Path mrScratchDir, Context ctx) throws Exception {
+
+ Path tezDir = getTezDir(mrScratchDir);
+
+ // set up the operator plan
+ Path planPath = Utilities.setMapWork(conf, mapWork,
+ mrScratchDir.toUri().toString(), false);
+
+ // setup input paths and split info
+    List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork,
+ mrScratchDir.toUri().toString(), ctx);
+ Utilities.setInputPaths(conf, inputPaths);
+
+ // create the directories FileSinkOperators need
+ Utilities.createTmpDirs(conf, mapWork);
+
+    // Tez asks us to call this even if there's no preceding vertex
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
+
+ // finally create the vertex
+ Vertex map = null;
+
+ // use tez to combine splits
+ boolean useTezGroupedSplits = false;
+
+ int numTasks = -1;
+ Class amSplitGeneratorClass = null;
+ InputSplitInfo inputSplitInfo = null;
+ Class inputFormatClass = conf.getClass("mapred.input.format.class",
+ InputFormat.class);
+
+    // we'll set up tez to combine splits for us iff the input format
+ // is HiveInputFormat
+ if (inputFormatClass == HiveInputFormat.class) {
+ useTezGroupedSplits = true;
+ conf.setClass("mapred.input.format.class", TezGroupedSplitsInputFormat.class, InputFormat.class);
+ }
+
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)) {
+ // if we're generating the splits in the AM, we just need to set
+ // the correct plugin.
+ amSplitGeneratorClass = MRInputAMSplitGenerator.class;
+ } else {
+ // client side split generation means we have to compute them now
+ inputSplitInfo = MRHelpers.generateInputSplits(conf,
+ new Path(tezDir, "split_"+mapWork.getName().replaceAll(" ", "_")));
+ numTasks = inputSplitInfo.getNumTasks();
+ }
+
+ byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
+ map = new Vertex(mapWork.getName(),
+ new ProcessorDescriptor(MapTezProcessor.class.getName()).
+ setUserPayload(serializedConf), numTasks,
+ MRHelpers.getMapResource(conf));
+    Map<String, String> environment = new HashMap<String, String>();
+ MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
+ map.setTaskEnvironment(environment);
+ map.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
+
+ assert mapWork.getAliasToWork().keySet().size() == 1;
+
+ String alias = mapWork.getAliasToWork().keySet().iterator().next();
+
+ byte[] mrInput = null;
+ if (useTezGroupedSplits) {
+ mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf,
+ null, HiveInputFormat.class.getName());
+ } else {
+ mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
+ }
+ map.addInput(alias,
+ new InputDescriptor(MRInputLegacy.class.getName()).
+ setUserPayload(mrInput), amSplitGeneratorClass);
+
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ localResources.put(getBaseName(appJarLr), appJarLr);
+ for (LocalResource lr: additionalLr) {
+ localResources.put(getBaseName(lr), lr);
+ }
+
+ if (inputSplitInfo != null) {
+ // only relevant for client-side split generation
+ map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+ MRHelpers.updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo,
+ localResources);
+ }
+
+ map.setTaskLocalResources(localResources);
+ return map;
+ }
+
+ /*
+ * Helper function to create JobConf for specific ReduceWork.
+ */
+ private JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) {
+ JobConf conf = new JobConf(baseConf);
+
+ conf.set("mapred.reducer.class", ExecReducer.class.getName());
+
+ boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS);
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
+ useSpeculativeExecReducers);
+
+ return conf;
+ }
+
+ /*
+ * Helper function to create Vertex for given ReduceWork.
+ */
+ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
+      LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
+ Path mrScratchDir, Context ctx) throws Exception {
+
+ // set up operator plan
+ Path planPath = Utilities.setReduceWork(conf, reduceWork,
+ mrScratchDir.toUri().toString(), false);
+
+ // create the directories FileSinkOperators need
+ Utilities.createTmpDirs(conf, reduceWork);
+
+ // Call once here, will be updated when we find edges
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
+
+ // create the vertex
+ Vertex reducer = new Vertex(reduceWork.getName(),
+ new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
+ setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
+ reduceWork.getNumReduceTasks(), MRHelpers.getReduceResource(conf));
+
+    Map<String, String> environment = new HashMap<String, String>();
+
+ MRHelpers.updateEnvironmentForMRTasks(conf, environment, false);
+ reducer.setTaskEnvironment(environment);
+
+ reducer.setJavaOpts(MRHelpers.getReduceJavaOpts(conf));
+
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ localResources.put(getBaseName(appJarLr), appJarLr);
+ for (LocalResource lr: additionalLr) {
+ localResources.put(getBaseName(lr), lr);
+ }
+ reducer.setTaskLocalResources(localResources);
+
+ return reducer;
+ }
+
+ /*
+ * Helper method to create a yarn local resource.
+ */
+ private LocalResource createLocalResource(FileSystem remoteFs, Path file,
+ LocalResourceType type, LocalResourceVisibility visibility) {
+
+ FileStatus fstat = null;
+ try {
+ fstat = remoteFs.getFileStatus(file);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ URL resourceURL = ConverterUtils.getYarnUrlFromPath(file);
+ long resourceSize = fstat.getLen();
+ long resourceModificationTime = fstat.getModificationTime();
+
+ LocalResource lr = Records.newRecord(LocalResource.class);
+ lr.setResource(resourceURL);
+ lr.setType(type);
+ lr.setSize(resourceSize);
+ lr.setVisibility(visibility);
+ lr.setTimestamp(resourceModificationTime);
+
+ return lr;
+ }
+
+ /**
+ * @param conf
+ * @return path to destination directory on hdfs
+ * @throws LoginException if we are unable to figure user information
+ * @throws IOException when any dfs operation fails.
+ */
+ public Path getDefaultDestDir(Configuration conf) throws LoginException, IOException {
+ UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+ String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
+ String userPathStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_USER_INSTALL_DIR);
+ Path userPath = new Path(userPathStr);
+ FileSystem fs = userPath.getFileSystem(conf);
+ if (!(fs instanceof DistributedFileSystem)) {
+ throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(userPathStr));
+ }
+
+ String jarPathStr = userPathStr + "/" + userName;
+ String hdfsDirPathStr = jarPathStr;
+ Path hdfsDirPath = new Path(hdfsDirPathStr);
+
+ FileStatus fstatus = fs.getFileStatus(hdfsDirPath);
+ if (!fstatus.isDir()) {
+ throw new IOException(ErrorMsg.INVALID_DIR.format(hdfsDirPath.toString()));
+ }
+
+ Path retPath = new Path(hdfsDirPath.toString() + "/.hiveJars");
+
+ fs.mkdirs(retPath);
+ return retPath;
+ }
+
+ /**
+ * Localizes files, archives and jars the user has instructed us
+ * to provide on the cluster as resources for execution.
+ *
+ * @param conf
+ * @return List local resources to add to execution
+ * @throws IOException when hdfs operation fails
+ * @throws LoginException when getDefaultDestDir fails with the same exception
+ */
+  public List<LocalResource> localizeTempFiles(Configuration conf) throws IOException, LoginException {
+    List<LocalResource> tmpResources = new ArrayList<LocalResource>();
+
+ String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
+ if (StringUtils.isNotBlank(addedFiles)) {
+ HiveConf.setVar(conf, ConfVars.HIVEADDEDFILES, addedFiles);
+ }
+ String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+ if (StringUtils.isNotBlank(addedJars)) {
+ HiveConf.setVar(conf, ConfVars.HIVEADDEDJARS, addedJars);
+ }
+ String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
+ if (StringUtils.isNotBlank(addedArchives)) {
+ HiveConf.setVar(conf, ConfVars.HIVEADDEDARCHIVES, addedArchives);
+ }
+
+ String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
+
+ // need to localize the additional jars and files
+
+ // we need the directory on hdfs to which we shall put all these files
+ String hdfsDirPathStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
+ Path hdfsDirPath = new Path(hdfsDirPathStr);
+ FileSystem fs = hdfsDirPath.getFileSystem(conf);
+ if (!(fs instanceof DistributedFileSystem)) {
+ throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hdfsDirPathStr));
+ }
+
+ FileStatus fstatus = null;
+ try {
+ fstatus = fs.getFileStatus(hdfsDirPath);
+ } catch (FileNotFoundException fe) {
+ // do nothing
+ }
+
+ if ((fstatus == null) || (!fstatus.isDir())) {
+ Path destDir = getDefaultDestDir(conf);
+ hdfsDirPathStr = destDir.toString();
+ }
+
+ String allFiles = auxJars + "," + addedJars + "," + addedFiles + "," + addedArchives;
+ String[] allFilesArr = allFiles.split(",");
+ for (String file : allFilesArr) {
+ if (!StringUtils.isNotBlank(file)) {
+ continue;
+ }
+ String hdfsFilePathStr = hdfsDirPathStr + "/" + getResourceBaseName(file);
+ LocalResource localResource = localizeResource(new Path(file),
+ new Path(hdfsFilePathStr), conf);
+ tmpResources.add(localResource);
+ }
+
+ return tmpResources;
+ }
+
+ // the api that finds the jar being used by this class on disk
+ public String getExecJarPathLocal () throws URISyntaxException {
+ // returns the location on disc of the jar of this class.
+ return DagUtils.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString();
+ }
+
+ /*
+ * Helper function to retrieve the basename of a local resource
+ */
+ public String getBaseName(LocalResource lr) {
+ return FilenameUtils.getName(lr.getResource().getFile());
+ }
+
+ /**
+ * @param pathStr - the string from which we try to determine the resource base name
+ * @return the name of the resource from a given path string.
+ */
+ public String getResourceBaseName(String pathStr) {
+ String[] splits = pathStr.split("/");
+ return splits[splits.length - 1];
+ }
+
+ /**
+ * @param src the source file.
+ * @param dest the destination file.
+ * @param conf the configuration
+ * @return true if the file names match else returns false.
+ * @throws IOException when any file system related call fails
+ */
+ private boolean checkPreExisting(Path src, Path dest, Configuration conf)
+ throws IOException {
+ FileSystem destFS = dest.getFileSystem(conf);
+
+ if (!destFS.exists(dest)) {
+ return false;
+ }
+ FileStatus destStatus = destFS.getFileStatus(dest);
+ if (destStatus.isDir()) {
+ return false;
+ }
+
+ String srcName = getResourceBaseName(src.toString());
+ String destName = getResourceBaseName(dest.toString());
+
+ if (srcName.equals(destName)) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param src path to the source for the resource
+ * @param dest path in hdfs for the resource
+ * @param conf
+ * @return localresource from tez localization.
+ * @throws IOException when any file system related calls fails.
+ */
+ public LocalResource localizeResource(Path src, Path dest, Configuration conf)
+ throws IOException {
+ FileSystem destFS = dest.getFileSystem(conf);
+ if (!(destFS instanceof DistributedFileSystem)) {
+ throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(dest.toString()));
+ }
+
+ if (src != null) {
+ if (!checkPreExisting(src, dest, conf)) {
+ // copy the src to the destination and create local resource.
+ // overwrite even if file already exists.
+ destFS.copyFromLocalFile(false, true, src, dest);
+ }
+ }
+
+ return createLocalResource(destFS, dest, LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION);
+ }
+
+ /**
+ * Creates and initializes a JobConf object that can be used to execute
+ * the DAG. The configuration object will contain configurations from mapred-site
+ * overlaid with key/value pairs from the hiveConf object, plus some Hive-specific
+ * configurations that do not change from DAG to DAG.
+ *
+ * @param hiveConf Current hiveConf for the execution
+ * @return JobConf base configuration for job execution
+ * @throws IOException
+ */
+ public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
+ hiveConf.setBoolean("mapred.mapper.new-api", false);
+
+ JobConf conf = (JobConf) MRHelpers.getBaseMRConfiguration(hiveConf);
+
+ conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName());
+
+ conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+ conf.setBoolean("mapred.committer.job.task.cleanup.needed", false);
+
+ conf.setClass("mapred.output.format.class", HiveOutputFormatImpl.class, OutputFormat.class);
+
+ conf.set(MRJobConfig.OUTPUT_KEY_CLASS, HiveKey.class.getName());
+ conf.set(MRJobConfig.OUTPUT_VALUE_CLASS, BytesWritable.class.getName());
+
+ conf.set("mapred.partitioner.class", HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPARTITIONER));
+ conf.set("tez.runtime.partitioner.class", MRPartitioner.class.getName());
+
+ return conf;
+ }
+
+ /**
+ * Creates and initializes the JobConf object for a given BaseWork object.
+ *
+ * @param conf Any configurations in conf will be copied to the resulting new JobConf object.
+ * @param work BaseWork will be used to populate the configuration object.
+ * @return JobConf new configuration object
+ */
+ public JobConf initializeVertexConf(JobConf conf, BaseWork work) {
+
+ // simply dispatch the call to the right method for the actual (sub-) type of
+ // BaseWork.
+ if (work instanceof MapWork) {
+ return initializeVertexConf(conf, (MapWork)work);
+ } else if (work instanceof ReduceWork) {
+ return initializeVertexConf(conf, (ReduceWork)work);
+ } else {
+ assert false;
+ return null;
+ }
+ }
+
+ /**
+ * Create a vertex from a given work object.
+ *
+ * @param conf JobConf to be used for this execution unit
+ * @param work The instance of BaseWork representing the actual work to be performed
+ * by this vertex.
+ * @param scratchDir HDFS scratch dir for this execution unit.
+ * @param appJarLr Local resource for hive-exec.
+ * @param additionalLr additional local resources (localized jars/files) needed by this vertex
+ * @param fileSystem FS corresponding to scratchDir and LocalResources
+ * @param ctx This query's context
+ * @return Vertex
+ */
+ public Vertex createVertex(JobConf conf, BaseWork work,
+ Path scratchDir, LocalResource appJarLr, List<LocalResource> additionalLr,
+ FileSystem fileSystem, Context ctx, boolean hasChildren) throws Exception {
+
+ Vertex v = null;
+ // simply dispatch the call to the right method for the actual (sub-) type of
+ // BaseWork.
+ if (work instanceof MapWork) {
+ v = createVertex(conf, (MapWork) work, appJarLr,
+ additionalLr, fileSystem, scratchDir, ctx);
+ } else if (work instanceof ReduceWork) {
+ v = createVertex(conf, (ReduceWork) work, appJarLr,
+ additionalLr, fileSystem, scratchDir, ctx);
+ } else {
+ // something is seriously wrong if this is happening
+ throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());
+ }
+
+ // initialize stats publisher if necessary
+ if (work.isGatheringStats()) {
+ StatsPublisher statsPublisher;
+ StatsFactory factory = StatsFactory.newFactory(conf);
+ if (factory != null) {
+ statsPublisher = factory.getStatsPublisher();
+ if (!statsPublisher.init(conf)) { // creating stats table if not exists
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
+ throw
+ new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+ }
+ }
+ }
+ }
+
+
+ // final vertices need to have at least one output
+ if (!hasChildren) {
+ v.addOutput("out_"+work.getName(),
+ new OutputDescriptor(MROutput.class.getName())
+ .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
+ }
+
+ return v;
+ }
+
+ /**
+ * createTezDir creates a temporary directory in the scratchDir folder to
+ * be used with Tez. Assumes scratchDir exists.
+ */
+ public Path createTezDir(Path scratchDir, Configuration conf)
+ throws IOException {
+ Path tezDir = getTezDir(scratchDir);
+ FileSystem fs = tezDir.getFileSystem(conf);
+ fs.mkdirs(tezDir);
+ return tezDir;
+ }
+
+ /**
+ * Gets the tez dir that belongs to the hive scratch dir
+ */
+ public Path getTezDir(Path scratchDir) {
+ return new Path(scratchDir, TEZ_DIR);
+ }
+
+ /**
+ * Singleton
+ * @return instance of this class
+ */
+ public static DagUtils getInstance() {
+ if (instance == null) {
+ instance = new DagUtils();
+ }
+ return instance;
+ }
+
+ private DagUtils() {
+ // don't instantiate
+ }
+}
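
[Reviewer sketch - not part of the patch] To make the new DagUtils API easier to review, here is one possible calling sequence for the methods added above. Everything outside this diff (the helper class name, the parameters it receives, the HDFS jar directory, the exact Tez/YARN package names in the imports) is assumed for illustration only; the real driver wiring lives elsewhere in the patch.

package org.apache.hadoop.hive.ql.exec.tez;

import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.dag.api.Vertex;

// Hypothetical helper, for illustration only; it is not part of this patch.
public class DagUtilsUsageSketch {
  public static Vertex buildVertex(HiveConf hiveConf, BaseWork work, Path scratchDir,
      Path hdfsJarDir, List<LocalResource> additionalLr, FileSystem fs, Context ctx,
      boolean hasChildren) throws Exception {
    DagUtils dagUtils = DagUtils.getInstance();

    // base MR-derived configuration shared by all vertices of the DAG
    JobConf dagConf = dagUtils.createConfiguration(hiveConf);

    // localize the jar containing DagUtils so Tez containers can load the operators
    Path execJar = new Path(dagUtils.getExecJarPathLocal());
    Path hdfsExecJar = new Path(hdfsJarDir, dagUtils.getResourceBaseName(execJar.toString()));
    LocalResource appJarLr = dagUtils.localizeResource(execJar, hdfsExecJar, dagConf);

    // per-query Tez staging directory under the scratch dir
    Path tezDir = dagUtils.createTezDir(scratchDir, dagConf);

    // one vertex per BaseWork; hasChildren == false makes createVertex attach an MROutput
    JobConf vertexConf = dagUtils.initializeVertexConf(dagConf, work);
    return dagUtils.createVertex(vertexConf, work, tezDir, appJarLr, additionalLr,
        fs, ctx, hasChildren);
  }
}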
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (working copy)
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * HashTableLoader for Tez constructs the hashtable from records read from
+ * a broadcast edge.
+ */
+public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
+
+ private static final Log LOG = LogFactory.getLog(HashTableLoader.class.getName());
+
+ public HashTableLoader() {
+ }
+
+ @Override
+ public void load(ExecMapperContext context,
+ Configuration hconf,
+ MapJoinDesc desc,
+ byte posBigTable,
+ MapJoinTableContainer[] mapJoinTables,
+ MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+
+ TezContext tezContext = (TezContext) MapredContext.get();
+ Map<Integer, String> parentToInput = desc.getParentToInput();
+ int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
+ float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
+ HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
+
+ for (int pos = 0; pos < mapJoinTables.length; pos++) {
+ if (pos == posBigTable) {
+ continue;
+ }
+
+ LogicalInput input = tezContext.getInput(parentToInput.get(pos));
+
+ try {
+ KeyValueReader kvReader = (KeyValueReader) input.getReader();
+
+ MapJoinTableContainer tableContainer = new HashMapWrapper(hashTableThreshold,
+ hashTableLoadFactor);
+
+ // simply read all the kv pairs into the hashtable.
+ while (kvReader.next()) {
+ MapJoinKey key = new MapJoinKey();
+ key.read(mapJoinTableSerdes[pos].getKeyContext(), (Writable)kvReader.getCurrentKey());
+
+ MapJoinRowContainer values = tableContainer.get(key);
+ if (values == null) {
+ values = new MapJoinRowContainer();
+ tableContainer.put(key, values);
+ }
+ values.read(mapJoinTableSerdes[pos].getValueContext(), (Writable)kvReader.getCurrentValue());
+ }
+
+ mapJoinTables[pos] = tableContainer;
+ } catch (IOException e) {
+ throw new HiveException(e);
+ } catch (SerDeException e) {
+ throw new HiveException(e);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (working copy)
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * Process input from tez LogicalInput and write output - for a map plan.
+ * Just pumps the records through the query plan.
+ */
+public class MapRecordProcessor extends RecordProcessor{
+
+
+ private MapOperator mapOp;
+ public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
+ private final ExecMapperContext execContext = new ExecMapperContext();
+ private boolean abort = false;
+ protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
+ private MapWork mapWork;
+
+ @Override
+ void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
+ Map<String, OutputCollector> outMap) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
+ super.init(jconf, mrReporter, inputs, outMap);
+
+ //Update JobConf using MRInput, info like filename comes via this
+ MRInputLegacy mrInput = getMRInput(inputs);
+ try {
+ mrInput.init();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed while initializing MRInput", e);
+ }
+ Configuration updatedConf = mrInput.getConfigUpdates();
+ if (updatedConf != null) {
+ for (Entry<String, String> entry : updatedConf) {
+ jconf.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+ try {
+
+ execContext.setJc(jconf);
+ // create map and fetch operators
+ mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
+ if (mapWork == null) {
+ mapWork = Utilities.getMapWork(jconf);
+ cache.cache(MAP_PLAN_KEY, mapWork);
+ l4j.info("Plan: "+mapWork);
+ for (String s: mapWork.getAliases()) {
+ l4j.info("Alias: "+s);
+ }
+ } else {
+ Utilities.setMapWork(jconf, mapWork);
+ }
+ if (mapWork.getVectorMode()) {
+ mapOp = new VectorMapOperator();
+ } else {
+ mapOp = new MapOperator();
+ }
+
+ // initialize map operator
+ mapOp.setConf(mapWork);
+ mapOp.setChildren(jconf);
+ l4j.info(mapOp.dump(0));
+
+ MapredContext.init(true, new JobConf(jconf));
+ ((TezContext)MapredContext.get()).setInputs(inputs);
+ mapOp.setExecContext(execContext);
+ mapOp.initializeLocalWork(jconf);
+ mapOp.initialize(jconf, null);
+
+ // Initialization isn't finished until all parents of all operators
+ // are initialized. For broadcast joins that means initializing the
+ // dummy parent operators as well.
+ List<HashTableDummyOperator> dummyOps = mapWork.getDummyOps();
+ if (dummyOps != null) {
+ for (Operator<? extends OperatorDesc> dummyOp : dummyOps) {
+ dummyOp.setExecContext(execContext);
+ dummyOp.initialize(jconf, null);
+ }
+ }
+
+ OperatorUtils.setChildrenCollector(mapOp.getChildOperators(), outMap);
+ mapOp.setReporter(reporter);
+ MapredContext.get().setReporter(reporter);
+
+ } catch (Throwable e) {
+ abort = true;
+ if (e instanceof OutOfMemoryError) {
+ // will this be true here?
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
+ } else {
+ throw new RuntimeException("Map operator initialization failed", e);
+ }
+ }
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
+ }
+
+ private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) {
+ //there should be only one MRInput
+ MRInputLegacy theMRInput = null;
+ for (LogicalInput inp : inputs.values()) {
+ if (inp instanceof MRInputLegacy) {
+ if (theMRInput != null) {
+ throw new IllegalArgumentException("Only one MRInput is expected");
+ }
+ // a better approach would be to find the input by its alias
+ theMRInput = (MRInputLegacy)inp;
+ }
+ }
+ return theMRInput;
+ }
+
+ @Override
+ void run() throws IOException{
+
+ MRInputLegacy in = getMRInput(inputs);
+ KeyValueReader reader = in.getReader();
+
+ //process records until done
+ while (reader.next()) {
+ //ignore the key for maps - reader.getCurrentKey();
+ Object value = reader.getCurrentValue();
+ boolean needMore = processRow(value);
+ if (!needMore) {
+ break;
+ }
+ }
+ }
+
+
+ /**
+ * @param value value to process
+ * @return true if it is not done and can take more inputs
+ */
+ private boolean processRow(Object value) {
+ // reset the execContext for each new row
+ execContext.resetRow();
+
+ try {
+ if (mapOp.getDone()) {
+ return false; //done
+ } else {
+ // Since there is no concept of a group, we don't invoke
+ // startGroup/endGroup for a mapper
+ mapOp.process((Writable)value);
+ if (isLogInfoEnabled) {
+ logProgress();
+ }
+ }
+ } catch (Throwable e) {
+ abort = true;
+ if (e instanceof OutOfMemoryError) {
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
+ } else {
+ l4j.fatal(StringUtils.stringifyException(e));
+ throw new RuntimeException(e);
+ }
+ }
+ return true; //give me more
+ }
+
+ @Override
+ void close(){
+ // check if there are IOExceptions
+ if (!abort) {
+ abort = execContext.getIoCxt().getIOExceptions();
+ }
+
+ // detecting failed executions by exceptions thrown by the operator tree
+ try {
+ mapOp.close(abort);
+
+ // Need to close the dummyOps as well. The operator pipeline
+ // is not considered "closed/done" unless all operators are
+ // done. For broadcast joins that includes the dummy parents.
+ List<HashTableDummyOperator> dummyOps = mapWork.getDummyOps();
+ if (dummyOps != null) {
+ for (Operator<? extends OperatorDesc> dummyOp : dummyOps) {
+ dummyOp.close(abort);
+ }
+ }
+
+ if (isLogInfoEnabled) {
+ logCloseInfo();
+ }
+ reportStats rps = new reportStats(reporter);
+ mapOp.preorderMap(rps);
+ return;
+ } catch (Exception e) {
+ if (!abort) {
+ // signal new failure to map-reduce
+ l4j.error("Hit error while closing operators - failing tree");
+ throw new RuntimeException("Hive Runtime Error while closing operators", e);
+ }
+ } finally {
+ Utilities.clearWorkMap();
+ MapredContext.close();
+ }
+ }
+
+}
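
[Reviewer sketch - not part of the patch] For orientation, the lifecycle the Tez processor driver goes through with the record processor above looks roughly like the fragment below. The driver itself (TezProcessor, referenced by MapTezProcessor later in this patch) is not shown in this hunk, so the surrounding objects are assumed to come from the Tez framework callbacks.

// Fragment only; jobConf, mrReporter, inputs and outMap are assumed to be supplied by the driver.
MapRecordProcessor rproc = new MapRecordProcessor();
rproc.init(jobConf, mrReporter, inputs, outMap); // deserialize the plan, build and initialize the operator tree
rproc.run();                                     // pump rows from the MRInput through the plan
rproc.close();                                   // close operators (including dummy parents) and report stats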
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java (working copy)
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+/**
+ * Subclass of TezProcessor that marks the processor as running the map side of the plan.
+ */
+public class MapTezProcessor extends TezProcessor {
+ public MapTezProcessor(){
+ super(true);
+ }
+}
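
[Reviewer note - not part of the patch] Presumably the reduce side gets a mirror-image subclass that passes false to the TezProcessor constructor; a minimal sketch of what such a counterpart would look like (the class name is assumed for illustration, the real class is not in this hunk):

package org.apache.hadoop.hive.ql.exec.tez;

// Hypothetical reduce-side counterpart, for illustration only.
public class ReduceTezProcessorSketch extends TezProcessor {
  public ReduceTezProcessorSketch() {
    super(false); // false selects the reduce-plan record processor
  }
}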
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (working copy)
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
+
+
+/**
+ * ObjectCache. Tez implementation based on the tez object registry.
+ *
+ */
+public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
+
+ private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
+ private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+
+ @Override
+ public void cache(String key, Object value) {
+ LOG.info("Adding " + key + " to cache with value " + value);
+ registry.add(ObjectLifeCycle.VERTEX, key, value);
+ }
+
+ @Override
+ public Object retrieve(String key) {
+ Object o = registry.get(key);
+ if (o != null) {
+ LOG.info("Found " + key + " in cache with value: " + o);
+ }
+ return o;
+ }
+}
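
[Reviewer sketch - not part of the patch] Usage mirrors the plan caching in MapRecordProcessor above: ObjectCacheFactory.getCache(conf) is assumed to hand back this Tez-backed implementation when running on Tez, so the deserialized plan survives for the lifetime of the vertex.

// Fragment only; jconf, MAP_PLAN_KEY, MapWork and Utilities are the names used in MapRecordProcessor above.
org.apache.hadoop.hive.ql.exec.ObjectCache cache = ObjectCacheFactory.getCache(jconf);
MapWork mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY); // null for the first task of the vertex
if (mapWork == null) {
  mapWork = Utilities.getMapWork(jconf);                  // deserialize the plan once
  cache.cache(MAP_PLAN_KEY, mapWork);                     // later tasks in the same vertex reuse it
}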
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (working copy)
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.LogicalInput;
+
+/**
+ * Processes input from a tez LogicalInput and writes output.
+ * Map and reduce processing are handled by different subclasses.
+ */
+public abstract class RecordProcessor {
+
+ protected JobConf jconf;
+ protected Map<String, LogicalInput> inputs;
+ protected Map<String, OutputCollector> outMap;
+
+ public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
+
+
+ // used to log memory usage periodically
+ public static MemoryMXBean memoryMXBean;
+ protected boolean isLogInfoEnabled = false;
+ protected MRTaskReporter reporter;
+
+ private long numRows = 0;
+ private long nextUpdateCntr = 1;
+ protected PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ protected String CLASS_NAME = RecordProcessor.class.getName();
+
+
+ /**
+ * Common initialization code for RecordProcessors
+ * @param jconf
+ * @param mrReporter
+ * @param inputs
+ * @param out
+ */
+ void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
+ Map<String, OutputCollector> outMap) {
+ this.jconf = jconf;
+ this.reporter = mrReporter;
+ this.inputs = inputs;
+ this.outMap = outMap;
+
+ // Allocate the memory MX bean up front so heap usage can be logged throughout.
+ memoryMXBean = ManagementFactory.getMemoryMXBean();
+
+ l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+
+ isLogInfoEnabled = l4j.isInfoEnabled();
+
+ //log classpaths
+ try {
+ if (l4j.isDebugEnabled()) {
+ l4j.debug("conf classpath = "
+ + Arrays.asList(((URLClassLoader) jconf.getClassLoader()).getURLs()));
+ l4j.debug("thread classpath = "
+ + Arrays.asList(((URLClassLoader) Thread.currentThread()
+ .getContextClassLoader()).getURLs()));
+ }
+ } catch (Exception e) {
+ l4j.info("cannot get classpath: " + e.getMessage());
+ }
+ }
+
+ /**
+ * start processing the inputs and writing output
+ * @throws IOException
+ */
+ abstract void run() throws IOException;
+
+
+ abstract void close();
+
+ /**
+ * Log information to be logged at the end
+ */
+ protected void logCloseInfo() {
+ long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+ l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
+ + used_memory);
+ }
+
+ /**
+ * Log number of records processed and memory used after processing many records
+ */
+ protected void logProgress() {
+ numRows++;
+ if (numRows == nextUpdateCntr) {
+ long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+ l4j.info("ExecMapper: processing " + numRows
+ + " rows: used memory = " + used_memory);
+ nextUpdateCntr = getNextUpdateRecordCounter(numRows);
+ }
+ }
+
+ private long getNextUpdateRecordCounter(long cntr) {
+ // Simple heuristic for progress logging: the interval between log lines
+ // grows by a factor of 10 until it reaches 1 million rows, after which a
+ // line is logged every additional million rows.
+ if (cntr >= 1000000) {
+ return cntr + 1000000;
+ }
+
+ return 10 * cntr;
+ }
+
+}
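
[Reviewer note - not part of the patch] The progress-logging cadence implemented by logProgress()/getNextUpdateRecordCounter above emits a log line at rows 1, 10, 100, ..., 1,000,000 and then every additional 1,000,000 rows. A tiny standalone check of those thresholds:

// Standalone sketch reproducing the threshold sequence; not part of the patch.
public class ProgressCadenceSketch {
  public static void main(String[] args) {
    long next = 1;
    for (int i = 0; i < 10; i++) {
      System.out.println(next);
      next = (next >= 1000000) ? next + 1000000 : 10 * next;
    }
    // prints: 1 10 100 1000 10000 100000 1000000 2000000 3000000 4000000
  }
}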
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (working copy)
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+
+/**
+ * Process input from tez LogicalInput and write output - for a reduce plan.
+ * Just pumps the records through the query plan.
+ */
+public class ReduceRecordProcessor extends RecordProcessor{
+
+ private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
+
+ public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class);
+ private final ExecMapperContext execContext = new ExecMapperContext();
+ private boolean abort = false;
+ private Deserializer inputKeyDeserializer;
+
+ // Input value serde needs to be an array to support different SerDe
+ // for different tags
+ private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
+
+ TableDesc keyTableDesc;
+ TableDesc[] valueTableDesc;
+
+ ObjectInspector[] rowObjectInspector;
+ private Operator<?> reducer;
+ private boolean isTagged = false;
+
+ private Object keyObject = null;
+ private BytesWritable groupKey;
+
+ private ReduceWork redWork;
+
+ List