diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0cecae5..fd2a5c5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1946,6 +1946,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         " ETL strategy is used when spending little more time in split generation is acceptable" +
         " (split generation reads and caches file footers). HYBRID chooses between the above strategies" +
         " based on heuristics."),
+    HIVE_ORC_WRITER_LLAP_MEMORY_MANAGER_ENABLED("hive.exec.orc.writer.llap.memory.manager.enabled", true,
+      "Whether orc writers should use llap-aware memory manager. LLAP aware memory manager will use memory\n" +
+        "per executor instead of entire heap memory when concurrent orc writers are involved. This will let\n" +
+        "task fragments to use memory within its limit (memory per executor) when performing ETL in LLAP."),
 
     // hive streaming ingest settings
     HIVE_STREAMING_AUTO_FLUSH_ENABLED("hive.streaming.auto.flush.enabled", true, "Whether to enable memory \n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index e7dfb05..8b157d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -24,20 +24,29 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.orc.FileMetadata;
+import org.apache.orc.OrcConf;
 import org.apache.orc.PhysicalWriter;
 import org.apache.orc.MemoryManager;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.MemoryManagerImpl;
 import org.apache.orc.impl.OrcTail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Contains factory methods to read or write ORC files.
  */
 public final class OrcFile extends org.apache.orc.OrcFile {
-
+  private static final Logger LOG = LoggerFactory.getLogger(OrcFile.class);
   // unused
   protected OrcFile() {}
 
@@ -96,6 +105,28 @@ public static Reader createReader(Path path,
     return new ReaderImpl(path, options);
   }
 
+  @VisibleForTesting
+  static class LlapAwareMemoryManager extends MemoryManagerImpl {
+    private final double maxLoad;
+    private final long totalMemoryPool;
+
+    public LlapAwareMemoryManager(Configuration conf) {
+      super(conf);
+      maxLoad = OrcConf.MEMORY_POOL.getDouble(conf);
+      long memPerExecutor = LlapDaemonInfo.INSTANCE.getMemoryPerExecutor();
+      totalMemoryPool = (long) (memPerExecutor * maxLoad);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Using LLAP aware memory manager for orc writer. memPerExecutor: {} maxLoad: {} totalMemPool: {}",
+          LlapUtil.humanReadableByteCount(memPerExecutor), maxLoad, LlapUtil.humanReadableByteCount(totalMemoryPool));
+      }
+    }
+
+    @Override
+    public long getTotalMemoryPool() {
+      return totalMemoryPool;
+    }
+  }
+
   /**
    * Options for creating ORC file writers.
    */
@@ -111,6 +142,10 @@ public static Reader createReader(Path path,
     WriterOptions(Properties tableProperties, Configuration conf) {
       super(tableProperties, conf);
       useUTCTimestamp(true);
+      if (conf.getBoolean(HiveConf.ConfVars.HIVE_ORC_WRITER_LLAP_MEMORY_MANAGER_ENABLED.varname, true) &&
+        "llap".equalsIgnoreCase(conf.get(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname))) {
+        memory(new LlapAwareMemoryManager(conf));
+      }
     }
 
     /**
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
index 97d4fc6..d3651d2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
@@ -25,6 +25,7 @@
 import java.io.File;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -43,6 +44,8 @@
 import org.apache.hadoop.hive.common.type.Date;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
@@ -91,6 +94,7 @@
 import org.apache.orc.StripeInformation;
 import org.apache.orc.StripeStatistics;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.MemoryManagerImpl;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -2202,4 +2206,41 @@ public void testListExpansion() throws Exception {
     assertEquals(false, reader.hasNext());
     reader.close();
   }
+
+  @Test
+  public void testLlapAwareMemoryManager() throws IOException {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+        ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    try {
+      OrcFile.WriterOptions opts = OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB);
+      Writer writer = OrcFile.createWriter(new Path(testFilePath, "-0"), opts);
+      writer.close();
+      assertTrue(opts.getMemoryManager() instanceof MemoryManagerImpl);
+      long expected = Math.round(ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() * 0.5);
+      assertEquals(expected, ((MemoryManagerImpl) opts.getMemoryManager()).getTotalMemoryPool(), 100);
+
+      conf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap");
+      LlapDaemonInfo.initialize("test", new Configuration());
+      opts = OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB);
+      writer = OrcFile.createWriter(new Path(testFilePath, "-1"), opts);
+      writer.close();
+      assertTrue(opts.getMemoryManager() instanceof OrcFile.LlapAwareMemoryManager);
+      assertEquals(LlapDaemonInfo.INSTANCE.getMemoryPerExecutor() * 0.5,
+        ((OrcFile.LlapAwareMemoryManager) opts.getMemoryManager()).getTotalMemoryPool(), 100);
+
+      conf.setBoolean(HiveConf.ConfVars.HIVE_ORC_WRITER_LLAP_MEMORY_MANAGER_ENABLED.varname, false);
+      opts = OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB);
+      writer = OrcFile.createWriter(new Path(testFilePath, "-2"), opts);
+      writer.close();
+      assertTrue(opts.getMemoryManager() instanceof MemoryManagerImpl);
+      expected = Math.round(ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() * 0.5);
+      assertEquals(expected, ((MemoryManagerImpl) opts.getMemoryManager()).getTotalMemoryPool(), 100);
+    } finally {
+      conf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "container");
+    }
+  }
 }
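For reference, a minimal usage sketch (not part of the patch) showing how the new flag and hive.execution.mode drive the choice of memory manager. The class name LlapOrcWriterSketch and the output path are illustrative only; calling LlapDaemonInfo.initialize directly merely mimics the unit test above, since inside LLAP the daemon performs that initialization itself.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapDaemonInfo;
import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

public class LlapOrcWriterSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // hive.exec.orc.writer.llap.memory.manager.enabled defaults to true; the
    // LLAP-aware manager is only engaged when the execution mode is "llap".
    conf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap");
    // Normally done by the LLAP daemon; done here only to mirror the test above.
    LlapDaemonInfo.initialize("sketch", new Configuration());

    ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
        Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
    OrcFile.WriterOptions opts = OrcFile.writerOptions(conf)
        .inspector(inspector)
        .compress(CompressionKind.ZLIB);
    Writer writer = OrcFile.createWriter(new Path("/tmp/llap-mm-sketch.orc"), opts);
    writer.close();

    // With the settings above, opts.getMemoryManager() is an
    // OrcFile.LlapAwareMemoryManager whose pool is memory-per-executor scaled by
    // OrcConf.MEMORY_POOL, rather than a fraction of the whole JVM heap.
  }
}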