diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e50ff997a9..81371d69d8 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4139,13 +4139,16 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "MR LineRecordRedader into LLAP cache, if this feature is enabled. Safety flag."), LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true, "Whether to enable time counters for LLAP IO layer (time spent in HDFS, etc.)"), - LLAP_IO_VRB_QUEUE_LIMIT_BASE("hive.llap.io.vrb.queue.limit.base", 50000, - "The default queue size for VRBs produced by a LLAP IO thread when the processing is\n" + + LLAP_IO_VRB_QUEUE_LIMIT_MAX("hive.llap.io.vrb.queue.limit.max", 50000, + "The maximum queue size for VRBs produced by a LLAP IO thread when the processing is\n" + "slower than the IO. The actual queue size is set per fragment, and is adjusted down\n" + - "from the base, depending on the schema."), - LLAP_IO_VRB_QUEUE_LIMIT_MIN("hive.llap.io.vrb.queue.limit.min", 10, + "from the base, depending on the schema see LLAP_IO_CVB_BUFFERED_SIZE."), + LLAP_IO_VRB_QUEUE_LIMIT_MIN("hive.llap.io.vrb.queue.limit.min", 1, "The minimum queue size for VRBs produced by a LLAP IO thread when the processing is\n" + "slower than the IO (used when determining the size from base size)."), + LLAP_IO_CVB_BUFFERED_SIZE("hive.llap.io.cvb.memory.consumption.", 1L << 30, + "The amount of bytes used to buffer CVB between IO and Processor Threads default to 1GB, " + + "this will be used to compute a best effort queue size for VRBs produced by a LLAP IO thread."), LLAP_IO_SHARE_OBJECT_POOLS("hive.llap.io.share.object.pools", false, "Whether to used shared object pools in LLAP IO. A safety flag."), LLAP_AUTO_ALLOW_UBER("hive.llap.auto.allow.uber", false, diff --git common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java index 3900a45994..371d9396ad 100644 --- common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java +++ common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java @@ -154,6 +154,13 @@ public boolean tryOffer(T t) { return offerImpl(t); } + @Override public void clear() { + T result = takeImpl(); + while (result != null) { + result = takeImpl(); + } + } + private T takeImpl() { long oldState = reserveArrayIndex(OBJECTS, EMPTY); if (oldState == NO_INDEX) return null; // For whatever reason, reserve failed. diff --git common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java index b026e54424..1c3fc07042 100644 --- common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java +++ common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java @@ -29,6 +29,7 @@ import org.apache.hive.common.util.FixedSizedObjectPool; import org.apache.hadoop.hive.common.Pool; +import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -238,6 +239,29 @@ public void testMTTImpl(int size, int takerCount, int giverCount) { assertTrue(OneObjHelper.THE_OBJECT == pool.take()); } + @Test + public void testClearImp() { + int size = 10; + FixedSizedObjectPool + fixedSizedObjectPool = + new FixedSizedObjectPool<>(size, new Pool.PoolObjectHelper() { + @Override public Object create() { + //Null is used as marker to be the end. + return null; + } + + @Override public void resetBeforeOffer(Object o) { + // + } + }); + for (int i = 0; i < size; i++) { + fixedSizedObjectPool.offer(new Object()); + } + Assert.assertEquals(size, fixedSizedObjectPool.size()); + assertNotNull(fixedSizedObjectPool.take()); + fixedSizedObjectPool.clear(); + assertNull(fixedSizedObjectPool.take()); + } private static void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) { cdlIn.countDown(); try { diff --git llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java index 6d7cf7de11..a351a193c6 100644 --- llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java +++ llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java @@ -47,6 +47,8 @@ import com.google.protobuf.BlockingService; +import javax.annotation.Nullable; + public class LlapUtil { private static final Logger LOG = LoggerFactory.getLogger(LlapUtil.class); @@ -372,7 +374,7 @@ private static boolean isSomeHiveDir(String p) { } - public static ThreadMXBean initThreadMxBean() { + @Nullable public static ThreadMXBean initThreadMxBean() { ThreadMXBean mxBean = ManagementFactory.getThreadMXBean(); if (mxBean != null) { if (!mxBean.isCurrentThreadCpuTimeSupported()) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java index 91c94efaf5..1378a01f44 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java @@ -21,11 +21,12 @@ import java.util.ArrayList; import java.io.IOException; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -74,11 +75,9 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -class LlapRecordReader - implements RecordReader, Consumer { +class LlapRecordReader implements RecordReader, Consumer { private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class); private static final Object DONE_OBJECT = new Object(); @@ -91,7 +90,7 @@ private VectorizedOrcAcidRowBatchReader acidReader; private final Object[] partitionValues; - private final LinkedBlockingQueue queue; + private final ArrayBlockingQueue queue; private final AtomicReference pendingError = new AtomicReference<>(null); /** Vector that is currently being processed by our user. */ @@ -160,14 +159,22 @@ private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split, TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr( job, isAcidScan, Integer.MAX_VALUE); - - int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_BASE, job, daemonConf); - int queueLimitMin = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, daemonConf); - final boolean decimal64Support = HiveConf.getVar(job, ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED) - .equalsIgnoreCase("decimal_64"); - int limit = determineQueueLimit(queueLimitBase, queueLimitMin, rbCtx.getRowColumnTypeInfos(), decimal64Support); + int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MAX, job, daemonConf); + int queueLimitMin = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, daemonConf); + long bestEffortSize = getLongQueueVar(ConfVars.LLAP_IO_CVB_BUFFERED_SIZE, job, daemonConf); + + final boolean + decimal64Support = + HiveConf.getVar(job, ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED).equalsIgnoreCase("decimal_64"); + int + limit = + determineQueueLimit(bestEffortSize, + queueLimitBase, + queueLimitMin, + rbCtx.getRowColumnTypeInfos(), + decimal64Support); LOG.info("Queue limit for LlapRecordReader is " + limit); - this.queue = new LinkedBlockingQueue<>(limit); + this.queue = new ArrayBlockingQueue<>(limit); int partitionColumnCount = rbCtx.getPartitionColumnCount(); @@ -197,24 +204,64 @@ private static int getQueueVar(ConfVars var, JobConf jobConf, Configuration daem return (jobVal != -1) ? jobVal : HiveConf.getIntVar(daemonConf, var); } + private static long getLongQueueVar(ConfVars var, JobConf jobConf, Configuration daemonConf) { + // Check job config for overrides, otherwise use the default server value. + long jobVal = jobConf.getLong(var.varname, -1); + return (jobVal != -1) ? jobVal : HiveConf.getLongVar(daemonConf, var); + } + // For queue size estimation purposes, we assume all columns have weight one, and the following // types are counted as multiple columns. This is very primitive; if we wanted to make it better, // we'd increase the base limit, and adjust dynamically based on IO and processing perf delays. - private static final int COL_WEIGHT_COMPLEX = 16, COL_WEIGHT_HIVEDECIMAL = 4, + private static final int COL_WEIGHT_COMPLEX = 16, COL_WEIGHT_HIVEDECIMAL = 10, COL_WEIGHT_STRING = 8; - private static int determineQueueLimit( - int queueLimitBase, int queueLimitMin, TypeInfo[] typeInfos, final boolean decimal64Support) { + + @VisibleForTesting + static int determineQueueLimit(long maxBufferedSize, + int queueLimitMax, + int queueLimitMin, + TypeInfo[] typeInfos, + final boolean decimal64Support) { + assert queueLimitMax >= queueLimitMin; // If the values are equal, the queue limit is fixed. - if (queueLimitBase == queueLimitMin) return queueLimitBase; + if (queueLimitMax == queueLimitMin) return queueLimitMax; // If there are no columns (projection only join?) just assume no weight. - if (typeInfos == null || typeInfos.length == 0) return queueLimitBase; + if (typeInfos == null || typeInfos.length == 0) return queueLimitMax; + // total weight as bytes double totalWeight = 0; - for (TypeInfo ti : typeInfos) { + int numberOfProjectedColumns = typeInfos.length; + double scale = Math.max(Math.log(numberOfProjectedColumns), 1); + + // Assuming that an empty Column Vector is about 96 bytes the object + // org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector object internals: + // OFFSET SIZE TYPE DESCRIPTION + // VALUE + // 0 16 (object header) + // 16 1 boolean ColumnVector.noNulls + // 17 1 boolean ColumnVector.isRepeating + // 18 1 boolean ColumnVector.preFlattenIsRepeating + // 19 1 boolean ColumnVector.preFlattenNoNulls + // 20 4 (alignment/padding gap) + // 24 8 org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type ColumnVector.type + // 32 8 boolean[] ColumnVector.isNull + // 40 4 int BytesColumnVector.nextFree + // 44 4 int BytesColumnVector.smallBufferNextFree + // 48 4 int BytesColumnVector.bufferAllocationCount + // 52 4 (alignment/padding gap) + // 56 8 byte[][] BytesColumnVector.vector + // 64 8 int[] BytesColumnVector.start + // 72 8 int[] BytesColumnVector.length + // 80 8 byte[] BytesColumnVector.buffer + // 88 8 byte[] BytesColumnVector.smallBuffer + long columnVectorBaseSize = (long) (96 * numberOfProjectedColumns * scale); + + for (int i = 0; i < typeInfos.length; i++) { + TypeInfo ti = typeInfos[i]; int colWeight; if (ti.getCategory() != Category.PRIMITIVE) { colWeight = COL_WEIGHT_COMPLEX; } else { - PrimitiveTypeInfo pti = (PrimitiveTypeInfo)ti; + PrimitiveTypeInfo pti = (PrimitiveTypeInfo) ti; switch (pti.getPrimitiveCategory()) { case BINARY: case CHAR: @@ -222,6 +269,11 @@ private static int determineQueueLimit( case STRING: colWeight = COL_WEIGHT_STRING; break; + //Timestamp column vector uses an int and long arrays + case TIMESTAMP: + case INTERVAL_DAY_TIME: + colWeight = 2; + break; case DECIMAL: boolean useDecimal64 = false; if (ti instanceof DecimalTypeInfo) { @@ -241,9 +293,13 @@ private static int determineQueueLimit( colWeight = 1; } } - totalWeight += colWeight; + totalWeight += colWeight * 8 * scale; } - return Math.max(queueLimitMin, (int)(queueLimitBase / totalWeight)); + //default batch size is 1024 + totalWeight *= 1024; + totalWeight += columnVectorBaseSize; + int bestEffortSize = Math.min((int) (maxBufferedSize / totalWeight), queueLimitMax); + return Math.max(bestEffortSize, queueLimitMin); } @@ -271,7 +327,7 @@ private static MapWork findMapWork(JobConf job) throws HiveException { work = Utilities.getMergeWork(job, inputName); } - if (work == null || !(work instanceof MapWork)) { + if (!(work instanceof MapWork)) { work = Utilities.getMapWork(job); } return (MapWork) work; @@ -325,7 +381,7 @@ public boolean next(NullWritable key, VectorizedRowBatch vrb) throws IOException } isFirst = false; } - ColumnVectorBatch cvb = null; + ColumnVectorBatch cvb; try { cvb = nextCvb(); } catch (InterruptedException e) { @@ -347,10 +403,10 @@ public boolean next(NullWritable key, VectorizedRowBatch vrb) throws IOException // TODO: relying everywhere on the magical constants and columns being together means ACID // columns are going to be super hard to change in a backward compat manner. I can // foresee someone cursing while refactoring all the magic for prefix schema changes. - /** - * Acid meta cols are always either all included or all excluded the - * the width of 'cvb' changes accordingly so 'acidColCount' and - * 'ixInVrb' need to be adjusted. See {@link IncludesImpl} comments. + /* + Acid meta cols are always either all included or all excluded the + the width of 'cvb' changes accordingly so 'acidColCount' and + 'ixInVrb' need to be adjusted. See {@link IncludesImpl} comments. */ // Exclude the row column. int acidColCount = acidReader.includeAcidColumns() ? @@ -467,7 +523,7 @@ ColumnVectorBatch nextCvb() throws InterruptedException, IOException { // If the structure is replaced with smth that doesn't, we MUST check interrupt here because // Hive operators rely on recordreader to handle task interruption, and unlike most RRs we // do not do any blocking IO ops on this thread. - Object next = null; + Object next; do { rethrowErrorIfAny(pendingError.get()); // Best-effort check; see the comment in the method. next = queue.poll(100, TimeUnit.MILLISECONDS); @@ -624,7 +680,7 @@ public IncludesImpl(List tableIncludedCols, boolean isAcidScan, List filePhysicalColumnIds = readerLogicalColumnIds; if (isAcidScan) { int rootCol = OrcInputFormat.getRootColumn(false); - filePhysicalColumnIds = new ArrayList(filePhysicalColumnIds.size() + rootCol); + filePhysicalColumnIds = new ArrayList<>(filePhysicalColumnIds.size() + rootCol); this.acidStructColumnId = rootCol - 1; // OrcRecordUpdater.ROW. This is somewhat fragile... // Note: this guarantees that physical column IDs are in order. for (int i = 0; i < rootCol; ++i) { @@ -632,12 +688,12 @@ public IncludesImpl(List tableIncludedCols, boolean isAcidScan, // struct to get read without projection. if (acidStructColumnId == i) continue; if(!includeAcidColumns) { - /** - * if not including acid columns, we still want to number the - * physical columns as if acid columns are included becase - * {@link #generateFileIncludes(TypeDescription)} takes the file - * schema as input - * (eg >) + /* + if not including acid columns, we still want to number the + physical columns as if acid columns are included becase + {@link #generateFileIncludes(TypeDescription)} takes the file + schema as input + (eg >) */ continue; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java index 84436bc495..10d76aa095 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java @@ -57,17 +57,15 @@ public EncodedDataConsumer(Consumer consumer, final int colCo this.downstreamConsumer = consumer; this.ioMetrics = ioMetrics; this.mxBean = LlapUtil.initThreadMxBean(); - cvbPool = new FixedSizedObjectPool(CVB_POOL_SIZE, - new Pool.PoolObjectHelper() { - @Override - public ColumnVectorBatch create() { - return new ColumnVectorBatch(colCount); - } - @Override - public void resetBeforeOffer(ColumnVectorBatch t) { - // Don't reset anything, we are reusing column vectors. - } - }); + cvbPool = new FixedSizedObjectPool<>(CVB_POOL_SIZE, new Pool.PoolObjectHelper() { + @Override public ColumnVectorBatch create() { + return new ColumnVectorBatch(colCount); + } + + @Override public void resetBeforeOffer(ColumnVectorBatch t) { + // Don't reset anything, we are reusing column vectors. + } + }); this.counters = counters; } @@ -81,6 +79,9 @@ public CpuRecordingCallable(Callable readCallable) { @Override public Void call() throws Exception { + if (mxBean == null) { + return readCallable.call(); + } long cpuTime = mxBean.getCurrentThreadCpuTime(), userTime = mxBean.getCurrentThreadUserTime(); try { @@ -145,6 +146,7 @@ protected abstract void decodeBatch(BatchType batch, @Override public void setDone() throws InterruptedException { downstreamConsumer.setDone(); + cvbPool.clear(); } @Override diff --git llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java new file mode 100644 index 0000000000..7e71cf2144 --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.io.api.impl; + +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.orc.TypeDescription; +import org.junit.Assert; +import org.junit.Test; + +import java.util.function.Supplier; +import java.util.stream.IntStream; + +public class LlapRecordReaderQueueSizeTest { + + private static final int END_EXCLUSIVE = 300; + private static final int MAX_BUFFERED_SIZE = 1 << 30; //1GB + + @Test public void testMaxEqMin() { + int expected = LlapRecordReader.determineQueueLimit(0, 100, 100, null, true); + Assert.assertEquals(100, expected); + } + + @Test public void testMaxIsEnforced() { + TypeInfo[] cols = { new DecimalTypeInfo() }; + int actual = LlapRecordReader.determineQueueLimit(Long.MAX_VALUE, 10, 1, cols, true); + Assert.assertEquals(10, actual); + } + + @Test public void testMinIsEnforced() { + TypeInfo[] cols = { new DecimalTypeInfo() }; + int actual = LlapRecordReader.determineQueueLimit(0, 10, 5, cols, true); + Assert.assertEquals(5, actual); + } + + @Test public void testOrderDecimal64VsFatDecimals() { + TypeInfo[] cols = IntStream.range(0, 300).mapToObj(i -> new DecimalTypeInfo()).toArray(TypeInfo[]::new); + int actual = LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, cols, true); + Assert.assertEquals(75, actual); + // the idea it to see an order of 10 when using fat Decimals + actual = LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, cols, false); + Assert.assertEquals(7, actual); + } + + @Test public void testOrderDecimal64VsLong() { + TypeInfo[] decimalCols = ArrayOf(() -> new DecimalTypeInfo(TypeDescription.MAX_DECIMAL64_PRECISION, 0)); + TypeInfo[] longCols = ArrayOf(() -> TypeInfoFactory.longTypeInfo); + Assert.assertEquals(LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, longCols, true), + LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, decimalCols, true)); + } + + @Test public void testStringsColumns() { + TypeInfo[] charsCols = ArrayOf(() -> TypeInfoFactory.charTypeInfo); + TypeInfo[] stringCols = ArrayOf(() -> TypeInfoFactory.stringTypeInfo); + TypeInfo[] binaryCols = ArrayOf(() -> TypeInfoFactory.binaryTypeInfo); + Assert.assertEquals(LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, stringCols, true), 9); + Assert.assertEquals(9, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, charsCols, true)); + Assert.assertEquals(9, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, binaryCols, true)); + } + + @Test public void testLongColumns() { + TypeInfo[] longsCols = ArrayOf(() -> TypeInfoFactory.longTypeInfo); + TypeInfo[] intCols = ArrayOf(() -> TypeInfoFactory.intTypeInfo); + TypeInfo[] byteCols = ArrayOf(() -> TypeInfoFactory.byteTypeInfo); + Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, longsCols, true)); + Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, intCols, true)); + Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, byteCols, true)); + } + + @Test public void testTimestampsColumns() { + TypeInfo[] tsCols = ArrayOf(() -> TypeInfoFactory.timestampTypeInfo); + TypeInfo[] intervalCols = ArrayOf(() -> TypeInfoFactory.intervalDayTimeTypeInfo); + Assert.assertEquals(38, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, tsCols, true)); + Assert.assertEquals(38, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, intervalCols, true)); + } + + private static TypeInfo[] ArrayOf(Supplier supplier) { + return IntStream.range(0, END_EXCLUSIVE).mapToObj(i -> supplier.get()).toArray(TypeInfo[]::new); + } +} diff --git storage-api/src/java/org/apache/hadoop/hive/common/Pool.java storage-api/src/java/org/apache/hadoop/hive/common/Pool.java index b9789eca17..0522cc1ee3 100644 --- storage-api/src/java/org/apache/hadoop/hive/common/Pool.java +++ storage-api/src/java/org/apache/hadoop/hive/common/Pool.java @@ -17,21 +17,36 @@ */ package org.apache.hadoop.hive.common; -/** Simple object pool to prevent GC on small objects passed between threads. */ +/** + * Simple object pool to prevent GC on small objects passed between threads. + */ public interface Pool { - /** Object helper for objects stored in the pool. */ + /** + * Object helper for objects stored in the pool. + */ public interface PoolObjectHelper { - /** Called to create an object when one cannot be provided. + /** + * Called to create an object when one cannot be provided. + * * @return a newly allocated object */ T create(); - /** Called before the object is put in the pool (regardless of whether put succeeds). + + /** + * Called before the object is put in the pool (regardless of whether put succeeds). + * * @param t the object to reset */ void resetBeforeOffer(T t); } T take(); + void offer(T t); + int size(); -} \ No newline at end of file + + default void clear() { + //no op + } +}