Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml	(revision 1157918)
+++ conf/hive-default.xml	(working copy)
@@ -1145,6 +1145,12 @@
+<property>
+  <name>hive.exec.inter.mapred.compression.codec</name>
+  <value>com.hadoop.compression.lzo.LzoCodec</value>
+  <description>Compression to be used between map reduce jobs</description>
+</property>
+
 <property>
   <name>hive.exec.perf.logger</name>
   <value>org.apache.hadoop.hive.ql.log.PerfLogger</value>
   <description>The class responsible logging client side performance metrics. Must be a subclass of org.apache.hadoop.hive.ql.log.PerfLogger</description>
 </property>
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1157918)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -163,6 +163,7 @@
     HADOOPNUMREDUCERS("mapred.reduce.tasks", 1),
     HADOOPJOBNAME("mapred.job.name", null),
     HADOOPSPECULATIVEEXECREDUCERS("mapred.reduce.tasks.speculative.execution", false),
+    HADOOP_OUTPUT_COMPRESSION_CODEC("mapred.output.compression.codec", null),
 
     // Metastore stuff. Be sure to update HiveConf.metaVars when you add
     // something here!
@@ -461,6 +462,9 @@
     HIVE_REWORK_MAPREDWORK("hive.rework.mapredwork", false),
     HIVE_CONCATENATE_CHECK_INDEX ("hive.exec.concatenate.check.index", true),
 
+    // compression to be used between map reduce jobs
+    HIVE_INTER_MAPRED_COMPRESSION_CODEC("hive.exec.inter.mapred.compression.codec", ""),
+
     // The class responsible for logging client side performance metrics
     // Must be a subclass of org.apache.hadoop.hive.ql.log.PerfLogger
     HIVE_PERF_LOGGER("hive.exec.perf.logger", "org.apache.hadoop.hive.ql.log.PerfLogger"),
Index: ql/src/test/results/clientpositive/intermediate_compression.q.out
===================================================================
--- ql/src/test/results/clientpositive/intermediate_compression.q.out	(revision 0)
+++ ql/src/test/results/clientpositive/intermediate_compression.q.out	(revision 0)
@@ -0,0 +1,45 @@
+PREHOOK: query: drop table if exists src_inter1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists src_inter1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table src_inter1 like src
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table src_inter1 like src
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@src_inter1
+PREHOOK: query: insert overwrite table src_inter1 select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@src_inter1
+PREHOOK: query: insert overwrite table src_inter1 select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@src_inter1
+PREHOOK: query: insert overwrite local directory '/tmp/hive_test/intercomp_local/jg' select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: /tmp/hive_test/intercomp_local/jg
+PREHOOK: query: insert overwrite local directory '/tmp/hive_test/intercomp_local/jj' select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: /tmp/hive_test/intercomp_local/jj
+PREHOOK: query: insert overwrite table src_inter1 select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@src_inter1
+PREHOOK: query: insert overwrite table src_inter1 select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@src_inter1
+PREHOOK: query: insert overwrite local directory '/tmp/hive_test/intercomp_local/jg' select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: /tmp/hive_test/intercomp_local/jg
+PREHOOK: query: insert overwrite local directory '/tmp/hive_test/intercomp_local/jj' select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: /tmp/hive_test/intercomp_local/jj
+PREHOOK: query: drop table if exists src_inter1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@src_inter1
+PREHOOK: Output: default@src_inter1
Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsIntermediateHook.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsIntermediateHook.java	(revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsIntermediateHook.java	(revision 0)
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Serializable;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+
+public class VerifyIsIntermediateHook implements ExecuteWithHookContext {
+
+  private static final String errorMessage = "VerifyIsIntermediateHook failed because the value of isIntermediate"
+      + " in the work for a map reduce task did not match what was expected.";
+
+  @Override
+  public void run(HookContext hookContext) throws Exception {
+    if (hookContext.getHookType().equals(HookType.POST_EXEC_HOOK)) {
+      List<TaskRunner> taskRunners = hookContext.getCompleteTaskList();
+      for (TaskRunner taskRunner : taskRunners) {
+        Task<? extends Serializable> task = taskRunner.getTask();
+        if (task instanceof MapRedTask || task instanceof MapredLocalTask) {
+          boolean isIntermediate = true;
+          for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+            if (childTask instanceof ConditionalTask) {
+              for (Task<? extends Serializable> containedTask : ((ConditionalTask)childTask).getListTasks()) {
+                if (containedTask instanceof MoveTask) {
+                  isIntermediate = false;
+                }
+              }
+            } else if (childTask instanceof MoveTask) {
+              isIntermediate = false;
+            }
+          }
+          if (task instanceof MapRedTask) {
+            Assert.assertEquals(errorMessage, isIntermediate, ((MapRedTask)task).getWork().getIsIntermediate());
+          }
+          if (task instanceof MapredLocalTask) {
+            Assert.assertEquals(errorMessage, isIntermediate, ((MapredLocalTask)task).getWork().getIsIntermediate());
+          }
+        }
+      }
+    }
+  }
+}
Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java	(revision 1157918)
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java	(working copy)
@@ -20,12 +20,13 @@
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.zip.GZIPInputStream;
 
 import junit.framework.TestCase;
 
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -48,7 +49,6 @@
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.TextInputFormat;
@@ -68,10 +68,15 @@
   private static Hive db;
   private static FileSystem fs;
 
+  public static final String defaultCodec = "org.apache.hadoop.io.compress.DefaultCodec";
+
   static {
     try {
       conf = new HiveConf(ExecDriver.class);
 
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC,
+          defaultCodec);
+
       fs = FileSystem.get(conf);
       if (fs.exists(tmppath) && !fs.getFileStatus(tmppath).isDir()) {
         throw new RuntimeException(tmpdir + " exists but is not a directory");
@@ -86,7 +91,8 @@
 
       for (Object one : Utilities.makeList("mapplan1.out", "mapplan2.out",
           "mapredplan1.out", "mapredplan2.out", "mapredplan3.out",
-          "mapredplan4.out", "mapredplan5.out", "mapredplan6.out")) {
+          "mapredplan4.out", "mapredplan5.out", "mapredplan6.out",
+          "mapplan3.out", "mapplan4.out")) {
         Path onedir = new Path(tmppath, (String) one);
         if (fs.exists(onedir)) {
           fs.delete(onedir, true);
@@ -135,6 +141,10 @@
   }
 
   private static void fileDiff(String datafile, String testdir) throws Exception {
+    fileDiff(datafile, testdir, false);
+  }
+
+  private static void fileDiff(String datafile, String testdir, boolean gzipped) throws Exception {
     String testFileDir = conf.get("test.data.files");
     System.out.println(testFileDir);
     FileInputStream fi_gold = new FileInputStream(new File(testFileDir,
@@ -149,7 +159,12 @@
       throw new RuntimeException(tmpdir + testdir + " is not a directory");
     }
 
-    FSDataInputStream fi_test = fs.open((fs.listStatus(di_test))[0].getPath());
+    InputStream fi_test;
+    if (!gzipped) {
+      fi_test = fs.open((fs.listStatus(di_test))[0].getPath());
+    } else {
+      fi_test = new GZIPInputStream(fs.open((fs.listStatus(di_test))[0].getPath()));
+    }
 
     if (!Utilities.contentsEqual(fi_gold, fi_test, false)) {
       System.out.println(di_test.toString() + " does not match " + datafile);
@@ -432,6 +447,32 @@
     mr.setReducer(op5);
   }
 
+  @SuppressWarnings("unchecked")
+  private void populateMapPlan3(Table src) {
+    mr.setNumReduceTasks(Integer.valueOf(0));
+    mr.setIsIntermediate(false);
+
+    Operator<FileSinkDesc> op2 = OperatorFactory.get(new FileSinkDesc(tmpdir
+        + "mapplan3.out", Utilities.defaultTd, true));
+    Operator<FilterDesc> op1 = OperatorFactory.get(getTestFilterDesc("key"),
+        op2);
+
+    Utilities.addMapWork(mr, src, "a", op1);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void populateMapPlan4(Table src) {
+    mr.setNumReduceTasks(Integer.valueOf(0));
+    mr.setIsIntermediate(true);
+
+    Operator<FileSinkDesc> op2 = OperatorFactory.get(new FileSinkDesc(tmpdir
+        + "mapplan4.out", Utilities.defaultTd, true));
+    Operator<FilterDesc> op1 = OperatorFactory.get(getTestFilterDesc("key"),
+        op2);
+
+    Utilities.addMapWork(mr, src, "a", op1);
+  }
+
   private void executePlan() throws Exception {
     String testName = new Exception().getStackTrace()[1].getMethodName();
     MapRedTask mrtask = new MapRedTask();
@@ -565,4 +606,42 @@
       fail("Got Throwable");
     }
   }
+
+  // Checks that non-intermediate map reduce jobs are not compressed using HIVE_INTER_MAPRED_COMPRESSION_CODEC
+  public void testMapPlan3() throws Exception {
+
+    System.out.println("Beginning testMapPlan3");
+
+    try {
+      conf.setVar(HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
+      populateMapPlan3(db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME,
+          "src"));
+      executePlan();
+      fileDiff("lt100.txt.deflate", "mapplan3.out");
+    } catch (Throwable e) {
+      e.printStackTrace();
+      fail("Got Throwable");
+    } finally {
+      conf.setVar(HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC, defaultCodec);
+    }
+  }
+
+  // Checks that intermediate map reduce jobs are compressed using HIVE_INTER_MAPRED_COMPRESSION_CODEC
+  public void testMapPlan4() throws Exception {
+
+    System.out.println("Beginning testMapPlan4");
+
+    try {
+      conf.setVar(HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
+      populateMapPlan4(db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME,
+          "src"));
+      executePlan();
+      fileDiff("lt100.txt", "mapplan4.out", true);
+    } catch (Throwable e) {
+      e.printStackTrace();
+      fail("Got Throwable");
+    } finally {
+      conf.setVar(HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC, defaultCodec);
+    }
+  }
 }
Index: ql/src/test/queries/clientpositive/intermediate_compression.q
===================================================================
--- ql/src/test/queries/clientpositive/intermediate_compression.q	(revision 0)
+++ ql/src/test/queries/clientpositive/intermediate_compression.q	(revision 0)
@@ -0,0 +1,30 @@
+drop table if exists src_inter1;
+
+create table src_inter1 like src;
+
+set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyIsIntermediateHook;
+set hive.exec.mode.local.auto=true;
+
+insert overwrite table src_inter1 select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key;
+insert overwrite table src_inter1 select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value);
+
+!rm -fr /tmp/hive_test/intercomp_local;
+
+insert overwrite local directory '/tmp/hive_test/intercomp_local/jg' select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key;
+insert overwrite local directory '/tmp/hive_test/intercomp_local/jj' select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value);
+
+!rm -fr /tmp/hive_test/intercomp_local;
+
+set hive.exec.mode.local.auto=false;
+
+insert overwrite table src_inter1 select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key;
+insert overwrite table src_inter1 select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value);
+
+!rm -fr /tmp/hive_test/intercomp_local;
+
+insert overwrite local directory '/tmp/hive_test/intercomp_local/jg' select a.key, count(distinct b.value) from src a join src b on (a.key = b.key) group by a.key;
+insert overwrite local directory '/tmp/hive_test/intercomp_local/jj' select a.key, c.key from src a join src b on (a.key = b.key) join src c on (b.value = c.value);
+
+!rm -fr /tmp/hive_test/intercomp_local;
+
+drop table if exists src_inter1;
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java	(revision 1157918)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java	(working copy)
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.MapredLocalTask;
 import org.apache.hadoop.hive.ql.exec.MoveTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -519,6 +520,14 @@
     listWorks.add(mvWork);
     listWorks.add(mergeWork);
 
+    // Mark the work of the Map Reduce task immediately preceding this (if any) as non-intermediate
+    if (currTask instanceof MapRedTask) {
+      ((MapRedTask)currTask).getWork().setIsIntermediate(false);
+    }
+
+    // Mark the merge work as non-intermediate
+    mergeWork.setIsIntermediate(false);
+
     ConditionalWork cndWork = new ConditionalWork(listWorks);
 
     List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
@@ -622,6 +631,12 @@
 
     // Set the move task to be dependent on the current task
     if (mvTask != null) {
+      // If it is a map reduce task, set its work to be non-intermediate
+      if (currTask instanceof MapRedTask) {
+        ((MapRedTask)currTask).getWork().setIsIntermediate(false);
+      } else if (currTask instanceof MapredLocalTask) {
+        ((MapredLocalTask)currTask).getWork().setIsIntermediate(false);
+      }
       currTask.addDependentTask(mvTask);
     }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java	(revision 1157918)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java	(working copy)
@@ -92,6 +92,11 @@
     job = new JobConf(conf, ExecDriver.class);
     //we don't use the HadoopJobExecHooks for local tasks
     this.jobExecHelper = new HadoopJobExecHelper(job, console, this, null);
+
+    if (work.getIsIntermediate()) {
+      HiveConf.setVar(job, HiveConf.ConfVars.HADOOP_OUTPUT_COMPRESSION_CODEC,
+          HiveConf.getVar(job, HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC));
+    }
   }
 
   public static String now() {
@@ -240,6 +245,12 @@
     if (work == null) {
       return -1;
     }
+
+    if (work.getIsIntermediate()) {
+      HiveConf.setVar(job, HiveConf.ConfVars.HADOOP_OUTPUT_COMPRESSION_CODEC,
+          HiveConf.getVar(job, HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC));
+    }
+
     memoryMXBean = ManagementFactory.getMemoryMXBean();
     long startTime = System.currentTimeMillis();
     console.printInfo(Utilities.now()
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 1157918)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -171,6 +171,11 @@
       HiveConf.setVar(job, ConfVars.HIVEADDEDARCHIVES, addedArchives);
     }
     this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
+
+    if (work.getIsIntermediate()) {
+      HiveConf.setVar(job, HiveConf.ConfVars.HADOOP_OUTPUT_COMPRESSION_CODEC,
+          HiveConf.getVar(job, HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC));
+    }
   }
 
   /**
@@ -181,6 +186,11 @@
     this.job = job;
     console = new LogHelper(LOG, isSilent);
     this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
+
+    if (work.getIsIntermediate()) {
+      HiveConf.setVar(job, HiveConf.ConfVars.HADOOP_OUTPUT_COMPRESSION_CODEC,
+          HiveConf.getVar(job, HiveConf.ConfVars.HIVE_INTER_MAPRED_COMPRESSION_CODEC));
+    }
   }
 
   /**
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java	(revision 1157918)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java	(working copy)
@@ -47,6 +47,8 @@
 
   private List<Operator<? extends Serializable>> dummyParentOp;
 
+  private boolean isIntermediate = true;
+
   public MapredLocalWork() {
   }
 
@@ -151,6 +153,14 @@
     return tmpFileURI;
   }
 
+  public void setIsIntermediate(boolean isIntermediate) {
+    this.isIntermediate = isIntermediate;
+  }
+
+  public boolean getIsIntermediate() {
+    return this.isIntermediate;
+  }
+
   public static class BucketMapJoinContext implements Serializable {
 
     private static final long serialVersionUID = 1L;
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java	(revision 1157918)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java	(working copy)
@@ -87,6 +87,8 @@
 
   private boolean mapperCannotSpanPartns;
 
+  private boolean isIntermediate = true;
+
   public MapredWork() {
     aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
   }
@@ -443,4 +445,11 @@
     pathToPartitionInfo.put(path.toString(), partDesc);
   }
 
+  public boolean getIsIntermediate() {
+    return isIntermediate;
+  }
+
+  public void setIsIntermediate(boolean isIntermediate) {
+    this.isIntermediate = isIntermediate;
+  }
 }
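
Usage sketch (not part of the patch; a minimal illustration under stated assumptions): GzipCodec below is only a stand-in, and any org.apache.hadoop.io.compress.CompressionCodec on the cluster classpath should work, such as the LzoCodec shown in the hive-default.xml example above. It assumes intermediate compression is switched on via the existing hive.exec.compress.intermediate setting, since the new codec property only matters when Hive actually compresses intermediate output.

-- Point the new knob at a codec and enable intermediate compression for the session.
set hive.exec.compress.intermediate=true;
set hive.exec.inter.mapred.compression.codec=org.apache.hadoop.io.compress.GzipCodec;

-- This query compiles into more than one map reduce job; the data handed from the join job
-- to the group-by job is intermediate work, so it is written with the codec above, while the
-- final job's output keeps the ordinary mapred.output.compression.codec setting.
select a.key, count(distinct b.value)
from src a join src b on (a.key = b.key)
group by a.key;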