diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1e322b86c5..cc372c6e5c 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3231,6 +3231,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     LLAP_IO_VRB_QUEUE_LIMIT_MIN("hive.llap.io.vrb.queue.limit.min", 10,
         "The minimum queue size for VRBs produced by a LLAP IO thread when the processing is\n" +
         "slower than the IO (used when determining the size from base size)."),
+    LLAP_IO_SHARE_OBJECT_POOLS("hive.llap.io.share.object.pools", false,
+        "Whether to use shared object pools in LLAP IO. A safety flag."),
     LLAP_AUTO_ALLOW_UBER("hive.llap.auto.allow.uber", false,
         "Whether or not to allow the planner to run vertices in the AM."),
     LLAP_AUTO_ENFORCE_TREE("hive.llap.auto.enforce.tree", true,
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index f9b85ece30..f4a549c529 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -45,7 +45,6 @@
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
-import org.apache.hive.common.util.FixedSizedObjectPool;
 
 public final class BuddyAllocator
   implements EvictionAwareAllocator, BuddyAllocatorMXBean, LlapOomDebugDump {
@@ -81,8 +80,15 @@
       .asFileAttribute(PosixFilePermissions.fromString("rwx------"));
   private final AtomicLong[] defragCounters;
   private final boolean doUseFreeListDiscard, doUseBruteDiscard;
-  private final FixedSizedObjectPool<DiscardContext> ctxPool;
   private final static boolean assertsEnabled = areAssertsEnabled();
+  // Discard context that is cached for reuse per thread to avoid allocating lots of arrays,
+  // and then resizing them down the line if we need a bigger size.
+  // Only the IO threads need this, so there'd be at most a few dozen objects.
+  private final static ThreadLocal<DiscardContext> threadCtx = new ThreadLocal<DiscardContext>() {
+    protected DiscardContext initialValue() {
+      return new DiscardContext();
+    }
+  };
 
   public BuddyAllocator(Configuration conf, MemoryManager mm, LlapDaemonCacheMetrics metrics) {
     this(HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_DIRECT),
@@ -158,17 +164,7 @@ public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal,
     boolean isBoth = null == discardMethod || "both".equalsIgnoreCase(discardMethod);
     doUseFreeListDiscard = isBoth || "freelist".equalsIgnoreCase(discardMethod);
     doUseBruteDiscard = isBoth || "brute".equalsIgnoreCase(discardMethod);
-    ctxPool = new FixedSizedObjectPool<DiscardContext>(32,
-        new FixedSizedObjectPool.PoolObjectHelper<DiscardContext>() {
-          @Override
-          public DiscardContext create() {
-            return new DiscardContext();
-          }
-          @Override
-          public void resetBeforeOffer(DiscardContext t) {
-          }
-        });
-  }
+  }
 
   public long determineMaxMmSize(long defragHeadroom, long maxMmSize) {
     if (defragHeadroom > 0) {
@@ -302,35 +298,32 @@ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory
 
       // Try to force-evict the fragments of the requisite size.
       boolean hasDiscardedAny = false;
-      DiscardContext ctx = ctxPool.take();
-      try {
-        // Brute force may discard up to twice as many buffers.
-        int maxListSize = 1 << (doUseBruteDiscard ? freeListIx : (freeListIx - 1));
-        int requiredBlocks = dest.length - destAllocIx;
-        ctx.init(maxListSize, requiredBlocks);
-        // First, try to use the blocks of half size in every arena.
-        if (doUseFreeListDiscard && freeListIx > 0) {
-          discardBlocksBasedOnFreeLists(freeListIx, startArenaIx, arenaCount, ctx);
-          memoryForceReleased += ctx.memoryReleased;
-          hasDiscardedAny = ctx.resultCount > 0;
-          destAllocIx = allocateFromDiscardResult(
-              dest, destAllocIx, freeListIx, allocationSize, ctx);
-          if (destAllocIx == dest.length) return;
-        }
-        // Then, try the brute force search for something to throw away.
-        if (doUseBruteDiscard) {
-          ctx.resetResults();
-          discardBlocksBruteForce(freeListIx, startArenaIx, arenaCount, ctx);
-          memoryForceReleased += ctx.memoryReleased;
-          hasDiscardedAny = hasDiscardedAny || (ctx.resultCount > 0);
-          destAllocIx = allocateFromDiscardResult(
-              dest, destAllocIx, freeListIx, allocationSize, ctx);
-
-          if (destAllocIx == dest.length) return;
-        }
-      } finally {
-        ctxPool.offer(ctx);
+      DiscardContext ctx = threadCtx.get();
+      // Brute force may discard up to twice as many buffers.
+      int maxListSize = 1 << (doUseBruteDiscard ? freeListIx : (freeListIx - 1));
+      int requiredBlocks = dest.length - destAllocIx;
+      ctx.init(maxListSize, requiredBlocks);
+      // First, try to use the blocks of half size in every arena.
+      if (doUseFreeListDiscard && freeListIx > 0) {
+        discardBlocksBasedOnFreeLists(freeListIx, startArenaIx, arenaCount, ctx);
+        memoryForceReleased += ctx.memoryReleased;
+        hasDiscardedAny = ctx.resultCount > 0;
+        destAllocIx = allocateFromDiscardResult(
+            dest, destAllocIx, freeListIx, allocationSize, ctx);
+        if (destAllocIx == dest.length) return;
+      }
+      // Then, try the brute force search for something to throw away.
+      if (doUseBruteDiscard) {
+        ctx.resetResults();
+        discardBlocksBruteForce(freeListIx, startArenaIx, arenaCount, ctx);
+        memoryForceReleased += ctx.memoryReleased;
+        hasDiscardedAny = hasDiscardedAny || (ctx.resultCount > 0);
+        destAllocIx = allocateFromDiscardResult(
+            dest, destAllocIx, freeListIx, allocationSize, ctx);
+
+        if (destAllocIx == dest.length) return;
       }
+
       if (hasDiscardedAny) {
         discardFailed = 0;
       } else if (++discardFailed > MAX_DISCARD_ATTEMPTS) {
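The BuddyAllocator change above replaces a shared FixedSizedObjectPool of DiscardContext objects with one lazily-grown context per IO thread, removing the take()/offer() round-trip and the try/finally. A minimal standalone sketch of that pattern (names are illustrative, not Hive code):

    // Each thread keeps a single growable scratch object; there is no cross-thread
    // contention and nothing to return to a pool when the work is done.
    final class ScratchExample {
      static final class Scratch {
        long[] results = new long[0];
        void init(int maxSize) {
          if (results.length < maxSize) {
            results = new long[maxSize]; // grows a few times early on, then is reused as-is
          }
        }
      }

      private static final ThreadLocal<Scratch> SCRATCH = ThreadLocal.withInitial(Scratch::new);

      static void doWork(int needed) {
        Scratch s = SCRATCH.get(); // same object on every call from this thread
        s.init(needed);
        // ... fill and read s.results ...
      }
    }

Since only the LLAP IO threads hit this path, the per-thread footprint stays bounded at a few dozen contexts, as the new comment notes.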
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index a6d2a0497c..903dce1818 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -134,11 +134,6 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) {
       });
   private static final PoolFactory POOL_FACTORY = new PoolFactory() {
     @Override
-    public <T> Pool<T> createPool(int size, PoolObjectHelper<T> helper) {
-      return new FixedSizedObjectPool<>(size, helper);
-    }
-
-    @Override
     public Pool<ColumnStreamData> createColumnStreamDataPool() {
       return CSD_POOL;
     }
@@ -161,7 +156,7 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) {
   private final QueryFragmentCounters counters;
   private final UserGroupInformation ugi;
   private final SchemaEvolution evolution;
-  private final boolean useCodecPool;
+  private final boolean useCodecPool, useObjectPools;
 
   // Read state.
   private int stripeIxFrom;
@@ -215,6 +210,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff
       throw new RuntimeException(e);
     }
     this.useCodecPool = HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_ORC_CODEC_POOL);
+    this.useObjectPools = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_SHARE_OBJECT_POOLS);
 
     // moved this part of code from performDataRead as LlapInputFormat need to know the file schema
     // to decide if schema evolution is supported or not.
@@ -446,8 +442,8 @@ private void ensureDataReader() throws IOException {
     ensureOrcReader();
     // Reader creation updates HDFS counters, don't do it here.
    DataWrapperForOrc dw = new DataWrapperForOrc();
-    stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace,
-        HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_ORC_CODEC_POOL));
+    stripeReader = orcReader.encodedReader(
+        fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool);
     stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
   }
 
@@ -804,11 +800,15 @@ public void returnData(OrcEncodedColumnBatch ecb) {
           }
         }
         bufferManager.decRefBuffers(data.getCacheBuffers());
-        CSD_POOL.offer(data);
+        if (useObjectPools) {
+          CSD_POOL.offer(data);
+        }
       }
     }
     // We can offer ECB even with some streams not discarded; reset() will clear the arrays.
-    ECB_POOL.offer(ecb);
+    if (useObjectPools) {
+      ECB_POOL.offer(ecb);
+    }
   }
 
   /**
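The reader above and SerDeEncodedDataReader below gate every pool interaction on the new flag in the same way. Condensed into a hedged sketch (useObjectPools, ECB_POOL, and OrcEncodedColumnBatch are the names from this patch; the two helper methods are illustrative):

    // Take side: with pooling off, allocate a fresh batch that GC will later reclaim.
    OrcEncodedColumnBatch takeBatch() {
      return useObjectPools ? ECB_POOL.take() : new OrcEncodedColumnBatch();
    }

    // Return side: recycle through the shared static pool only when the flag is on;
    // otherwise simply drop the reference.
    void returnBatch(OrcEncodedColumnBatch ecb) {
      if (useObjectPools) {
        ECB_POOL.offer(ecb);
      }
    }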
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 166abf7c70..fa2583d90a 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -46,8 +46,6 @@
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
-import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
-import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
 import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.FileData;
@@ -58,7 +56,6 @@
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer.SerDeStripeMetadata;
 import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
-import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.CacheWriter;
 import org.apache.hadoop.hive.llap.io.encoded.VectorDeserializeOrcWriter.AsyncCallback;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -160,6 +157,7 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end
   private final int allocSize;
   private final int targetSliceRowCount;
   private final boolean isLrrEnabled;
+  private final boolean useObjectPools;
 
   private final boolean[] writerIncludes;
   private FileReaderYieldReturn currentFileRead = null;
@@ -198,6 +196,7 @@ public MemoryBuffer create() {
     this.targetSliceRowCount = HiveConf.getIntVar(
         sliceConf, ConfVars.LLAP_IO_ENCODE_SLICE_ROW_COUNT);
     this.isLrrEnabled = HiveConf.getBoolVar(sliceConf, ConfVars.LLAP_IO_ENCODE_SLICE_LRR);
+    this.useObjectPools = HiveConf.getBoolVar(sliceConf, ConfVars.LLAP_IO_SHARE_OBJECT_POOLS);
     if (this.columnIds != null) {
       Collections.sort(this.columnIds);
     }
@@ -946,7 +945,7 @@ private boolean processOneSlice(CacheWriter.CacheStripeData diskData, boolean[]
     }
     consumer.setStripeMetadata(metadata);
 
-    OrcEncodedColumnBatch ecb = ECB_POOL.take();
+    OrcEncodedColumnBatch ecb = useObjectPools ? ECB_POOL.take() : new OrcEncodedColumnBatch();
     ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, writerIncludes.length);
     for (int colIx = 0; colIx < writerIncludes.length; ++colIx) {
       if (!writerIncludes[colIx]) continue;
@@ -968,7 +967,7 @@ private boolean processOneSlice(CacheWriter.CacheStripeData diskData, boolean[]
         continue;
       }
       int streamIx = setStreamDataToCache(newCacheDataForCol, stream);
-      ColumnStreamData cb = CSD_POOL.take();
+      ColumnStreamData cb = useObjectPools ? CSD_POOL.take() : new ColumnStreamData();
       cb.incRef();
       cb.setCacheBuffers(stream.data);
       ecb.setStreamData(colIx, streamIx, cb);
@@ -1031,7 +1030,7 @@ private boolean processOneSlice(Vectors diskData, boolean[] splitIncludes, int s
     }
     consumer.setStripeMetadata(metadata);
 
-    OrcEncodedColumnBatch ecb = ECB_POOL.take();
+    OrcEncodedColumnBatch ecb = useObjectPools ? ECB_POOL.take() : new OrcEncodedColumnBatch();
     ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, writerIncludes.length);
     int vectorsIx = 0;
     for (int colIx = 0; colIx < writerIncludes.length; ++colIx) {
@@ -1151,7 +1150,7 @@ private void processColumnCacheData(LlapSerDeDataBuffer[][][] cacheBuffers,
     }
     for (int streamIx = 0; streamIx < colData.length; ++streamIx) {
       if (colData[streamIx] == null) continue;
-      ColumnStreamData cb = CSD_POOL.take();
+      ColumnStreamData cb = useObjectPools ? CSD_POOL.take() : new ColumnStreamData();
       cb.incRef();
       cb.setCacheBuffers(Lists.newArrayList(colData[streamIx]));
       ecb.setStreamData(colIx, streamIx, cb);
@@ -1680,11 +1679,15 @@ public void returnData(OrcEncodedColumnBatch ecb) {
           }
         }
         bufferManager.decRefBuffers(data.getCacheBuffers());
-        CSD_POOL.offer(data);
+        if (useObjectPools) {
+          CSD_POOL.offer(data);
+        }
       }
     }
     // We can offer ECB even with some streams not discarded; reset() will clear the arrays.
-    ECB_POOL.offer(ecb);
+    if (useObjectPools) {
+      ECB_POOL.offer(ecb);
+    }
   }
 
   @Override
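Both readers pick the flag up from the daemon/slice configuration at construction time. One way to opt back into the shared pools, sketched against the standard HiveConf API (the setup around it is assumed):

    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.LLAP_IO_SHARE_OBJECT_POOLS, true);
    // equivalently, via the raw key this patch introduces:
    conf.setBoolean("hive.llap.io.share.object.pools", true);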
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 32bdf6e68e..04140cf10c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -1705,8 +1705,7 @@ private static Pools createPools(PoolFactory pf) {
 
   /** Pool factory that is used if another one isn't specified - just creates the objects. */
   private static class NoopPoolFactory implements PoolFactory {
-    @Override
-    public <T> Pool<T> createPool(final int size, final PoolObjectHelper<T> helper) {
+    private <T> Pool<T> createPool(final int size, final PoolObjectHelper<T> helper) {
       return new Pool<T>() {
         public void offer(T t) {
         }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 50d10e33cd..26e64b69d2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -50,7 +50,6 @@ EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader data
 
   /** The factory that can create (or return) the pools used by encoded reader. */
   public interface PoolFactory {
-    <T> Pool<T> createPool(int size, PoolObjectHelper<T> helper);
    Pool<OrcEncodedColumnBatch> createEncodedColumnBatchPool();
     Pool<ColumnStreamData> createColumnStreamDataPool();
   }
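With the flag off, OrcEncodedDataReader passes null instead of POOL_FACTORY, so EncodedReaderImpl falls back to the NoopPoolFactory above and every object is created fresh. A reduced sketch of what that fallback amounts to (the Pool interface here is trimmed to two methods for brevity; the supplier-based helper is illustrative, not the Hive signature):

    // Simplified stand-in for the pool contract used by the encoded reader.
    interface Pool<T> {
      T take();
      void offer(T t);
    }

    static <T> Pool<T> noopPool(final java.util.function.Supplier<T> creator) {
      return new Pool<T>() {
        @Override public T take() { return creator.get(); } // always allocate fresh
        @Override public void offer(T t) { }                // drop it; GC reclaims
      };
    }

Correctness is identical with pooling disabled; the only difference is that objects are not reused.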