diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index c44f2b5026..543ec0b991 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -149,7 +149,8 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag
       job.setQueueName(queueName);
     }
 
-    setColumnTypes(job, sd.getCols());
+    // have to use table columns since partition SD isn't updated if these are altered
+    setColumnTypes(job, t.getSd().getCols());
     //with feature on, multiple tasks may get into conflict creating/using TMP_LOCATION and if we were
     //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter
     //to do the final move
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 79dfb02ccd..b3b03baf48 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -28,6 +28,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -1977,6 +1979,57 @@ public void testInsertOverwrite2() throws Exception {
     Assert.assertEquals(stringifyValues(resultData), rs);
   }
 
+  /**
+   * Adds a column to a partitioned transactional table that already holds data, then verifies
+   * that a major compaction of that partition succeeds and returns the same rows as before it.
+   *
+   * @throws Exception if a query fails
+   */
+  @Test
+  public void testSchemaEvolutionCompaction() throws Exception {
+    String tblName = "schemaevolutioncompaction";
+    runStatementOnDriver("drop table if exists " + tblName);
+    runStatementOnDriver("CREATE TABLE " + tblName + "(a INT) " +
+        " PARTITIONED BY(part string)" +
+        " STORED AS ORC TBLPROPERTIES ('transactional'='true')");
+
+    // First INSERT round.
+    runStatementOnDriver("insert into " + tblName + " partition (part='aa') values (1)");
+
+    // ALTER TABLE ... ADD COLUMNS
+    runStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(b int)");
+
+    // Second INSERT round.
+    runStatementOnDriver("insert into " + tblName + " partition (part='aa') values (2, 2000)");
+
+    // Validate data
+    List<String> res = runStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a");
+    Assert.assertEquals(2, res.size());
+    Assert.assertEquals("1\tNULL\taa", res.get(0));
+    Assert.assertEquals("2\t2000\taa", res.get(1));
+
+    // Compact
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    CompactionRequest compactionRequest =
+        new CompactionRequest("default", tblName, CompactionType.MAJOR);
+    compactionRequest.setPartitionname("part=aa");
+    txnHandler.compact(compactionRequest);
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+
+    // Verify successful compaction
+    List<ShowCompactResponseElement> compacts =
+        txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, compacts.get(0).getState());
+
+    // Validate data after compaction
+    res = runStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a");
+    Assert.assertEquals(2, res.size());
+    Assert.assertEquals("1\tNULL\taa", res.get(0));
+    Assert.assertEquals("2\t2000\taa", res.get(1));
+  }
+
   /**
    * Test cleaner for TXN_TO_WRITE_ID table.
    * @throws Exception