diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java index 0839b42..b88722f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java @@ -1104,6 +1104,7 @@ public void closeOp(boolean abort) throws HiveException { throw new HiveException(e); } } + hashAggregations = null; } // Group by contains the columns needed - no need to aggregate from children diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index 1b8e7d2..db3e848 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -115,7 +115,7 @@ protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD) protected transient boolean useUniformHash = false; // picks topN K:V pairs from input. - protected transient TopNHash reducerHash = new TopNHash(); + protected transient TopNHash reducerHash; protected transient HiveKey keyWritable = new HiveKey(); protected transient ObjectInspector keyObjectInspector; protected transient ObjectInspector valueObjectInspector; @@ -237,7 +237,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { float memUsage = conf.getTopNMemoryUsage(); if (limit >= 0 && memUsage > 0) { - reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash; + reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash(); reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this); } @@ -563,12 +563,13 @@ private BytesWritable makeValueWritable(Object row) throws Exception { @Override protected void closeOp(boolean abort) throws HiveException { - if (!abort) { + if (!abort && reducerHash != null) { reducerHash.flush(); } super.closeOp(abort); out = null; random = null; + reducerHash = null; if (isLogInfoEnabled) { LOG.info(toString() + ": records written - " + numRows); }