diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e7f5fc0c6a..39e5c00b74 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1561,6 +1561,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_STRICT_CHECKS_BUCKETING("hive.strict.checks.bucketing", true,
         "Enabling strict bucketing checks disallows the following:\n" +
         " Load into bucketed tables."),
+    HIVE_LOAD_DATA_OWNER("hive.load.data.owner", "",
+        "Set the owner of files loaded using load data in managed tables."),
 
     @Deprecated
     HIVEMAPREDMODE("hive.mapred.mode", null,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index f80a945be5..19097f5e70 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -140,7 +140,8 @@ private void moveFileInDfs (Path sourcePath, Path targetPath, HiveConf conf)
       deletePath = createTargetPath(targetPath, tgtFs);
     }
     Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false);
-    if (!Hive.moveFile(conf, sourcePath, targetPath, true, false)) {
+    // Set isManaged to false, as this is not a load data operation (the only case that needs it).
+    if (!Hive.moveFile(conf, sourcePath, targetPath, true, false, false)) {
       try {
         if (deletePath != null) {
           tgtFs.delete(deletePath, true);
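
The new hive.load.data.owner knob defaults to the empty string, which leaves LOAD DATA behaviour unchanged; the isManaged flag threaded through the Hive.java hunks that follow is what restricts the new ownership checks to managed tables. A minimal sketch of setting and reading the knob through the HiveConf API used in this patch; the "hive" value and the class name are only placeholders for illustration, not anything the patch mandates:

import org.apache.hadoop.hive.conf.HiveConf;

public class LoadDataOwnerConfigSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Default is "", i.e. pre-patch behaviour: no ownership checks during LOAD DATA.
    conf.setVar(HiveConf.ConfVars.HIVE_LOAD_DATA_OWNER, "hive");
    // Hive.java reads the value back the same way (see the copyFiles/moveFile hunks below).
    String configuredOwner = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_LOAD_DATA_OWNER);
    System.out.println("Configured load data owner: " + configuredOwner);
  }
}
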
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 2ec131e274..3cf49b4cf8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -63,21 +63,13 @@
 import com.google.common.collect.ImmutableList;
 
 import org.apache.calcite.plan.RelOptMaterialization;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.hep.HepPlanner;
 import org.apache.calcite.plan.hep.HepProgramBuilder;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelVisitor;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.RelBuilder;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileChecksum;
@@ -86,14 +78,13 @@
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
-import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.hive.common.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.hive.common.log.InPlaceUpdate;
@@ -180,7 +171,6 @@
 import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveAugmentMaterializationRule;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
@@ -202,7 +192,6 @@
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.TxnIdUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1727,16 +1716,9 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
       } else {
         newPartPath = oldPartPath;
       }
-      List<Path> newFiles = null;
+      List<Path> newFiles = Collections.synchronizedList(new ArrayList<>());
       perfLogger.PerfLogBegin("MoveTask", PerfLogger.FILE_MOVES);
-      // If config is set, table is not temporary and partition being inserted exists, capture
-      // the list of files added. For not yet existing partitions (insert overwrite to new partition
-      // or dynamic partition inserts), the add partition event will capture the list of files added.
-      if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) {
-        newFiles = Collections.synchronizedList(new ArrayList<>());
-      }
-
       // Note: the stats for ACID tables do not have any coordination with either Hive ACID logic
       // like txn commits, time outs, etc.; nor the lower level sync in metastore pertaining
@@ -1771,6 +1753,8 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("moving " + loadPath + " to " + destPath);
       }
+
+      boolean isManaged = tbl.getTableType().equals(TableType.MANAGED_TABLE.toString());
       // TODO: why is "&& !isAcidIUDoperation" needed here?
       if (!isTxnTable && ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcidIUDoperation))) {
         //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new
@@ -1779,12 +1763,12 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
         boolean needRecycle = !tbl.isTemporary()
                 && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName()));
         replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal,
-            isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle);
+            isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle, isManaged);
       } else {
         FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
         copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
             (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles,
-            tbl.getNumBuckets() > 0, isFullAcidTable);
+            tbl.getNumBuckets() > 0, isFullAcidTable, isManaged);
       }
     }
     perfLogger.PerfLogEnd("MoveTask", PerfLogger.FILE_MOVES);
@@ -1792,13 +1776,13 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
       alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
       validatePartition(newTPart);
 
+      // If config is set, table is not temporary and partition being inserted exists, capture
+      // the list of files added. For not yet existing partitions (insert overwrite to new partition
+      // or dynamic partition inserts), the add partition event will capture the list of files added.
       // Generate an insert event only if inserting into an existing partition
       // When inserting into a new partition, the add partition event takes care of insert event
-      if ((null != oldPart) && (null != newFiles)) {
+      if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) {
         fireInsertEvent(tbl, partSpec, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
-      } else {
-        LOG.debug("No new files were created, and is not a replace, or we're inserting into a "
-            + "partition that does not exist yet. Skipping generating INSERT event.");
       }
 
       // column stats will be inaccurate
@@ -1871,6 +1855,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
       } else {
         setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart);
       }
+      perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_PARTITION);
       return newTPart;
     } catch (IOException e) {
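
Since newFiles is now always collected, whether an INSERT event fires for an existing partition is decided entirely by the condition in the hunk above. The same condition, restated as a small helper for readability; the class and method names are illustrative only, the condition itself is copied from the patch:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;

public class InsertEventGateSketch {
  // Fire an insert event only for an existing partition of a non-temporary table,
  // and only when DML events are enabled; new partitions are covered by the
  // add-partition event instead.
  static boolean shouldFireInsertEvent(HiveConf conf, Table tbl, Partition oldPart) {
    return conf.getBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML)
        && !tbl.isTemporary()
        && oldPart != null;
  }
}
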
@@ -2316,15 +2301,12 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_TABLE);
 
-    List<Path> newFiles = null;
+    List<Path> newFiles = Collections.synchronizedList(new ArrayList<>());
     Table tbl = getTable(tableName);
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
     boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
     boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl);
     boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
-    if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
-      newFiles = Collections.synchronizedList(new ArrayList<>());
-    }
 
     // Note: this assumes both paths are qualified; which they are, currently.
     if ((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) {
@@ -2356,19 +2338,21 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType
     perfLogger.PerfLogBegin("MoveTask", PerfLogger.FILE_MOVES);
 
+    boolean isManaged = tbl.getTableType().equals(TableType.MANAGED_TABLE.toString());
+
     if (loadFileType == LoadFileType.REPLACE_ALL && !isTxnTable) {
       //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361
       boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
       boolean needRecycle = !tbl.isTemporary()
              && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName()));
       replaceFiles(tblPath, loadPath, destPath, tblPath, conf, isSrcLocal, isAutopurge,
-          newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle);
+          newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle, isManaged);
     } else {
       try {
         FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
         copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
             loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles,
-            tbl.getNumBuckets() > 0 ? true : false, isFullAcidTable);
+            tbl.getNumBuckets() > 0, isFullAcidTable, isManaged);
       } catch (IOException e) {
         throw new HiveException("addFiles: filesystem error in check phase", e);
       }
@@ -2406,7 +2390,11 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType
     alterTable(tbl, environmentContext);
 
-    fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
+    if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
+      fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
+    } else {
+      fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), null);
+    }
 
     perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_TABLE);
   }
@@ -3320,7 +3308,7 @@ public PrincipalPrivilegeSet get_privilege_set(HiveObjectType objectType,
   private static void copyFiles(final HiveConf conf, final FileSystem destFs,
       FileStatus[] srcs, final FileSystem srcFs, final Path destf,
       final boolean isSrcLocal, boolean isOverwrite,
-      final List<Path> newFiles, boolean acidRename) throws HiveException {
+      final List<Path> newFiles, boolean acidRename, boolean isManaged) throws HiveException {
     final HdfsUtils.HadoopFileStatus fullDestStatus;
     try {
@@ -3342,6 +3330,7 @@ private static void copyFiles(final HiveConf conf, final FileSystem destFs,
     int taskId = 0;
     // Sort the files
     Arrays.sort(srcs);
+    String configuredOwner = HiveConf.getVar(conf, ConfVars.HIVE_LOAD_DATA_OWNER);
     for (FileStatus src : srcs) {
       FileStatus[] files;
       if (src.isDirectory()) {
@@ -3362,7 +3351,7 @@ private static void copyFiles(final HiveConf conf, final FileSystem destFs,
       Arrays.sort(files);
       for (final FileStatus srcFile : files) {
         final Path srcP = srcFile.getPath();
-        final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs);
+        final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs, configuredOwner, isManaged);
 
         final boolean isRenameAllowed = !needToCopy && !isSrcLocal;
@@ -3604,7 +3593,7 @@ public void recycleDirToCmPath(Path dataPath, boolean isPurge) throws HiveExcept
   //from mv command if the destf is a directory, it replaces the destf instead of moving under
   //the destf. in this case, the replaced destf still preserves the original destf's permission
   public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, boolean replace,
-      boolean isSrcLocal) throws HiveException {
+      boolean isSrcLocal, boolean isManaged) throws HiveException {
     final FileSystem srcFs, destFs;
     try {
       destFs = destf.getFileSystem(conf);
@@ -3620,6 +3609,7 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf,
     }
 
     HdfsUtils.HadoopFileStatus destStatus = null;
+    String configuredOwner = HiveConf.getVar(conf, ConfVars.HIVE_LOAD_DATA_OWNER);
 
     // If source path is a subdirectory of the destination path (or the other way around):
     // ex: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT src.value WHERE src.key >= 300;
@@ -3653,7 +3643,7 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf,
         destFs.copyFromLocalFile(srcf, destf);
         return true;
       } else {
-        if (needToCopy(srcf, destf, srcFs, destFs)) {
+        if (needToCopy(srcf, destf, srcFs, destFs, configuredOwner, isManaged)) {
          //copy if across file system or encryption zones.
          LOG.debug("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different.");
          return FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf,
LOG.debug("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different."); return FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf, @@ -3802,12 +3792,45 @@ static private HiveException getHiveException(Exception e, String msg, String lo * TODO- consider if need to do this for different file authority. * @throws HiveException */ - static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs) throws HiveException { + static private boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, + FileSystem destFs, String configuredOwner, boolean isManaged) throws HiveException { //Check if different FileSystems if (!FileUtils.equalsFileSystem(srcFs, destFs)) { return true; } + if (isManaged && !configuredOwner.isEmpty() && srcFs instanceof DistributedFileSystem) { + // Need some extra checks + // Get the running owner + FileStatus srcs; + + try { + srcs = srcFs.getFileStatus(srcf); + String runningUser = UserGroupInformation.getLoginUser().getUserName(); + boolean isOwned = FileUtils.isOwnerOfFileHierarchy(srcFs, srcs, configuredOwner, false); + if (configuredOwner.equals(runningUser)) { + // Check if owner has write permission, else it will have to copy + if (!isOwned) { + return true; + } + } else { + // If the configured owner does not own the file, throw + if (!isOwned) { + throw new HiveException("Load Data failed for " + srcf + " as the file is not owned by " + + configuredOwner + " and load data is also not ran as " + configuredOwner); + } else { + return true; + } + } + } catch (IOException e) { + throw new HiveException("Could not fetch FileStatus for source file"); + } catch (HiveException e) { + throw new HiveException(e); + } catch (Exception e) { + throw new HiveException(" Failed in looking up Permissions on file + " + srcf); + } + } + //Check if different encryption zones HadoopShims.HdfsEncryptionShim srcHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(srcFs); HadoopShims.HdfsEncryptionShim destHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(destFs); @@ -3833,12 +3856,13 @@ static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, Fil * @param isOverwrite if true, then overwrite if destination file exist, else add a duplicate copy * @param newFiles if this is non-null, a list of files that were created as a result of this * move will be returned. + * @param isManaged if table is managed. * @throws HiveException */ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs, boolean isSrcLocal, boolean isAcidIUD, boolean isOverwrite, List newFiles, boolean isBucketed, - boolean isFullAcidTable) throws HiveException { + boolean isFullAcidTable, boolean isManaged) throws HiveException { try { // create the destination if it does not exist if (!fs.exists(destf)) { @@ -3874,7 +3898,7 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem // i.e, like 000000_0, 000001_0_copy_1, 000002_0.gz etc. // The extension is only maintained for files which are compressed. copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, isOverwrite, - newFiles, isFullAcidTable && !isBucketed); + newFiles, isFullAcidTable && !isBucketed, isManaged); } } @@ -4030,10 +4054,12 @@ private static void moveAcidFiles(String deltaFileType, PathFilter pathFilter, F * If the source directory is LOCAL * @param newFiles * Output the list of new files replaced in the destination path + * @param isManaged + * If the table is managed. 
@@ -3833,12 +3856,13 @@ static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, Fil
    * @param isOverwrite if true, then overwrite if destination file exist, else add a duplicate copy
    * @param newFiles if this is non-null, a list of files that were created as a result of this
    *                 move will be returned.
+   * @param isManaged if table is managed.
    * @throws HiveException
    */
   static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs,
       boolean isSrcLocal, boolean isAcidIUD, boolean isOverwrite,
       List<Path> newFiles, boolean isBucketed,
-      boolean isFullAcidTable) throws HiveException {
+      boolean isFullAcidTable, boolean isManaged) throws HiveException {
     try {
       // create the destination if it does not exist
       if (!fs.exists(destf)) {
@@ -3874,7 +3898,7 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem
       // i.e, like 000000_0, 000001_0_copy_1, 000002_0.gz etc.
       // The extension is only maintained for files which are compressed.
       copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, isOverwrite,
-          newFiles, isFullAcidTable && !isBucketed);
+          newFiles, isFullAcidTable && !isBucketed, isManaged);
     }
   }
@@ -4030,10 +4054,12 @@ private static void moveAcidFiles(String deltaFileType, PathFilter pathFilter, F
    *          If the source directory is LOCAL
    * @param newFiles
    *          Output the list of new files replaced in the destination path
+   * @param isManaged
+   *          If the table is managed.
    */
   protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
           boolean isSrcLocal, boolean purge, List<Path> newFiles, PathFilter deletePathFilter,
-          boolean isNeedRecycle) throws HiveException {
+          boolean isNeedRecycle, boolean isManaged) throws HiveException {
     try {
       FileSystem destFs = destf.getFileSystem(conf);
@@ -4070,7 +4096,7 @@ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath,
       // 2. srcs must be a list of files -- ensured by LoadSemanticAnalyzer
       // in both cases, we move the file under destf
       if (srcs.length == 1 && srcs[0].isDirectory()) {
-        if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal)) {
+        if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal, isManaged)) {
           throw new IOException("Error moving: " + srcf + " into: " + destf);
         }
@@ -4082,7 +4108,7 @@ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath,
         // its either a file or glob
         for (FileStatus src : srcs) {
           Path destFile = new Path(destf, src.getPath().getName());
-          if (!moveFile(conf, src.getPath(), destFile, true, isSrcLocal)) {
+          if (!moveFile(conf, src.getPath(), destFile, true, isSrcLocal, isManaged)) {
             throw new IOException("Error moving: " + srcf + " into: " + destf);
           }
@@ -4334,7 +4360,7 @@ public synchronized IMetaStoreClient getMSC(
     return metaStoreClient;
   }
 
-  private String getUserName() {
+  private static String getUserName() {
     return SessionState.getUserFromAuthenticator();
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
index a20a2ae3ce..a0c23b632d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
@@ -83,7 +83,7 @@ public void testRenameNewFilesOnSameFileSystem() throws IOException {
     FileSystem targetFs = targetPath.getFileSystem(hiveConf);
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false,null, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false,null, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -107,7 +107,7 @@ public void testRenameExistingFilesOnSameFileSystem() throws IOException {
     FileSystem targetFs = targetPath.getFileSystem(hiveConf);
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -127,7 +127,7 @@ public void testRenameExistingFilesOnSameFileSystem() throws IOException {
     sourceFolder.newFile("000001_0.gz");
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -158,7 +158,7 @@ public void testCopyNewFilesOnDifferentFileSystem() throws IOException {
     Mockito.when(spyTargetFs.getUri()).thenReturn(URI.create("hdfs://" + targetPath.toUri().getPath()));
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -185,7 +185,7 @@ public void testCopyExistingFilesOnDifferentFileSystem() throws IOException {
     Mockito.when(spyTargetFs.getUri()).thenReturn(URI.create("hdfs://" + targetPath.toUri().getPath()));
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -205,7 +205,7 @@ public void testCopyExistingFilesOnDifferentFileSystem() throws IOException {
     sourceFolder.newFile("000001_0.gz");
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);