diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 3968b0e899..e8e1fa797c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -426,7 +426,7 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB continue; } if(t != null) { - compBuilder.setIsAcid(AcidUtils.isAcidTable(t)); + compBuilder.setIsAcid(AcidUtils.isAcidTable(t) || AcidUtils.isInsertOnlyTable(t)); } LockComponent comp = compBuilder.build(); LOG.debug("Adding lock component to lock request " + comp.toString()); @@ -524,7 +524,9 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi output.getWriteType().toString()); } if(t != null) { - compBuilder.setIsAcid(AcidUtils.isAcidTable(t)); + + + compBuilder.setIsAcid(AcidUtils.isAcidTable(t) || AcidUtils.isInsertOnlyTable(t)); } compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite()); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java index d4f1dd5a86..e383cb60a7 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java @@ -591,6 +591,68 @@ public void testInsertOverwriteWithUnionAll() throws Exception { Assert.assertEquals(stringifyValues(rExpected), rs); } + @Test + public void writeBetweenWorkerAndCleanerOnMmTable() throws Exception { + String tblName = "hive18693"; + + String statement = "create table " + tblName + " (a int, b string) " + + " tblproperties ('transactional'='true', 'transactional_properties'='insert_only')"; + + writeBetweenWorkerAndCleaner(statement, tblName); + } + + protected void writeBetweenWorkerAndCleaner(String createTableStatement, String tblName) throws Exception { + + runStatementOnDriver("drop table if exists " + tblName); + // Create table + runStatementOnDriver(createTableStatement); + + // Make some data + runStatementOnDriver("insert into " + tblName + " values(1, 'foo')"); + runStatementOnDriver("insert into " + tblName + " values(2, 'bar')"); + + //run Worker to execute compaction + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setConf(hiveConf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Start a transaction with insert and roll back this transaction. + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); + runStatementOnDriver("insert into " + tblName + " values (3, 'baz')"); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); + + List expected = new ArrayList<>(); + expected.add("1\tfoo"); + expected.add("2\tbar"); + + Assert.assertEquals("", expected, + runStatementOnDriver("select a,b from " + tblName + " order by a")); + + // Run Cleaner + Cleaner c = new Cleaner(); + c.setThreadId((int)c.getId()); + c.setConf(hiveConf); + c.init(stop, new AtomicBoolean()); + c.run(); + + // Run initiator to execute CompactionTxnHandler.cleanEmptyAbortedTxns() + Initiator i = new Initiator(); + i.setThreadId((int)i.getId()); + i.setConf(hiveConf); + i.init(stop, new AtomicBoolean()); + i.run(); + + // Check that aborted operation is not counted for MM table. + Assert.assertEquals("", expected, + runStatementOnDriver("select a,b from " + tblName + " order by a")); + } + private void verifyDirAndResult(int expectedDeltas) throws Exception { FileSystem fs = FileSystem.get(hiveConf); // Verify the content of subdirs