commit 66fa97e146f0b0de52521ffcf57d1377dea1d0bd Author: Chris Drome Date: Tue Aug 8 22:57:33 2017 +0000 HIVE-17275: Auto-merge fails on writes of UNION ALL output to ORC file with dynamic partitioning diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index f3f1563..e63f595 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -418,6 +418,7 @@ minitez.query.files=acid_vectorization_missing_cols.q,\ tez_union.q,\ tez_union2.q,\ tez_union_dynamic_partition.q,\ + tez_union_dynamic_partition_2.q,\ tez_union_view.q,\ tez_union_with_udf.q,\ tez_union_decimal.q,\ diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java index dfad6c1..32c37c9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java @@ -43,6 +43,7 @@ extends Operator implements Serializable { public static final String BACKUP_PREFIX = "_backup."; + public static final String UNION_SUDBIR_PREFIX = "HIVE_UNION_SUBDIR_"; public static final Logger LOG = LoggerFactory.getLogger(AbstractFileMergeOperator.class); protected JobConf jc; @@ -193,12 +194,19 @@ protected void fixTmpPath(Path path) throws IOException { } } else { if (hasDynamicPartitions || (listBucketingDepth > 0)) { + // In light of results from union queries, we need to be aware that + // sub-directories can exist in the partition directory. We want to + // ignore these sub-directories and promote merged files to the + // partition directory. + String name = path.getName(); + Path realPartitionPath = name.startsWith(UNION_SUDBIR_PREFIX) ? path.getParent() : path; + if (tmpPathFixed) { - checkPartitionsMatch(path); + checkPartitionsMatch(realPartitionPath); } else { // We haven't fixed the TMP path for this mapper yet - int depthDiff = path.depth() - tmpPath.depth(); - fixTmpPath(path, depthDiff); + int depthDiff = realPartitionPath.depth() - tmpPath.depth(); + fixTmpPath(realPartitionPath, depthDiff); tmpPathFixed = true; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index c88d537..514d13c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; @@ -294,7 +295,7 @@ public static void removeUnionOperators(GenTezProcContext context, BaseWork work linked = context.linkedFileSinks.get(path); linked.add(desc); - desc.setDirName(new Path(path, "" + linked.size())); + desc.setDirName(new Path(path, AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + linked.size())); desc.setLinkedFileSink(true); desc.setParentDir(path); desc.setLinkedFileSinkDesc(linked); diff --git ql/src/test/queries/clientpositive/tez_union_dynamic_partition_2.q ql/src/test/queries/clientpositive/tez_union_dynamic_partition_2.q new file mode 100644 index 0000000..e8cfb3a --- /dev/null +++ ql/src/test/queries/clientpositive/tez_union_dynamic_partition_2.q @@ -0,0 +1,27 @@ +drop table if exists dummy; +drop table if exists partunion1; + +create table dummy(i int); +insert into table dummy values (1); +select * from dummy; + +create table partunion1(id1 int) partitioned by (part1 string) stored as orc; + +set hive.exec.dynamic.partition.mode=nonstrict; +set hive.merge.tezfiles=true; + +explain insert into table partunion1 partition(part1) +select temps.* from ( +select 1 as id1, '2014' as part1 from dummy +union all +select 2 as id1, '2014' as part1 from dummy ) temps; + +insert into table partunion1 partition(part1) +select 1 as id1, '2014' as part1 from dummy +union all +select 2 as id1, '2014' as part1 from dummy; + +select * from partunion1; + +drop table dummy; +drop table partunion1; diff --git ql/src/test/results/clientpositive/tez/tez_union_dynamic_partition_2.q.out ql/src/test/results/clientpositive/tez/tez_union_dynamic_partition_2.q.out new file mode 100644 index 0000000..43e500e --- /dev/null +++ ql/src/test/results/clientpositive/tez/tez_union_dynamic_partition_2.q.out @@ -0,0 +1,141 @@ +PREHOOK: query: drop table if exists dummy +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists dummy +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table if exists partunion1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists partunion1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table dummy(i int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dummy +POSTHOOK: query: create table dummy(i int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dummy +PREHOOK: query: insert into table dummy values (1) +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__1 +PREHOOK: Output: default@dummy +POSTHOOK: query: insert into table dummy values (1) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__1 +POSTHOOK: Output: default@dummy +POSTHOOK: Lineage: dummy.i EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: select * from dummy +PREHOOK: type: QUERY +PREHOOK: Input: default@dummy +#### A masked pattern was here #### +POSTHOOK: query: select * from dummy +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dummy +#### A masked pattern was here #### +1 +PREHOOK: query: create table partunion1(id1 int) partitioned by (part1 string) stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@partunion1 +POSTHOOK: query: create table partunion1(id1 int) partitioned by (part1 string) stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@partunion1 +PREHOOK: query: explain insert into table partunion1 partition(part1) +select temps.* from ( +select 1 as id1, '2014' as part1 from dummy +union all +select 2 as id1, '2014' as part1 from dummy ) temps +PREHOOK: type: QUERY +POSTHOOK: query: explain insert into table partunion1 partition(part1) +select temps.* from ( +select 1 as id1, '2014' as part1 from dummy +union all +select 2 as id1, '2014' as part1 from dummy ) temps +POSTHOOK: type: QUERY +Plan optimized by CBO. + +Vertex dependency in root stage +Map 1 <- Union 2 (CONTAINS) +Map 3 <- Union 2 (CONTAINS) + +Stage-3 + Stats-Aggr Operator + Stage-0 + Move Operator + table:{"name:":"default.partunion1"} + Stage-2 + Dependency Collection{} + Stage-5(CONDITIONAL) + Move Operator + Stage-8(CONDITIONAL CHILD TASKS: Stage-5, Stage-4, Stage-6) + Conditional Operator + Stage-1 + Union 2 + <-Map 1 [CONTAINS] + File Output Operator [FS_6] + table:{"name:":"default.partunion1"} + Select Operator [SEL_5] (rows=2 width=92) + Output:["_col0","_col1"] + Select Operator [SEL_1] (rows=1 width=4) + Output:["_col0"] + TableScan [TS_0] (rows=1 width=1) + <-Map 3 [CONTAINS] + File Output Operator [FS_6] + table:{"name:":"default.partunion1"} + Select Operator [SEL_5] (rows=2 width=92) + Output:["_col0","_col1"] + Select Operator [SEL_3] (rows=1 width=4) + Output:["_col0"] + TableScan [TS_2] (rows=1 width=1) + Stage-4(CONDITIONAL) + File Merge + Please refer to the previous Stage-8(CONDITIONAL CHILD TASKS: Stage-5, Stage-4, Stage-6) + Stage-7 + Move Operator + Stage-6(CONDITIONAL) + File Merge + Please refer to the previous Stage-8(CONDITIONAL CHILD TASKS: Stage-5, Stage-4, Stage-6) + +PREHOOK: query: insert into table partunion1 partition(part1) +select 1 as id1, '2014' as part1 from dummy +union all +select 2 as id1, '2014' as part1 from dummy +PREHOOK: type: QUERY +PREHOOK: Input: default@dummy +PREHOOK: Output: default@partunion1 +POSTHOOK: query: insert into table partunion1 partition(part1) +select 1 as id1, '2014' as part1 from dummy +union all +select 2 as id1, '2014' as part1 from dummy +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dummy +POSTHOOK: Output: default@partunion1@part1=2014 +POSTHOOK: Lineage: partunion1 PARTITION(part1=2014).id1 EXPRESSION [] +PREHOOK: query: select * from partunion1 +PREHOOK: type: QUERY +PREHOOK: Input: default@partunion1 +PREHOOK: Input: default@partunion1@part1=2014 +#### A masked pattern was here #### +POSTHOOK: query: select * from partunion1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@partunion1 +POSTHOOK: Input: default@partunion1@part1=2014 +#### A masked pattern was here #### +1 2014 +2 2014 +PREHOOK: query: drop table dummy +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@dummy +PREHOOK: Output: default@dummy +POSTHOOK: query: drop table dummy +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@dummy +POSTHOOK: Output: default@dummy +PREHOOK: query: drop table partunion1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@partunion1 +PREHOOK: Output: default@partunion1 +POSTHOOK: query: drop table partunion1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@partunion1 +POSTHOOK: Output: default@partunion1