diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index ed1c0abdf2..08d26dc14e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -73,6 +73,7 @@
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnStatsUpdateTask;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -3389,7 +3390,13 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole
     Table tab = getTable(qualified);
     boolean isView = tab.isView();
     validateAlterTableType(tab, AlterTableTypes.ADDPARTITION, expectView);
-    outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL_SHARED));
+    outputs.add(new WriteEntity(tab,
+        /* Use DDL_EXCLUSIVE to take an X lock that prevents races between concurrent ADD
+           PARTITION calls with IF NOT EXISTS. Without it, two concurrent calls may both add
+           data for the same partition, because for transactional tables creating the partition
+           metadata and moving the data there are two separate actions. */
+        ifNotExists && AcidUtils.isTransactionalTable(tab) ? WriteType.DDL_EXCLUSIVE
+            : WriteEntity.WriteType.DDL_SHARED));
 
     int numCh = ast.getChildCount();
     int start = ifNotExists ? 1 : 0;
@@ -3446,7 +3453,10 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole
       return;
     }
 
-    rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), addPartitionDesc)));
+    Task ddlTask =
+        TaskFactory.get(new DDLWork(getInputs(), getOutputs(), addPartitionDesc));
+    rootTasks.add(ddlTask);
+    handleTransactionalTable(tab, addPartitionDesc, ddlTask);
 
     if (isView) {
       // Compile internal query to capture underlying table partition dependencies
@@ -3488,6 +3498,56 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole
     }
   }
 
+  /**
+   * ADD PARTITION for transactional tables needs to add (copy/rename) the data so that it
+   * lands in a delta_x_x/ directory under the partition directory.
+   */
+  private void handleTransactionalTable(Table tab, AddPartitionDesc addPartitionDesc,
+      Task ddlTask) throws SemanticException {
+    if (!AcidUtils.isTransactionalTable(tab)) {
+      return;
+    }
+    Long writeId;
+    int stmtId;
+    try {
+      writeId = SessionState.get().getTxnMgr().getTableWriteId(tab.getDbName(),
+          tab.getTableName());
+    } catch (LockException ex) {
+      throw new SemanticException("Failed to allocate the write id", ex);
+    }
+    stmtId = SessionState.get().getTxnMgr().getStmtIdAndIncrement();
+
+    for (int index = 0; index < addPartitionDesc.getPartitionCount(); index++) {
+      OnePartitionDesc desc = addPartitionDesc.getPartition(index);
+      if (desc.getLocation() != null) {
+        if (addPartitionDesc.isIfNotExists()) {
+          //don't schedule a move for a partition that already exists
+          Partition oldPart = getPartition(tab, desc.getPartSpec(), false);
+          if (oldPart != null) {
+            continue;
+          }
+        }
+        LoadTableDesc loadTableWork = new LoadTableDesc(new Path(desc.getLocation()),
+            Utilities.getTableDesc(tab), desc.getPartSpec(),
+            LoadTableDesc.LoadFileType.KEEP_EXISTING, //not relevant - creating a new partition
+            writeId);
+        loadTableWork.setStmtId(stmtId);
+        loadTableWork.setInheritTableSpecs(true);
+        try {
+          desc.setLocation(new Path(tab.getDataLocation(),
+              Warehouse.makePartPath(desc.getPartSpec())).toString());
+        } catch (MetaException ex) {
+          throw new SemanticException("Could not determine partition path due to: "
+              + ex.getMessage(), ex);
+        }
+        Task moveTask = TaskFactory.get(
+            new MoveWork(getInputs(), getOutputs(), loadTableWork, null,
+                true, //make sure to check format
+                false)); //is this right?
+        ddlTask.addDependentTask(moveTask);
+      }
+    }
+  }
   /**
    * Rewrite the metadata for one or more partitions in a table. Useful when
    * an external process modifies files on HDFS and you want the pre/post
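Note on the locking change above: with IF NOT EXISTS, the partition-exists check and the data move are separate steps, so under a shared lock two concurrent callers can both pass the check and both move data. The following self-contained sketch shows that check-then-act race in miniature; it is an analogy only, not Hive code (the synchronized variant plays the role the DDL_EXCLUSIVE table lock plays in the patch):

    import java.util.HashSet;
    import java.util.Set;

    public class CheckThenActRace {
      private final Set<String> partitions = new HashSet<>();

      // Unsafe under concurrency: two threads can both pass the existence
      // check, then both "move" data into the same partition.
      void addPartitionIfNotExists(String name) {
        if (!partitions.contains(name)) { // step 1: check partition metadata
          moveDataIntoPartition(name);    // step 2: move data - a separate action
          partitions.add(name);           // step 3: create partition metadata
        }
      }

      // Safe: an exclusive lock (here the monitor; in the patch, a DDL_EXCLUSIVE
      // lock on the table) makes check + move + create atomic w.r.t. other callers.
      synchronized void addPartitionIfNotExistsLocked(String name) {
        addPartitionIfNotExists(name);
      }

      private void moveDataIntoPartition(String name) {
        // stand-in for the MoveTask that lands data in a delta_x_x/ directory
      }
    }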
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java
new file mode 100644
index 0000000000..901b5dbd31
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java
@@ -0,0 +1,260 @@
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * Tests related to support of ADD PARTITION with Acid/MM tables.
+ *
+ * Most tests run in both vectorized and non-vectorized mode since we currently have a vectorized
+ * and a non-vectorized Acid reader, and it's critical that ROW_IDs are generated the same way.
+ *
+ * Side note: ALTER TABLE ADD PARTITION does no validation of the data: no file name checks,
+ * no Input/OutputFormat checks, no bucketing checks, etc.
+ */
+public class TestTxnAddPartition extends TxnCommandsBaseForTests {
+  static final private Logger LOG = LoggerFactory.getLogger(TestTxnAddPartition.class);
+  private static final String TEST_DATA_DIR =
+      new File(System.getProperty("java.io.tmpdir") +
+          File.separator + TestTxnAddPartition.class.getCanonicalName()
+          + "-" + System.currentTimeMillis()
+      ).getPath().replaceAll("\\\\", "/");
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Override
+  String getTestDataDir() {
+    return TEST_DATA_DIR;
+  }
+
+  @Test
+  public void addPartition() throws Exception {
+    addPartition(false);
+  }
+
+  @Test
+  public void addPartitionVectorized() throws Exception {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    addPartition(true);
+  }
+
+  /**
+   * Tests adding multiple partitions,
+   * adding a partition w/o a location,
+   * adding a partition when it already exists,
+   * adding a partition when it already exists with "if not exists".
+   */
+  private void addPartition(boolean isVectorized) throws Exception {
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("drop table if exists Tstage");
+    runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc" +
+        " tblproperties('transactional'='true')");
+    runStatementOnDriver("create table Tstage (a int, b int) stored as orc" +
+        " tblproperties('transactional'='false')");
+
+    runStatementOnDriver("insert into Tstage values(0,2),(0,4)");
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/2'");
+
+    runStatementOnDriver("ALTER TABLE T ADD" +
+        " PARTITION (p=0) location '" + getWarehouseDir() + "/1/data'" +
+        " PARTITION (p=1) location '" + getWarehouseDir() + "/2/data'" +
+        " PARTITION (p=2)");
+
+    String testQuery = isVectorized ?
+        "select ROW__ID, p, a, b from T order by p, ROW__ID" :
+        "select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID";
+    String[][] expected = new String[][]{
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2",
+            "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4",
+            "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t0\t2",
+            "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t0\t4",
+            "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}};
+    checkResult(expected, testQuery, isVectorized, "add 2 parts w/data and 1 empty", LOG);
+
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/3'");
+    //should be an error since p=0 already exists
+    CommandProcessorResponse cpr = runStatementOnDriverNegative(
+        "ALTER TABLE T ADD PARTITION (p=0) location '" + getWarehouseDir() + "/3/data'");
+    Assert.assertTrue("add existing partition", cpr.getErrorMessage() != null
+        && cpr.getErrorMessage().contains("Partition already exists"));
+
+    //p=0 and p=2 already exist and are skipped; only p=3 is added
+    String stmt = "ALTER TABLE T ADD IF NOT EXISTS " +
+        "PARTITION (p=0) location '" + getWarehouseDir() + "/3/data' " + //p=0 exists and is not empty
+        "PARTITION (p=2) location '" + getWarehouseDir() + "/3/data'" + //p=2 exists and is empty
+        "PARTITION (p=3) location '" + getWarehouseDir() + "/3/data'"; //p=3 doesn't exist
+    runStatementOnDriver(stmt);
+    String[][] expected2 = new String[][]{
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2",
+            "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4",
+            "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t0\t2",
+            "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t0\t4",
+            "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t3\t0\t2",
+            "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t3\t0\t4",
+            "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"}};
+    checkResult(expected2, testQuery, isVectorized, "add 2 existing parts and 1 empty", LOG);
+  }
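The expected file paths in this test encode the allocated write id in the delta directory name, delta_<minWriteId>_<maxWriteId>_<stmtId>, e.g. delta_0000001_0000001_0000. A toy parser for that naming convention (illustrative only; the authoritative parsing lives in AcidUtils):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class DeltaDirName {
      // delta_<minWriteId>_<maxWriteId>_<stmtId>, as asserted in the expected paths
      private static final Pattern DELTA = Pattern.compile("delta_(\\d+)_(\\d+)_(\\d+)");

      public static void main(String[] args) {
        Matcher m = DELTA.matcher("delta_0000001_0000001_0000");
        if (m.matches()) {
          System.out.println("minWriteId=" + Long.parseLong(m.group(1))
              + " maxWriteId=" + Long.parseLong(m.group(2))
              + " stmtId=" + Integer.parseInt(m.group(3)));
        }
      }
    }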
+
+  @Test
+  public void addPartitionMM() throws Exception {
+    addPartitionMM(false);
+  }
+
+  @Test
+  public void addPartitionMMVectorized() throws Exception {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    addPartitionMM(true);
+  }
+
+  /**
+   * Micro-managed (insert-only) table test.
+   * Tests adding multiple partitions,
+   * adding a partition w/o a location,
+   * adding a partition when it already exists,
+   * adding a partition when it already exists with "if not exists".
+   */
+  private void addPartitionMM(boolean isVectorized) throws Exception {
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("drop table if exists Tstage");
+
+    runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc" +
+        " tblproperties('transactional'='true', 'transactional_properties'='insert_only')");
+    runStatementOnDriver("create table Tstage (a int, b int) stored as orc" +
+        " tblproperties('transactional'='false')");
+
+    runStatementOnDriver("insert into Tstage values(0,2),(0,4)");
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/2'");
+
+    runStatementOnDriver("ALTER TABLE T ADD" +
+        " PARTITION (p=0) location '" + getWarehouseDir() + "/1/data'" +
+        " PARTITION (p=1) location '" + getWarehouseDir() + "/2/data'" +
+        " PARTITION (p=2)");
+
+    String testQuery = isVectorized ? "select p, a, b from T order by p, a, b" :
+        "select p, a, b, INPUT__FILE__NAME from T order by p, a, b";
+    String[][] expected = new String[][]{
+        {"0\t0\t2", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
+        {"0\t0\t4", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
+        {"1\t0\t2", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
+        {"1\t0\t4", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}};
+    checkResult(expected, testQuery, isVectorized, "add 2 parts w/data and 1 empty", LOG);
+
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/3'");
+    //should be an error since p=0 already exists
+    CommandProcessorResponse cpr = runStatementOnDriverNegative(
+        "ALTER TABLE T ADD PARTITION (p=0) location '" + getWarehouseDir() + "/3/data'");
+    Assert.assertTrue("add existing partition", cpr.getErrorMessage() != null
+        && cpr.getErrorMessage().contains("Partition already exists"));
+
+    //p=0 and p=2 already exist and are skipped; only p=3 is added
+    runStatementOnDriver("ALTER TABLE T ADD IF NOT EXISTS " +
+        "PARTITION (p=0) location '" + getWarehouseDir() + "/3/data' " + //p=0 exists and is not empty
+        "PARTITION (p=2) location '" + getWarehouseDir() + "/3/data'" + //p=2 exists and is empty
+        "PARTITION (p=3) location '" + getWarehouseDir() + "/3/data'"); //p=3 doesn't exist
+    String[][] expected2 = new String[][]{
+        {"0\t0\t2", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
+        {"0\t0\t4", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
+        {"1\t0\t2", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
+        {"1\t0\t4", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
+        {"3\t0\t2", "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"},
+        {"3\t0\t4", "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"}};
+    checkResult(expected2, testQuery, isVectorized, "add 2 existing parts and 1 empty", LOG);
+  }
"select ROW__ID, p, a, b from T order by p, ROW__ID" : + "select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"; + String[][] expected = new String[][]{ + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", + "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", + "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t0\t2", + "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t0\t4", + "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}}; + checkResult(expected, testQuery, isVectorized, "add 2 parts w/data and 1 empty", LOG); + + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/3'"); + //should be an error since p=3 exists + CommandProcessorResponse cpr = runStatementOnDriverNegative( + "ALTER TABLE T ADD PARTITION (p=0) location '" + getWarehouseDir() + "/3/data'"); + Assert.assertTrue("add existing partition", cpr.getErrorMessage() != null + && cpr.getErrorMessage().contains("Partition already exists")); + + //should be no-op since p=3 exists + String stmt = "ALTER TABLE T ADD IF NOT EXISTS " + + "PARTITION (p=0) location '" + getWarehouseDir() + "/3/data' "//p=0 exists and is not empty + + "PARTITION (p=2) location '" + getWarehouseDir() + "/3/data'"//p=2 exists and is empty + + "PARTITION (p=3) location '" + getWarehouseDir() + "/3/data'";//p=3 doesn't exist + runStatementOnDriver(stmt); + String[][] expected2 = new String[][]{ + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", + "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", + "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t0\t2", + "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t0\t4", + "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t3\t0\t2", + "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"}, + {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t3\t0\t4", + "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"}}; + checkResult(expected2, testQuery, isVectorized, "add 2 existing parts and 1 empty", LOG); + } + + @Test + public void addPartitionMM() throws Exception { + addPartitionMM(false); + } + + @Test + public void addPartitionMMVectorized() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + addPartitionMM(true); + } + + /** + * Micro managed table test + * Tests adding multiple partitions + * adding partition w/o location + * adding partition when it already exists + * adding partition when it already exists with "if not exists" + */ + private void addPartitionMM(boolean isVectorized) throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + + runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc" + + " tblproperties('transactional'='true', 'transactional_properties'='insert_only')"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc" + + " tblproperties('transactional'='false')"); + + runStatementOnDriver("insert into Tstage values(0,2),(0,4)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); + 
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/2'"); + + runStatementOnDriver("ALTER TABLE T ADD" + + " PARTITION (p=0) location '" + getWarehouseDir() + "/1/data'" + + " PARTITION (p=1) location '" + getWarehouseDir() + "/2/data'" + + " PARTITION (p=2)"); + + String testQuery = isVectorized ? "select p, a, b from T order by p, a, b" : + "select p, a, b, INPUT__FILE__NAME from T order by p, a, b"; + String[][] expected = new String[][]{ + {"0\t0\t2", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"0\t0\t4", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"1\t0\t2", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}, + {"1\t0\t4", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}}; + checkResult(expected, testQuery, isVectorized, "add 2 parts w/data and 1 empty", LOG); + + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/3'"); + //should be an error since p=3 exists + CommandProcessorResponse cpr = runStatementOnDriverNegative( + "ALTER TABLE T ADD PARTITION (p=0) location '" + getWarehouseDir() + "/3/data'"); + Assert.assertTrue("add existing partition", cpr.getErrorMessage() != null + && cpr.getErrorMessage().contains("Partition already exists")); + + //should be no-op since p=3 exists + runStatementOnDriver("ALTER TABLE T ADD IF NOT EXISTS " + + "PARTITION (p=0) location '" + getWarehouseDir() + "/3/data' "//p=0 exists and is not empty + + "PARTITION (p=2) location '" + getWarehouseDir() + "/3/data'"//p=2 exists and is empty + + "PARTITION (p=3) location '" + getWarehouseDir() + "/3/data'");//p=3 doesn't exist + String[][] expected2 = new String[][]{ + {"0\t0\t2", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"0\t0\t4", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"1\t0\t2", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}, + {"1\t0\t4", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}, + {"3\t0\t2", "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"}, + {"3\t0\t4", "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"}}; + checkResult(expected2, testQuery, isVectorized, "add 2 existing parts and 1 empty", LOG); + } + + @Test + public void addPartitionBucketed() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) partitioned by (p int) " + + "clustered by (a) into 2 buckets stored as orc tblproperties('transactional'='true')"); + runStatementOnDriver("create table Tstage (a int, b int) clustered by (a) into 2 " + + "buckets stored as orc tblproperties('transactional'='false')"); + + runStatementOnDriver("insert into Tstage values(0,2),(1,4)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); + + runStatementOnDriver("ALTER TABLE T ADD PARTITION (p=0) location '" + + getWarehouseDir() + "/1/data'"); + + List rs = runStatementOnDriver( + "select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"); + String[][] expected = new String[][]{ + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", + "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t0\t1\t4", + "warehouse/t/p=0/delta_0000001_0000001_0000/000001_0"}}; + checkExpected(rs, expected, "add partition (p=0)"); + } + + private void checkExpected(List rs, String[][] expected, String msg) { + super.checkExpected(rs, expected, msg, LOG, true); + } + + /** + * Check to make sure that 
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
index 3710311f80..0fee075df2 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -448,17 +448,9 @@ public void testAbort() throws Exception {
     };
     checkResult(expected, testQuery, isVectorized, "load data inpath");
   }
-  /**
-   * We have to use a different query to check results for Vectorized tests because to get the
-   * file name info we need to use {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#FILENAME}
-   * which will currently make the query non-vectorizable. This means we can't check the file name
-   * for vectorized version of the test.
-   */
   private void checkResult(String[][] expectedResult, String query, boolean isVectorized,
       String msg) throws Exception {
-    List<String> rs = runStatementOnDriver(query);
-    checkExpected(rs, expectedResult, msg + (isVectorized ? " vect" : ""), LOG, !isVectorized);
-    assertVectorized(isVectorized, query);
+    checkResult(expectedResult, query, isVectorized, msg, LOG);
   }
   @Test
   public void testLoadAcidFile() throws Exception {
" vect" : ""), LOG, !isVectorized); - assertVectorized(isVectorized, query); + checkResult(expectedResult, query, isVectorized, msg, LOG); } @Test public void testLoadAcidFile() throws Exception { diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index bc6e230bd2..a2adb966fe 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -206,9 +206,7 @@ void assertExpectedFileSet(Set expectedFiles, String rootPath) throws Ex } void checkExpected(List rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) { LOG.warn(testName.getMethodName() + ": read data(" + msg + "): "); - for(String s : rs) { - LOG.warn(s); - } + logResult(LOG, rs); Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size()); //verify data and layout for(int i = 0; i < expected.length; i++) { @@ -218,4 +216,22 @@ void checkExpected(List rs, String[][] expected, String msg, Logger LOG, } } } + void logResult(Logger LOG, List rs) { + StringBuilder sb = new StringBuilder(); + for(String s : rs) { + sb.append(s).append('\n'); + } + LOG.info(sb.toString()); + } + /** + * We have to use a different query to check results for Vectorized tests because to get the + * file name info we need to use {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#FILENAME} + * which will currently make the query non-vectorizable. This means we can't check the file name + * for vectorized version of the test. + */ + void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg, Logger LOG) throws Exception{ + List rs = runStatementOnDriver(query); + checkExpected(rs, expectedResult, msg + (isVectorized ? 
" vect" : ""), LOG, !isVectorized); + assertVectorized(isVectorized, query); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 39f40b1b85..8406caa761 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -45,7 +45,9 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -2418,4 +2420,34 @@ public void testValidWriteIdListSnapshot() throws Exception { cpr = driver.run("drop database if exists temp cascade"); checkCmdOnDriver(cpr); } + @Rule + public TemporaryFolder exportFolder = new TemporaryFolder(); + /** + * see also {@link org.apache.hadoop.hive.ql.TestTxnAddPartition} + */ + @Test + public void testAddPartitionLocks() throws Exception { + dropTable(new String[] {"T", "Tstage"}); + CommandProcessorResponse cpr = driver.run("create table T (a int, b int) partitioned by (p int) " + + "stored as orc tblproperties('transactional'='true')"); + checkCmdOnDriver(cpr); + //bucketed just so that we get 2 files + cpr = driver.run("create table Tstage (a int, b int) clustered by (a) into 2 " + + "buckets stored as orc tblproperties('transactional'='false')"); + checkCmdOnDriver(cpr); + cpr = driver.run("insert into Tstage values(0,2),(1,4)"); + checkCmdOnDriver(cpr); + String exportLoc = exportFolder.newFolder("1").toString(); + cpr = driver.run("export table Tstage to '" + exportLoc + "'"); + checkCmdOnDriver(cpr); + + cpr = driver.compileAndRespond("ALTER TABLE T ADD if not exists PARTITION (p=0)" + + " location '" + exportLoc + "/data'", true); + checkCmdOnDriver(cpr); + txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets X lock on T + + List locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T", null, locks); + } }