diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index ed1c0abdf2..08d26dc14e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -73,6 +73,7 @@
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnStatsUpdateTask;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -3389,7 +3390,13 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole
     Table tab = getTable(qualified);
     boolean isView = tab.isView();
     validateAlterTableType(tab, AlterTableTypes.ADDPARTITION, expectView);
-    outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL_SHARED));
+    outputs.add(new WriteEntity(tab,
+        /*use DDL_EXCLUSIVE to get an X lock, which prevents races between concurrent ADD PARTITION
+          calls with IF NOT EXISTS. Without it, 2 concurrent calls adding the same partition may both
+          add data, since for transactional tables creating the partition metadata and moving the data
+          into it are 2 separate actions. */
+        ifNotExists && AcidUtils.isTransactionalTable(tab) ? WriteType.DDL_EXCLUSIVE
+        : WriteEntity.WriteType.DDL_SHARED));
 
     int numCh = ast.getChildCount();
     int start = ifNotExists ? 1 : 0;
@@ -3446,7 +3453,10 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole
       return;
     }
 
-    rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), addPartitionDesc)));
+    Task ddlTask =
+        TaskFactory.get(new DDLWork(getInputs(), getOutputs(), addPartitionDesc));
+    rootTasks.add(ddlTask);
+    handleTransactionalTable(tab, addPartitionDesc, ddlTask);
 
     if (isView) {
       // Compile internal query to capture underlying table partition dependencies
@@ -3488,6 +3498,56 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole
     }
   }
 
+  /**
+   * ADD PARTITION for transactional tables needs to add (copy/rename) the data so that it lands
+   * in a delta_x_x/ folder under the partition directory.
+   */
+  private void handleTransactionalTable(Table tab, AddPartitionDesc addPartitionDesc,
+      Task ddlTask) throws SemanticException {
+    if(!AcidUtils.isTransactionalTable(tab)) {
+      return;
+    }
+    Long writeId;
+    int stmtId;
+    try {
+      writeId = SessionState.get().getTxnMgr().getTableWriteId(tab.getDbName(),
+          tab.getTableName());
+    } catch (LockException ex) {
+      throw new SemanticException("Failed to allocate the write id", ex);
+    }
+    stmtId = SessionState.get().getTxnMgr().getStmtIdAndIncrement();
+
+    for (int index = 0; index < addPartitionDesc.getPartitionCount(); index++) {
+      OnePartitionDesc desc = addPartitionDesc.getPartition(index);
+      if (desc.getLocation() != null) {
+        if(addPartitionDesc.isIfNotExists()) {
+          //Don't add the data if the partition already exists
+          Partition oldPart = getPartition(tab, desc.getPartSpec(), false);
+          if(oldPart != null) {
+            continue;
+          }
+        }
+        LoadTableDesc loadTableWork = new LoadTableDesc(new Path(desc.getLocation()),
+            Utilities.getTableDesc(tab), desc.getPartSpec(),
+            LoadTableDesc.LoadFileType.KEEP_EXISTING,//not relevant - creating new partition
+            writeId);
+        loadTableWork.setStmtId(stmtId);
+        loadTableWork.setInheritTableSpecs(true);
+        try {
+          desc.setLocation(new Path(tab.getDataLocation(),
+              Warehouse.makePartPath(desc.getPartSpec())).toString());
+        }
+        catch (MetaException ex) {
+          throw new SemanticException("Could not determine partition path due to: " + ex.getMessage(), ex);
+        }
+        Task moveTask = TaskFactory.get(
+            new MoveWork(getInputs(), getOutputs(), loadTableWork, null,
+                true,//make sure to check format
+                false));//is this right?
+        ddlTask.addDependentTask(moveTask);
+      }
+    }
+  }
   /**
    * Rewrite the metadata for one or more partitions in a table. Useful when
    * an external process modifies files on HDFS and you want the pre/post
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
index 3710311f80..0fee075df2 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -448,17 +448,9 @@ public void testAbort() throws Exception {
     };
     checkResult(expected, testQuery, isVectorized, "load data inpath");
   }
-  /**
-   * We have to use a different query to check results for Vectorized tests because to get the
-   * file name info we need to use {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#FILENAME}
-   * which will currently make the query non-vectorizable. This means we can't check the file name
-   * for vectorized version of the test.
-   */
   private void checkResult(String[][] expectedResult, String query, boolean isVectorized,
       String msg) throws Exception{
-    List<String> rs = runStatementOnDriver(query);
-    checkExpected(rs, expectedResult, msg + (isVectorized ? " vect" : ""), LOG, !isVectorized);
-    assertVectorized(isVectorized, query);
+    checkResult(expectedResult, query, isVectorized, msg, LOG);
   }
   @Test
   public void testLoadAcidFile() throws Exception {
diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index bc6e230bd2..a2adb966fe 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -206,9 +204,7 @@ void assertExpectedFileSet(Set<String> expectedFiles, String rootPath) throws Ex
   }
   void checkExpected(List<String> rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) {
     LOG.warn(testName.getMethodName() + ": read data(" + msg + "): ");
-    for(String s : rs) {
-      LOG.warn(s);
-    }
+    logResult(LOG, rs);
     Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size());
     //verify data and layout
     for(int i = 0; i < expected.length; i++) {
@@ -218,4 +216,22 @@ void checkExpected(List<String> rs, String[][] expected, String msg, Logger LOG,
       }
     }
   }
