diff --git a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java index 6ca35e2..268d6dd 100644 --- a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java @@ -24,10 +24,12 @@ import java.util.Collection; + /** * Utilities for different blob (object) storage systems */ public class BlobStorageUtils { + private static final boolean DISABLE_BLOBSTORAGE_AS_SCRATCHDIR = false; public static boolean isBlobStoragePath(final Configuration conf, final Path path) { @@ -35,7 +37,7 @@ public static boolean isBlobStoragePath(final Configuration conf, final Path pat } public static boolean isBlobStorageFileSystem(final Configuration conf, final FileSystem fs) { - return (fs == null) ? false : isBlobStorageScheme(conf, fs.getScheme()); + return (fs == null) ? false : isBlobStorageScheme(conf, fs.getUri().getScheme()); } public static boolean isBlobStorageScheme(final Configuration conf, final String scheme) { @@ -51,4 +53,13 @@ public static boolean isBlobStorageAsScratchDir(final Configuration conf) { DISABLE_BLOBSTORAGE_AS_SCRATCHDIR ); } + + /** + * Returns true if a directory should be renamed in parallel, false otherwise. + */ + public static boolean shouldRenameDirectoryInParallel(final Configuration conf, final FileSystem fs) { + return HiveConf.getBoolVar(conf, + HiveConf.ConfVars.HIVE_BLOBSTORE_PARALLEL_DIRECTORY_RENAME) && BlobStorageUtils.isBlobStorageFileSystem( + fs.getConf(), fs); + } } diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 80cd5ad..941791d 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3166,7 +3166,14 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Comma-separated list of supported blobstore schemes."), HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR("hive.blobstore.use.blobstore.as.scratchdir", false, - "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties)."); + "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties)."), + + HIVE_BLOBSTORE_PARALLEL_DIRECTORY_RENAME("hive.blobstore.parallel.directory.rename", true, + "When renaming directories within a blobstore, rename files one at a time rather than at at directory level. " + + "Since renames may require copying the entire file, each rename can take a long amount of time. Renaming at " + + "a directory level may not be ideal if the blobstore connector cannot efficiently rename a directory " + + "(e.g. HADOOP-13600). By default, renames are done using a thread pool which allows each individual file " + + "to be renamed in parallel. The size of the threadpool is controlled by the hive.mv.files.thread parameter."); public final String varname; diff --git a/itests/hive-blobstore/src/test/queries/clientpositive/parallel_directory_rename.q b/itests/hive-blobstore/src/test/queries/clientpositive/parallel_directory_rename.q new file mode 100644 index 0000000..4f7ca19 --- /dev/null +++ b/itests/hive-blobstore/src/test/queries/clientpositive/parallel_directory_rename.q @@ -0,0 +1,7 @@ +SET hive.blobstore.parallel.directory.rename=true; +SET hive.blobstore.use.blobstore.as.scratchdir=true; + +DROP TABLE parallel_directory_rename; +CREATE TABLE parallel_directory_rename (value int) LOCATION '${hiveconf:test.blobstore.path.unique}/parallel_directory_rename/'; +INSERT INTO parallel_directory_rename VALUES (1), (10), (100), (1000); +SELECT * FROM parallel_directory_rename; diff --git a/itests/hive-blobstore/src/test/results/clientpositive/parallel_directory_rename.q.out b/itests/hive-blobstore/src/test/results/clientpositive/parallel_directory_rename.q.out new file mode 100644 index 0000000..a65b0f1 --- /dev/null +++ b/itests/hive-blobstore/src/test/results/clientpositive/parallel_directory_rename.q.out @@ -0,0 +1,35 @@ +PREHOOK: query: DROP TABLE parallel_directory_rename +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE parallel_directory_rename +POSTHOOK: type: DROPTABLE +#### A masked pattern was here #### +PREHOOK: type: CREATETABLE +PREHOOK: Input: #### A masked pattern was here #### +PREHOOK: Output: database:default +PREHOOK: Output: default@parallel_directory_rename +#### A masked pattern was here #### +POSTHOOK: type: CREATETABLE +POSTHOOK: Input: #### A masked pattern was here #### +POSTHOOK: Output: database:default +POSTHOOK: Output: default@parallel_directory_rename +PREHOOK: query: INSERT INTO parallel_directory_rename VALUES (1), (10), (100), (1000) +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__1 +PREHOOK: Output: default@parallel_directory_rename +POSTHOOK: query: INSERT INTO parallel_directory_rename VALUES (1), (10), (100), (1000) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__1 +POSTHOOK: Output: default@parallel_directory_rename +POSTHOOK: Lineage: parallel_directory_rename.value EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: SELECT * FROM parallel_directory_rename +PREHOOK: type: QUERY +PREHOOK: Input: default@parallel_directory_rename +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM parallel_directory_rename +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parallel_directory_rename +#### A masked pattern was here #### +1 +10 +100 +1000 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index dab4c6a..a6161c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -62,6 +62,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.ObjectPair; @@ -136,6 +137,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.session.CreateTableAutomaticGrant; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.util.ParallelDirectoryRenamer; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; @@ -3001,6 +3003,7 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, //needed for perm inheritance. final boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); + final boolean shouldRenameDirectoryInParallel = BlobStorageUtils.shouldRenameDirectoryInParallel(conf, destFs); HdfsUtils.HadoopFileStatus destStatus = null; // If source path is a subdirectory of the destination path: @@ -3053,6 +3056,7 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, conf); } else { if (destIsSubDir) { + FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER); List> futures = new LinkedList<>(); @@ -3105,13 +3109,21 @@ public Void call() throws Exception { } return true; } else { - if (destFs.rename(srcf, destf)) { - if (inheritPerms) { - HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true); - } + if (shouldRenameDirectoryInParallel && conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0) { + final ExecutorService pool = Executors.newFixedThreadPool( + conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()); + ParallelDirectoryRenamer.renameDirectoryInParallel(conf, srcFs, destFs, srcf, destf, inheritPerms, pool); return true; + } else { + if (destFs.rename(srcf, destf)) { + if (inheritPerms) { + HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true); + } + return true; + } + return false; } - return false; } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/ParallelDirectoryRenamer.java b/ql/src/java/org/apache/hadoop/hive/ql/util/ParallelDirectoryRenamer.java new file mode 100644 index 0000000..d000fbf --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/ParallelDirectoryRenamer.java @@ -0,0 +1,120 @@ +package org.apache.hadoop.hive.ql.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.io.HdfsUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Given a source directory and a destination directory, moves all the files under the source to the destination + * folder. Rename operations are done using the specified {@link ExecutorService}. + * + *

