Index: shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
===================================================================
--- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java	(revision 1651252)
+++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java	(working copy)
@@ -606,4 +606,9 @@
       return kerberosName.getShortName();
     }
   }
+
+  @Override
+  public StoragePolicyShim getStoragePolicyShim(FileSystem fs) {
+    return null;
+  }
 }
Index: shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
===================================================================
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java	(revision 1651252)
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java	(working copy)
@@ -27,8 +27,10 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
@@ -53,7 +55,10 @@
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
@@ -92,15 +97,28 @@
   HadoopShims.MiniDFSShim cluster = null;
 
   final boolean zeroCopy;
+  final boolean storagePolicy;
 
   public Hadoop23Shims() {
     boolean zcr = false;
+    boolean storage = false;
     try {
       Class.forName("org.apache.hadoop.fs.CacheFlag", false,
           ShimLoader.class.getClassLoader());
       zcr = true;
     } catch (ClassNotFoundException ce) {
     }
+
+    if (zcr) {
+      // in-memory HDFS is only available after zcr
+      try {
+        Class.forName("org.apache.hadoop.hdfs.protocol.BlockStoragePolicy",
+            false, ShimLoader.class.getClassLoader());
+        storage = true;
+      } catch (ClassNotFoundException ce) {
+      }
+    }
+    this.storagePolicy = storage;
     this.zeroCopy = zcr;
   }
@@ -936,4 +954,47 @@
       return kerberosName.getShortName();
     }
   }
+
+
+  public static class StoragePolicyShim implements HadoopShims.StoragePolicyShim {
+
+    private final DistributedFileSystem dfs;
+
+    public StoragePolicyShim(DistributedFileSystem fs) {
+      this.dfs = fs;
+    }
+
+    @Override
+    public void setStoragePolicy(Path path, StoragePolicyValue policy)
+        throws IOException {
+      switch (policy) {
+      case MEMORY: {
+        dfs.setStoragePolicy(path, HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
+        break;
+      }
+      case SSD: {
+        dfs.setStoragePolicy(path, HdfsConstants.ALLSSD_STORAGE_POLICY_NAME);
+        break;
+      }
+      case DEFAULT: {
+        /* do nothing */
+        break;
+      }
+      default:
+        throw new IllegalArgumentException("Unknown storage policy " + policy);
+      }
+    }
+  }
+
+  @Override
+  public HadoopShims.StoragePolicyShim getStoragePolicyShim(FileSystem fs) {
+    if (!storagePolicy) {
+      return null;
+    }
+    try {
+      return new StoragePolicyShim((DistributedFileSystem) fs);
+    } catch (ClassCastException ce) {
+      return null;
+    }
+  }
 }
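Taken together, the two shim implementations above form the capability probe for this feature: Hadoop20SShims always returns null (no storage policies on 0.20S), while Hadoop23Shims returns a working shim only when the HDFS BlockStoragePolicy class is on the classpath and the FileSystem really is a DistributedFileSystem. A minimal caller-side sketch of that contract follows; it is illustrative only and not part of the patch, and the namenode URI and scratch path are hypothetical:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
    import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
    import org.apache.hadoop.hive.shims.ShimLoader;

    public class StoragePolicyProbe {
      public static void main(String[] args) throws Exception {
        // Hypothetical namenode; a LocalFileSystem here would yield a null shim.
        FileSystem fs = FileSystem.get(new URI("hdfs://namenode:8020"), new Configuration());
        StoragePolicyShim shim = ShimLoader.getHadoopShims().getStoragePolicyShim(fs);
        if (shim == null) {
          return; // pre-storage-policy Hadoop or non-HDFS: keep default placement
        }
        // Pin a scratch directory to the 1-replica in-memory policy.
        shim.setStoragePolicy(new Path("/tmp/hive/scratch"), StoragePolicyValue.MEMORY);
      }
    }

Because callers only ever see the HadoopShims.StoragePolicyShim interface, a null return is the sole signal they need to handle; no reflection or version checks leak out of the shim layer.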
Index: shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
===================================================================
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java	(revision 1651252)
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java	(working copy)
@@ -26,6 +26,7 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
@@ -388,8 +390,35 @@
   public FileSystem createProxyFileSystem(FileSystem fs, URI uri);
 
   public Map<String, String> getHadoopConfNames();
+
+  /**
+   * Storage policy values for the DFS storage policy shim.
+   */
+  public enum StoragePolicyValue {
+    MEMORY, /* 1-replica memory */
+    SSD, /* 3-replica ssd */
+    DEFAULT /* system defaults (usually 3-replica disk) */;
+
+    public static StoragePolicyValue lookup(String name) {
+      if (name == null) {
+        return DEFAULT;
+      }
+      return StoragePolicyValue.valueOf(name.toUpperCase().trim());
+    }
+  };
+
+  public interface StoragePolicyShim {
+    void setStoragePolicy(Path path, StoragePolicyValue policy) throws IOException;
+  }
+
+  /**
+   * Obtain a storage policy shim associated with the filesystem.
+   * Returns null when the filesystem has no storage policies.
+   */
+  public StoragePolicyShim getStoragePolicyShim(FileSystem fs);
+
   /**
    * a hadoop.io ByteBufferPool shim.
    */
   public interface ByteBufferPoolShim {
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1651252)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -1615,6 +1615,10 @@
         "inheriting the permission of the warehouse or database directory."),
     HIVE_INSERT_INTO_EXTERNAL_TABLES("hive.insert.into.external.tables", true,
         "whether insert into external tables is allowed"),
+    HIVE_TEMPORARY_TABLE_STORAGE(
+        "hive.exec.temporary.table.storage", "default", new StringSet("memory",
+        "ssd", "default"), "Define the storage policy for temporary tables. " +
+        "Choices are memory, ssd and default."),
     HIVE_DRIVER_RUN_HOOKS("hive.exec.driver.run.hooks", "",
         "A comma separated list of hooks which implement HiveDriverRunHook. Will be run at the beginning " +
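Since hive.exec.temporary.table.storage is validated by a StringSet and then funneled through StoragePolicyValue.lookup() above, the mapping from config text to enum is worth pinning down. A few worked cases (illustrative; they follow directly from the lookup() code in this patch):

    import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;

    public class LookupCases {
      public static void main(String[] args) {
        System.out.println(StoragePolicyValue.lookup(null));      // DEFAULT: unset config
        System.out.println(StoragePolicyValue.lookup("memory"));  // MEMORY: case-insensitive
        System.out.println(StoragePolicyValue.lookup(" SSD "));   // SSD: whitespace trimmed
        // Values outside the StringSet should never reach lookup() via HiveConf;
        // a raw call like lookup("tape") throws IllegalArgumentException from Enum.valueOf.
      }
    }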
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java	(revision 1651252)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java	(working copy)
@@ -47,6 +47,7 @@
   private String compressCodec;
   private String compressType;
   private boolean multiFileSpray;
+  private boolean temporary;
   // Whether the files output by this FileSink can be merged, e.g. if they are to be put into a
   // bucketed or sorted table/partition they cannot be merged.
   private boolean canBeMerged;
@@ -217,7 +218,22 @@
   public void setMultiFileSpray(boolean multiFileSpray) {
     this.multiFileSpray = multiFileSpray;
   }
+
+  /**
+   * @return whether the destination is temporary
+   */
+  public boolean isTemporary() {
+    return temporary;
+  }
+
+  /**
+   * @param temporary whether the destination is temporary
+   */
+  public void setTemporary(boolean temporary) {
+    this.temporary = temporary;
+  }
+
   public boolean canBeMerged() {
     return canBeMerged;
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(revision 1651252)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(working copy)
@@ -5857,6 +5857,7 @@
     Table dest_tab = null; // destination table if any
     boolean destTableIsAcid = false; // should the destination table be written to using ACID
+    boolean destTableIsTemporary = false;
     Partition dest_part = null; // destination partition if any
     Path queryTmpdir = null; // the intermediate destination directory
     Path dest_path = null; // the final destination directory
@@ -5874,6 +5875,7 @@
       dest_tab = qbm.getDestTableForAlias(dest);
       destTableIsAcid = isAcidTable(dest_tab);
+      destTableIsTemporary = dest_tab.isTemporary();
 
       // Is the user trying to insert into a external tables
       if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
@@ -6143,6 +6145,7 @@
       CreateTableDesc tblDesc = qb.getTableDesc();
       if (tblDesc != null) {
         field_schemas = new ArrayList<FieldSchema>();
+        destTableIsTemporary = tblDesc.isTemporary();
       }
 
       boolean first = true;
@@ -6287,6 +6290,8 @@
         fileSinkDesc.setWriteType(wt);
         acidFileSinks.add(fileSinkDesc);
       }
+
+      fileSinkDesc.setTemporary(destTableIsTemporary);
 
       /* Set List Bucketing context. */
       if (lbCtx != null) {
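Note that SemanticAnalyzer sets destTableIsTemporary on both write paths: INSERT INTO an existing temporary table (dest_tab.isTemporary()) and CREATE TEMPORARY TABLE AS SELECT (tblDesc.isTemporary()), so every FileSinkDesc targeting a temporary destination carries the flag into the operator tree. A toy round-trip of the new descriptor field (illustrative only; it assumes FileSinkDesc's no-arg constructor used by the plan serializer):

    import org.apache.hadoop.hive.ql.plan.FileSinkDesc;

    public class TemporaryFlagRoundTrip {
      public static void main(String[] args) {
        FileSinkDesc desc = new FileSinkDesc();
        desc.setTemporary(true);                 // stamped at compile time by SemanticAnalyzer
        System.out.println(desc.isTemporary());  // read back at run time by FileSinkOperator
      }
    }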
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java	(revision 1651252)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java	(working copy)
@@ -68,11 +68,16 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
+
 /**
  * File Sink operator implementation.
 **/
@@ -88,6 +93,7 @@
   protected transient List<String> dpColNames;
   protected transient DynamicPartitionCtx dpCtx;
   protected transient boolean isCompressed;
+  protected transient boolean isTemporary;
   protected transient Path parent;
   protected transient HiveOutputFormat<?, ?> hiveOutputFormat;
   protected transient Path specPath;
@@ -318,6 +324,7 @@
     this.hconf = hconf;
     filesCreated = false;
     isNativeTable = !conf.getTableInfo().isNonNative();
+    isTemporary = conf.isTemporary();
     multiFileSpray = conf.isMultiFileSpray();
     totalFiles = conf.getTotalFiles();
     numFiles = conf.getNumFiles();
@@ -384,6 +391,20 @@
         valToPaths.put("", fsp); // special entry for non-DP case
       }
     }
+
+    final StoragePolicyValue tmpStorage = StoragePolicyValue.lookup(HiveConf
+        .getVar(hconf, HIVE_TEMPORARY_TABLE_STORAGE));
+    if (isTemporary && fsp != null
+        && tmpStorage != StoragePolicyValue.DEFAULT) {
+      final Path outputPath = fsp.taskOutputTempPath;
+      StoragePolicyShim shim = ShimLoader.getHadoopShims()
+          .getStoragePolicyShim(fs);
+      if (shim != null) {
+        // directory creation is otherwise within the writers
+        fs.mkdirs(outputPath);
+        shim.setStoragePolicy(outputPath, tmpStorage);
+      }
+    }
 
     if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
         conf.getWriteType() == AcidUtils.Operation.DELETE) {
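End to end, the feature is driven entirely by the new config knob: with hive.exec.temporary.table.storage=memory, FileSinkOperator applies the MEMORY policy to each task's taskOutputTempPath before any record writer creates files under it, and silently falls back to default placement when the shim returns null. A usage sketch over JDBC; the HiveServer2 URL and table names are hypothetical:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class TempTableInMemory {
      public static void main(String[] args) throws Exception {
        Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
        try (Statement stmt = conn.createStatement()) {
          stmt.execute("SET hive.exec.temporary.table.storage=memory");
          // The CTAS path marks the FileSinkDesc temporary, so its task scratch
          // directories receive the in-memory storage policy before writers open files.
          stmt.execute("CREATE TEMPORARY TABLE tmp_clicks AS SELECT * FROM clicks LIMIT 1000");
        } finally {
          conn.close();
        }
      }
    }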