diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 06e4ebee82..7e11b15f98 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -564,6 +564,7 @@ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd, (tbd.getLbCtx() == null) ? 0 : tbd.getLbCtx().calculateListBucketingLevel(), work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID && !tbd.isMmTable(), + tbd.isMmTable(), work.getLoadTableWork().getWriteId(), tbd.getStmtId(), resetStatisticsProps(table), diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index f4bd0f9399..1b4d913a32 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2837,9 +2837,9 @@ private void constructOneLBLocationMap(FileStatus fSta, */ public Map, Partition> loadDynamicPartitions(final Path loadPath, final String tableName, final Map partSpec, final LoadFileType loadFileType, - final int numDP, final int numLB, final boolean isAcid, final long writeId, final int stmtId, - final boolean resetStatistics, final AcidUtils.Operation operation, - boolean isInsertOverwrite) throws HiveException { + final int numDP, final int numLB, final boolean isAcid, final boolean isMM, + final long writeId, final int stmtId, final boolean resetStatistics, + final AcidUtils.Operation operation, boolean isInsertOverwrite) throws HiveException { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS); @@ -2946,6 +2946,7 @@ private void constructOneLBLocationMap(FileStatus fSta, + " loadFileType=" + loadFileType.toString() + ", " + " listBucketingLevel=" + numLB + ", " + " isAcid=" + isAcid + ", " + + " isMM(table)=" + isMM + ", " + " resetStatistics=" + resetStatistics, e); throw e; } @@ -2997,9 +2998,9 @@ private void constructOneLBLocationMap(FileStatus fSta, .collect(Collectors.toList()), tableSnapshot); } catch (InterruptedException | ExecutionException e) { - throw new HiveException("Exception when loading " + validPartitions.size() + throw new HiveException("Exception when loading " + validPartitions.size() + " partitions" + " in table " + tbl.getTableName() - + " with loadPath=" + loadPath); + + " with loadPath=" + loadPath, e); } catch (TException e) { LOG.error(StringUtils.stringifyException(e)); throw new HiveException(e); @@ -3024,7 +3025,7 @@ private void constructOneLBLocationMap(FileStatus fSta, } try { - if (isAcid) { + if (isAcid || isMM) { List partNames = result.values().stream().map(Partition::getName).collect(Collectors.toList()); getMSC().addDynamicPartitions(parentSession.getTxnMgr().getCurrentTxnId(), writeId, diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 60bfba826d..aa45dd9a1f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7408,8 +7408,13 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) // NOTE: specify Dynamic partitions in dest_tab for WriteEntity if (!isNonNativeTable) { AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; - if (destTableIsFullAcid) { - acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest); + if (destTableIsFullAcid || isMmTable) { + if (isMmTable) { + acidOp = getAcidType(dest); + } else { + // contains stricter checks: the output format and TxnManager must be ACID-related + acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest); + } //todo: should this be done for MM? is it ok to use CombineHiveInputFormat with MM checkAcidConstraints(); } diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index c033a94bfa..bb0fc44dc7 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -1558,6 +1558,31 @@ public void testMultiInsert() throws Exception { } //todo: Concurrent insert/update of same partition - should pass + @Test public void testMultiInsertOnDynamicallyPartitionedMmTable() throws Exception { + dropTable(new String[] {"tabMmDp", "tab_not_acid"}); + + driver.run("create table if not exists tabMmDp (a int, b int) partitioned by (p string) " + + "stored as orc " + + "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')"); + driver.run("create table if not exists tab_not_acid (a int, b int, p string)"); + driver.run("insert into tab_not_acid values (1 ,1, 'one'), (2, 2, 'two')"); + // insert 2 rows twice into the MM table + driver.run("from tab_not_acid " + + "insert into tabMmDp select a,b,p " + + "insert into tabMmDp select a,b,p"); //txnid: 6 (2 drops, 2 creates, 2 inserts) + + final String completedTxnComponentsContents = TxnDbUtil.queryToString(conf,"select * from COMPLETED_TXN_COMPONENTS"); + Assert.assertEquals(completedTxnComponentsContents, + 2, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS")); + Assert.assertEquals(completedTxnComponentsContents, + 2, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=6")); + Assert.assertEquals(completedTxnComponentsContents, + 2, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=6 and ctc_table='tabmmdp'")); + // ctc_update_delete value should be "N" for both partitions since these are inserts + Assert.assertEquals(completedTxnComponentsContents, + 2, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=6 and ctc_table='tabmmdp' and ctc_update_delete='N'")); + } + private List getLocksWithFilterOptions(HiveTxnManager txnMgr, String dbName, String tblName, Map partSpec) throws Exception { if (dbName == null && tblName != null) {