diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2e51518..2a570f4 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1587,6 +1587,10 @@
         "inheriting the permission of the warehouse or database directory."),
     HIVE_INSERT_INTO_EXTERNAL_TABLES("hive.insert.into.external.tables", true,
         "whether insert into external tables is allowed"),
+    HIVE_TEMPORARY_TABLE_STORAGE(
+        "hive.exec.temporary.table.storage", "default", new StringSet("memory",
+        "ssd", "default"), "Define the storage policy for temporary tables. " +
+        "Choices between memory, ssd and default"),
     HIVE_DRIVER_RUN_HOOKS("hive.exec.driver.run.hooks", "",
         "A comma separated list of hooks which implement HiveDriverRunHook. Will be run at the beginning " +
diff --git pom.xml pom.xml
index 0e30078..7945ade 100644
--- pom.xml
+++ pom.xml
@@ -118,7 +118,7 @@
     11.0.2
     2.1.6
     1.2.1
-    2.5.0
+    2.6.0
     ${basedir}/${hive.path.to.root}/testutils/hadoop
     0.98.3-hadoop1
     0.98.3-hadoop2
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 4f3d504..9eb95a1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -68,11 +68,16 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
+
 /**
  * File Sink operator implementation.
 **/
@@ -88,6 +93,7 @@
   protected transient List dpColNames;
   protected transient DynamicPartitionCtx dpCtx;
   protected transient boolean isCompressed;
+  protected transient boolean isTemporary;
   protected transient Path parent;
   protected transient HiveOutputFormat hiveOutputFormat;
   protected transient Path specPath;
@@ -318,6 +324,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     this.hconf = hconf;
     filesCreated = false;
     isNativeTable = !conf.getTableInfo().isNonNative();
+    isTemporary = conf.isTemporary();
     multiFileSpray = conf.isMultiFileSpray();
     totalFiles = conf.getTotalFiles();
     numFiles = conf.getNumFiles();
@@ -384,6 +391,20 @@ protected void initializeOp(Configuration hconf) throws HiveException {
         valToPaths.put("", fsp); // special entry for non-DP case
       }
     }
+
+    final StoragePolicyValue tmpStorage = StoragePolicyValue.lookup(HiveConf
+        .getVar(hconf, HIVE_TEMPORARY_TABLE_STORAGE));
+    if (isTemporary && fsp != null
+        && tmpStorage != StoragePolicyValue.DEFAULT) {
+      final Path outputPath = fsp.taskOutputTempPath;
+      StoragePolicyShim shim = ShimLoader.getHadoopShims()
+          .getStoragePolicyShim(fs);
+      if (shim != null) {
+        // directory creation is otherwise within the writers
+        fs.mkdirs(outputPath);
+        shim.setStoragePolicy(outputPath, tmpStorage);
+      }
+    }
 
     if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
         conf.getWriteType() == AcidUtils.Operation.DELETE) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index f18f8db..2879ec2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -5879,6 +5879,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input)
 
     Table dest_tab = null; // destination table if any
     boolean destTableIsAcid = false; // should the destination table be written to using ACID
+    boolean destTableIsTemporary = false;
     Partition dest_part = null;// destination partition if any
     Path queryTmpdir = null; // the intermediate destination directory
     Path dest_path = null; // the final destination directory
@@ -5896,6 +5897,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input)
 
       dest_tab = qbm.getDestTableForAlias(dest);
       destTableIsAcid = isAcidTable(dest_tab);
+      destTableIsTemporary = dest_tab.isTemporary();
 
       // Is the user trying to insert into a external tables
       if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
@@ -6165,6 +6167,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input)
       CreateTableDesc tblDesc = qb.getTableDesc();
       if (tblDesc != null) {
         field_schemas = new ArrayList();
+        destTableIsTemporary = tblDesc.isTemporary();
       }
 
       boolean first = true;
@@ -6309,6 +6312,8 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input)
       fileSinkDesc.setWriteType(wt);
       acidFileSinks.add(fileSinkDesc);
     }
+
+    fileSinkDesc.setTemporary(destTableIsTemporary);
 
     /* Set List Bucketing context. */
     if (lbCtx != null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 8b25c2b..a2b9dfe 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -47,6 +47,7 @@
   private String compressCodec;
   private String compressType;
   private boolean multiFileSpray;
+  private boolean temporary;
   // Whether the files output by this FileSink can be merged, e.g. if they are to be put into a
   // bucketed or sorted table/partition they cannot be merged.
   private boolean canBeMerged;
@@ -217,6 +218,21 @@ public boolean isMultiFileSpray() {
   public void setMultiFileSpray(boolean multiFileSpray) {
     this.multiFileSpray = multiFileSpray;
   }
+
+  /**
+   * @return destination is temporary
+   */
+  public boolean isTemporary() {
+    return temporary;
+  }
+
+  /**
+   * @param temporary whether the destination is temporary
+   */
+  public void setTemporary(boolean temporary) {
+    this.temporary = temporary;
+  }
+
   public boolean canBeMerged() {
     return canBeMerged;
diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index f881a7a..e917d53 100644
--- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -601,4 +601,9 @@ public String getShortName() throws IOException {
       return kerberosName.getShortName();
     }
   }
+
+  @Override
+  public StoragePolicyShim getStoragePolicyShim(FileSystem fs) {
+    return null;
+  }
 }
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index f65d394..4cfd0dd 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -27,8 +27,10 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
@@ -53,7 +55,10 @@
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
@@ -91,15 +96,28 @@
   HadoopShims.MiniDFSShim cluster = null;
   final boolean zeroCopy;
+  final boolean storagePolicy;
 
   public Hadoop23Shims() {
     boolean zcr = false;
+    boolean storage = false;
     try {
       Class.forName("org.apache.hadoop.fs.CacheFlag", false,
           ShimLoader.class.getClassLoader());
       zcr = true;
     } catch (ClassNotFoundException ce) {
     }
+
+    if (zcr) {
+      // in-memory HDFS is only available after zcr
+      try {
+        Class.forName("org.apache.hadoop.hdfs.protocol.BlockStoragePolicy",
+            false, ShimLoader.class.getClassLoader());
+        storage = true;
+      } catch (ClassNotFoundException ce) {
+      }
+    }
+    this.storagePolicy = storage;
     this.zeroCopy = zcr;
   }
@@ -930,4 +948,47 @@ public String getShortName() throws IOException {
       return kerberosName.getShortName();
     }
   }
+
+
+  public static class StoragePolicyShim implements HadoopShims.StoragePolicyShim {
+
+    private final DistributedFileSystem dfs;
+
+    public StoragePolicyShim(DistributedFileSystem fs) {
+      this.dfs = fs;
+    }
+
+    @Override
+    public void setStoragePolicy(Path path, StoragePolicyValue policy)
+        throws IOException {
+      switch (policy) {
+      case MEMORY: {
+        dfs.setStoragePolicy(path, HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
+        break;
+      }
+      case SSD: {
+        dfs.setStoragePolicy(path, HdfsConstants.ALLSSD_STORAGE_POLICY_NAME);
+        break;
+      }
+      case DEFAULT: {
+        /* do nothing */
+        break;
+      }
+      default:
+        throw new IllegalArgumentException("Unknown storage policy " + policy);
+      }
+    }
+  }
+
+  @Override
+  public HadoopShims.StoragePolicyShim getStoragePolicyShim(FileSystem fs) {
+    if (!storagePolicy) {
+      return null;
+    }
+    try {
+      return new StoragePolicyShim((DistributedFileSystem) fs);
+    } catch (ClassCastException ce) {
+      return null;
+    }
+  }
 }
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 988692a..3e370ac 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -28,6 +28,7 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +41,7 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.InputSplit;
@@ -391,6 +393,33 @@ public void setFullFileStatus(Configuration conf, HdfsFileStatus sourceStatus,
   public FileSystem createProxyFileSystem(FileSystem fs, URI uri);
 
   public Map getHadoopConfNames();
+
+  /**
+   * Create a shim for DFS storage policy.
+   */
+
+  public enum StoragePolicyValue {
+    MEMORY, /* 1-replica memory */
+    SSD, /* 3-replica ssd */
+    DEFAULT /* system defaults (usually 3-replica disk) */;
+
+    public static StoragePolicyValue lookup(String name) {
+      if (name == null) {
+        return DEFAULT;
+      }
+      return StoragePolicyValue.valueOf(name.toUpperCase().trim());
+    }
+  };
+
+  public interface StoragePolicyShim {
+    void setStoragePolicy(Path path, StoragePolicyValue policy) throws IOException;
+  }
+
+  /**
+   * obtain a storage policy shim associated with the filesystem.
+   * Returns null when the filesystem has no storage policies.
+   */
+  public StoragePolicyShim getStoragePolicyShim(FileSystem fs);
 
   /**
    * a hadoop.io ByteBufferPool shim.
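
For reviewers, a minimal usage sketch (not part of the patch) of how the new config and shim API fit together, mirroring the wiring done in the FileSinkOperator hunk above. The StoragePolicyExample class name and the /tmp/hive-temp-table path are illustrative only; everything else uses the APIs this patch introduces or that Hive already ships (HiveConf, ShimLoader).

// Illustrative sketch only -- not part of the patch. Resolves the configured
// policy, looks up the shim, and applies it to a temp directory, the same way
// FileSinkOperator.initializeOp does. The scratch path below is hypothetical
// and error handling is elided.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
import org.apache.hadoop.hive.shims.ShimLoader;

public class StoragePolicyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HiveConf();
    // hive.exec.temporary.table.storage: "memory", "ssd" or "default"
    StoragePolicyValue policy = StoragePolicyValue.lookup(
        HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE));

    FileSystem fs = FileSystem.get(conf);
    Path scratch = new Path("/tmp/hive-temp-table");  // hypothetical scratch dir

    // The shim is null on pre-2.6 Hadoop or non-HDFS filesystems; in that case
    // the data simply stays on the default storage policy.
    StoragePolicyShim shim = ShimLoader.getHadoopShims().getStoragePolicyShim(fs);
    if (shim != null && policy != StoragePolicyValue.DEFAULT) {
      fs.mkdirs(scratch);
      shim.setStoragePolicy(scratch, policy);
    }
  }
}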