diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index 277683e..42c1003 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -38,6 +38,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index a376023..aecaae5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -407,6 +407,8 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
     if (gWork == null) {
       Path localPath = path;
       LOG.debug("local path = " + localPath);
+      final long serializedSize;
+      final String planMode;
       if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
         LOG.debug("Loading plan from string: "+path.toUri().getPath());
         String planString = conf.getRaw(path.toUri().getPath());
@@ -414,12 +416,17 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
           LOG.info("Could not find plan string in conf");
           return null;
         }
+        serializedSize = planString.length();
+        planMode = "RPC";
         byte[] planBytes = Base64.decodeBase64(planString);
         in = new ByteArrayInputStream(planBytes);
         in = new InflaterInputStream(in);
       } else {
         LOG.debug("Open file to read in plan: " + localPath);
-        in = localPath.getFileSystem(conf).open(localPath);
+        FileSystem fs = localPath.getFileSystem(conf);
+        in = fs.open(localPath);
+        serializedSize = fs.getFileStatus(localPath).getLen();
+        planMode = "FILE";
       }
 
       if(MAP_PLAN_NAME.equals(name)){
@@ -451,6 +458,8 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
           throw new RuntimeException("Unknown work type: " + name);
         }
       }
+      LOG.info("Deserialized plan (via {}) - name: {} size: {}", planMode,
+          gWork.getName(), humanReadableByteCount(serializedSize));
       gWorkMap.get(conf).put(path, gWork);
     } else if (LOG.isDebugEnabled()) {
       LOG.debug("Found plan in cache for name: " + name);
@@ -539,6 +548,8 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir,
 
     OutputStream out = null;
 
+    final long serializedSize;
+    final String planMode;
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
       // add it to the conf
       ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
@@ -550,9 +561,10 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir,
       } finally {
         IOUtils.closeStream(out);
       }
-      LOG.info("Setting plan: "+planPath.toUri().getPath());
-      conf.set(planPath.toUri().getPath(),
-          Base64.encodeBase64String(byteOut.toByteArray()));
+      final String serializedPlan = Base64.encodeBase64String(byteOut.toByteArray());
+      serializedSize = serializedPlan.length();
+      planMode = "RPC";
+      conf.set(planPath.toUri().getPath(), serializedPlan);
     } else {
       // use the default file system of the conf
       FileSystem fs = planPath.getFileSystem(conf);
@@ -561,6 +573,9 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir,
         SerializationUtilities.serializePlan(kryo, w, out);
         out.close();
         out = null;
+        long fileLen = fs.getFileStatus(planPath).getLen();
+        serializedSize = fileLen;
+        planMode = "FILE";
       } finally {
         IOUtils.closeStream(out);
       }
@@ -583,6 +598,8 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir,
       }
     }
 
+    LOG.info("Serialized plan (via {}) - name: {} size: {}", planMode, w.getName(),
+        humanReadableByteCount(serializedSize));
     // Cache the plan in this process
     gWorkMap.get(conf).put(planPath, w);
     return planPath;
@@ -3686,4 +3703,14 @@ public static boolean checkLlapIOSupportedTypes(final List<String> readColumnNames,
     }
     return result;
   }
+
+  public static String humanReadableByteCount(long bytes) {
+    int unit = 1000; // use binary units instead?
+    if (bytes < unit) {
+      return bytes + "B";
+    }
+    int exp = (int) (Math.log(bytes) / Math.log(unit));
+    String suffix = "KMGTPE".charAt(exp-1) + "";
+    return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix);
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 67cd38d..d617879 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -635,17 +635,6 @@ private void printDagSummary(Map<String, Progress> progressMap, LogHelper console) {
     }
   }
 
-
-  private String humanReadableByteCount(long bytes) {
-    int unit = 1000; // use binary units instead?
-    if (bytes < unit) {
-      return bytes + "B";
-    }
-    int exp = (int) (Math.log(bytes) / Math.log(unit));
-    String suffix = "KMGTPE".charAt(exp-1) + "";
-    return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix);
-  }
-
   private void printLlapIOSummary(Map<String, Progress> progressMap, LogHelper console,
       DAGClient dagClient) {
     SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
@@ -697,10 +686,10 @@ private void printLlapIOSummary(Map<String, Progress> progressMap, LogHelper console,
             selectedRowgroups,
             metadataCacheHit,
             metadataCacheMiss,
-            humanReadableByteCount(cacheHitBytes),
-            humanReadableByteCount(cacheMissBytes),
-            humanReadableByteCount(allocatedBytes),
-            humanReadableByteCount(allocatedUsedBytes),
+            Utilities.humanReadableByteCount(cacheHitBytes),
+            Utilities.humanReadableByteCount(cacheMissBytes),
+            Utilities.humanReadableByteCount(allocatedBytes),
+            Utilities.humanReadableByteCount(allocatedUsedBytes),
             secondsFormat.format(totalIoTime / 1000_000_000.0) + "s");
         console.printInfo(queryFragmentStats);
       }
@@ -750,10 +739,10 @@ private void printFSCountersSummary(Map<String, Progress> progressMap, LogHelper console,
       String fsCountersSummary = String.format(FS_COUNTERS_HEADER_FORMAT,
           vertexName,
-          humanReadableByteCount(bytesRead),
+          Utilities.humanReadableByteCount(bytesRead),
           readOps,
           largeReadOps,
-          humanReadableByteCount(bytesWritten),
+          Utilities.humanReadableByteCount(bytesWritten),
           writeOps);
       console.printInfo(fsCountersSummary);
 
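
Reviewer note (not part of the patch): the sketch below is a self-contained
reproduction of the relocated Utilities.humanReadableByteCount helper, paired
with a hypothetical 1024-based variant to make the in-code question
"use binary units instead?" concrete. The class name ByteCountDemo and the
humanReadableBinaryByteCount method are illustrative assumptions, not Hive code.

public class ByteCountDemo {

  // SI formatting, as in the patch: 1 KB = 1000 B. For any positive long
  // the computed exponent is at most 6, so the "KMGTPE" suffix table
  // covers the whole range.
  static String humanReadableByteCount(long bytes) {
    int unit = 1000;
    if (bytes < unit) {
      return bytes + "B";
    }
    int exp = (int) (Math.log(bytes) / Math.log(unit));
    String suffix = "KMGTPE".charAt(exp - 1) + "";
    return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix);
  }

  // Hypothetical binary variant (1 KiB = 1024 B), shown only to illustrate
  // the trade-off raised by the "use binary units instead?" comment.
  static String humanReadableBinaryByteCount(long bytes) {
    int unit = 1024;
    if (bytes < unit) {
      return bytes + "B";
    }
    int exp = (int) (Math.log(bytes) / Math.log(unit));
    String suffix = "KMGTPE".charAt(exp - 1) + "iB";
    return String.format("%.2f%s", bytes / Math.pow(unit, exp), suffix);
  }

  public static void main(String[] args) {
    long planSize = 5_242_880L;                                  // 5 * 1024 * 1024 bytes
    System.out.println(humanReadableByteCount(planSize));       // prints 5.24MB
    System.out.println(humanReadableBinaryByteCount(planSize)); // prints 5.00MiB
  }
}

Either way the result feeds the new "Serialized plan ... size:" log lines; since
the method body is unchanged, keeping the SI version leaves the existing
TezJobMonitor LLAP IO and FS counter summaries byte-for-byte identical after
the move.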