diff --git a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java deleted file mode 100644 index 6ca35e2..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.common; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; - -import java.util.Collection; - -/** - * Utilities for different blob (object) storage systems - */ -public class BlobStorageUtils { - private static final boolean DISABLE_BLOBSTORAGE_AS_SCRATCHDIR = false; - - public static boolean isBlobStoragePath(final Configuration conf, final Path path) { - return (path == null) ? false : isBlobStorageScheme(conf, path.toUri().getScheme()); - } - - public static boolean isBlobStorageFileSystem(final Configuration conf, final FileSystem fs) { - return (fs == null) ? 
false : isBlobStorageScheme(conf, fs.getScheme()); - } - - public static boolean isBlobStorageScheme(final Configuration conf, final String scheme) { - Collection supportedBlobStoreSchemes = - conf.getStringCollection(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname); - - return supportedBlobStoreSchemes.contains(scheme); - } - - public static boolean isBlobStorageAsScratchDir(final Configuration conf) { - return conf.getBoolean( - HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname, - DISABLE_BLOBSTORAGE_AS_SCRATCHDIR - ); - } -} diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 2f3fba7..bdbc064 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3161,7 +3161,14 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Comma-separated list of supported blobstore schemes."), HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR("hive.blobstore.use.blobstore.as.scratchdir", false, - "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties)."); + "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties)."), + + HIVE_BLOBSTORE_PARALLEL_DIRECTORY_RENAME("hive.blobstore.parallel.directory.rename", true, + "When renaming directories within a blobstore, rename files one at a time rather than at a directory level. " + + "Since renames may require copying the entire file, each rename can take a long amount of time. Renaming at " + + "a directory level may not be ideal if the blobstore connector cannot efficiently rename a directory " + + "(e.g. HADOOP-13600). 
By default, renames are done using a thread pool which allows each individual file " + + "to be renamed in parallel."); public final String varname; diff --git a/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java deleted file mode 100644 index 84a0d86..0000000 --- a/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.common; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.net.URI; - -import static org.apache.hadoop.hive.common.BlobStorageUtils.*; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; - -public class TestBlobStorageUtils { - private static final Configuration conf = new Configuration(); - - @Before - public void setUp() { - conf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname, "s3a,swift"); - conf.setBoolean(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname, false); - } - - @Test - public void testValidAndInvalidPaths() throws IOException { - // Valid paths - assertTrue(isBlobStoragePath(conf, new Path("s3a://bucket/path"))); - assertTrue(isBlobStoragePath(conf, new Path("swift://bucket/path"))); - - // Invalid paths - assertFalse(isBlobStoragePath(conf, new Path("/tmp/a-path"))); - assertFalse(isBlobStoragePath(conf, new Path("s3fs://tmp/file"))); - assertFalse(isBlobStoragePath(conf, null)); - assertFalse(isBlobStorageFileSystem(conf, null)); - assertFalse(isBlobStoragePath(conf, new Path(URI.create("")))); - } - - @Test - public void testValidAndInvalidFileSystems() { - FileSystem fs = mock(FileSystem.class); - - /* Valid FileSystem schemes */ - - doReturn("s3a").when(fs).getScheme(); - assertTrue(isBlobStorageFileSystem(conf, fs)); - - doReturn("swift").when(fs).getScheme(); - assertTrue(isBlobStorageFileSystem(conf, fs)); - - /* Invalid FileSystem schemes */ - - doReturn("hdfs").when(fs).getScheme(); - assertFalse(isBlobStorageFileSystem(conf, fs)); - - doReturn("").when(fs).getScheme(); - assertFalse(isBlobStorageFileSystem(conf, 
fs)); - - assertFalse(isBlobStorageFileSystem(conf, null)); - } - - @Test - public void testValidAndInvalidSchemes() { - // Valid schemes - assertTrue(isBlobStorageScheme(conf, "s3a")); - assertTrue(isBlobStorageScheme(conf, "swift")); - - // Invalid schemes - assertFalse(isBlobStorageScheme(conf, "hdfs")); - assertFalse(isBlobStorageScheme(conf, "")); - assertFalse(isBlobStorageScheme(conf, null)); - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 838d73e..70a9e6c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -39,7 +39,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -55,6 +54,7 @@ import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.util.BlobStorageUtils; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index dab4c6a..ea287d1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -136,6 +136,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.session.CreateTableAutomaticGrant; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.util.BlobStorageUtils; import org.apache.hadoop.hive.serde2.Deserializer; 
import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; @@ -3001,6 +3002,7 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, //needed for perm inheritance. final boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); + final boolean shouldRenameDirectoryInParallel = BlobStorageUtils.shouldRenameDirectoryInParallel(conf, destFs); HdfsUtils.HadoopFileStatus destStatus = null; // If source path is a subdirectory of the destination path: @@ -3053,6 +3055,7 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, conf); } else { if (destIsSubDir) { + FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER); List> futures = new LinkedList<>(); @@ -3105,13 +3108,21 @@ public Void call() throws Exception { } return true; } else { - if (destFs.rename(srcf, destf)) { - if (inheritPerms) { - HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true); - } + if (shouldRenameDirectoryInParallel && conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0) { + final ExecutorService pool = Executors.newFixedThreadPool( + conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()); + BlobStorageUtils.renameDirectoryInParallel(conf, srcFs, destFs, srcf, destf, inheritPerms, pool); return true; + } else { + if (destFs.rename(srcf, destf)) { + if (inheritPerms) { + HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true); + } + return true; + } + return false; } - return false; } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index cea99e1..cd6d7a2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Warehouse; @@ -110,6 +109,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.util.BlobStorageUtils; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/BlobStorageUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/util/BlobStorageUtils.java new file mode 100644 index 0000000..e41f8d5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/BlobStorageUtils.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.util; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hive.io.HdfsUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * Utilities for different blob (object) storage systems + */ +public class BlobStorageUtils { + + private static final boolean DISABLE_BLOBSTORAGE_AS_SCRATCHDIR = false; + private static final Logger LOG = LoggerFactory.getLogger(BlobStorageUtils.class); + + public static boolean isBlobStoragePath(final Configuration conf, final Path path) { + return (path == null) ? false : isBlobStorageScheme(conf, path.toUri().getScheme()); + } + + public static boolean isBlobStorageFileSystem(final Configuration conf, final FileSystem fs) { + return (fs == null) ? 
false : isBlobStorageScheme(conf, fs.getScheme()); + } + + public static boolean isBlobStorageScheme(final Configuration conf, final String scheme) { + Collection supportedBlobStoreSchemes = + conf.getStringCollection(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname); + + return supportedBlobStoreSchemes.contains(scheme); + } + + public static boolean isBlobStorageAsScratchDir(final Configuration conf) { + return conf.getBoolean( + HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname, + DISABLE_BLOBSTORAGE_AS_SCRATCHDIR + ); + } + + /** + * Returns true if a directory should be renamed in parallel, false otherwise. + */ + public static boolean shouldRenameDirectoryInParallel(final Configuration conf, final FileSystem fs) { + return HiveConf.getBoolVar(conf, + HiveConf.ConfVars.HIVE_BLOBSTORE_PARALLEL_DIRECTORY_RENAME) && BlobStorageUtils.isBlobStorageFileSystem( + fs.getConf(), fs); + } + + /** + * Given a source directory and a destination directory, moves all the files under the source to the destination + * folder. Rename operations are done using the specified {@link ExecutorService}. + * + *

+ * This method is useful when running on blob stores where rename operations require copying data from one location + * to another. Specifically, this method should be used if the blobstore connector renames files under a directory + * sequentially. This method will issue the renames in parallel, which can offer significant performance + * improvements. + *

+ * + *

+ * The source and destination {@link Path}s must be directories. + *

+ * + * @param hiveConf the {@link HiveConf} to use when setting permissions + * @param srcFs the source {@link FileSystem} + * @param destFs the destination {@link FileSystem} + * @param srcPath the source {@link Path} + * @param destPath the destination {@link Path} + * @param inheritPerms if true, renamed files will inherit their parent permissions, if false they will preserve + * their original permissions + * @param pool the {@link ExecutorService} to use to issue all the {@link FileSystem#rename(Path, Path)} + * requests + * + * @throws IOException if there is an issue renaming the files + * @throws HiveException if any other exception occurs while renaming the files + */ + public static void renameDirectoryInParallel(final HiveConf hiveConf, final FileSystem srcFs, + final FileSystem destFs, final Path srcPath, + final Path destPath, final boolean inheritPerms, + ExecutorService pool) throws IOException, HiveException { + + Preconditions.checkArgument(srcFs.isDirectory(srcPath)); + Preconditions.checkArgument(destFs.isDirectory(destPath)); + + final SessionState parentSession = SessionState.get(); + final HdfsUtils.HadoopFileStatus desiredStatus = new HdfsUtils.HadoopFileStatus(destFs.getConf(), destFs, + destPath); + + List> futures = new ArrayList<>(); + + for (final FileStatus srcStatus : srcFs.listStatus(srcPath)) { + final Path destFile = new Path(destPath, srcStatus.getPath().getName()); + + futures.add(pool.submit(new Callable() { + @Override + public Void call() throws Exception { + SessionState.setCurrentSessionState(parentSession); + final String group = srcStatus.getGroup(); + if (destFs.rename(srcStatus.getPath(), destFile)) { + if (inheritPerms) { + HdfsUtils.setFullFileStatus(hiveConf, desiredStatus, group, destFs, destFile, false); + } + } else { + throw new IOException("rename for src path: " + srcStatus.getPath() + " to dest path:" + + destFile + " returned false"); + } + return null; + } + })); + } + + pool.shutdown(); + for (Future 
future : futures) { + try { + future.get(); + } catch (Exception e) { + LOG.debug(e.getMessage()); + pool.shutdownNow(); + throw new HiveException(e); + } + } + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/util/TestBlobStorageUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/util/TestBlobStorageUtils.java new file mode 100644 index 0000000..c818c9d --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/util/TestBlobStorageUtils.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hive.ql.util.BlobStorageUtils.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestBlobStorageUtils { + private static final Configuration conf = new Configuration(); + + @Before + public void setUp() { + conf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname, "s3a,swift"); + conf.setBoolean(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname, false); + } + + @Test + public void testValidAndInvalidPaths() throws IOException { + // Valid paths + assertTrue(isBlobStoragePath(conf, new Path("s3a://bucket/path"))); + assertTrue(isBlobStoragePath(conf, new Path("swift://bucket/path"))); + + // Invalid paths + assertFalse(isBlobStoragePath(conf, new Path("/tmp/a-path"))); + assertFalse(isBlobStoragePath(conf, new Path("s3fs://tmp/file"))); + assertFalse(isBlobStoragePath(conf, null)); + assertFalse(isBlobStorageFileSystem(conf, null)); + assertFalse(isBlobStoragePath(conf, new 
Path(URI.create("")))); + } + + @Test + public void testValidAndInvalidFileSystems() { + FileSystem fs = mock(FileSystem.class); + + /* Valid FileSystem schemes */ + + doReturn("s3a").when(fs).getScheme(); + assertTrue(isBlobStorageFileSystem(conf, fs)); + + doReturn("swift").when(fs).getScheme(); + assertTrue(isBlobStorageFileSystem(conf, fs)); + + /* Invalid FileSystem schemes */ + + doReturn("hdfs").when(fs).getScheme(); + assertFalse(isBlobStorageFileSystem(conf, fs)); + + doReturn("").when(fs).getScheme(); + assertFalse(isBlobStorageFileSystem(conf, fs)); + + assertFalse(isBlobStorageFileSystem(conf, null)); + } + + @Test + public void testValidAndInvalidSchemes() { + // Valid schemes + assertTrue(isBlobStorageScheme(conf, "s3a")); + assertTrue(isBlobStorageScheme(conf, "swift")); + + // Invalid schemes + assertFalse(isBlobStorageScheme(conf, "hdfs")); + assertFalse(isBlobStorageScheme(conf, "")); + assertFalse(isBlobStorageScheme(conf, null)); + } + + /** + * Test if {@link BlobStorageUtils#renameDirectoryInParallel(HiveConf, FileSystem, FileSystem, Path, Path, boolean, ExecutorService)} + * works as specified. The test checks that the directory is successfully renamed. 
+ */ + @Test + public void testRenameDirectoryInParallel() throws IOException, HiveException { + FileSystem localFs = FileSystem.getLocal(new Configuration()); + Path srcPath = new Path("testRenameDirectoryInParallel-input"); + Path destPath = new Path("testRenameDirectoryInParallel-output"); + + String fileName1 = "test-1.txt"; + String fileName2 = "test-2.txt"; + String fileName3 = "test-3.txt"; + + HiveConf hiveConf = new HiveConf(); + SessionState.start(hiveConf); + + try { + localFs.mkdirs(srcPath); + localFs.mkdirs(destPath); + + localFs.create(new Path(srcPath, fileName1)).close(); + localFs.create(new Path(srcPath, fileName2)).close(); + localFs.create(new Path(srcPath, fileName3)).close(); + + BlobStorageUtils.renameDirectoryInParallel(hiveConf, localFs, localFs, srcPath, destPath, true, + Executors.newFixedThreadPool(1)); + + assertTrue(localFs.exists(new Path(destPath, fileName1))); + assertTrue(localFs.exists(new Path(destPath, fileName2))); + assertTrue(localFs.exists(new Path(destPath, fileName3))); + } finally { + try { + localFs.delete(srcPath, true); + } finally { + localFs.delete(destPath, true); + } + } + + } + + /** + * Test if {@link BlobStorageUtils#renameDirectoryInParallel(HiveConf, FileSystem, FileSystem, Path, Path, boolean, ExecutorService)} + * works as specified. The test doesn't check the functionality of the method, it only verifies that the method + * executes the rename requests in parallel. 
+ */ + @Test + public void testRenameDirectoryInParallelMockThreadPool() throws IOException, HiveException { + FileSystem localFs = FileSystem.getLocal(new Configuration()); + Path srcPath = new Path("testRenameDirectoryInParallel-input"); + Path destPath = new Path("testRenameDirectoryInParallel-output"); + + String fileName1 = "test-1.txt"; + String fileName2 = "test-2.txt"; + String fileName3 = "test-3.txt"; + + HiveConf hiveConf = new HiveConf(); + SessionState.start(hiveConf); + + try { + localFs.mkdirs(srcPath); + localFs.mkdirs(destPath); + + localFs.create(new Path(srcPath, fileName1)).close(); + localFs.create(new Path(srcPath, fileName2)).close(); + localFs.create(new Path(srcPath, fileName3)).close(); + + ExecutorService mockExecutorService = mock(ExecutorService.class); + when(mockExecutorService.submit(any(Callable.class))).thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + Callable callable = (Callable) invocationOnMock.getArguments()[0]; + Future mockFuture = mock(Future.class); + Object callableResult = callable.call(); + when(mockFuture.get()).thenReturn(callableResult); + when(mockFuture.get(any(Long.class), any(TimeUnit.class))).thenReturn(callableResult); + return mockFuture; + } + }); + + BlobStorageUtils.renameDirectoryInParallel(hiveConf, localFs, localFs, srcPath, destPath, true, mockExecutorService); + + verify(mockExecutorService, times(3)).submit(any(Callable.class)); + } finally { + try { + localFs.delete(srcPath, true); + } finally { + localFs.delete(destPath, true); + } + } + } +}