diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index c44f2b5026..543ec0b991 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -149,7 +149,8 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag
       job.setQueueName(queueName);
     }
 
-    setColumnTypes(job, sd.getCols());
+    // have to use table columns since partition SD isn't updated if these are altered
+    setColumnTypes(job, t.getSd().getCols());
     //with feature on, multiple tasks may get into conflict creating/using TMP_LOCATION and if we were
     //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter
     //to do the final move
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 79dfb02ccd..ea4e691d93 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -28,6 +28,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -1977,6 +1979,69 @@ public void testInsertOverwrite2() throws Exception {
     Assert.assertEquals(stringifyValues(resultData), rs);
   }
 
+  /**
+   * Add column to partitioned table, update the new column, and compact.
+   *
+   * @throws Exception if a statement fails
+   */
+  @Test
+  public void testSchemaEvolutionAddColUpdate() throws Exception {
+    String tblName = "schemaevolutionaddcolupdate";
+    runStatementOnDriver("drop table if exists " + tblName);
+    runStatementOnDriver("CREATE TABLE " + tblName + "(a INT) " +
+        " PARTITIONED BY(part string)" +
+        " STORED AS ORC TBLPROPERTIES ('transactional'='true')");
+
+    // First INSERT round.
+    runStatementOnDriver("insert into " + tblName + " partition (part='aa') values (1)");
+
+    // ALTER TABLE ... ADD COLUMNS
+    runStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(b int)");
+
+    // Validate there is an added NULL for column b.
+    List<String> res = runStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a");
+    Assert.assertEquals(1, res.size());
+    Assert.assertEquals("1\tNULL\taa", res.get(0));
+
+    // Update the new column (b).
+    runStatementOnDriver("update " + tblName + " set b=1000 where a=1");
+
+    // Validate the updated value for column b.
+    res = runStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a");
+    Assert.assertEquals(1, res.size());
+    Assert.assertEquals("1\t1000\taa", res.get(0));
+
+    // Compact: threshold 0 makes any delta count trigger compaction.
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0);
+    runInitiator(hiveConf);
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    SortedSet<String> partNames = new TreeSet<>();
+    for (ShowCompactResponseElement compact : compacts) {
+      Assert.assertEquals("default", compact.getDbname());
+      Assert.assertEquals(tblName, compact.getTablename());
+      Assert.assertEquals("initiated", compact.getState());
+      partNames.add(compact.getPartitionname());
+    }
+    List<String> names = new ArrayList<>(partNames);
+    Assert.assertEquals("part=aa", names.get(0));
+
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+
+    compacts =
+        txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, compacts.get(0).getState());
+
+    // Validate after compaction.
+    res = runStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a");
+    Assert.assertEquals(1, res.size());
+    Assert.assertEquals("1\t1000\taa", res.get(0));
+  }
   /**
    * Test cleaner for TXN_TO_WRITE_ID table.
    * @throws Exception