diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index d0adc35..4e059d1 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1949,6 +1949,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " ETL strategy is used when spending little more time in split generation is acceptable" + " (split generation reads and caches file footers). HYBRID chooses between the above strategies" + " based on heuristics."), + HIVE_ORC_WRITER_LLAP_MEMORY_MANAGER_ENABLED("hive.exec.orc.writer.llap.memory.manager.enabled", true, + "Whether orc writers should use llap-aware memory manager. LLAP aware memory manager will use memory\n" + + "per executor instead of entire heap memory when concurrent orc writers are involved. This will let\n" + + "task fragments to use memory within its limit (memory per executor) when performing ETL in LLAP."), // hive streaming ingest settings HIVE_STREAMING_AUTO_FLUSH_ENABLED("hive.streaming.auto.flush.enabled", true, "Whether to enable memory \n" + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java index e7dfb05..e246ac2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java @@ -24,20 +24,29 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapDaemonInfo; +import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.orc.FileMetadata; +import org.apache.orc.OrcConf; import org.apache.orc.PhysicalWriter; import org.apache.orc.MemoryManager; import org.apache.orc.TypeDescription; +import org.apache.orc.impl.MemoryManagerImpl; import org.apache.orc.impl.OrcTail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; /** * Contains factory methods to read or write ORC files. */ public final class OrcFile extends org.apache.orc.OrcFile { - + private static final Logger LOG = LoggerFactory.getLogger(OrcFile.class); // unused protected OrcFile() {} @@ -96,6 +105,37 @@ public static Reader createReader(Path path, return new ReaderImpl(path, options); } + @VisibleForTesting + static class LlapAwareMemoryManager extends MemoryManagerImpl { + private final double maxLoad; + private final long totalMemoryPool; + + public LlapAwareMemoryManager(Configuration conf) { + super(conf); + maxLoad = OrcConf.MEMORY_POOL.getDouble(conf); + long memPerExecutor = LlapDaemonInfo.INSTANCE.getMemoryPerExecutor(); + totalMemoryPool = (long) (memPerExecutor * maxLoad); + if (LOG.isDebugEnabled()) { + LOG.debug("Using LLAP memory manager for orc writer. memPerExecutor: {} maxLoad: {} totalMemPool: {}", + LlapUtil.humanReadableByteCount(memPerExecutor), maxLoad, LlapUtil.humanReadableByteCount(totalMemoryPool)); + } + } + + @Override + public long getTotalMemoryPool() { + return totalMemoryPool; + } + } + + private static ThreadLocal threadLocalOrcLlapMemoryManager = null; + + private static synchronized MemoryManager getThreadLocalOrcLlapMemoryManager(final Configuration conf) { + if (threadLocalOrcLlapMemoryManager == null) { + threadLocalOrcLlapMemoryManager = ThreadLocal.withInitial(() -> new LlapAwareMemoryManager(conf)); + } + return threadLocalOrcLlapMemoryManager.get(); + } + /** * Options for creating ORC file writers. */ @@ -111,6 +151,10 @@ public static Reader createReader(Path path, WriterOptions(Properties tableProperties, Configuration conf) { super(tableProperties, conf); useUTCTimestamp(true); + if (conf.getBoolean(HiveConf.ConfVars.HIVE_ORC_WRITER_LLAP_MEMORY_MANAGER_ENABLED.varname, true) && + LlapProxy.isDaemon()) { + memory(getThreadLocalOrcLlapMemoryManager(conf)); + } } /** diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java index 97d4fc6..2931c04 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java @@ -19,12 +19,14 @@ package org.apache.hadoop.hive.ql.io.orc; import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; import static junit.framework.Assert.assertNotNull; import static junit.framework.Assert.assertNull; import static junit.framework.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.lang.management.ManagementFactory; import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -43,6 +45,9 @@ import org.apache.hadoop.hive.common.type.Date; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.Timestamp; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapDaemonInfo; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; @@ -91,6 +96,7 @@ import org.apache.orc.StripeInformation; import org.apache.orc.StripeStatistics; import org.apache.orc.TypeDescription; +import org.apache.orc.impl.MemoryManagerImpl; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -2202,4 +2208,39 @@ public void testListExpansion() throws Exception { assertEquals(false, reader.hasNext()); reader.close(); } + + @Test + public void testLlapAwareMemoryManager() throws IOException { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, + ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + try { + OrcFile.WriterOptions opts = OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB); + Writer writer = OrcFile.createWriter(new Path(testFilePath, "-0"), opts); + writer.close(); + assertEquals(opts.getMemoryManager().getClass(), MemoryManagerImpl.class); + + conf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap"); + LlapDaemonInfo.initialize("test", new Configuration()); + LlapProxy.setDaemon(true); + opts = OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB); + writer = OrcFile.createWriter(new Path(testFilePath, "-1"), opts); + writer.close(); + assertEquals(opts.getMemoryManager().getClass(), OrcFile.LlapAwareMemoryManager.class); + assertEquals(LlapDaemonInfo.INSTANCE.getMemoryPerExecutor() * 0.5, + ((OrcFile.LlapAwareMemoryManager) opts.getMemoryManager()).getTotalMemoryPool(), 100); + + conf.setBoolean(HiveConf.ConfVars.HIVE_ORC_WRITER_LLAP_MEMORY_MANAGER_ENABLED.varname, false); + opts = OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB); + writer = OrcFile.createWriter(new Path(testFilePath, "-2"), opts); + writer.close(); + assertEquals(opts.getMemoryManager().getClass(), MemoryManagerImpl.class); + } finally { + LlapProxy.setDaemon(false); + conf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "container"); + } + } }