Index: VENDOR.hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
===================================================================
--- VENDOR.hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java	(revision 28262)
+++ VENDOR.hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java	(working copy)
@@ -78,6 +78,11 @@
 
   private static final int NUMROWSESTIMATESIZE = 1000;
 
+  // Operator-local counter names; closeOp() reports numRowsInput and
+  // numRowsHashTbl under them when hash aggregation is on.
+  public static final String counterNameHashIn = "COUNT_HASH_IN";
+  public static final String counterNameHashOut = "COUNT_HASH_OUT";
+
   protected transient ExprNodeEvaluator[] keyFields;
   protected transient ObjectInspector[] keyObjectInspectors;
 
@@ -996,6 +1001,13 @@
   public void closeOp(boolean abort) throws HiveException {
     if (!abort) {
       try {
+        // Report hash-aggregation row counts as job counters (incrCounter()
+        // does nothing when counterNameToEnum is null).
+        if (hashAggr) {
+          incrCounter(counterNameHashIn, numRowsInput);
+          incrCounter(counterNameHashOut, numRowsHashTbl);
+        }
+
         // If there is no grouping key and no row came to this operator
         if (firstRow && (keyFields.length == 0)) {
           firstRow = false;
@@ -1055,6 +1067,19 @@
     }
   }
 
+  /**
+   * Adds the hash-aggregation counters to the default per-operator counters.
+   * The names are wrapped via getWrappedCounterName(), which is what
+   * Operator.initializeCounters() expects from this method.
+   */
+  @Override
+  protected List<String> getAdditionalCounters() {
+    List<String> ctrList = new ArrayList<String>(2);
+    ctrList.add(getWrappedCounterName(counterNameHashIn));
+    ctrList.add(getWrappedCounterName(counterNameHashOut));
+    return ctrList;
+  }
+
   // Group by contains the columns needed - no need to aggregate from children
   public List<String> genColLists(
       HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx) {
Index: VENDOR.hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
===================================================================
--- VENDOR.hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java	(revision 28262)
+++ VENDOR.hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java	(working copy)
@@ -545,11 +545,9 @@
     state = State.CLOSE;
     LOG.info(id + " finished. closing... ");
 
-    if (counterNameToEnum != null) {
-      incrCounter(numInputRowsCntr, inputRows);
-      incrCounter(numOutputRowsCntr, outputRows);
-      incrCounter(timeTakenCntr, totalTime);
-    }
+    incrCounter(numInputRowsCntr, inputRows);
+    incrCounter(numOutputRowsCntr, outputRows);
+    incrCounter(timeTakenCntr, totalTime);
 
     LOG.info(id + " forwarded " + cntr + " rows");
 
@@ -724,10 +722,8 @@
       throws HiveException {
 
     if ((++outputRows % 1000) == 0) {
-      if (counterNameToEnum != null) {
-        incrCounter(numOutputRowsCntr, outputRows);
-        outputRows = 0;
-      }
+      incrCounter(numOutputRowsCntr, outputRows);
+      outputRows = 0;
     }
 
     if (isLogInfoEnabled) {
@@ -1061,16 +1057,13 @@
    */
   private void preProcessCounter() {
     inputRows++;
-
-    if (counterNameToEnum != null) {
-      if ((inputRows % 1000) == 0) {
-        incrCounter(numInputRowsCntr, inputRows);
-        incrCounter(timeTakenCntr, totalTime);
-        inputRows = 0;
-        totalTime = 0;
-      }
-      beginTime = System.currentTimeMillis();
+    if ((inputRows % 1000) == 0) {
+      incrCounter(numInputRowsCntr, inputRows);
+      incrCounter(timeTakenCntr, totalTime);
+      inputRows = 0;
+      totalTime = 0;
     }
+    beginTime = System.currentTimeMillis();
   }
 
   /**
@@ -1089,7 +1082,12 @@
    * @param amount
    */
   protected void incrCounter(String name, long amount) {
-    String counterName = "CNTR_NAME_" + getOperatorId() + "_" + name;
+    // Guard here so callers need not check counterNameToEnum themselves.
+    if (counterNameToEnum == null) {
+      return;
+    }
+
+    String counterName = getWrappedCounterName(name);
     ProgressCounter pc = counterNameToEnum.get(counterName);
 
     // Currently, we maintain fixed number of counters per plan - in case of a
@@ -1115,6 +1113,18 @@
     return operatorId;
   }
 
+  /**
+   * Qualifies an operator-local counter name with this operator's id, giving
+   * the key under which the counter is looked up in counterNameToEnum:
+   * {@code CNTR_NAME_<operatorId>_<ctrName>}.
+   *
+   * @param ctrName operator-local counter name, e.g. {@code NUM_INPUT_ROWS}
+   * @return the wrapped counter name
+   */
+  public final String getWrappedCounterName(String ctrName) {
+    return String.format(COUNTER_NAME_FORMAT, getOperatorId(), ctrName);
+  }
+
   public void initOperatorId() {
     setOperatorId(getName() + "_" + this.id);
   }
@@ -1170,7 +1180,7 @@
       return false;
     }
 
-    String counterName = "CNTR_NAME_" + getOperatorId() + "_" + fatalErrorCntr;
+    String counterName = getWrappedCounterName(fatalErrorCntr);
     ProgressCounter pc = counterNameToEnum.get(counterName);
 
     // Currently, we maintain fixed number of counters per plan - in case of a
@@ -1246,14 +1256,16 @@
   protected static String numOutputRowsCntr = "NUM_OUTPUT_ROWS";
   protected static String timeTakenCntr = "TIME_TAKEN";
   protected static String fatalErrorCntr = "FATAL_ERROR";
+  // Wrapped counter name: CNTR_NAME_<operatorId>_<operator-local name>
+  private static final String COUNTER_NAME_FORMAT = "CNTR_NAME_%s_%s";
 
   public void initializeCounters() {
     initOperatorId();
     counterNames = new ArrayList<String>();
-    counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + numInputRowsCntr);
-    counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + numOutputRowsCntr);
-    counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + timeTakenCntr);
-    counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + fatalErrorCntr);
+    counterNames.add(getWrappedCounterName(numInputRowsCntr));
+    counterNames.add(getWrappedCounterName(numOutputRowsCntr));
+    counterNames.add(getWrappedCounterName(timeTakenCntr));
+    counterNames.add(getWrappedCounterName(fatalErrorCntr));
     List<String> newCntrs = getAdditionalCounters();
     if (newCntrs != null) {
       counterNames.addAll(newCntrs);
@@ -1261,10 +1273,12 @@
   }
 
   /*
-   * By default, the list is empty - if an operator wants to add more counters,
-   * it should override this method and provide the new list.
+   * By default there are no additional counters (null is returned). An operator
+   * that wants more counters should override this method and return them.
+   * Counter names returned by this method must already be wrapped, i.e. passed
+   * through getWrappedCounterName().
    */
-  private List<String> getAdditionalCounters() {
+  protected List<String> getAdditionalCounters() {
     return null;
   }
 