commit c2fd6bdbecab2a8ff24964337b818f273a857a26
Author: Bharath Krishna
Date:   Fri Oct 12 17:56:26 2018 -0700

    HIVE-20512 : Improve record and memory usage logging in SparkRecordHandler.
    Changing the logging to be interval based rather than depending on the number of rows processed.

diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 88dd12c05ade417aca4cdaece4448d31d4e1d65f..530131f207dc903b2768bac9fcea5e2f7046213f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -140,9 +140,8 @@ public void processRow(Object key, Object value) throws IOException {
       // Since there is no concept of a group, we don't invoke
       // startGroup/endGroup for a mapper
       mo.process((Writable) value);
-      if (LOG.isInfoEnabled()) {
-        logMemoryInfo();
-      }
+      incrementRowNumber();
+
     } catch (Throwable e) {
       abort = true;
       Utilities.setMapWork(jc, null);
@@ -164,11 +163,11 @@ public void processRow(Object key, Object value) throws IOException {
 
   @Override
   public void close() {
+    super.close();
     // No row was processed
     if (!anyRow) {
       LOG.trace("Close called. no row processed by map.");
     }
-
     // check if there are IOExceptions
     if (!abort) {
       abort = execContext.getIoCxt().getIOExceptions();
@@ -188,10 +187,6 @@ public void close() {
       }
     }
 
-    if (LOG.isInfoEnabled()) {
-      logCloseInfo();
-    }
-
     ReportStats rps = new ReportStats(rp, jc);
     mo.preorderMap(rps);
     return;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
index 8880bb604e088755dcfb0bcb39689702fab0cb77..3891e796682e3bcea28aae3f5a09dd62bf8b9738 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
@@ -108,6 +108,7 @@ public void processRow(Object key, Object value) throws IOException {
 
   @Override
   public void close() {
+    super.close();
    LOG.info("Closing Merge Operator " + mergeOp.getName());
     try {
       mergeOp.closeOp(abort);
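The heart of the change is in SparkRecordHandler.java below: instead of logging after every Nth row, a single-threaded ScheduledThreadPoolExecutor runs a self-rescheduling task that logs the row count and heap usage, doubling its interval after each log (starting at 15 seconds, capped at 15 minutes). A minimal standalone sketch of that pattern follows; all class and member names here are illustrative, not the actual Hive identifiers.

    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryMXBean;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of interval-based row/memory logging with exponential back-off.
    public class IntervalMemoryLogger {
      private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
      private final AtomicLong rowNumber = new AtomicLong(0);
      // Start at 15 seconds; double after each log, capping at 15 minutes.
      private final AtomicLong intervalMs = new AtomicLong(15_000);
      private static final long MAX_INTERVAL_MS = 900_000;
      private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
      private volatile boolean anyRow = false;

      /** Kicks off the logging loop; each run schedules the next one. */
      public void start() {
        new LoggerTask().run();
      }

      /** Called once per processed row; AtomicLong keeps the count thread-safe. */
      public void onRow() {
        anyRow = true;
        rowNumber.incrementAndGet();
      }

      /** Stops the periodic task, then emits one final snapshot. */
      public void close() {
        executor.shutdownNow();
        logMemoryInfo();
      }

      private void logMemoryInfo() {
        long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
        System.out.println("Processed " + rowNumber + " rows: used memory = " + usedMemory);
      }

      private class LoggerTask implements Runnable {
        @Override
        public void run() {
          if (anyRow) {
            // Back off: double the cadence after each log, up to the cap.
            intervalMs.set(Math.min(MAX_INTERVAL_MS, 2 * intervalMs.get()));
            logMemoryInfo();
          }
          // Guard added in this sketch (the patch itself schedules unconditionally):
          // avoids a RejectedExecutionException if a run races with close().
          if (!executor.isShutdown()) {
            executor.schedule(new LoggerTask(), intervalMs.get(), TimeUnit.MILLISECONDS);
          }
        }
      }
    }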
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
index cb5bd7ada2d5ad4f1f654cf80ddaf4504be5d035..714c1291d97f07f0f20a271960220a7e53330121 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
@@ -33,6 +33,9 @@
 import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class SparkRecordHandler {
   protected static final String CLASS_NAME = SparkRecordHandler.class.getName();
@@ -40,16 +43,17 @@
   private static final Logger LOG = LoggerFactory.getLogger(SparkRecordHandler.class);
 
   // used to log memory usage periodically
-  protected final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+  private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
 
   protected JobConf jc;
   protected OutputCollector oc;
   protected Reporter rp;
   protected boolean abort = false;
-  private long rowNumber = 0;
-  private long nextLogThreshold = 1;
-
-  protected boolean anyRow = false;
+  private AtomicLong rowNumber = new AtomicLong(0);
+  private AtomicLong logThresholdInterval = new AtomicLong(15000);
+  boolean anyRow = false;
+  private final long maxLogThresholdInterval = 900000;
+  private ScheduledThreadPoolExecutor memoryLogExecutor;
 
   public void init(JobConf job, OutputCollector output, Reporter reporter) throws Exception {
     jc = job;
@@ -60,13 +64,13 @@
     rp = reporter;
 
     LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
+    memoryLogExecutor = new ScheduledThreadPoolExecutor(1);
+    MemoryInfoLogger memoryInfoLogger = new MemoryInfoLogger();
+    memoryInfoLogger.run();
     try {
-      LOG.info("conf classpath = "
-          + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
-      LOG.info("thread classpath = "
-          + Arrays.asList(((URLClassLoader) Thread.currentThread()
-          .getContextClassLoader()).getURLs()));
+      LOG.info("conf classpath = " + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
+      LOG.info("thread classpath = " + Arrays
+          .asList(((URLClassLoader) Thread.currentThread().getContextClassLoader()).getURLs()));
     } catch (Exception e) {
       LOG.info("cannot get classpath: " + e.getMessage());
     }
@@ -83,39 +87,43 @@
   public abstract void processRow(Object key, Iterator values) throws IOException;
 
   /**
-   * Logger processed row number and used memory info.
+   * Increments rowNumber to indicate # of rows processed.
    */
-  protected void logMemoryInfo() {
-    rowNumber++;
-    if (rowNumber == nextLogThreshold) {
-      long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
-      LOG.info("processing " + rowNumber
-          + " rows: used memory = " + usedMemory);
-      nextLogThreshold = getNextLogThreshold(rowNumber);
+  void incrementRowNumber() {
+    rowNumber.incrementAndGet();
+  }
+
+  /**
+   * Logs every 'logThresholdInterval' milliseconds and doubles the
+   * logThresholdInterval value after each time it logs until it
+   * reaches maxLogThresholdInterval.
+   */
+  class MemoryInfoLogger implements Runnable {
+    @Override
+    public void run() {
+      if (anyRow) {
+        logThresholdInterval.set(Math.min(maxLogThresholdInterval, 2 * logThresholdInterval.get()));
+        logMemoryInfo();
+      }
+      memoryLogExecutor.schedule(new MemoryInfoLogger(), logThresholdInterval.get(), TimeUnit.MILLISECONDS);
+    }
+  }
+
+  public void close() {
+    memoryLogExecutor.shutdownNow();
+    if (LOG.isInfoEnabled()) {
+      logMemoryInfo();
     }
   }
 
-  public abstract void close();
   public abstract boolean getDone();
 
   /**
    * Logger information to be logged at the end.
    */
-  protected void logCloseInfo() {
+  private void logMemoryInfo() {
     long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
-    LOG.info("processed " + rowNumber + " rows: used memory = "
-        + usedMemory);
-  }
-
-  private long getNextLogThreshold(long currentThreshold) {
-    // A very simple counter to keep track of number of rows processed by the
-    // reducer. It dumps
-    // every 1 million times, and quickly before that
-    if (currentThreshold >= 1000000) {
-      return currentThreshold + 1000000;
-    }
-
-    return 10 * currentThreshold;
+    LOG.info("Processed " + rowNumber + " rows: used memory = " + usedMemory);
   }
 
   public boolean isAbort() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 20e7ea0f4e8d4ff79dddeaab0406fc7350d22bd7..07cb5cb936445d32f1eba427bd1c824e44f6aa1f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -401,9 +401,7 @@ public void processRow(Object key, final Object value) throws IOException {
       row.clear();
       row.add(keyObject);
       row.add(valueObject[tag]);
-      if (LOG.isInfoEnabled()) {
-        logMemoryInfo();
-      }
+      incrementRowNumber();
       try {
         reducer.process(row, tag);
       } catch (Exception e) {
@@ -571,9 +569,7 @@ private void forwardBatch(boolean resetValueColumnsOnly) throws HiveException {
     }
 
     batchBytes = 0;
-    if (LOG.isInfoEnabled()) {
-      logMemoryInfo();
-    }
+    incrementRowNumber();
   }
 
   private Object deserializeValue(BytesWritable valueWritable, byte tag) throws HiveException {
@@ -593,12 +589,11 @@ private Object deserializeValue(BytesWritable valueWritable, byte tag) throws Hi
 
   @Override
   public void close() {
-
+    super.close();
     // No row was processed
     if (!anyRow) {
       LOG.trace("Close called without any rows processed");
     }
-
     try {
       if (vectorized) {
         if (batch.size > 0) {
@@ -617,9 +612,6 @@ public void close() {
         reducer.endGroup();
       }
     }
-    if (LOG.isInfoEnabled()) {
-      logCloseInfo();
-    }
     reducer.close(abort);
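With every handler now delegating to SparkRecordHandler.close() via super.close(), the shutdown path is uniform: the scheduled logger is stopped first, then one final "Processed N rows" snapshot is emitted before operator teardown. A hypothetical driver for the IntervalMemoryLogger sketch above, showing that lifecycle end to end:

    // Hypothetical driver; IntervalMemoryLogger is the sketch shown earlier.
    public class IntervalMemoryLoggerDemo {
      public static void main(String[] args) throws InterruptedException {
        IntervalMemoryLogger logger = new IntervalMemoryLogger();
        logger.start();                    // first check runs inline; next in ~15s
        for (int i = 0; i < 1_000_000; i++) {
          logger.onRow();                  // one increment per processed row
        }
        Thread.sleep(20_000);              // let at least one periodic log fire
        logger.close();                    // shutdownNow(), then a final snapshot
      }
    }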