diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java index c2ee54c11a..68d29eb70e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java @@ -169,6 +169,7 @@ private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split, queueLimitBase, queueLimitMin, rbCtx.getRowColumnTypeInfos(), + rbCtx.getDataColumnNums(), decimal64Support); LOG.info("Queue limit for LlapRecordReader is " + limit); this.queue = new ArrayBlockingQueue<>(limit); @@ -223,6 +224,7 @@ static int determineQueueLimit(long maxBufferedSize, int queueLimitMax, int queueLimitMin, TypeInfo[] typeInfos, + int[] projectedColumnNums, final boolean decimal64Support) { assert queueLimitMax >= queueLimitMin; // If the values are equal, the queue limit is fixed. @@ -257,8 +259,8 @@ static int determineQueueLimit(long maxBufferedSize, // 88 8 byte[] BytesColumnVector.smallBuffer long columnVectorBaseSize = (long) (96 * numberOfProjectedColumns * scale); - for (int i = 0; i < typeInfos.length; i++) { - TypeInfo ti = typeInfos[i]; + for (int i = 0; i < projectedColumnNums.length; i++) { + TypeInfo ti = typeInfos[projectedColumnNums[i]]; int colWeight; if (ti.getCategory() != Category.PRIMITIVE) { colWeight = COL_WEIGHT_COMPLEX; diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java b/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java index 7e71cf2144..c447c78dfc 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java @@ -34,61 +34,78 @@ private static final int MAX_BUFFERED_SIZE = 1 << 30; //1GB @Test public void testMaxEqMin() { - int expected = LlapRecordReader.determineQueueLimit(0, 100, 100, null, true); + int expected = LlapRecordReader.determineQueueLimit(0, 100, 100, null, null,true); Assert.assertEquals(100, expected); } @Test public void testMaxIsEnforced() { TypeInfo[] cols = { new DecimalTypeInfo() }; - int actual = LlapRecordReader.determineQueueLimit(Long.MAX_VALUE, 10, 1, cols, true); + int[] colsProjected = {0}; + int actual = LlapRecordReader.determineQueueLimit(Long.MAX_VALUE, 10, 1, cols, colsProjected, true); Assert.assertEquals(10, actual); } @Test public void testMinIsEnforced() { TypeInfo[] cols = { new DecimalTypeInfo() }; - int actual = LlapRecordReader.determineQueueLimit(0, 10, 5, cols, true); + int[] colsProjected = {0}; + int actual = LlapRecordReader.determineQueueLimit(0, 10, 5, cols, colsProjected, true); Assert.assertEquals(5, actual); } @Test public void testOrderDecimal64VsFatDecimals() { TypeInfo[] cols = IntStream.range(0, 300).mapToObj(i -> new DecimalTypeInfo()).toArray(TypeInfo[]::new); - int actual = LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, cols, true); + int[] colsProjected = IntStream.range(0, 300).toArray(); + int actual = LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, cols, colsProjected, true); Assert.assertEquals(75, actual); // the idea it to see an order of 10 when using fat Decimals - actual = LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, cols, false); + actual = LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, cols, colsProjected, false); Assert.assertEquals(7, actual); } @Test public void testOrderDecimal64VsLong() { TypeInfo[] decimalCols = ArrayOf(() -> new DecimalTypeInfo(TypeDescription.MAX_DECIMAL64_PRECISION, 0)); TypeInfo[] longCols = ArrayOf(() -> TypeInfoFactory.longTypeInfo); - Assert.assertEquals(LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, longCols, true), - LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, decimalCols, true)); + int[] colsProjected = IntStream.range(0, 300).toArray(); + Assert.assertEquals(LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, longCols, colsProjected, true), + LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, decimalCols, colsProjected, true)); } @Test public void testStringsColumns() { TypeInfo[] charsCols = ArrayOf(() -> TypeInfoFactory.charTypeInfo); TypeInfo[] stringCols = ArrayOf(() -> TypeInfoFactory.stringTypeInfo); TypeInfo[] binaryCols = ArrayOf(() -> TypeInfoFactory.binaryTypeInfo); - Assert.assertEquals(LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, stringCols, true), 9); - Assert.assertEquals(9, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, charsCols, true)); - Assert.assertEquals(9, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, binaryCols, true)); + int[] colsProjected = IntStream.range(0, 300).toArray(); + Assert.assertEquals(LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, stringCols, colsProjected, true), 9); + Assert.assertEquals(9, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, charsCols, colsProjected, true)); + Assert.assertEquals(9, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, binaryCols, colsProjected, true)); } @Test public void testLongColumns() { TypeInfo[] longsCols = ArrayOf(() -> TypeInfoFactory.longTypeInfo); TypeInfo[] intCols = ArrayOf(() -> TypeInfoFactory.intTypeInfo); TypeInfo[] byteCols = ArrayOf(() -> TypeInfoFactory.byteTypeInfo); - Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, longsCols, true)); - Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, intCols, true)); - Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, byteCols, true)); + int[] colsProjected = IntStream.range(0, 300).toArray(); + Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, longsCols, colsProjected, true)); + Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, intCols, colsProjected, true)); + Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, byteCols, colsProjected, true)); } @Test public void testTimestampsColumns() { TypeInfo[] tsCols = ArrayOf(() -> TypeInfoFactory.timestampTypeInfo); TypeInfo[] intervalCols = ArrayOf(() -> TypeInfoFactory.intervalDayTimeTypeInfo); - Assert.assertEquals(38, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, tsCols, true)); - Assert.assertEquals(38, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, intervalCols, true)); + int[] colsProjected = IntStream.range(0, 300).toArray(); + Assert.assertEquals(38, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, tsCols, colsProjected, true)); + Assert.assertEquals(38, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, intervalCols, colsProjected, true)); + } + + @Test public void testProjectedColumns() { + TypeInfo[] cols = IntStream.range(0, 300).mapToObj(i -> new DecimalTypeInfo()).toArray(TypeInfo[]::new); + int[] colsProjected = {0}; + int actual = LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, cols, colsProjected, true); + Assert.assertEquals(5088, actual); + // the idea it to see an order of 10 when using fat Decimals + actual = LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, cols, colsProjected, false); + Assert.assertEquals(1700, actual); } private static TypeInfo[] ArrayOf(Supplier supplier) { diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 48501e5e39..565afdc90b 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -16,6 +16,7 @@ import com.google.common.io.ByteArrayDataOutput; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsCollector; import org.apache.hadoop.io.Text; import org.apache.hadoop.metrics2.MetricsSource; @@ -1088,7 +1089,9 @@ public void dagComplete() { } finally { writeLock.unlock(); } - updateGuaranteedInRegistry(tgVersionForZk, 0); + if (!StringUtils.isEmpty(conf.get(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE.varname, "").trim())) { + updateGuaranteedInRegistry(tgVersionForZk, 0); + } // TODO Cleanup pending tasks etc, so that the next dag is not affected. }