diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2e51518..2a570f4 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1587,6 +1587,10 @@
"inheriting the permission of the warehouse or database directory."),
HIVE_INSERT_INTO_EXTERNAL_TABLES("hive.insert.into.external.tables", true,
"whether insert into external tables is allowed"),
+ HIVE_TEMPORARY_TABLE_STORAGE(
+ "hive.exec.temporary.table.storage", "default", new StringSet("memory",
+ "ssd", "default"), "Define the storage policy for temporary tables. " +
+ "Choices are memory, ssd and default."),
HIVE_DRIVER_RUN_HOOKS("hive.exec.driver.run.hooks", "",
"A comma separated list of hooks which implement HiveDriverRunHook. Will be run at the beginning " +
diff --git pom.xml pom.xml
index 0e30078..7945ade 100644
--- pom.xml
+++ pom.xml
@@ -118,7 +118,7 @@
<guava.version>11.0.2</guava.version>
<groovy.version>2.1.6</groovy.version>
<hadoop-20S.version>1.2.1</hadoop-20S.version>
- <hadoop-23.version>2.5.0</hadoop-23.version>
+ <hadoop-23.version>2.6.0</hadoop-23.version>
<hadoop.bin.path>${basedir}/${hive.path.to.root}/testutils/hadoop</hadoop.bin.path>
<hbase.hadoop1.version>0.98.3-hadoop1</hbase.hadoop1.version>
<hbase.hadoop2.version>0.98.3-hadoop2</hbase.hadoop2.version>
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 4f3d504..9eb95a1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -68,11 +68,16 @@
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
+
/**
* File Sink operator implementation.
**/
@@ -88,6 +93,7 @@
protected transient List<String> dpColNames;
protected transient DynamicPartitionCtx dpCtx;
protected transient boolean isCompressed;
+ protected transient boolean isTemporary;
protected transient Path parent;
protected transient HiveOutputFormat<?, ?> hiveOutputFormat;
protected transient Path specPath;
@@ -318,6 +324,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
this.hconf = hconf;
filesCreated = false;
isNativeTable = !conf.getTableInfo().isNonNative();
+ isTemporary = conf.isTemporary();
multiFileSpray = conf.isMultiFileSpray();
totalFiles = conf.getTotalFiles();
numFiles = conf.getNumFiles();
@@ -384,6 +391,20 @@ protected void initializeOp(Configuration hconf) throws HiveException {
valToPaths.put("", fsp); // special entry for non-DP case
}
}
+
+ final StoragePolicyValue tmpStorage = StoragePolicyValue.lookup(HiveConf
+ .getVar(hconf, HIVE_TEMPORARY_TABLE_STORAGE));
+ if (isTemporary && fsp != null
+ && tmpStorage != StoragePolicyValue.DEFAULT) {
+ final Path outputPath = fsp.taskOutputTempPath;
+ StoragePolicyShim shim = ShimLoader.getHadoopShims()
+ .getStoragePolicyShim(fs);
+ if (shim != null) {
+ // create the directory up front (the writers would otherwise create it
+ // later) so the policy is in place before any files are written
+ fs.mkdirs(outputPath);
+ shim.setStoragePolicy(outputPath, tmpStorage);
+ }
+ }
if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
conf.getWriteType() == AcidUtils.Operation.DELETE) {
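Standalone sketch of the lookup-then-apply pattern used in initializeOp above
(fs, tmpPath and the class name are placeholders; assumes the shim API added by
this patch):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
import org.apache.hadoop.hive.shims.ShimLoader;

public class ApplyTempStoragePolicy {
  static void apply(FileSystem fs, Path tmpPath, String configured)
      throws IOException {
    StoragePolicyValue policy = StoragePolicyValue.lookup(configured);
    if (policy == StoragePolicyValue.DEFAULT) {
      return; // leave the filesystem's default policy in place
    }
    StoragePolicyShim shim = ShimLoader.getHadoopShims().getStoragePolicyShim(fs);
    if (shim != null) { // null means this filesystem has no storage policies
      fs.mkdirs(tmpPath); // create the directory first so the policy covers all files
      shim.setStoragePolicy(tmpPath, policy);
    }
  }
}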
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index f18f8db..2879ec2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -5879,6 +5879,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input)
Table dest_tab = null; // destination table if any
boolean destTableIsAcid = false; // should the destination table be written to using ACID
+ boolean destTableIsTemporary = false;
Partition dest_part = null;// destination partition if any
Path queryTmpdir = null; // the intermediate destination directory
Path dest_path = null; // the final destination directory
@@ -5896,6 +5897,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input)
dest_tab = qbm.getDestTableForAlias(dest);
destTableIsAcid = isAcidTable(dest_tab);
+ destTableIsTemporary = dest_tab.isTemporary();
// Is the user trying to insert into a external tables
if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
@@ -6165,6 +6167,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input)
CreateTableDesc tblDesc = qb.getTableDesc();
if (tblDesc != null) {
field_schemas = new ArrayList<FieldSchema>();
+ destTableIsTemporary = tblDesc.isTemporary();
}
boolean first = true;
@@ -6309,6 +6312,8 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input)
fileSinkDesc.setWriteType(wt);
acidFileSinks.add(fileSinkDesc);
}
+
+ fileSinkDesc.setTemporary(destTableIsTemporary);
/* Set List Bucketing context. */
if (lbCtx != null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 8b25c2b..a2b9dfe 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -47,6 +47,7 @@
private String compressCodec;
private String compressType;
private boolean multiFileSpray;
+ private boolean temporary;
// Whether the files output by this FileSink can be merged, e.g. if they are to be put into a
// bucketed or sorted table/partition they cannot be merged.
private boolean canBeMerged;
@@ -217,6 +218,21 @@ public boolean isMultiFileSpray() {
public void setMultiFileSpray(boolean multiFileSpray) {
this.multiFileSpray = multiFileSpray;
}
+
+ /**
+ * @return true if the destination is a temporary table
+ */
+ public boolean isTemporary() {
+ return temporary;
+ }
+
+ /**
+ * @param temporary whether the destination is a temporary table
+ */
+ public void setTemporary(boolean temporary) {
+ this.temporary = temporary;
+ }
+
public boolean canBeMerged() {
return canBeMerged;
diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index f881a7a..e917d53 100644
--- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -601,4 +601,9 @@ public String getShortName() throws IOException {
return kerberosName.getShortName();
}
}
+
+ @Override
+ public StoragePolicyShim getStoragePolicyShim(FileSystem fs) {
+ return null;
+ }
}
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index f65d394..4cfd0dd 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -27,8 +27,10 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
@@ -53,7 +55,10 @@
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobConf;
@@ -91,15 +96,28 @@
HadoopShims.MiniDFSShim cluster = null;
final boolean zeroCopy;
+ final boolean storagePolicy;
public Hadoop23Shims() {
boolean zcr = false;
+ boolean storage = false;
try {
Class.forName("org.apache.hadoop.fs.CacheFlag", false,
ShimLoader.class.getClassLoader());
zcr = true;
} catch (ClassNotFoundException ce) {
}
+
+ if (zcr) {
+ // in-memory HDFS (storage policies) shipped after zero-copy reads,
+ // so only probe for it when zcr is available
+ try {
+ Class.forName("org.apache.hadoop.hdfs.protocol.BlockStoragePolicy",
+ false, ShimLoader.class.getClassLoader());
+ storage = true;
+ } catch (ClassNotFoundException ce) {
+ }
+ }
+ this.storagePolicy = storage;
this.zeroCopy = zcr;
}
@@ -930,4 +948,47 @@ public String getShortName() throws IOException {
return kerberosName.getShortName();
}
}
+
+
+ public static class StoragePolicyShim implements HadoopShims.StoragePolicyShim {
+
+ private final DistributedFileSystem dfs;
+
+ public StoragePolicyShim(DistributedFileSystem fs) {
+ this.dfs = fs;
+ }
+
+ @Override
+ public void setStoragePolicy(Path path, StoragePolicyValue policy)
+ throws IOException {
+ switch (policy) {
+ case MEMORY: {
+ dfs.setStoragePolicy(path, HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
+ break;
+ }
+ case SSD: {
+ dfs.setStoragePolicy(path, HdfsConstants.ALLSSD_STORAGE_POLICY_NAME);
+ break;
+ }
+ case DEFAULT: {
+ /* do nothing */
+ break;
+ }
+ default:
+ throw new IllegalArgumentException("Unknown storage policy " + policy);
+ }
+ }
+ }
+
+ @Override
+ public HadoopShims.StoragePolicyShim getStoragePolicyShim(FileSystem fs) {
+ if (!storagePolicy) {
+ return null;
+ }
+ try {
+ return new StoragePolicyShim((DistributedFileSystem) fs);
+ } catch (ClassCastException ce) {
+ return null;
+ }
+ }
}
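For context: on Hadoop 2.6.0+ (hence the pom bump above) the shim delegates to
DistributedFileSystem.setStoragePolicy. A standalone probe along these lines; the
NameNode URI, path and class name are placeholders:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;

public class StoragePolicyProbe {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(URI.create("hdfs://nn:8020"), new Configuration());
    if (fs instanceof DistributedFileSystem) {
      // MEMORY_STORAGE_POLICY_NAME is "LAZY_PERSIST": a single replica in RAM,
      // persisted to disk lazily; ALLSSD_STORAGE_POLICY_NAME is "ALL_SSD"
      ((DistributedFileSystem) fs).setStoragePolicy(
          new Path("/tmp/hive-scratch"), HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
    }
  }
}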
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 988692a..3e370ac 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -28,6 +28,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
@@ -40,6 +41,7 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.InputSplit;
@@ -391,6 +393,33 @@ public void setFullFileStatus(Configuration conf, HdfsFileStatus sourceStatus,
public FileSystem createProxyFileSystem(FileSystem fs, URI uri);
public Map<String, String> getHadoopConfNames();
+
+ /**
+ * Storage policy choices exposed through the DFS storage-policy shim.
+ */
+ public enum StoragePolicyValue {
+ MEMORY, /* 1-replica memory */
+ SSD, /* 3-replica ssd */
+ DEFAULT /* system defaults (usually 3-replica disk) */;
+
+ public static StoragePolicyValue lookup(String name) {
+ if (name == null) {
+ return DEFAULT;
+ }
+ return StoragePolicyValue.valueOf(name.toUpperCase().trim());
+ }
+ };
+
+ public interface StoragePolicyShim {
+ void setStoragePolicy(Path path, StoragePolicyValue policy) throws IOException;
+ }
+
+ /**
+ * Obtain a storage policy shim associated with the filesystem.
+ * Returns null when the filesystem has no storage policies.
+ */
+ public StoragePolicyShim getStoragePolicyShim(FileSystem fs);
/**
* a hadoop.io ByteBufferPool shim.
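For reference, the lookup semantics defined above as a runnable sketch
(illustrative only; the class name is made up):

import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;

public class LookupSemantics {
  public static void main(String[] args) {
    System.out.println(StoragePolicyValue.lookup(null));     // DEFAULT (unset config)
    System.out.println(StoragePolicyValue.lookup(" ssd "));  // SSD (upper-cased, trimmed)
    System.out.println(StoragePolicyValue.lookup("Memory")); // MEMORY
    // lookup("tape") would throw IllegalArgumentException from Enum.valueOf
  }
}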