commit 10e54c04f01c584d98d974f748517a6b8999de2a Author: Andrew Sherman Date: Mon Jul 2 17:15:53 2018 -0700 HIVE-19986: Add logging of runtime statistics indicating when Hdfs Erasure Coding is used by MR. These stats are not available until the unreleased Hadoop 3.2 so the shim is used to determine if the new counter can be used. diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java index b0a0145a4ee705b0a7d8f214d2c87397f731faec..efb37590e6dd5c1ad6564016e8f9ffc1d1c1aa93 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java @@ -19,6 +19,8 @@ package org.apache.hive.jdbc; import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; @@ -31,11 +33,17 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.processors.ErasureProcessor; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsErasureCodingShim; import org.apache.hadoop.hive.shims.HadoopShims.MiniDFSShim; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.appender.WriterAppender; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.layout.PatternLayout; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -173,6 +181,53 @@ public void testDescribeErasureCoding() throws Exception { } } + /** + * Test MR stats. 
+ */ + @Test + public void testMapRedStats() throws Exception { + // Do log4j magic to save log output + StringWriter writer = new StringWriter(); + Appender appender = addAppender(writer, "testMapRedStats"); + try (Statement stmt = hs2Conn.createStatement()) { + String table = "mapredstats"; + stmt.execute("set hive.execution.engine=mr"); + stmt.execute(" CREATE TABLE " + table + " (a int) STORED AS PARQUET"); + stmt.execute("INSERT INTO TABLE " + table + " VALUES (3)"); + try (ResultSet rs = stmt.executeQuery("select a from " + table + " order by a")) { + while (rs.next()) { + int val = rs.getInt(1); + assertEquals(3, val); + } + } + } + String output = writer.toString(); + // check for standard stats + assertTrue(output.contains("HDFS Read:")); + assertTrue(output.contains("HDFS Write:")); + + // check for erasure coding stat + HadoopShims.HdfsErasureCodingShim erasureShim = ErasureProcessor.getErasureShim(conf); + if (erasureShim.isMapReduceStatAvailable()) { + assertTrue(output.contains("HDFS EC Read:")); + } + } + + /** + * Add an appender to log4j. + * http://logging.apache.org/log4j/2.x/manual/customconfig.html#AddingToCurrent + */ + private Appender addAppender(final Writer writer, final String writerName) { + final LoggerContext context = LoggerContext.getContext(false); + final Configuration config = context.getConfiguration(); + final PatternLayout layout = PatternLayout.createDefaultLayout(config); + final Appender appender = + WriterAppender.createAppender(layout, null, writer, writerName, false, true); + appender.start(); + config.getRootLogger().addAppender(appender, null, null); + return appender; + } + /** * Add a Erasure Coding Policy to a Path. 
*/ diff --git ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java index 483c3d9fcd2f55a644321a62224b13846e421188..ac45ec46de9a4c3b6d7051e0a0d85e60e8d2e48a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java +++ ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java @@ -18,8 +18,15 @@ package org.apache.hadoop.hive.ql; +import java.io.IOException; + +import org.apache.hadoop.hive.ql.processors.ErasureProcessor; +import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * MapRedStats. @@ -30,6 +37,9 @@ * */ public class MapRedStats { + private static final String CLASS_NAME = MapRedStats.class.getName(); + private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + private JobConf jobConf; int numMap; int numReduce; long cpuMSec; @@ -40,7 +50,8 @@ private long numModifiedRows; - public MapRedStats(int numMap, int numReduce, long cpuMSec, boolean ifSuccess, String jobId) { + public MapRedStats(JobConf jobConf, int numMap, int numReduce, long cpuMSec, boolean ifSuccess, String jobId) { + this.jobConf = jobConf; this.numMap = numMap; this.numReduce = numReduce; this.cpuMSec = cpuMSec; @@ -144,10 +155,40 @@ public String toString() { if (hdfsWrittenCntr != null && (hdfsWritten = hdfsWrittenCntr.getValue()) >= 0) { sb.append(" HDFS Write: " + hdfsWritten); } + + HadoopShims.HdfsErasureCodingShim erasureShim = getHdfsErasureCodingShim(); + + if (erasureShim != null && erasureShim.isMapReduceStatAvailable()) { + // Erasure Coding stats - added in HADOOP-15507, expected in Hadoop 3.2.0 + Counter hdfsReadEcCntr = counters.findCounter("FileSystemCounters", + "HDFS_BYTES_READ_EC"); // FileSystemCounter.BYTES_READ_EC + if (hdfsReadEcCntr != null) { + long hdfsReadEc = hdfsReadEcCntr.getValue(); + if (hdfsReadEc 
>= 0) { + sb.append(" HDFS EC Read: " + hdfsReadEc); + } + } + } } sb.append(" " + (success ? "SUCCESS" : "FAIL")); return sb.toString(); } + + /** + * Get the Erasure Coding Shim. + * @return a HdfsErasureCodingShim + */ + private HadoopShims.HdfsErasureCodingShim getHdfsErasureCodingShim() { + HadoopShims.HdfsErasureCodingShim erasureShim = null; + try { + erasureShim = ErasureProcessor.getErasureShim(jobConf); + } catch (IOException e) { + // this should not happen + LOG.warn("Could not get Erasure Coding shim for reason: " + e.getMessage()); + // fall through + } + return erasureShim; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index c31e22f041e18abf077f44e79188b7479abc3629..eb6cbf71e217c971cf6bf2dab2dd97dd99034d39 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -419,7 +419,7 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException, LockEx } } - MapRedStats mapRedStats = new MapRedStats(numMap, numReduce, cpuMsec, success, rj.getID().toString()); + MapRedStats mapRedStats = new MapRedStats(job, numMap, numReduce, cpuMsec, success, rj.getID().toString()); mapRedStats.setCounters(ctrs); // update based on the final value of the counters diff --git ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java index 46114f50dc1ee3f03305cf9047361c39131128d6..04cc8b0e07e2e6d9e4a86ac7e3c2ae42a803c419 100644 --- ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java @@ -31,6 +31,7 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -69,9 +70,16 @@ private HadoopShims.HdfsErasureCodingShim erasureCodingShim; ErasureProcessor(HiveConf config) throws IOException { + this.erasureCodingShim = getErasureShim(config); + } + + /** + * Get an instance of HdfsErasureCodingShim from a config. + */ + public static HadoopShims.HdfsErasureCodingShim getErasureShim(Configuration config) throws IOException { HadoopShims hadoopShims = ShimLoader.getHadoopShims(); FileSystem fileSystem = FileSystem.get(config); - this.erasureCodingShim = hadoopShims.createHdfsErasureCodingShim(fileSystem, config); + return hadoopShims.createHdfsErasureCodingShim(fileSystem, config); } private CommandLine parseCommandArgs(final Options opts, String[] args) throws ParseException { diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 02490f1171ee658a2f3208e4cf2f0416d6907dc6..98c3eef6fa553d3b9588ffef6b40bb7ca419ce33 100644 --- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -19,6 +19,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.InetSocketAddress; @@ -78,6 +79,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.WebHCatJTShim23; import org.apache.hadoop.mapred.lib.TotalOrderPartitioner; +import org.apache.hadoop.mapreduce.FileSystemCounter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; @@ -1607,5 +1609,20 @@ public void removeErasureCodingPolicy(String ecPolicyName) throws IOException { public void disableErasureCodingPolicy(String 
ecPolicyName) throws IOException { hdfsAdmin.disableErasureCodingPolicy(ecPolicyName); } + + /** + * @return true if the runtime MR stat for Erasure Coding is available. + */ + @Override + public boolean isMapReduceStatAvailable() { + // Look for FileSystemCounter.BYTES_READ_EC, this is present in hadoop 3.2 + Field field = null; + try { + field = FileSystemCounter.class.getField("BYTES_READ_EC"); + } catch (NoSuchFieldException e) { + // This version of Hadoop does not support EC stats for MR + } + return (field != null); + } } } diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index 2e84ca9b180e0149fd2185183238dab2268e5474..84e6430fcf45bd15073e518c69e15d866f503aac 100644 --- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -691,6 +691,11 @@ public void deleteKey(String keyName) throws IOException { * @param ecPolicyName the name of the erasure coding policy */ void disableErasureCodingPolicy(String ecPolicyName) throws IOException; + + /** + * @return true if the runtime MR stat for Erasure Coding is available. + */ + boolean isMapReduceStatAvailable(); } /** @@ -728,6 +733,11 @@ public void removeErasureCodingPolicy(String ecPolicyName) throws IOException { public void disableErasureCodingPolicy(String ecPolicyName) throws IOException { } + @Override + public boolean isMapReduceStatAvailable() { + return false; + } + } /**