Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1157918)
+++ conf/hive-default.xml (working copy)
@@ -1145,6 +1145,12 @@
+  <name>hive.exec.inter.mapred.compression.codec</name>
+  <value>com.hadoop.compression.lzo.LzoCodec</value>
+  <description>Compression codec to be used for intermediate output passed between map reduce jobs</description>
+</property>
+
+<property>
  <name>hive.exec.perf.logger</name>
  <value>org.apache.hadoop.hive.ql.log.PerfLogger</value>
  <description>The class responsible for logging client side performance metrics. Must be a subclass of org.apache.hadoop.hive.ql.log.PerfLogger</description>
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1157918)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -163,6 +163,7 @@
HADOOPNUMREDUCERS("mapred.reduce.tasks", 1),
HADOOPJOBNAME("mapred.job.name", null),
HADOOPSPECULATIVEEXECREDUCERS("mapred.reduce.tasks.speculative.execution", false),
+ HADOOP_OUTPUT_COMPRESSION_CODEC("mapred.output.compression.codec", null),
// Metastore stuff. Be sure to update HiveConf.metaVars when you add
// something here!
@@ -461,6 +462,9 @@
HIVE_REWORK_MAPREDWORK("hive.rework.mapredwork", false),
HIVE_CONCATENATE_CHECK_INDEX ("hive.exec.concatenate.check.index", true),
+ // compression to be used between map reduce jobs
+ HIVE_INTER_MAPRED_COMPRESSION_CODEC("hive.exec.inter.mapred.compression.codec", ""),
+
// The class responsible for logging client side performance metrics
// Must be a subclass of org.apache.hadoop.hive.ql.log.PerfLogger
HIVE_PERF_LOGGER("hive.exec.perf.logger", "org.apache.hadoop.hive.ql.log.PerfLogger"),
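Note: the two new ConfVars are intended to work as a pair: whenever a plan's work is flagged as intermediate, the value of hive.exec.inter.mapred.compression.codec is copied into mapred.output.compression.codec for that job (see the ExecDriver and MapredLocalTask hunks below). The following sketch condenses that pattern; the class and method names are illustrative only, and the empty-string guard is an extra precaution here since the new variable defaults to an empty string.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical helper condensing how the two new ConfVars cooperate
// (class and method names are illustrative, not part of this patch).
public class InterMapredCompressionSketch {
  public static void applyIntermediateCodec(HiveConf conf, JobConf job, boolean isIntermediate) {
    // Only jobs whose output feeds another map reduce job are redirected to the
    // intermediate codec; final job output keeps its existing codec setting.
    String codec = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC);
    if (isIntermediate && codec != null && !codec.isEmpty()) {
      HiveConf.setVar(job, HiveConf.ConfVars.HADOOP_OUTPUT_COMPRESSION_CODEC, codec);
    }
  }
}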
Index: ql/src/test/results/clientpositive/intermediate_compression.q.out
===================================================================
--- ql/src/test/results/clientpositive/intermediate_compression.q.out (revision 0)
+++ ql/src/test/results/clientpositive/intermediate_compression.q.out (revision 0)
@@ -0,0 +1,45 @@
+PREHOOK: query: drop table if exists src_inter1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists src_inter1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table src_inter1 like src
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table src_inter1 like src
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@src_inter1
+PREHOOK: query: insert overwrite table src_inter1 select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@src_inter1
+PREHOOK: query: insert overwrite table src_inter1 select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@src_inter1
+PREHOOK: query: insert overwrite local directory '/tmp/hive_test/intercomp_local/jg' select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: /tmp/hive_test/intercomp_local/jg
+PREHOOK: query: insert overwrite local directory '/tmp/hive_test/intercomp_local/jj' select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: /tmp/hive_test/intercomp_local/jj
+PREHOOK: query: insert overwrite table src_inter1 select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@src_inter1
+PREHOOK: query: insert overwrite table src_inter1 select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@src_inter1
+PREHOOK: query: insert overwrite local directory '/tmp/hive_test/intercomp_local/jg' select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: /tmp/hive_test/intercomp_local/jg
+PREHOOK: query: insert overwrite local directory '/tmp/hive_test/intercomp_local/jj' select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: /tmp/hive_test/intercomp_local/jj
+PREHOOK: query: drop table if exists src_inter1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@src_inter1
+PREHOOK: Output: default@src_inter1
Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsIntermediateHook.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsIntermediateHook.java (revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsIntermediateHook.java (revision 0)
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Serializable;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+
+public class VerifyIsIntermediateHook implements ExecuteWithHookContext {
+
+ private static final String errorMessage = "VerifyIsIntermediateHook failed because the value of isIntermediate" +
+ " in the work for a map reduce task did not match what was expected.";
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ if (hookContext.getHookType().equals(HookType.POST_EXEC_HOOK)) {
+ List<TaskRunner> taskRunners = hookContext.getCompleteTaskList();
+ for (TaskRunner taskRunner : taskRunners) {
+ Task<? extends Serializable> task = taskRunner.getTask();
+ if (task instanceof MapRedTask || task instanceof MapredLocalTask) {
+ boolean isIntermediate = true;
+ for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+ if (childTask instanceof ConditionalTask) {
+ for (Task<? extends Serializable> containedTask : ((ConditionalTask)childTask).getListTasks()) {
+ if (containedTask instanceof MoveTask) {
+ isIntermediate = false;
+ }
+ }
+ } else if (childTask instanceof MoveTask) {
+ isIntermediate = false;
+ }
+ }
+ if (task instanceof MapRedTask) {
+ Assert.assertEquals(errorMessage, isIntermediate, ((MapRedTask)task).getWork().getIsIntermediate());
+ }
+ if (task instanceof MapredLocalTask) {
+ Assert.assertEquals(errorMessage, isIntermediate, ((MapredLocalTask)task).getWork().getIsIntermediate());
+ }
+ }
+ }
+ }
+ }
+}
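The hook is wired in through the standard post-execution hook list, which is how the new intermediate_compression.q test activates it. A minimal sketch of doing the same programmatically, assuming the patch (and hence the hook class) is on the classpath:

import org.apache.hadoop.hive.conf.HiveConf;

// Minimal sketch of enabling the hook outside the .q test framework
// (the query test below does the equivalent with "set hive.exec.post.hooks=...").
public class EnableVerifyHookSketch {
  public static HiveConf confWithHook() {
    HiveConf conf = new HiveConf();
    conf.set("hive.exec.post.hooks",
        "org.apache.hadoop.hive.ql.hooks.VerifyIsIntermediateHook");
    return conf;
  }
}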
Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (revision 1157918)
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (working copy)
@@ -20,12 +20,13 @@
import java.io.File;
import java.io.FileInputStream;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.LinkedList;
+import java.util.zip.GZIPInputStream;
import junit.framework.TestCase;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -48,7 +49,6 @@
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ScriptDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.TextInputFormat;
@@ -68,10 +68,15 @@
private static Hive db;
private static FileSystem fs;
+ public static final String defaultCodec = "org.apache.hadoop.io.compress.DefaultCodec";
+
static {
try {
conf = new HiveConf(ExecDriver.class);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC,
+ defaultCodec);
+
fs = FileSystem.get(conf);
if (fs.exists(tmppath) && !fs.getFileStatus(tmppath).isDir()) {
throw new RuntimeException(tmpdir + " exists but is not a directory");
@@ -86,7 +91,8 @@
for (Object one : Utilities.makeList("mapplan1.out", "mapplan2.out",
"mapredplan1.out", "mapredplan2.out", "mapredplan3.out",
- "mapredplan4.out", "mapredplan5.out", "mapredplan6.out")) {
+ "mapredplan4.out", "mapredplan5.out", "mapredplan6.out",
+ "mapplan3.out", "mapplan4.out")) {
Path onedir = new Path(tmppath, (String) one);
if (fs.exists(onedir)) {
fs.delete(onedir, true);
@@ -135,6 +141,10 @@
}
private static void fileDiff(String datafile, String testdir) throws Exception {
+ fileDiff(datafile, testdir, false);
+ }
+
+ private static void fileDiff(String datafile, String testdir, boolean gzipped) throws Exception {
String testFileDir = conf.get("test.data.files");
System.out.println(testFileDir);
FileInputStream fi_gold = new FileInputStream(new File(testFileDir,
@@ -149,7 +159,12 @@
throw new RuntimeException(tmpdir + testdir + " is not a directory");
}
- FSDataInputStream fi_test = fs.open((fs.listStatus(di_test))[0].getPath());
+ InputStream fi_test;
+ if (!gzipped) {
+ fi_test = fs.open((fs.listStatus(di_test))[0].getPath());
+ } else {
+ fi_test = new GZIPInputStream(fs.open((fs.listStatus(di_test))[0].getPath()));
+ }
if (!Utilities.contentsEqual(fi_gold, fi_test, false)) {
System.out.println(di_test.toString() + " does not match " + datafile);
@@ -432,6 +447,32 @@
mr.setReducer(op5);
}
+ @SuppressWarnings("unchecked")
+ private void populateMapPlan3(Table src) {
+ mr.setNumReduceTasks(Integer.valueOf(0));
+ mr.setIsIntermediate(false);
+
+ Operator<FileSinkDesc> op2 = OperatorFactory.get(new FileSinkDesc(tmpdir
+ + "mapplan3.out", Utilities.defaultTd, true));
+ Operator<FilterDesc> op1 = OperatorFactory.get(getTestFilterDesc("key"),
+ op2);
+
+ Utilities.addMapWork(mr, src, "a", op1);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void populateMapPlan4(Table src) {
+ mr.setNumReduceTasks(Integer.valueOf(0));
+ mr.setIsIntermediate(true);
+
+ Operator<FileSinkDesc> op2 = OperatorFactory.get(new FileSinkDesc(tmpdir
+ + "mapplan4.out", Utilities.defaultTd, true));
+ Operator<FilterDesc> op1 = OperatorFactory.get(getTestFilterDesc("key"),
+ op2);
+
+ Utilities.addMapWork(mr, src, "a", op1);
+ }
+
private void executePlan() throws Exception {
String testName = new Exception().getStackTrace()[1].getMethodName();
MapRedTask mrtask = new MapRedTask();
@@ -565,4 +606,42 @@
fail("Got Throwable");
}
}
+
+ // Checks that non-intermediate map reduce jobs are not compressed using HIVE_INTER_MAPRED_COMPRESSION_CODEC
+ public void testMapPlan3() throws Exception {
+
+ System.out.println("Beginning testMapPlan7");
+
+ try {
+ conf.setVar(HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
+ populateMapPlan3(db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME,
+ "src"));
+ executePlan();
+ fileDiff("lt100.txt.deflate", "mapplan3.out");
+ } catch (Throwable e) {
+ e.printStackTrace();
+ fail("Got Throwable");
+ } finally {
+ conf.setVar(HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC, defaultCodec);
+ }
+ }
+
+ // Checks that intermediate map reduce jobs are compressed using HIVE_INTER_MAPRED_COMPRESSION_CODEC
+ public void testMapPlan4() throws Exception {
+
+ System.out.println("Beginning testMapPlan7");
+
+ try {
+ conf.setVar(HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
+ populateMapPlan4(db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME,
+ "src"));
+ executePlan();
+ fileDiff("lt100.txt", "mapplan4.out", true);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ fail("Got Throwable");
+ } finally {
+ conf.setVar(HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC, defaultCodec);
+ }
+ }
}
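The gzipped path in fileDiff works because output written through org.apache.hadoop.io.compress.GzipCodec is ordinary gzip data, so a plain java.util.zip.GZIPInputStream can decompress it for comparison against the uncompressed golden file. A small self-contained sketch of that read-back pattern (the helper class and method names are illustrative only):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of reading back task output compressed with GzipCodec,
// mirroring what the new fileDiff(..., gzipped) overload relies on.
public class GzippedOutputReadSketch {
  public static String firstLine(Configuration conf, Path outputFile) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    InputStream in = new GZIPInputStream(fs.open(outputFile));
    try {
      BufferedReader reader = new BufferedReader(new InputStreamReader(in));
      return reader.readLine();
    } finally {
      in.close();
    }
  }
}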
Index: ql/src/test/queries/clientpositive/intermediate_compression.q
===================================================================
--- ql/src/test/queries/clientpositive/intermediate_compression.q (revision 0)
+++ ql/src/test/queries/clientpositive/intermediate_compression.q (revision 0)
@@ -0,0 +1,30 @@
+drop table if exists src_inter1;
+
+create table src_inter1 like src;
+
+set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyIsIntermediateHook;
+set hive.exec.mode.local.auto=true;
+
+insert overwrite table src_inter1 select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key;
+insert overwrite table src_inter1 select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value);
+
+!rm -fr /tmp/hive_test/intercomp_local;
+
+insert overwrite local directory '/tmp/hive_test/intercomp_local/jg' select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key;
+insert overwrite local directory '/tmp/hive_test/intercomp_local/jj' select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value);
+
+!rm -fr /tmp/hive_test/intercomp_local;
+
+set hive.exec.mode.local.auto=false;
+
+insert overwrite table src_inter1 select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key;
+insert overwrite table src_inter1 select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value);
+
+!rm -fr /tmp/hive_test/intercomp_local;
+
+insert overwrite local directory '/tmp/hive_test/intercomp_local/jg' select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key;
+insert overwrite local directory '/tmp/hive_test/intercomp_local/jj' select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value);
+
+!rm -fr /tmp/hive_test/intercomp_local;
+
+drop table if exists src_inter1;
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (revision 1157918)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (working copy)
@@ -36,6 +36,7 @@
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.MapredLocalTask;
import org.apache.hadoop.hive.ql.exec.MoveTask;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -519,6 +520,14 @@
listWorks.add(mvWork);
listWorks.add(mergeWork);
+ // Mark the work of the Map Reduce task immediately preceding this (if any) as non-intermediate
+ if (currTask instanceof MapRedTask) {
+ ((MapRedTask)currTask).getWork().setIsIntermediate(false);
+ }
+
+ // Mark the merge work as non-intermediate
+ mergeWork.setIsIntermediate(false);
+
ConditionalWork cndWork = new ConditionalWork(listWorks);
List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
@@ -622,6 +631,12 @@
// Set the move task to be dependent on the current task
if (mvTask != null) {
+ // If it is a map reduce task, set its work to be non-intermediate
+ if (currTask instanceof MapRedTask) {
+ ((MapRedTask)currTask).getWork().setIsIntermediate(false);
+ } else if (currTask instanceof MapredLocalTask) {
+ ((MapredLocalTask)currTask).getWork().setIsIntermediate(false);
+ }
currTask.addDependentTask(mvTask);
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (revision 1157918)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (working copy)
@@ -92,6 +92,11 @@
job = new JobConf(conf, ExecDriver.class);
//we don't use the HadoopJobExecHooks for local tasks
this.jobExecHelper = new HadoopJobExecHelper(job, console, this, null);
+
+ if (work.getIsIntermediate()) {
+ HiveConf.setVar(job, HiveConf.ConfVars.HADOOP_OUTPUT_COMPRESSION_CODEC,
+ HiveConf.getVar(job, HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC));
+ }
}
public static String now() {
@@ -240,6 +245,12 @@
if (work == null) {
return -1;
}
+
+ if (work.getIsIntermediate()) {
+ HiveConf.setVar(job, HiveConf.ConfVars.HADOOP_OUTPUT_COMPRESSION_CODEC,
+ HiveConf.getVar(job, HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC));
+ }
+
memoryMXBean = ManagementFactory.getMemoryMXBean();
long startTime = System.currentTimeMillis();
console.printInfo(Utilities.now()
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 1157918)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy)
@@ -171,6 +171,11 @@
HiveConf.setVar(job, ConfVars.HIVEADDEDARCHIVES, addedArchives);
}
this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
+
+ if (work.getIsIntermediate()) {
+ HiveConf.setVar(job, HiveConf.ConfVars.HADOOP_OUTPUT_COMPRESSION_CODEC,
+ HiveConf.getVar(job, HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC));
+ }
}
/**
@@ -181,6 +186,11 @@
this.job = job;
console = new LogHelper(LOG, isSilent);
this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
+
+ if (work.getIsIntermediate()) {
+ HiveConf.setVar(job, HiveConf.ConfVars.HADOOP_OUTPUT_COMPRESSION_CODEC,
+ HiveConf.getVar(job, HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC));
+ }
}
/**
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java (revision 1157918)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java (working copy)
@@ -47,6 +47,8 @@
private List<Operator<? extends Serializable>> dummyParentOp;
+ private boolean isIntermediate = true;
+
public MapredLocalWork() {
}
@@ -151,6 +153,14 @@
return tmpFileURI;
}
+ public void setIsIntermediate(boolean isIntermediate) {
+ this.isIntermediate = isIntermediate;
+ }
+
+ public boolean getIsIntermediate() {
+ return this.isIntermediate;
+ }
+
public static class BucketMapJoinContext implements Serializable {
private static final long serialVersionUID = 1L;
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1157918)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy)
@@ -87,6 +87,8 @@
private boolean mapperCannotSpanPartns;
+ private boolean isIntermediate = true;
+
public MapredWork() {
aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
}
@@ -443,4 +445,11 @@
pathToPartitionInfo.put(path.toString(), partDesc);
}
+ public boolean getIsIntermediate() {
+ return isIntermediate;
+ }
+
+ public void setIsIntermediate(boolean isIntermediate) {
+ this.isIntermediate = isIntermediate;
+ }
}