Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 889444) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -164,6 +164,9 @@ HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long)(256*1000*1000)), HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long)(16*1000*1000)), + HIVESKEWJOIN("hive.optimize.skewjoin", true), + HIVESKEWJOINKEY("hive.skewjoin.key", 100000), + HIVESENDHEARTBEAT("hive.heartbeat.interval", 1000), HIVEJOBPROGRESS("hive.task.progress", false), Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 889444) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.parse.ErrorMsg; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.ExecDriver; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskRunner; @@ -453,10 +454,32 @@ while (running.size() != 0 || runnable.peek()!=null) { // Launch upto maxthreads tasks while(runnable.peek() != null && running.size() < maxthreads) { - Task tsk = runnable.remove(); - curJobNo = launchTask(tsk, queryId, noName,running, jobname, jobs, curJobNo); + Task tsk = runnable.peek(); + if(tsk instanceof ConditionalTask) { + runnable.remove(); //remove the condition task from the queue + tsk.initialize(conf, plan); + List> childTasks = ((ConditionalTask)tsk).getResTasks(); + if(childTasks != null) { + for(Task currChild: childTasks) + addToRunnable(runnable,currChild); + } + childTasks = ((ConditionalTask)tsk).getChildTasks(); + if(childTasks != null) { + for(Task currChild: childTasks) + addToRunnable(runnable,currChild); + } + } else { + if(tsk.isRunnable()) { + runnable.remove(); + curJobNo = launchTask(tsk, queryId, noName,running, jobname, jobs, curJobNo); + } else { + break; // break the loop and wait for task completion + } + } } + if (running.size() == 0) + break; // poll the Tasks to see which one completed TaskResult tskRes = pollTasks(running.keySet()); TaskRunner tskRun = running.remove(tskRes); Index: ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (revision 889444) +++ ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (working copy) @@ -320,9 +320,11 @@ } else if (task instanceof ConditionalTask) { ConditionalTask cTask = (ConditionalTask)task; - for (Task listTask: cTask.getListTasks()) { - if (!tasksVisited.contains(listTask)) { - tasksToVisit.add(listTask); + if(cTask.getResTasks()!=null) { + for (Task listTask: cTask.getResTasks()) { + if (!tasksVisited.contains(listTask)) { + tasksToVisit.add(listTask); + } } } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (revision 889444) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (working copy) @@ -20,23 +20,21 @@ import java.io.Serializable; import java.util.ArrayList; -import 
java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Stack; import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.exprNodeDesc; import org.apache.hadoop.hive.ql.plan.joinCond; import org.apache.hadoop.hive.ql.plan.joinDesc; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; @@ -227,9 +225,11 @@ outputObjInspector = getJoinOutputObjectInspector(order, joinValuesStandardObjectInspectors, conf); LOG.info("JOIN " + ((StructObjectInspector)outputObjInspector).getTypeName() + " totalsz = " + totalSz); } - + + transient boolean newGroupStarted = false; public void startGroup() throws HiveException { LOG.trace("Join: Starting new group"); + newGroupStarted = false; storage.clear(); for (Byte alias : order) storage.put(alias, new ArrayList>()); @@ -253,7 +253,7 @@ List valueFields, List valueFieldsOI) throws HiveException { // Compute the values - ArrayList nr = new ArrayList(valueFields.size()); + ArrayList nr = new ArrayList(); for (int i=0; i> listTasks; - private Task resTask; + private List> resTasks; private ConditionalResolver resolver; private Object resolverCtx; @@ -62,13 +62,14 @@ public void initialize (HiveConf conf, QueryPlan queryPlan) { super.initialize(conf, queryPlan); - resTask = listTasks.get(resolver.getTaskId(conf, resolverCtx)); - resTask.initialize(conf, queryPlan); + resTasks = resolver.getTasks(conf, resolverCtx); + for(Task tsk: resTasks) + tsk.initialize(conf, queryPlan); } @Override public int execute() { - return resTask.executeTask(); + return 0; } /** @@ -91,6 +92,21 @@ public Object getResolverCtx() { return resolverCtx; } + + // used to determine whether child tasks can be run. 
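+ // A conditional task is considered done only when all of its parent tasks
+ // have finished and every task selected by the resolver (resTasks) has
+ // finished; only then does the Driver treat its child tasks as runnable.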
+ public boolean done() { + boolean ret = true; + List> parentTasks = this.getParentTasks(); + if (parentTasks != null) { + for(Task par: parentTasks) + ret = ret && par.done(); + } + if (ret && getResTasks() != null) { + for (Task tsk : getResTasks()) + ret = ret && tsk.done(); + } + return ret; + } /** * @param resolverCtx the resolverCtx to set @@ -116,4 +132,12 @@ public int getType() { return StageType.CONDITIONAL; } + + public List> getResTasks() { + return resTasks; + } + + public void setResTasks(List> resTasks) { + this.resTasks = resTasks; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 889444) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy) @@ -960,20 +960,21 @@ FileInputFormat.addInputPaths(job, path); else emptyPaths.add(path); + + // If the query references non-existent partitions + // We need to add a empty file, it is not acceptable to change the operator tree + // Consider the query: + // select * from (select count(1) from T union all select count(1) from T2) x; + // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2 rows) + if (path == null) + numEmptyPaths = addInputPath(null, job, work, hiveScratchDir, numEmptyPaths, false, oneAlias); + } } // Create a empty file if the directory is empty for (String emptyPath : emptyPaths) numEmptyPaths = addInputPath(emptyPath, job, work, hiveScratchDir, numEmptyPaths, true, oneAlias); - - // If the query references non-existent partitions - // We need to add a empty file, it is not acceptable to change the operator tree - // Consider the query: - // select * from (select count(1) from T union all select count(1) from T2) x; - // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2 rows) - if (path == null) - numEmptyPaths = addInputPath(null, job, work, hiveScratchDir, numEmptyPaths, false, oneAlias); } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 889444) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy) @@ -108,7 +108,6 @@ LOG.info("Writing to temp file: FS " + outPath); HiveOutputFormat hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance(); - final Class outputClass = serializer.getSerializedClass(); boolean isCompressed = conf.getCompressed(); // The reason to keep these instead of using @@ -117,22 +116,8 @@ // we create. 
Path parent = Utilities.toTempPath(specPath); finalPath = HiveFileFormatUtils.getOutputFormatFinalPath(parent, jc, hiveOutputFormat, isCompressed, finalPath); - tableDesc tableInfo = conf.getTableInfo(); - JobConf jc_output = jc; - if (isCompressed) { - jc_output = new JobConf(jc); - String codecStr = conf.getCompressCodec(); - if (codecStr != null && !codecStr.trim().equals("")) { - Class codec = (Class) Class.forName(codecStr); - FileOutputFormat.setOutputCompressorClass(jc_output, codec); - } - String type = conf.getCompressType(); - if(type !=null && !type.trim().equals("")) { - CompressionType style = CompressionType.valueOf(type); - SequenceFileOutputFormat.setOutputCompressionType(jc, style); - } - } - outWriter = getRecordWriter(jc_output, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), outPath); + final Class outputClass = serializer.getSerializedClass(); + outWriter = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(), outputClass, conf, outPath); // in recent hadoop versions, use deleteOnExit to clean tmp files. autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, outPath); @@ -146,15 +131,6 @@ } } - public static RecordWriter getRecordWriter(JobConf jc, HiveOutputFormat hiveOutputFormat, - final Class valueClass, boolean isCompressed, - Properties tableProp, Path outPath) throws IOException, HiveException { - if (hiveOutputFormat != null) { - return hiveOutputFormat.getHiveRecordWriter(jc, outPath, valueClass, isCompressed, tableProp, null); - } - return null; - } - Writable recordValue; public void processOp(Object row, int tag) throws HiveException { // Since File Sink is a terminal operator, forward is not called - so, maintain the number of output rows explicitly Index: ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (revision 889444) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (working copy) @@ -18,15 +18,35 @@ package org.apache.hadoop.hive.ql.exec; +import java.io.File; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.fileSinkDesc; import org.apache.hadoop.hive.ql.plan.joinDesc; +import org.apache.hadoop.hive.ql.plan.tableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.ReflectionUtils; /** @@ 
-35,15 +55,166 @@ public class JoinOperator extends CommonJoinOperator implements Serializable { private static final long serialVersionUID = 1L; + private transient boolean handleSkewJoin = false; + private transient SkewJoinContext keyContext = null; + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); initializeChildren(hconf); + joinDesc desc = this.getConf(); + this.handleSkewJoin = desc.getHandleSkewJoin(); + if(this.handleSkewJoin) { + keyContext = new SkewJoinContext(); + keyContext.initiliaze(hconf); + } + } + + private class SkewJoinContext { + + private int rowNumber = 0; + private int currTag = -1; + private int currBigKeyTag = -1; + + private int skewKeyDefinition = -1; + private ArrayList skewKeysTableObjectInspector = null; + private ArrayList tblSerializers = null; + private fileSinkDesc fileSink = null; + private List tblDesc = null; + + Map bigKeysWriters = null; + Map> smallKeysWriters = null; + Configuration hconf = null; + String uniqueFeed = UUID.randomUUID().toString(); + LongWritable dummyKey = null; + + + public void initiliaze(Configuration hconf) { + this.hconf = hconf; + joinDesc desc = getConf(); + skewKeyDefinition = desc.getSkewKeyDefinition(); + skewKeysTableObjectInspector = new ArrayList(numAliases); + bigKeysWriters = new HashMap(numAliases); + smallKeysWriters = new HashMap>(numAliases); + tblDesc = desc.getSkewKeysValuesTables(); + tblSerializers = new ArrayList(numAliases); + for (int i = 0; i < numAliases; i++) { + smallKeysWriters.put((byte)i, new HashMap()); + List skewTableValueInspectors = new ArrayList(); + ObjectInspector dummyKeyField = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.LONG); + skewTableValueInspectors.add(dummyKeyField); + try { + SerDe serializer = (SerDe)ReflectionUtils.newInstance(tblDesc.get(i).getDeserializerClass(), null); + serializer.initialize(null, tblDesc.get(i).getProperties()); + tblSerializers.add(serializer); + } catch (SerDeException e) { + LOG.error("Skewjoin will be disabled due to " + e.getMessage(),e); + handleSkewJoin = false; + break; + } + List colNames = Utilities.getColumnNames(tblDesc.get(i).getProperties()); + skewTableValueInspectors.addAll(joinValuesStandardObjectInspectors.get((byte)i)); + StructObjectInspector structTblInpector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, skewTableValueInspectors); + skewKeysTableObjectInspector.add(i, structTblInpector); + } + fileSink = Utilities.getFileSinkOperator(JoinOperator.this).getConf(); + } + + private void flushStorage() throws HiveException { + for (int i = 0; i < numAliases; i++) { + ArrayList> values = storage.remove(Byte.valueOf((byte)i)); + if(values != null) { + for (int k = 0; k < values.size(); k++) { + ArrayList rowValue = values.get(k); + flushRow(rowValue, i); + } + } + } + } + + private void flushRow(ArrayList rowValues, int tag) throws HiveException { + RecordWriter rw = getRecordWriter((byte)tag); + SerDe serde = tblSerializers.get(tag); + rowValues.add(0,dummyKey); + try { + rw.write(serde.serialize(rowValues, skewKeysTableObjectInspector.get(tag))); + } catch (Exception e) { + throw new HiveException(e); + } + } + + private RecordWriter getRecordWriter(byte tag) throws HiveException { + RecordWriter rw = null; + if (tag == currBigKeyTag) { + rw = bigKeysWriters.get(tag); + if (rw == null) { + rw = HiveFileFormatUtils.getHiveRecordWriter(new JobConf(hconf), + tblDesc.get(tag), 
tblSerializers.get(tag).getSerializedClass(), + fileSink, new Path(getConf().getBigKeysDirMap().get(tag) + File.separator + uniqueFeed)); + bigKeysWriters.put(Byte.valueOf(tag), rw); + } + } else { + rw = smallKeysWriters.get((byte)currBigKeyTag).get(tag); + if (rw == null) { + rw = HiveFileFormatUtils.getHiveRecordWriter(new JobConf(hconf), + tblDesc.get(tag), tblSerializers.get(tag).getSerializedClass(), + fileSink, new Path(getConf().getSmallKeysDirMap().get((byte)currBigKeyTag).get(tag) + File.separator + uniqueFeed)); + smallKeysWriters.get((byte)currBigKeyTag).put(Byte.valueOf(tag), rw); + } + } + + return rw; + } + + public boolean handleSkew(ArrayList values, int tag) throws HiveException { + + if(newGroupStarted || tag != currTag) { + rowNumber = 0; + currTag = tag; + } + if(newGroupStarted) { + currBigKeyTag = -1; + newGroupStarted = false; + } + + rowNumber++; + if (currBigKeyTag == -1 && (tag < numAliases - 1) + && rowNumber >= skewKeyDefinition) { + // the first time we see a big key. If this key is not in the last + // table (the last table can always be streamed), we define that the + // skew is happening now. + currBigKeyTag = tag; + //create a new dummy key + ArrayList keyhash = new ArrayList(); + keyhash.add(values.hashCode()); + keyhash.add(tag); + keyhash.add(uniqueFeed); + dummyKey = new LongWritable(keyhash.hashCode()); + flushStorage(); + } + + if(currBigKeyTag >= 0 ) { + flushRow(values, tag); + return true; + } else { + return false; + } + } + + public void close() throws IOException { + for(RecordWriter rw: bigKeysWriters.values()) + rw.close(done); + for(Map rwMap : smallKeysWriters.values()) { + for(RecordWriter rw: rwMap.values()) + rw.close(done); + } + } } public void processOp(Object row, int tag) throws HiveException { try { + // get alias alias = (byte)tag; @@ -52,6 +223,12 @@ ArrayList nr = computeValues(row, joinValues.get(alias), joinValuesObjectInspectors.get(alias)); + if(this.handleSkewJoin && keyContext.handleSkew(nr, tag)) { + if (reporter != null) + reporter.progress(); + return; + } + // number of rows for the key in the given table int sz = storage.get(alias).size(); @@ -91,6 +268,33 @@ return OperatorType.JOIN; } + /** + * All done + * + */ + public void closeOp(boolean abort) throws HiveException { + if(this.handleSkewJoin) { + try { + keyContext.close(); + } catch (IOException e) { + throw new HiveException(e); + } + } + } + + /** + * Forward a record of join results. 
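+ * If the current group's key has been flagged as a skew key, its rows have
+ * already been spilled to the skew-join directories, so nothing is forwarded
+ * here; the join for that key is completed by the follow-up map-join job.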
+ * + * @throws HiveException + */ + public void endGroup() throws HiveException { + if(this.handleSkewJoin && keyContext.currBigKeyTag >=0) { + return; + } + else { + checkAndGenObject(); + } + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 889444) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -54,12 +54,16 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.*; import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes; +import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.commons.logging.LogFactory; @@ -337,6 +341,15 @@ public static tableDesc getTableDesc(Table tbl) { return (new tableDesc (tbl.getDeserializer().getClass(), tbl.getInputFormatClass(), tbl.getOutputFormatClass(), tbl.getSchema())); } + + //column names and column types are all delimited by comma + public static tableDesc getTableDesc(String cols, String colTypes) { + return (new tableDesc(LazySimpleSerDe.class, SequenceFileInputFormat.class, + HiveSequenceFileOutputFormat.class, Utilities.makeProperties( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode, + org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols, + org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes))); + } public static partitionDesc getPartitionDesc(Partition part) throws HiveException { @@ -835,6 +848,19 @@ } return names; } + + public static List getColumnNames(Properties props) { + List names = new ArrayList(); + String colNames = props.getProperty(Constants.LIST_COLUMNS); + String[] cols = colNames.trim().split(","); + if (cols != null) { + for(String col : cols) { + if(col!=null && !col.trim().equals("")) + names.add(col); + } + } + return names; + } public static void validateColumnNames(List colNames, List checkCols) throws SemanticException { @@ -874,4 +900,19 @@ } return notificationInterval; } + + public static FileSinkOperator getFileSinkOperator(Operator baseOp) { + Queue> queue = new LinkedList>(); + queue.addAll(baseOp.getChildOperators()); + while(queue.peek()!=null) { + Operator op = queue.remove(); + if(op instanceof FileSinkOperator) { + return (FileSinkOperator)op; + } + if(op.getChildOperators()!=null) + queue.addAll(op.getChildOperators()); + } + return null; + } + } Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (revision 889444) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (working copy) @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.Set; import 
org.apache.hadoop.conf.Configuration; @@ -30,7 +31,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.fileSinkDesc; +import org.apache.hadoop.hive.ql.plan.tableDesc; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; @@ -187,4 +195,44 @@ } return true; } + + + public static RecordWriter getHiveRecordWriter(JobConf jc, + tableDesc tableInfo, Class outputClass, + fileSinkDesc conf, Path outPath) throws HiveException { + try { + HiveOutputFormat hiveOutputFormat = tableInfo.getOutputFileFormatClass().newInstance(); + boolean isCompressed = conf.getCompressed(); + JobConf jc_output = jc; + if (isCompressed) { + jc_output = new JobConf(jc); + String codecStr = conf.getCompressCodec(); + if (codecStr != null && !codecStr.trim().equals("")) { + Class codec = (Class) Class.forName(codecStr); + FileOutputFormat.setOutputCompressorClass(jc_output, codec); + } + String type = conf.getCompressType(); + if (type != null && !type.trim().equals("")) { + CompressionType style = CompressionType.valueOf(type); + SequenceFileOutputFormat.setOutputCompressionType(jc, style); + } + } + return getRecordWriter(jc_output, hiveOutputFormat, outputClass, + isCompressed, tableInfo.getProperties(), outPath); + } catch (Exception e) { + throw new HiveException(e); + } + } + + public static RecordWriter getRecordWriter(JobConf jc, + HiveOutputFormat hiveOutputFormat, + final Class valueClass, boolean isCompressed, + Properties tableProp, Path outPath) throws IOException, HiveException { + if (hiveOutputFormat != null) { + return hiveOutputFormat.getHiveRecordWriter(jc, outPath, valueClass, + isCompressed, tableProp, null); + } + return null; + } + } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRSkewJoinProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRSkewJoinProcessor.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRSkewJoinProcessor.java (revision 0) @@ -0,0 +1,397 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.Serializable; +import java.io.StringReader; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Vector; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ErrorMsg; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.QBJoinTree; +import org.apache.hadoop.hive.ql.parse.RowResolver; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; +import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles; +import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin; +import org.apache.hadoop.hive.ql.plan.ConditionalWork; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.exprNodeDesc; +import org.apache.hadoop.hive.ql.plan.extractDesc; +import org.apache.hadoop.hive.ql.plan.fetchWork; +import org.apache.hadoop.hive.ql.plan.fileSinkDesc; +import org.apache.hadoop.hive.ql.plan.joinDesc; +import org.apache.hadoop.hive.ql.plan.loadFileDesc; +import org.apache.hadoop.hive.ql.plan.mapJoinDesc; +import org.apache.hadoop.hive.ql.plan.mapredLocalWork; +import org.apache.hadoop.hive.ql.plan.mapredWork; +import org.apache.hadoop.hive.ql.plan.moveWork; +import org.apache.hadoop.hive.ql.plan.partitionDesc; +import org.apache.hadoop.hive.ql.plan.reduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.tableDesc; +import org.apache.hadoop.hive.ql.plan.tableScanDesc; +import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.util.ReflectionUtils; + +public class GenMRSkewJoinProcessor { + + public GenMRSkewJoinProcessor() { + } + + /** + 
* Create tasks for processing skew joins. The idea (HIVE-964) is to use
+ * separate jobs and map-joins to handle skew joins.
+ * <p>
+ * <ul>
+ * <li>
+ * The number of MR jobs needed to handle skew keys is the number of tables
+ * minus 1 (we can always stream the last table, so big keys in the last
+ * table will not be a problem).
+ * <li>
+ * At runtime in the join, we output big keys in one table into one
+ * corresponding directory, and all the same keys in the other tables into
+ * different dirs (one for each table). The directories will look like:
+ * <ul>
+ * <li>
+ * dir-T1-bigkeys (containing big keys in T1), dir-T2-keys (containing keys
+ * which are big in T1), dir-T3-keys (containing keys which are big in T1), ...
+ * <li>
+ * dir-T1-keys (containing keys which are big in T2), dir-T2-bigkeys
+ * (containing big keys in T2), dir-T3-keys (containing keys which are big
+ * in T2), ...
+ * <li>
+ * dir-T1-keys (containing keys which are big in T3), dir-T2-keys (containing
+ * keys which are big in T3), dir-T3-bigkeys (containing big keys in T3), ...
+ * </ul>
+ * </ul>
+ * For each table, we launch one map-join job, taking the directory containing
+ * big keys in this table and the corresponding dirs in the other tables as
+ * input. (Actually, one job per row in the list above.)
+ * <p>
+ * For more discussions, please check + * https://issues.apache.org/jira/browse/HIVE-964. + * + */ + public static void processSkewJoin(JoinOperator joinOp, + Task currTask, ParseContext parseCtx) + throws SemanticException { + + // We are trying to adding map joins to handle skew keys, and map join right + // now does not work with outer joins + if (!GenMRSkewJoinProcessor.skewJoinEnabled(parseCtx.getConf(), joinOp)) + return; + + String baseTmpDir = parseCtx.getContext().getMRTmpFileURI(); + + joinDesc joinDescriptor = joinOp.getConf(); + Map bigKeysDirMap = new HashMap(); + Map> smallKeysDirMap = new HashMap>(); + Map skewJoinJobResultsDir = new HashMap(); + Byte[] tags = joinDescriptor.getTagOrder(); + for (Byte src : tags) { + String bigKeysDir = getBigKeysDir(baseTmpDir, src); + bigKeysDirMap.put(src, bigKeysDir); + Map smallKeysMap = new HashMap(); + smallKeysDirMap.put(src, smallKeysMap); + for(Byte src2 : tags) { + if(!src2.equals(src)) + smallKeysMap.put(src2, getSmallKeysDir(baseTmpDir, src, src2)); + } + skewJoinJobResultsDir.put(src, getBigKeysSkewJoinResultDir(baseTmpDir, src)); + } + + joinDescriptor.setHandleSkewJoin(true); + joinDescriptor.setBigKeysDirMap(bigKeysDirMap); + joinDescriptor.setSmallKeysDirMap(smallKeysDirMap); + joinDescriptor.setSkewKeyDefinition(HiveConf.getIntVar(parseCtx.getConf(), HiveConf.ConfVars.HIVESKEWJOINKEY)); + + Map> bigKeysDirToTaskMap = new HashMap>(); + List listWorks = new ArrayList(); + List> listTasks = new ArrayList>(); + mapredWork currPlan = (mapredWork) currTask.getWork(); + + String dummyKeyName = "skewjoindummykey"; + String dummyKeyType = "bigint"; + tableDesc dummyKeyTblDesc = Utilities.getTableDesc(dummyKeyName, dummyKeyType); + Map> joinValues = joinDescriptor.getExprs(); + List tableDescList = new ArrayList(); + List newJoinValueTblDesc = new ArrayList(); + Map> newJoinValues = new HashMap>(); + Map> newJoinKeys = new HashMap>(); + for (int tag = 0; tag < joinValues.size(); tag++) { + List valueCols = joinValues.get((byte)tag); + String colNames = dummyKeyName; + String colTypes = dummyKeyType; + int columnSize = valueCols.size(); + List newValueExpr = new ArrayList(); + List newKeyExpr = new ArrayList(); + newKeyExpr.add(new exprNodeColumnDesc(TypeInfoFactory.getPrimitiveTypeInfo(dummyKeyType), dummyKeyName, ""+tag, false)); + for (int k = 0; k < columnSize; k++) { + TypeInfo type = valueCols.get(k).getTypeInfo(); + String newColName = tag + "_VALUE_" + k; // any name, it does not matter. + newValueExpr.add(new exprNodeColumnDesc(type, newColName, ""+tag, false)); //use tag as table alias + colNames = colNames + ","; + colTypes = colTypes + ","; + colNames = colNames + newColName; + colTypes = colTypes + valueCols.get(k).getTypeString(); + } + newJoinValues.put(Byte.valueOf((byte) tag), newValueExpr); + newJoinKeys.put(Byte.valueOf((byte) tag), newKeyExpr); + tableDescList.add(tag, Utilities.getTableDesc(colNames, colTypes)); + + //construct value table Desc + String valueColNames =""; + String valueColTypes =""; + boolean first = true; + for (int k = 0; k < columnSize; k++) { + String newColName = tag + "_VALUE_" + k; // any name, it does not matter. 
+ if(!first) { + valueColNames = valueColNames + ","; + valueColTypes = valueColTypes + ","; + } + valueColNames = valueColNames + newColName; + valueColTypes = valueColTypes + valueCols.get(k).getTypeString(); + first = false; + } + newJoinValueTblDesc.add(tag, Utilities.getTableDesc(valueColNames, valueColTypes)); + } + + joinDescriptor.setSkewKeysValuesTables(tableDescList); + + for(Byte src : tags) { + mapredWork newPlan = PlanUtils.getMapRedWork(); + mapredWork clonePlan = null; + try { + String xmlPlan = currPlan.toXML(); + StringBuffer sb = new StringBuffer(xmlPlan); + ByteArrayInputStream bis; + bis = new ByteArrayInputStream(sb.toString().getBytes("UTF-8")); + clonePlan = Utilities.deserializeMapRedWork(bis, parseCtx.getConf()); + } catch (UnsupportedEncodingException e) { + throw new SemanticException(e); + } + + Operator[] parentOps = new TableScanOperator[tags.length]; + for(int i =0;i ts = OperatorFactory.get(tableScanDesc.class, (RowSchema)null); + parentOps[i] = ts; + } + Operator tblScan_op = parentOps[(int)src]; + + ArrayList aliases = new ArrayList(); + String alias = src.toString(); + aliases.add(alias); + String bigKeyDirPath = bigKeysDirMap.get(src); + newPlan.getPathToAliases().put(bigKeyDirPath, aliases); + newPlan.getAliasToWork().put(alias, tblScan_op); + partitionDesc part = new partitionDesc(tableDescList.get(src), null); + newPlan.getPathToPartitionInfo().put(bigKeyDirPath, part); + newPlan.getAliasToPartnInfo().put(alias, part); + + Operator reducer = clonePlan.getReducer(); + assert reducer instanceof JoinOperator; + JoinOperator cloneJoinOp = (JoinOperator) reducer; + + mapJoinDesc mapJoinDescriptor = new mapJoinDesc(newJoinKeys, + dummyKeyTblDesc, newJoinValues, newJoinValueTblDesc, joinDescriptor.getOutputColumnNames(), + src, joinDescriptor.getConds()); + + // construct a map join and set it as the child operator of tblScan_op + MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(mapJoinDescriptor, (RowSchema)null, parentOps); + + // change the children of the original join operator to point to the map + // join operator + List> childOps = cloneJoinOp.getChildOperators(); + for (Operator childOp : childOps) + childOp.replaceParent(cloneJoinOp, mapJoinOp); + + mapJoinOp.setChildOperators(childOps); + + mapredLocalWork localPlan = newPlan.getMapLocalWork(); + if (localPlan == null) + localPlan = new mapredLocalWork( + new LinkedHashMap>(), + new LinkedHashMap()); + Map smallTblDirs = smallKeysDirMap.get(src); + for (Byte i : tags) { + if(i == src) + continue; + String alias_id = i.toString(); + Operator tblScan_op2 = parentOps[(int)i]; + localPlan.getAliasToWork().put(alias_id, tblScan_op2); +// newPlan.getPathToAliases().put(smallTblDirs.get(alias_id), aliases); + newPlan.getAliasToWork().put(alias_id, tblScan_op2); + newPlan.getPathToPartitionInfo().put(smallTblDirs.get(i), new partitionDesc(tableDescList.get(i), null)); + Path tblDir = new Path(smallTblDirs.get(i)); + localPlan.getAliasToFetchWork().put(alias_id, new fetchWork(tblDir.toString(), tableDescList.get(i))); + } + +// replaceTaskOutputDir(tblScan_op, skewJoinJobResultsDir.get(src)); + newPlan.setMapLocalWork(localPlan); + Task skewJoinMapJoinTask = TaskFactory.get(newPlan, parseCtx.getConf()); + bigKeysDirToTaskMap.put(bigKeyDirPath, skewJoinMapJoinTask); + listWorks.add(skewJoinMapJoinTask.getWork()); + listTasks.add(skewJoinMapJoinTask); + } + + ConditionalWork cndWork = new ConditionalWork(listWorks); + ConditionalTask cndTsk = 
(ConditionalTask)TaskFactory.get(cndWork, parseCtx.getConf()); + cndTsk.setListTasks(listTasks); + cndTsk.setResolver(new ConditionalResolverSkewJoin()); + cndTsk.setResolverCtx(new ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap)); + List> oldChildTasks = currTask.getChildTasks(); + currTask.setChildTasks(new ArrayList>()); + currTask.addDependentTask(cndTsk); + + if (oldChildTasks != null) { + for (Task oldChild : oldChildTasks) + cndTsk.addDependentTask(oldChild); + } + //add a list of move tasks +// String finalDir = findCurrTaskOutputDir(joinOp); +// for (int i = 0; i < tags.length; i++) { +// moveWork dummyMv = new moveWork(null, null, null, new loadFileDesc( +// skewJoinJobResultsDir.get(tags[i]), finalDir, true, null, null), +// false); +// Task mergeTask = TaskFactory.get(dummyMv, parseCtx.getConf()); +// cndTsk.addDependentTask(mergeTask); +// if(oldChildTasks!=null) { +// for (Task oldChild : oldChildTasks) +// mergeTask.addDependentTask(oldChild); +// } +// } + return; + } + + private static String findCurrTaskOutputDir(Operator baseOp) throws SemanticException { + Queue> queue = new LinkedList>(); + queue.addAll(baseOp.getChildOperators()); + while(queue.peek()!=null) { + Operator op = queue.remove(); + if(op instanceof FileSinkOperator) { + return ((FileSinkOperator)op).getConf().getDirName(); + } + if(op.getChildOperators()!=null) + queue.addAll(op.getChildOperators()); + } + throw new SemanticException("Can not find a FileSinkOperator followed by JoinOperator"); + } + + private static void replaceTaskOutputDir(Operator baseOp, String newDir){ + Queue> queue = new LinkedList>(); + queue.addAll(baseOp.getChildOperators()); + while(queue.peek()!=null) { + Operator op = queue.remove(); + if(op instanceof FileSinkOperator) { + ((FileSinkOperator)op).getConf().setDirName(newDir); + } + if(op.getChildOperators()!=null) + queue.addAll(op.getChildOperators()); + } + } + + private static String skewJoinPrefix = "hive_skew_join"; + private static String UNDERLINE = "_"; + private static String BIGKEYS = "bigkeys"; + private static String SMALLKEYS = "smallkeys"; + private static String RESULTS = "results"; + static String getBigKeysDir(String baseDir, Byte srcTbl) { + return baseDir + File.separator + skewJoinPrefix + UNDERLINE + BIGKEYS + + UNDERLINE + srcTbl; + } + + static String getBigKeysSkewJoinResultDir(String baseDir, Byte srcTbl) { + return baseDir + File.separator + skewJoinPrefix + UNDERLINE + BIGKEYS + + UNDERLINE + RESULTS + UNDERLINE+ srcTbl; + } + + static String getSmallKeysDir(String baseDir, Byte srcTblBigTbl, + Byte srcTblSmallTbl) { + return baseDir + File.separator + skewJoinPrefix + UNDERLINE + SMALLKEYS + + UNDERLINE + srcTblBigTbl + UNDERLINE + srcTblSmallTbl; + } + + public static boolean skewJoinEnabled(HiveConf conf, JoinOperator joinOp) { + joinDesc desc = joinOp.getConf(); + org.apache.hadoop.hive.ql.plan.joinCond[] condns = desc.getConds(); + + if (conf != null && !conf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN)) + return false; + + for (org.apache.hadoop.hive.ql.plan.joinCond condn : condns) { + if (condn.getType() == joinDesc.FULL_OUTER_JOIN + || (condn.getType() == joinDesc.LEFT_OUTER_JOIN) + || condn.getType() == joinDesc.RIGHT_OUTER_JOIN) + return false; + } + + // go over the joinOp's children operators to see if there is a groupby + // operator. 
This may happen as an optimization technique when group by keys + // are the same as join keys + Queue> children = new LinkedList>(); + children.addAll(joinOp.getChildOperators()); + while(children.peek()!=null){ + Operator child = children.remove(); + if(child instanceof GroupByOperator) + return false; + else if (child.getChildOperators() != null) + children.addAll(child.getChildOperators()); + } + return true; + } +} \ No newline at end of file Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 889444) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -73,6 +73,7 @@ import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.GenMRSkewJoinProcessor; import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory; import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; @@ -4724,7 +4725,7 @@ opRules.put(new RuleRegExp(new String("R9"), "UNION%.*MAPJOIN%"), MapJoinFactory.getUnionMapJoin()); opRules.put(new RuleRegExp(new String("R10"), "MAPJOIN%.*MAPJOIN%"), MapJoinFactory.getMapJoinMapJoin()); opRules.put(new RuleRegExp(new String("R11"), "MAPJOIN%SEL%"), MapJoinFactory.getMapJoin()); - + // The dispatcher fires the processor corresponding to the closest matching rule and passes the context along Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules, procCtx); @@ -4738,6 +4739,9 @@ // For each task, go over all operators recursively for (Task rootTask: rootTasks) breakTaskTree(rootTask); + + for (Task rootTask: rootTasks) + handleSkewJoin(rootTask); // For each task, set the key descriptor for the reducer for (Task rootTask: rootTasks) @@ -4770,6 +4774,27 @@ } } } + + private void handleSkewJoin(Task task) throws SemanticException { + if ((task instanceof MapRedTask) || (task instanceof ExecDriver)) { + Operator reducer = ((mapredWork) task.getWork()).getReducer(); + if (reducer !=null && reducer instanceof JoinOperator) + GenMRSkewJoinProcessor.processSkewJoin((JoinOperator)reducer, task, getParseContext()); + } else if (task instanceof ConditionalTask) { + // conditional task's list tasks are not its children tasks, we need + // handle them separately + List> listTasks = ((ConditionalTask) task) + .getListTasks(); + for (Task tsk : listTasks) + handleSkewJoin(tsk); + } + + if (task.getChildTasks() == null) + return; + + for (Task childTask : task.getChildTasks()) + handleSkewJoin(childTask); + } /** * Find all leaf tasks of the list of root tasks. 
@@ -4835,7 +4860,7 @@ for (Operator child: op.getChildOperators()) generateCountersOperator(child); } - + // loop over all the tasks recursviely private void breakTaskTree(Task task) { Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java (revision 889444) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java (working copy) @@ -18,7 +18,11 @@ package org.apache.hadoop.hive.ql.plan; +import java.io.Serializable; +import java.util.List; + import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Task; /** * Conditional task resolution interface. This is invoked at run time to get the task to invoke. @@ -31,5 +35,5 @@ * @param ctx opaque context * @return position of the task */ - public int getTaskId(HiveConf conf, Object ctx); + public List> getTasks(HiveConf conf, Object ctx); } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (revision 889444) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (working copy) @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.FileStatus; @@ -82,10 +83,11 @@ } } - public int getTaskId(HiveConf conf, Object objCtx) { + public List> getTasks(HiveConf conf, Object objCtx) { ConditionalResolverMergeFilesCtx ctx = (ConditionalResolverMergeFilesCtx)objCtx; String dirName = ctx.getDir(); + List> resTsks = new ArrayList>(); // check if a map-reduce job is needed to merge the files // If the current size is smaller than the target, merge long trgtSize = conf.getLongVar(HiveConf.ConfVars.HIVEMERGEMAPFILESSIZE); @@ -114,13 +116,14 @@ reducers = Math.max(1, reducers); reducers = Math.min(maxReducers, reducers); work.setNumReduceTasks(reducers); - - return 1; + resTsks.add(tsk); + return resTsks; } } } catch (IOException e) { e.printStackTrace(); } - return 0; + resTsks.add(ctx.getListTasks().get(0)); + return resTsks; } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java (revision 0) @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.plan; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Task; + +public class ConditionalResolverSkewJoin implements ConditionalResolver, Serializable { + private static final long serialVersionUID = 1L; + + public static class ConditionalResolverSkewJoinCtx implements Serializable { + private static final long serialVersionUID = 1L; + // we store big keys in one table into one dir, and same keys in other + // tables into corresponding different dirs (one dir per table). + // this map stores mapping from "big key dir" to its corresponding mapjoin + // task. + Map> dirToTaskMap; + + public ConditionalResolverSkewJoinCtx( + Map> dirToTaskMap) { + super(); + this.dirToTaskMap = dirToTaskMap; + } + + public Map> getDirToTaskMap() { + return dirToTaskMap; + } + + public void setDirToTaskMap( + Map> dirToTaskMap) { + this.dirToTaskMap = dirToTaskMap; + } + } + + public ConditionalResolverSkewJoin(){ + } + + @Override + public List> getTasks(HiveConf conf, Object objCtx) { + ConditionalResolverSkewJoinCtx ctx = (ConditionalResolverSkewJoinCtx)objCtx; + List> resTsks = new ArrayList>(); + + Map> dirToTaskMap = ctx.getDirToTaskMap(); + Iterator>> bigKeysPathsIter = dirToTaskMap.entrySet().iterator(); + try { + while(bigKeysPathsIter.hasNext()) { + Entry> entry = bigKeysPathsIter.next(); + String path = entry.getKey(); + Path dirPath = new Path(path); + FileSystem inpFs = dirPath.getFileSystem(conf); + FileStatus[] fstatus = inpFs.listStatus(dirPath); + if (fstatus.length > 0) + resTsks.add(entry.getValue()); + } + }catch (IOException e) { + e.printStackTrace(); + } + return resTsks; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (revision 889444) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (working copy) @@ -236,6 +236,32 @@ order )); } + +// public static String getColumnNames(Properties props) { +// return props.getProperty(Constants.LIST_COLUMNS); +// } +// +// public static List getColumnNameList(Properties props) { +// List colNames = new ArrayList(); +// String columns = getColumnNames(props); +// String[] cols = columns.split(","); +// for(String col : cols) +// colNames.add(col); +// return colNames; +// } +// +// public static String getColumnTypes(Properties props) { +// return props.getProperty(Constants.LIST_COLUMN_TYPES); +// } +// +// public static List getColumnTypeList(Properties props) { +// List colTypes = new ArrayList(); +// String columnTypes = getColumnTypes(props); +// String[] cols = columnTypes.split(","); +// for(String col : cols) +// colTypes.add(col); +// return colTypes; +// } /** * Generate the table descriptor for Map-side join key. 
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/joinDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/joinDesc.java (revision 889444) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/joinDesc.java (working copy) @@ -44,6 +44,13 @@ public static final int UNIQUE_JOIN = 4; public static final int LEFT_SEMI_JOIN = 5; + //used to handle skew join + private boolean handleSkewJoin = false; + private int skewKeyDefinition = -1; + private Map bigKeysDirMap; + private Map> smallKeysDirMap; + private List skewKeysValuesTables; + // alias to key mapping private Map> exprs; @@ -185,4 +192,77 @@ public void setTagOrder(Byte[] tagOrder) { this.tagOrder = tagOrder; } + + @explain(displayName="handleSkewJoin") + public boolean getHandleSkewJoin() { + return handleSkewJoin; + } + + /** + * set to handle skew join in this join op + * @param handleSkewJoin + */ + public void setHandleSkewJoin(boolean handleSkewJoin) { + this.handleSkewJoin = handleSkewJoin; + } + + /** + * @return mapping from tbl to dir for big keys + */ + public Map getBigKeysDirMap() { + return bigKeysDirMap; + } + + /** + * set the mapping from tbl to dir for big keys + * @param bigKeysDirMap + */ + public void setBigKeysDirMap(Map bigKeysDirMap) { + this.bigKeysDirMap = bigKeysDirMap; + } + + /** + * @return mapping from tbl to dir for small keys + */ + public Map> getSmallKeysDirMap() { + return smallKeysDirMap; + } + + /** + * set the mapping from tbl to dir for small keys + * @param bigKeysDirMap + */ + public void setSmallKeysDirMap(Map> smallKeysDirMap) { + this.smallKeysDirMap = smallKeysDirMap; + } + + /** + * @return skew key definition. If we see a key's associated entries' number + * is bigger than this, we will define this key as a skew key. + */ + public int getSkewKeyDefinition() { + return skewKeyDefinition; + } + + /** + * set skew key definition + * @param skewKeyDefinition + */ + public void setSkewKeyDefinition(int skewKeyDefinition) { + this.skewKeyDefinition = skewKeyDefinition; + } + + /** + * @return the table desc for storing skew keys and their corresponding value; + */ + public List getSkewKeysValuesTables() { + return skewKeysValuesTables; + } + + /** + * @param skewKeysValuesTable set the table desc for storing skew keys and their corresponding value; + */ + public void setSkewKeysValuesTables(List skewKeysValuesTables) { + this.skewKeysValuesTables = skewKeysValuesTables; + } }
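As a worked illustration of the directory matrix described in the GenMRSkewJoinProcessor javadoc above, the following self-contained sketch (not part of the patch; the class name, scratch path, and table count are made up for illustration) prints the directories a three-table join would use, following the same naming convention as getBigKeysDir()/getSmallKeysDir():

import java.io.File;

/**
 * Illustrative only: prints, for a hypothetical three-table join, the
 * skew-join directory layout implied by the naming scheme of
 * GenMRSkewJoinProcessor.getBigKeysDir()/getSmallKeysDir(), and which
 * directories each follow-up map-join job would read.
 */
public class SkewJoinDirLayoutDemo {
  private static final String PREFIX = "hive_skew_join";

  static String bigKeysDir(String base, int tbl) {
    return base + File.separator + PREFIX + "_bigkeys_" + tbl;
  }

  static String smallKeysDir(String base, int bigTbl, int smallTbl) {
    return base + File.separator + PREFIX + "_smallkeys_" + bigTbl + "_" + smallTbl;
  }

  public static void main(String[] args) {
    String base = "/tmp/hive-scratch";   // hypothetical scratch directory
    int numTables = 3;                   // tables T0, T1, T2 of the join
    for (int big = 0; big < numTables; big++) {
      // One map-join job per table that may contain big keys. In practice the
      // last table's big-key dir stays empty (it is always streamed), so the
      // conditional resolver skips that job at run time.
      System.out.println("map-join job for keys that are big in T" + big + ":");
      System.out.println("  streamed input (big keys): " + bigKeysDir(base, big));
      for (int other = 0; other < numTables; other++) {
        if (other != big) {
          // Rows of the other tables that share those keys are read through
          // the local fetch work and built into the in-memory hash table.
          System.out.println("  hashed input from T" + other + "    : "
              + smallKeysDir(base, big, other));
        }
      }
    }
  }
}

At run time, ConditionalResolverSkewJoin lists each big-key directory and returns to the Driver only the map-join tasks whose directory is non-empty.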