diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index c944a13c67..250004ca1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -32,6 +32,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -196,7 +197,7 @@ private TaskTracker forNewTable() throws Exception {
       LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
           + partSpecToString(partSpec.getPartSpec()) + " with source location: "
           + partSpec.getLocation());
-      Path tmpPath = context.utils.getExternalTmpPath(replicaWarehousePartitionLocation);
+      Path tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
       Task copyTask = ReplCopyTask.getLoadCopyTask(
           event.replicationSpec(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index a1187c4460..9ffd152f40 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -32,6 +32,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
@@ -218,7 +219,7 @@ private String location(ImportTableDesc tblDesc, Database parentDb)
   private Task loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath,
       Path fromURI) {
     Path dataPath = new Path(fromURI, EximUtil.DATA_PATH_NAME);
-    Path tmpPath = context.utils.getExternalTmpPath(tgtPath);
+    Path tmpPath = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo);
     Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
index 2a7cca1459..8948b0c7d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
@@ -26,12 +26,12 @@ Licensed to the Apache Software Foundation (ASF) under one
   public final HiveConf hiveConf;
   public final Hive hiveDb;
   public final Warehouse warehouse;
-  public final PathUtils utils;
+  public final PathInfo pathInfo;
 
   public Context(HiveConf hiveConf, Hive hiveDb) throws MetaException {
     this.hiveConf = hiveConf;
    this.hiveDb = hiveDb;
    this.warehouse = new Warehouse(hiveConf);
-    this.utils = new PathUtils(hiveConf);
+    this.pathInfo = new PathInfo(hiveConf);
   }
 }
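The three hunks above split the old stateful PathUtils into two pieces: PathInfo now owns the per-filesystem staging-directory cache, while PathUtils keeps only the stateless path arithmetic, exposed as a static method that takes the PathInfo explicitly. A minimal sketch of how the pieces wire together after this change; this is not part of the patch, and the class name and local file:// target path are illustrative assumptions chosen so the snippet can run standalone:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathInfo;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;

public class TmpPathWiringSketch {
  public static void main(String[] args) {
    HiveConf hiveConf = new HiveConf();
    // Before the patch: context.utils.getExternalTmpPath(tgtPath), with the
    // scratch-dir cache and the path arithmetic coupled in one instance.
    // After: the cache lives in PathInfo (held by Context as context.pathInfo)
    // and the arithmetic is a static method on PathUtils.
    PathInfo pathInfo = new PathInfo(hiveConf);
    Path tgtPath = new Path("file:///tmp/hive/warehouse/sales"); // hypothetical table dir
    Path tmpPath = PathUtils.getExternalTmpPath(tgtPath, pathInfo);
    System.out.println("staging path for load: " + tmpPath);
  }
}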
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathInfo.java
new file mode 100644
index 0000000000..f9f3750090
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathInfo.java
@@ -0,0 +1,90 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.ql.Context.generateExecutionId;
+
+public class PathInfo {
+  private static Logger LOG = LoggerFactory.getLogger(PathInfo.class);
+
+  private final Map<String, Path> fsScratchDirs = new HashMap<>();
+  private final String stagingDir;
+  private final HiveConf hiveConf;
+
+  public PathInfo(HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+    stagingDir = HiveConf.getVar(hiveConf, HiveConf.ConfVars.STAGINGDIR);
+  }
+
+  Path computeStagingDir(Path inputPath) {
+    final URI inputPathUri = inputPath.toUri();
+    final String inputPathName = inputPathUri.getPath();
+    final String fileSystemAsString = inputPathUri.getScheme() + ":" + inputPathUri.getAuthority();
+
+    String stagingPathName;
+    if (!inputPathName.contains(stagingDir)) {
+      stagingPathName = new Path(inputPathName, stagingDir).toString();
+    } else {
+      stagingPathName =
+          inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length());
+    }
+
+    final String key =
+        fileSystemAsString + "-" + stagingPathName + "-" + TaskRunner.getTaskRunnerID();
+
+    Path dir = fsScratchDirs.get(key);
+    try {
+      FileSystem fileSystem = inputPath.getFileSystem(hiveConf);
+      if (dir == null) {
+        // Append task specific info to stagingPathName, instead of creating a sub-directory.
+        // This way we don't have to worry about deleting the stagingPathName separately at
+        // end of query execution.
+        Path path = new Path(
+            stagingPathName + "_" + generateExecutionId() + "-" + TaskRunner.getTaskRunnerID());
+        dir = fileSystem.makeQualified(path);
+
+        LOG.debug("Created staging dir = " + dir + " for path = " + inputPath);
+
+        if (!FileUtils.mkdir(fileSystem, dir, hiveConf)) {
+          throw new IllegalStateException(
+              "Cannot create staging directory '" + dir.toString() + "'");
+        }
+        fileSystem.deleteOnExit(dir);
+      }
+      fsScratchDirs.put(key, dir);
+      return dir;
+
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "Cannot create staging directory '" + dir.toString() + "': " + e.getMessage(), e);
+    }
+  }
+}
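computeStagingDir() memoizes one staging directory per (filesystem, staging path, task-runner id) triple, so repeated loads under the same table directory reuse a single scratch dir rather than creating a new one per call, and concurrent task runners never share one. A small sketch of the cache key it derives; the namespace and table path are hypothetical, and the default value of hive.exec.stagingdir is assumed:

import java.net.URI;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.TaskRunner;

public class StagingKeySketch {
  public static void main(String[] args) {
    Path inputPath = new Path("hdfs://ns1/user/hive/warehouse/sales"); // hypothetical
    String stagingDir = ".hive-staging"; // assumed default of hive.exec.stagingdir

    URI uri = inputPath.toUri();
    String fileSystemAsString = uri.getScheme() + ":" + uri.getAuthority(); // "hdfs:ns1"
    // The input path does not contain the staging dir yet, so it is appended:
    String stagingPathName = new Path(uri.getPath(), stagingDir).toString();
    // "/user/hive/warehouse/sales/.hive-staging"

    String key = fileSystemAsString + "-" + stagingPathName + "-" + TaskRunner.getTaskRunnerID();
    System.out.println(key);
    // A second call for the same table directory produces the same key, so the
    // Path cached in fsScratchDirs is returned and no extra staging dir is made.
  }
}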
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathUtils.java
index d0b7bdac7d..649f4407a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathUtils.java
@@ -17,89 +17,27 @@ Licensed to the Apache Software Foundation (ASF) under one
  */
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-
 import static org.apache.hadoop.hive.ql.Context.EXT_PREFIX;
-import static org.apache.hadoop.hive.ql.Context.generateExecutionId;
 
 public class PathUtils {
   private static int pathId = 10000;
   private static Logger LOG = LoggerFactory.getLogger(PathUtils.class);
 
-  private final Map<String, Path> fsScratchDirs = new HashMap<>();
-  private final String stagingDir;
-  private final HiveConf hiveConf;
-
-  PathUtils(HiveConf hiveConf) {
-    this.hiveConf = hiveConf;
-    stagingDir = HiveConf.getVar(hiveConf, HiveConf.ConfVars.STAGINGDIR);
-  }
-
-  public synchronized Path getExternalTmpPath(Path path) {
+  public static synchronized Path getExternalTmpPath(Path path, PathInfo pathInfo) {
     URI extURI = path.toUri();
     if (extURI.getScheme().equals("viewfs")) {
       // if we are on viewfs we don't want to use /tmp as tmp dir since rename from /tmp/..
       // to final /user/hive/warehouse/ will fail later, so instead pick tmp dir
       // on same namespace as tbl dir.
-      return new Path(getStagingDir(path.getParent()),
+      return new Path(pathInfo.computeStagingDir(path.getParent()),
           EXT_PREFIX + Integer.toString(++pathId));
     }
     Path fullyQualifiedPath = new Path(extURI.getScheme(), extURI.getAuthority(), extURI.getPath());
-    return new Path(getStagingDir(fullyQualifiedPath), EXT_PREFIX + Integer.toString(++pathId));
-  }
-
-  private Path getStagingDir(Path inputPath) {
-    final URI inputPathUri = inputPath.toUri();
-    final String inputPathName = inputPathUri.getPath();
-    final String fileSystemAsString = inputPathUri.getScheme() + ":" + inputPathUri.getAuthority();
-
-    String stagingPathName;
-    if (!inputPathName.contains(stagingDir)) {
-      stagingPathName = new Path(inputPathName, stagingDir).toString();
-    } else {
-      stagingPathName =
-          inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length());
-    }
-
-    final String key =
-        fileSystemAsString + "-" + stagingPathName + "-" + TaskRunner.getTaskRunnerID();
-
-    Path dir = fsScratchDirs.get(key);
-    try {
-      FileSystem fileSystem = inputPath.getFileSystem(hiveConf);
-      if (dir == null) {
-        // Append task specific info to stagingPathName, instead of creating a sub-directory.
-        // This way we don't have to worry about deleting the stagingPathName separately at
-        // end of query execution.
-        Path path = new Path(
-            stagingPathName + "_" + generateExecutionId() + "-" + TaskRunner.getTaskRunnerID());
-        dir = fileSystem.makeQualified(path);
-
-        LOG.debug("Created staging dir = " + dir + " for path = " + inputPath);
-
-        if (!FileUtils.mkdir(fileSystem, dir, hiveConf)) {
-          throw new IllegalStateException(
-              "Cannot create staging directory '" + dir.toString() + "'");
-        }
-        fileSystem.deleteOnExit(dir);
-      }
-      fsScratchDirs.put(key, dir);
-      return dir;
-
-    } catch (IOException e) {
-      throw new RuntimeException(
-          "Cannot create staging directory '" + dir.toString() + "': " + e.getMessage(), e);
-    }
+    return new Path(pathInfo.computeStagingDir(fullyQualifiedPath), EXT_PREFIX + Integer.toString(++pathId));
   }
 }
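With getExternalTmpPath() now static, its synchronization moves from the instance to the PathUtils class, so the shared ++pathId counter stays safe when concurrent bootstrap-load tasks request tmp paths; the viewfs branch still places the staging dir on the table's own namespace so the later rename cannot cross namespaces. A hedged sketch of the observable behavior, not part of the patch: EXT_PREFIX comes from org.apache.hadoop.hive.ql.Context, and the exact prefix string shown in the comment plus the file:// path are assumptions.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathInfo;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;

public class TmpPathCounterSketch {
  public static void main(String[] args) {
    PathInfo pathInfo = new PathInfo(new HiveConf());
    Path tgt = new Path("file:///tmp/hive/warehouse/t1"); // hypothetical table dir
    Path first = PathUtils.getExternalTmpPath(tgt, pathInfo);
    Path second = PathUtils.getExternalTmpPath(tgt, pathInfo);
    // Both results share the staging directory cached in pathInfo; only the
    // EXT_PREFIX + counter leaf differs, e.g. .../-ext-10001 vs .../-ext-10002.
    System.out.println(first);
    System.out.println(second);
  }
}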