commit 6ebc89d259336828e70037a2aaf94abfbaf03a1f
Author: Bharath Krishna
Date:   Fri Oct 12 17:56:26 2018 -0700

    HIVE-20512 : Improve record and memory usage logging in SparkRecordHandler.
    Changing the logging to be interval based rather than depending on the number of rows processed.

diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 88dd12c05ade417aca4cdaece4448d31d4e1d65f..530131f207dc903b2768bac9fcea5e2f7046213f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -140,9 +140,8 @@ public void processRow(Object key, Object value) throws IOException {
       // Since there is no concept of a group, we don't invoke
       // startGroup/endGroup for a mapper
       mo.process((Writable) value);
-      if (LOG.isInfoEnabled()) {
-        logMemoryInfo();
-      }
+      incrementRowNumber();
+
     } catch (Throwable e) {
       abort = true;
       Utilities.setMapWork(jc, null);
@@ -164,11 +163,11 @@ public void processRow(Object key, Object value) throws IOException {

   @Override
   public void close() {
+    super.close();
     // No row was processed
     if (!anyRow) {
       LOG.trace("Close called. no row processed by map.");
     }
-
     // check if there are IOExceptions
     if (!abort) {
       abort = execContext.getIoCxt().getIOExceptions();
@@ -188,10 +187,6 @@ public void close() {
       }
     }

-    if (LOG.isInfoEnabled()) {
-      logCloseInfo();
-    }
-
     ReportStats rps = new ReportStats(rp, jc);
     mo.preorderMap(rps);
     return;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
index 8880bb604e088755dcfb0bcb39689702fab0cb77..3891e796682e3bcea28aae3f5a09dd62bf8b9738 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
@@ -108,6 +108,7 @@ public void processRow(Object key, Object value) throws IOException {

   @Override
   public void close() {
+    super.close();
     LOG.info("Closing Merge Operator " + mergeOp.getName());
     try {
       mergeOp.closeOp(abort);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
index cb5bd7ada2d5ad4f1f654cf80ddaf4504be5d035..45a9eed35818592caef89553abf954630c7ff4be 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
@@ -18,6 +18,7 @@

 package org.apache.hadoop.hive.ql.exec.spark;

+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -33,6 +34,9 @@
 import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;

 public abstract class SparkRecordHandler {
   protected static final String CLASS_NAME = SparkRecordHandler.class.getName();
@@ -40,16 +44,33 @@
   private static final Logger LOG = LoggerFactory.getLogger(SparkRecordHandler.class);

   // used to log memory usage periodically
-  protected final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+  private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();

   protected JobConf jc;
   protected OutputCollector oc;
   protected Reporter rp;
   protected boolean abort = false;
-  private long rowNumber = 0;
-  private long nextLogThreshold = 1;
-
-  protected boolean anyRow = false;
+  /**
+   * Using volatile for rowNumber and logThresholdInterval instead of
+   * Atomic even though they are used in a non-atomic context. This is because
+   * we know that they will be updated only by a single thread at a time and
+   * there is no contention on these variables.
+   */
+  private volatile long rowNumber = 0;
+  private volatile long logThresholdInterval = 15000;
+  boolean anyRow = false;
+  private final long maxLogThresholdInterval = 900000;
+
+  private final ScheduledThreadPoolExecutor memoryAndRowLogExecutor = getMemoryAndRowLogExecutor();
+
+  private ScheduledThreadPoolExecutor getMemoryAndRowLogExecutor() {
+    return new ScheduledThreadPoolExecutor(1,
+        new ThreadFactoryBuilder()
+            .setNameFormat("MemoryAndRowInfoLogger")
+            .setUncaughtExceptionHandler((Thread t, Throwable e) -> LOG.error(t + " throws exception: " + e))
+            .build(),
+        new ThreadPoolExecutor.DiscardPolicy());
+  }

   public void init(JobConf job, OutputCollector output, Reporter reporter) throws Exception {
     jc = job;
@@ -60,13 +81,12 @@
     rp = reporter;

     LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
+    MemoryInfoLogger memoryInfoLogger = new MemoryInfoLogger();
+    memoryInfoLogger.run();
     try {
-      LOG.info("conf classpath = "
-          + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
-      LOG.info("thread classpath = "
-          + Arrays.asList(((URLClassLoader) Thread.currentThread()
-          .getContextClassLoader()).getURLs()));
+      LOG.info("conf classpath = " + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
+      LOG.info("thread classpath = " + Arrays
+          .asList(((URLClassLoader) Thread.currentThread().getContextClassLoader()).getURLs()));
     } catch (Exception e) {
       LOG.info("cannot get classpath: " + e.getMessage());
     }
@@ -83,39 +103,51 @@
   public abstract void processRow(Object key, Iterator values) throws IOException;

   /**
-   * Logger processed row number and used memory info.
+   * Increments rowNumber to indicate # of rows processed.
    */
-  protected void logMemoryInfo() {
-    rowNumber++;
-    if (rowNumber == nextLogThreshold) {
-      long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
-      LOG.info("processing " + rowNumber
-          + " rows: used memory = " + usedMemory);
-      nextLogThreshold = getNextLogThreshold(rowNumber);
+  void incrementRowNumber() {
+    ++rowNumber;
+  }
+
+  /**
+   * Logs every 'logThresholdInterval' milliseconds and doubles the
+   * logThresholdInterval value after each time it logs until it
+   * reaches maxLogThresholdInterval.
+   */
+  class MemoryInfoLogger implements Runnable {
+    @Override
+    public void run() {
+      if (anyRow) {
+        logThresholdInterval = Math.min(maxLogThresholdInterval, 2 * logThresholdInterval);
+        logMemoryInfo();
+      }
+      memoryAndRowLogExecutor.schedule(new MemoryInfoLogger(), logThresholdInterval, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  public void close() {
+    memoryAndRowLogExecutor.shutdown();
+    try {
+      if (!memoryAndRowLogExecutor.awaitTermination(30000, TimeUnit.MILLISECONDS)) {
+        memoryAndRowLogExecutor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      memoryAndRowLogExecutor.shutdownNow();
+    }
+
+    if (LOG.isInfoEnabled()) {
+      logMemoryInfo();
     }
   }

-  public abstract void close();
   public abstract boolean getDone();

   /**
    * Logger information to be logged at the end.
    */
-  protected void logCloseInfo() {
+  private void logMemoryInfo() {
     long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
-    LOG.info("processed " + rowNumber + " rows: used memory = "
-        + usedMemory);
-  }
-
-  private long getNextLogThreshold(long currentThreshold) {
-    // A very simple counter to keep track of number of rows processed by the
-    // reducer. It dumps
-    // every 1 million times, and quickly before that
-    if (currentThreshold >= 1000000) {
-      return currentThreshold + 1000000;
-    }
-
-    return 10 * currentThreshold;
+    LOG.info("Processed " + rowNumber + " rows: used memory = " + usedMemory);
   }

   public boolean isAbort() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 20e7ea0f4e8d4ff79dddeaab0406fc7350d22bd7..07cb5cb936445d32f1eba427bd1c824e44f6aa1f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -401,9 +401,7 @@ public void processRow(Object key, final Object value) throws IOException {
         row.clear();
         row.add(keyObject);
         row.add(valueObject[tag]);
-        if (LOG.isInfoEnabled()) {
-          logMemoryInfo();
-        }
+        incrementRowNumber();
         try {
           reducer.process(row, tag);
         } catch (Exception e) {
@@ -571,9 +569,7 @@ private void forwardBatch(boolean resetValueColumnsOnly) throws HiveException {
     }

     batchBytes = 0;
-    if (LOG.isInfoEnabled()) {
-      logMemoryInfo();
-    }
+    incrementRowNumber();
   }

   private Object deserializeValue(BytesWritable valueWritable, byte tag) throws HiveException {
@@ -593,12 +589,11 @@ private Object deserializeValue(BytesWritable valueWritable, byte tag) throws Hi

   @Override
   public void close() {
-
+    super.close();
     // No row was processed
     if (!anyRow) {
       LOG.trace("Close called without any rows processed");
     }
-
     try {
       if (vectorized) {
         if (batch.size > 0) {
@@ -617,9 +612,6 @@ public void close() {
           reducer.endGroup();
         }
       }
-      if (LOG.isInfoEnabled()) {
-        logCloseInfo();
-      }

       reducer.close(abort);
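
Note on the pattern introduced by this patch: instead of counting rows and logging at row-count thresholds, SparkRecordHandler now runs a self-rescheduling task on a single-threaded ScheduledThreadPoolExecutor. Each run doubles its own delay (capped at maxLogThresholdInterval) and then schedules a fresh task, which is why the code uses schedule() rather than scheduleAtFixedRate(): the period changes between runs. Below is a minimal standalone sketch of the same technique; the class and member names (BackoffLogger, LogTask, the interval constants) are illustrative and not part of the patch.

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the self-rescheduling, doubling-interval logger pattern
// used by SparkRecordHandler above. All names and values here are illustrative.
public class BackoffLogger {
  private static final long MAX_INTERVAL_MS = 900_000; // 15-minute cap, as in the patch

  // Single-writer fields, so volatile (rather than atomic) is sufficient,
  // matching the reasoning in the patch's javadoc.
  private volatile long intervalMs = 15_000; // initial 15-second delay
  private volatile long rowNumber = 0;

  // One worker thread; DiscardPolicy silently drops tasks submitted after
  // shutdown() instead of throwing RejectedExecutionException.
  private final ScheduledThreadPoolExecutor executor =
      new ScheduledThreadPoolExecutor(1, r -> {
        Thread t = new Thread(r, "BackoffLogger");
        t.setDaemon(true);
        return t;
      }, new ThreadPoolExecutor.DiscardPolicy());

  private class LogTask implements Runnable {
    @Override
    public void run() {
      // Double the delay each run, up to the cap, then log progress.
      intervalMs = Math.min(MAX_INTERVAL_MS, 2 * intervalMs);
      System.out.println("Processed " + rowNumber + " rows");
      // Re-schedule a fresh task; scheduleAtFixedRate would not allow the
      // period to change between runs.
      executor.schedule(new LogTask(), intervalMs, TimeUnit.MILLISECONDS);
    }
  }

  public void start() {
    executor.schedule(new LogTask(), intervalMs, TimeUnit.MILLISECONDS);
  }

  public void processRow() {
    ++rowNumber; // called from the single record-processing thread
  }

  public void close() throws InterruptedException {
    executor.shutdown();
    if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
      executor.shutdownNow();
    }
  }
}

The DiscardPolicy handler mirrors the patch's design choice: once close() calls shutdown(), any re-schedule attempted by an in-flight LogTask is dropped quietly rather than failing, so teardown never races with the logger.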