diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 74e3c8a..8e9d6af 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -783,6 +783,13 @@
HIVEMERGERCFILEBLOCKLEVEL("hive.merge.rcfile.block.level", true, ""),
HIVEMERGEINPUTFORMATBLOCKLEVEL("hive.merge.input.format.block.level",
"org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat", ""),
+ HIVEMERGEORCFILESTRIPELEVEL("hive.merge.orcfile.stripe.level", true,
+ "When hive.merge.mapfiles or hive merge.mapredfiles is enabled while writing a\n" +
+ " table with ORC file format, enabling this config will do stripe level fast merge\n" +
+ " for small orc files. Note that enabling this config will not honor padding tolerance\n" +
+ " config (hive.exec.orc.block.padding.tolerance)."),
+ HIVEMERGEINPUTFORMATSTRIPELEVEL("hive.merge.input.format.stripe.level",
+ "org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat", ""),
HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS(
"hive.merge.current.job.has.dynamic.partitions", false, ""),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index d6a8e70..ad33634 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1266,6 +1266,21 @@
+ <property>
+   <name>hive.merge.orcfile.stripe.level</name>
+   <value>true</value>
+   <description>
+     When hive.merge.mapfiles or hive.merge.mapredfiles is enabled while writing a
+     table with ORC file format, enabling this config will do stripe level fast merge
+     for small ORC files. Note that enabling this config will not honor padding tolerance
+     config (hive.exec.orc.block.padding.tolerance).
+   </description>
+ </property>
+ <property>
+   <name>hive.merge.input.format.stripe.level</name>
+   <value>org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat</value>
+   <description></description>
+ </property>
 <property>
   <name>hive.merge.current.job.has.dynamic.partitions</name>
   <value>false</value>
diff --git itests/qtest/testconfiguration.properties itests/qtest/testconfiguration.properties
index 6a3ee1d..bc1fb91 100644
--- itests/qtest/testconfiguration.properties
+++ itests/qtest/testconfiguration.properties
@@ -2,4 +2,4 @@ minimr.query.files=stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q
minimr.query.negative.files=cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q,udf_local_resource.q
minitez.query.files=tez_fsstat.q,mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q,bucket_map_join_tez1.q,bucket_map_join_tez2.q,tez_schema_evolution.q,tez_join_hash.q
-minitez.query.files.shared=cross_product_check_1.q,cross_product_check_2.q,dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q,transform1.q,transform2.q,transform_ppr1.q,transform_ppr2.q,script_env_var1.q,script_env_var2.q,script_pipe.q,scriptfile1.q,metadataonly1.q,temp_table.q,vectorized_ptf.q,optimize_nullscan.q,vector_cast_constant.q
+minitez.query.files.shared=orc_merge1.q,orc_merge2.q,orc_merge3.q,orc_merge4.q,alter_merge_orc.q,alter_merge_2_orc.q,alter_merge_stats_orc.q,cross_product_check_1.q,cross_product_check_2.q,dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q,transform1.q,transform2.q,transform_ppr1.q,transform_ppr2.q,script_env_var1.q,script_env_var2.q,script_pipe.q,scriptfile1.q,metadataonly1.q,temp_table.q,vectorized_ptf.q,optimize_nullscan.q
beeline.positive.exclude=add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index c80a2a3..bf56d8f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -89,7 +89,7 @@
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeTask;
import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
@@ -550,12 +550,13 @@ private int mergeFiles(Hive db, AlterTablePartMergeFilesDesc mergeFilesDesc)
throws HiveException {
// merge work only needs input and output.
MergeWork mergeWork = new MergeWork(mergeFilesDesc.getInputDir(),
- mergeFilesDesc.getOutputDir());
+ mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass());
mergeWork.setListBucketingCtx(mergeFilesDesc.getLbCtx());
mergeWork.resolveConcatenateMerge(db.getConf());
mergeWork.setMapperCannotSpanPartns(true);
+ mergeWork.setSourceTableInputFormat(mergeFilesDesc.getInputFormatClass());
DriverContext driverCxt = new DriverContext();
- BlockMergeTask taskExec = new BlockMergeTask();
+ MergeTask taskExec = new MergeTask();
taskExec.initialize(db.getConf(), null, driverCxt);
taskExec.setWork(mergeWork);
taskExec.setQueryPlan(this.getQueryPlan());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index e1dc911..fa4f46b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -36,7 +36,7 @@
import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeTask;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
@@ -294,7 +294,7 @@ public int execute(DriverContext driverContext) {
while (task.getParentTasks() != null && task.getParentTasks().size() == 1) {
task = (Task)task.getParentTasks().get(0);
// If it was a merge task or a local map reduce task, nothing can be inferred
- if (task instanceof BlockMergeTask || task instanceof MapredLocalTask) {
+ if (task instanceof MergeTask || task instanceof MapredLocalTask) {
break;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index ad6e19c..eae80c0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -28,7 +28,7 @@
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeTask;
import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
@@ -93,7 +93,7 @@ public TaskTuple(Class<T> workClass, Class<? extends Task<T>> taskClass) {
taskvec.add(new TaskTuple(StatsNoJobWork.class, StatsNoJobTask.class));
taskvec.add(new TaskTuple(ColumnStatsWork.class, ColumnStatsTask.class));
taskvec.add(new TaskTuple(MergeWork.class,
- BlockMergeTask.class));
+ MergeTask.class));
taskvec.add(new TaskTuple(DependencyCollectionWork.class,
DependencyCollectionTask.class));
taskvec.add(new TaskTuple(PartialScanWork.class,
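
Note for reviewers: this registration is what makes TaskFactory hand back a MergeTask for any MergeWork, which the DDLTask and MoveTask changes above rely on. A minimal, self-contained sketch of that registry pattern (all class names below are illustrative stand-ins, not Hive's real API):

    import java.util.ArrayList;
    import java.util.List;

    public class TaskRegistrySketch {
      static class WorkA {}
      static class WorkB {}
      public static class TaskA { public String toString() { return "TaskA"; } }
      public static class TaskB { public String toString() { return "TaskB"; } }

      static class Tuple {
        final Class<?> workClass;
        final Class<?> taskClass;
        Tuple(Class<?> workClass, Class<?> taskClass) {
          this.workClass = workClass;
          this.taskClass = taskClass;
        }
      }

      private final List<Tuple> tuples = new ArrayList<Tuple>();

      void register(Class<?> workClass, Class<?> taskClass) {
        tuples.add(new Tuple(workClass, taskClass));
      }

      // Scan the registered tuples for a matching work class and instantiate
      // its paired task class, mirroring how TaskFactory resolves a task.
      Object taskFor(Object work) throws Exception {
        for (Tuple t : tuples) {
          if (t.workClass == work.getClass()) {
            return t.taskClass.newInstance();
          }
        }
        throw new IllegalArgumentException("no task for " + work.getClass());
      }

      public static void main(String[] args) throws Exception {
        TaskRegistrySketch registry = new TaskRegistrySketch();
        registry.register(WorkA.class, TaskA.class);
        registry.register(WorkB.class, TaskB.class);
        System.out.println(registry.taskFor(new WorkB())); // prints "TaskB"
      }
    }
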
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 10ae157..81db284 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -120,6 +120,7 @@
import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileMergeMapper;
import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper;
@@ -346,7 +347,8 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
if(MAP_PLAN_NAME.equals(name)){
if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
gWork = deserializePlan(in, MapWork.class, conf);
- } else if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
+ } else if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS)) ||
+ OrcFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
gWork = deserializePlan(in, MergeWork.class, conf);
} else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
gWork = deserializePlan(in, ColumnTruncateWork.class, conf);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
new file mode 100644
index 0000000..6eb0e22
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Key for the OrcFileMergeMapper task. Contains ORC file metadata that
+ * should match before two ORC files can be merged.
+ */
+public class OrcFileKeyWrapper implements WritableComparable<OrcFileKeyWrapper> {
+
+ protected Path inputPath;
+ protected CompressionKind compression;
+ protected long compressBufferSize;
+ protected List<OrcProto.Type> types;
+ protected int rowIndexStride;
+ protected List<Integer> versionList;
+
+ public List<Integer> getVersionList() {
+ return versionList;
+ }
+
+ public void setVersionList(List<Integer> versionList) {
+ this.versionList = versionList;
+ }
+
+ public int getRowIndexStride() {
+ return rowIndexStride;
+ }
+
+ public void setRowIndexStride(int rowIndexStride) {
+ this.rowIndexStride = rowIndexStride;
+ }
+
+ public long getCompressBufferSize() {
+ return compressBufferSize;
+ }
+
+ public void setCompressBufferSize(long compressBufferSize) {
+ this.compressBufferSize = compressBufferSize;
+ }
+
+ public CompressionKind getCompression() {
+ return compression;
+ }
+
+ public void setCompression(CompressionKind compression) {
+ this.compression = compression;
+ }
+
+ public List<OrcProto.Type> getTypes() {
+ return types;
+ }
+
+ public void setTypes(List<OrcProto.Type> types) {
+ this.types = types;
+ }
+
+ public Path getInputPath() {
+ return inputPath;
+ }
+
+ public void setInputPath(Path inputPath) {
+ this.inputPath = inputPath;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ throw new RuntimeException("Not supported.");
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ throw new RuntimeException("Not supported.");
+ }
+
+ @Override
+ public int compareTo(OrcFileKeyWrapper o) {
+ return inputPath.compareTo(o.inputPath);
+ }
+
+}
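
Note for reviewers: per the class javadoc, these fields "should match before merging two orc files". A hedged sketch of the compatibility check a consumer of this key would perform (the helper class and method name are assumptions for illustration, not part of the patch):

    import org.apache.hadoop.hive.ql.io.orc.OrcFileKeyWrapper;

    public class OrcMergeCompatibilitySketch {
      // Hypothetical helper: two ORC files can only be stripe-merged when
      // their physical layout parameters agree.
      static boolean isCompatible(OrcFileKeyWrapper a, OrcFileKeyWrapper b) {
        return a.getCompression() == b.getCompression()            // enum, identity compare
            && a.getCompressBufferSize() == b.getCompressBufferSize()
            && a.getRowIndexStride() == b.getRowIndexStride()
            && a.getTypes().equals(b.getTypes())                   // identical schemas
            && a.getVersionList().equals(b.getVersionList());      // same writer version
      }
    }
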
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java
new file mode 100644
index 0000000..4876913
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.CombineHiveKey;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Map task for fast merging of ORC files.
+ */
+public class OrcFileMergeMapper extends MapReduceBase implements
+ Mapper<Object, OrcFileValueWrapper, Object, Object> {