diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java index c45b96e..613b5bc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java @@ -88,7 +88,7 @@ public Object process(Node nd, Stack stack, hive_metastoreConstants.BUCKET_COUNT); int numBuckets = bucketCount == null ? 0 : Integer.parseInt(bucketCount); if (numBuckets > 0) { - LOG.info("Set parallelism for reduce sink " + sink + " to: " + numBuckets); + LOG.info("Set parallelism for reduce sink " + sink + " to: " + numBuckets + " (buckets)"); desc.setNumReducers(numBuckets); return false; } @@ -100,6 +100,9 @@ public Object process(Node nd, Stack stack, sink.getChildOperators().get(0).getParentOperators()) { if (sibling.getStatistics() != null) { numberOfBytes += sibling.getStatistics().getDataSize(); + if (LOG.isDebugEnabled()) { + LOG.debug("Sibling " + sibling + " has stats: " + sibling.getStatistics()); + } } else { LOG.warn("No stats available from: " + sibling); } @@ -114,13 +117,13 @@ public Object process(Node nd, Stack stack, context.getConf(), sparkSessionManager); sparkMemoryAndCores = sparkSession.getMemoryAndCores(); } catch (Exception e) { - throw new SemanticException("Failed to get spark memory/core info", e); + throw new SemanticException("Failed to get spark memory/core info: " + e, e); } finally { if (sparkSession != null && sparkSessionManager != null) { try { sparkSessionManager.returnSession(sparkSession); - } catch(HiveException ex) { - LOG.error("Failed to return the session to SessionManager", ex); + } catch (HiveException ex) { + LOG.error("Failed to return the session to SessionManager: " + ex, ex); } } } @@ -136,7 +139,9 @@ public Object process(Node nd, Stack stack, if (numReducers < cores) { numReducers = cores; } - LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers); + LOG.info("Set parallelism parameters: cores = " + cores + ", numReducers = " + numReducers + + ", bytesPerReducer = " + bytesPerReducer + ", numberOfBytes = " + numberOfBytes); + LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers + " (calculated)"); desc.setNumReducers(numReducers); } } else {