diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 12a929a..44a3699 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.exec;
 
-import java.util.ArrayList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.beans.DefaultPersistenceDelegate;
@@ -1387,7 +1386,7 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf,
     if (success) {
       if (fs.exists(tmpPath)) {
         // remove any tmp file or double-committed output files
-        ArrayList<String> emptyBuckets =
+        List<Path> emptyBuckets =
             Utilities.removeTempOrDuplicateFiles(fs, tmpPath, dpCtx, conf, hconf);
         // create empty buckets if necessary
         if (emptyBuckets.size() > 0) {
@@ -1415,7 +1414,7 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf,
    * @throws HiveException
    * @throws IOException
    */
-  private static void createEmptyBuckets(Configuration hconf, ArrayList<String> paths,
+  private static void createEmptyBuckets(Configuration hconf, List<Path> paths,
       FileSinkDesc conf, Reporter reporter)
       throws HiveException, IOException {
@@ -1443,8 +1442,7 @@ private static void createEmptyBuckets(Configuration hconf, ArrayList<String> pa
       throw new HiveException(e);
     }
 
-    for (String p : paths) {
-      Path path = new Path(p);
+    for (Path path : paths) {
       RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
           jc, hiveOutputFormat, outputClass, isCompressed,
           tableInfo.getProperties(), path, reporter);
@@ -1465,13 +1463,13 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
    *
    * @return a list of path names corresponding to should-be-created empty buckets.
    */
-  public static ArrayList<String> removeTempOrDuplicateFiles(FileSystem fs, Path path,
+  public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, Path path,
       DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
     if (path == null) {
       return null;
     }
-    ArrayList<String> result = new ArrayList<String>();
+    List<Path> result = new ArrayList<Path>();
     HashMap<String, FileStatus> taskIDToFile = null;
     if (dpCtx != null) {
       FileStatus parts[] = HiveStatsUtils.getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
@@ -1502,8 +1500,9 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
           String taskID2 = replaceTaskId(taskID1, j);
           if (!taskIDToFile.containsKey(taskID2)) {
             // create empty bucket, file name should be derived from taskID2
-            String path2 = replaceTaskIdFromFilename(bucketPath.toUri().getPath().toString(), j);
-            result.add(path2);
+            URI bucketUri = bucketPath.toUri();
+            String path2 = replaceTaskIdFromFilename(bucketUri.getPath().toString(), j);
+            result.add(new Path(bucketUri.getScheme(), bucketUri.getAuthority(), path2));
           }
         }
       }
@@ -1520,8 +1519,9 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
         String taskID2 = replaceTaskId(taskID1, j);
         if (!taskIDToFile.containsKey(taskID2)) {
           // create empty bucket, file name should be derived from taskID2
-          String path2 = replaceTaskIdFromFilename(bucketPath.toUri().getPath().toString(), j);
-          result.add(path2);
+          URI bucketUri = bucketPath.toUri();
+          String path2 = replaceTaskIdFromFilename(bucketUri.getPath().toString(), j);
+          result.add(new Path(bucketUri.getScheme(), bucketUri.getAuthority(), path2));
         }
       }
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index 3ce4723..6e8bd38 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -18,7 +18,11 @@ package org.apache.hadoop.hive.ql.exec;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 import static org.apache.hadoop.hive.ql.exec.Utilities.getFileExtension;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
@@ -28,28 +32,40 @@ import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 
-import junit.framework.TestCase;
+public class TestUtilities {
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-public class TestUtilities extends TestCase {
   public static final Logger LOG = LoggerFactory.getLogger(TestUtilities.class);
+  private static final int NUM_BUCKETS = 3;
 
+  @Test
   public void testGetFileExtension() {
     JobConf jc = new JobConf();
     assertEquals("No extension for uncompressed unknown format", "",
@@ -77,6 +93,7 @@ public void testGetFileExtension() {
         getFileExtension(jc, true, new HiveIgnoreKeyTextOutputFormat()));
   }
 
+  @Test
   public void testSerializeTimestamp() {
     Timestamp ts = new Timestamp(1374554702000L);
     ts.setNanos(123456);
@@ -89,6 +106,7 @@ public void testSerializeTimestamp() {
         SerializationUtilities.serializeExpression(desc)).getExprString());
   }
 
+  @Test
   public void testgetDbTableName() throws HiveException{
     String tablename;
     String [] dbtab;
@@ -117,6 +135,7 @@ public void testgetDbTableName() throws HiveException{
     }
   }
 
+  @Test
   public void testGetJarFilesByPath() {
     HiveConf conf = new HiveConf(this.getClass());
     File f = Files.createTempDir();
@@ -144,6 +163,7 @@ public void testGetJarFilesByPath() {
     }
   }
 
+  @Test
   public void testReplaceTaskId() {
     String taskID = "000000";
     int bucketNum = 1;
@@ -154,6 +174,7 @@ public void testReplaceTaskId() {
     Assert.assertEquals("(ds%3D1)000005", newTaskID);
   }
 
+  @Test
   public void testMaskIfPassword() {
     Assert.assertNull(Utilities.maskIfPassword("",null));
     Assert.assertNull(Utilities.maskIfPassword(null,null));
@@ -163,5 +184,80 @@ public void testMaskIfPassword() {
     Assert.assertEquals("###_MASKED_###",Utilities.maskIfPassword("a_passWord","test4"));
     Assert.assertEquals("###_MASKED_###",Utilities.maskIfPassword("password_a","test5"));
     Assert.assertEquals("###_MASKED_###",Utilities.maskIfPassword("a_PassWord_a","test6"));
   }
 
+  @Test
+  public void testRemoveTempOrDuplicateFilesOnTezNoDp() throws Exception {
+    List<Path> paths = runRemoveTempOrDuplicateFilesTestCase("tez", false);
+    assertEquals(0, paths.size());
+  }
+
+  @Test
+  public void testRemoveTempOrDuplicateFilesOnTezWithDp() throws Exception {
+    List<Path> paths = runRemoveTempOrDuplicateFilesTestCase("tez", true);
+    assertEquals(0, paths.size());
+  }
+
+  @Test
+  public void testRemoveTempOrDuplicateFilesOnMrNoDp() throws Exception {
+    List<Path> paths = runRemoveTempOrDuplicateFilesTestCase("mr", false);
+    assertEquals(NUM_BUCKETS, paths.size());
+  }
+
+  @Test
+  public void testRemoveTempOrDuplicateFilesOnMrWithDp() throws Exception {
+    List<Path> paths = runRemoveTempOrDuplicateFilesTestCase("mr", true);
+    assertEquals(NUM_BUCKETS, paths.size());
+  }
+
+  private List<Path> runRemoveTempOrDuplicateFilesTestCase(String executionEngine, boolean dPEnabled)
+      throws Exception {
+    Configuration hconf = new HiveConf(this.getClass());
+    // do this to verify that Utilities.removeTempOrDuplicateFiles does not revert to default scheme information
+    hconf.set("fs.defaultFS", "hdfs://should-not-be-used/");
+    hconf.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, executionEngine);
+    FileSystem localFs = FileSystem.getLocal(hconf);
+    DynamicPartitionCtx dpCtx = getDynamicPartitionCtx(dPEnabled);
+    Path tempDirPath = setupTempDirWithSingleOutputFile(hconf);
+    FileSinkDesc conf = getFileSinkDesc(tempDirPath);
+
+    List<Path> paths = Utilities.removeTempOrDuplicateFiles(localFs, tempDirPath, dpCtx, conf, hconf);
+
+    String expectedScheme = tempDirPath.toUri().getScheme();
+    String expectedAuthority = tempDirPath.toUri().getAuthority();
+    assertPathsMatchSchemeAndAuthority(expectedScheme, expectedAuthority, paths);
+
+    return paths;
+  }
+
+  private void assertPathsMatchSchemeAndAuthority(String expectedScheme, String expectedAuthority, List<Path> paths) {
+    for (Path path : paths) {
+      assertEquals(path.toUri().getScheme().toLowerCase(), expectedScheme.toLowerCase());
+      assertEquals(path.toUri().getAuthority(), expectedAuthority);
+    }
+  }
+
+  private DynamicPartitionCtx getDynamicPartitionCtx(boolean dPEnabled) {
+    DynamicPartitionCtx dpCtx = null;
+    if (dPEnabled) {
+      dpCtx = mock(DynamicPartitionCtx.class);
+      when(dpCtx.getNumDPCols()).thenReturn(0);
+      when(dpCtx.getNumBuckets()).thenReturn(NUM_BUCKETS);
+    }
+    return dpCtx;
+  }
+
+  private FileSinkDesc getFileSinkDesc(Path tempDirPath) {
+    Table table = mock(Table.class);
+    when(table.getNumBuckets()).thenReturn(NUM_BUCKETS);
+    FileSinkDesc conf = new FileSinkDesc(tempDirPath, null, false);
+    conf.setTable(table);
+    return conf;
+  }
+
+  private Path setupTempDirWithSingleOutputFile(Configuration hconf) throws IOException {
+    Path tempDirPath = new Path("file://" + temporaryFolder.newFolder().getAbsolutePath());
+    Path taskOutputPath = new Path(tempDirPath, Utilities.getTaskId(hconf));
+    FileSystem.getLocal(hconf).create(taskOutputPath).close();
+    return tempDirPath;
+  }
 }
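
Reviewer note (not part of the patch): a minimal standalone sketch of why the empty-bucket paths are now built with new Path(scheme, authority, path) instead of a bare path string. The class name, namenode authority, and file names below are made up for illustration; only the org.apache.hadoop.fs.Path constructors shown are assumed from the Hadoop API.

import java.net.URI;
import org.apache.hadoop.fs.Path;

public class EmptyBucketPathSketch {
  public static void main(String[] args) {
    // Hypothetical bucket file written by a task on HDFS.
    Path bucketPath = new Path("hdfs://namenode:8020/warehouse/t/000000_0");
    URI bucketUri = bucketPath.toUri();

    // Pre-patch shape: only the path component survives, so the empty-bucket
    // path would later be resolved against fs.defaultFS, which may point at a
    // different filesystem than the one the existing buckets live on.
    Path bare = new Path(bucketUri.getPath());
    System.out.println(bare.toUri().getScheme());          // null

    // Post-patch shape: the scheme and authority of the sibling bucket are kept,
    // so the empty bucket is created on the same filesystem.
    Path qualified = new Path(bucketUri.getScheme(), bucketUri.getAuthority(), bucketUri.getPath());
    System.out.println(qualified.toUri().getScheme());     // hdfs
    System.out.println(qualified.toUri().getAuthority());  // namenode:8020
  }
}

The new tests exercise exactly this property: they point fs.defaultFS at "hdfs://should-not-be-used/" and assert that every returned path still carries the local temp directory's scheme and authority.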