diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index fe7b23fb38..b398d4cce2 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2537,6 +2537,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
         "Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."),
+    TRANSACTIONAL_CONCATENATE_NOBLOCK("hive.transactional.concatenate.noblock", false,
+        "Will cause 'alter table T concatenate' to be non-blocking"),
+
     HIVE_COMPACTOR_COMPACT_MM("hive.compactor.compact.insert.only", true,
         "Whether the compactor should compact insert-only tables. A safety switch."),
     /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index f6608eb827..66853cd7ad 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4407,9 +4407,9 @@ public CompactionResponse compact2(String dbname, String tableName, String partN
       throws HiveException {
     try {
       CompactionType cr = null;
-      if ("major".equals(compactType)) {
+      if ("major".equalsIgnoreCase(compactType)) {
         cr = CompactionType.MAJOR;
-      } else if ("minor".equals(compactType)) {
+      } else if ("minor".equalsIgnoreCase(compactType)) {
         cr = CompactionType.MINOR;
       } else {
         throw new RuntimeException("Unknown compaction type " + compactType);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index defb8becdb..f1a8cc225f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1964,9 +1964,20 @@ private void analyzeAlterTablePartMergeFiles(ASTNode ast,
     try {
       tblObj = getTable(tableName);
-      // TODO: we should probably block all ACID tables here.
-      if (AcidUtils.isInsertOnlyTable(tblObj.getParameters())) {
-        throw new SemanticException("Merge is not supported for MM tables");
+      if (AcidUtils.isTransactionalTable(tblObj)) {
+        LinkedHashMap<String, String> newPartSpec = null;
+        if (partSpec != null) {
+          newPartSpec = new LinkedHashMap<>(partSpec);
+        }
+
+        boolean isBlocking = !HiveConf.getBoolVar(conf,
+            ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, false);
+        AlterTableSimpleDesc desc = new AlterTableSimpleDesc(
+            tableName, newPartSpec, "MAJOR", isBlocking);
+
+        rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
+        // TODO: should automatic stats gathering be triggered here as well?
+        return;
       }
       mergeDesc.setTableDesc(Utilities.getTableDesc(tblObj));
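With this change, ALTER TABLE ... CONCATENATE on a transactional table is rewritten by the semantic analyzer into a MAJOR compaction request instead of a merge task, and it blocks until the compaction completes unless the new flag is set. A minimal usage sketch (illustrative only, not part of the patch; 'conf' and 'driver' are hypothetical stand-ins for a configured HiveConf and Driver, and 'acid_tbl' is an assumed table name):

    // Illustrative sketch, not part of the patch: with the new flag set,
    // 'concatenate' on an ACID table enqueues the MAJOR compaction and
    // returns immediately instead of waiting for it to complete.
    conf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
    driver.run("alter table acid_tbl concatenate"); // returns once the compaction is queued

The new test below exercises exactly this non-blocking path, driving the compactor by hand.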
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
new file mode 100644
index 0000000000..641ce6ac65
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -0,0 +1,77 @@
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class TestTxnCommands3 extends TxnCommandsBaseForTests {
+  static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands3.class);
+  private static final String TEST_DATA_DIR =
+      new File(System.getProperty("java.io.tmpdir") +
+          File.separator + TestTxnCommands3.class.getCanonicalName()
+          + "-" + System.currentTimeMillis()
+      ).getPath().replaceAll("\\\\", "/");
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Override
+  String getTestDataDir() {
+    return TEST_DATA_DIR;
+  }
+
+  @Test
+  public void testConcatenate() throws Exception {
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,2),(4,5)");
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b = 4");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(5,6),(8,8)");
+    String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDTBL + " order by a, b";
+    String[][] expected = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t1\t4",
+            "acidtbl/delta_0000002_0000002_0000/bucket_00001"},
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4",
+            "acidtbl/delta_0000002_0000002_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t5\t6",
+            "acidtbl/delta_0000003_0000003_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+            "acidtbl/delta_0000003_0000003_0000/bucket_00001"}};
+    checkResult(expected, testQuery, false, "check data", LOG);
+
+    /* In unit tests there is no standalone HMS running to kick off compaction, so it is
+       driven explicitly via runWorker(); in normal usage 'concatenate' is blocking by
+       default, hence the non-blocking override below. */
+    hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
+    runStatementOnDriver("alter table " + Table.ACIDTBL + " concatenate");
+
+    TxnStore txnStore = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, rsp.getCompacts().get(0).getState());
+    runWorker(hiveConf);
+    rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+    String[][] expected2 = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t1\t4",
+            "acidtbl/base_0000003/bucket_00001"},
{"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4", + "acidtbl/base_0000003/bucket_00001"}, + {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t5\t6", + "acidtbl/base_0000003/bucket_00001"}, + {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8", + "acidtbl/base_0000003/bucket_00001"}}; + checkResult(expected2, testQuery, false, "check data after concatenate", LOG); + } +}