diff --git a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java
index 62c109c45d..c9253c199b 100644
--- a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java
+++ b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java
@@ -49,7 +49,12 @@ protected static Path warehouseDir = null;
   protected UserGroupInformation userUgi = null;
   protected String testUserName = "test_user";
+  protected String proxyUserName = null;
 
+  @Override
+  protected String getProxyUserName() {
+    return proxyUserName;
+  }
 
   @Override
   protected boolean isTestEnabled() {
@@ -74,10 +79,10 @@ protected HiveConf createHiveConf() throws Exception {
     // Hadoop FS ACLs do not work with LocalFileSystem, so set up MiniDFS.
     HiveConf conf = super.createHiveConf();
-    String currentUserName = Utils.getUGI().getShortUserName();
+    proxyUserName = Utils.getUGI().getShortUserName();
     conf.set("dfs.namenode.acls.enabled", "true");
-    conf.set("hadoop.proxyuser." + currentUserName + ".groups", "*");
-    conf.set("hadoop.proxyuser." + currentUserName + ".hosts", "*");
+    conf.set("hadoop.proxyuser." + proxyUserName + ".groups", "*");
+    conf.set("hadoop.proxyuser." + proxyUserName + ".hosts", "*");
     dfs = ShimLoader.getHadoopShims().getMiniDfs(conf, 4, true, null);
     FileSystem fs = dfs.getFileSystem();
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestMetastoreAuthorizationProvider.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestMetastoreAuthorizationProvider.java
index 0e08e81f20..0fc677b3f5 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestMetastoreAuthorizationProvider.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestMetastoreAuthorizationProvider.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Collections;
 
 import junit.framework.TestCase;
@@ -85,6 +86,10 @@ protected HiveConf createHiveConf() throws Exception {
     return new HiveConf(this.getClass());
   }
 
+  protected String getProxyUserName() {
+    return null;
+  }
+
   @Override
   protected void setUp() throws Exception {
@@ -304,6 +309,19 @@ public void testSimplePrivileges() throws Exception {
     ret = driver.run("alter table "+tblName+" add partition (b='2011')");
     assertEquals(0,ret.getResponseCode());
 
+    String proxyUserName = getProxyUserName();
+    if (proxyUserName != null) {
+      // for storage based authorization, user having proxy privilege should be allowed to do operation
+      // even if the file permission is not there.
+      InjectableDummyAuthenticator.injectUserName(proxyUserName);
+      InjectableDummyAuthenticator.injectGroupNames(Collections.singletonList(proxyUserName));
+      InjectableDummyAuthenticator.injectMode(true);
+      disallowCreateInTbl(tbl.getTableName(), proxyUserName, tbl.getSd().getLocation());
+      ret = driver.run("alter table "+tblName+" add partition (b='2012')");
+      assertEquals(0, ret.getResponseCode());
+      InjectableDummyAuthenticator.injectMode(false);
+    }
+
     allowDropOnTable(tblName, userName, tbl.getSd().getLocation());
     allowDropOnDb(dbName,userName,db.getLocationUri());
     ret = driver.run("drop database if exists "+getTestDbName()+" cascade");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
index efecdb826c..d7eed2c033 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
@@ -33,8 +33,13 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import java.security.PrivilegedExceptionAction;
 
 import java.io.Serializable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -68,6 +73,83 @@ private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class);
     private static final int MAX_COPY_RETRY = 5;
 
+    private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOException {
+      FileSystem targetFs = destPath.getFileSystem(conf);
+      boolean createdDir = false;
+      if (!targetFs.exists(destPath)) {
+        // the target path is created even if the source path is missing, so that the ddl task does not try to create it.
+        if (!targetFs.mkdirs(destPath)) {
+          throw new IOException(destPath + " is not a directory or unable to create one");
+        }
+        createdDir = true;
+      }
+
+      FileStatus status;
+      try {
+        status = sourcePath.getFileSystem(conf).getFileStatus(sourcePath);
+      } catch (FileNotFoundException e) {
+        // Don't delete the target path we created, else the ddl task will try to create it as user hive and may fail.
+ LOG.warn("source path missing " + sourcePath); + return false; + } + LOG.info("Setting permission for path dest {} from source {} owner {} : {} : {}", + destPath, sourcePath, status.getOwner(), status.getGroup(), status.getPermission()); + destPath.getFileSystem(conf).setOwner(destPath, status.getOwner(), status.getGroup()); + destPath.getFileSystem(conf).setPermission(destPath, status.getPermission()); + return createdDir; + } + + private boolean setTargetPathOwner(Path targetPath, Path sourcePath, String distCpDoAsUser) + throws IOException { + if (distCpDoAsUser == null) { + return createAndSetPathOwner(targetPath, sourcePath); + } + UserGroupInformation proxyUser = UserGroupInformation.createProxyUser( + distCpDoAsUser, UserGroupInformation.getLoginUser()); + try { + Path finalTargetPath = targetPath; + Path finalSourcePath = sourcePath; + return proxyUser.doAs((PrivilegedExceptionAction) () -> + createAndSetPathOwner(finalTargetPath, finalSourcePath)); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + private int handleException(Exception e, Path sourcePath, Path targetPath, int currentRetry) { + try { + if (!sourcePath.getFileSystem(conf).exists(sourcePath)) { + LOG.warn("Source path missing " + sourcePath, e); + return 0; + } + } catch (Exception ex) { + LOG.warn("Source path missing check failed" + sourcePath, ex); + } + + if (currentRetry <= MAX_COPY_RETRY) { + LOG.warn("unable to copy {} to {}", sourcePath, targetPath, e); + } else { + LOG.error("unable to copy {} to {}", sourcePath, targetPath, e); + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + + int sleepTime = FileUtils.getSleepTime(currentRetry); + LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (currentRetry)); + try { + Thread.sleep(sleepTime); + } catch (InterruptedException timerEx) { + LOG.info("sleep interrupted", timerEx.getMessage()); + } + + try { + FileSystem.closeAllForUGI(Utils.getUGI()); + } catch (Exception ex) { + LOG.error("unable to closeAllForUGI", ex); + } + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + @Override protected int execute(DriverContext driverContext) { String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); @@ -79,13 +161,16 @@ protected int execute(DriverContext driverContext) { targetPath = reservedRawPath(work.fullyQualifiedTargetPath.toUri()); } int currentRetry = 0; - while (currentRetry < MAX_COPY_RETRY) { + int error = 0; + while (currentRetry <= MAX_COPY_RETRY) { try { UserGroupInformation ugi = Utils.getUGI(); String currentUser = ugi.getShortUserName(); boolean usePrivilegedUser = distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser); + setTargetPathOwner(targetPath, sourcePath, usePrivilegedUser ? distCpDoAsUser : null); + // do we create a new conf and only here provide this additional option so that we get away from // differences of data in two location for the same directories ? // basically add distcp.options.delete to hiveconf new object ? 
@@ -99,17 +184,14 @@ protected int execute(DriverContext driverContext) {
               ShimLoader.getHadoopShims());
           return 0;
         } catch (Exception e) {
-          if (++currentRetry < MAX_COPY_RETRY) {
-            LOG.warn("unable to copy", e);
-          } else {
-            LOG.error("unable to copy {} to {}", sourcePath, targetPath, e);
-            setException(e);
-            return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+          currentRetry++;
+          error = handleException(e, sourcePath, targetPath, currentRetry);
+          if (error == 0) {
+            return 0;
           }
         }
       }
-      LOG.error("should never come here ");
-      return -1;
+      return error;
     }
 
     private static Path reservedRawPath(URI uri) {
@@ -119,13 +201,18 @@ private static Path reservedRawPath(URI uri) {
 
     @Override
     public StageType getType() {
-      return StageType.REPL_INCREMENTAL_LOAD;
+      return StageType.COPY;
     }
 
     @Override
     public String getName() {
       return "DIR_COPY_TASK";
     }
+
+    @Override
+    public boolean canExecuteInParallel(){
+      return true;
+    }
   }
 
   @Explain(displayName = "HDFS Copy Operator", explainLevels = { Explain.Level.USER,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index b5e39b8592..2309fc9b9d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -130,7 +130,16 @@ a database ( directory )
       if (!iterator.hasNext() && constraintIterator.hasNext()) {
         loadingConstraint = true;
       }
-      while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) && loadTaskTracker.canAddMoreTasks()) {
+      while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext()) ||
+          (work.getPathsToCopyIterator().hasNext())) && loadTaskTracker.canAddMoreTasks()) {
+        // First start the distcp tasks to copy the files related to external tables. The distcp tasks should be
+        // started first to avoid the ddl task trying to create the table/partition directory. The distcp task
+        // creates these directories with the proper permission and owner.
+        if (work.getPathsToCopyIterator().hasNext()) {
+          scope.rootTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(loadTaskTracker));
+          break;
+        }
+
         BootstrapEvent next;
         if (!loadingConstraint) {
           next = iterator.next();
@@ -249,10 +258,6 @@ a database ( directory )
           }
         }
 
-        if (loadTaskTracker.canAddMoreTasks()) {
-          scope.rootTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(loadTaskTracker));
-        }
-
         boolean addAnotherLoadTask = iterator.hasNext()
             || loadTaskTracker.hasReplicationState()
             || constraintIterator.hasNext()
@@ -265,7 +270,8 @@ a database ( directory )
         // Update last repl ID of the database only if the current dump is not incremental. If bootstrap
         // is combined with incremental dump, it contains only tables to bootstrap. So, needn't change
         // last repl ID of the database.
-        if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.isIncrementalLoad()) {
+        if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.getPathsToCopyIterator().hasNext()
+            && !work.isIncrementalLoad()) {
           loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
           work.updateDbEventState(null);
         }
@@ -470,34 +476,17 @@ private int executeIncrementalLoad(DriverContext driverContext) {
     }
 
     List<Task<? extends Serializable>> childTasks = new ArrayList<>();
-    int parallelism = conf.getIntVar(HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
-    // during incremental we will have no parallelism from replication tasks since they are event based
-    // and hence are linear. To achieve parallelism we have to use copy tasks(which have no DAG) for
-    // all threads except one, in execution phase.
     int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
-    // If the total number of tasks that can be created are less than the parallelism we can achieve
-    // do nothing since someone is working on 1950's machine. else try to achieve max parallelism
-    int calculatedMaxNumOfTasks = 0, maxNumOfHDFSTasks = 0;
-    if (maxTasks <= parallelism) {
-      if (builder.hasMoreWork()) {
-        calculatedMaxNumOfTasks = maxTasks;
-      } else {
-        maxNumOfHDFSTasks = maxTasks;
-      }
+    // First start the distcp tasks to copy the files related to external tables. The distcp tasks should be
+    // started first to avoid the ddl task trying to create the table/partition directory. The distcp task
+    // creates these directories with the proper permission and owner.
+    TaskTracker tracker = new TaskTracker(maxTasks);
+    if (work.getPathsToCopyIterator().hasNext()) {
+      childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(tracker));
     } else {
-      calculatedMaxNumOfTasks = maxTasks - parallelism + 1;
-      maxNumOfHDFSTasks = parallelism - 1;
+      childTasks.add(builder.build(driverContext, getHive(), LOG, tracker));
     }
-    TaskTracker trackerForReplIncremental = new TaskTracker(calculatedMaxNumOfTasks);
-    Task<? extends Serializable> incrementalLoadTaskRoot =
-        builder.build(driverContext, getHive(), LOG, trackerForReplIncremental);
-    // we are adding the incremental task first so that its always processed first,
-    // followed by dir copy tasks if capacity allows.
-    childTasks.add(incrementalLoadTaskRoot);
-
-    TaskTracker trackerForCopy = new TaskTracker(maxNumOfHDFSTasks);
-    childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(trackerForCopy));
 
     // Either the incremental has more work or the external table file copy has more paths to process.
     // Once all the incremental events are applied and external tables file copies are done, enable
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
index de5504498d..2a52e8354b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
@@ -26,7 +26,10 @@ import javax.security.auth.login.LoginException;
 
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
 import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -154,6 +157,20 @@ public void authorize(Database db, Privilege[] readRequiredPriv, Privilege[] wri
     authorize(path, readRequiredPriv, writeRequiredPriv);
   }
 
+  private static boolean userHasProxyPrivilege(String user, Configuration conf) {
+    try {
+      if (MetaStoreServerUtils.checkUserHasHostProxyPrivileges(user, conf,
+          HiveMetaStore.HMSHandler.getIPAddress())) {
+        LOG.info("user {} has host proxy privilege.", user);
+        return true;
+      }
+    } catch (Exception ex) {
+      // if we can not decide on the proxy privilege status, then proceed with the authorization check.
+ LOG.warn("Cannot obtain username to check for host proxy privilege", ex); + } + return false; + } + @Override public void authorize(Table table, Privilege[] readRequiredPriv, Privilege[] writeRequiredPriv) throws HiveException, AuthorizationException { @@ -366,6 +383,11 @@ protected void checkPermissions(final Configuration conf, final Path path, throw new IllegalArgumentException("path is null"); } + if (userHasProxyPrivilege(authenticator.getUserName(), conf)) { + LOG.info("Path authorization is skipped for path {}.", path); + return; + } + final FileSystem fs = path.getFileSystem(conf); FileStatus pathStatus = FileUtils.getFileStatusOrNull(fs, path); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 41f399becd..510be01c34 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -358,7 +358,7 @@ private static void logAuditEvent(String cmd) { auditLog.info("ugi={} ip={} cmd={} ", ugi.getUserName(), address, cmd); } - private static String getIPAddress() { + public static String getIPAddress() { if (useSasl) { if (saslServer != null && saslServer.getRemoteAddress() != null) { return saslServer.getRemoteAddress().getHostAddress();