Index: ant/src/org/apache/hadoop/hive/ant/QTestGenTask.java
===================================================================
--- ant/src/org/apache/hadoop/hive/ant/QTestGenTask.java (revision 1556697)
+++ ant/src/org/apache/hadoop/hive/ant/QTestGenTask.java (working copy)
@@ -135,6 +135,8 @@
private String clusterMode;
+ private String hiveConfDir;
+
private String runDisabled;
private String hadoopVersion;
@@ -146,6 +148,14 @@
public String getHadoopVersion() {
return hadoopVersion;
}
+
+ public void setHiveConfDir(String hiveConfDir) {
+ this.hiveConfDir = hiveConfDir;
+ }
+
+ public String getHiveConfDir() {
+ return hiveConfDir;
+ }
public void setClusterMode(String clusterMode) {
this.clusterMode = clusterMode;
@@ -416,6 +426,9 @@
if (hadoopVersion == null) {
hadoopVersion = "";
}
+ if (hiveConfDir == null) {
+ hiveConfDir = "";
+ }
// For each of the qFiles generate the test
VelocityContext ctx = new VelocityContext();
@@ -429,6 +442,7 @@
}
ctx.put("logDir", relativePath(hiveRootDir, logDir));
ctx.put("clusterMode", clusterMode);
+ ctx.put("hiveConfDir", hiveConfDir);
ctx.put("hadoopVersion", hadoopVersion);
File outFile = new File(outDir, className + ".java");
Index: common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java (revision 1556697)
+++ common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java (working copy)
@@ -47,6 +47,9 @@
public String getPublisher(Configuration conf) {
return "org.apache.hadoop.hive.ql.stats.CounterStatsPublisher"; }
public String getAggregator(Configuration conf) {
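+ // Tez executions report their counters through the Tez framework rather than a
+ // MapReduce job, so a Tez-specific aggregator is used to read them back.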
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ return "org.apache.hadoop.hive.ql.stats.CounterStatsAggregatorTez";
+ }
return "org.apache.hadoop.hive.ql.stats.CounterStatsAggregator"; }
},
custom {
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1556697)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -863,7 +863,12 @@
// Whether to show the unquoted partition names in query results.
HIVE_DECODE_PARTITION_NAME("hive.decode.partition.name", false),
- //Vectorization enabled
+ HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr",
+ new StringsValidator("mr", "tez")),
+ HIVE_JAR_DIRECTORY("hive.jar.directory", "hdfs:///user/hive/"),
+ HIVE_USER_INSTALL_DIR("hive.user.install.directory", "hdfs:///user/"),
+
+ // Vectorization enabled
HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false),
HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL("hive.vectorized.groupby.checkinterval", 100000),
HIVE_VECTORIZATION_GROUPBY_MAXENTRIES("hive.vectorized.groupby.maxentries", 1000000),
@@ -872,6 +877,12 @@
HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true),
+ // Whether to send the query plan via local resource or RPC
+ HIVE_RPC_QUERY_PLAN("hive.rpc.query.plan", false),
+
+ // Whether to generate the splits locally or in the AM (tez only)
+ HIVE_AM_SPLIT_GENERATION("hive.compute.splits.in.am", true),
+
// none, idonly, traverse, execution
HIVESTAGEIDREARRANGE("hive.stageid.rearrange", "none"),
HIVEEXPLAINDEPENDENCYAPPENDTASKTYPES("hive.explain.dependency.append.tasktype", false),
@@ -1341,7 +1352,11 @@
return hiveDefaultURL;
}
- public URL getHiveSiteLocation() {
+ public static void setHiveSiteLocation(URL location) {
+ hiveSiteURL = location;
+ }
+
+ public static URL getHiveSiteLocation() {
return hiveSiteURL;
}
Index: conf/hive-default.xml.template
===================================================================
--- conf/hive-default.xml.template (revision 1556697)
+++ conf/hive-default.xml.template (working copy)
@@ -2037,6 +2037,14 @@
+<property>
+  <name>hive.execution.engine</name>
+  <value>mr</value>
+  <description>
+    Chooses execution engine. Options are: mr (Map reduce, default) or tez (hadoop 2 only)
+  </description>
+</property>
+
 <property>
   <name>hive.server2.table.type.mapping</name>
   <value>CLASSIC</value>
@@ -2128,4 +2136,49 @@
is also irrelevant.
+
+<property>
+  <name>hive.orc.splits.include.file.footer</name>
+  <value>false</value>
+  <description>
+    If turned on splits generated by orc will include metadata about the stripes in the file. This
+    data is read remotely (from the client or HS2 machine) and sent to all the tasks.
+  </description>
+</property>
+
+<property>
+  <name>hive.orc.cache.stripe.details.size</name>
+  <value>10000</value>
+  <description>Cache size for keeping meta info about orc splits cached in the client.</description>
+</property>
+
+<property>
+  <name>hive.orc.compute.splits.num.threads</name>
+  <value>10</value>
+  <description>How many threads orc should use to create splits in parallel.</description>
+</property>
+
+<property>
+  <name>hive.jar.directory</name>
+  <value>hdfs:///user/hive/</value>
+  <description>
+    This is the location hive in tez mode will look for to find a site wide
+    installed hive instance.
+  </description>
+</property>
+
+<property>
+  <name>hive.user.install.directory</name>
+  <value>hdfs:///user/</value>
+  <description>
+    If hive (in tez mode only) cannot find a usable hive jar in "hive.jar.directory",
+    it will upload the hive jar to &lt;hive.user.install.directory&gt;/&lt;user name&gt;
+    and use it to run queries.
+  </description>
+</property>
+
Index: data/conf/tez/hive-site.xml
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/xml
Index: data/conf/tez/hive-site.xml
===================================================================
--- data/conf/tez/hive-site.xml (revision 0)
+++ data/conf/tez/hive-site.xml (working copy)
Property changes on: data/conf/tez/hive-site.xml
___________________________________________________________________
Added: svn:mime-type
## -0,0 +1 ##
+application/xml
\ No newline at end of property
Index: hbase-handler/src/test/templates/TestHBaseCliDriver.vm
===================================================================
--- hbase-handler/src/test/templates/TestHBaseCliDriver.vm (revision 1556697)
+++ hbase-handler/src/test/templates/TestHBaseCliDriver.vm (working copy)
@@ -24,6 +24,7 @@
import java.io.*;
import java.util.*;
+import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
import org.apache.hadoop.hive.hbase.HBaseQTestUtil;
import org.apache.hadoop.hive.hbase.HBaseTestSetup;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -42,10 +43,11 @@
@Override
protected void setUp() {
+
+ MiniClusterType miniMR = MiniClusterType.valueForString("$clusterMode");
+
try {
- boolean miniMR = "$clusterMode".equals("miniMR");
qt = new HBaseQTestUtil((HIVE_ROOT + "$resultsDir"), (HIVE_ROOT + "$logDir"), miniMR, setup);
-
} catch (Exception e) {
System.err.println("Exception: " + e.getMessage());
e.printStackTrace();
Index: hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm
===================================================================
--- hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm (revision 1556697)
+++ hbase-handler/src/test/templates/TestHBaseNegativeCliDriver.vm (working copy)
@@ -25,6 +25,7 @@
import java.io.*;
import java.util.*;
+import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
import org.apache.hadoop.hive.hbase.HBaseQTestUtil;
import org.apache.hadoop.hive.hbase.HBaseTestSetup;
@@ -42,11 +43,11 @@
@Override
protected void setUp() {
+
+ MiniClusterType miniMR = MiniClusterType.valueForString("$clusterMode");
+
try {
- boolean miniMR = "$clusterMode".equals("miniMR");
-
qt = new HBaseQTestUtil((HIVE_ROOT + "$resultsDir"), (HIVE_ROOT + "$logDir"), miniMR, setup);
-
} catch (Exception e) {
System.err.println("Exception: " + e.getMessage());
e.printStackTrace();
Index: hcatalog/webhcat/svr/src/test/data/status/hive/stderr
===================================================================
--- hcatalog/webhcat/svr/src/test/data/status/hive/stderr (revision 1556697)
+++ hcatalog/webhcat/svr/src/test/data/status/hive/stderr (working copy)
@@ -19,7 +19,7 @@
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
Logging initialized using configuration in jar:file:/Users/daijy/hadoop-1.0.3/tmp/mapred/local/taskTracker/distcache/7168149899505899073_637041239_1133292873/localhost/apps/templeton/hive-0.10.0.tar.gz/hive-0.10.0/lib/hive-common-0.10.0.jar!/hive-log4j.properties
Hive history file=/tmp/daijy/hive_job_log_daijy_201305091500_862342848.txt
-Total MapReduce jobs = 1
+Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201305091437_0012, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201305091437_0012
Index: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestLocationQueries.java
===================================================================
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestLocationQueries.java (revision 1556697)
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestLocationQueries.java (working copy)
@@ -24,6 +24,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
+
/**
* Suite for testing location. e.g. if "alter table alter partition
* location" is run, do the partitions end up in the correct location.
@@ -82,7 +84,7 @@
return failedCount;
}
- public CheckResults(String outDir, String logDir, boolean miniMr,
+ public CheckResults(String outDir, String logDir, MiniClusterType miniMr,
String hadoopVer, String locationSubdir)
throws Exception
{
@@ -102,8 +104,9 @@
File[] qfiles = setupQFiles(testNames);
QTestUtil[] qt = new QTestUtil[qfiles.length];
+
for (int i = 0; i < qfiles.length; i++) {
- qt[i] = new CheckResults(resDir, logDir, false, "0.20", "parta");
+ qt[i] = new CheckResults(resDir, logDir, MiniClusterType.none, "0.20", "parta");
qt[i].addFile(qfiles[i]);
qt[i].clearTestSideEffects();
}
Index: itests/qtest/pom.xml
===================================================================
--- itests/qtest/pom.xml (revision 1556697)
+++ itests/qtest/pom.xml (working copy)
@@ -38,6 +38,8 @@
false
stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q
cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q
+    <minitez.query.files>tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q</minitez.query.files>
+    <minitez.query.files.shared>join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q</minitez.query.files.shared>
add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q
@@ -107,7 +109,6 @@
test
-
@@ -260,6 +261,11 @@
test
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>${commons-logging.version}</version>
+    </dependency>
org.apache.hadoop
hadoop-yarn-server-tests
${hadoop-23.version}
@@ -318,6 +324,48 @@
tests
test
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <version>${tez.version}</version>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
@@ -334,6 +382,7 @@
+
+
+
+
+
+
+
+
+
+
Index: itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
===================================================================
--- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (revision 1556697)
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (working copy)
 private final Set<String> qSortSet;
 private static final String SORT_SUFFIX = ".sorted";
 public static final HashSet<String> srcTables = new HashSet<String>();
+ private static MiniClusterType clusterType = MiniClusterType.none;
private ParseDriver pd;
private Hive db;
protected HiveConf conf;
@@ -215,7 +219,7 @@
}
public QTestUtil(String outDir, String logDir) throws Exception {
- this(outDir, logDir, false, "0.20");
+ this(outDir, logDir, MiniClusterType.none, null, "0.20");
}
public String getOutputDirectory() {
@@ -249,9 +253,8 @@
conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
"org.apache.hadoop.hive.metastore.VerifyingObjectStore");
- if (miniMr) {
+ if (mr != null) {
assert dfs != null;
- assert mr != null;
mr.setupConfiguration(conf);
@@ -310,21 +313,66 @@
return uriStr;
}
- public QTestUtil(String outDir, String logDir, boolean miniMr, String hadoopVer)
+ public enum MiniClusterType {
+ mr,
+ tez,
+ none;
+
+ public static MiniClusterType valueForString(String type) {
+ if (type.equals("miniMR")) {
+ return mr;
+ } else if (type.equals("tez")) {
+ return tez;
+ } else {
+ return none;
+ }
+ }
+ }
+
+ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType, String hadoopVer)
throws Exception {
+ this(outDir, logDir, clusterType, null, hadoopVer);
+ }
+
+ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType,
+ String confDir, String hadoopVer)
+ throws Exception {
this.outDir = outDir;
this.logDir = logDir;
+ if (confDir != null && !confDir.isEmpty()) {
+ HiveConf.setHiveSiteLocation(new URL("file://"+confDir+"/hive-site.xml"));
+ System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
+ }
conf = new HiveConf(Driver.class);
- this.miniMr = miniMr;
+ this.miniMr = (clusterType == MiniClusterType.mr);
this.hadoopVer = getHadoopMainVersion(hadoopVer);
 qMap = new TreeMap<String, String>();
 qSkipSet = new HashSet<String>();
 qSortSet = new HashSet<String>();
+ this.clusterType = clusterType;
- if (miniMr) {
- dfs = ShimLoader.getHadoopShims().getMiniDfs(conf, 4, true, null);
+ HadoopShims shims = ShimLoader.getHadoopShims();
+ int numberOfDataNodes = 4;
+
+ // can run tez tests only on hadoop 2
+ if (clusterType == MiniClusterType.tez) {
+ Assume.assumeTrue(ShimLoader.getMajorVersion().equals("0.23"));
+ // this is necessary temporarily - there's a problem with multiple datanodes on MiniTezCluster
+ // will be fixed in 0.3
+ numberOfDataNodes = 1;
+ }
+
+ if (clusterType != MiniClusterType.none) {
+ dfs = shims.getMiniDfs(conf, numberOfDataNodes, true, null);
FileSystem fs = dfs.getFileSystem();
- mr = ShimLoader.getHadoopShims().getMiniMrCluster(conf, 4, getHdfsUriString(fs.getUri().toString()), 1);
+ if (clusterType == MiniClusterType.tez) {
+ if (!(shims instanceof Hadoop23Shims)) {
+ throw new Exception("Cannot run tez on hadoop-1, Version: "+this.hadoopVer);
+ }
+ mr = ((Hadoop23Shims)shims).getMiniTezCluster(conf, 4, getHdfsUriString(fs.getUri().toString()), 1);
+ } else {
+ mr = shims.getMiniMrCluster(conf, 4, getHdfsUriString(fs.getUri().toString()), 1);
+ }
}
initConf();
@@ -790,6 +838,11 @@
ss.err = new CachingPrintStream(fo, true, "UTF-8");
ss.setIsSilent(true);
SessionState oldSs = SessionState.get();
+
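+ // For tez, close the previous session completely so each test (re)creates its
+ // own session state instead of reusing the old one.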
+ if (oldSs != null && clusterType == MiniClusterType.tez) {
+ oldSs.close();
+ }
+
if (oldSs != null && oldSs.out != null && oldSs.out != System.out) {
oldSs.out.close();
}
@@ -1496,7 +1549,7 @@
{
QTestUtil[] qt = new QTestUtil[qfiles.length];
for (int i = 0; i < qfiles.length; i++) {
- qt[i] = new QTestUtil(resDir, logDir, false, "0.20");
+ qt[i] = new QTestUtil(resDir, logDir, MiniClusterType.none, null, "0.20");
qt[i].addFile(qfiles[i]);
qt[i].clearTestSideEffects();
}
Index: pom.xml
===================================================================
--- pom.xml (revision 1556697)
+++ pom.xml (working copy)
@@ -91,6 +91,7 @@
3.0.1
2.4
2.4
+    <commons-lang3.version>3.1</commons-lang3.version>
1.1.3
10.10.1.1
11.0.2
@@ -131,6 +132,7 @@
1.0.1
1.7.5
4.0.4
+    <tez.version>0.2.0</tez.version>
1.1
0.2
1.4
Index: ql/pom.xml
===================================================================
--- ql/pom.xml (revision 1556697)
+++ ql/pom.xml (working copy)
@@ -82,6 +82,11 @@
${commons-io.version}
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3.version}</version>
+    </dependency>
commons-lang
commons-lang
${commons-lang.version}
@@ -210,6 +215,102 @@
${mockito-all.version}
test
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
@@ -239,6 +340,29 @@
${hadoop-23.version}
true
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop-23.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+      <version>${hadoop-23.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <version>${hadoop-23.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+      <version>${hadoop-23.version}</version>
+      <optional>true</optional>
+    </dependency>
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy)
@@ -1212,9 +1212,10 @@
}
- int jobs = Utilities.getMRTasks(plan.getRootTasks()).size();
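+ // Count both MR and Tez tasks so the "Total jobs" message is correct for either
+ // execution engine.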
+ int jobs = Utilities.getMRTasks(plan.getRootTasks()).size()
+ + Utilities.getTezTasks(plan.getRootTasks()).size();
if (jobs > 0) {
- console.printInfo("Total MapReduce jobs = " + jobs);
+ console.printInfo("Total jobs = " + jobs);
}
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_NUM_TASKS,
Index: ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (working copy)
@@ -366,6 +366,11 @@
UNSUPPORTED_SUBQUERY_EXPRESSION(10249, "Unsupported SubQuery Expression"),
INVALID_SUBQUERY_EXPRESSION(10250, "Invalid SubQuery expression"),
+ INVALID_HDFS_URI(10251, "{0} is not a hdfs uri", true),
+ INVALID_DIR(10252, "{0} is not a directory", true),
+ NO_VALID_LOCATIONS(10253, "Could not find any valid location to place the jars. " +
+ "Please update hive.jar.directory or hive.user.install.directory with a valid location", false),
+
SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
+ "It may have crashed with an error."),
Index: ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java (working copy)
@@ -32,7 +32,11 @@
}
public static HashTableLoader getLoader(Configuration hconf) {
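+ // Dispatch on hive.execution.engine: tez map joins load their hash tables through
+ // the tez-specific loader, everything else falls back to the MR implementation.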
- return new org.apache.hadoop.hive.ql.exec.mr.HashTableLoader();
+ if (HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ return new org.apache.hadoop.hive.ql.exec.tez.HashTableLoader();
+ } else {
+ return new org.apache.hadoop.hive.ql.exec.mr.HashTableLoader();
+ }
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java (working copy)
@@ -26,6 +26,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
@@ -45,7 +48,9 @@
}
public static MapredContext init(boolean isMap, JobConf jobConf) {
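+ // On tez, hand out a TezContext (a MapredContext subclass) so UDFs and operators
+ // see the tez runtime context instead of the plain MR one.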
- MapredContext context = new MapredContext(isMap, jobConf);
+ MapredContext context =
+ HiveConf.getVar(jobConf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ?
+ new TezContext(isMap, jobConf) : new MapredContext(isMap, jobConf);
contexts.set(context);
return context;
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java (working copy)
@@ -35,6 +35,10 @@
* Returns the appropriate cache
*/
public static ObjectCache getCache(Configuration conf) {
- return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ return new org.apache.hadoop.hive.ql.exec.tez.ObjectCache();
+ } else {
+ return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
+ }
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (working copy)
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask;
@@ -41,6 +42,7 @@
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.StatsWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
/**
* TaskFactory implementation.
@@ -89,6 +91,7 @@
DependencyCollectionTask.class));
 taskvec.add(new TaskTuple<PartialScanWork>(PartialScanWork.class,
 PartialScanTask.class));
+ taskvec.add(new TaskTuple<TezWork>(TezWork.class, TezTask.class));
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -88,7 +88,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -98,6 +97,7 @@
import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
@@ -109,6 +109,7 @@
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -132,9 +133,12 @@
import org.apache.hadoop.hive.ql.metadata.InputEstimator;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
@@ -180,6 +184,8 @@
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import org.apache.commons.codec.binary.Base64;
+
/**
* Utilities.
*
@@ -301,8 +307,7 @@
try {
path = getPlanPath(conf, name);
assert path != null;
- gWork = gWorkMap.get(path);
- if (gWork == null) {
+ if (!gWorkMap.containsKey(path)) {
Path localPath;
if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
localPath = path;
@@ -309,7 +314,20 @@
} else {
localPath = new Path(name);
}
- in = new FileInputStream(localPath.toUri().getPath());
+
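+ // With hive.rpc.query.plan enabled the serialized plan is carried inside the job
+ // configuration itself (base64-encoded under the plan path key) instead of being
+ // read from a file, so no local/DFS file access is needed here.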
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
+ LOG.debug("Loading plan from string: "+path.toUri().getPath());
+ String planString = conf.get(path.toUri().getPath());
+ if (planString == null) {
+ LOG.debug("Could not find plan string in conf");
+ return null;
+ }
+ byte[] planBytes = Base64.decodeBase64(planString);
+ in = new ByteArrayInputStream(planBytes);
+ } else {
+ in = new FileInputStream(localPath.toUri().getPath());
+ }
+
if(MAP_PLAN_NAME.equals(name)){
if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
gWork = deserializePlan(in, MapWork.class, conf);
@@ -332,6 +350,9 @@
}
}
gWorkMap.put(path, gWork);
+ } else {
+ LOG.debug("Found plan in cache.");
+ gWork = gWorkMap.get(path);
}
return gWork;
} catch (FileNotFoundException fnf) {
@@ -554,26 +575,37 @@
Path planPath = getPlanPath(conf, name);
- // use the default file system of the conf
- FileSystem fs = planPath.getFileSystem(conf);
- FSDataOutputStream out = fs.create(planPath);
- serializePlan(w, out, conf);
+ OutputStream out;
- // Serialize the plan to the default hdfs instance
- // Except for hadoop local mode execution where we should be
- // able to get the plan directly from the cache
- if (useCache && !ShimLoader.getHadoopShims().isLocalMode(conf)) {
- // Set up distributed cache
- if (!DistributedCache.getSymlink(conf)) {
- DistributedCache.createSymlink(conf);
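+ // Two ways to ship the plan to the tasks: embed it base64-encoded in the job
+ // configuration (hive.rpc.query.plan), or write it to the scratch dir and, when
+ // applicable, publish it through the distributed cache as before.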
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
+ // add it to the conf
+ out = new ByteArrayOutputStream();
+ serializePlan(w, out, conf);
+ LOG.info("Setting plan: "+planPath.toUri().getPath());
+ conf.set(planPath.toUri().getPath(),
+ Base64.encodeBase64String(((ByteArrayOutputStream)out).toByteArray()));
+ } else {
+ // use the default file system of the conf
+ FileSystem fs = planPath.getFileSystem(conf);
+ out = fs.create(planPath);
+ serializePlan(w, out, conf);
+
+ // Serialize the plan to the default hdfs instance
+ // Except for hadoop local mode execution where we should be
+ // able to get the plan directly from the cache
+ if (useCache && !ShimLoader.getHadoopShims().isLocalMode(conf)) {
+ // Set up distributed cache
+ if (!DistributedCache.getSymlink(conf)) {
+ DistributedCache.createSymlink(conf);
+ }
+ String uriWithLink = planPath.toUri().toString() + "#" + name;
+ DistributedCache.addCacheFile(new URI(uriWithLink), conf);
+
+ // set replication of the plan file to a high number. we use the same
+ // replication factor as used by the hadoop jobclient for job.xml etc.
+ short replication = (short) conf.getInt("mapred.submit.replication", 10);
+ fs.setReplication(planPath, replication);
}
- String uriWithLink = planPath.toUri().toString() + "#" + name;
- DistributedCache.addCacheFile(new URI(uriWithLink), conf);
-
- // set replication of the plan file to a high number. we use the same
- // replication factor as used by the hadoop jobclient for job.xml etc.
- short replication = (short) conf.getInt("mapred.submit.replication", 10);
- fs.setReplication(planPath, replication);
}
// Cache the plan in this process
@@ -2235,6 +2267,26 @@
return true;
}
+ public static List<TezTask> getTezTasks(List<Task<? extends Serializable>> tasks) {
+ List<TezTask> tezTasks = new ArrayList<TezTask>();
+ if (tasks != null) {
+ getTezTasks(tasks, tezTasks);
+ }
+ return tezTasks;
+ }
+
+ private static void getTezTasks(List<Task<? extends Serializable>> tasks, List<TezTask> tezTasks) {
+ for (Task<? extends Serializable> task : tasks) {
+ if (task instanceof TezTask && !tezTasks.contains((TezTask) task)) {
+ tezTasks.add((TezTask) task);
+ }
+
+ if (task.getDependentTasks() != null) {
+ getTezTasks(task.getDependentTasks(), tezTasks);
+ }
+ }
+ }
+
 public static List<ExecDriver> getMRTasks(List<Task<? extends Serializable>> tasks) {
 List<ExecDriver> mrTasks = new ArrayList<ExecDriver>();
@@ -2885,7 +2937,8 @@
pathsProcessed.add(path);
LOG.info("Adding input file " + path);
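+ // With tez as the execution engine, skip the dummy-file workaround for empty
+ // paths below; it is only applied when running on MR.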
- if (isEmptyPath(job, path, ctx)) {
+ if (!HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
+ && isEmptyPath(job, path, ctx)) {
path = createDummyFileForEmptyPartition(path, job, work,
hiveScratchDir, alias, sequenceNumber++);
@@ -2902,7 +2955,8 @@
// T2) x;
// If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
// rows)
- if (path == null) {
+ if (path == null
+ && !HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
path = createDummyFileForEmptyTable(job, work, hiveScratchDir,
alias, sequenceNumber++);
pathsToAdd.add(path);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (revision 1556697)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (working copy)
@@ -134,6 +134,12 @@
this.console = console;
this.task = task;
this.callBackObj = hookCallBack;
+
+ if (job != null) {
+ // even with tez on, some jobs are still run as MR. Disable the flag in
+ // the conf, so that the backend runs fully as MR.
+ HiveConf.setVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+ }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (working copy)
@@ -0,0 +1,693 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure.NullOutputCommitter;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+
+/**
+ * DagUtils. DagUtils is a collection of helper methods to convert
+ * map and reduce work to tez vertices and edges. It handles configuration
+ * objects, file localization and vertex/edge creation.
+ */
+public class DagUtils {
+
+ private static final String TEZ_DIR = "_tez_scratch_dir";
+ private static DagUtils instance;
+
+ /*
+ * Creates the configuration object necessary to run a specific vertex from
+ * map work. This includes input formats, input processor, etc.
+ */
+ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
+ JobConf conf = new JobConf(baseConf);
+
+ if (mapWork.getNumMapTasks() != null) {
+ conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
+ }
+
+ if (mapWork.getMaxSplitSize() != null) {
+ HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE,
+ mapWork.getMaxSplitSize().longValue());
+ }
+
+ if (mapWork.getMinSplitSize() != null) {
+ HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZE,
+ mapWork.getMinSplitSize().longValue());
+ }
+
+ if (mapWork.getMinSplitSizePerNode() != null) {
+ HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE,
+ mapWork.getMinSplitSizePerNode().longValue());
+ }
+
+ if (mapWork.getMinSplitSizePerRack() != null) {
+ HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK,
+ mapWork.getMinSplitSizePerRack().longValue());
+ }
+
+ Utilities.setInputAttributes(conf, mapWork);
+
+ String inpFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT);
+ if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat))) {
+ inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
+ }
+
+ if (mapWork.isUseBucketizedHiveInputFormat()) {
+ inpFormat = BucketizedHiveInputFormat.class.getName();
+ }
+
+ conf.set("mapred.mapper.class", ExecMapper.class.getName());
+ conf.set("mapred.input.format.class", inpFormat);
+
+ return conf;
+ }
+
+ /**
+ * Given two vertices and their respective configuration objects createEdge
+ * will create an Edge object that connects the two. Currently the edge will
+ * always be a stable bi-partite edge.
+ *
+ * @param vConf JobConf of the first vertex
+ * @param v The first vertex (source)
+ * @param wConf JobConf of the second vertex
+ * @param w The second vertex (sink)
+ * @return an Edge connecting the source and sink vertices
+ */
+ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
+ EdgeType edgeType)
+ throws IOException {
+
+ // Tez needs the output and the subsequent input to be set up as a consistent pair
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
+
+ // update payloads (configuration for the vertices might have changed)
+ v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
+ w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+
+ DataMovementType dataMovementType;
+ Class logicalInputClass;
+ Class logicalOutputClass;
+
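+ // A broadcast edge sends every source task's full output to all downstream tasks
+ // (unordered KV), while the default simple edge is a partitioned scatter/gather
+ // shuffle with sorted output, matching MR's map-to-reduce data movement.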
+ switch (edgeType) {
+ case BROADCAST_EDGE:
+ dataMovementType = DataMovementType.BROADCAST;
+ logicalOutputClass = OnFileUnorderedKVOutput.class;
+ logicalInputClass = ShuffledUnorderedKVInput.class;
+ break;
+
+ case SIMPLE_EDGE:
+ default:
+ dataMovementType = DataMovementType.SCATTER_GATHER;
+ logicalOutputClass = OnFileSortedOutput.class;
+ logicalInputClass = ShuffledMergedInputLegacy.class;
+ break;
+ }
+
+ EdgeProperty edgeProperty =
+ new EdgeProperty(dataMovementType,
+ DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ new OutputDescriptor(logicalOutputClass.getName()),
+ new InputDescriptor(logicalInputClass.getName()));
+ return new Edge(v, w, edgeProperty);
+ }
+
+ /*
+ * Helper function to create Vertex from MapWork.
+ */
+ private Vertex createVertex(JobConf conf, MapWork mapWork,
+ LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
+ Path mrScratchDir, Context ctx) throws Exception {
+
+ Path tezDir = getTezDir(mrScratchDir);
+
+ // set up the operator plan
+ Path planPath = Utilities.setMapWork(conf, mapWork,
+ mrScratchDir.toUri().toString(), false);
+
+ // setup input paths and split info
+ List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork,
+ mrScratchDir.toUri().toString(), ctx);
+ Utilities.setInputPaths(conf, inputPaths);
+
+ // create the directories FileSinkOperators need
+ Utilities.createTmpDirs(conf, mapWork);
+
+ // Tez asks us to call this even if there's no preceding vertex
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
+
+ // finally create the vertex
+ Vertex map = null;
+
+ // use tez to combine splits
+ boolean useTezGroupedSplits = false;
+
+ int numTasks = -1;
+ Class amSplitGeneratorClass = null;
+ InputSplitInfo inputSplitInfo = null;
+ Class inputFormatClass = conf.getClass("mapred.input.format.class",
+ InputFormat.class);
+
+ // we'll set up tez to combine splits for us iff the input format
+ // is HiveInputFormat
+ if (inputFormatClass == HiveInputFormat.class) {
+ useTezGroupedSplits = true;
+ conf.setClass("mapred.input.format.class", TezGroupedSplitsInputFormat.class, InputFormat.class);
+ }
+
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)) {
+ // if we're generating the splits in the AM, we just need to set
+ // the correct plugin.
+ amSplitGeneratorClass = MRInputAMSplitGenerator.class;
+ } else {
+ // client side split generation means we have to compute them now
+ inputSplitInfo = MRHelpers.generateInputSplits(conf,
+ new Path(tezDir, "split_"+mapWork.getName().replaceAll(" ", "_")));
+ numTasks = inputSplitInfo.getNumTasks();
+ }
+
+ byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
+ map = new Vertex(mapWork.getName(),
+ new ProcessorDescriptor(MapTezProcessor.class.getName()).
+ setUserPayload(serializedConf), numTasks,
+ MRHelpers.getMapResource(conf));
+ Map<String, String> environment = new HashMap<String, String>();
+ MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
+ map.setTaskEnvironment(environment);
+ map.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
+
+ assert mapWork.getAliasToWork().keySet().size() == 1;
+
+ String alias = mapWork.getAliasToWork().keySet().iterator().next();
+
+ byte[] mrInput = null;
+ if (useTezGroupedSplits) {
+ mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf,
+ null, HiveInputFormat.class.getName());
+ } else {
+ mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
+ }
+ map.addInput(alias,
+ new InputDescriptor(MRInputLegacy.class.getName()).
+ setUserPayload(mrInput), amSplitGeneratorClass);
+
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ localResources.put(getBaseName(appJarLr), appJarLr);
+ for (LocalResource lr: additionalLr) {
+ localResources.put(getBaseName(lr), lr);
+ }
+
+ if (inputSplitInfo != null) {
+ // only relevant for client-side split generation
+ map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+ MRHelpers.updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo,
+ localResources);
+ }
+
+ map.setTaskLocalResources(localResources);
+ return map;
+ }
+
+ /*
+ * Helper function to create JobConf for specific ReduceWork.
+ */
+ private JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) {
+ JobConf conf = new JobConf(baseConf);
+
+ conf.set("mapred.reducer.class", ExecReducer.class.getName());
+
+ boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS);
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
+ useSpeculativeExecReducers);
+
+ return conf;
+ }
+
+ /*
+ * Helper function to create Vertex for given ReduceWork.
+ */
+ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
+ LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
+ Path mrScratchDir, Context ctx) throws Exception {
+
+ // set up operator plan
+ Path planPath = Utilities.setReduceWork(conf, reduceWork,
+ mrScratchDir.toUri().toString(), false);
+
+ // create the directories FileSinkOperators need
+ Utilities.createTmpDirs(conf, reduceWork);
+
+ // Call once here, will be updated when we find edges
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
+
+ // create the vertex
+ Vertex reducer = new Vertex(reduceWork.getName(),
+ new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
+ setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
+ reduceWork.getNumReduceTasks(), MRHelpers.getReduceResource(conf));
+
+ Map<String, String> environment = new HashMap<String, String>();
+
+ MRHelpers.updateEnvironmentForMRTasks(conf, environment, false);
+ reducer.setTaskEnvironment(environment);
+
+ reducer.setJavaOpts(MRHelpers.getReduceJavaOpts(conf));
+
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ localResources.put(getBaseName(appJarLr), appJarLr);
+ for (LocalResource lr: additionalLr) {
+ localResources.put(getBaseName(lr), lr);
+ }
+ reducer.setTaskLocalResources(localResources);
+
+ return reducer;
+ }
+
+ /*
+ * Helper method to create a yarn local resource.
+ */
+ private LocalResource createLocalResource(FileSystem remoteFs, Path file,
+ LocalResourceType type, LocalResourceVisibility visibility) {
+
+ FileStatus fstat = null;
+ try {
+ fstat = remoteFs.getFileStatus(file);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ URL resourceURL = ConverterUtils.getYarnUrlFromPath(file);
+ long resourceSize = fstat.getLen();
+ long resourceModificationTime = fstat.getModificationTime();
+
+ LocalResource lr = Records.newRecord(LocalResource.class);
+ lr.setResource(resourceURL);
+ lr.setType(type);
+ lr.setSize(resourceSize);
+ lr.setVisibility(visibility);
+ lr.setTimestamp(resourceModificationTime);
+
+ return lr;
+ }
+
+ /**
+ * @param conf
+ * @return path to destination directory on hdfs
+ * @throws LoginException if we are unable to figure user information
+ * @throws IOException when any dfs operation fails.
+ */
+ public Path getDefaultDestDir(Configuration conf) throws LoginException, IOException {
+ UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+ String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
+ String userPathStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_USER_INSTALL_DIR);
+ Path userPath = new Path(userPathStr);
+ FileSystem fs = userPath.getFileSystem(conf);
+ if (!(fs instanceof DistributedFileSystem)) {
+ throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(userPathStr));
+ }
+
+ String jarPathStr = userPathStr + "/" + userName;
+ String hdfsDirPathStr = jarPathStr;
+ Path hdfsDirPath = new Path(hdfsDirPathStr);
+
+ FileStatus fstatus = fs.getFileStatus(hdfsDirPath);
+ if (!fstatus.isDir()) {
+ throw new IOException(ErrorMsg.INVALID_DIR.format(hdfsDirPath.toString()));
+ }
+
+ Path retPath = new Path(hdfsDirPath.toString() + "/.hiveJars");
+
+ fs.mkdirs(retPath);
+ return retPath;
+ }
+
+ /**
+ * Localizes files, archives and jars the user has instructed us
+ * to provide on the cluster as resources for execution.
+ *
+ * @param conf
+ * @return List local resources to add to execution
+ * @throws IOException when hdfs operation fails
+ * @throws LoginException when getDefaultDestDir fails with the same exception
+ */
+ public List<LocalResource> localizeTempFiles(Configuration conf) throws IOException, LoginException {
+ List<LocalResource> tmpResources = new ArrayList<LocalResource>();
+
+ String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
+ if (StringUtils.isNotBlank(addedFiles)) {
+ HiveConf.setVar(conf, ConfVars.HIVEADDEDFILES, addedFiles);
+ }
+ String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+ if (StringUtils.isNotBlank(addedJars)) {
+ HiveConf.setVar(conf, ConfVars.HIVEADDEDJARS, addedJars);
+ }
+ String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
+ if (StringUtils.isNotBlank(addedArchives)) {
+ HiveConf.setVar(conf, ConfVars.HIVEADDEDARCHIVES, addedArchives);
+ }
+
+ String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
+
+ // need to localize the additional jars and files
+
+ // we need the directory on hdfs to which we shall put all these files
+ String hdfsDirPathStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
+ Path hdfsDirPath = new Path(hdfsDirPathStr);
+ FileSystem fs = hdfsDirPath.getFileSystem(conf);
+ if (!(fs instanceof DistributedFileSystem)) {
+ throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hdfsDirPathStr));
+ }
+
+ FileStatus fstatus = null;
+ try {
+ fstatus = fs.getFileStatus(hdfsDirPath);
+ } catch (FileNotFoundException fe) {
+ // do nothing
+ }
+
+ if ((fstatus == null) || (!fstatus.isDir())) {
+ Path destDir = getDefaultDestDir(conf);
+ hdfsDirPathStr = destDir.toString();
+ }
+
+ String allFiles = auxJars + "," + addedJars + "," + addedFiles + "," + addedArchives;
+ String[] allFilesArr = allFiles.split(",");
+ for (String file : allFilesArr) {
+ if (!StringUtils.isNotBlank(file)) {
+ continue;
+ }
+ String hdfsFilePathStr = hdfsDirPathStr + "/" + getResourceBaseName(file);
+ LocalResource localResource = localizeResource(new Path(file),
+ new Path(hdfsFilePathStr), conf);
+ tmpResources.add(localResource);
+ }
+
+ return tmpResources;
+ }
+
+ // the api that finds the jar being used by this class on disk
+ public String getExecJarPathLocal () throws URISyntaxException {
+ // returns the location on disc of the jar of this class.
+ return DagUtils.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString();
+ }
+
+ /*
+ * Helper function to retrieve the basename of a local resource
+ */
+ public String getBaseName(LocalResource lr) {
+ return FilenameUtils.getName(lr.getResource().getFile());
+ }
+
+ /**
+ * @param pathStr - the string from which we try to determine the resource base name
+ * @return the name of the resource from a given path string.
+ */
+ public String getResourceBaseName(String pathStr) {
+ String[] splits = pathStr.split("/");
+ return splits[splits.length - 1];
+ }
+
+ /**
+ * @param src the source file.
+ * @param dest the destination file.
+ * @param conf the configuration
+ * @return true if the destination already exists as a file and its base name matches the source's; false otherwise.
+ * @throws IOException when any file system related call fails
+ */
+ private boolean checkPreExisting(Path src, Path dest, Configuration conf)
+ throws IOException {
+ FileSystem destFS = dest.getFileSystem(conf);
+
+ if (!destFS.exists(dest)) {
+ return false;
+ }
+ FileStatus destStatus = destFS.getFileStatus(dest);
+ if (destStatus.isDir()) {
+ return false;
+ }
+
+ String srcName = getResourceBaseName(src.toString());
+ String destName = getResourceBaseName(dest.toString());
+
+ if (srcName.equals(destName)) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param src path to the source for the resource
+ * @param dest path in hdfs for the resource
+ * @param conf
+ * @return localresource from tez localization.
+ * @throws IOException when any file system related calls fails.
+ */
+ public LocalResource localizeResource(Path src, Path dest, Configuration conf)
+ throws IOException {
+ FileSystem destFS = dest.getFileSystem(conf);
+ if (!(destFS instanceof DistributedFileSystem)) {
+ throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(dest.toString()));
+ }
+
+ if (src != null) {
+ if (!checkPreExisting(src, dest, conf)) {
+ // copy the src to the destination and create local resource.
+ // overwrite even if file already exists.
+ destFS.copyFromLocalFile(false, true, src, dest);
+ }
+ }
+
+ return createLocalResource(destFS, dest, LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION);
+ }
+
+ /**
+ * Creates and initializes a JobConf object that can be used to execute
+ * the DAG. The configuration object will contain configurations from mapred-site
+ * overlaid with key/value pairs from the hiveConf object. Finally it will also
+ * contain some hive specific configurations that do not change from DAG to DAG.
+ *
+ * @param hiveConf Current hiveConf for the execution
+ * @return JobConf base configuration for job execution
+ * @throws IOException
+ */
+ public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
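+ // Hive's operators run against the old "mapred" API, so make sure the new-api
+ // flag is off before deriving the base MR configuration.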
+ hiveConf.setBoolean("mapred.mapper.new-api", false);
+
+ JobConf conf = (JobConf) MRHelpers.getBaseMRConfiguration(hiveConf);
+
+ conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName());
+
+ conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+ conf.setBoolean("mapred.committer.job.task.cleanup.needed", false);
+
+ conf.setClass("mapred.output.format.class", HiveOutputFormatImpl.class, OutputFormat.class);
+
+ conf.set(MRJobConfig.OUTPUT_KEY_CLASS, HiveKey.class.getName());
+ conf.set(MRJobConfig.OUTPUT_VALUE_CLASS, BytesWritable.class.getName());
+
+ conf.set("mapred.partitioner.class", HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPARTITIONER));
+ conf.set("tez.runtime.partitioner.class", MRPartitioner.class.getName());
+
+ return conf;
+ }
+
+ /**
+ * Creates and initializes the JobConf object for a given BaseWork object.
+ *
+ * @param conf Any configurations in conf will be copied to the resulting new JobConf object.
+ * @param work BaseWork will be used to populate the configuration object.
+ * @return JobConf new configuration object
+ */
+ public JobConf initializeVertexConf(JobConf conf, BaseWork work) {
+
+ // simply dispatch the call to the right method for the actual (sub-) type of
+ // BaseWork.
+ if (work instanceof MapWork) {
+ return initializeVertexConf(conf, (MapWork)work);
+ } else if (work instanceof ReduceWork) {
+ return initializeVertexConf(conf, (ReduceWork)work);
+ } else {
+ assert false;
+ return null;
+ }
+ }
+
+ /**
+ * Create a vertex from a given work object.
+ *
+ * @param conf JobConf to be used to this execution unit
+ * @param work The instance of BaseWork representing the actual work to be performed
+ * by this vertex.
+ * @param scratchDir HDFS scratch dir for this execution unit.
+ * @param appJarLr Local resource for hive-exec.
+ * @param additionalLr
+ * @param fileSystem FS corresponding to scratchDir and LocalResources
+ * @param ctx This query's context
+ * @return Vertex
+ */
+ public Vertex createVertex(JobConf conf, BaseWork work,
+ Path scratchDir, LocalResource appJarLr, List<LocalResource> additionalLr,
+ FileSystem fileSystem, Context ctx, boolean hasChildren) throws Exception {
+
+ Vertex v = null;
+ // simply dispatch the call to the right method for the actual (sub-) type of
+ // BaseWork.
+ if (work instanceof MapWork) {
+ v = createVertex(conf, (MapWork) work, appJarLr,
+ additionalLr, fileSystem, scratchDir, ctx);
+ } else if (work instanceof ReduceWork) {
+ v = createVertex(conf, (ReduceWork) work, appJarLr,
+ additionalLr, fileSystem, scratchDir, ctx);
+ } else {
+ // something is seriously wrong if this is happening
+ throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());
+ }
+
+ // initialize stats publisher if necessary
+ if (work.isGatheringStats()) {
+ StatsPublisher statsPublisher;
+ StatsFactory factory = StatsFactory.newFactory(conf);
+ if (factory != null) {
+ statsPublisher = factory.getStatsPublisher();
+ if (!statsPublisher.init(conf)) { // creating stats table if not exists
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
+ throw
+ new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+ }
+ }
+ }
+ }
+
+
+ // final vertices need to have at least one output
+ if (!hasChildren) {
+ v.addOutput("out_"+work.getName(),
+ new OutputDescriptor(MROutput.class.getName())
+ .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
+ }
+
+ return v;
+ }
+
+ /**
+ * createTezDir creates a temporary directory in the scratchDir folder to
+ * be used with Tez. Assumes scratchDir exists.
+ */
+ public Path createTezDir(Path scratchDir, Configuration conf)
+ throws IOException {
+ Path tezDir = getTezDir(scratchDir);
+ FileSystem fs = tezDir.getFileSystem(conf);
+ fs.mkdirs(tezDir);
+ return tezDir;
+ }
+
+ /**
+ * Gets the tez dir that belongs to the hive scratch dir
+ */
+ public Path getTezDir(Path scratchDir) {
+ return new Path(scratchDir, TEZ_DIR);
+ }
+
+ /**
+ * Singleton
+ * @return instance of this class
+ */
+ public static DagUtils getInstance() {
+ if (instance == null) {
+ instance = new DagUtils();
+ }
+ return instance;
+ }
+
+ private DagUtils() {
+ // prevent external instantiation; use getInstance()
+ }
+}
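
For orientation, the sketch below shows one way a caller could wire the DagUtils API above together when turning a BaseWork into a Tez vertex. It is a minimal illustration, not part of the patch: the package placement, the placeholder paths, and the empty additional-resource list are assumptions, and the real callers added elsewhere in this patch may differ.

package org.apache.hadoop.hive.ql.exec.tez; // assumed package, matching the other tez classes

import java.util.Collections;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.dag.api.Vertex;

public class DagUtilsUsageSketch {

  // Builds a single vertex for 'work'. All Path arguments are placeholders.
  Vertex buildVertex(HiveConf hiveConf, BaseWork work, Path scratchDir,
      Path localHiveExecJar, Path hdfsHiveExecJar, Context ctx,
      boolean hasChildren) throws Exception {
    DagUtils utils = DagUtils.getInstance();

    // Per-query tez directory underneath the hive scratch dir.
    Path tezDir = utils.createTezDir(scratchDir, hiveConf);

    // Copy hive-exec to HDFS unless checkPreExisting already found a copy,
    // and wrap it as a LocalResource for the vertex.
    LocalResource appJarLr =
        utils.localizeResource(localHiveExecJar, hdfsHiveExecJar, hiveConf);

    // DAG-level configuration, then the per-vertex overlay for this work unit.
    JobConf dagConf = utils.createConfiguration(hiveConf);
    JobConf vertexConf = utils.initializeVertexConf(dagConf, work);

    FileSystem fs = tezDir.getFileSystem(hiveConf);
    return utils.createVertex(vertexConf, work, tezDir, appJarLr,
        Collections.<LocalResource>emptyList(), fs, ctx, hasChildren);
  }
}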
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (working copy)
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * HashTableLoader for Tez constructs the hashtable from records read from
+ * a broadcast edge.
+ */
+public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
+
+ private static final Log LOG = LogFactory.getLog(HashTableLoader.class.getName());
+
+ public HashTableLoader() {
+ }
+
+ @Override
+ public void load(ExecMapperContext context,
+ Configuration hconf,
+ MapJoinDesc desc,
+ byte posBigTable,
+ MapJoinTableContainer[] mapJoinTables,
+ MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+
+ TezContext tezContext = (TezContext) MapredContext.get();
+ Map<Integer, String> parentToInput = desc.getParentToInput();
+ int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
+ float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
+ HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
+
+ for (int pos = 0; pos < mapJoinTables.length; pos++) {
+ if (pos == posBigTable) {
+ continue;
+ }
+
+ LogicalInput input = tezContext.getInput(parentToInput.get(pos));
+
+ try {
+ KeyValueReader kvReader = (KeyValueReader) input.getReader();
+
+ MapJoinTableContainer tableContainer = new HashMapWrapper(hashTableThreshold,
+ hashTableLoadFactor);
+
+ // simply read all the kv pairs into the hashtable.
+ while (kvReader.next()) {
+ MapJoinKey key = new MapJoinKey();
+ key.read(mapJoinTableSerdes[pos].getKeyContext(), (Writable)kvReader.getCurrentKey());
+
+ MapJoinRowContainer values = tableContainer.get(key);
+ if (values == null) {
+ values = new MapJoinRowContainer();
+ tableContainer.put(key, values);
+ }
+ values.read(mapJoinTableSerdes[pos].getValueContext(), (Writable)kvReader.getCurrentValue());
+ }
+
+ mapJoinTables[pos] = tableContainer;
+ } catch (IOException e) {
+ throw new HiveException(e);
+ } catch (SerDeException e) {
+ throw new HiveException(e);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+ }
+}
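
The loader above is essentially the standard Tez pattern for draining a broadcast edge through a KeyValueReader, plus the extra step of deserializing each key/value through the MapJoin serdes. Below is a stripped-down, hypothetical version of just the read loop; a plain HashMap stands in for the MapJoin containers, and the comments note where the real loader differs.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValueReader;

public class BroadcastEdgeReadSketch {

  // Drain one broadcast LogicalInput into an in-memory multimap.
  Map<Object, List<Object>> drain(LogicalInput broadcastInput) throws Exception {
    KeyValueReader kvReader = (KeyValueReader) broadcastInput.getReader();
    Map<Object, List<Object>> table = new HashMap<Object, List<Object>>();
    while (kvReader.next()) {
      // NOTE: the real loader deserializes key and value via the MapJoin
      // serdes instead of storing the reader's (possibly reused) Writables.
      Object key = kvReader.getCurrentKey();
      List<Object> rows = table.get(key);
      if (rows == null) {
        rows = new ArrayList<Object>();
        table.put(key, rows);
      }
      rows.add(kvReader.getCurrentValue());
    }
    return table;
  }
}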
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (working copy)
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * Process input from tez LogicalInput and write output - for a map plan.
+ * Just pump the records through the query plan.
+ */
+public class MapRecordProcessor extends RecordProcessor{
+
+
+ private MapOperator mapOp;
+ public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
+ private final ExecMapperContext execContext = new ExecMapperContext();
+ private boolean abort = false;
+ protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
+ private MapWork mapWork;
+
+ @Override
+ void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
+ Map<String, OutputCollector> outMap) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
+ super.init(jconf, mrReporter, inputs, outMap);
+
+ //Update JobConf using MRInput, info like filename comes via this
+ MRInputLegacy mrInput = getMRInput(inputs);
+ try {
+ mrInput.init();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed while initializing MRInput", e);
+ }
+ Configuration updatedConf = mrInput.getConfigUpdates();
+ if (updatedConf != null) {
+ for (Entry<String, String> entry : updatedConf) {
+ jconf.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+ try {
+
+ execContext.setJc(jconf);
+ // create map and fetch operators
+ mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
+ if (mapWork == null) {
+ mapWork = Utilities.getMapWork(jconf);
+ cache.cache(MAP_PLAN_KEY, mapWork);
+ l4j.info("Plan: "+mapWork);
+ for (String s: mapWork.getAliases()) {
+ l4j.info("Alias: "+s);
+ }
+ } else {
+ Utilities.setMapWork(jconf, mapWork);
+ }
+ if (mapWork.getVectorMode()) {
+ mapOp = new VectorMapOperator();
+ } else {
+ mapOp = new MapOperator();
+ }
+
+ // initialize map operator
+ mapOp.setConf(mapWork);
+ mapOp.setChildren(jconf);
+ l4j.info(mapOp.dump(0));
+
+ MapredContext.init(true, new JobConf(jconf));
+ ((TezContext)MapredContext.get()).setInputs(inputs);
+ mapOp.setExecContext(execContext);
+ mapOp.initializeLocalWork(jconf);
+ mapOp.initialize(jconf, null);
+
+ // Initialization isn't finished until all parents of all operators
+ // are initialized. For broadcast joins that means initializing the
+ // dummy parent operators as well.
+ List<HashTableDummyOperator> dummyOps = mapWork.getDummyOps();
+ if (dummyOps != null) {
+ for (Operator<? extends OperatorDesc> dummyOp : dummyOps) {
+ dummyOp.setExecContext(execContext);
+ dummyOp.initialize(jconf, null);
+ }
+ }
+
+ OperatorUtils.setChildrenCollector(mapOp.getChildOperators(), outMap);
+ mapOp.setReporter(reporter);
+ MapredContext.get().setReporter(reporter);
+
+ } catch (Throwable e) {
+ abort = true;
+ if (e instanceof OutOfMemoryError) {
+ // will this be true here?
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
+ } else {
+ throw new RuntimeException("Map operator initialization failed", e);
+ }
+ }
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
+ }
+
+ private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) {
+ //there should be only one MRInput
+ MRInputLegacy theMRInput = null;
+ for(LogicalInput inp : inputs.values()){
+ if(inp instanceof MRInputLegacy){
+ if(theMRInput != null){
+ throw new IllegalArgumentException("Only one MRInput is expected");
+ }
+ //a better logic would be to find the alias
+ theMRInput = (MRInputLegacy)inp;
+ }
+ }
+ return theMRInput;
+ }
+
+ @Override
+ void run() throws IOException{
+
+ MRInputLegacy in = getMRInput(inputs);
+ KeyValueReader reader = in.getReader();
+
+ //process records until done
+ while(reader.next()){
+ //ignore the key for maps - reader.getCurrentKey();
+ Object value = reader.getCurrentValue();
+ boolean needMore = processRow(value);
+ if(!needMore){
+ break;
+ }
+ }
+ }
+
+
+ /**
+ * @param value value to process
+ * @return true if it is not done and can take more inputs
+ */
+ private boolean processRow(Object value) {
+ // reset the execContext for each new row
+ execContext.resetRow();
+
+ try {
+ if (mapOp.getDone()) {
+ return false; //done
+ } else {
+ // Since there is no concept of a group, we don't invoke
+ // startGroup/endGroup for a mapper
+ mapOp.process((Writable)value);
+ if (isLogInfoEnabled) {
+ logProgress();
+ }
+ }
+ } catch (Throwable e) {
+ abort = true;
+ if (e instanceof OutOfMemoryError) {
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
+ } else {
+ l4j.fatal(StringUtils.stringifyException(e));
+ throw new RuntimeException(e);
+ }
+ }
+ return true; //give me more
+ }
+
+ @Override
+ void close(){
+ // check if there are IOExceptions
+ if (!abort) {
+ abort = execContext.getIoCxt().getIOExceptions();
+ }
+
+ // detecting failed executions by exceptions thrown by the operator tree
+ try {
+ mapOp.close(abort);
+
+ // Need to close the dummyOps as well. The operator pipeline
+ // is not considered "closed/done" unless all operators are
+ // done. For broadcast joins that includes the dummy parents.
+ List<HashTableDummyOperator> dummyOps = mapWork.getDummyOps();
+ if (dummyOps != null) {
+ for (Operator<? extends OperatorDesc> dummyOp : dummyOps) {
+ dummyOp.close(abort);
+ }
+ }
+
+ if (isLogInfoEnabled) {
+ logCloseInfo();
+ }
+ reportStats rps = new reportStats(reporter);
+ mapOp.preorderMap(rps);
+ return;
+ } catch (Exception e) {
+ if (!abort) {
+ // signal new failure to map-reduce
+ l4j.error("Hit error while closing operators - failing tree");
+ throw new RuntimeException("Hive Runtime Error while closing operators", e);
+ }
+ } finally {
+ Utilities.clearWorkMap();
+ MapredContext.close();
+ }
+ }
+
+}
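
init() above follows a retrieve-or-build pattern against the vertex-scoped ObjectCache, so a container that runs several tasks of the same vertex deserializes the MapWork only once. A generic, hypothetical sketch of that pattern follows; the key, the loader abstraction, and the class name are placeholders (the real code retrieves MAP_PLAN_KEY and falls back to Utilities.getMapWork(jconf)).

import org.apache.hadoop.hive.ql.exec.ObjectCache;

public class CachedPlanSketch {

  // Placeholder loader abstraction; not part of the patch.
  interface PlanLoader<T> {
    T load() throws Exception;
  }

  static <T> T retrieveOrBuild(ObjectCache cache, String key, PlanLoader<T> loader)
      throws Exception {
    @SuppressWarnings("unchecked")
    T plan = (T) cache.retrieve(key);
    if (plan == null) {
      // First task of this vertex in the container: build and publish.
      plan = loader.load();
      cache.cache(key, plan);
    }
    return plan;
  }
}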
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java (working copy)
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+/**
+ * TezProcessor subclass that marks this task as map-side processing.
+ */
+public class MapTezProcessor extends TezProcessor {
+ public MapTezProcessor(){
+ super(true);
+ }
+}
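
The map/reduce distinction is carried entirely by the boolean passed to TezProcessor, so the reduce-side counterpart is the mirror image. The class below is only a hypothetical illustration; the actual reduce subclass added by this patch, if present in other hunks, may use a different name.

package org.apache.hadoop.hive.ql.exec.tez;

// Hypothetical reduce-side mirror of MapTezProcessor; not one of the hunks above.
public class ReduceTezProcessorSketch extends TezProcessor {
  public ReduceTezProcessorSketch() {
    super(false);
  }
}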
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (working copy)
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
+
+
+/**
+ * ObjectCache. Tez implementation based on the tez object registry.
+ *
+ */
+public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
+
+ private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
+ private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+
+ @Override
+ public void cache(String key, Object value) {
+ LOG.info("Adding " + key + " to cache with value " + value);
+ registry.add(ObjectLifeCycle.VERTEX, key, value);
+ }
+
+ @Override
+ public Object retrieve(String key) {
+ Object o = registry.get(key);
+ if (o != null) {
+ LOG.info("Found " + key + " in cache with value: " + o);
+ }
+ return o;
+ }
+}
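
A minimal usage sketch, mirroring how the record processors obtain this cache through ObjectCacheFactory; the key and value are placeholders, and it is assumed (not shown in these hunks) that the factory returns this Tez-backed implementation when running on Tez. Because entries are registered with ObjectLifeCycle.VERTEX, they remain available to later tasks of the same vertex that run in the same container.

import org.apache.hadoop.hive.ql.exec.ObjectCache;
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
import org.apache.hadoop.mapred.JobConf;

public class ObjectCacheUsageSketch {

  private static final String KEY = "__EXAMPLE_PLAN__"; // placeholder key

  // Publish an object for reuse by later tasks of this vertex in the container.
  void publishForVertex(JobConf jconf, Object plan) {
    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
    cache.cache(KEY, plan);
  }

  // Returns the previously published object, or null if this is the first task.
  Object lookupForVertex(JobConf jconf) {
    return ObjectCacheFactory.getCache(jconf).retrieve(KEY);
  }
}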
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (working copy)
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.LogicalInput;
+
+/**
+ * Process input from tez LogicalInput and write output.
+ * It has different subclasses for map and reduce processing.
+ */
+public abstract class RecordProcessor {
+
+ protected JobConf jconf;
+ protected Map<String, LogicalInput> inputs;
+ protected Map<String, OutputCollector> outMap;
+
+ public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
+
+
+ // used to log memory usage periodically
+ public static MemoryMXBean memoryMXBean;
+ protected boolean isLogInfoEnabled = false;
+ protected MRTaskReporter reporter;
+
+ private long numRows = 0;
+ private long nextUpdateCntr = 1;
+ protected PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ protected String CLASS_NAME = RecordProcessor.class.getName();
+
+
+ /**
+ * Common initialization code for RecordProcessors
+ * @param jconf
+ * @param mrReporter
+ * @param inputs
+ * @param outMap
+ */
+ void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
+ Map<String, OutputCollector> outMap) {
+ this.jconf = jconf;
+ this.reporter = mrReporter;
+ this.inputs = inputs;
+ this.outMap = outMap;
+
+ // Allocate the memory MX bean at the beginning.
+ memoryMXBean = ManagementFactory.getMemoryMXBean();
+
+ l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+
+ isLogInfoEnabled = l4j.isInfoEnabled();
+
+ //log classpaths
+ try {
+ if (l4j.isDebugEnabled()) {
+ l4j.debug("conf classpath = "
+ + Arrays.asList(((URLClassLoader) jconf.getClassLoader()).getURLs()));
+ l4j.debug("thread classpath = "
+ + Arrays.asList(((URLClassLoader) Thread.currentThread()
+ .getContextClassLoader()).getURLs()));
+ }
+ } catch (Exception e) {
+ l4j.info("cannot get classpath: " + e.getMessage());
+ }
+ }
+
+ /**
+ * start processing the inputs and writing output
+ * @throws IOException
+ */
+ abstract void run() throws IOException;
+
+
+ abstract void close();
+
+ /**
+ * Log information to be logged at the end
+ */
+ protected void logCloseInfo() {
+ long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+ l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
+ + used_memory);
+ }
+
+ /**
+ * Log number of records processed and memory used after processing many records
+ */
+ protected void logProgress() {
+ numRows++;
+ if (numRows == nextUpdateCntr) {
+ long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+ l4j.info("ExecMapper: processing " + numRows
+ + " rows: used memory = " + used_memory);
+ nextUpdateCntr = getNextUpdateRecordCounter(numRows);
+ }
+ }
+
+ private long getNextUpdateRecordCounter(long cntr) {
+ // Determines the next row count at which progress is logged:
+ // at every power of ten up to one million rows,
+ // and then once per additional million rows thereafter.
+ if (cntr >= 1000000) {
+ return cntr + 1000000;
+ }
+
+ return 10 * cntr;
+ }
+
+}
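
To make the contract of this abstract class concrete, here is a hypothetical minimal subclass that only counts records. It is illustration only (the real subclasses are MapRecordProcessor above and ReduceRecordProcessor below), assumes it lives in the same package so it can override the package-private methods, and assumes all of its inputs are unsorted key/value inputs.

package org.apache.hadoop.hive.ql.exec.tez;

import java.io.IOException;

import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValueReader;

// Hypothetical minimal RecordProcessor: drains every input and relies on the
// base class for progress and memory logging. Not part of the patch.
public class CountingRecordProcessorSketch extends RecordProcessor {

  @Override
  void run() throws IOException {
    for (LogicalInput input : inputs.values()) {
      try {
        // Assumes key/value style inputs; sorted inputs expose KeyValuesReader instead.
        KeyValueReader reader = (KeyValueReader) input.getReader();
        while (reader.next()) {
          logProgress(); // increments the row counter and logs periodically
        }
      } catch (IOException e) {
        throw e;
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
  }

  @Override
  void close() {
    logCloseInfo(); // final row count + heap usage (requires init() to have run)
  }
}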
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (working copy)
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+
+/**
+ * Process input from tez LogicalInput and write output - for a reduce plan.
+ * Just pump the records through the query plan.
+ */
+public class ReduceRecordProcessor extends RecordProcessor{
+
+ private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
+
+ public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class);
+ private final ExecMapperContext execContext = new ExecMapperContext();
+ private boolean abort = false;
+ private Deserializer inputKeyDeserializer;
+
+ // Input value serde needs to be an array to support different SerDe
+ // for different tags
+ private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
+
+ TableDesc keyTableDesc;
+ TableDesc[] valueTableDesc;
+
+ ObjectInspector[] rowObjectInspector;
+ private Operator<?> reducer;
+ private boolean isTagged = false;
+
+ private Object keyObject = null;
+ private BytesWritable groupKey;
+
+ private ReduceWork redWork;
+
+ List