diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index b48889ef9d..6ce5480840 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -46,6 +46,7 @@ minitez.query.files=acid_vectorization_original_tez.q,\ minillap.shared.query.files=acid_direct_insert_insert_overwrite.q,\ acid_multiinsert_dyn_part.q,\ + acid_multiinsert_static_parts.q,\ insert_into1.q,\ insert_into2.q,\ llapdecider.q,\ diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java index d8f8e72efa..e2d5497e37 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java @@ -258,7 +258,7 @@ public void closeOp(boolean abort) throws HiveException { // There's always just one file that we have merged. // The union/DP/etc. should already be account for in the path. Utilities.writeCommitManifest(Lists.newArrayList(outPath), tmpPath.getParent(), fs, taskId, conf.getWriteId(), - conf.getStmtId(), null, false, hasDynamicPartitions, new HashSet<>()); + conf.getStmtId(), null, false, hasDynamicPartitions, new HashSet<>(), null); LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes)."); } } @@ -337,8 +337,8 @@ public void jobCloseOp(Configuration hconf, boolean success) lbLevels = conf.getListBucketingDepth(); // We don't expect missing buckets from mere (actually there should be no buckets), // so just pass null as bucketing context. Union suffix should also be accounted for. - Utilities.handleDirectInsertTableFinalPath(outputDir.getParent(), null, hconf, success, - dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false, false, false); + Utilities.handleDirectInsertTableFinalPath(outputDir.getParent(), null, hconf, success, dpLevels, lbLevels, + null, mmWriteId, stmtId, reporter, isMmTable, false, false, false, null); } } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 082f1cbc09..8ee14bcb20 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -1410,8 +1410,9 @@ public void closeOp(boolean abort) throws HiveException { } } if (conf.isMmTable() || conf.isDirectInsert()) { - Utilities.writeCommitManifest(commitPaths, specPath, fs, originalTaskId, conf.getTableWriteId(), conf - .getStatementId(), unionPath, conf.getInsertOverwrite(), bDynParts, dynamicPartitionSpecs); + Utilities.writeCommitManifest(commitPaths, specPath, fs, originalTaskId, conf.getTableWriteId(), + conf.getStatementId(), unionPath, conf.getInsertOverwrite(), bDynParts, dynamicPartitionSpecs, + conf.getStaticSpec()); } // Only publish stats if this operator's flag was set to gather stats if (conf.isGatherStats()) { @@ -1475,7 +1476,7 @@ public void jobCloseOp(Configuration hconf, boolean success) conf.getTableInfo(), numBuckets, conf.getCompressed()); Utilities.handleDirectInsertTableFinalPath(specPath, unionSuffix, hconf, success, dpLevels, lbLevels, mbc, conf.getTableWriteId(), conf.getStatementId(), reporter, conf.isMmTable(), conf.isMmCtas(), conf - .getInsertOverwrite(), conf.isDirectInsert()); + .getInsertOverwrite(), conf.isDirectInsert(), conf.getStaticSpec()); } } } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 0e4ce78827..bae3600b5b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -4344,7 +4344,7 @@ private static void tryDeleteAllDirectInsertFiles(FileSystem fs, Path specPath, public static void writeCommitManifest(List commitPaths, Path specPath, FileSystem fs, String taskId, Long writeId, int stmtId, String unionSuffix, boolean isInsertOverwrite, - boolean hasDynamicPartitions, Set dynamicPartitionSpecs) throws HiveException { + boolean hasDynamicPartitions, Set dynamicPartitionSpecs, String staticSpec) throws HiveException { // When doing a multi-statement insert overwrite with dynamic partitioning, // the partition information will be written to the manifest file. @@ -4359,8 +4359,9 @@ public static void writeCommitManifest(List commitPaths, Path specPath, Fi if (commitPaths.isEmpty() && !writeDynamicPartitionsToManifest) { return; } + // We assume one FSOP per task (per specPath), so we create it in specPath. - Path manifestPath = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite); + Path manifestPath = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite, staticSpec); manifestPath = new Path(manifestPath, taskId + MANIFEST_EXTENSION); Utilities.FILE_OP_LOGGER.info("Writing manifest to {} with {}", manifestPath, commitPaths); try { @@ -4387,9 +4388,15 @@ public static void writeCommitManifest(List commitPaths, Path specPath, Fi } } - private static Path getManifestDir( - Path specPath, long writeId, int stmtId, String unionSuffix, boolean isInsertOverwrite) { - Path manifestPath = new Path(specPath, "_tmp." + + private static Path getManifestDir(Path specPath, long writeId, int stmtId, String unionSuffix, + boolean isInsertOverwrite, String staticSpec) { + Path manifestRoot = specPath; + if (staticSpec != null) { + String tableRoot = specPath.toString(); + tableRoot = tableRoot.substring(0, tableRoot.length() - staticSpec.length()); + manifestRoot = new Path(tableRoot); + } + Path manifestPath = new Path(manifestRoot, "_tmp." + AcidUtils.baseOrDeltaSubdir(isInsertOverwrite, writeId, writeId, stmtId)); if (isInsertOverwrite) { // When doing a multi-statement insert overwrite query with dynamic partitioning, the @@ -4414,10 +4421,10 @@ public MissingBucketsContext(TableDesc tableInfo, int numBuckets, boolean isComp public static void handleDirectInsertTableFinalPath(Path specPath, String unionSuffix, Configuration hconf, boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long writeId, int stmtId, - Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite, boolean isDirectInsert) + Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite, boolean isDirectInsert, String staticSpec) throws IOException, HiveException { FileSystem fs = specPath.getFileSystem(hconf); - Path manifestDir = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite); + Path manifestDir = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite, staticSpec); if (!success) { AcidUtils.IdPathFilter filter = new AcidUtils.IdPathFilter(writeId, stmtId); tryDeleteAllDirectInsertFiles(fs, specPath, manifestDir, dpLevels, lbLevels, diff --git ql/src/test/queries/clientpositive/acid_multiinsert_static_parts.q ql/src/test/queries/clientpositive/acid_multiinsert_static_parts.q new file mode 100644 index 0000000000..7382e9e900 --- /dev/null +++ ql/src/test/queries/clientpositive/acid_multiinsert_static_parts.q @@ -0,0 +1,35 @@ +set hive.acid.direct.insert.enabled=true; +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +set hive.stats.autogather=true; + +drop table if exists multi_test_text; +drop table if exists multi_test_acid; + +create external table multi_test_text (a int, b int, c int) stored as textfile; + +insert into multi_test_text values (1111, 11, 1111), (2222, 22, 1111), (3333, 33, 2222), (4444, 44, NULL), (5555, 55, NULL); + +create table multi_test_acid (a int, b int) partitioned by (c int) stored as orc tblproperties('transactional'='true'); + +from multi_test_text a +insert overwrite table multi_test_acid partition (c=66) +select + a.a, + a.b + where a.c is not null +insert overwrite table multi_test_acid partition (c=77) +select + a.a, + a.b +where a.c=1 +insert overwrite table multi_test_acid partition (c=88) +select + a.a, + a.b +where a.c is null; + +select * from multi_test_acid order by a; + +drop table if exists multi_test_text; +drop table if exists multi_test_acid; \ No newline at end of file diff --git ql/src/test/results/clientpositive/llap/acid_multiinsert_static_parts.q.out ql/src/test/results/clientpositive/llap/acid_multiinsert_static_parts.q.out new file mode 100644 index 0000000000..5e5ea73d63 --- /dev/null +++ ql/src/test/results/clientpositive/llap/acid_multiinsert_static_parts.q.out @@ -0,0 +1,118 @@ +PREHOOK: query: drop table if exists multi_test_text +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists multi_test_text +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table if exists multi_test_acid +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists multi_test_acid +POSTHOOK: type: DROPTABLE +PREHOOK: query: create external table multi_test_text (a int, b int, c int) stored as textfile +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@multi_test_text +POSTHOOK: query: create external table multi_test_text (a int, b int, c int) stored as textfile +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@multi_test_text +PREHOOK: query: insert into multi_test_text values (1111, 11, 1111), (2222, 22, 1111), (3333, 33, 2222), (4444, 44, NULL), (5555, 55, NULL) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@multi_test_text +POSTHOOK: query: insert into multi_test_text values (1111, 11, 1111), (2222, 22, 1111), (3333, 33, 2222), (4444, 44, NULL), (5555, 55, NULL) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@multi_test_text +POSTHOOK: Lineage: multi_test_text.a SCRIPT [] +POSTHOOK: Lineage: multi_test_text.b SCRIPT [] +POSTHOOK: Lineage: multi_test_text.c SCRIPT [] +PREHOOK: query: create table multi_test_acid (a int, b int) partitioned by (c int) stored as orc tblproperties('transactional'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@multi_test_acid +POSTHOOK: query: create table multi_test_acid (a int, b int) partitioned by (c int) stored as orc tblproperties('transactional'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@multi_test_acid +PREHOOK: query: from multi_test_text a +insert overwrite table multi_test_acid partition (c=66) +select + a.a, + a.b + where a.c is not null +insert overwrite table multi_test_acid partition (c=77) +select + a.a, + a.b +where a.c=1 +insert overwrite table multi_test_acid partition (c=88) +select + a.a, + a.b +where a.c is null +PREHOOK: type: QUERY +PREHOOK: Input: default@multi_test_text +PREHOOK: Output: default@multi_test_acid@c=66 +PREHOOK: Output: default@multi_test_acid@c=77 +PREHOOK: Output: default@multi_test_acid@c=88 +POSTHOOK: query: from multi_test_text a +insert overwrite table multi_test_acid partition (c=66) +select + a.a, + a.b + where a.c is not null +insert overwrite table multi_test_acid partition (c=77) +select + a.a, + a.b +where a.c=1 +insert overwrite table multi_test_acid partition (c=88) +select + a.a, + a.b +where a.c is null +POSTHOOK: type: QUERY +POSTHOOK: Input: default@multi_test_text +POSTHOOK: Output: default@multi_test_acid@c=66 +POSTHOOK: Output: default@multi_test_acid@c=77 +POSTHOOK: Output: default@multi_test_acid@c=88 +POSTHOOK: Lineage: multi_test_acid PARTITION(c=66).a SIMPLE [(multi_test_text)a.FieldSchema(name:a, type:int, comment:null), ] +POSTHOOK: Lineage: multi_test_acid PARTITION(c=66).b SIMPLE [(multi_test_text)a.FieldSchema(name:b, type:int, comment:null), ] +POSTHOOK: Lineage: multi_test_acid PARTITION(c=77).a SIMPLE [(multi_test_text)a.FieldSchema(name:a, type:int, comment:null), ] +POSTHOOK: Lineage: multi_test_acid PARTITION(c=77).b SIMPLE [(multi_test_text)a.FieldSchema(name:b, type:int, comment:null), ] +POSTHOOK: Lineage: multi_test_acid PARTITION(c=88).a SIMPLE [(multi_test_text)a.FieldSchema(name:a, type:int, comment:null), ] +POSTHOOK: Lineage: multi_test_acid PARTITION(c=88).b SIMPLE [(multi_test_text)a.FieldSchema(name:b, type:int, comment:null), ] +PREHOOK: query: select * from multi_test_acid order by a +PREHOOK: type: QUERY +PREHOOK: Input: default@multi_test_acid +PREHOOK: Input: default@multi_test_acid@c=66 +PREHOOK: Input: default@multi_test_acid@c=77 +PREHOOK: Input: default@multi_test_acid@c=88 +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select * from multi_test_acid order by a +POSTHOOK: type: QUERY +POSTHOOK: Input: default@multi_test_acid +POSTHOOK: Input: default@multi_test_acid@c=66 +POSTHOOK: Input: default@multi_test_acid@c=77 +POSTHOOK: Input: default@multi_test_acid@c=88 +POSTHOOK: Output: hdfs://### HDFS PATH ### +1111 11 66 +2222 22 66 +3333 33 66 +4444 44 88 +5555 55 88 +PREHOOK: query: drop table if exists multi_test_text +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@multi_test_text +PREHOOK: Output: default@multi_test_text +POSTHOOK: query: drop table if exists multi_test_text +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@multi_test_text +POSTHOOK: Output: default@multi_test_text +PREHOOK: query: drop table if exists multi_test_acid +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@multi_test_acid +PREHOOK: Output: default@multi_test_acid +POSTHOOK: query: drop table if exists multi_test_acid +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@multi_test_acid +POSTHOOK: Output: default@multi_test_acid