diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestFolderPermissions.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestFolderPermissions.java index 6cc2d18..9c09c1a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestFolderPermissions.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/TestFolderPermissions.java @@ -34,9 +34,12 @@ public static void setup() throws Exception { baseSetup(); } + // assuming umask=022, ^umask is 0755 + // the mode of a new file is (requested permission & ^umask) + // the mode of a new directory is (0777 & ^umask) public FsPermission[] expected = new FsPermission[] { - FsPermission.createImmutable((short) 0777), - FsPermission.createImmutable((short) 0766) + FsPermission.createImmutable((short) 0644), + FsPermission.createImmutable((short) 0511) }; @Override diff --git a/orc/src/java/org/apache/orc/OrcFile.java b/orc/src/java/org/apache/orc/OrcFile.java index 7dd7333..fca3618 100644 --- a/orc/src/java/org/apache/orc/OrcFile.java +++ b/orc/src/java/org/apache/orc/OrcFile.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.orc.impl.MemoryManager; import org.apache.orc.impl.ReaderImpl; import org.apache.orc.impl.WriterImpl; @@ -233,6 +234,7 @@ public static Reader createReader(Path path, public static class WriterOptions { private final Configuration configuration; private FileSystem fileSystemValue = null; + private FsPermission permission; private TypeDescription schema = null; private long stripeSizeValue; private long blockSizeValue; @@ -416,6 +418,14 @@ public WriterOptions version(Version value) { } /** + * Sets the permission of the file that will be written. + */ + public WriterOptions permission(FsPermission fsPermission) { + permission = fsPermission; + return this; + } + + /** * Add a listener for when the stripe and file are about to be closed.
* @param callback the object to be called when the stripe is closed * @return this @@ -504,6 +514,10 @@ public double getPaddingTolerance() { public double getBloomFilterFpp() { return bloomFilterFpp; } + + public FsPermission getFsPermission() { + return permission; + } } /** diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java index b2966e0..a617404 100644 --- a/orc/src/java/org/apache/orc/impl/WriterImpl.java +++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java @@ -51,6 +51,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; @@ -102,6 +103,7 @@ private final FileSystem fs; private final Path path; + private final FsPermission permission; private final long defaultStripeSize; private long adjustedStripeSize; private final int rowIndexStride; @@ -152,6 +154,7 @@ public WriterImpl(FileSystem fs, OrcFile.WriterOptions opts) throws IOException { this.fs = fs; this.path = path; + this.permission = opts.getFsPermission(); this.conf = opts.getConfiguration(); this.callback = opts.getCallback(); this.schema = opts.getSchema(); @@ -2465,8 +2468,13 @@ private static void writeTypes(OrcProto.Footer.Builder builder, @VisibleForTesting public FSDataOutputStream getStream() throws IOException { if (rawWriter == null) { - rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE, - fs.getDefaultReplication(path), blockSize); + if (permission == null) { + rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE, fs.getDefaultReplication(path), + blockSize); + } else { + rawWriter = fs.create(path, permission, false, HDFS_BUFFER_SIZE, + fs.getDefaultReplication(path), blockSize, null); + } rawWriter.writeBytes(OrcFile.MAGIC); headerLength = rawWriter.getPos(); writer = new OutStream("metadata", bufferSize, codec, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 0ac9109..c68083b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -36,15 +36,18 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.io.HdfsUtils; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.HiveOutputFormatWithPermission; import org.apache.hadoop.hive.ql.io.HivePartitioner; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter; @@ -73,10 +76,13 @@ import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim; import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue; +import 
org.apache.hadoop.hive.shims.HiveHarFileSystem; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hive.common.util.HiveStringUtils; import org.slf4j.Logger; @@ -150,6 +156,7 @@ Stat stat; int acidLastBucket = -1; int acidFileOffset = -1; + String dirName; public FSPaths(Path specPath) { tmpPath = Utilities.toTempPath(specPath); @@ -234,8 +241,11 @@ private void commit(FileSystem fs) throws HiveException { } } if (needToRename && !fs.rename(outPaths[idx], finalPaths[idx])) { - throw new HiveException("Unable to rename output from: " + - outPaths[idx] + " to: " + finalPaths[idx]); + throw new HiveException("Unable to rename output from: " + + outPaths[idx] + " to: " + finalPaths[idx]); + } + if (partitionPermSet.containsKey(finalPaths[idx])) { + fs.setPermission(finalPaths[idx].getParent(), partitionPermSet.get(finalPaths[idx])); } updateProgress(); } catch (IOException e) { @@ -264,6 +274,14 @@ public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws Hi public Stat getStat() { return stat; } + + public String getDirName() { + return dirName; + } + + public void setDirName(String dirName) { + this.dirName = dirName; + } } // class FSPaths private static final long serialVersionUID = 1L; @@ -572,6 +590,46 @@ protected void createBucketFiles(FSPaths fsp) throws HiveException { filesCreated = true; } + // build a small cache so that when a partition is asking for the + // FsPermission, it may hit the cache. + private Map fsPermCache = new HashMap<>(); + + // build another map to map the partition to its permission + private Map partitionPermSet = new HashMap<>(); + + private FsPermission getPermission(Path tblPath, String dirName) throws IOException { + // it is possible that table/part=1 does not exist + // but table/part=2 has its own permission. 
+ Path partPath = new Path(tblPath, dirName); + // first try partition path + if (fsPermCache.containsKey(partPath)) { + return fsPermCache.get(partPath); + } + FsPermission destPerm = null; + try { + HdfsUtils.HadoopFileStatus destStatus = new HdfsUtils.HadoopFileStatus(hconf, + tblPath.getFileSystem(hconf), partPath); + FileStatus fStatus = destStatus.getFileStatus(); + destPerm = fStatus.getPermission(); + fsPermCache.put(partPath, destPerm); + } catch (FileNotFoundException exception) { + // second try table path + final boolean inheritPerms = HiveConf.getBoolVar(hconf, + HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); + if (inheritPerms) { + if (fsPermCache.containsKey(tblPath)) { + return fsPermCache.get(tblPath); + } + HdfsUtils.HadoopFileStatus destStatus = new HdfsUtils.HadoopFileStatus(hconf, + tblPath.getFileSystem(hconf), tblPath); + FileStatus fStatus = destStatus.getFileStatus(); + destPerm = fStatus.getPermission(); + fsPermCache.put(tblPath, destPerm); + } + } + return destPerm; + } + protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveException { try { @@ -611,8 +669,24 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) // only create bucket files only if no dynamic partitions, // buckets of dynamic partitions will be created for each newly created partition if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) { - fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(), - outputClass, conf, fsp.outPaths[filesIdx], reporter); + if (conf.isSkipPermission()) { + FsPermission permission = getPermission(this.conf.getDestPath(), fsp.getDirName()); + fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, + conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx], permission, reporter); + LOG.info("Permission of " + fsp.outPaths[filesIdx] + " is set to " + permission); + // We need to set the partition's permission as well. + // Consider fsp.outPaths = + // ./_task_tmp.-ext-10000/p1=5/_tmp.000000_0. + // The call above sets the permission for _tmp.000000_0 only. + // We need to set the permission for p1=5 as well.
+ if (!partitionPermSet.containsKey(fsp.finalPaths[filesIdx])) { + partitionPermSet.put(fsp.finalPaths[filesIdx], permission); + } + } else { + fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, + conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx], null, reporter); + } + // If the record writer provides stats, get it from there instead of the serde statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter; @@ -846,6 +920,7 @@ protected FSPaths lookupListBucketingPaths(String lbDirName) throws HiveExceptio */ private FSPaths createNewPaths(String dirName) throws HiveException { FSPaths fsp2 = new FSPaths(specPath); + fsp2.setDirName(dirName); if (childSpecPathDynLinkedPartitions != null) { fsp2.tmpPath = new Path(fsp2.tmpPath, dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index aeaae6b..f2c36f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.io.HiveOutputFormatWithPermission; import org.apache.hadoop.hive.ql.io.merge.MergeFileTask; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; @@ -58,6 +59,7 @@ import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; +import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.LoadFileDesc; import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; @@ -82,7 +84,7 @@ public MoveTask() { super(); } - private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir) + private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir, boolean skipPermission) throws HiveException { try { String mesg = "Moving data to " + (isDfsDir ? "" : "local ") + "directory " @@ -92,7 +94,7 @@ private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir) FileSystem fs = sourcePath.getFileSystem(conf); if (isDfsDir) { - moveFileInDfs (sourcePath, targetPath, fs); + moveFileInDfs (sourcePath, targetPath, fs, skipPermission); } else { // This is a local file FileSystem dstFs = FileSystem.getLocal(conf); @@ -104,7 +106,7 @@ private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir) } } - private void moveFileInDfs (Path sourcePath, Path targetPath, FileSystem fs) + private void moveFileInDfs (Path sourcePath, Path targetPath, FileSystem fs, boolean skipPermission) throws HiveException, IOException { // if source exists, rename. 
Otherwise, create a empty directory if (fs.exists(sourcePath)) { @@ -114,7 +116,7 @@ private void moveFileInDfs (Path sourcePath, Path targetPath, FileSystem fs) if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) { deletePath = createTargetPath(targetPath, fs); } - if (!Hive.moveFile(conf, sourcePath, targetPath, true, false)) { + if (!Hive.moveFile(conf, sourcePath, targetPath, true, false, skipPermission)) { try { if (deletePath != null) { fs.delete(deletePath, true); @@ -243,7 +245,7 @@ public int execute(DriverContext driverContext) { if (lfd != null) { Path targetPath = lfd.getTargetDir(); Path sourcePath = lfd.getSourcePath(); - moveFile(sourcePath, targetPath, lfd.getIsDfsDir()); + moveFile(sourcePath, targetPath, lfd.getIsDfsDir(), lfd.isSkipPermission()); } // Multi-file load is for dynamic partitions when some partitions do not @@ -259,7 +261,7 @@ public int execute(DriverContext driverContext) { if (!fs.exists(destPath.getParent())) { fs.mkdirs(destPath.getParent()); } - moveFile(srcPath, destPath, isDfsDir); + moveFile(srcPath, destPath, isDfsDir, false); i++; } } @@ -420,6 +422,7 @@ public int execute(DriverContext driverContext) { // iterate over it and call loadPartition() here. // The reason we don't do inside HIVE-1361 is the latter is large and we // want to isolate any potential issue it may introduce. + Map, Partition> dp = db.loadDynamicPartitions( tbd.getSourcePath(), @@ -428,9 +431,9 @@ public int execute(DriverContext driverContext) { tbd.getReplace(), dpCtx.getNumDPCols(), isSkewedStoredAsDirs(tbd), - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, + tbd.getWriteType() != AcidUtils.Operation.NOT_ACID, SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(), - work.getLoadTableWork().getWriteType()); + tbd.getWriteType(), tbd.isSkipPermission()); console.printInfo("\t Time taken to load dynamic partitions: " + (System.currentTimeMillis() - startTime)/1000.0 + " seconds"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 528d663..3d7d0f7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -96,6 +96,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.io.HdfsUtils; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -1005,14 +1006,14 @@ public static String getFileExtension(JobConf jc, boolean isCompressed, * Path to be created * @return output stream over the created rcfile */ - public static RCFile.Writer createRCFileWriter(JobConf jc, FileSystem fs, Path file, + public static RCFile.Writer createRCFileWriter(JobConf jc, FileSystem fs, Path file, FsPermission permission, boolean isCompressed, Progressable progressable) throws IOException { CompressionCodec codec = null; if (isCompressed) { Class codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class); codec = (CompressionCodec) ReflectionUtil.newInstance(codecClass, jc); } - return new RCFile.Writer(fs, jc, file, progressable, codec); + return new RCFile.Writer(fs, jc, file, permission, progressable, codec); } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java index 31788dd..63263d2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; @@ -38,7 +39,7 @@ * separators. It can be used to create a binary data file. */ public class HiveBinaryOutputFormat - extends TextOutputFormat implements HiveOutputFormat { + extends TextOutputFormat implements HiveOutputFormatWithPermission { /** * create the final out file, and output row by row. After one row is @@ -57,14 +58,49 @@ * @param progress * progress used for status report * @return the RecordWriter + * @deprecated Use {@link #getHiveRecordWriter(JobConf,Path,FsPermission,Class,boolean,Properties,Progressable)} instead */ @Override public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { + return getHiveRecordWriter(jc, outPath, null, valueClass, isCompressed, + tableProperties, progress); + } + + /** + * create the final out file, and output row by row. After one row is + * appended, a configured row separator is appended + * + * @param jc + * the job configuration file + * @param valueClass + * the value class used for create + * @param isCompressed + * ignored. Currently we don't support compression. + * @param tableProperties + * the tableProperties of this file's corresponding table + * @param progress + * progress used for status report + * @param outPath + * the final output file to be created + * @return the RecordWriter + */ + @Override + public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath, + FsPermission permission, Class valueClass, + boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { FileSystem fs = outPath.getFileSystem(jc); - final OutputStream outStream = fs.create(outPath, progress); + OutputStream createOutStream = null; + if (permission == null) { + createOutStream = fs.create(outPath, progress); + } else { + createOutStream = fs.create(outPath, permission, true, + fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(outPath), + fs.getDefaultBlockSize(outPath), progress); + } + final OutputStream outStream = createOutStream; return new RecordWriter() { @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index 6bb5efa..f83c68a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; @@ -247,7 +248,7 @@ private static boolean isPipe(FileSystem fs, FileStatus file) { public static RecordWriter getHiveRecordWriter(JobConf jc, TableDesc tableInfo, Class outputClass, - FileSinkDesc conf, Path outPath, Reporter 
reporter) throws HiveException { + FileSinkDesc conf, Path outPath, FsPermission permission, Reporter reporter) throws HiveException { HiveOutputFormat hiveOutputFormat = getHiveOutputFormat(jc, tableInfo); try { boolean isCompressed = conf.getCompressed(); @@ -267,7 +268,7 @@ public static RecordWriter getHiveRecordWriter(JobConf jc, } } return getRecordWriter(jc_output, hiveOutputFormat, outputClass, - isCompressed, tableInfo.getProperties(), outPath, reporter); + isCompressed, tableInfo.getProperties(), outPath, permission, reporter); } catch (Exception e) { throw new HiveException(e); } @@ -278,11 +279,22 @@ public static RecordWriter getRecordWriter(JobConf jc, Class valueClass, boolean isCompressed, Properties tableProp, Path outPath, Reporter reporter ) throws IOException, HiveException { + return getRecordWriter(jc, outputFormat, valueClass, isCompressed, tableProp, outPath, null, reporter); + } + + public static RecordWriter getRecordWriter(JobConf jc, OutputFormat outputFormat, + Class valueClass, boolean isCompressed, Properties tableProp, + Path outPath, FsPermission permission, Reporter reporter) throws IOException, HiveException { if (!(outputFormat instanceof HiveOutputFormat)) { outputFormat = new HivePassThroughOutputFormat(outputFormat); } - return ((HiveOutputFormat)outputFormat).getHiveRecordWriter( - jc, outPath, valueClass, isCompressed, tableProp, reporter); + if (outputFormat instanceof HiveOutputFormatWithPermission && permission != null) { + return ((HiveOutputFormatWithPermission) outputFormat).getHiveRecordWriter(jc, outPath, + permission, valueClass, isCompressed, tableProp, reporter); + } else { + return ((HiveOutputFormat) outputFormat).getHiveRecordWriter(jc, outPath, valueClass, + isCompressed, tableProp, reporter); + } } public static HiveOutputFormat getHiveOutputFormat(Configuration conf, TableDesc tableDesc) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java index 9ad7f37..e356f75 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.serde.serdeConstants; @@ -42,7 +43,7 @@ * */ public class HiveIgnoreKeyTextOutputFormat - extends TextOutputFormat implements HiveOutputFormat { + extends TextOutputFormat implements HiveOutputFormatWithPermission { /** * create the final out file, and output row by row. After one row is @@ -61,11 +62,38 @@ * @param progress * progress used for status report * @return the RecordWriter + * @deprecated Use {@link #getHiveRecordWriter(JobConf,Path,FsPermission,Class,boolean,Properties,Progressable)} instead */ @Override public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { + return getHiveRecordWriter(jc, outPath, null, valueClass, isCompressed, + tableProperties, progress); + } + + /** + * create the final out file, and output row by row. 
After one row is + * appended, a configured row separator is appended + * + * @param jc + * the job configuration file + * @param valueClass + * the value class used for create + * @param isCompressed + * whether the content is compressed or not + * @param tableProperties + * the tableProperties of this file's corresponding table + * @param progress + * progress used for status report + * @param outPath + * the final output file to be created + * @return the RecordWriter + */ + @Override + public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath, + FsPermission permission, Class valueClass, + boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { int rowSeparator = 0; String rowSeparatorString = tableProperties.getProperty( serdeConstants.LINE_DELIM, "\n"); @@ -77,8 +105,17 @@ public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath, final int finalRowSeparator = rowSeparator; FileSystem fs = outPath.getFileSystem(jc); - final OutputStream outStream = Utilities.createCompressedStream(jc, - fs.create(outPath, progress), isCompressed); + OutputStream createOutStream = null; + if (permission == null) { + createOutStream = Utilities.createCompressedStream(jc, fs.create(outPath, progress), isCompressed); + } else { + createOutStream = Utilities.createCompressedStream( + jc, + fs.create(outPath, permission, true, fs.getConf().getInt("io.file.buffer.size", 4096), + fs.getDefaultReplication(outPath), fs.getDefaultBlockSize(outPath), progress), + isCompressed); + } + final OutputStream outStream = createOutStream; return new RecordWriter() { @Override public void write(Writable r) throws IOException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormatWithPermission.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormatWithPermission.java new file mode 100644 index 0000000..0bff37a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormatWithPermission.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.Progressable; + +public interface HiveOutputFormatWithPermission extends HiveOutputFormat { + + /** + * create the final out file and get some specific settings. 
+ * + * @param jc + * the job configuration file + * @param finalOutPath + * the final output file to be created + * @param permission + * the permission for the final output file + * @param valueClass + * the value class used for create + * @param isCompressed + * whether the content is compressed or not + * @param tableProperties + * the table properties of this file's corresponding table + * @param progress + * progress used for status report + * @return the RecordWriter for the output file + */ + RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + FsPermission permission, final Class valueClass, + boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException; + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java index 5855288..d4c726e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; @@ -33,7 +34,7 @@ * This pass through class is used to wrap OutputFormat implementations such that new OutputFormats not derived from * HiveOutputFormat gets through the checker */ -public class HivePassThroughOutputFormat implements HiveOutputFormat{ +public class HivePassThroughOutputFormat implements HiveOutputFormatWithPermission{ private final OutputFormat actualOutputFormat; @@ -53,16 +54,34 @@ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException job, name, progress); } + /** + * @deprecated Use {@link #getHiveRecordWriter(JobConf,Path,FsPermission,Class,boolean,Properties,Progressable)} instead + */ @Override public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter( JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { - if (actualOutputFormat instanceof HiveOutputFormat) { - return ((HiveOutputFormat) actualOutputFormat).getHiveRecordWriter(jc, - finalOutPath, valueClass, isCompressed, tableProperties, progress); + return getHiveRecordWriter(jc, finalOutPath, null, valueClass, isCompressed, + tableProperties, progress); + } + + @Override + public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter( + JobConf jc, Path finalOutPath, FsPermission permission, Class valueClass, + boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { + if (actualOutputFormat instanceof HiveOutputFormatWithPermission) { + return ((HiveOutputFormatWithPermission) actualOutputFormat).getHiveRecordWriter(jc, + finalOutPath, permission, valueClass, isCompressed, tableProperties, progress); + } else if (actualOutputFormat instanceof HiveOutputFormat) { + return ((HiveOutputFormat) actualOutputFormat).getHiveRecordWriter(jc, finalOutPath, + valueClass, isCompressed, tableProperties, progress); } FileSystem fs = finalOutPath.getFileSystem(jc); RecordWriter recordWriter = actualOutputFormat.getRecordWriter(fs, jc, null, progress); return new HivePassThroughRecordWriter(recordWriter); } + + public OutputFormat getActualOutputFormat() { + return actualOutputFormat; + } } diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java index d391164..6ba2679 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; @@ -946,6 +947,11 @@ public Writer(FileSystem fs, Configuration conf, Path name, this(fs, conf, name, progress, new Metadata(), codec); } + public Writer(FileSystem fs, Configuration conf, Path name, FsPermission permission, + Progressable progress, CompressionCodec codec) throws IOException { + this(fs, conf, name, permission, progress, new Metadata(), codec); + } + /** * Constructs a RCFile Writer. * @@ -959,12 +965,16 @@ public Writer(FileSystem fs, Configuration conf, Path name, * @param metadata a string to string map in the file header * @throws IOException */ - public Writer(FileSystem fs, Configuration conf, Path name, + public Writer(FileSystem fs, Configuration conf, Path name, Progressable progress, + Metadata metadata, CompressionCodec codec) throws IOException { + this(fs, conf, name, null, progress, metadata, codec); + } + + public Writer(FileSystem fs, Configuration conf, Path name, FsPermission permission, Progressable progress, Metadata metadata, CompressionCodec codec) throws IOException { - this(fs, conf, name, fs.getConf().getInt("io.file.buffer.size", 4096), - ShimLoader.getHadoopShims().getDefaultReplication(fs, name), - ShimLoader.getHadoopShims().getDefaultBlockSize(fs, name), progress, - metadata, codec); + this(fs, conf, name, permission, fs.getConf().getInt("io.file.buffer.size", 4096), ShimLoader + .getHadoopShims().getDefaultReplication(fs, name), ShimLoader.getHadoopShims() + .getDefaultBlockSize(fs, name), progress, metadata, codec); } /** @@ -984,7 +994,7 @@ public Writer(FileSystem fs, Configuration conf, Path name, * @param metadata a string to string map in the file header * @throws IOException */ - public Writer(FileSystem fs, Configuration conf, Path name, int bufferSize, + public Writer(FileSystem fs, Configuration conf, Path name, FsPermission permission, int bufferSize, short replication, long blockSize, Progressable progress, Metadata metadata, CompressionCodec codec) throws IOException { RECORD_INTERVAL = HiveConf.getIntVar(conf, HIVE_RCFILE_RECORD_INTERVAL); @@ -1006,8 +1016,13 @@ public Writer(FileSystem fs, Configuration conf, Path name, int bufferSize, columnBuffers[i] = new ColumnBuffer(); } - init(conf, fs.create(name, true, bufferSize, replication, - blockSize, progress), codec, metadata); + if (permission == null) { + init(conf, fs.create(name, true, bufferSize, replication, blockSize, progress), codec, + metadata); + } else { + init(conf, fs.create(name, permission, true, bufferSize, replication, blockSize, progress), codec, + metadata); + } initializeFileHeader(); writeFileHeader(); finalizeFileHeader(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java index 479f198..531f613 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java @@ -25,6 
+25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; @@ -45,7 +46,7 @@ */ public class RCFileOutputFormat extends FileOutputFormat implements - HiveOutputFormat { + HiveOutputFormatWithPermission { /** * set number of columns into the given configuration. @@ -117,11 +118,37 @@ public void write(WritableComparable key, BytesRefArrayWritable value) * @param progress * progress used for status report * @throws IOException + * @deprecated Use {@link #getHiveRecordWriter(JobConf,Path,FsPermission,Class,boolean,Properties,Progressable)} instead */ @Override public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter( JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { + return getHiveRecordWriter(jc, finalOutPath, null, valueClass, isCompressed, + tableProperties, progress); + } + + /** + * create the final out file. + * + * @param jc + * the job configuration file + * @param finalOutPath + * the final output file to be created + * @param valueClass + * the value class used for create + * @param isCompressed + * whether the content is compressed or not + * @param tableProperties + * the tableInfo of this file's corresponding table + * @param progress + * progress used for status report + * @throws IOException + */ + @Override + public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter( + JobConf jc, Path finalOutPath, FsPermission permission, + Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { String[] cols = null; String columns = tableProperties.getProperty("columns"); @@ -133,7 +160,7 @@ public void write(WritableComparable key, BytesRefArrayWritable value) RCFileOutputFormat.setColumnNumber(jc, cols.length); final RCFile.Writer outWriter = Utilities.createRCFileWriter(jc, - finalOutPath.getFileSystem(jc), finalOutPath, isCompressed, progress); + finalOutPath.getFileSystem(jc), finalOutPath, permission, isCompressed, progress); return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() { @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java index 59d3bba..1487573 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java @@ -34,8 +34,10 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.HiveOutputFormatWithPermission; import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; import org.apache.hadoop.hive.serde2.avro.AvroSerdeException; import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; @@ -50,14 +52,25 @@ * Write to an Avro file from a Hive process. 
*/ public class AvroContainerOutputFormat - implements HiveOutputFormat { + implements HiveOutputFormatWithPermission { public static final Logger LOG = LoggerFactory.getLogger(AvroContainerOutputFormat.class); + /** + * @deprecated Use {@link #getHiveRecordWriter(JobConf,Path,FsPermission,Class,boolean,Properties,Progressable)} instead + */ @Override public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Path path, Class valueClass, boolean isCompressed, Properties properties, Progressable progressable) throws IOException { + return getHiveRecordWriter(jobConf, path, null, valueClass, isCompressed, + properties, progressable); + } + + @Override + public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, + Path path, FsPermission permission, Class valueClass, + boolean isCompressed, Properties properties, Progressable progressable) throws IOException { Schema schema; try { schema = AvroSerdeUtils.determineSchemaOrThrowException(jobConf, properties); @@ -76,7 +89,11 @@ dfw.setCodec(factory); } - dfw.create(schema, path.getFileSystem(jobConf).create(path)); + if (permission == null) { + dfw.create(schema, path.getFileSystem(jobConf).create(path)); + } else { + dfw.create(schema, FileSystem.create(path.getFileSystem(jobConf), path, permission)); + } return new AvroGenericRecordWriter(dfw); } @@ -110,7 +127,7 @@ public WrapperRecordWriter(JobConf jobConf, Progressable progressable, String fi path = new Path(path,"_dummy"); } - this.hiveWriter = getHiveRecordWriter(jobConf,path,null,isCompressed, properties, progressable); + this.hiveWriter = getHiveRecordWriter(jobConf,path,null,null, isCompressed, properties, progressable); } return this.hiveWriter; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index b0f8c8b..0a8d4ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -30,8 +30,11 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.HiveOutputFormatWithPermission; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter; @@ -65,7 +68,7 @@ * A Hive OutputFormat for ORC files. 
*/ public class OrcOutputFormat extends FileOutputFormat - implements AcidOutputFormat { + implements AcidOutputFormat, HiveOutputFormatWithPermission { private static final Logger LOG = LoggerFactory.getLogger(OrcOutputFormat.class); @@ -178,6 +181,9 @@ public SerDeStats getStats() { } + /** + * @deprecated Use {@link #getHiveRecordWriter(JobConf,Path,FsPermission,Class,boolean,Properties,Progressable)} instead + */ @Override public StatsProvidingRecordWriter getHiveRecordWriter(JobConf conf, @@ -186,7 +192,21 @@ public SerDeStats getStats() { boolean isCompressed, Properties tableProperties, Progressable reporter) throws IOException { - return new OrcRecordWriter(path, getOptions(conf, tableProperties)); + return getHiveRecordWriter(conf, path, null, valueClass, + isCompressed, tableProperties, reporter); + } + + @Override + public StatsProvidingRecordWriter + getHiveRecordWriter(JobConf conf, + Path path, + FsPermission permission, + Class valueClass, + boolean isCompressed, + Properties tableProperties, Progressable reporter) throws IOException { + OrcFile.WriterOptions options = getOptions(conf, tableProperties); + options.permission(permission); + return new OrcRecordWriter(path, options); } private class DummyOrcRecordUpdater implements RecordUpdater { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 2b01fce..fbd4d90 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -127,6 +127,7 @@ import org.apache.hadoop.hive.ql.index.HiveIndexHandler; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils; +import org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.booleanValue_return; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; import org.apache.hadoop.hive.ql.plan.DropTableDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; @@ -1483,10 +1484,16 @@ public void loadPartition(Path loadPath, String tableName, * If the source directory is LOCAL * @param isAcid true if this is an ACID operation */ + public Partition loadPartition(Path loadPath, Table tbl, Map partSpec, + boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, + boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException { + return loadPartition(loadPath, tbl, partSpec, replace, inheritTableSpecs, + isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, false); + } public Partition loadPartition(Path loadPath, Table tbl, Map partSpec, boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, - boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException { + boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, boolean skipPermission) throws HiveException { Path tblDataLocationPath = tbl.getDataLocation(); try { /** @@ -1530,14 +1537,14 @@ public Partition loadPartition(Path loadPath, Table tbl, List newFiles = null; if (replace || (oldPart == null && !isAcid)) { replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(), - isSrcLocal); + isSrcLocal, skipPermission); } else { if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) { newFiles = Collections.synchronizedList(new ArrayList()); } FileSystem fs = tbl.getDataLocation().getFileSystem(conf); - Hive.copyFiles(conf, loadPath, 
newPartPath, fs, isSrcLocal, isAcid, newFiles); + Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles, skipPermission); } Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath); alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString()); @@ -1696,7 +1703,7 @@ private void constructOneLBLocationMap(FileStatus fSta, public Map, Partition> loadDynamicPartitions(Path loadPath, String tableName, Map partSpec, boolean replace, int numDP, boolean listBucketingEnabled, boolean isAcid, long txnId, boolean hasFollowingStatsTask, - AcidUtils.Operation operation) + AcidUtils.Operation operation, boolean skipPermission) throws HiveException { Set validPartitions = new HashSet(); @@ -1746,7 +1753,7 @@ private void constructOneLBLocationMap(FileStatus fSta, LinkedHashMap fullPartSpec = new LinkedHashMap(partSpec); Warehouse.makeSpecFromName(fullPartSpec, partPath); Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace, - true, listBucketingEnabled, false, isAcid, hasFollowingStatsTask); + true, listBucketingEnabled, false, isAcid, hasFollowingStatsTask, skipPermission); partitionsMap.put(fullPartSpec, newPartition); if (inPlaceEligible) { InPlaceUpdates.rePositionCursor(ps); @@ -2630,9 +2637,15 @@ public PrincipalPrivilegeSet get_privilege_set(HiveObjectType objectType, } } - private static void copyFiles(final HiveConf conf, final FileSystem destFs, - FileStatus[] srcs, final FileSystem srcFs, final Path destf, final boolean isSrcLocal, final List newFiles) - throws HiveException { + private static void copyFiles(final HiveConf conf, final FileSystem destFs, FileStatus[] srcs, + final FileSystem srcFs, final Path destf, final boolean isSrcLocal, final List newFiles) + throws HiveException { + copyFiles(conf, destFs, srcs, srcFs, destf, isSrcLocal, newFiles, false); + } + + private static void copyFiles(final HiveConf conf, final FileSystem destFs, FileStatus[] srcs, + final FileSystem srcFs, final Path destf, final boolean isSrcLocal, + final List newFiles, final boolean skipPermission) throws HiveException { final HdfsUtils.HadoopFileStatus fullDestStatus; try { @@ -2720,7 +2733,7 @@ private static void copyFiles(final HiveConf conf, final FileSystem destFs, } if (inheritPerms) { - HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false); + HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false, skipPermission); } if (null != newFiles) { newFiles.add(destPath); @@ -2822,8 +2835,13 @@ private static Path mvFile(HiveConf conf, Path srcf, Path destf, boolean isSrcLo //method is called. when the replace value is true, this method works a little different //from mv command if the destf is a directory, it replaces the destf instead of moving under //the destf. 
in this case, the replaced destf still preserves the original destf's permission + public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, boolean replace, + boolean isSrcLocal) throws HiveException { + return moveFile(conf, srcf, destf, replace, isSrcLocal, false); + } + public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, - boolean replace, boolean isSrcLocal) throws HiveException { + boolean replace, boolean isSrcLocal, final boolean skipPermission) throws HiveException { final FileSystem srcFs, destFs; try { destFs = destf.getFileSystem(conf); @@ -2877,7 +2895,7 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, destFs.copyFromLocalFile(srcf, destf); if (inheritPerms) { try { - HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true); + HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true, skipPermission); } catch (IOException e) { LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e); } @@ -2916,7 +2934,7 @@ public Void call() throws Exception { final String group = srcStatus.getGroup(); if(destFs.rename(srcStatus.getPath(), destf)) { if (inheritPerms) { - HdfsUtils.setFullFileStatus(conf, desiredStatus, group, destFs, destPath, false); + HdfsUtils.setFullFileStatus(conf, desiredStatus, group, destFs, destPath, false, skipPermission); } } else { throw new IOException("rename for src path: " + srcStatus.getPath() + " to dest path:" @@ -2947,7 +2965,7 @@ public Void call() throws Exception { } else { if (destFs.rename(srcf, destf)) { if (inheritPerms) { - HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true); + HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true, skipPermission); } return true; } @@ -2994,8 +3012,13 @@ static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, Fil * move will be returned. 
* @throws HiveException */ + static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs, + boolean isSrcLocal, boolean isAcid, List newFiles) throws HiveException { + copyFiles(conf, srcf, destf, fs, isSrcLocal, isAcid, newFiles, false); + } + static protected void copyFiles(HiveConf conf, Path srcf, Path destf, - FileSystem fs, boolean isSrcLocal, boolean isAcid, List newFiles) throws HiveException { + FileSystem fs, boolean isSrcLocal, boolean isAcid, List newFiles, boolean skipPermission) throws HiveException { boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); try { @@ -3029,7 +3052,7 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, if (isAcid) { moveAcidFiles(srcFs, srcs, destf, newFiles); } else { - copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, newFiles); + copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, newFiles, skipPermission); } } @@ -3129,7 +3152,12 @@ private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, * If the source directory is LOCAL */ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf, - boolean isSrcLocal) throws HiveException { + boolean isSrcLocal) throws HiveException { + replaceFiles(tablePath, srcf, destf, oldPath, conf, isSrcLocal, false); + } + + protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf, + boolean isSrcLocal, boolean skipPermission) throws HiveException { try { FileSystem destFs = destf.getFileSystem(conf); @@ -3195,7 +3223,7 @@ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, // 2. srcs must be a list of files -- ensured by LoadSemanticAnalyzer // in both cases, we move the file under destf if (srcs.length == 1 && srcs[0].isDirectory()) { - if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal)) { + if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal, skipPermission)) { throw new IOException("Error moving: " + srcf + " into: " + destf); } } else { // its either a file or glob diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 7595065..c86636f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1310,7 +1310,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, // 2. 
Constructing a conditional task consisting of a move task and a map reduce task // MoveWork dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false); + new LoadFileDesc(fsInputDesc.isSkipPermission(), fsInputDesc.getFinalDirName(), finalName, true, null, null), false); MapWork cplan; Serializable work; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index cd3c860..699e7dd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -55,6 +55,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.common.StatsSetupConst.StatDB; @@ -104,8 +105,10 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.AcidUtils.Operation; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.HiveOutputFormatWithPermission; import org.apache.hadoop.hive.ql.io.NullRowsInputFormat; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; @@ -7021,15 +7024,36 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) + dest_path + " row schema: " + inputRR.toString()); } - FileSinkOperator fso = (FileSinkOperator) output; - fso.getConf().setTable(dest_tab); - fsopToTable.put(fso, dest_tab); + fileSinkDesc.setTable(dest_tab); + fsopToTable.put((FileSinkOperator) output, dest_tab); + + if (dest_tab != null && fileSinkDesc.getWriteType() == AcidUtils.Operation.NOT_ACID) { + HiveOutputFormat hiveOutputFormat; + try { + hiveOutputFormat = HiveFileFormatUtils.getHiveOutputFormat(conf, + fileSinkDesc.getTableInfo()); + } catch (HiveException e) { + throw new SemanticException(e); + } + // HiveHFileOutputFormat does not support the skip-permission path. + // Because the sequence file writers create files inside the org.apache.hadoop.io package, + // HiveNullValueSequenceFileOutputFormat and HiveSequenceFileOutputFormat do not support it either. + // MapredParquetOutputFormat does not support it. + // MockOutputFormat and TFSOOutputFormat are ignored, as they are only used for testing.
+ if (hiveOutputFormat instanceof HiveOutputFormatWithPermission + && (fileSinkDesc.getDynPartCtx() != null && fileSinkDesc.getDynPartCtx().getNumDPCols() > 0) + && fileSinkDesc.getDirName() != null) { + ltd.setSkipPermission(true); + fileSinkDesc.setSkipPermission(true); + } + } + // the following code is used to collect column stats when // hive.stats.autogather=true // and it is an insert overwrite or insert into table if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER) && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER) - && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) { + && ColumnStatsAutoGatherContext.canRunAutogatherStats(output)) { if (dest_type.intValue() == QBMetaData.DEST_TABLE) { genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo() .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 07ed4fd..9cd2688 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -96,6 +96,7 @@ private transient Table table; private Path destPath; private boolean isHiveServerQuery; + private boolean skipPermission; public FileSinkDesc() { } @@ -474,4 +475,12 @@ public void setStatsTmpDir(String statsCollectionTempDir) { this.statsTmpDir = statsCollectionTempDir; } + public boolean isSkipPermission() { + return skipPermission; + } + + public void setSkipPermission(boolean skipPermission) { + this.skipPermission = skipPermission; + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java index df153a2..3858f40 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java @@ -35,6 +35,7 @@ private String columns; private String columnTypes; private String destinationCreateTable; + private boolean skipPermission; public LoadFileDesc() { } @@ -61,6 +62,16 @@ public LoadFileDesc(final Path sourcePath, final Path targetDir, this.columnTypes = columnTypes; } + public LoadFileDesc(final boolean skipPermission, final Path sourcePath, final Path targetDir, + final boolean isDfsDir, final String columns, final String columnTypes) { + super(sourcePath); + this.targetDir = targetDir; + this.isDfsDir = isDfsDir; + this.columns = columns; + this.columnTypes = columnTypes; + this.skipPermission = skipPermission; + } + @Explain(displayName = "destination") public Path getTargetDir() { return targetDir; @@ -115,4 +126,12 @@ public void setColumnTypes(String columnTypes) { public String getDestinationCreateTable(){ return destinationCreateTable; } + + public boolean isSkipPermission() { + return skipPermission; + } + + public void setSkipPermission(boolean skipPermission) { + this.skipPermission = skipPermission; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index 771a919..4972ab1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -45,6 +45,7 @@ // TODO: the below seems like they should just be combined into partitionDesc private org.apache.hadoop.hive.ql.plan.TableDesc table; private Map partitionSpec; // NOTE: this partitionSpec has to be ordered map + private 
boolean skipPermission; public LoadTableDesc(final Path sourcePath, final org.apache.hadoop.hive.ql.plan.TableDesc table, @@ -172,4 +173,12 @@ public void setLbCtx(ListBucketingCtx lbCtx) { public AcidUtils.Operation getWriteType() { return writeType; } + + public boolean isSkipPermission() { + return skipPermission; + } + + public void setSkipPermission(boolean skipPermission) { + this.skipPermission = skipPermission; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java index 9f498c7..b575566 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java @@ -38,6 +38,7 @@ private LoadTableDesc loadTableWork; private LoadFileDesc loadFileWork; private LoadMultiFilesDesc loadMultiFilesWork; + private boolean skipPermission; private boolean checkFileFormat; private boolean srcLocal; @@ -83,6 +84,12 @@ public MoveWork(HashSet inputs, HashSet outputs, this.checkFileFormat = checkFileFormat; } + public MoveWork(boolean skipPermission, HashSet inputs, HashSet outputs, + final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, boolean checkFileFormat) { + this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat); + this.skipPermission = skipPermission; + } + @Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public LoadTableDesc getLoadTableWork() { return loadTableWork; @@ -142,4 +149,12 @@ public void setSrcLocal(boolean srcLocal) { this.srcLocal = srcLocal; } + public boolean isSkipPermission() { + return skipPermission; + } + + public void setSkipPermission(boolean skipPermission) { + this.skipPermission = skipPermission; + } + } diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java b/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java index 70a6857..1306400 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java @@ -56,13 +56,26 @@ public static Path getFileIdPath( ? 
new Path(HDFS_ID_PATH_PREFIX + fileId) : path; } + // original function public static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus sourceStatus, FileSystem fs, Path target, boolean recursion) throws IOException { - setFullFileStatus(conf, sourceStatus, null, fs, target, recursion); + setFullFileStatus(conf, sourceStatus, null, fs, target, recursion, false); } + // add target group public static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus sourceStatus, - String targetGroup, FileSystem fs, Path target, boolean recursion) throws IOException { + String targetGroup, FileSystem fs, Path target, boolean recursion) throws IOException { + setFullFileStatus(conf, sourceStatus, targetGroup, fs, target, recursion, false); + } + + // add skipPermission + public static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus sourceStatus, + FileSystem fs, Path target, boolean recursion, boolean skipPermission) throws IOException { + setFullFileStatus(conf, sourceStatus, null, fs, target, recursion, skipPermission); + } + + public static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus sourceStatus, + String targetGroup, FileSystem fs, Path target, boolean recursion, boolean skipPermission) throws IOException { FileStatus fStatus= sourceStatus.getFileStatus(); String group = fStatus.getGroup(); boolean aclEnabled = Objects.equal(conf.get("dfs.namenode.acls.enabled"), "true"); @@ -108,8 +121,12 @@ public static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileSta } } } else { - String permission = Integer.toString(sourcePerm.toShort(), 8); - run(fsShell, new String[]{"-chmod", "-R", permission, target.toString()}); + if (!skipPermission) { + String permission = Integer.toString(sourcePerm.toShort(), 8); + run(fsShell, new String[] { "-chmod", "-R", permission, target.toString() }); + } else { + LOG.info("Skip setting permission for " + target.toString()); + } } } catch (Exception e) { throw new IOException("Unable to set permissions of " + target, e);
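Usage sketch (not part of the patch; all paths and the table layout are hypothetical): with the WriterOptions.permission(...) hook added to OrcFile above, a writer can create its data file with the permission of the destination partition directory up front, instead of relying on a recursive chmod afterwards. A minimal standalone example, assuming the destination directory already exists:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    public class OrcPermissionExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path partDir = new Path("/warehouse/tbl/p1=5");   // hypothetical partition directory
        Path outFile = new Path(partDir, "000000_0");      // hypothetical output file
        FileSystem fs = partDir.getFileSystem(conf);

        // Read the permission of the destination directory and hand it to the writer,
        // so the new file is created with that permission from the start.
        FsPermission partPerm = fs.getFileStatus(partDir).getPermission();

        Writer writer = OrcFile.createWriter(outFile,
            OrcFile.writerOptions(conf)
                .setSchema(TypeDescription.fromString("struct<x:int>"))
                .permission(partPerm));   // option introduced by this patch
        writer.close();
      }
    }

The same permission can be threaded through the MapReduce-side writers via HiveOutputFormatWithPermission.getHiveRecordWriter(...) or HiveFileFormatUtils.getRecordWriter(..., permission, reporter), which is what FileSinkOperator does above when skipPermission is enabled.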