diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 8f7bbb2..4556301 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2944,7 +2944,6 @@ public static double getHighestSamplePercentage (MapWork work) {
    */
   public static List<Path> getInputPaths(JobConf job, MapWork work, Path hiveScratchDir,
       Context ctx, boolean skipDummy) throws Exception {
-    int sequenceNumber = 0;
 
     Set<Path> pathsProcessed = new HashSet<Path>();
     List<Path> pathsToAdd = new LinkedList<Path>();
@@ -2971,7 +2970,7 @@ public static double getHighestSamplePercentage (MapWork work) {
 
           if (!skipDummy && isEmptyPath(job, path, ctx)) {
             path = createDummyFileForEmptyPartition(path, job, work,
-                hiveScratchDir, alias, sequenceNumber++);
+                hiveScratchDir);
           }
           pathsToAdd.add(path);
 
@@ -2987,8 +2986,7 @@ public static double getHighestSamplePercentage (MapWork work) {
       // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
       // rows)
       if (path == null && !skipDummy) {
-        path = createDummyFileForEmptyTable(job, work, hiveScratchDir,
-            alias, sequenceNumber++);
+        path = createDummyFileForEmptyTable(job, work, hiveScratchDir, alias);
         pathsToAdd.add(path);
       }
     }
@@ -2998,11 +2996,11 @@
   @SuppressWarnings({"rawtypes", "unchecked"})
   private static Path createEmptyFile(Path hiveScratchDir,
       HiveOutputFormat outFileFormat, JobConf job,
-      int sequenceNumber, Properties props, boolean dummyRow)
+      Properties props, boolean dummyRow)
           throws IOException, InstantiationException, IllegalAccessException {
 
     // create a dummy empty file in a new directory
-    String newDir = hiveScratchDir + Path.SEPARATOR + sequenceNumber;
+    String newDir = hiveScratchDir + Path.SEPARATOR + UUID.randomUUID().toString();
     Path newPath = new Path(newDir);
     FileSystem fs = newPath.getFileSystem(job);
     fs.mkdirs(newPath);
@@ -3028,7 +3026,7 @@ private static Path createEmptyFile(Path hiveScratchDir,
 
   @SuppressWarnings("rawtypes")
   private static Path createDummyFileForEmptyPartition(Path path, JobConf job, MapWork work,
-      Path hiveScratchDir, String alias, int sequenceNumber)
+      Path hiveScratchDir)
           throws Exception {
 
     String strPath = path.toString();
@@ -3047,7 +3045,7 @@ private static Path createDummyFileForEmptyPartition(Path path, JobConf job, Map
     boolean oneRow = partDesc.getInputFileFormatClass() == OneNullRowInputFormat.class;
 
     Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job,
-        sequenceNumber, props, oneRow);
+        props, oneRow);
 
     if (LOG.isInfoEnabled()) {
       LOG.info("Changed input file " + strPath + " to empty file " + newPath);
@@ -3072,7 +3070,7 @@ private static Path createDummyFileForEmptyPartition(Path path, JobConf job, Map
 
   @SuppressWarnings("rawtypes")
   private static Path createDummyFileForEmptyTable(JobConf job, MapWork work,
-      Path hiveScratchDir, String alias, int sequenceNumber)
+      Path hiveScratchDir, String alias)
           throws Exception {
 
     TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc();
@@ -3085,7 +3083,7 @@ private static Path createDummyFileForEmptyTable(JobConf job, MapWork work,
     HiveOutputFormat outFileFormat = HiveFileFormatUtils.getHiveOutputFormat(job, tableDesc);
 
     Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job,
-        sequenceNumber, props, false);
+        props, false);
 
     if (LOG.isInfoEnabled()) {
       LOG.info("Changed input file for alias " + alias + " to " + newPath);
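Why sequenceNumber had to go: it was a local variable, reset to zero on every call to getInputPaths(), so two invocations sharing one scratch directory (two concurrent queries, or the two MapWork instances the new test below drives through the same scratch dir) would both name their first dummy-file directory <hiveScratchDir>/0 and collide. A random UUID makes each directory name unique regardless of how many calls share the scratch directory. Below is a minimal, self-contained sketch of the naming scheme using plain java.io; the UniqueScratchDirs class and newScratchSubdir method are illustrative names for this write-up, not Hive API:

import java.io.File;
import java.io.IOException;
import java.util.UUID;

public class UniqueScratchDirs {
  // Creates a uniquely named child directory under a shared scratch dir.
  // Unlike a per-call counter that restarts at 0, UUID.randomUUID() cannot
  // hand the same name to two successive or concurrent calls.
  static File newScratchSubdir(File scratchDir) throws IOException {
    File dir = new File(scratchDir, UUID.randomUUID().toString());
    if (!dir.mkdirs()) {
      throw new IOException("Could not create " + dir);
    }
    return dir;
  }

  public static void main(String[] args) throws IOException {
    File scratch = new File(System.getProperty("java.io.tmpdir"), "scratch-demo");
    // Two "calls" sharing one scratch dir now get distinct directories,
    // where the old counter scheme would have produced "0" both times.
    System.out.println(newScratchSubdir(scratch));
    System.out.println(newScratchSubdir(scratch));
  }
}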
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index d2060a1..adde59f 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -21,6 +21,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.apache.hadoop.hive.ql.exec.Utilities.getFileExtension;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -30,13 +31,19 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.LinkedHashMap;
+import java.util.Properties;
+import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -44,17 +51,25 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.JobConf;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 
@@ -246,4 +261,64 @@ private Path setupTempDirWithSingleOutputFile(Configuration hconf) throws IOExce
     FileSystem.getLocal(hconf).create(taskOutputPath).close();
     return tempDirPath;
   }
+
+  /**
+   * Check that calling {@link Utilities#getInputPaths(JobConf, MapWork, Path, Context, boolean)}
+   * can process two different empty tables without throwing any exceptions.
+   */
+  @Test
+  public void testGetInputPathsWithEmptyTables() throws Exception {
+    String alias1Name = "alias1";
+    String alias2Name = "alias2";
+
+    MapWork mapWork1 = new MapWork();
+    MapWork mapWork2 = new MapWork();
+    JobConf jobConf = new JobConf();
+
+    String nonExistentPath1 = UUID.randomUUID().toString();
+    String nonExistentPath2 = UUID.randomUUID().toString();
+
+    PartitionDesc mockPartitionDesc = mock(PartitionDesc.class);
+    TableDesc mockTableDesc = mock(TableDesc.class);
+
+    when(mockTableDesc.isNonNative()).thenReturn(false);
+    when(mockTableDesc.getProperties()).thenReturn(new Properties());
+
+    when(mockPartitionDesc.getProperties()).thenReturn(new Properties());
+    when(mockPartitionDesc.getTableDesc()).thenReturn(mockTableDesc);
+    doReturn(HiveSequenceFileOutputFormat.class).when(
+        mockPartitionDesc).getOutputFileFormatClass();
+
+    mapWork1.setPathToAliases(new LinkedHashMap<>(
+        ImmutableMap.of(nonExistentPath1, Lists.newArrayList(alias1Name))));
+    mapWork1.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>(
+        ImmutableMap.of(alias1Name, (Operator<? extends OperatorDesc>) mock(Operator.class))));
+    mapWork1.setPathToPartitionInfo(new LinkedHashMap<>(
+        ImmutableMap.of(nonExistentPath1, mockPartitionDesc)));
+
+    mapWork2.setPathToAliases(new LinkedHashMap<>(
+        ImmutableMap.of(nonExistentPath2, Lists.newArrayList(alias2Name))));
+    mapWork2.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>(
+        ImmutableMap.of(alias2Name, (Operator<? extends OperatorDesc>) mock(Operator.class))));
+    mapWork2.setPathToPartitionInfo(new LinkedHashMap<>(
+        ImmutableMap.of(nonExistentPath2, mockPartitionDesc)));
+
+    List<Path> inputPaths = new ArrayList<>();
+    try {
+      Path scratchDir = new Path(HiveConf.getVar(jobConf, HiveConf.ConfVars.LOCALSCRATCHDIR));
+      inputPaths.addAll(Utilities.getInputPaths(jobConf, mapWork1, scratchDir,
+          mock(Context.class), false));
+      inputPaths.addAll(Utilities.getInputPaths(jobConf, mapWork2, scratchDir,
+          mock(Context.class), false));
+      assertEquals(2, inputPaths.size());
+    } finally {
+      File file;
+      for (Path path : inputPaths) {
+        file = new File(path.toString());
+        if (file.exists()) {
+          file.delete();
+        }
+      }
+    }
+  }
 }
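A note on the test design: each MapWork is given a random non-existent input path, so both aliases resolve to empty tables and force the dummy-file path, and both calls to getInputPaths() deliberately share the same local scratch directory. Under the old per-call counter both calls would have targeted <scratchDir>/0, risking one call clobbering or picking up the other's dummy file; with UUID-based names each call gets its own directory, and the assertEquals(2, inputPaths.size()) check confirms two distinct dummy paths come back. The finally block cleans up whatever dummy files were created regardless of the outcome.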