commit 66304e9b779509335437e3f779ccd45158a5c16a Author: Bharath Krishna Date: Fri Oct 12 17:56:26 2018 -0700 HIVE-20512 : Improve record and memory usage logging in SparkRecordHandler. Changing the logging to be interval based than depending on number of rows processed. diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java index cb5bd7ada2d5ad4f1f654cf80ddaf4504be5d035..ce2eb88d01cf4081a68663cf1dc8375960506260 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java @@ -47,8 +47,8 @@ protected Reporter rp; protected boolean abort = false; private long rowNumber = 0; - private long nextLogThreshold = 1; - + private long logThresholdInterval = 120000; + private long lastLoggedTime = Long.MAX_VALUE; protected boolean anyRow = false; public void init(JobConf job, OutputCollector output, Reporter reporter) throws Exception { @@ -84,15 +84,30 @@ /** * Logger processed row number and used memory info. + * Logs after each logThresholdInterval milliseconds. */ protected void logMemoryInfo() { rowNumber++; - if (rowNumber == nextLogThreshold) { + // Print log if it has been more than logThresholdInterval milliseconds since last log was printed. + if (intervalExceededSinceLastLog()) { long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); - LOG.info("processing " + rowNumber - + " rows: used memory = " + usedMemory); - nextLogThreshold = getNextLogThreshold(rowNumber); + LOG.info("processing " + rowNumber + " rows: used memory = " + usedMemory); + } + } + + /** + * Checks if it has been 'logThresholdInterval' milliseconds since last log time and doubles the + * logThresholdInterval value if the threshold has reached. + * @return boolean indicating whether we have exceeded the interval since last log was written. + */ + private boolean intervalExceededSinceLastLog() { + long now = System.currentTimeMillis(); + if (lastLoggedTime > now - logThresholdInterval) { + lastLoggedTime = now; + logThresholdInterval *= 2; + return true; } + return false; } public abstract void close(); @@ -103,19 +118,7 @@ protected void logMemoryInfo() { */ protected void logCloseInfo() { long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); - LOG.info("processed " + rowNumber + " rows: used memory = " - + usedMemory); - } - - private long getNextLogThreshold(long currentThreshold) { - // A very simple counter to keep track of number of rows processed by the - // reducer. It dumps - // every 1 million times, and quickly before that - if (currentThreshold >= 1000000) { - return currentThreshold + 1000000; - } - - return 10 * currentThreshold; + LOG.info("processed " + rowNumber + " rows: used memory = " + usedMemory); } public boolean isAbort() {