diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index ebf1344..c70e1e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -3110,22 +3110,30 @@ public static double getHighestSamplePercentage (MapWork work) {
     }
 
     List<Path> finalPathsToAdd = new LinkedList<>();
-    List<Future<Path>> futures = new LinkedList<>();
+    Map<GetInputPathsCallable, Future<Path>> getPathsCallableToFuture = new LinkedHashMap<>();
     for (final Path path : pathsToAdd) {
-      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
         throw new IOException("Operation is Canceled. ");
+      }
       if (pool == null) {
-        finalPathsToAdd.add(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call());
+        Path newPath = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call();
+        updatePathForMapWork(newPath, work, path);
+        finalPathsToAdd.add(newPath);
       } else {
-        futures.add(pool.submit(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy)));
+        GetInputPathsCallable callable = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy);
+        getPathsCallableToFuture.put(callable, pool.submit(callable));
       }
     }
 
     if (pool != null) {
-      for (Future<Path> future : futures) {
-        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+      for (Map.Entry<GetInputPathsCallable, Future<Path>> future : getPathsCallableToFuture.entrySet()) {
+        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
           throw new IOException("Operation is Canceled. ");
-        finalPathsToAdd.add(future.get());
+        }
+
+        Path newPath = future.getValue().get();
+        updatePathForMapWork(newPath, work, future.getKey().path);
+        finalPathsToAdd.add(newPath);
       }
     }
 
@@ -3154,7 +3162,8 @@ private GetInputPathsCallable(Path path, JobConf job, MapWork work, Path hiveScr
     @Override
     public Path call() throws Exception {
       if (!this.skipDummy && isEmptyPath(this.job, this.path, this.ctx)) {
-        return createDummyFileForEmptyPartition(this.path, this.job, this.work.getPathToPartitionInfo().get(this.path),
+        return createDummyFileForEmptyPartition(this.path, this.job, this.work.getPathToPartitionInfo().get(this.path),
+            this.hiveScratchDir);
       }
       return this.path;
     }
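The hunk above makes two coordinated changes: each submitted GetInputPathsCallable is now keyed to its Future so the original input path can be recovered when results are drained, and all mutation of the shared MapWork moves out of the parallel callables into updatePathForMapWork, which only ever runs on the submitting thread. A minimal stand-alone sketch of that keyed-future pattern, with illustrative names rather than Hive APIs:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class KeyedFutureSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    // Key each submitted task by its input so the result can be traced back
    // to the path it was computed for once the futures are drained.
    Map<String, Future<String>> inputToFuture = new LinkedHashMap<>();
    for (String input : new String[] {"p=0", "p=1", "p=2"}) {
      inputToFuture.put(input, pool.submit(() -> input + "-resolved"));
    }
    for (Map.Entry<String, Future<String>> e : inputToFuture.entrySet()) {
      // Only the submitting thread runs here, so non-thread-safe shared
      // structures can be updated with the (input, result) pair safely.
      System.out.println(e.getKey() + " -> " + e.getValue().get());
    }
    pool.shutdown();
  }
}

A LinkedHashMap keeps submission order, so results are applied deterministically even though they were computed concurrently.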
@@ -3192,14 +3201,12 @@ private static Path createEmptyFile(Path hiveScratchDir,
   }
 
   @SuppressWarnings("rawtypes")
-  private static Path createDummyFileForEmptyPartition(Path path, JobConf job, MapWork work,
-      Path hiveScratchDir)
-      throws Exception {
+  private static Path createDummyFileForEmptyPartition(Path path, JobConf job, PartitionDesc partDesc,
+      Path hiveScratchDir) throws Exception {
 
     String strPath = path.toString();
 
     // The input file does not exist, replace it by a empty file
-    PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
     if (partDesc.getTableDesc().isNonNative()) {
       // if this isn't a hive table we can't create an empty file for it.
       return path;
@@ -3216,16 +3223,19 @@ private static Path createDummyFileForEmptyPartition(Path path, JobConf job, Map
     if (LOG.isInfoEnabled()) {
       LOG.info("Changed input file " + strPath + " to empty file " + newPath + " (" + oneRow + ")");
     }
+    return newPath;
+  }
 
+  private static void updatePathForMapWork(Path newPath, MapWork work, Path path) {
     // update the work
+    if (!newPath.equals(path)) {
+      PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
+      work.addPathToAlias(newPath, work.getPathToAliases().get(path));
+      work.removePathToAlias(path);
 
-    work.addPathToAlias(newPath, work.getPathToAliases().get(path));
-    work.removePathToAlias(path);
-
-    work.removePathToPartitionInfo(path);
-    work.addPathToPartitionInfo(newPath, partDesc);
-
-    return newPath;
+      work.removePathToPartitionInfo(path);
+      work.addPathToPartitionInfo(newPath, partDesc);
+    }
   }
 
   @SuppressWarnings("rawtypes")
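Splitting the dummy-file creation (above) from the MapWork bookkeeping (the extracted updatePathForMapWork) is the heart of the fix: the callable no longer touches the shared maps, and the update itself is a guarded re-keying that is a no-op when the path was not replaced. A stand-alone sketch of that re-keying shape, using hypothetical names rather than the MapWork API:

import java.util.LinkedHashMap;
import java.util.Map;

public class RekeySketch {
  // Guarded re-keying: move a value under a new key only when the key
  // actually changed, leaving the map untouched in the common no-op case.
  static <K, V> void rekey(Map<K, V> map, K oldKey, K newKey) {
    if (!newKey.equals(oldKey)) {
      map.put(newKey, map.remove(oldKey));
    }
  }

  public static void main(String[] args) {
    Map<String, String> pathToAlias = new LinkedHashMap<>();
    pathToAlias.put("/warehouse/t/p=1", "alias1");
    rekey(pathToAlias, "/warehouse/t/p=1", "/scratch/dummy0");
    System.out.println(pathToAlias); // prints {/scratch/dummy0=alias1}
  }
}

Because LinkedHashMap is not thread-safe, this re-keying is only sound while a single thread owns the maps, which is exactly what the getInputPaths change above arranges.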
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index 650f169..434e206 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -20,6 +20,10 @@
 import static org.apache.hadoop.hive.ql.exec.Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.apache.hadoop.hive.ql.exec.Utilities.getFileExtension;
 import static org.mockito.Mockito.doReturn;
@@ -32,6 +36,7 @@
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -242,16 +247,17 @@ private Path setupTempDirWithSingleOutputFile(Configuration hconf) throws IOExce
 
   /**
    * Check that calling {@link Utilities#getInputPaths(JobConf, MapWork, Path, Context, boolean)}
-   * can process two different empty tables without throwing any exceptions.
+   * can process two different tables that both have empty partitions.
    */
   @Test
-  public void testGetInputPathsWithEmptyTables() throws Exception {
+  public void testGetInputPathsWithEmptyPartitions() throws Exception {
     String alias1Name = "alias1";
     String alias2Name = "alias2";
     MapWork mapWork1 = new MapWork();
     MapWork mapWork2 = new MapWork();
     JobConf jobConf = new JobConf();
+    Configuration conf = new Configuration();
 
     Path nonExistentPath1 = new Path(UUID.randomUUID().toString());
     Path nonExistentPath2 = new Path(UUID.randomUUID().toString());
 
@@ -269,14 +275,14 @@
 
     mapWork1.setPathToAliases(new LinkedHashMap<>(
         ImmutableMap.of(nonExistentPath1, Lists.newArrayList(alias1Name))));
-    mapWork1.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>(
+    mapWork1.setAliasToWork(new LinkedHashMap<>(
         ImmutableMap.of(alias1Name, (Operator) mock(Operator.class))));
     mapWork1.setPathToPartitionInfo(new LinkedHashMap<>(
         ImmutableMap.of(nonExistentPath1, mockPartitionDesc)));
 
     mapWork2.setPathToAliases(new LinkedHashMap<>(
         ImmutableMap.of(nonExistentPath2, Lists.newArrayList(alias2Name))));
-    mapWork2.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>(
+    mapWork2.setAliasToWork(new LinkedHashMap<>(
         ImmutableMap.of(alias2Name, (Operator) mock(Operator.class))));
     mapWork2.setPathToPartitionInfo(new LinkedHashMap<>(
         ImmutableMap.of(nonExistentPath2, mockPartitionDesc)));
@@ -284,11 +290,22 @@
     List<Path> inputPaths = new ArrayList<>();
     try {
       Path scratchDir = new Path(HiveConf.getVar(jobConf, HiveConf.ConfVars.LOCALSCRATCHDIR));
-      inputPaths.addAll(Utilities.getInputPaths(jobConf, mapWork1, scratchDir,
-          mock(Context.class), false));
-      inputPaths.addAll(Utilities.getInputPaths(jobConf, mapWork2, scratchDir,
-          mock(Context.class), false));
-      assertEquals(inputPaths.size(), 2);
+
+      List<Path> inputPaths1 = Utilities.getInputPaths(jobConf, mapWork1, scratchDir,
+          mock(Context.class), false);
+      inputPaths.addAll(inputPaths1);
+      assertEquals(inputPaths1.size(), 1);
+      assertNotEquals(inputPaths1.get(0), nonExistentPath1);
+      assertTrue(inputPaths1.get(0).getFileSystem(conf).exists(inputPaths1.get(0)));
+      assertFalse(nonExistentPath1.getFileSystem(conf).exists(nonExistentPath1));
+
+      List<Path> inputPaths2 = Utilities.getInputPaths(jobConf, mapWork2, scratchDir,
+          mock(Context.class), false);
+      inputPaths.addAll(inputPaths2);
+      assertEquals(inputPaths2.size(), 1);
+      assertNotEquals(inputPaths2.get(0), nonExistentPath2);
+      assertTrue(inputPaths2.get(0).getFileSystem(conf).exists(inputPaths2.get(0)));
+      assertFalse(nonExistentPath2.getFileSystem(conf).exists(nonExistentPath2));
     } finally {
       File file;
       for (Path path : inputPaths) {
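The strengthened assertions no longer just count results: they verify that each returned path differs from the non-existent input and that the substituted dummy file really exists on disk. A hedged sketch of that contract, using java.nio and a hypothetical resolveInputPath stand-in rather than Hive's createDummyFileForEmptyPartition:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class DummyInputSketch {
  // Hypothetical stand-in for the dummy-file flow: a missing input is
  // replaced by a real, existing placeholder file under the scratch dir.
  static Path resolveInputPath(Path input, Path scratchDir) throws IOException {
    if (!Files.exists(input)) {
      return Files.createTempFile(scratchDir, "dummy", ".seq");
    }
    return input;
  }

  public static void main(String[] args) throws IOException {
    Path scratch = Files.createTempDirectory("scratch");
    Path missing = Paths.get("no-such-partition");
    Path resolved = resolveInputPath(missing, scratch);
    // The two properties the new assertions pin down:
    System.out.println(!resolved.equals(missing)); // true: path was replaced
    System.out.println(Files.exists(resolved));    // true: replacement exists
  }
}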
@@ -301,7 +318,72 @@
   }
 
   /**
-   * Check that calling {@link Utilities#getMaxExecutorsForInputListing(JobConf, int)}
+   * Check that calling {@link Utilities#getInputPaths(JobConf, MapWork, Path, Context, boolean)}
+   * can process two different tables that both have empty partitions when using multiple threads.
+   * Some extra logic is placed at the end of the test to validate no race conditions put the
+   * {@link MapWork} object in an invalid state.
+   */
+  @Test
+  public void testGetInputPathsWithMultipleThreadsAndEmptyPartitions() throws Exception {
+    int numPartitions = 15;
+    JobConf jobConf = new JobConf();
+    jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname,
+        Runtime.getRuntime().availableProcessors() * 2);
+    MapWork mapWork = new MapWork();
+    Path testTablePath = new Path("testTable");
+    Path[] testPartitionsPaths = new Path[numPartitions];
+
+    PartitionDesc mockPartitionDesc = mock(PartitionDesc.class);
+    TableDesc mockTableDesc = mock(TableDesc.class);
+
+    when(mockTableDesc.isNonNative()).thenReturn(false);
+    when(mockTableDesc.getProperties()).thenReturn(new Properties());
+    when(mockPartitionDesc.getProperties()).thenReturn(new Properties());
+    when(mockPartitionDesc.getTableDesc()).thenReturn(mockTableDesc);
+    doReturn(HiveSequenceFileOutputFormat.class).when(
+        mockPartitionDesc).getOutputFileFormatClass();
+
+    for (int i = 0; i < numPartitions; i++) {
+      String testPartitionName = "p=" + i;
+      testPartitionsPaths[i] = new Path(testTablePath, "p=" + i);
+      mapWork.getPathToAliases().put(testPartitionsPaths[i], Lists.newArrayList(testPartitionName));
+      mapWork.getAliasToWork().put(testPartitionName, (Operator) mock(Operator.class));
+      mapWork.getPathToPartitionInfo().put(testPartitionsPaths[i], mockPartitionDesc);
+    }
+
+    FileSystem fs = FileSystem.getLocal(jobConf);
+
+    try {
+      fs.mkdirs(testTablePath);
+      List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork,
+          new Path(HiveConf.getVar(jobConf, HiveConf.ConfVars.LOCALSCRATCHDIR)), mock(Context.class), false);
+      assertEquals(inputPaths.size(), numPartitions);
+
+      for (int i = 0; i < numPartitions; i++) {
+        assertNotEquals(inputPaths.get(i), testPartitionsPaths[i]);
+      }
+
+      assertEquals(mapWork.getPathToAliases().size(), numPartitions);
+      assertEquals(mapWork.getPathToPartitionInfo().size(), numPartitions);
+      assertEquals(mapWork.getAliasToWork().size(), numPartitions);
+
+      for (Map.Entry<Path, ArrayList<String>> entry : mapWork.getPathToAliases().entrySet()) {
+        assertNotNull(entry.getKey());
+        assertNotNull(entry.getValue());
+        assertEquals(entry.getValue().size(), 1);
+        assertTrue(entry.getKey().getFileSystem(new Configuration()).exists(entry.getKey()));
+      }
+    } finally {
+      if (fs.exists(testTablePath)) {
+        fs.delete(testTablePath, true);
+      }
+    }
+  }
+
+  /**
+   * Check that calling {@link Utilities#getMaxExecutorsForInputListing(Configuration, int)}
    * returns the maximum number of executors to use based on the number of input locations.
    */
   @Test
@@ -413,7 +495,7 @@ private void runTestGetInputPaths(JobConf jobConf, int numOfPartitions) throws E
     Path testTablePath = new Path(testTableName);
     Path[] testPartitionsPaths = new Path[numOfPartitions];
     for (int i=0; i<numOfPartitions; i++) {
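Taken together, the new test exercises the scenario that used to race: many callables resolving empty partitions in parallel while MapWork is re-keyed. A compact sketch of the two-phase discipline the patch enforces, with illustrative names and plain strings standing in for Hive's Path and MapWork types:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MapWorkInvariantSketch {
  public static void main(String[] args) throws Exception {
    int numPartitions = 15;
    ExecutorService pool =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    // Shared, non-thread-safe state, mutated only on the main thread below.
    Map<String, String> pathToAlias = new LinkedHashMap<>();
    for (int i = 0; i < numPartitions; i++) {
      pathToAlias.put("testTable/p=" + i, "p=" + i);
    }

    // Phase 1 (parallel): compute replacement paths without touching shared maps.
    Map<String, Future<String>> oldToNew = new LinkedHashMap<>();
    for (String old : new ArrayList<>(pathToAlias.keySet())) {
      oldToNew.put(old, pool.submit(() -> old.replace("testTable", "scratch/dummy")));
    }

    // Phase 2 (single-threaded): apply the re-keying, then check the invariant
    // the test asserts: entry count unchanged, no null keys or values.
    for (Map.Entry<String, Future<String>> e : oldToNew.entrySet()) {
      pathToAlias.put(e.getValue().get(), pathToAlias.remove(e.getKey()));
    }
    pool.shutdown();
    System.out.println(pathToAlias.size() == numPartitions);              // true
    System.out.println(!pathToAlias.containsValue(null));                 // true
  }
}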