diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
index d2c981d..516ba42 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
@@ -46,13 +46,14 @@
     FILTERED, PASSED
   }
 
-  private final transient LongWritable filtered_count, passed_count;
+  protected final transient LongWritable filtered_count;
+  protected final transient LongWritable passed_count;
   private transient ExprNodeEvaluator conditionEvaluator;
   private transient PrimitiveObjectInspector conditionInspector;
   private transient int consecutiveFails;
   private transient int consecutiveSearches;
   private transient IOContext ioContext;
-  transient int heartbeatInterval;
+  protected transient int heartbeatInterval;
 
   public FilterOperator() {
     super();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index ccf55ba..ffbc672 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -142,8 +142,16 @@
   transient StructObjectInspector newKeyObjectInspector;
   transient StructObjectInspector currentKeyObjectInspector;
   public static MemoryMXBean memoryMXBean;
-  private long maxMemory;
-  private float memoryThreshold;
+
+  /**
+   * Total amount of memory allowed for JVM heap.
+   */
+  protected long maxMemory;
+
+  /**
+   * Configured percent of memory threshold usable by the QP.
+   */
+  protected float memoryThreshold;
 
   private boolean groupingSetsPresent;
   private int groupingSetsPosition;
@@ -160,10 +168,18 @@
   transient List[] aggrPositions;
   transient int fixedRowSize;
-  transient long maxHashTblMemory;
+
+  /**
+   * Max memory usable by the hashtable before it should flush.
+   */
+  protected transient long maxHashTblMemory;
   transient int totalVariableSize;
   transient int numEntriesVarSize;
-  transient int numEntriesHashTable;
+
+  /**
+   * Current number of entries in the hash table.
+   */
+  protected transient int numEntriesHashTable;
 
   transient int countAfterReport;
   transient int heartbeatInterval;
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 6a538e8..498c6de 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -73,11 +73,11 @@
   // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
   // ready
-  transient Serializer keySerializer;
-  transient boolean keyIsText;
-  transient Serializer valueSerializer;
+  protected transient Serializer keySerializer;
+  protected transient boolean keyIsText;
+  protected transient Serializer valueSerializer;
   transient int tag;
-  transient byte[] tagByte = new byte[1];
+  protected transient byte[] tagByte = new byte[1];
   transient protected int numDistributionKeys;
   transient protected int numDistinctExprs;
   transient String inputAlias;  // input alias of this RS for join (used for PPD)
 
@@ -140,20 +140,20 @@ protected void initializeOp(Configuration hconf) throws HiveException {
   }
 
   transient InspectableObject tempInspectableObject = new InspectableObject();
-  transient HiveKey keyWritable = new HiveKey();
-  transient Writable value;
+  protected transient HiveKey keyWritable = new HiveKey();
+  protected transient Writable value;
 
   transient StructObjectInspector keyObjectInspector;
   transient StructObjectInspector valueObjectInspector;
   transient ObjectInspector[] partitionObjectInspectors;
 
-  transient Object[][] cachedKeys;
-  transient Object[] cachedValues;
-  transient List<List<Integer>> distinctColIndices;
+  protected transient Object[][] cachedKeys;
+  protected transient Object[] cachedValues;
+  protected transient List<List<Integer>> distinctColIndices;
 
   boolean firstRow;
 
-  transient Random random;
+  protected transient Random random;
 
   /**
    * Initializes array of ExprNodeEvaluator. Adds Union field for distinct
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
index 2d40037..ebe4ac5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
@@ -18,11 +18,9 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.Serializable;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -30,27 +28,15 @@
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.io.LongWritable;
 
 /**
  * Filter operator implementation.
  **/
-public class VectorFilterOperator extends Operator<FilterDesc> implements
-    Serializable {
+public class VectorFilterOperator extends FilterOperator {
 
   private static final long serialVersionUID = 1L;
 
-  /**
-   * Counter.
- * - */ - public static enum Counter { - FILTERED, PASSED - } - - private final transient LongWritable filtered_count, passed_count; private VectorExpression conditionEvaluator = null; - transient int heartbeatInterval; // filterMode is 1 if condition is always true, -1 if always false // and 0 if condition needs to be computed. @@ -66,8 +52,6 @@ public VectorFilterOperator(VectorizationContext vContext, OperatorDesc conf) public VectorFilterOperator() { super(); - filtered_count = new LongWritable(); - passed_count = new LongWritable(); this.conf = (FilterDesc) conf; } @@ -120,23 +104,10 @@ public void processOp(Object row, int tag) throws HiveException { } } - /** - * @return the name of the operator - */ - @Override - public String getName() { - return getOperatorName(); - } - static public String getOperatorName() { return "FIL"; } - @Override - public OperatorType getType() { - return OperatorType.FILTER; - } - public VectorExpression getConditionEvaluator() { return conditionEvaluator; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index c8f0825..f213ee8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.exec.vector; -import java.io.Serializable; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.util.ArrayList; @@ -31,8 +30,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.KeyWrapper; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; @@ -52,7 +51,7 @@ * stores the aggregate operators' intermediate states. Emits row mode output. * */ -public class VectorGroupByOperator extends Operator implements Serializable { +public class VectorGroupByOperator extends GroupByOperator { private static final Log LOG = LogFactory.getLog( VectorGroupByOperator.class.getName()); @@ -83,21 +82,6 @@ private transient VectorHashKeyWrapperBatch keyWrappersBatch; /** - * Total amount of memory allowed for JVM heap. - */ - private transient long maxMemory; - - /** - * configure percent of memory threshold usable by QP. - */ - private transient float memoryThreshold; - - /** - * Max memory usable by the hashtable before it should flush. - */ - private transient long maxHashTblMemory; - - /** * Total per hashtable entry fixed memory (does not depend on key/agg values). */ private transient int fixedHashEntrySize; @@ -108,11 +92,6 @@ private transient int avgVariableSize; /** - * Current number of entries in the hash table. - */ - private transient int numEntriesHashTable; - - /** * Number of entries added to the hashtable since the last check if it should flush. 
    */
   private transient int numEntriesSinceCheck;
@@ -434,23 +413,10 @@ public void closeOp(boolean aborted) throws HiveException {
     }
   }
 
-  /**
-   * @return the name of the operator
-   */
-  @Override
-  public String getName() {
-    return getOperatorName();
-  }
-
   static public String getOperatorName() {
     return "GBY";
   }
 
-  @Override
-  public OperatorType getType() {
-    return OperatorType.GROUPBY;
-  }
-
   public VectorExpression[] getKeyExpressions() {
     return keyExpressions;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
index 33711c6..6df3551 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
@@ -19,18 +19,15 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
-import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -42,10 +39,8 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 
-public class VectorReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
-    implements Serializable {
+public class VectorReduceSinkOperator extends ReduceSinkOperator {
 
   private static final Log LOG = LogFactory.getLog(
       VectorReduceSinkOperator.class.getName());
@@ -89,28 +84,8 @@
    */
   protected transient VectorExpressionWriter[] partitionWriters;
-  private transient int numDistributionKeys;
-
-  private transient List<List<Integer>> distinctColIndices;
-
-  private transient int numDistinctExprs;
-
-  transient HiveKey keyWritable = new HiveKey();
-  transient Writable value;
-
-  transient Object[] cachedValues;
-  transient Object[][] cachedKeys;
-  transient Random random;
-
-  transient Serializer keySerializer;
-  transient boolean keyIsText;
-  transient Serializer valueSerializer;
-  transient int tag;
-  transient byte[] tagByte = new byte[1];
-
   transient ObjectInspector keyObjectInspector;
   transient ObjectInspector valueObjectInspector;
-  transient ObjectInspector[] partitionObjectInspectors;
 
   transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE];
 
   public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
@@ -331,28 +306,10 @@ public void processOp(Object row, int tag) throws HiveException {
     }
   }
 
-  /**
-   * @return the name of the operator
-   */
-  @Override
-  public String getName() {
-    return getOperatorName();
-  }
-
   static public String getOperatorName() {
     return "RS";
   }
 
-  @Override
-  public OperatorType getType() {
-    return OperatorType.REDUCESINK;
-  }
-
-  @Override
-  public boolean opAllowedBeforeMapJoin() {
-    return false;
-  }
-
   public VectorExpression[] getPartitionEval() {
     return partitionEval;
   }
@@ -376,5 +333,4 @@ public void setValueEval(VectorExpression[] valueEval) {
   public void setKeyEval(VectorExpression[] keyEval) {
     this.keyEval = keyEval;
   }
-
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
index 7b3ee02..74a8713 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
@@ -18,12 +18,11 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -39,8 +38,7 @@
 /**
  * Select operator implementation.
  */
-public class VectorSelectOperator extends Operator<SelectDesc> implements
-    Serializable {
+public class VectorSelectOperator extends SelectOperator {
 
   private static final long serialVersionUID = 1L;
 
@@ -128,23 +126,10 @@ public void processOp(Object row, int tag) throws HiveException {
     vrg.valueWriters = originalValueWriters;
   }
 
-  /**
-   * @return the name of the operator
-   */
-  @Override
-  public String getName() {
-    return getOperatorName();
-  }
-
   static public String getOperatorName() {
     return "SEL";
   }
 
-  @Override
-  public OperatorType getType() {
-    return OperatorType.SELECT;
-  }
-
   @Explain (displayName = "vector expressions")
   public VectorExpression[] getvExpressions() {
     return vExpressions;
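
For reference, the pattern this patch establishes is the same in all four hierarchies: each vectorized operator now extends its row-mode counterpart and reuses the counters, serializers, and memory-tracking fields that counterpart exposes as protected, instead of redeclaring them and re-overriding getName()/getType(). Below is a minimal sketch of what a vectorized subclass looks like under this scheme; the class name HypotheticalVectorOperator and its body are illustrative assumptions, not code from the patch.

// Illustrative sketch only, not part of the patch: a hypothetical vectorized
// operator reusing the fields FilterOperator now exposes as protected.
package org.apache.hadoop.hive.ql.exec.vector;

import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.metadata.HiveException;

public class HypotheticalVectorOperator extends FilterOperator {

  private static final long serialVersionUID = 1L;

  @Override
  public void processOp(Object row, int tag) throws HiveException {
    VectorizedRowBatch batch = (VectorizedRowBatch) row;
    // filtered_count, passed_count and heartbeatInterval are declared in
    // FilterOperator; now that they are protected, a subclass in another
    // package can update them directly instead of keeping duplicate copies,
    // which is what let VectorFilterOperator drop its own fields above.
    passed_count.set(passed_count.get() + batch.size);
  }
}

The widening from private/package-private to protected in FilterOperator, GroupByOperator, and ReduceSinkOperator is what makes these deletions in the Vector* classes possible; getName(), getType(), and opAllowedBeforeMapJoin() are simply inherited from the row-mode parents.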