diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d0eb2a4801..96e31be58c 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2560,6 +2560,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
         "Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."),
 
+    TRANSACTIONAL_CONCATENATE_NOBLOCK("hive.transactional.concatenate.noblock", false,
+        "Will cause 'alter table T concatenate' to be non-blocking"),
+
     HIVE_COMPACTOR_COMPACT_MM("hive.compactor.compact.insert.only", true,
         "Whether the compactor should compact insert-only tables. A safety switch."),
     /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 5dbc478c98..903470aa3f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4407,9 +4407,9 @@ public CompactionResponse compact2(String dbname, String tableName, String partN
       throws HiveException {
     try {
       CompactionType cr = null;
-      if ("major".equals(compactType)) {
+      if ("major".equalsIgnoreCase(compactType)) {
         cr = CompactionType.MAJOR;
-      } else if ("minor".equals(compactType)) {
+      } else if ("minor".equalsIgnoreCase(compactType)) {
         cr = CompactionType.MINOR;
       } else {
         throw new RuntimeException("Unknown compaction type " + compactType);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index defb8becdb..f0b9edaf01 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1964,9 +1964,19 @@ private void analyzeAlterTablePartMergeFiles(ASTNode ast,
     try {
       tblObj = getTable(tableName);
-      // TODO: we should probably block all ACID tables here.
-      if (AcidUtils.isInsertOnlyTable(tblObj.getParameters())) {
-        throw new SemanticException("Merge is not supported for MM tables");
+      if (AcidUtils.isTransactionalTable(tblObj)) {
+        LinkedHashMap<String, String> newPartSpec = null;
+        if (partSpec != null) {
+          newPartSpec = new LinkedHashMap<>(partSpec);
+        }
+
+        boolean isBlocking = !HiveConf.getBoolVar(conf,
+            ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, false);
+        AlterTableSimpleDesc desc = new AlterTableSimpleDesc(
+            tableName, newPartSpec, "MAJOR", isBlocking);
+
+        rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
+        return;
       }
 
       mergeDesc.setTableDesc(Utilities.getTableDesc(tblObj));
@@ -2039,11 +2049,6 @@ private void analyzeAlterTablePartMergeFiles(ASTNode ast,
         throw new SemanticException(ErrorMsg.CONCATENATE_UNSUPPORTED_TABLE_NOT_MANAGED.getMsg());
       }
 
-      // transactional tables are compacted and no longer needs to be bucketed, so not safe for merge/concatenation
-      boolean isAcid = AcidUtils.isTransactionalTable(tblObj);
-      if (isAcid) {
-        throw new SemanticException(ErrorMsg.CONCATENATE_UNSUPPORTED_TABLE_TRANSACTIONAL.getMsg());
-      }
       inputDir.add(oldTblPartLoc);
 
       mergeDesc.setInputDir(inputDir);
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
new file mode 100644
index 0000000000..92bcefe5ee
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
@@ -0,0 +1,159 @@
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class TestTxnConcatenate extends TxnCommandsBaseForTests {
+  static final private Logger LOG = LoggerFactory.getLogger(TestTxnConcatenate.class);
+  private static final String TEST_DATA_DIR =
+      new File(System.getProperty("java.io.tmpdir") +
+          File.separator + TestTxnLoadData.class.getCanonicalName()
+          + "-" + System.currentTimeMillis()
+      ).getPath().replaceAll("\\\\", "/");
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Override
+  String getTestDataDir() {
+    return TEST_DATA_DIR;
+  }
+
+  @Test
+  public void testConcatenate() throws Exception {
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,2),(4,5)");
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b = 4");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(5,6),(8,8)");
+    String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDTBL + " order by a, b";
+    String[][] expected = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t1\t4",
+            "acidtbl/delta_0000002_0000002_0000/bucket_00001"},
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4",
+            "acidtbl/delta_0000002_0000002_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t5\t6",
+            "acidtbl/delta_0000003_0000003_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+            "acidtbl/delta_0000003_0000003_0000/bucket_00001"}};
+    checkResult(expected, testQuery, false, "check data", LOG);
+
+    /* In unit tests there is no standalone HMS running to kick off compaction, so it is
+       triggered via runWorker(); in normal usage 'concatenate' is blocking. */
+    hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
+    runStatementOnDriver("alter table " + Table.ACIDTBL + " concatenate");
+
+    TxnStore txnStore = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, rsp.getCompacts().get(0).getState());
+    runWorker(hiveConf);
+    rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+    String[][] expected2 = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t1\t4",
+            "acidtbl/base_0000003/bucket_00001"},
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4",
+            "acidtbl/base_0000003/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t5\t6",
+            "acidtbl/base_0000003/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+            "acidtbl/base_0000003/bucket_00001"}};
+    checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
+  }
+
+  @Test
+  public void testConcatenatePart() throws Exception {
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " values(1,2,'p1'),(4,5,'p2')");
+    runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = 4 where p='p1'");
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " values(5,6,'p1'),(8,8,'p2')");
+    String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDTBLPART + " order by a, b";
+    String[][] expected = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
+            "acidtblpart/p=p1/delta_0000002_0000002_0000/bucket_00001"},
+        {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t4\t5",
+            "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
+            "acidtblpart/p=p1/delta_0000003_0000003_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+            "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001"}};
+    checkResult(expected, testQuery, false, "check data", LOG);
+
+    /* In unit tests there is no standalone HMS running to kick off compaction, so it is
+       triggered via runWorker(); in normal usage 'concatenate' is blocking. */
+    hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
+    runStatementOnDriver("alter table " + Table.ACIDTBLPART + " PARTITION(p='p1') concatenate");
+
+    TxnStore txnStore = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, rsp.getCompacts().get(0).getState());
+    runWorker(hiveConf);
+    rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+    String[][] expected2 = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
+            "acidtblpart/p=p1/base_0000003/bucket_00001"},
+        {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t4\t5",
+            "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
+            "acidtblpart/p=p1/base_0000003/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+            "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001"}};
+
+    checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
+  }
+
+  @Test
+  public void testConcatenateMM() throws Exception {
+    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true);
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T(a int, b int)");
+    runStatementOnDriver("insert into T values(1,2),(4,5)");
+    runStatementOnDriver("insert into T values(5,6),(8,8)");
+    String testQuery = "select a, b, INPUT__FILE__NAME from T order by a, b";
+    String[][] expected = new String[][] {
+        {"1\t2",
+            "t/delta_0000001_0000001_0000/000000_0"},
+        {"4\t5",
+            "t/delta_0000001_0000001_0000/000000_0"},
+        {"5\t6",
+            "t/delta_0000002_0000002_0000/000000_0"},
+        {"8\t8",
+            "t/delta_0000002_0000002_0000/000000_0"}};
+    checkResult(expected, testQuery, false, "check data", LOG);
+
+    /* In unit tests there is no standalone HMS running to kick off compaction, so it is
+       triggered via runWorker(); in normal usage 'concatenate' is blocking. */
+    hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
+    runStatementOnDriver("alter table T concatenate");
+
+    TxnStore txnStore = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, rsp.getCompacts().get(0).getState());
+    runWorker(hiveConf);
+    rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+    String[][] expected2 = new String[][] {
+        {"1\t2",
+            "t/base_0000002/000000_0"},
+        {"4\t5",
+            "t/base_0000002/000000_0"},
+        {"5\t6",
+            "t/base_0000002/000000_0"},
+        {"8\t8",
+            "t/base_0000002/000000_0"}};
+    checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
+  }
+}