diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index 5d41fa1..1a0d4bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -64,23 +64,9 @@
  *
  */
 public final class OperatorFactory {
+  private static final List<OpTuple> opvec;
+  private static final List<OpTuple> vectorOpvec;
 
-  /**
-   * OpTuple.
-   *
-   * @param <T>
-   */
-  public static final class OpTuple<T extends OperatorDesc> {
-    public Class<T> descClass;
-    public Class<? extends Operator<T>> opClass;
-
-    public OpTuple(Class<T> descClass, Class<? extends Operator<T>> opClass) {
-      this.descClass = descClass;
-      this.opClass = opClass;
-    }
-  }
-
-  public static ArrayList<OpTuple> opvec;
   static {
     opvec = new ArrayList<OpTuple>();
     opvec.add(new OpTuple<FilterDesc>(FilterDesc.class, FilterOperator.class));
@@ -116,7 +102,6 @@ public OpTuple(Class<T> descClass, Class<? extends Operator<T>> opClass) {
         MuxOperator.class));
   }
 
-  public static ArrayList<OpTuple> vectorOpvec;
   static {
     vectorOpvec = new ArrayList<OpTuple>();
     vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
@@ -130,6 +115,17 @@ public OpTuple(Class<T> descClass, Class<? extends Operator<T>> opClass) {
     vectorOpvec.add(new OpTuple<LimitDesc>(LimitDesc.class, VectorLimitOperator.class));
   }
 
+  private static final class OpTuple<T extends OperatorDesc> {
+    private final Class<T> descClass;
+    private final Class<? extends Operator<T>> opClass;
+
+    public OpTuple(Class<T> descClass, Class<? extends Operator<T>> opClass) {
+      this.descClass = descClass;
+      this.opClass = opClass;
+    }
+  }
+
+
   public static <T extends OperatorDesc> Operator<T> getVectorOperator(T conf,
       VectorizationContext vContext) throws HiveException {
     Class<T> descClass = (Class<T>) conf.getClass();
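The OperatorFactory change is a pure visibility and ordering refactor: opvec and vectorOpvec become private, eagerly initialized registries, and OpTuple becomes a private immutable pair. A minimal, self-contained sketch of the registry pattern the factory relies on follows; the FilterDesc/FilterOperator, Entry, and newOperator names here are illustrative stand-ins, not the Hive classes or API.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch only -- FilterDesc/FilterOperator are local
    // stand-ins, not the Hive classes of the same name.
    public final class OperatorRegistrySketch {
      static class FilterDesc {}
      static class FilterOperator {}

      // Immutable descriptor-class -> operator-class pair, like OpTuple.
      private static final class Entry {
        final Class<?> descClass;
        final Class<?> opClass;
        Entry(Class<?> descClass, Class<?> opClass) {
          this.descClass = descClass;
          this.opClass = opClass;
        }
      }

      // Private and eagerly initialized in a static block, as in the diff.
      private static final List<Entry> REGISTRY;
      static {
        REGISTRY = new ArrayList<Entry>();
        REGISTRY.add(new Entry(FilterDesc.class, FilterOperator.class));
      }

      // Lookup keyed on the descriptor's runtime class, then reflective
      // instantiation -- the same shape as getVectorOperator above.
      static Object newOperator(Object desc) throws Exception {
        for (Entry e : REGISTRY) {
          if (e.descClass == desc.getClass()) {
            return e.opClass.getDeclaredConstructor().newInstance();
          }
        }
        throw new IllegalArgumentException("no operator for " + desc.getClass());
      }

      public static void main(String[] args) throws Exception {
        // Prints "FilterOperator".
        System.out.println(newOperator(new FilterDesc()).getClass().getSimpleName());
      }
    }

Keeping the list private and final means the mapping is fixed at class-load time and callers can only go through the factory methods, which is the point of the refactor.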
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 6d9f5e4..5745718 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -25,6 +25,8 @@
 import java.util.List;
 import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -61,9 +63,20 @@
     PTFUtils.makeTransient(ReduceSinkOperator.class, "inputAliases", "valueIndex");
   }
 
+  private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName());
   private static final long serialVersionUID = 1L;
-  protected transient OutputCollector out;
+  private static final MurmurHash hash = (MurmurHash)MurmurHash.getInstance();
+
+  private transient ObjectInspector[] partitionObjectInspectors;
+  private transient ObjectInspector[] bucketObjectInspectors;
+  private transient int buckColIdxInKey;
+  private boolean firstRow;
+  private transient int tag;
+  private boolean skipTag = false;
+  private transient InspectableObject tempInspectableObject = new InspectableObject();
+  private transient int[] valueIndex; // index for value(+ from keys, - from values)
+  protected transient OutputCollector out;
   /**
    * The evaluators for the key columns. Key columns decide the sort order on
    * the reducer side. Key columns are passed to the reducer in the "key".
@@ -84,38 +97,40 @@
    * Evaluators for bucketing columns. This is used to compute bucket number.
    */
   protected transient ExprNodeEvaluator[] bucketEval = null;
-
-  // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
-  // ready
+  // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is ready
   protected transient Serializer keySerializer;
   protected transient boolean keyIsText;
   protected transient Serializer valueSerializer;
-  transient int tag;
   protected transient byte[] tagByte = new byte[1];
-  transient protected int numDistributionKeys;
-  transient protected int numDistinctExprs;
-  transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
-  private boolean skipTag = false;
+  protected transient int numDistributionKeys;
+  protected transient int numDistinctExprs;
+  protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
   protected transient boolean autoParallel = false;
-
-  protected static final MurmurHash hash = (MurmurHash)MurmurHash.getInstance();
-
-  private transient int[] valueIndex; // index for value(+ from keys, - from values)
-
-  public void setInputAliases(String[] inputAliases) {
-    this.inputAliases = inputAliases;
-  }
-
-  public String[] getInputAliases() {
-    return inputAliases;
-  }
-
-  public void setOutputCollector(OutputCollector _out) {
-    this.out = _out;
-  }
-
   // picks topN K:V pairs from input.
   protected transient TopNHash reducerHash = new TopNHash();
+  protected transient HiveKey keyWritable = new HiveKey();
+  protected transient ObjectInspector keyObjectInspector;
+  protected transient ObjectInspector valueObjectInspector;
+  protected transient Object[] cachedValues;
+  protected transient List<List<Integer>> distinctColIndices;
+  protected transient Random random;
+  /**
+   * This two-dimensional array holds key data and a corresponding Union object
+   * which contains the tag identifying the aggregate expression for distinct columns.
+   *
+   * If there is no distinct expression, cachedKeys is simply like this:
+   * cachedKeys[0] = [col0][col1]
+   *
+   * With two distinct expressions, a union(tag:key) is attached for each distinct expression:
+   * cachedKeys[0] = [col0][col1][0:dist1]
+   * cachedKeys[1] = [col0][col1][1:dist2]
+   *
+   * In this case, the child GBY evaluates distinct values with an expression like KEY.col2:0.dist1;
+   * see {@link ExprNodeColumnEvaluator}.
+   */
+  // TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
+  protected transient Object[][] cachedKeys;
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     try {
@@ -184,40 +199,12 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       firstRow = true;
       initializeChildren(hconf);
     } catch (Exception e) {
-      e.printStackTrace();
+      String msg = "Error initializing ReduceSinkOperator: " + e.getMessage();
+      LOG.error(msg, e);
       throw new RuntimeException(e);
     }
   }
 
-  transient InspectableObject tempInspectableObject = new InspectableObject();
-  protected transient HiveKey keyWritable = new HiveKey();
-
-  protected transient ObjectInspector keyObjectInspector;
-  protected transient ObjectInspector valueObjectInspector;
-  transient ObjectInspector[] partitionObjectInspectors;
-  transient ObjectInspector[] bucketObjectInspectors = null;
-  transient int buckColIdxInKey;
-
-  protected transient Object[] cachedValues;
-  protected transient List<List<Integer>> distinctColIndices;
-  /**
-   * This two dimensional array holds key data and a corresponding Union object
-   * which contains the tag identifying the aggregate expression for distinct columns.
-   *
-   * If there is no distict expression, cachedKeys is simply like this.
-   * cachedKeys[0] = [col0][col1]
-   *
-   * with two distict expression, union(tag:key) is attatched for each distinct expression
-   * cachedKeys[0] = [col0][col1][0:dist1]
-   * cachedKeys[1] = [col0][col1][1:dist2]
-   *
-   * in this case, child GBY evaluates distict values with expression like KEY.col2:0.dist1
-   * see {@link ExprNodeColumnEvaluator}
-   */
-  // TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
-  protected transient Object[][] cachedKeys;
-  boolean firstRow;
-  protected transient Random random;
 
   /**
    * Initializes array of ExprNodeEvaluator. Adds Union field for distinct
@@ -509,4 +496,16 @@ public void setValueIndex(int[] valueIndex) {
   public int[] getValueIndex() {
     return valueIndex;
   }
+
+  public void setInputAliases(String[] inputAliases) {
+    this.inputAliases = inputAliases;
+  }
+
+  public String[] getInputAliases() {
+    return inputAliases;
+  }
+
+  public void setOutputCollector(OutputCollector _out) {
+    this.out = _out;
+  }
 }
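The cachedKeys javadoc that this patch moves up with the other fields is the one piece of real documentation in ReduceSinkOperator, so a concrete rendering of the layout it describes may help. The sketch below builds the two key rows for a query shaped like SELECT col0, col1, COUNT(DISTINCT dist1), COUNT(DISTINCT dist2); TaggedUnion is a hypothetical stand-in for Hive's standard union object (tag plus value), used here for illustration only.

    import java.util.Arrays;

    // Illustrative sketch of the cachedKeys layout described in the javadoc;
    // TaggedUnion stands in for Hive's union object (tag + value).
    public class CachedKeysSketch {
      static final class TaggedUnion {
        final byte tag;      // identifies which distinct expression this row carries
        final Object value;  // that expression's column value
        TaggedUnion(byte tag, Object value) {
          this.tag = tag;
          this.value = value;
        }
        @Override public String toString() {
          return tag + ":" + value;
        }
      }

      public static void main(String[] args) {
        Object col0 = "k1", col1 = "k2", dist1 = 10, dist2 = 20;

        // Two distinct expressions => one key row per expression; the grouping
        // columns repeat and only the tagged union slot differs:
        //   cachedKeys[0] = [col0][col1][0:dist1]
        //   cachedKeys[1] = [col0][col1][1:dist2]
        Object[][] cachedKeys = {
          { col0, col1, new TaggedUnion((byte) 0, dist1) },
          { col0, col1, new TaggedUnion((byte) 1, dist2) },
        };

        for (Object[] row : cachedKeys) {
          System.out.println(Arrays.toString(row));
        }
      }
    }

The tag is what lets the downstream group-by tell the distinct expressions apart even though every row shares the same grouping-column prefix.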
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index 2f5f60c..4e0fd79 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -250,7 +250,7 @@ public void close() {
             + used_memory);
       }
 
-      reportStats rps = new reportStats(rp);
+      ReportStats rps = new ReportStats(rp);
       mo.preorderMap(rps);
       return;
     } catch (Exception e) {
@@ -285,10 +285,10 @@ public static void setDone(boolean done) {
    * reportStats.
    *
    */
-  public static class reportStats implements Operator.OperatorFunc {
-    Reporter rp;
+  public static class ReportStats implements Operator.OperatorFunc {
+    private final Reporter rp;
 
-    public reportStats(Reporter rp) {
+    public ReportStats(Reporter rp) {
       this.rp = rp;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
index a6a4c32..c9e469c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
@@ -34,7 +34,7 @@
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -306,7 +306,7 @@ public void close() {
     }
 
     reducer.close(abort);
-    reportStats rps = new reportStats(rp);
+    ReportStats rps = new ReportStats(rp);
     reducer.preorderMap(rps);
 
   } catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index d964eb1..8513e33 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -33,7 +33,7 @@
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
@@ -225,7 +225,7 @@ void close(){
       if (isLogInfoEnabled) {
         logCloseInfo();
       }
-      reportStats rps = new reportStats(reporter);
+      ReportStats rps = new ReportStats(reporter);
       mapOp.preorderMap(rps);
       return;
     } catch (Exception e) {
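The reportStats -> ReportStats rename ripples from ExecMapper through ExecReducer and both Tez record processors because the class doubles as a visitor: each close() path hands an instance to preorderMap, which applies it to every operator in the tree. A compact sketch of that Operator.OperatorFunc-style traversal follows; Node and the counter map are hypothetical stand-ins for Hive's operator tree and the MapReduce Reporter.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch of the visitor pattern behind mo.preorderMap(new ReportStats(rp)).
    public class PreorderMapSketch {
      interface OperatorFunc {      // mirrors Operator.OperatorFunc
        void func(Node op);
      }

      static class Node {
        final String name;
        final long rowsProduced;
        final List<Node> children = new ArrayList<Node>();
        Node(String name, long rowsProduced) {
          this.name = name;
          this.rowsProduced = rowsProduced;
        }
        // Visit this operator first, then recurse -- hence "preorder".
        void preorderMap(OperatorFunc f) {
          f.func(this);
          for (Node child : children) {
            child.preorderMap(f);
          }
        }
      }

      // Stand-in for ReportStats: publishes one counter per operator visited.
      static class ReportStats implements OperatorFunc {
        final Map<String, Long> counters = new HashMap<String, Long>();
        @Override public void func(Node op) {
          counters.put(op.name + ".rows", op.rowsProduced);
        }
      }

      public static void main(String[] args) {
        Node map = new Node("MAP", 100);
        map.children.add(new Node("FIL", 40));
        ReportStats rps = new ReportStats();
        map.preorderMap(rps);        // same call shape as in the diffs above
        System.out.println(rps.counters);
      }
    }

Since the visitor is constructed fresh at every close() call site, renaming the class touches each of the four files below and above, but nothing about the traversal itself changes.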
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index e884afd..5abff4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -35,7 +35,7 @@
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
@@ -519,7 +519,7 @@ void close(){
         dummyOp.close(abort);
       }
     }
-    reportStats rps = new reportStats(reporter);
+    ReportStats rps = new ReportStats(reporter);
     reducer.preorderMap(rps);
 
   } catch (Exception e) {