+ * This class is useful when running on blob stores where rename operations require copying data from one location + * to another. Specifically, this method should be used if the blobstore connector renames files under a directory + * sequentially. This class will issue the renames in parallel, which can offer significant performance + * improvements. + *

+ */ +public class ParallelDirectoryRenamer { + + private static final Logger LOG = LoggerFactory.getLogger(ParallelDirectoryRenamer.class); + + /** + * Move all files under the srcPath to the destPath. The method preserves the behavior of a normal + * {@link FileSystem#rename(Path, Path)} operation, regardless of whether or not the src and dst paths exist, or if + * they are files or directories. + * + * @param hiveConf the {@link HiveConf} to use when setting permissions + * @param srcFs the source {@link FileSystem} + * @param destFs the destination {@link FileSystem} + * @param srcPath the source {@link Path} + * @param destPath the destination {@link Path} + * @param inheritPerms if true, renamed files with inherit their parent permissions, if false they will preserve + * their original permissions + * @param pool the {@link ExecutorService} to use to issue all the {@link FileSystem#rename(Path, Path)} + * requests + * + * @throws IOException if their is an issuing renaming the files + * @throws HiveException if any other exception occurs while renaming the files + */ + public static void renameDirectoryInParallel(final HiveConf hiveConf, final FileSystem srcFs, + final FileSystem destFs, final Path srcPath, + final Path destPath, final boolean inheritPerms, + ExecutorService pool) throws IOException, HiveException { + + Preconditions.checkArgument(srcFs.exists(srcPath), "Source Path " + srcPath + " does not exist"); + + if (srcFs.isDirectory(srcPath)) { + + // If the destination doesn't exist, create it and move all files under srcPath/ to destPath/ + // If the destination does exist, then move all files under destPath/srcPath.name/, this is inline with the + // normal behavior of the FileSystem.rename operation + Path basePath; + if (!destFs.exists(destPath)) { + destFs.mkdirs(destPath); + basePath = destPath; + } else { + basePath = new Path(destPath, srcPath.getName()); + Preconditions.checkArgument(!destFs.exists(basePath), "Path " + basePath + " already exists"); + } + + final SessionState parentSession = SessionState.get(); + final HdfsUtils.HadoopFileStatus desiredStatus = new HdfsUtils.HadoopFileStatus(destFs.getConf(), destFs, + destPath); + + List> futures = new ArrayList<>(); + + for (final FileStatus srcStatus : srcFs.listStatus(srcPath)) { + final Path destFile = new Path(basePath, srcStatus.getPath().getName()); + + futures.add(pool.submit(new Callable() { + @Override + public Void call() throws Exception { + SessionState.setCurrentSessionState(parentSession); + final String group = srcStatus.getGroup(); + if (destFs.rename(srcStatus.getPath(), destFile)) { + if (inheritPerms) { + HdfsUtils.setFullFileStatus(hiveConf, desiredStatus, group, destFs, destFile, false); + } + } else { + throw new IOException("rename for src path: " + srcStatus.getPath() + " to dest path: " + + destFile + " returned false"); + } + return null; + } + })); + } + + pool.shutdown(); + for (Future future : futures) { + try { + future.get(); + } catch (Exception e) { + LOG.debug(e.getMessage()); + pool.shutdownNow(); + throw new HiveException(e); + } + } + } else { + if (!destFs.rename(srcPath, destPath)) { + throw new IOException("rename for src path: " + srcPath + " to dest path: " + destPath + " returned false"); + } + } + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/util/ParallelDirectoryRenamerTest.java b/ql/src/test/org/apache/hadoop/hive/ql/util/ParallelDirectoryRenamerTest.java new file mode 100644 index 0000000..4305117 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/util/ParallelDirectoryRenamerTest.java @@ -0,0 +1,163 @@ +package org.apache.hadoop.hive.ql.util; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.junit.Test; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class ParallelDirectoryRenamerTest { + + /** + * Test if {@link ParallelDirectoryRenamer#renameDirectoryInParallel(HiveConf, FileSystem, FileSystem, Path, Path, boolean, ExecutorService)} + * works as specified when the destination dir doesn't exist. The test checks that the directory is successfully renamed. + */ + @Test + public void testRenameDirectoryInParallelDestNotExists() throws IOException, HiveException { + FileSystem localFs = FileSystem.getLocal(new Configuration()); + Path srcPath = new Path("testRenameDirectoryInParallel-input"); + Path destPath = new Path("testRenameDirectoryInParallel-output"); + + String fileName1 = "test-1.txt"; + String fileName2 = "test-2.txt"; + String fileName3 = "test-3.txt"; + + HiveConf hiveConf = new HiveConf(); + SessionState.start(hiveConf); + + try { + localFs.mkdirs(srcPath); + + localFs.create(new Path(srcPath, fileName1)).close(); + localFs.create(new Path(srcPath, fileName2)).close(); + localFs.create(new Path(srcPath, fileName3)).close(); + + ParallelDirectoryRenamer.renameDirectoryInParallel(hiveConf, localFs, localFs, srcPath, destPath, true, + Executors.newFixedThreadPool(1)); + + assertTrue(localFs.exists(new Path(destPath, fileName1))); + assertTrue(localFs.exists(new Path(destPath, fileName2))); + assertTrue(localFs.exists(new Path(destPath, fileName3))); + } finally { + try { + localFs.delete(srcPath, true); + } finally { + localFs.delete(destPath, true); + } + } + } + + /** + * Test if {@link ParallelDirectoryRenamer#renameDirectoryInParallel(HiveConf, FileSystem, FileSystem, Path, Path, boolean, ExecutorService)} + * works as specified when the destination dir does exist. The test checks that the directory is successfully renamed. + */ + @Test + public void testRenameDirectoryInParallelDestExists() throws IOException, HiveException { + FileSystem localFs = FileSystem.getLocal(new Configuration()); + Path srcPath = new Path("testRenameDirectoryInParallel-input"); + Path destPath = new Path("testRenameDirectoryInParallel-output"); + + String fileName1 = "test-1.txt"; + String fileName2 = "test-2.txt"; + String fileName3 = "test-3.txt"; + + HiveConf hiveConf = new HiveConf(); + SessionState.start(hiveConf); + + try { + localFs.mkdirs(srcPath); + localFs.mkdirs(destPath); + + localFs.create(new Path(srcPath, fileName1)).close(); + localFs.create(new Path(srcPath, fileName2)).close(); + localFs.create(new Path(srcPath, fileName3)).close(); + + ParallelDirectoryRenamer.renameDirectoryInParallel(hiveConf, localFs, localFs, srcPath, destPath, true, + Executors.newFixedThreadPool(1)); + + Path basePath = new Path(destPath, srcPath.getName()); + assertTrue(localFs.exists(new Path(basePath, fileName1))); + assertTrue(localFs.exists(new Path(basePath, fileName2))); + assertTrue(localFs.exists(new Path(basePath, fileName3))); + } finally { + try { + localFs.delete(srcPath, true); + } finally { + localFs.delete(destPath, true); + } + } + } + + /** + * Test if {@link ParallelDirectoryRenamer#renameDirectoryInParallel(HiveConf, FileSystem, FileSystem, Path, Path, boolean, ExecutorService)} + * works as specified. The test doesn't check the functionality of the method, it only verifies that the method + * executes the rename requests in parallel. + */ + @Test + public void testRenameDirectoryInParallelMockThreadPool() throws IOException, HiveException { + FileSystem localFs = FileSystem.getLocal(new Configuration()); + Path srcPath = new Path("testRenameDirectoryInParallel-input"); + Path destPath = new Path("testRenameDirectoryInParallel-output"); + + String fileName1 = "test-1.txt"; + String fileName2 = "test-2.txt"; + String fileName3 = "test-3.txt"; + + HiveConf hiveConf = new HiveConf(); + SessionState.start(hiveConf); + + try { + localFs.mkdirs(srcPath); + localFs.mkdirs(destPath); + + localFs.create(new Path(srcPath, fileName1)).close(); + localFs.create(new Path(srcPath, fileName2)).close(); + localFs.create(new Path(srcPath, fileName3)).close(); + + ExecutorService mockExecutorService = mock(ExecutorService.class); + when(mockExecutorService.submit(any(Callable.class))).thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + Callable callable = (Callable) invocationOnMock.getArguments()[0]; + Future mockFuture = mock(Future.class); + Object callableResult = callable.call(); + when(mockFuture.get()).thenReturn(callableResult); + when(mockFuture.get(any(Long.class), any(TimeUnit.class))).thenReturn(callableResult); + return mockFuture; + } + }); + + ParallelDirectoryRenamer.renameDirectoryInParallel(hiveConf, localFs, localFs, srcPath, destPath, true, + mockExecutorService); + + verify(mockExecutorService, times(3)).submit(any(Callable.class)); + } finally { + try { + localFs.delete(srcPath, true); + } finally { + localFs.delete(destPath, true); + } + } + } +}