diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSystemUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSystemUtils.java new file mode 100644 index 0000000..a7ea10f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSystemUtils.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.ObjectPair; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.metadata.InputEstimator; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A utility class for interacting asynchronously with file systems. + */ +public final class FileSystemUtils { + + private static ExecutorService executorService; + + private static final Logger LOG = + LoggerFactory.getLogger(FileSystemUtils.class); + + private static final int DEFAULT_THREAD_COUNT = 100; + + private FileSystemUtils() { + } + + /** + * Creates a thread pool that creates new threads as needed, but will reuse + * previously constructed threads when they are available. If no existing + * thread is available, a new thread will be created and added to the pool. 
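+   * The pool is created lazily on the first call and is then shared as a
+   * single process-wide executor by every method of this class.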
+   * Threads that have not been used for sixty seconds are terminated and
+   * removed from the cache. Thus, a pool that remains idle for long enough
+   * will not consume many resources. This pool uses daemon threads and has a
+   * shutdown hook to wait for their completion. This pool waits 120 seconds
+   * before continuing with JVM termination, even if the executor has not
+   * finished its work.
+   *
+   * @return a singleton ExecutorService
+   */
+  private static synchronized ExecutorService getExecutorService() {
+    if (executorService == null) {
+      LOG.info("Creating FileSystem thread pool with maximum of {} threads",
+          DEFAULT_THREAD_COUNT);
+      ThreadPoolExecutor exec = new ThreadPoolExecutor(1, DEFAULT_THREAD_COUNT,
+          60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
+      exec.setThreadFactory(
+          new ThreadFactoryBuilder().setNameFormat("fs-pool-%d").build());
+      executorService = MoreExecutors.getExitingExecutorService(exec);
+    }
+    return executorService;
+  }
+
+  public static Future<Boolean> moveToTrash(final SessionState parentSession,
+      final FileSystem fs, final FileStatus status, final Configuration conf,
+      final boolean purge) {
+    return getExecutorService().submit(new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        SessionState.setCurrentSessionState(parentSession);
+        try {
+          return FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
+        } finally {
+          SessionState.detachSession();
+        }
+      }
+    });
+  }
+
+  public static Future<ObjectPair<Path, Path>> moveFile(
+      final SessionState parentSession, HiveConf conf, FileSystem sourceFs,
+      Path sourcePath, FileSystem destFs, Path destDirPath, boolean isSrcLocal,
+      boolean isOverwrite, boolean isRenameAllowed, int taskId) {
+    return getExecutorService().submit(new Callable<ObjectPair<Path, Path>>() {
+      @Override
+      public ObjectPair<Path, Path> call() throws HiveException {
+        SessionState.setCurrentSessionState(parentSession);
+        try {
+          Path destPath = Hive.mvFile(conf, sourceFs, sourcePath, destFs,
+              destDirPath, isSrcLocal, isOverwrite, isRenameAllowed, taskId);
+          return ObjectPair.create(sourcePath, destPath);
+        } catch (Exception e) {
+          throw new HiveException("Unable to move source " + sourcePath
+              + " to destination " + destDirPath, e);
+        } finally {
+          SessionState.detachSession();
+        }
+      }
+    });
+  }
+
+  public static Future<Void> renameFile(final SessionState parentSession,
+      Path sourcePath, FileSystem destFs, Path destPath,
+      Options.Rename option) {
+    return getExecutorService().submit(new Callable<Void>() {
+      @Override
+      public Void call() throws HiveException {
+        boolean success = false;
+        SessionState.setCurrentSessionState(parentSession);
+        try {
+          if (destFs instanceof DistributedFileSystem) {
+            // HDFS offers an atomic rename that honors the requested option.
+            ((DistributedFileSystem) destFs).rename(sourcePath, destPath,
+                option);
+            success = true;
+          } else {
+            destFs.delete(destPath, false);
+            success = destFs.rename(sourcePath, destPath);
+          }
+        } catch (Exception e) {
+          throw new HiveException("Unable to move source " + sourcePath
+              + " to destination " + destPath, e);
+        } finally {
+          SessionState.detachSession();
+        }
+        if (!success) {
+          throw new HiveException("rename for src path: " + sourcePath
+              + " to dest path: " + destPath + " returned false");
+        }
+        return null;
+      }
+    });
+  }
+
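+  /**
+   * Asynchronously computes a ContentSummary for the given path, preferring
+   * the partition's ContentSummaryInputFormat or an InputEstimator-backed
+   * storage handler when one is configured, and otherwise falling back to
+   * FileSystem#getContentSummary. The future yields {@code null} if the
+   * summary could not be obtained.
+   */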
+  public static Future<ObjectPair<Path, ContentSummary>> getInputSummary(
+      final PartitionDesc partDesc, final JobConf jobConf,
+      final Map<String, ? extends Operator<?>> aliasToWork,
+      final Map<Path, ArrayList<String>> pathToAlias, final Path path) {
+    return getExecutorService()
+        .submit(new Callable<ObjectPair<Path, ContentSummary>>() {
+          @Override
+          @SuppressWarnings("rawtypes")
+          public ObjectPair<Path, ContentSummary> call() throws Exception {
+            try {
+              Class<? extends InputFormat> inputFormatCls =
+                  partDesc.getInputFileFormatClass();
+              InputFormat inputFormatObj = HiveInputFormat
+                  .getInputFormatFromCache(inputFormatCls, jobConf);
+              if (inputFormatObj instanceof ContentSummaryInputFormat) {
+                ContentSummaryInputFormat csif =
+                    (ContentSummaryInputFormat) inputFormatObj;
+                ContentSummary cs = csif.getContentSummary(path, jobConf);
+                return new ObjectPair<>(path, cs);
+              }
+
+              String metaTableStorage = null;
+              if (partDesc.getTableDesc() != null
+                  && partDesc.getTableDesc().getProperties() != null) {
+                metaTableStorage =
+                    partDesc.getTableDesc().getProperties().getProperty(
+                        hive_metastoreConstants.META_TABLE_STORAGE, null);
+              }
+              if (partDesc.getProperties() != null) {
+                metaTableStorage = partDesc.getProperties().getProperty(
+                    hive_metastoreConstants.META_TABLE_STORAGE,
+                    metaTableStorage);
+              }
+
+              HiveStorageHandler handler =
+                  HiveUtils.getStorageHandler(jobConf, metaTableStorage);
+              if (handler instanceof InputEstimator) {
+                long total = 0;
+                TableDesc tableDesc = partDesc.getTableDesc();
+                InputEstimator estimator = (InputEstimator) handler;
+                for (String alias : HiveFileFormatUtils
+                    .doGetAliasesFromPath(pathToAlias, path)) {
+                  JobConf cpyJobConf = new JobConf(jobConf);
+                  TableScanOperator scanOp =
+                      (TableScanOperator) aliasToWork.get(alias);
+                  Utilities.setColumnNameList(cpyJobConf, scanOp, true);
+                  Utilities.setColumnTypeList(cpyJobConf, scanOp, true);
+                  PlanUtils
+                      .configureInputJobPropertiesForStorageHandler(tableDesc);
+                  Utilities.copyTableJobPropertiesToConf(tableDesc, cpyJobConf);
+                  total += estimator.estimate(cpyJobConf, scanOp, -1)
+                      .getTotalLength();
+                }
+                ContentSummary cs = new ContentSummary.Builder().length(total)
+                    .fileCount(-1L).directoryCount(-1L).build();
+                return new ObjectPair<>(path, cs);
+              } else {
+                // todo: should nullify summary for non-native tables,
+                // not to be selected as a mapjoin target
+                FileSystem fs = path.getFileSystem(jobConf);
+                ContentSummary cs = fs.getContentSummary(path);
+                return new ObjectPair<>(path, cs);
+              }
+            } catch (Exception e) {
+              // We safely ignore this exception for summary data.
+              // We don't update the cache to protect it from polluting other
+              // usages. The worst case is that IOException will always be
+              // retried for another getInputSummary(), which is fine as
+              // IOException is not considered as a common case.
+              LOG.info("Cannot get size of {}. Safely ignored.", path);
+            }
+            return null;
+          }
+        });
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 74fb1ba..00f5613 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -65,7 +65,6 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.regex.Matcher;
@@ -96,10 +95,9 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.HiveInterruptCallback;
-import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -108,7 +106,6 @@
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
@@ -126,10 +123,8 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -138,16 +133,12 @@
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
-import org.apache.hadoop.hive.ql.metadata.HiveUtils;
-import org.apache.hadoop.hive.ql.metadata.InputEstimator;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -160,7 +151,6 @@
 import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import
org.apache.hadoop.hive.ql.plan.api.Adjacency; @@ -2383,192 +2373,84 @@ public static ContentSummary getInputSummary(final Context ctx, MapWork work, Pa final Set pathNeedProcess = new HashSet<>(); - // Since multiple threads could call this method concurrently, locking - // this method will avoid number of threads out of control. - synchronized (INPUT_SUMMARY_LOCK) { - // For each input path, calculate the total size. - for (Path path : work.getPathToAliases().keySet()) { - Path p = path; - - if (filter != null && !filter.accept(p)) { - continue; - } - - ContentSummary cs = ctx.getCS(path); - if (cs == null) { - if (path == null) { - continue; - } - pathNeedProcess.add(path); - } else { - summary[0] += cs.getLength(); - summary[1] += cs.getFileCount(); - summary[2] += cs.getDirectoryCount(); - } + // For each input path, calculate the total size. + for (Path path : work.getPathToAliases().keySet()) { + if (path == null) { + continue; } - - // Process the case when name node call is needed - final Map resultMap = new ConcurrentHashMap(); - final ExecutorService executor; - - int numExecutors = getMaxExecutorsForInputListing(ctx.getConf(), pathNeedProcess.size()); - if (numExecutors > 1) { - LOG.info("Using {} threads for getContentSummary", numExecutors); - executor = Executors.newFixedThreadPool(numExecutors, - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Get-Input-Summary-%d").build()); + if (filter != null && !filter.accept(path)) { + continue; + } + ContentSummary cs = ctx.getCS(path); + if (cs == null) { + pathNeedProcess.add(path); } else { - executor = null; + summary[0] += cs.getLength(); + summary[1] += cs.getFileCount(); + summary[2] += cs.getDirectoryCount(); } - ContentSummary cs = getInputSummaryWithPool(ctx, pathNeedProcess, work, summary, executor); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY); - return cs; } + + // Process the case when name node call is needed + getInputSummaryWithPool(ctx, pathNeedProcess, work, summary); + + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY); + + return new ContentSummary.Builder().length(summary[0]).fileCount(summary[1]) + .directoryCount(summary[2]).build(); } - @VisibleForTesting - static ContentSummary getInputSummaryWithPool(final Context ctx, Set pathNeedProcess, MapWork work, - long[] summary, ExecutorService executor) throws IOException { - List> results = new ArrayList>(); - final Map resultMap = new ConcurrentHashMap(); - - HiveInterruptCallback interrup = HiveInterruptUtils.add(new HiveInterruptCallback() { - @Override - public void interrupt() { - for (Path path : pathNeedProcess) { - try { - path.getFileSystem(ctx.getConf()).close(); - } catch (IOException ignore) { - LOG.debug("Failed to close filesystem", ignore); - } - } - if (executor != null) { - executor.shutdownNow(); - } - } - }); - try { - Configuration conf = ctx.getConf(); - JobConf jobConf = new JobConf(conf); - for (Path path : pathNeedProcess) { - final Path p = path; - final String pathStr = path.toString(); - // All threads share the same Configuration and JobConf based on the - // assumption that they are thread safe if only read operations are - // executed. It is not stated in Hadoop's javadoc, the sourcce codes - // clearly showed that they made efforts for it and we believe it is - // thread safe. Will revisit this piece of codes if we find the assumption - // is not correct. 
- final Configuration myConf = conf; - final JobConf myJobConf = jobConf; - final Map> aliasToWork = work.getAliasToWork(); - final Map> pathToAlias = work.getPathToAliases(); - final PartitionDesc partDesc = work.getPathToPartitionInfo().get(p); - Runnable r = new Runnable() { - @Override - public void run() { - try { - Class inputFormatCls = partDesc - .getInputFileFormatClass(); - InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache( - inputFormatCls, myJobConf); - if (inputFormatObj instanceof ContentSummaryInputFormat) { - ContentSummaryInputFormat cs = (ContentSummaryInputFormat) inputFormatObj; - resultMap.put(pathStr, cs.getContentSummary(p, myJobConf)); - return; - } - - String metaTableStorage = null; - if (partDesc.getTableDesc() != null && - partDesc.getTableDesc().getProperties() != null) { - metaTableStorage = partDesc.getTableDesc().getProperties() - .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, null); - } - if (partDesc.getProperties() != null) { - metaTableStorage = partDesc.getProperties() - .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, metaTableStorage); - } - - HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf, metaTableStorage); - if (handler instanceof InputEstimator) { - long total = 0; - TableDesc tableDesc = partDesc.getTableDesc(); - InputEstimator estimator = (InputEstimator) handler; - for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, p)) { - JobConf jobConf = new JobConf(myJobConf); - TableScanOperator scanOp = (TableScanOperator) aliasToWork.get(alias); - Utilities.setColumnNameList(jobConf, scanOp, true); - Utilities.setColumnTypeList(jobConf, scanOp, true); - PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc); - Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf); - total += estimator.estimate(jobConf, scanOp, -1).getTotalLength(); - } - resultMap.put(pathStr, new ContentSummary(total, -1, -1)); - } else { - // todo: should nullify summary for non-native tables, - // not to be selected as a mapjoin target - FileSystem fs = p.getFileSystem(myConf); - resultMap.put(pathStr, fs.getContentSummary(p)); - } - } catch (Exception e) { - // We safely ignore this exception for summary data. - // We don't update the cache to protect it from polluting other - // usages. The worst case is that IOException will always be - // retried for another getInputSummary(), which is fine as - // IOException is not considered as a common case. - LOG.info("Cannot get size of {}. Safely ignored.", pathStr); - } - } - }; + /** + * Has the side-effect of updating the summary array. 
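+   * Index 0 of {@code summary} accumulates total length, index 1 accumulates
+   * file count, and index 2 accumulates directory count for every path whose
+   * summary could be computed.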
+   */
+  private static void getInputSummaryWithPool(final Context ctx,
+      Set<Path> pathNeedProcess, MapWork work, long[] summary)
+      throws IOException {
-      if (executor == null) {
-        r.run();
-      } else {
-        Future<?> result = executor.submit(r);
-        results.add(result);
-      }
-    }
+    final List<Future<ObjectPair<Path, ContentSummary>>> futures =
+        new ArrayList<>(pathNeedProcess.size());
-    if (executor != null) {
-      for (Future<?> result : results) {
-        boolean executorDone = false;
-        do {
-          try {
-            result.get();
-            executorDone = true;
-          } catch (InterruptedException e) {
-            LOG.info("Interrupted when waiting threads: ", e);
-            Thread.currentThread().interrupt();
-            break;
-          } catch (ExecutionException e) {
-            throw new IOException(e);
-          }
-        } while (!executorDone);
-      }
-      executor.shutdown();
-    }
-    HiveInterruptUtils.checkInterrupted();
-    for (Map.Entry<String, ContentSummary> entry : resultMap.entrySet()) {
-      ContentSummary cs = entry.getValue();
+    Configuration conf = ctx.getConf();
+    JobConf jobConf = new JobConf(conf);
-      summary[0] += cs.getLength();
-      summary[1] += cs.getFileCount();
-      summary[2] += cs.getDirectoryCount();
+    for (Path path : pathNeedProcess) {
+      // All threads share the same Configuration and JobConf based on the
+      // assumption that they are thread safe if only read operations are
+      // executed. This is not stated in Hadoop's javadoc, but the source code
+      // clearly shows that effort was made to that end, and we believe it is
+      // thread safe. Will revisit this piece of code if we find the
+      // assumption is not correct.
+      final Map<String, Operator<? extends OperatorDesc>> aliasToWork =
+          work.getAliasToWork();
+      final Map<Path, ArrayList<String>> pathToAlias = work.getPathToAliases();
+      final PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
-      ctx.addCS(entry.getKey(), cs);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Cache Content Summary for {} length: {} file count: {} " +
-            " directory count: {}", entry.getKey(), cs.getLength(),
-            cs.getFileCount(), cs.getDirectoryCount());
-      }
-    }
+      futures.add(FileSystemUtils.getInputSummary(partDesc, jobConf,
+          aliasToWork, pathToAlias, path));
+    }
-    return new ContentSummary(summary[0], summary[1], summary[2]);
-  } finally {
-    if (executor != null) {
-      executor.shutdownNow();
+    for (Future<ObjectPair<Path, ContentSummary>> future : futures) {
+      try {
+        final ObjectPair<Path, ContentSummary> result = future.get();
+        if (result != null) {
+          final Path path = result.getFirst();
+          final ContentSummary cs = result.getSecond();
+
+          summary[0] += cs.getLength();
+          summary[1] += cs.getFileCount();
+          summary[2] += cs.getDirectoryCount();
+
+          ctx.addCS(path.toString(), cs);
+
+          LOG.debug(
+              "Cache Content Summary for {} length: {} file count: {} "
+                  + " directory count: {}",
+              path, cs.getLength(), cs.getFileCount(), cs.getDirectoryCount());
+        }
+      } catch (InterruptedException e) {
+        // Preserve the interrupt status for callers further up the stack.
+        Thread.currentThread().interrupt();
+        return;
+      } catch (ExecutionException e) {
+        throw new IOException(e);
       }
-      HiveInterruptUtils.remove(interrup);
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 0b1048c..7912532 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -24,7 +24,6 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;

 import static org.apache.hadoop.hive.conf.Constants.MATERIALIZED_VIEW_REWRITING_TIME_WINDOW;
@@ -76,6 +75,7 @@
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rex.RexBuilder;
 import
org.apache.commons.io.FilenameUtils; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileChecksum; @@ -112,6 +112,7 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; +import org.apache.hadoop.hive.ql.exec.FileSystemUtils; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.FunctionTask; import org.apache.hadoop.hive.ql.exec.FunctionUtils; @@ -3574,10 +3575,7 @@ private static void copyFiles(final HiveConf conf, final FileSystem destFs, if (!fullDestStatus.getFileStatus().isDirectory()) { throw new HiveException(destf + " is not a directory."); } - final List>> futures = new LinkedList<>(); - final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? - Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null; + final List>> futures = new ArrayList<>(); // For ACID non-bucketed case, the filenames have to be in the format consistent with INSERT/UPDATE/DELETE Ops, // i.e, like 000000_0, 000001_0_copy_1, 000002_0.gz etc. // The extension is only maintained for files which are compressed. @@ -3591,9 +3589,6 @@ private static void copyFiles(final HiveConf conf, final FileSystem destFs, try { files = srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); } catch (IOException e) { - if (null != pool) { - pool.shutdownNow(); - } throw new HiveException(e); } } else { @@ -3609,55 +3604,27 @@ private static void copyFiles(final HiveConf conf, final FileSystem destFs, final boolean isRenameAllowed = !needToCopy && !isSrcLocal; - final String msg = "Unable to move source " + srcP + " to destination " + destf; - - // If we do a rename for a non-local file, we will be transfering the original + // If we do a rename for a non-local file, we will be transferring the original // file permissions from source to the destination. Else, in case of mvFile() where we // copy from source to destination, we will inherit the destination's parent group ownership. - if (null == pool) { - try { - Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isOverwrite, isRenameAllowed, - acidRename ? taskId++ : -1); - - if (null != newFiles) { - newFiles.add(destPath); - } - } catch (Exception e) { - throw getHiveException(e, msg, "Failed to move: {}"); - } - } else { - // future only takes final or seemingly final values. Make a final copy of taskId - final int finalTaskId = acidRename ? taskId++ : -1; - futures.add(pool.submit(new Callable>() { - @Override - public ObjectPair call() throws HiveException { - SessionState.setCurrentSessionState(parentSession); - - try { - Path destPath = - mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isOverwrite, isRenameAllowed, finalTaskId); - - if (null != newFiles) { - newFiles.add(destPath); - } - return ObjectPair.create(srcP, destPath); - } catch (Exception e) { - throw getHiveException(e, msg); - } - } - })); - } + // future only takes final or seemingly final values. Make a final copy of taskId + final int finalTaskId = acidRename ? 
taskId++ : -1; + Future> mvFuture = + FileSystemUtils.moveFile(parentSession, conf, srcFs, srcP, destFs, + destf, isSrcLocal, isOverwrite, isRenameAllowed, finalTaskId); + futures.add(mvFuture); } } - if (null != pool) { - pool.shutdown(); - for (Future> future : futures) { - try { - ObjectPair pair = future.get(); - LOG.debug("Moved src: {}, to dest: {}", pair.getFirst().toString(), pair.getSecond().toString()); - } catch (Exception e) { - throw handlePoolException(pool, e); + for (Future> future : futures) { + try { + ObjectPair pair = future.get(); + LOG.debug("Moved src: {}, to dest: {}", pair.getFirst(), + pair.getSecond()); + if (newFiles != null) { + newFiles.add(pair.getSecond()); } + } catch (Exception e) { + throw new HiveException(e); } } } @@ -3735,7 +3702,8 @@ private static String getPathName(int taskId) { * * @throws IOException if there was an issue moving the file */ - private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, FileSystem destFs, Path destDirPath, + @VisibleForTesting + public static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, FileSystem destFs, Path destDirPath, boolean isSrcLocal, boolean isOverwrite, boolean isRenameAllowed, int taskId) throws IOException { @@ -3865,7 +3833,6 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, throw new HiveException(e.getMessage(), e); } - HdfsUtils.HadoopFileStatus destStatus = null; String configuredOwner = HiveConf.getVar(conf, ConfVars.HIVE_LOAD_DATA_OWNER); // If source path is a subdirectory of the destination path (or the other way around): @@ -3879,8 +3846,7 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, final String msg = "Unable to move source " + srcf + " to destination " + destf; try { if (replace) { - try{ - destStatus = new HdfsUtils.HadoopFileStatus(conf, destFs, destf); + try { //if destf is an existing directory: //if replace is true, delete followed by rename(mv) is equivalent to replace //if replace is false, rename (mv) actually move the src under dest dir @@ -3888,12 +3854,11 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, // to delete the file first if (replace && !srcIsSubDirOfDest) { destFs.delete(destf, true); - LOG.debug("The path " + destf.toString() + " is deleted"); + LOG.debug("The path {} is deleted", destf); } } catch (FileNotFoundException ignore) { } } - final HdfsUtils.HadoopFileStatus desiredStatus = destStatus; final SessionState parentSession = SessionState.get(); if (isSrcLocal) { // For local src file, copy to hdfs @@ -3902,89 +3867,43 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, } else { if (needToCopy(srcf, destf, srcFs, destFs, configuredOwner, isManaged)) { //copy if across file system or encryption zones. 
- LOG.debug("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different."); + LOG.debug("Copying source {} to {} because HDFS encryption zones are different.", srcf, destf); return FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf, true, // delete source replace, // overwrite destination conf); } else { if (srcIsSubDirOfDest || destIsSubDirOfSrc) { - FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER); + FileStatus[] srcs = + destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER); - List> futures = new LinkedList<>(); - final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? - Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null; + List> futures = new ArrayList<>(); if (destIsSubDirOfSrc && !destFs.exists(destf)) { - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Creating " + destf); - } + Utilities.FILE_OP_LOGGER.trace("Creating {}", destf); destFs.mkdirs(destf); } - /* Move files one by one because source is a subdirectory of destination */ + /* + * Move files one by one because source is a subdirectory of + * destination + */ for (final FileStatus srcStatus : srcs) { - - final Path destFile = new Path(destf, srcStatus.getPath().getName()); - - final String poolMsg = - "Unable to move source " + srcStatus.getPath() + " to destination " + destFile; - - if (null == pool) { - boolean success = false; - if (destFs instanceof DistributedFileSystem) { - ((DistributedFileSystem)destFs).rename(srcStatus.getPath(), destFile, Options.Rename.OVERWRITE); - success = true; - } else { - destFs.delete(destFile, false); - success = destFs.rename(srcStatus.getPath(), destFile); - } - if(!success) { - throw new IOException("rename for src path: " + srcStatus.getPath() + " to dest:" - + destf + " returned false"); - } - } else { - futures.add(pool.submit(new Callable() { - @Override - public Void call() throws HiveException { - SessionState.setCurrentSessionState(parentSession); - try { - boolean success = false; - if (destFs instanceof DistributedFileSystem) { - ((DistributedFileSystem)destFs).rename(srcStatus.getPath(), destFile, Options.Rename.OVERWRITE); - success = true; - } else { - destFs.delete(destFile, false); - success = destFs.rename(srcStatus.getPath(), destFile); - } - if (!success) { - throw new IOException( - "rename for src path: " + srcStatus.getPath() + " to dest path:" - + destFile + " returned false"); - } - } catch (Exception e) { - throw getHiveException(e, poolMsg); - } - return null; - } - })); - } + final Path destFile = + new Path(destf, srcStatus.getPath().getName()); + futures.add( + FileSystemUtils.renameFile(parentSession, srcStatus.getPath(), + destFs, destFile, Options.Rename.OVERWRITE)); } - if (null != pool) { - pool.shutdown(); - for (Future future : futures) { - try { - future.get(); - } catch (Exception e) { - throw handlePoolException(pool, e); - } + + for (Future future : futures) { + try { + future.get(); + } catch (Exception e) { + throw new HiveException("Unable to move file", e); } } return true; } else { - if (destFs.rename(srcf, destf)) { - return true; - } - return false; + return destFs.rename(srcf, destf); } } } @@ -4363,53 +4282,19 @@ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, listNewFilesRecursively(destFs, destf, 
newFiles); } } else { - final Map, Path> moveFutures = Maps.newLinkedHashMapWithExpectedSize(srcs.length); - final int moveFilesThreadCount = HiveConf.getIntVar(conf, ConfVars.HIVE_MOVE_FILES_THREAD_COUNT); - final ExecutorService pool = moveFilesThreadCount > 0 - ? Executors.newFixedThreadPool( - moveFilesThreadCount, - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Replace-Thread-%d").build()) - : MoreExecutors.newDirectExecutorService(); - final SessionState parentSession = SessionState.get(); // its either a file or glob + List movedFiles = Lists.newArrayListWithCapacity(srcs.length); for (FileStatus src : srcs) { Path destFile = new Path(destf, src.getPath().getName()); - moveFutures.put( - pool.submit( - new Callable() { - @Override - public Boolean call() throws Exception { - SessionState.setCurrentSessionState(parentSession); - return moveFile( - conf, src.getPath(), destFile, true, isSrcLocal, isManaged); - } - }), - destFile); - } - - pool.shutdown(); - for (Map.Entry, Path> moveFuture : moveFutures.entrySet()) { - boolean moveFailed; - try { - moveFailed = !moveFuture.getKey().get(); - } catch (InterruptedException | ExecutionException e) { - pool.shutdownNow(); - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } - if (e.getCause() instanceof HiveException) { - throw (HiveException) e.getCause(); - } - throw handlePoolException(pool, e); - } - if (moveFailed) { + if (moveFile(conf, src.getPath(), destFile, true, isSrcLocal, isManaged)) { + movedFiles.add(destFile); + } else { throw new IOException("Error moving: " + srcf + " into: " + destf); } - - // Add file paths of the files that will be moved to the destination if the caller needs it - if (null != newFiles) { - newFiles.add(moveFuture.getValue()); - } + } + // Add file paths of the files that will be moved to the destination if the caller needs it + if (newFiles != null) { + newFiles.addAll(movedFiles); } } } catch (IOException e) { @@ -4480,39 +4365,22 @@ private void cleanUpOneDirectoryForReplace(Path path, FileSystem fs, public static boolean trashFiles(final FileSystem fs, final FileStatus[] statuses, final Configuration conf, final boolean purge) throws IOException { - boolean result = true; - - if (statuses == null || statuses.length == 0) { + if (ArrayUtils.isEmpty(statuses)) { return false; } - final List> futures = new LinkedList<>(); - final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? 
- Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()) : null; + boolean result = true; + final List> futures = new ArrayList<>(); final SessionState parentSession = SessionState.get(); for (final FileStatus status : statuses) { - if (null == pool) { - result &= FileUtils.moveToTrash(fs, status.getPath(), conf, purge); - } else { - futures.add(pool.submit(new Callable() { - @Override - public Boolean call() throws Exception { - SessionState.setCurrentSessionState(parentSession); - return FileUtils.moveToTrash(fs, status.getPath(), conf, purge); - } - })); - } + futures.add( + FileSystemUtils.moveToTrash(parentSession, fs, status, conf, purge)); } - if (null != pool) { - pool.shutdown(); - for (Future future : futures) { - try { - result &= future.get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Failed to delete: ",e); - pool.shutdownNow(); - throw new IOException(e); - } + for (Future future : futures) { + try { + result &= future.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Failed to delete", e); + throw new IOException(e); } } return result; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java index 90eb45b..a0fa98f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java @@ -26,28 +26,19 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.apache.hadoop.hive.ql.exec.Utilities.getFileExtension; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; @@ -73,7 +64,6 @@ import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.session.SessionState; @@ -540,135 +530,6 @@ private void runTestGetInputPaths(JobConf jobConf, int numOfPartitions) throws E } @Test - public void testGetInputSummaryPool() throws ExecutionException, InterruptedException, IOException { - ExecutorService pool = mock(ExecutorService.class); - when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class)); - - Set pathNeedProcess = new HashSet<>(); - pathNeedProcess.add(new Path("dummy-path1")); - pathNeedProcess.add(new Path("dummy-path2")); - pathNeedProcess.add(new Path("dummy-path3")); - - SessionState.start(new HiveConf()); - JobConf jobConf = new JobConf(); - Context context = new Context(jobConf); - - 
Utilities.getInputSummaryWithPool(context, pathNeedProcess, mock(MapWork.class), new long[3], pool); - verify(pool, times(3)).submit(any(Runnable.class)); - verify(pool).shutdown(); - verify(pool).shutdownNow(); - } - - @Test - public void testGetInputSummaryPoolAndFailure() throws ExecutionException, InterruptedException, IOException { - ExecutorService pool = mock(ExecutorService.class); - when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class)); - - Set pathNeedProcess = new HashSet<>(); - pathNeedProcess.add(new Path("dummy-path1")); - pathNeedProcess.add(new Path("dummy-path2")); - pathNeedProcess.add(new Path("dummy-path3")); - - SessionState.start(new HiveConf()); - JobConf jobConf = new JobConf(); - Context context = new Context(jobConf); - - Utilities.getInputSummaryWithPool(context, pathNeedProcess, mock(MapWork.class), new long[3], pool); - verify(pool, times(3)).submit(any(Runnable.class)); - verify(pool).shutdown(); - verify(pool).shutdownNow(); - } - - @Test - public void testGetInputPathsPool() throws IOException, ExecutionException, InterruptedException { - List pathsToAdd = new ArrayList<>(); - Path path = new Path("dummy-path"); - - pathsToAdd.add(path); - pathsToAdd.add(path); - pathsToAdd.add(path); - - ExecutorService pool = mock(ExecutorService.class); - Future mockFuture = mock(Future.class); - - when(mockFuture.get()).thenReturn(path); - when(pool.submit(any(Callable.class))).thenReturn(mockFuture); - - Utilities.getInputPathsWithPool(mock(JobConf.class), mock(MapWork.class), mock(Path.class), mock(Context.class), - false, pathsToAdd, pool); - - verify(pool, times(3)).submit(any(Callable.class)); - verify(pool).shutdown(); - verify(pool).shutdownNow(); - } - - @Test - public void testGetInputPathsPoolAndFailure() throws IOException, ExecutionException, InterruptedException { - List pathsToAdd = new ArrayList<>(); - Path path = new Path("dummy-path"); - - pathsToAdd.add(path); - pathsToAdd.add(path); - pathsToAdd.add(path); - - ExecutorService pool = mock(ExecutorService.class); - Future mockFuture = mock(Future.class); - - when(mockFuture.get()).thenThrow(new RuntimeException()); - when(pool.submit(any(Callable.class))).thenReturn(mockFuture); - - Exception e = null; - try { - Utilities.getInputPathsWithPool(mock(JobConf.class), mock(MapWork.class), mock(Path.class), mock(Context.class), - false, pathsToAdd, pool); - } catch (Exception thrownException) { - e = thrownException; - } - assertNotNull(e); - - verify(pool, times(3)).submit(any(Callable.class)); - verify(pool).shutdownNow(); - } - - @Test - public void testGetInputSummaryWithASingleThread() throws IOException { - final int NUM_PARTITIONS = 5; - final int BYTES_PER_FILE = 5; - - JobConf jobConf = new JobConf(); - Properties properties = new Properties(); - - jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0); - ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class); - assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS, summary.getFileCount()); - assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); - } - - @Test - public void testGetInputSummaryWithMultipleThreads() throws IOException { - final int NUM_PARTITIONS = 5; - final int BYTES_PER_FILE = 5; - - JobConf jobConf = new JobConf(); - Properties properties = new Properties(); - - jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2); - ContentSummary summary = 
runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class); - assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS, summary.getFileCount()); - assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); - - // Test deprecated mapred.dfsclient.parallelism.max - jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0); - jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2); - summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class); - assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength()); - assertEquals(NUM_PARTITIONS, summary.getFileCount()); - assertEquals(NUM_PARTITIONS, summary.getDirectoryCount()); - } - - @Test public void testGetInputSummaryWithInputEstimator() throws IOException, HiveException { final int NUM_PARTITIONS = 5; final int BYTES_PER_FILE = 10;
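For context, every call site touched above now follows the same submit-then-await pattern against the shared executor inside FileSystemUtils. The following is only a sketch of that calling pattern, modeled on the rewritten trashFiles(); the method name trashAllAsync and its parameters are illustrative and not part of this patch:

  // Sketch only: assumes a live SessionState and FileSystem supplied by the caller.
  static boolean trashAllAsync(FileSystem fs, FileStatus[] statuses,
      Configuration conf, boolean purge) throws Exception {
    // Queue one asynchronous trash move per file on the shared daemon pool.
    List<Future<Boolean>> futures = new ArrayList<>(statuses.length);
    for (FileStatus status : statuses) {
      futures.add(FileSystemUtils.moveToTrash(SessionState.get(), fs, status,
          conf, purge));
    }
    // Await completion; each future yields whether its move succeeded.
    boolean result = true;
    for (Future<Boolean> future : futures) {
      result &= future.get();
    }
    return result;
  }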