diff --git a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java deleted file mode 100644 index 6ca35e2..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.common; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; - -import java.util.Collection; - -/** - * Utilities for different blob (object) storage systems - */ -public class BlobStorageUtils { - private static final boolean DISABLE_BLOBSTORAGE_AS_SCRATCHDIR = false; - - public static boolean isBlobStoragePath(final Configuration conf, final Path path) { - return (path == null) ? false : isBlobStorageScheme(conf, path.toUri().getScheme()); - } - - public static boolean isBlobStorageFileSystem(final Configuration conf, final FileSystem fs) { - return (fs == null) ? 
false : isBlobStorageScheme(conf, fs.getScheme()); - } - - public static boolean isBlobStorageScheme(final Configuration conf, final String scheme) { - Collection supportedBlobStoreSchemes = - conf.getStringCollection(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname); - - return supportedBlobStoreSchemes.contains(scheme); - } - - public static boolean isBlobStorageAsScratchDir(final Configuration conf) { - return conf.getBoolean( - HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname, - DISABLE_BLOBSTORAGE_AS_SCRATCHDIR - ); - } -} diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 15de10b..60f8ce4 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3161,7 +3161,14 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Comma-separated list of supported blobstore schemes."), HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR("hive.blobstore.use.blobstore.as.scratchdir", false, - "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties)."); + "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties)."), + + HIVE_BLOBSTORE_MOVE_INDIVIDUAL_FILES("hive.blobstore.move.individual.files", true, + "When moving files within a blobstore, move files one at a time rather than at a directory level. Since " + + "moves may require copying the entire file, each move can take a long time. Moving at a " + + "directory level may not be ideal if the blobstore connector cannot efficiently rename a directory " + + "(e.g. HADOOP-13600). 
By default, moves are done using a thread pool which allows each individual file " + + "to be moved in parallel."); public final String varname; diff --git a/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java deleted file mode 100644 index 84a0d86..0000000 --- a/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.common; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.net.URI; - -import static org.apache.hadoop.hive.common.BlobStorageUtils.*; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; - -public class TestBlobStorageUtils { - private static final Configuration conf = new Configuration(); - - @Before - public void setUp() { - conf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname, "s3a,swift"); - conf.setBoolean(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname, false); - } - - @Test - public void testValidAndInvalidPaths() throws IOException { - // Valid paths - assertTrue(isBlobStoragePath(conf, new Path("s3a://bucket/path"))); - assertTrue(isBlobStoragePath(conf, new Path("swift://bucket/path"))); - - // Invalid paths - assertFalse(isBlobStoragePath(conf, new Path("/tmp/a-path"))); - assertFalse(isBlobStoragePath(conf, new Path("s3fs://tmp/file"))); - assertFalse(isBlobStoragePath(conf, null)); - assertFalse(isBlobStorageFileSystem(conf, null)); - assertFalse(isBlobStoragePath(conf, new Path(URI.create("")))); - } - - @Test - public void testValidAndInvalidFileSystems() { - FileSystem fs = mock(FileSystem.class); - - /* Valid FileSystem schemes */ - - doReturn("s3a").when(fs).getScheme(); - assertTrue(isBlobStorageFileSystem(conf, fs)); - - doReturn("swift").when(fs).getScheme(); - assertTrue(isBlobStorageFileSystem(conf, fs)); - - /* Invalid FileSystem schemes */ - - doReturn("hdfs").when(fs).getScheme(); - assertFalse(isBlobStorageFileSystem(conf, fs)); - - doReturn("").when(fs).getScheme(); - assertFalse(isBlobStorageFileSystem(conf, 
fs)); - - assertFalse(isBlobStorageFileSystem(conf, null)); - } - - @Test - public void testValidAndInvalidSchemes() { - // Valid schemes - assertTrue(isBlobStorageScheme(conf, "s3a")); - assertTrue(isBlobStorageScheme(conf, "swift")); - - // Invalid schemes - assertFalse(isBlobStorageScheme(conf, "hdfs")); - assertFalse(isBlobStorageScheme(conf, "")); - assertFalse(isBlobStorageScheme(conf, null)); - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 838d73e..70a9e6c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -39,7 +39,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -55,6 +54,7 @@ import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.util.BlobStorageUtils; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index dab4c6a..49796f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -136,6 +136,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.session.CreateTableAutomaticGrant; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.util.BlobStorageUtils; import org.apache.hadoop.hive.serde2.Deserializer; 
import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; @@ -3001,6 +3002,9 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, //needed for perm inheritance. final boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); + final boolean moveIndividualFiles = HiveConf.getBoolVar(conf, + HiveConf.ConfVars.HIVE_BLOBSTORE_MOVE_INDIVIDUAL_FILES) && BlobStorageUtils.isBlobStorageFileSystem( + destFs.getConf(), destFs); HdfsUtils.HadoopFileStatus destStatus = null; // If source path is a subdirectory of the destination path: @@ -3053,6 +3057,7 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, conf); } else { if (destIsSubDir) { + FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER); List> futures = new LinkedList<>(); @@ -3105,13 +3110,21 @@ public Void call() throws Exception { } return true; } else { - if (destFs.rename(srcf, destf)) { - if (inheritPerms) { - HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true); - } + if (moveIndividualFiles && conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0) { + final ExecutorService pool = Executors.newFixedThreadPool( + conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()); + BlobStorageUtils.renameDirectory(conf, srcFs, destFs, srcf, destf, inheritPerms, pool); return true; + } else { + if (destFs.rename(srcf, destf)) { + if (inheritPerms) { + HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true); + } + return true; + } + return false; } - return false; } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index cea99e1..cd6d7a2 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Warehouse; @@ -110,6 +109,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.util.BlobStorageUtils; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/BlobStorageUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/util/BlobStorageUtils.java new file mode 100644 index 0000000..bafa98b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/BlobStorageUtils.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.util; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hive.io.HdfsUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * Utilities for different blob (object) storage systems + */ +public class BlobStorageUtils { + + private static final boolean DISABLE_BLOBSTORAGE_AS_SCRATCHDIR = false; + private static final Logger LOG = LoggerFactory.getLogger(BlobStorageUtils.class); + + public static boolean isBlobStoragePath(final Configuration conf, final Path path) { + return (path == null) ? false : isBlobStorageScheme(conf, path.toUri().getScheme()); + } + + public static boolean isBlobStorageFileSystem(final Configuration conf, final FileSystem fs) { + return (fs == null) ? 
false : isBlobStorageScheme(conf, fs.getScheme()); + } + + public static boolean isBlobStorageScheme(final Configuration conf, final String scheme) { + Collection supportedBlobStoreSchemes = + conf.getStringCollection(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname); + + return supportedBlobStoreSchemes.contains(scheme); + } + + public static boolean isBlobStorageAsScratchDir(final Configuration conf) { + return conf.getBoolean( + HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname, + DISABLE_BLOBSTORAGE_AS_SCRATCHDIR + ); + } + + public static void renameDirectory(final HiveConf hiveConf, final FileSystem srcFs, + final FileSystem destFs, final Path srcPath, + final Path destPath, final boolean inheritPerms, + ExecutorService pool) throws IOException, HiveException { + + Preconditions.checkArgument(srcFs.isDirectory(srcPath)); + Preconditions.checkArgument(destFs.isDirectory(destPath)); + + final SessionState parentSession = SessionState.get(); + final HdfsUtils.HadoopFileStatus desiredStatus = new HdfsUtils.HadoopFileStatus(destFs.getConf(), destFs, + destPath); + + List> futures = new ArrayList<>(); + + for (final FileStatus srcStatus : srcFs.listStatus(srcPath)) { + final Path destFile = new Path(destPath, srcStatus.getPath().getName()); + + futures.add(pool.submit(new Callable() { + @Override + public Void call() throws Exception { + SessionState.setCurrentSessionState(parentSession); + final String group = srcStatus.getGroup(); + if (destFs.rename(srcStatus.getPath(), destFile)) { + if (inheritPerms) { + HdfsUtils.setFullFileStatus(hiveConf, desiredStatus, group, destFs, destFile, false); + } + } else { + throw new IOException("rename for src path: " + srcStatus.getPath() + " to dest path:" + + destFile + " returned false"); + } + return null; + } + })); + } + + pool.shutdown(); + for (Future future : futures) { + try { + future.get(); + } catch (Exception e) { + LOG.debug(e.getMessage()); + pool.shutdownNow(); + throw new 
HiveException(e.getCause()); + } + } + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/util/TestBlobStorageUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/util/TestBlobStorageUtils.java new file mode 100644 index 0000000..7c2e791 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/util/TestBlobStorageUtils.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; + +import static org.apache.hadoop.hive.ql.util.BlobStorageUtils.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +public class TestBlobStorageUtils { + private static final Configuration conf = new Configuration(); + + @Before + public void setUp() { + conf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname, "s3a,swift"); + conf.setBoolean(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname, false); + } + + @Test + public void testValidAndInvalidPaths() throws IOException { + // Valid paths + assertTrue(isBlobStoragePath(conf, new Path("s3a://bucket/path"))); + assertTrue(isBlobStoragePath(conf, new Path("swift://bucket/path"))); + + // Invalid paths + assertFalse(isBlobStoragePath(conf, new Path("/tmp/a-path"))); + assertFalse(isBlobStoragePath(conf, new Path("s3fs://tmp/file"))); + assertFalse(isBlobStoragePath(conf, null)); + assertFalse(isBlobStorageFileSystem(conf, null)); + assertFalse(isBlobStoragePath(conf, new Path(URI.create("")))); + } + + @Test + public void testValidAndInvalidFileSystems() { + FileSystem fs = mock(FileSystem.class); + + /* Valid FileSystem schemes */ + + doReturn("s3a").when(fs).getScheme(); + assertTrue(isBlobStorageFileSystem(conf, fs)); + + doReturn("swift").when(fs).getScheme(); + assertTrue(isBlobStorageFileSystem(conf, fs)); + + /* Invalid FileSystem schemes */ + + doReturn("hdfs").when(fs).getScheme(); + assertFalse(isBlobStorageFileSystem(conf, fs)); + + doReturn("").when(fs).getScheme(); + 
assertFalse(isBlobStorageFileSystem(conf, fs)); + + assertFalse(isBlobStorageFileSystem(conf, null)); + } + + @Test + public void testValidAndInvalidSchemes() { + // Valid schemes + assertTrue(isBlobStorageScheme(conf, "s3a")); + assertTrue(isBlobStorageScheme(conf, "swift")); + + // Invalid schemes + assertFalse(isBlobStorageScheme(conf, "hdfs")); + assertFalse(isBlobStorageScheme(conf, "")); + assertFalse(isBlobStorageScheme(conf, null)); + } + +// @Test +// public void testRenameDirectoryInParallel() throws IOException, HiveException { +// FileSystem localFs = FileSystem.getLocal(new Configuration()); +// Path srcPath = new Path("testRenameDirectoryInParallel-input"); +// Path destPath = new Path("testRenameDirectoryInParallel-output"); +// +// String fileName1 = "test-1.txt"; +// String fileName2 = "test-2.txt"; +// String fileName3 = "test-3.txt"; +// +// try { +// localFs.mkdirs(srcPath); +// localFs.mkdirs(destPath); +// +// localFs.create(new Path(srcPath, fileName1)).close(); +// localFs.create(new Path(srcPath, fileName2)).close(); +// localFs.create(new Path(srcPath, fileName3)).close(); +// +// // HOW TO MOCK AN EXECUTORSERVICE +// +// BlobStorageUtils.renameDirectory(new HiveConf(), localFs, localFs, srcPath, destPath, true, null); +// +// assertTrue(localFs.exists(new Path(destPath, fileName1))); +// assertTrue(localFs.exists(new Path(destPath, fileName2))); +// assertTrue(localFs.exists(new Path(destPath, fileName3))); +// } finally { +// try { +// localFs.delete(srcPath, true); +// } finally { +// localFs.delete(destPath, true); +// } +// } +// } +}