diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e511260..cca7f3e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -370,7 +370,6 @@
METASTORECONNECTURLKEY("javax.jdo.option.ConnectionURL",
"jdbc:derby:;databaseName=metastore_db;create=true",
"JDBC connect string for a JDBC metastore"),
-
HMSHANDLERATTEMPTS("hive.hmshandler.retry.attempts", 1,
"The number of times to retry a HMSHandler call if there were a connection error."),
HMSHANDLERINTERVAL("hive.hmshandler.retry.interval", "1000ms",
@@ -1778,7 +1777,15 @@
"When auto reducer parallelism is enabled this factor will be used to over-partition data in shuffle edges."),
TEZ_MIN_PARTITION_FACTOR("hive.tez.min.partition.factor", 0.25f,
"When auto reducer parallelism is enabled this factor will be used to put a lower limit to the number\n" +
- "of reducers that tez specifies.")
+ "of reducers that tez specifies."),
+ TEZ_DYNAMIC_PARTITION_PRUNING(
+ "hive.tez.dynamic.partition.pruning", true,
+ "When dynamic pruning is enabled, joins on partition keys will be processed by sending events from the processing " +
+ "vertices to the tez application master. These events will be used to prune unnecessary partitions."),
+ TEZ_DYNAMIC_PARTITION_PRUNING_MAX_EVENT_SIZE("hive.tez.dynamic.partition.pruning.max.event.size", 1*1024*1024L,
+ "Maximum size of events sent by processors in dynamic pruning. If this size is crossed no pruning will take place."),
+ TEZ_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE("hive.tez.dynamic.parition.pruning.max.data.size", 100*1024*1024L,
+ "Maximum total data size of events in dynamic pruning.")
;
public final String varname;
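
The three ConfVars above are read through HiveConf's typed accessors, just as the operator code later in this patch does with getLongVar. A minimal, illustrative sketch (not part of the patch) of toggling and reading them programmatically:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class PruningConfSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Disable dynamic partition pruning for this configuration instance.
        conf.setBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING, false);
        // Read back the per-event size cap (1 MB by default, per the definition above).
        long maxEventBytes = HiveConf.getLongVar(conf, ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_MAX_EVENT_SIZE);
        System.out.println("pruning=" + conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)
            + ", maxEventBytes=" + maxEventBytes);
      }
    }

Equivalently, the hive.tez.dynamic.partition.pruning keys can be set in hive-site.xml or with SET in a session.
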
diff --git a/data/files/agg_01-p1.txt b/data/files/agg_01-p1.txt
new file mode 100644
index 0000000..1e7782b
--- /dev/null
+++ b/data/files/agg_01-p1.txt
@@ -0,0 +1,3 @@
+1.0
+2.0
+3.0
diff --git a/data/files/agg_01-p2.txt b/data/files/agg_01-p2.txt
new file mode 100644
index 0000000..dcea843
--- /dev/null
+++ b/data/files/agg_01-p2.txt
@@ -0,0 +1,3 @@
+4.0
+5.0
+6.0
diff --git a/data/files/agg_01-p3.txt b/data/files/agg_01-p3.txt
new file mode 100644
index 0000000..2320c1f
--- /dev/null
+++ b/data/files/agg_01-p3.txt
@@ -0,0 +1,3 @@
+7.0
+8.0
+9.0
diff --git a/data/files/dim_shops.txt b/data/files/dim_shops.txt
new file mode 100644
index 0000000..66cc33f
--- /dev/null
+++ b/data/files/dim_shops.txt
@@ -0,0 +1,3 @@
+1,foo
+2,bar
+3,baz
diff --git a/itests/qtest/testconfiguration.properties b/itests/qtest/testconfiguration.properties
new file mode 100644
index 0000000..6fd1096
--- /dev/null
+++ b/itests/qtest/testconfiguration.properties
@@ -0,0 +1,5 @@
+minimr.query.files=stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q,udf_using.q,empty_dir_in_table.q,temp_table_external.q
+minimr.query.negative.files=cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q,udf_local_resource.q
+minitez.query.files=tez_fsstat.q,mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q,bucket_map_join_tez1.q,bucket_map_join_tez2.q,tez_schema_evolution.q,tez_join_hash.q,tez_bmj_schema_evolution.q,dynamic_partition_pruning.q
+minitez.query.files.shared=cross_product_check_1.q,cross_product_check_2.q,dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q,transform1.q,transform2.q,transform_ppr1.q,transform_ppr2.q,script_env_var1.q,script_env_var2.q,script_pipe.q,scriptfile1.q,metadataonly1.q,temp_table.q,vectorized_ptf.q,optimize_nullscan.q
+beeline.positive.exclude=add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index af4a3e5..78ea21d 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -43,7 +43,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -64,10 +63,10 @@
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hive.cli.CliDriver;
import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.io.CachingPrintStream;
import org.apache.hadoop.hive.common.io.DigestPrintStream;
import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream;
import org.apache.hadoop.hive.common.io.SortPrintStream;
-import org.apache.hadoop.hive.common.io.CachingPrintStream;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -75,8 +74,6 @@
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.vector.util.AllVectorTypesRecord;
-import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -87,22 +84,14 @@
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer;
-import org.apache.hadoop.hive.serde2.thrift.test.Complex;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.Shell;
import org.apache.hive.common.util.StreamPrinter;
-import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.tools.ant.BuildException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
-import org.junit.Assume;
import com.google.common.collect.ImmutableList;
@@ -145,8 +134,8 @@
private QTestSetup setup = null;
private boolean isSessionStateStarted = false;
- private String initScript;
- private String cleanupScript;
+ private final String initScript;
+ private final String cleanupScript;
static {
for (String srcTable : System.getProperty("test.src.tables", "").trim().split(",")) {
@@ -332,14 +321,6 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType,
HadoopShims shims = ShimLoader.getHadoopShims();
int numberOfDataNodes = 4;
- // can run tez tests only on hadoop 2
- if (clusterType == MiniClusterType.tez) {
- Assume.assumeTrue(ShimLoader.getMajorVersion().equals("0.23"));
- // this is necessary temporarily - there's a probem with multi datanodes on MiniTezCluster
- // will be fixed in 0.3
- numberOfDataNodes = 1;
- }
-
if (clusterType != MiniClusterType.none) {
dfs = shims.getMiniDfs(conf, numberOfDataNodes, true, null);
FileSystem fs = dfs.getFileSystem();
diff --git a/pom.xml b/pom.xml
index 8973c2b..2e128fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -150,8 +150,8 @@
1.0.1
1.7.5
4.0.4
+    <tez.version>0.5.0</tez.version>
2.2.0
-    <tez.version>0.4.1-incubating</tez.version>
1.1
0.2
1.4
diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift
index eafbe5a..6026d94 100644
--- a/ql/if/queryplan.thrift
+++ b/ql/if/queryplan.thrift
@@ -56,6 +56,7 @@ enum OperatorType {
PTF,
MUX,
DEMUX,
+ EVENT,
}
struct Operator {
diff --git a/ql/pom.xml b/ql/pom.xml
index 0729d47..c3e0adb 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -297,6 +297,38 @@
      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
      <artifactId>tez-mapreduce</artifactId>
      <version>${tez.version}</version>
      <optional>true</optional>
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
index 96dbb29..b9e04e2 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
@@ -51,7 +51,8 @@ int _kOperatorTypeValues[] = {
OperatorType::HASHTABLEDUMMY,
OperatorType::PTF,
OperatorType::MUX,
- OperatorType::DEMUX
+ OperatorType::DEMUX,
+ OperatorType::EVENT
};
const char* _kOperatorTypeNames[] = {
"JOIN",
@@ -74,9 +75,10 @@ const char* _kOperatorTypeNames[] = {
"HASHTABLEDUMMY",
"PTF",
"MUX",
- "DEMUX"
+ "DEMUX",
+ "EVENT"
};
-const std::map<int, const char*> _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(21, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(22, _kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
int _kTaskTypeValues[] = {
TaskType::MAP,
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
index 634dd55..30ef711 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
@@ -56,7 +56,8 @@ struct OperatorType {
HASHTABLEDUMMY = 17,
PTF = 18,
MUX = 19,
- DEMUX = 20
+ DEMUX = 20,
+ EVENT = 21
};
};
diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
index aa094ee..6f23575 100644
--- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
+++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
@@ -32,7 +32,8 @@
HASHTABLEDUMMY(17),
PTF(18),
MUX(19),
- DEMUX(20);
+ DEMUX(20),
+ EVENT(21);
private final int value;
@@ -95,6 +96,8 @@ public static OperatorType findByValue(int value) {
return MUX;
case 20:
return DEMUX;
+ case 21:
+ return EVENT;
default:
return null;
}
diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php
index 5164b2c..4ed7fcc 100644
--- a/ql/src/gen/thrift/gen-php/Types.php
+++ b/ql/src/gen/thrift/gen-php/Types.php
@@ -56,6 +56,7 @@ final class OperatorType {
const PTF = 18;
const MUX = 19;
const DEMUX = 20;
+ const EVENT = 21;
static public $__names = array(
0 => 'JOIN',
1 => 'MAPJOIN',
@@ -78,6 +79,7 @@ final class OperatorType {
18 => 'PTF',
19 => 'MUX',
20 => 'DEMUX',
+ 21 => 'EVENT',
);
}
diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
index 2a3f745..96777fa 100644
--- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
+++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
@@ -66,6 +66,7 @@ class OperatorType:
PTF = 18
MUX = 19
DEMUX = 20
+ EVENT = 21
_VALUES_TO_NAMES = {
0: "JOIN",
@@ -89,6 +90,7 @@ class OperatorType:
18: "PTF",
19: "MUX",
20: "DEMUX",
+ 21: "EVENT",
}
_NAMES_TO_VALUES = {
@@ -113,6 +115,7 @@ class OperatorType:
"PTF": 18,
"MUX": 19,
"DEMUX": 20,
+ "EVENT": 21,
}
class TaskType:
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
index 35e1f0f..449becf 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
@@ -42,8 +42,9 @@ module OperatorType
PTF = 18
MUX = 19
DEMUX = 20
- VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX"}
- VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX]).freeze
+ EVENT = 21
+ VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "EVENT"}
+ VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX, EVENT]).freeze
end
module TaskType
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
new file mode 100644
index 0000000..7315be5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+
+/**
+ * AppMasterEventOperator sends any rows it receives to the Tez AM. This can be
+ * used to control execution dynamically.
+ */
+@SuppressWarnings({ "deprecation", "serial" })
+public class AppMasterEventOperator extends Operator<AppMasterEventDesc> {
+
+ private transient Serializer serializer;
+ private transient DataOutputBuffer buffer;
+ private transient boolean hasReachedMaxSize = false;
+ private transient long MAX_SIZE;
+
+ @Override
+ public void initializeOp(Configuration hconf) throws HiveException {
+ MAX_SIZE = HiveConf.getLongVar(hconf, ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_MAX_EVENT_SIZE);
+ serializer =
+ (Serializer) ReflectionUtils.newInstance(conf.getTable().getDeserializerClass(), null);
+ initDataBuffer(false);
+ }
+
+ private void initDataBuffer(boolean skipPruning) throws HiveException {
+ buffer = new DataOutputBuffer();
+ try {
+      // record the name of the vertex that produced this event; the receiver reads it first
+ buffer.writeUTF(((TezContext) TezContext.get()).getTezProcessorContext().getTaskVertexName());
+
+ // add any other header info
+ getConf().writeEventHeader(buffer);
+
+ // write byte to say whether to skip pruning or not
+ buffer.writeBoolean(skipPruning);
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ }
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ if (hasReachedMaxSize) {
+ return;
+ }
+
+ ObjectInspector rowInspector = inputObjInspectors[0];
+ try {
+ Writable writableRow = serializer.serialize(row, rowInspector);
+ writableRow.write(buffer);
+ if (buffer.getLength() > MAX_SIZE) {
+ LOG.info("Disabling AM events. Buffer size too large: " + buffer.getLength());
+ hasReachedMaxSize = true;
+ buffer = null;
+ }
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AppMasterEvent: " + row);
+ }
+ forward(row, rowInspector);
+ }
+
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ if (!abort) {
+ TezContext context = (TezContext) TezContext.get();
+
+ String vertexName = getConf().getVertexName();
+ String inputName = getConf().getInputName();
+
+ byte[] payload = null;
+
+ if (hasReachedMaxSize) {
+ initDataBuffer(true);
+ }
+
+ payload = new byte[buffer.getLength()];
+ System.arraycopy(buffer.getData(), 0, payload, 0, buffer.getLength());
+
+ Event event =
+ InputInitializerEvent.create(vertexName, inputName,
+ ByteBuffer.wrap(payload, 0, payload.length));
+
+ LOG.info("Sending Tez event to vertex = " + vertexName + ", input = " + inputName
+ + ". Payload size = " + payload.length);
+
+ context.getTezProcessorContext().sendEvents(Collections.singletonList(event));
+ }
+ }
+
+ @Override
+ public OperatorType getType() {
+ return OperatorType.EVENT;
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "EVENT";
+ }
+}
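
The payload layout is fixed by initDataBuffer() and processOp(): a UTF-encoded source vertex name, the descriptor-specific header from writeEventHeader(), a boolean skip-pruning flag, and then the serialized rows. A hypothetical reader-side sketch (the real consumer lives in the AM-side pruning code, which also knows how to decode the header and the rows) that only unpacks the leading vertex name:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    public class EventPayloadSketch {
      // Mirrors the write order used by AppMasterEventOperator above.
      public static String readSourceVertex(byte[] payload) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
        String sourceVertex = in.readUTF();   // written first by initDataBuffer()
        // ... the desc-specific header and the skipPruning flag follow here ...
        return sourceVertex;
      }
    }

Writing the vertex name first lets the receiver attribute an event to its producing vertex before interpreting anything descriptor-specific.
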
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index af835fa..99b937c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -186,7 +186,7 @@ private void loadHashTable() throws HiveException {
* process different buckets and if the container is reused to join a different bucket,
* join results can be incorrect. The cache is keyed on operator id and for bucket map join
* the operator does not change but data needed is different. For a proper fix, this
- * requires changes in the Tez API with regard to finding bucket id and
+ * requires changes in the Tez API with regard to finding bucket id and
* also ability to schedule tasks to re-use containers that have cached the specific bucket.
*/
LOG.info("This is not bucket map join, so cache");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index 2bcb481..8946221 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -29,13 +29,15 @@
import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
import org.apache.hadoop.hive.ql.plan.DemuxDesc;
import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExtractDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
@@ -64,6 +66,7 @@
* OperatorFactory.
*
*/
+@SuppressWarnings({ "rawtypes", "unchecked" })
public final class OperatorFactory {
private static final List opvec;
private static final List vectorOpvec;
@@ -101,6 +104,10 @@
DemuxOperator.class));
opvec.add(new OpTuple(MuxDesc.class,
MuxOperator.class));
+ opvec.add(new OpTuple(AppMasterEventDesc.class,
+ AppMasterEventOperator.class));
+ opvec.add(new OpTuple(DynamicPruningEventDesc.class,
+ AppMasterEventOperator.class));
}
static {
@@ -119,9 +126,9 @@
private static final class OpTuple {
private final Class descClass;
-    private final Class<? extends Operator> opClass;
+    private final Class<? extends Operator<?>> opClass;
-    public OpTuple(Class descClass, Class<? extends Operator> opClass) {
+    public OpTuple(Class descClass, Class<? extends Operator<?>> opClass) {
this.descClass = descClass;
this.opClass = opClass;
}
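
The two opvec entries register both descriptor types against the same operator class, so a plan node carrying either an AppMasterEventDesc or a DynamicPruningEventDesc is materialized as an AppMasterEventOperator. A self-contained sketch of that descriptor-to-operator lookup idea, using a plain map rather than Hive's internal OpTuple list (the class names mirror the patch; the registry itself is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
    import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
    import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;

    public class DescRegistrySketch {
      // descriptor class -> operator class, mirroring the opvec additions above
      private static final Map<Class<?>, Class<?>> REGISTRY = new HashMap<Class<?>, Class<?>>();
      static {
        REGISTRY.put(AppMasterEventDesc.class, AppMasterEventOperator.class);
        REGISTRY.put(DynamicPruningEventDesc.class, AppMasterEventOperator.class);
      }

      static Object newOperatorFor(Object desc) throws Exception {
        Class<?> opClass = REGISTRY.get(desc.getClass());
        if (opClass == null) {
          throw new IllegalArgumentException("No operator registered for " + desc.getClass());
        }
        return opClass.newInstance();   // relies on the default no-arg constructor
      }
    }

Registering two descriptors against one operator keeps the event-sending behavior in a single place while letting the pruning-specific descriptor carry its own metadata.
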
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 0c90114..ca6164a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -819,10 +819,12 @@ public Path read(Kryo kryo, Input input, Class type) {
}
}
-  public static Set<Operator<?>> cloneOperatorTree(Configuration conf, Set<Operator<?>> roots) {
+  public static List<Operator<?>> cloneOperatorTree(Configuration conf, List<Operator<?>> roots) {
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
serializePlan(roots, baos, conf, true);
-    Set<Operator<?>> result = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+    @SuppressWarnings("unchecked")
+    List<Operator<?>> result =
+ deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
roots.getClass(), conf, true);
return result;
}
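
cloneOperatorTree deep-copies the operator graph by serializing the root list with Kryo and immediately deserializing it, and the switch to List presumably keeps a stable root ordering across that round trip. A minimal standalone sketch of the round-trip-as-clone idea (plain Kryo, not Hive's serializePlan/deserializePlan, which also install custom serializers):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class CloneByRoundTripSketch {
      @SuppressWarnings("unchecked")
      static <T> T roundTrip(T object) {
        Kryo kryo = new Kryo();
        kryo.setRegistrationRequired(false);     // keep the sketch independent of class registration
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(4096);
        Output out = new Output(bytes);
        kryo.writeClassAndObject(out, object);   // serialize the whole object graph
        out.close();
        Input in = new Input(new ByteArrayInputStream(bytes.toByteArray()));
        return (T) kryo.readClassAndObject(in);  // read back an independent copy
      }

      public static void main(String[] args) {
        List<String> roots = new ArrayList<String>(Arrays.asList("TS[0]", "TS[1]"));
        List<String> clones = roundTrip(roots);
        clones.add("extra");
        System.out.println(roots + " vs " + clones);  // the original list is untouched
      }
    }
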
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
index e83bc17..7775100 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
@@ -130,7 +130,7 @@ public int execute(DriverContext driverContext) {
runningViaChild = conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
- if(!runningViaChild) {
+ if (!runningViaChild) {
// we are not running this mapred task via child jvm
// so directly invoke ExecDriver
return super.execute(driverContext);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
index 1f68f7b..e84e65e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
@@ -14,10 +14,10 @@
import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.WriteBuffers;
import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -57,7 +57,7 @@
private boolean[] sortableSortOrders;
private KeyValueHelper writeHelper;
- private List