diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index ec2f9f0ac8..0f4f9ad307 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -1065,4 +1065,25 @@ public static int getSleepTime(int repeatNum) {
     return IO_ERROR_SLEEP_TIME * (int)(Math.pow(2.0, repeatNum));
   }
 
+  /**
+   * Updates the owner of each file in the given list.
+   * @param files List of files.
+   * @param owner New owner of the listed files.
+   * @param group New group of the listed files; if null, each file keeps its current group.
+   */
+  public static void updateOwnerForFiles(FileSystem fileSystem, List<Path> files, final String owner,
+      final String group) throws IOException {
+    for (Path path : files) {
+      FileStatus fileStatus = fileSystem.getFileStatus(path);
+      // update the owner
+      //String groupLocal = UserGroupInformation.createProxyUser(owner,
+      //    UserGroupInformation.getLoginUser()).getPrimaryGroupName();
+      String groupLocal = group;
+      if (groupLocal == null) {
+        groupLocal = fileStatus.getGroup().isEmpty() ? null : fileStatus.getGroup();
+      }
+      fileSystem.setOwner(path, owner, groupLocal);
+    }
+  }
+
 }
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 933bda4ad0..762680b062 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1561,6 +1561,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_STRICT_CHECKS_BUCKETING("hive.strict.checks.bucketing", true,
         "Enabling strict bucketing checks disallows the following:\n" +
         "  Load into bucketed tables."),
+    HIVE_LOAD_DATA_OWNER("hive.load.data.owner", "hive",
+        "Set the owner of files loaded using load data in managed tables."),
 
     @Deprecated
     HIVEMAPREDMODE("hive.mapred.mode", null,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 2ec131e274..0241ec6b4f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -86,6 +86,7 @@
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
@@ -1727,16 +1728,9 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
     } else {
       newPartPath = oldPartPath;
     }
-    List<Path> newFiles = null;
+    List<Path> newFiles = Collections.synchronizedList(new ArrayList<>());
     perfLogger.PerfLogBegin("MoveTask", PerfLogger.FILE_MOVES);
 
-    // If config is set, table is not temporary and partition being inserted exists, capture
-    // the list of files added. For not yet existing partitions (insert overwrite to new partition
-    // or dynamic partition inserts), the add partition event will capture the list of files added.
-    if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) {
-      newFiles = Collections.synchronizedList(new ArrayList<>());
-    }
-
     // Note: the stats for ACID tables do not have any coordination with either Hive ACID logic
     // like txn commits, time outs, etc.; nor the lower level sync in metastore pertaining
@@ -1792,13 +1786,13 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
       alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
       validatePartition(newTPart);
 
+      // If config is set, table is not temporary and partition being inserted exists, capture
+      // the list of files added. For not yet existing partitions (insert overwrite to new partition
+      // or dynamic partition inserts), the add partition event will capture the list of files added.
       // Generate an insert event only if inserting into an existing partition
       // When inserting into a new partition, the add partition event takes care of insert event
-      if ((null != oldPart) && (null != newFiles)) {
+      if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) {
         fireInsertEvent(tbl, partSpec, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
-      } else {
-        LOG.debug("No new files were created, and is not a replace, or we're inserting into a "
-            + "partition that does not exist yet. Skipping generating INSERT event.");
       }
 
       // column stats will be inaccurate
@@ -1871,6 +1865,15 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
       } else {
         setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart);
       }
+
+      // Update the owner to configured owner, default is Hive
+      if (newFiles.size() > 0) {
+        FileSystem tblFS = newFiles.get(0).getFileSystem(conf);
+        if (tblFS instanceof DistributedFileSystem) {
+          FileUtils.updateOwnerForFiles(tblFS, newFiles,
+              HiveConf.getVar(conf, ConfVars.HIVE_LOAD_DATA_OWNER), "hdfs");
+        }
+      }
       perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_PARTITION);
       return newTPart;
     } catch (IOException e) {
@@ -2316,15 +2319,12 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType,
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_TABLE);
-    List<Path> newFiles = null;
+    List<Path> newFiles = Collections.synchronizedList(new ArrayList<>());
     Table tbl = getTable(tableName);
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
     boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
     boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl);
     boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
-    if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
-      newFiles = Collections.synchronizedList(new ArrayList<>());
-    }
 
     // Note: this assumes both paths are qualified; which they are, currently.
     if ((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) {
@@ -2406,8 +2406,25 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType,
 
     alterTable(tbl, environmentContext);
 
-    fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
+    if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
+      fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
+    } else {
+      fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), null);
+    }
+
+    try {
+      // Update the owner to configured owner, default is Hive
+      if (newFiles != null && newFiles.size() > 0) {
+        FileSystem tblFS = newFiles.get(0).getFileSystem(conf);
+        if (tblFS instanceof DistributedFileSystem) {
+          FileUtils.updateOwnerForFiles(tblFS, newFiles,
+              HiveConf.getVar(conf, ConfVars.HIVE_LOAD_DATA_OWNER), "hdfs");
+        }
+      }
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+    }
 
     perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_TABLE);
   }
@@ -3342,6 +3359,7 @@ private static void copyFiles(final HiveConf conf, final FileSystem destFs,
     int taskId = 0;
     // Sort the files
     Arrays.sort(srcs);
+    String owner = HiveConf.getVar(conf, ConfVars.HIVE_LOAD_DATA_OWNER);
     for (FileStatus src : srcs) {
       FileStatus[] files;
       if (src.isDirectory()) {
@@ -3362,7 +3380,7 @@ private static void copyFiles(final HiveConf conf, final FileSystem destFs,
       Arrays.sort(files);
       for (final FileStatus srcFile : files) {
         final Path srcP = srcFile.getPath();
-        final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs);
+        final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs, owner);
 
         final boolean isRenameAllowed = !needToCopy && !isSrcLocal;
 
@@ -3620,6 +3638,7 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf,
     }
 
     HdfsUtils.HadoopFileStatus destStatus = null;
+    String owner = HiveConf.getVar(conf, ConfVars.HIVE_LOAD_DATA_OWNER);
 
     // If source path is a subdirectory of the destination path (or the other way around):
     // ex: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT src.value WHERE src.key >= 300;
@@ -3653,7 +3672,7 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf,
         destFs.copyFromLocalFile(srcf, destf);
         return true;
       } else {
-        if (needToCopy(srcf, destf, srcFs, destFs)) {
+        if (needToCopy(srcf, destf, srcFs, destFs, owner)) {
           //copy if across file system or encryption zones.
           LOG.debug("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different.");
           return FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf,
@@ -3802,11 +3821,29 @@ static private HiveException getHiveException(Exception e, String msg, String lo
    * TODO- consider if need to do this for different file authority.
    * @throws HiveException
    */
-  static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs) throws HiveException {
+  static private boolean needToCopy(Path srcf, Path destf, FileSystem srcFs,
+      FileSystem destFs, String owner) throws HiveException {
     //Check if different FileSystems
     if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
       return true;
     }
+    // Get the file status to determine the owner and permissions of the source
+    FileStatus srcs;
+    try {
+      srcs = srcFs.getFileStatus(srcf);
+      // If the configured owner does not have write permission, need to copy
+      if (!FileUtils.isActionPermittedForFileHierarchy(srcFs, srcs, owner, FsAction.WRITE, false)) {
+        return true;
+      }
+
+      // If the configured owner does not own the file, need to copy
+      if (!FileUtils.isOwnerOfFileHierarchy(srcFs, srcs, owner, false)) {
+        return true;
+      }
+    } catch (Exception e) {
+      throw new HiveException("Could not fetch FileStatus for source file " + srcf, e);
+    }
+
     //Check if different encryption zones
     HadoopShims.HdfsEncryptionShim srcHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(srcFs);
diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
index adcf6962ab..bf79b0d5a0 100644
--- a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
+++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
@@ -20,7 +20,9 @@ load data local inpath '../../data/files/bmj/000001_0' INTO TABLE srcbucket_mapj
 load data local inpath '../../data/files/bmj/000002_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08');
 load data local inpath '../../data/files/bmj/000003_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08');
 
+--dfs -ls -R ${hiveconf:hive.metastore.warehouse.dir}/srcbucket_mapjoin_n18;
+--dfs -chown hive:hdfs ${hiveconf:hive.metastore.warehouse.dir}/srcbucket_mapjoin_n18/ds=2008-04-08/000000_0;
 set hive.optimize.bucketingsorting=false;
 insert overwrite table tab_part_n11 partition (ds='2008-04-08')
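
Reviewer note, not part of the patch: the sketch below restates the chown step that loadTable() and loadPartition() gain above, so the intended behaviour can be tried in isolation. It assumes a Hadoop Configuration and the list of Paths produced by the file moves; the class and method names (LoadDataOwnerSketch, applyLoadDataOwner) and the sample path are illustrative only, and the "hdfs" group mirrors the value hard-coded in the patch.

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public final class LoadDataOwnerSketch {

  /**
   * Illustrative stand-in for the post-load step added to Hive.loadTable()/loadPartition():
   * once the moved files are known, chown them to the configured owner (hive.load.data.owner,
   * default "hive") with group "hdfs", but only when the destination file system is HDFS.
   */
  static void applyLoadDataOwner(Configuration conf, List<Path> newFiles, String owner)
      throws IOException {
    if (newFiles == null || newFiles.isEmpty()) {
      return; // nothing was moved, so there is nothing to chown
    }
    FileSystem fs = newFiles.get(0).getFileSystem(conf);
    if (!(fs instanceof DistributedFileSystem)) {
      return; // local and other non-HDFS destinations are left untouched, as in the patch
    }
    for (Path path : newFiles) {
      // Effectively what FileUtils.updateOwnerForFiles(fs, newFiles, owner, "hdfs") does above;
      // note that setOwner() requires HDFS superuser privileges.
      fs.setOwner(path, owner, "hdfs");
    }
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Hypothetical path of a file just moved into a managed table.
    List<Path> newFiles = Collections.singletonList(new Path("/warehouse/managed/t1/000000_0"));
    applyLoadDataOwner(conf, newFiles, "hive");
  }
}

The companion change to needToCopy() complements this: whenever the configured owner cannot write to, or does not own, the source files, the data is copied rather than renamed into place, so the files that later get chowned are always warehouse-local copies.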