+  void logResult(Logger LOG, List<String> rs) {
+    StringBuilder sb = new StringBuilder();
+    for(String s : rs) {
+      sb.append(s).append('\n');
+    }
+    LOG.info(sb.toString());
+  }
+  /**
+   * We have to use a different query to check results for Vectorized tests because to get the
+   * file name info we need to use {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#FILENAME}
+   * which will currently make the query non-vectorizable. This means we can't check the file name
+   * for vectorized version of the test.
+   */
+  void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg, Logger LOG) throws Exception{
+    List<String> rs = runStatementOnDriver(query);
+    checkExpected(rs, expectedResult, msg + (isVectorized ? " vect" : ""), LOG, !isVectorized);
+    assertVectorized(isVectorized, query);
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 39f40b1b85..8406caa761 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -45,7 +45,9 @@
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -2418,4 +2420,34 @@ public void testValidWriteIdListSnapshot() throws Exception {
     cpr = driver.run("drop database if exists temp cascade");
     checkCmdOnDriver(cpr);
   }
+  @Rule
+  public TemporaryFolder exportFolder = new TemporaryFolder();
+  /**
+   * see also {@link org.apache.hadoop.hive.ql.TestTxnAddPartition}
+   */
+  @Test
+  public void testAddPartitionLocks() throws Exception {
+    dropTable(new String[] {"T", "Tstage"});
+    CommandProcessorResponse cpr = driver.run("create table T (a int, b int) partitioned by (p int) " +
+        "stored as orc tblproperties('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    //bucketed just so that we get 2 files
+    cpr = driver.run("create table Tstage (a int, b int) clustered by (a) into 2 " +
+        "buckets stored as orc tblproperties('transactional'='false')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into Tstage values(0,2),(1,4)");
+    checkCmdOnDriver(cpr);
+    String exportLoc = exportFolder.newFolder("1").toString();
+    cpr = driver.run("export table Tstage to '" + exportLoc + "'");
+    checkCmdOnDriver(cpr);
+
+    cpr = driver.compileAndRespond("ALTER TABLE T ADD if not exists PARTITION (p=0)" +
+        " location '" + exportLoc + "/data'", true);
+    checkCmdOnDriver(cpr);
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets X lock on T
+
+    List<ShowLocksResponseElement> locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T", null, locks);
+  }
 }