diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
index efecdb826c..ec430276be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
@@ -33,8 +33,13 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import java.security.PrivilegedExceptionAction;
 import java.io.Serializable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -68,6 +73,83 @@ private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class);
   private static final int MAX_COPY_RETRY = 5;
 
+  private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOException {
+    FileSystem targetFs = destPath.getFileSystem(conf);
+    boolean createdDir = false;
+    if (!targetFs.exists(destPath)) {
+      // The target path is created even if the source path is missing, so that the DDL task does not try to create it.
+      if (!targetFs.mkdirs(destPath)) {
+        throw new IOException(destPath + " is not a directory or unable to create one");
+      }
+      createdDir = true;
+    }
+
+    FileStatus status;
+    try {
+      status = sourcePath.getFileSystem(conf).getFileStatus(sourcePath);
+    } catch (FileNotFoundException e) {
+      // Don't delete the target path we created, else the DDL task will try to create it as the hive user and may fail.
+      LOG.warn("source path missing " + sourcePath);
+      return false;
+    }
+    LOG.info("Setting permission for path dest {} from source {} owner {} : {} : {}",
+        destPath, sourcePath, status.getOwner(), status.getGroup(), status.getPermission());
+    destPath.getFileSystem(conf).setOwner(destPath, status.getOwner(), status.getGroup());
+    destPath.getFileSystem(conf).setPermission(destPath, status.getPermission());
+    return createdDir;
+  }
+
+  private boolean setTargetPathOwner(Path targetPath, Path sourcePath, String distCpDoAsUser)
+      throws IOException {
+    if (distCpDoAsUser == null) {
+      return createAndSetPathOwner(targetPath, sourcePath);
+    }
+    UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+        distCpDoAsUser, UserGroupInformation.getLoginUser());
+    try {
+      Path finalTargetPath = targetPath;
+      Path finalSourcePath = sourcePath;
+      return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () ->
+          createAndSetPathOwner(finalTargetPath, finalSourcePath));
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private int handleException(Exception e, Path sourcePath, Path targetPath, int currentRetry) {
+    try {
+      if (e instanceof FileNotFoundException && !sourcePath.getFileSystem(conf).exists(sourcePath)) {
+        LOG.warn("Source path missing " + sourcePath, e);
+        return 0;
+      }
+    } catch (Exception ex) {
+      LOG.warn("Source path missing check failed " + sourcePath, ex);
+    }
+
+    if (currentRetry <= MAX_COPY_RETRY) {
+      LOG.warn("unable to copy {} to {}", sourcePath, targetPath, e);
+    } else {
+      LOG.error("unable to copy {} to {}", sourcePath, targetPath, e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+
+    int sleepTime = FileUtils.getSleepTime(currentRetry);
+    LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + currentRetry);
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException timerEx) {
+      LOG.info("sleep interrupted {}", timerEx.getMessage());
+    }
+
+    try {
+      FileSystem.closeAllForUGI(Utils.getUGI());
+    } catch (Exception ex) {
+      LOG.error("unable to closeAllForUGI", ex);
+    }
+    return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+  }
+
   @Override
   protected int execute(DriverContext driverContext) {
     String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
@@ -79,13 +161,16 @@ protected int execute(DriverContext driverContext) {
       targetPath = reservedRawPath(work.fullyQualifiedTargetPath.toUri());
     }
     int currentRetry = 0;
-    while (currentRetry < MAX_COPY_RETRY) {
+    int error = 0;
+    while (currentRetry <= MAX_COPY_RETRY) {
       try {
         UserGroupInformation ugi = Utils.getUGI();
         String currentUser = ugi.getShortUserName();
         boolean usePrivilegedUser = distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser);
 
+        setTargetPathOwner(targetPath, sourcePath, usePrivilegedUser ? distCpDoAsUser : null);
+
         // do we create a new conf and only here provide this additional option so that we get away from
         // differences of data in two location for the same directories ?
         // basically add distcp.options.delete to hiveconf new object ?
@@ -99,17 +184,14 @@ protected int execute(DriverContext driverContext) {
             ShimLoader.getHadoopShims());
         return 0;
       } catch (Exception e) {
-        if (++currentRetry < MAX_COPY_RETRY) {
-          LOG.warn("unable to copy", e);
-        } else {
-          LOG.error("unable to copy {} to {}", sourcePath, targetPath, e);
-          setException(e);
-          return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+        currentRetry++;
+        error = handleException(e, sourcePath, targetPath, currentRetry);
+        if (error == 0) {
+          return 0;
         }
       }
     }
-    LOG.error("should never come here ");
-    return -1;
+    return error;
   }
 
   private static Path reservedRawPath(URI uri) {
@@ -119,13 +201,18 @@ private static Path reservedRawPath(URI uri) {
 
   @Override
   public StageType getType() {
-    return StageType.REPL_INCREMENTAL_LOAD;
+    return StageType.COPY;
   }
 
   @Override
   public String getName() {
     return "DIR_COPY_TASK";
   }
+
+  @Override
+  public boolean canExecuteInParallel() {
+    return true;
+  }
 }
 
 @Explain(displayName = "HDFS Copy Operator", explainLevels = { Explain.Level.USER,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 7062eda98d..2c191cbef4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -119,6 +119,14 @@ a database ( directory )
         loadingConstraint = true;
       }
       while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) && loadTaskTracker.canAddMoreTasks()) {
+        // First start the distcp tasks to copy the files related to external tables. The distcp tasks should be
+        // started first to avoid the DDL task trying to create the table/partition directory. The distcp task
+        // creates these directories with the proper permissions and owner.
+        if (work.getPathsToCopyIterator().hasNext()) {
+          scope.rootTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(loadTaskTracker));
+          break;
+        }
+
         BootstrapEvent next;
         if (!loadingConstraint) {
           next = iterator.next();
@@ -231,10 +239,6 @@ a database ( directory )
             constraintTracker.debugLog("constraints");
           }
         }
-
-        if (!loadingConstraint && !iterator.currentDbHasNext()) {
-          createEndReplLogTask(context, scope, iterator.replLogger());
-        }
       }
 
       if (loadTaskTracker.canAddMoreTasks()) {
@@ -381,34 +385,17 @@ private int executeIncrementalLoad(DriverContext driverContext) {
     }
     List<Task<? extends Serializable>> childTasks = new ArrayList<>();
-    int parallelism = conf.getIntVar(HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
-    // during incremental we will have no parallelism from replication tasks since they are event based
-    // and hence are linear. To achieve parallelism we have to use copy tasks(which have no DAG) for
-    // all threads except one, in execution phase.
     int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
-    // If the total number of tasks that can be created are less than the parallelism we can achieve
-    // do nothing since someone is working on 1950's machine. else try to achieve max parallelism
-    int calculatedMaxNumOfTasks = 0, maxNumOfHDFSTasks = 0;
-    if (maxTasks <= parallelism) {
-      if (builder.hasMoreWork()) {
-        calculatedMaxNumOfTasks = maxTasks;
-      } else {
-        maxNumOfHDFSTasks = maxTasks;
-      }
+    // First start the distcp tasks to copy the files related to external tables. The distcp tasks should be
+    // started first to avoid the DDL task trying to create the table/partition directory. The distcp task
+    // creates these directories with the proper permissions and owner.
+    TaskTracker tracker = new TaskTracker(maxTasks);
+    if (work.getPathsToCopyIterator().hasNext()) {
+      childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(tracker));
     } else {
-      calculatedMaxNumOfTasks = maxTasks - parallelism + 1;
-      maxNumOfHDFSTasks = parallelism - 1;
+      childTasks.add(builder.build(driverContext, getHive(), LOG, tracker));
     }
-    TaskTracker trackerForReplIncremental = new TaskTracker(calculatedMaxNumOfTasks);
-    Task<? extends Serializable> incrementalLoadTaskRoot =
-        builder.build(driverContext, getHive(), LOG, trackerForReplIncremental);
-    // we are adding the incremental task first so that its always processed first,
-    // followed by dir copy tasks if capacity allows.
-    childTasks.add(incrementalLoadTaskRoot);
-
-    TaskTracker trackerForCopy = new TaskTracker(maxNumOfHDFSTasks);
-    childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(trackerForCopy));
     // Either the incremental has more work or the external table file copy has more paths to process.
     // Once all the incremental events are applied and external tables file copies are done, enable
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
index de5504498d..fc1cb08e2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
@@ -26,7 +26,10 @@ import javax.security.auth.login.LoginException;
 
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
 import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -188,6 +191,18 @@ public void authorize(Table table, Privilege[] readRequiredPriv, Privilege[] wri
       checkDeletePermission(path, getConf(), authenticator.getUserName());
     }
 
+    try {
+      String user = SecurityUtils.getUGI().getShortUserName();
+      if (MetaStoreServerUtils.checkUserHasHostProxyPrivileges(user, getConf(),
+          HiveMetaStore.HMSHandler.getIPAddress())) {
+        LOG.info("Path authorization is skipped for user {} with host proxy privilege.", user);
+        return;
+      }
+    } catch (Exception ex) {
+      // If we cannot decide on the proxy privilege status, proceed with the authorization check.
+      LOG.error("Cannot obtain username to check for host proxy privilege", ex);
+    }
+
     // If the user has specified a location - external or not, check if the user
     // has the permissions on the table dir
     if (path != null) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 41f399becd..510be01c34 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -358,7 +358,7 @@ private static void logAuditEvent(String cmd) {
       auditLog.info("ugi={} ip={} cmd={} ", ugi.getUserName(), address, cmd);
     }
 
-    private static String getIPAddress() {
+    public static String getIPAddress() {
       if (useSasl) {
         if (saslServer != null && saslServer.getRemoteAddress() != null) {
           return saslServer.getRemoteAddress().getHostAddress();
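
Note (reviewer sketch, not part of the patch): the new setTargetPathOwner path in DirCopyTask relies on Hadoop's proxy-user mechanism so that the target directory is created, and its ownership set, as the user configured via HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER rather than the session user. The snippet below is a minimal, self-contained illustration of that pattern only; the class and method names are invented for the example, and it assumes the login user is allowed to impersonate doAsUser through the usual hadoop.proxyuser.* settings.

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

// Illustrative only: mirrors the proxy-user pattern used by setTargetPathOwner/createAndSetPathOwner.
public class ProxyUserMkdirSketch {

  // Creates destPath as doAsUser and copies owner/group/permission from sourcePath.
  static void createDirAsProxyUser(Configuration conf, Path sourcePath, Path destPath, String doAsUser)
      throws IOException, InterruptedException {
    UserGroupInformation proxyUser =
        UserGroupInformation.createProxyUser(doAsUser, UserGroupInformation.getLoginUser());
    proxyUser.doAs((PrivilegedExceptionAction<Void>) () -> {
      FileSystem targetFs = destPath.getFileSystem(conf);
      // Create the target directory as the proxy user if it is missing.
      if (!targetFs.exists(destPath) && !targetFs.mkdirs(destPath)) {
        throw new IOException("Unable to create directory " + destPath);
      }
      // Mirror owner, group and permission from the source directory onto the target.
      FileStatus srcStatus = sourcePath.getFileSystem(conf).getFileStatus(sourcePath);
      targetFs.setOwner(destPath, srcStatus.getOwner(), srcStatus.getGroup());
      targetFs.setPermission(destPath, srcStatus.getPermission());
      return null;
    });
  }
}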