diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 0839b42..b88722f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -1104,6 +1104,7 @@ public void closeOp(boolean abort) throws HiveException {
         throw new HiveException(e);
       }
     }
+    hashAggregations = null;
   }
 
   // Group by contains the columns needed - no need to aggregate from children
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 1b8e7d2..ba71a1e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -115,7 +115,7 @@
   protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
   protected transient boolean useUniformHash = false;
   // picks topN K:V pairs from input.
-  protected transient TopNHash reducerHash = new TopNHash();
+  protected transient TopNHash reducerHash;
   protected transient HiveKey keyWritable = new HiveKey();
   protected transient ObjectInspector keyObjectInspector;
   protected transient ObjectInspector valueObjectInspector;
@@ -237,7 +237,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     float memUsage = conf.getTopNMemoryUsage();
 
     if (limit >= 0 && memUsage > 0) {
-      reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
+      reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash();
       reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
     }
 
@@ -385,8 +385,11 @@ public void process(Object row, int tag) throws HiveException {
        */
       boolean partKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row);
 
-      // Try to store the first key. If it's not excluded, we will proceed.
-      int firstIndex = reducerHash.tryStoreKey(firstKey, partKeyNull);
+      // Try to store the first key.
+      // If the TopN hash isn't active, always forward;
+      // if it is active, proceed unless the key is already excluded (i.e. ORDER BY with LIMIT).
+      final int firstIndex =
+          (reducerHash != null) ? reducerHash.tryStoreKey(firstKey, partKeyNull) : TopNHash.FORWARD;
       if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
       // Compute value and hashcode - we'd either store or forward them.
       BytesWritable value = makeValueWritable(row);
@@ -394,6 +397,7 @@
       if (firstIndex == TopNHash.FORWARD) {
         collect(firstKey, value);
       } else {
+        // invariant: reducerHash != null
         assert firstIndex >= 0;
         reducerHash.storeValue(firstIndex, firstKey.hashCode(), value, false);
       }
@@ -563,12 +567,13 @@ private BytesWritable makeValueWritable(Object row) throws Exception {
 
   @Override
   protected void closeOp(boolean abort) throws HiveException {
-    if (!abort) {
+    if (!abort && reducerHash != null) {
       reducerHash.flush();
     }
     super.closeOp(abort);
     out = null;
     random = null;
+    reducerHash = null;
     if (isLogInfoEnabled) {
       LOG.info(toString() + ": records written - " + numRows);
     }
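
Editor's note: the changes above all apply one lifecycle pattern: allocate the heavyweight TopNHash lazily, null-check it at every use site, and drop the reference in closeOp() so a completed operator no longer pins its memory. The sketch below shows that pattern in isolation. It is a minimal, hypothetical stand-in, not Hive's real API: LazyTopNOperator and TopNBuffer are invented names, and only the FORWARD/EXCLUDE contract is borrowed from TopNHash.

public class LazyTopNOperator {
  static final int FORWARD = -1; // emit the row downstream immediately
  static final int EXCLUDE = -2; // row cannot enter the top-N; drop it

  // Hypothetical heavyweight helper standing in for TopNHash.
  static class TopNBuffer {
    int tryStore(String key) { return FORWARD; } // placeholder decision logic
    void store(int slot, String key) { }
    void flush() { }
  }

  private TopNBuffer topN; // lazily initialized: stays null unless a LIMIT is pushed down

  void initialize(int limit, float memUsage) {
    if (limit >= 0 && memUsage > 0) {
      topN = new TopNBuffer(); // allocate only when top-N is actually active
    }
  }

  void process(String key) {
    // A null buffer means "top-N disabled": behave as an unconditional forward.
    int index = (topN != null) ? topN.tryStore(key) : FORWARD;
    if (index == EXCLUDE) {
      return; // nothing to do
    }
    if (index == FORWARD) {
      collect(key);
    } else {
      // invariant: topN != null, since only the buffer hands out slot indexes
      topN.store(index, key);
    }
  }

  void close(boolean abort) {
    if (!abort && topN != null) {
      topN.flush(); // drain buffered rows before shutdown
    }
    topN = null; // release the buffer so a closed operator holds no state
  }

  private void collect(String key) { /* forward downstream */ }
}

The same reasoning motivates the one-line GroupByOperator change: hashAggregations is cleared at the end of closeOp() so long-lived operator trees do not retain the hash-aggregation map after the operator has finished.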