From 647ca751af82d5c4bf8d62d6720e49501d45c9df Mon Sep 17 00:00:00 2001
From: Panos Garefalakis
Date: Thu, 9 Apr 2020 16:31:58 +0100
Subject: [PATCH] Guard VGB op from flushing too often

Change-Id: Id8e5e93f54b157612454af4d2b6cddb1d51fa5bd
---
 .../ql/exec/vector/VectorGroupByOperator.java |  9 +-
 .../vector/TestVectorGroupByOperator.java     | 93 +++++++++++++++++++
 2 files changed, 99 insertions(+), 3 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 7fe6151a17..f104c13a49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -593,20 +593,23 @@ private void flush(boolean all) throws HiveException {
 
   /**
    * Returns true if the memory threshold for the hash table was reached.
+   * WARN: Frequent flushing can reduce operator throughput.
    */
   private boolean shouldFlush(VectorizedRowBatch batch) {
     if (batch.size == 0) {
       return false;
     }
-    //numEntriesSinceCheck is the number of entries added to the hash table
+    // numEntriesSinceCheck is the number of entries added to the hash table
     // since the last time we checked the average variable size
     if (numEntriesSinceCheck >= this.checkInterval) {
       // Were going to update the average variable row size by sampling the current batch
       updateAvgVariableSize(batch);
       numEntriesSinceCheck = 0;
     }
-    if (numEntriesHashTable > this.maxHtEntries ||
-        numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory) {
+    long currMemUsed = numEntriesHashTable * (fixedHashEntrySize + avgVariableSize);
+    // Protect against a low maxHtEntries setting: if memory usage is below 30%, avoid flushing
+    if (((numEntriesHashTable > this.maxHtEntries) && (currMemUsed > 0.3 * maxHashTblMemory)) ||
+        currMemUsed > maxHashTblMemory) {
       return true;
     }
     if (gcCanary.get() == null) {
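
As a quick illustration of the change (not part of the applied patch), here is a
minimal, self-contained sketch of the new guard. The class wrapper and comments
are hypothetical; the field names mirror those in VectorGroupByOperator:

  // Sketch only: the entry-count cap no longer forces a flush on its own; it
  // must coincide with at least 30% of the memory budget being in use. The
  // hard memory cap still always forces a flush.
  final class FlushGuardSketch {
    long numEntriesHashTable;  // keys currently aggregated in the hash table
    long maxHtEntries;         // hive.vectorized.groupby.maxentries
    long fixedHashEntrySize;   // estimated fixed bytes per entry
    long avgVariableSize;      // sampled average variable-width bytes per entry
    long maxHashTblMemory;     // hash table memory budget in bytes

    boolean shouldFlush() {
      long currMemUsed = numEntriesHashTable * (fixedHashEntrySize + avgVariableSize);
      return (numEntriesHashTable > maxHtEntries && currMemUsed > 0.3 * maxHashTblMemory)
          || currMemUsed > maxHashTblMemory;
    }
  }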
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index f2d95bb397..e8586fce25 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -440,6 +440,99 @@ public void remove() {
     }
   }
 
+  @Test
+  public void testMaxHTEntriesFlush() throws HiveException {
+
+    List<String> mapColumnNames = new ArrayList<String>();
+    mapColumnNames.add("Key");
+    mapColumnNames.add("Value");
+    VectorizationContext ctx = new VectorizationContext("name", mapColumnNames);
+
+    Pair<GroupByDesc, VectorGroupByDesc> pair = buildKeyGroupByDesc(ctx, "max",
+        "Value", TypeInfoFactory.longTypeInfo,
+        "Key", TypeInfoFactory.longTypeInfo);
+    GroupByDesc desc = pair.fst;
+    VectorGroupByDesc vectorDesc = pair.snd;
+
+    // Set the memory threshold so that we get ~1MB before we need to flush.
+    MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+    long maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
+
+    // 1MB should be able to store 1M/16bytes(key+data) = 62500 entries
+    float threshold = 10 * 100.0f * 1024.0f / maxMemory;
+    desc.setMemoryThreshold(threshold);
+
+    // Set a really low maxentries setting
+    hconf.set("hive.vectorized.groupby.maxentries", "100");
+
+    CompilationOpContext cCtx = new CompilationOpContext();
+
+    Operator<? extends OperatorDesc> groupByOp = OperatorFactory.get(cCtx, desc);
+
+    VectorGroupByOperator vgo =
+        (VectorGroupByOperator) Vectorizer.vectorizeGroupByOperator(groupByOp, ctx, vectorDesc);
+
+    FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo);
+    vgo.initialize(hconf, null);
+
+    long expected = vgo.getMaxMemory();
+    assertEquals(expected, maxMemory);
+
+    this.outputRowCount = 0;
+    out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() {
+      @Override
+      public void inspectRow(Object row, int tag) throws HiveException {
+        ++outputRowCount;
+      }
+    });
+
+    Iterable<Object> it = new Iterable<Object>() {
+      @Override
+      public Iterator<Object> iterator() {
+        return new Iterator<Object>() {
+          long value = 0;
+
+          @Override
+          public boolean hasNext() {
+            return true;
+          }
+
+          @Override
+          public Object next() {
+            return ++value;
+          }
+
+          @Override
+          public void remove() {
+          }
+        };
+      }
+    };
+
+    FakeVectorRowBatchFromObjectIterables data = new FakeVectorRowBatchFromObjectIterables(
+        100,
+        new String[] {"long", "long"},
+        it,
+        it);
+
+    // The 'it' data source will produce batches without ever ending.
+    // We want to see that VGBY entries are NOT flushed when reaching 100
+    // (misconfigured maxentries) but only when we reach about 30% of the memory budget.
+    long countRowsProduced = 0;
+    for (VectorizedRowBatch unit : data) {
+      countRowsProduced += 100;
+      vgo.process(unit, 0);
+      if (0 < outputRowCount) {
+        break;
+      }
+
+    }
+    // Make sure that we did not flush at the low entry threshold
+    assertTrue(countRowsProduced > 100);
+    // Make sure we did not go above 30% of the ~1MB budget (16 bytes per entry)
+    assertTrue(countRowsProduced < 0.3 * (1000 * 1024 / 16));
+  }
+
   @Test
   public void testMultiKeyIntStringInt() throws HiveException {
     testMultiKey(
-- 
2.20.1 (Apple Git-117)
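
For reference, a rough sketch of the arithmetic behind the test's assertions,
assuming the 16-bytes-per-entry estimate from the test comments (the class and
names below are hypothetical, for illustration only):

  public class FlushBudgetMath {
    public static void main(String[] args) {
      // threshold = 10 * 100.0f * 1024.0f / maxMemory reserves a fraction of
      // the heap equal to 1,024,000 bytes, i.e. roughly a 1MB budget.
      long budgetBytes = 1000 * 1024;
      long bytesPerEntry = 16;                               // long key + long value
      long entriesAtBudget = budgetBytes / bytesPerEntry;    // 64,000 entries
      long entriesAt30Pct = (long) (0.3 * entriesAtBudget);  // 19,200 entries
      // The test expects more than 100 rows before the first flush (the low
      // maxentries cap alone must not flush) but fewer than ~19,200.
      System.out.println(entriesAtBudget + " entries fit; guard may flush after ~"
          + entriesAt30Pct);
    }
  }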