From 16dbad892d43f4a575e0dad078e826b41fba46df Mon Sep 17 00:00:00 2001
From: Ma Gang
Date: Tue, 12 Sep 2017 18:12:16 +0800
Subject: [PATCH] APACHE-KYLIN-2736: Use multiple threads to calculate
 HyperLogLogPlusCounter in FactDistinctColumnsMapper

Signed-off-by: Zhong
---
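Notes:
    The mapper now fans sampled rows out to a pool of CuboidStatCalculator
    worker threads, each owning a slice of the cuboid HLL counters, instead
    of hashing every cuboid inline in doMap(). Two new options control the
    fan-out; per getStatsThreadNum() below, the calculator thread count is
    min(kylin.engine.mr.max-cuboid-stats-calculator-number,
        ceil(cuboidNum / kylin.engine.mr.cuboid-number-per-stats-calculator)),
    and the defaults (1 and 100) keep the old single-threaded behavior.
    A kylin.properties sketch that would allow up to three calculator
    threads per mapper (the value 3 is illustrative, not a recommendation):

        kylin.engine.mr.max-cuboid-stats-calculator-number=3
        kylin.engine.mr.cuboid-number-per-stats-calculator=100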
 .../org/apache/kylin/common/KylinConfigBase.java   |   9 +
 .../engine/mr/steps/FactDistinctColumnsMapper.java | 278 +++++++++++++++------
 2 files changed, 215 insertions(+), 72 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 66805df..be55920 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1023,6 +1023,15 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
     }
 
+    public int getCuboidStatisticsCalculatorMaxNumber() {
+        // with the default of 1, multi-threaded statistics calculation is disabled
+        return Integer.parseInt(getOptional("kylin.engine.mr.max-cuboid-stats-calculator-number", "1"));
+    }
+
+    public int getCuboidNumberPerStatisticsCalculator() {
+        return Integer.parseInt(getOptional("kylin.engine.mr.cuboid-number-per-stats-calculator", "100"));
+    }
+
     //UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns
     public int getUHCReducerCount() {
         return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "5"));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index ace16a5..272894f 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -20,8 +20,11 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinVersion;
@@ -55,18 +58,17 @@ public class FactDistinctColumnsMapper<KEYIN, VALUEIN> extends FactDistinctColumnsMapperB
         BYTES
     }
 
-    protected boolean collectStatistics = false;
     protected int nRowKey;
     private Integer[][] allCuboidsBitSet = null;
     private HLLCounter[] allCuboidsHLL = null;
     private Long[] cuboidIds;
-    private HashFunction hf = null;
     private int rowCount = 0;
     private int samplingPercentage;
-    //private ByteArray[] row_hashcodes = null;
-    private long[] rowHashCodesLong = null;
     private ByteBuffer tmpbuf;
+
+    private CuboidStatCalculator[] cuboidStatCalculators;
+
     private static final Text EMPTY_TEXT = new Text();
     public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
     public static final byte MARK_FOR_HLL = (byte) 0xFF;
@@ -76,9 +78,6 @@ public class FactDistinctColumnsMapper<KEYIN, VALUEIN> extends FactDistinctColumnsMapperB
 
     private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
 
-    //about details of the new algorithm, please see KYLIN-2518
-    private boolean isUsePutRowKeyToHllNewAlgorithm;
-
     @Override
     protected void doSetup(Context context) throws IOException {
         super.doSetup(context);
@@ -116,18 +115,60 @@ public class FactDistinctColumnsMapper<KEYIN, VALUEIN> extends FactDistinctColumnsMapperB
                 needFetchPartitionCol = true;
             }
             //for KYLIN-2518 backward compatibility
+            boolean isUsePutRowKeyToHllNewAlgorithm;
             if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
                 isUsePutRowKeyToHllNewAlgorithm = false;
-                hf = Hashing.murmur3_32();
                 logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
             } else {
                 isUsePutRowKeyToHllNewAlgorithm = true;
-                rowHashCodesLong = new long[nRowKey];
-                hf = Hashing.murmur3_128();
                 logger.info("Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518", cubeDesc.getVersion());
             }
+
+            int calculatorNum = getStatsThreadNum(cuboidIds.length);
+            cuboidStatCalculators = new CuboidStatCalculator[calculatorNum];
+            int splitSize = cuboidIds.length / calculatorNum;
+            if (splitSize <= 0) {
+                splitSize = 1;
+            }
+            for (int i = 0; i < calculatorNum; i++) {
+                HLLCounter[] cuboidsHLLSplit;
+                Integer[][] cuboidsBitSetSplit;
+                Long[] cuboidIdSplit;
+                int start = i * splitSize;
+                if (start > cuboidIds.length) {
+                    break;
+                }
+                int end = (i + 1) * splitSize;
+                if (i == calculatorNum - 1) { // last split
+                    end = cuboidIds.length;
+                }
+
+                cuboidsHLLSplit = Arrays.copyOfRange(allCuboidsHLL, start, end);
+                cuboidsBitSetSplit = Arrays.copyOfRange(allCuboidsBitSet, start, end);
+                cuboidIdSplit = Arrays.copyOfRange(cuboidIds, start, end);
+                CuboidStatCalculator calculator = new CuboidStatCalculator(i,
+                        intermediateTableDesc.getRowKeyColumnIndexes(), cuboidIdSplit, cuboidsBitSetSplit,
+                        isUsePutRowKeyToHllNewAlgorithm, cuboidsHLLSplit);
+                cuboidStatCalculators[i] = calculator;
+                calculator.start();
+            }
         }
     }
 
+    private int getStatsThreadNum(int cuboidNum) {
+        int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatisticsCalculator();
+        if (unitNum <= 0) {
+            logger.warn("config from getCuboidNumberPerStatisticsCalculator() " + unitNum + " should be larger than 0");
+            logger.info("Will use single thread for cuboid statistics calculation");
+            return 1;
+        }
+
+        int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatisticsCalculatorMaxNumber();
+        int calculatorNum = (cuboidNum - 1) / unitNum + 1;
+        if (calculatorNum > maxCalculatorNum) {
+            calculatorNum = maxCalculatorNum;
+        }
+        return calculatorNum;
+    }
+
     @Override
@@ -172,11 +213,7 @@
 
         if (collectStatistics) {
             if (rowCount % 100 < samplingPercentage) {
-                if (isUsePutRowKeyToHllNewAlgorithm) {
-                    putRowKeyToHLLNew(row);
-                } else {
-                    putRowKeyToHLLOld(row);
-                }
+                putRowKeyToHLL(row);
             }
 
             if (needFetchPartitionCol == true) {
@@ -200,6 +237,12 @@
         }
     }
 
+    private void putRowKeyToHLL(String[] row) {
+        for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+            cuboidStatCalculator.putRow(row);
+        }
+    }
+
     private long countSizeInBytes(String[] row) {
         int size = 0;
         for (String s : row) {
@@ -209,80 +252,171 @@
         return size;
     }
 
-    private void putRowKeyToHLLOld(String[] row) {
-        //generate hash for each row key column
-        byte[][] rowHashCodes = new byte[nRowKey][];
-        for (int i = 0; i < nRowKey; i++) {
-            Hasher hc = hf.newHasher();
-            String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
-            if (colValue != null) {
-                rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
-            } else {
-                rowHashCodes[i] = hc.putInt(0).hash().asBytes();
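+    // At cleanup time, first block until every calculator thread has drained its
+    // row queue, then serialize each cuboid's HLL counter to the reducer under a
+    // MARK_FOR_HLL key (one marker byte followed by the cuboid id).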
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        if (collectStatistics) {
+            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+            // output each cuboid's hll to reducer, key is 0 - cuboidId
+            for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+                cuboidStatCalculator.waitComplete();
+            }
+            for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+                Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
+                HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
+                HLLCounter hll;
+
+                for (int i = 0; i < cuboidIds.length; i++) {
+                    hll = cuboidsHLL[i];
+                    tmpbuf.clear();
+                    tmpbuf.put(MARK_FOR_HLL); // one byte
+                    tmpbuf.putLong(cuboidIds[i]);
+                    outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+                    hllBuf.clear();
+                    hll.writeRegisters(hllBuf);
+                    outputValue.set(hllBuf.array(), 0, hllBuf.position());
+                    sortableKey.init(outputKey, (byte) 0);
+                    context.write(sortableKey, outputValue);
+                }
             }
         }
+    }
 
-        // user the row key column hash to get a consolidated hash for each cuboid
-        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
-            Hasher hc = hf.newHasher();
-            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
-                hc.putBytes(rowHashCodes[allCuboidsBitSet[i][position]]);
+    private int countNewSize(int oldSize, int dataSize) {
+        int newSize = oldSize * 2;
+        while (newSize < dataSize) {
+            newSize = newSize * 2;
+        }
+        return newSize;
+    }
+
+    public static class CuboidStatCalculator implements Runnable {
+        private final int id;
+        private final int nRowKey;
+        private final int[] rowkeyColIndex;
+        private final Long[] cuboidIds;
+        private final Integer[][] cuboidsBitSet;
+        private volatile HLLCounter[] cuboidsHLL = null;
+
+        //about details of the new algorithm, please see KYLIN-2518
+        private final boolean isNewAlgorithm;
+        private final HashFunction hf;
+        private long[] rowHashCodesLong;
+
+        private BlockingQueue<String[]> queue = new LinkedBlockingQueue<String[]>(2000);
+        private Thread workThread;
+        private volatile boolean stop;
+
+        public CuboidStatCalculator(int id, int[] rowkeyColIndex, Long[] cuboidIds, Integer[][] cuboidsBitSet,
+                boolean isUsePutRowKeyToHllNewAlgorithm, HLLCounter[] cuboidsHLL) {
+            this.id = id;
+            this.nRowKey = rowkeyColIndex.length;
+            this.rowkeyColIndex = rowkeyColIndex;
+            this.cuboidIds = cuboidIds;
+            this.cuboidsBitSet = cuboidsBitSet;
+            this.isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm;
+            if (!isNewAlgorithm) {
+                this.hf = Hashing.murmur3_32();
+            } else {
+                rowHashCodesLong = new long[nRowKey];
+                this.hf = Hashing.murmur3_128();
             }
+            this.cuboidsHLL = cuboidsHLL;
+            workThread = new Thread(this);
+        }
 
-            allCuboidsHLL[i].add(hc.hash().asBytes());
+        public void start() {
+            logger.info("cuboid stats calculator:" + id + " started, handle cuboids number:" + cuboidIds.length);
+            workThread.start();
         }
-    }
 
-    private void putRowKeyToHLLNew(String[] row) {
-        //generate hash for each row key column
-        for (int i = 0; i < nRowKey; i++) {
-            Hasher hc = hf.newHasher();
-            String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
-            if (colValue == null)
-                colValue = "0";
-            byte[] bytes = hc.putString(colValue).hash().asBytes();
-            rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
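+        // putRow() hands a sampled row from doMap() to this worker. The array is
+        // copied defensively (the caller may reuse the row array), and put() blocks
+        // once 2000 rows are queued, applying back-pressure to the mapper.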
+        public void putRow(final String[] row) {
+            String[] copyRow = Arrays.copyOf(row, row.length);
+            try {
+                queue.put(copyRow);
+            } catch (InterruptedException e) {
+                logger.error("interrupt", e);
+            }
         }
 
-        // user the row key column hash to get a consolidated hash for each cuboid
-        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
-            long value = 0;
-            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
-                value += rowHashCodesLong[allCuboidsBitSet[i][position]];
+        public void waitComplete() {
+            stop = true;
+            try {
+                workThread.join();
+            } catch (InterruptedException e) {
+                logger.error("interrupt", e);
             }
-            allCuboidsHLL[i].addHashDirectly(value);
         }
-    }
 
-    @Override
-    protected void doCleanup(Context context) throws IOException, InterruptedException {
-        if (collectStatistics) {
-            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-            // output each cuboid's hll to reducer, key is 0 - cuboidId
-            HLLCounter hll;
-            for (int i = 0; i < cuboidIds.length; i++) {
-                hll = allCuboidsHLL[i];
+        private void putRowKeyToHLLOld(String[] row) {
+            //generate hash for each row key column
+            byte[][] rowHashCodes = new byte[nRowKey][];
+            for (int i = 0; i < nRowKey; i++) {
+                Hasher hc = hf.newHasher();
+                String colValue = row[rowkeyColIndex[i]];
+                if (colValue != null) {
+                    rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
+                } else {
+                    rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+                }
+            }
 
-            tmpbuf.clear();
-            tmpbuf.put(MARK_FOR_HLL); // one byte
-            tmpbuf.putLong(cuboidIds[i]);
-            outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-            hllBuf.clear();
-            hll.writeRegisters(hllBuf);
-            outputValue.set(hllBuf.array(), 0, hllBuf.position());
-            sortableKey.init(outputKey, (byte) 0);
-            context.write(sortableKey, outputValue);
+            // use the row key column hash to get a consolidated hash for each cuboid
+            for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+                Hasher hc = hf.newHasher();
+                for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+                    hc.putBytes(rowHashCodes[cuboidsBitSet[i][position]]);
+                }
+
+                cuboidsHLL[i].add(hc.hash().asBytes());
             }
         }
-    }
 
+        private void putRowKeyToHLLNew(String[] row) {
+            //generate hash for each row key column
+            for (int i = 0; i < nRowKey; i++) {
+                Hasher hc = hf.newHasher();
+                String colValue = row[rowkeyColIndex[i]];
+                if (colValue == null)
+                    colValue = "0";
+                byte[] bytes = hc.putString(colValue).hash().asBytes();
+                rowHashCodesLong[i] = (Bytes.toLong(bytes) + i); //add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+            }
+
+            // use the row key column hash to get a consolidated hash for each cuboid
+            for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+                long value = 0;
+                for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+                    value += rowHashCodesLong[cuboidsBitSet[i][position]];
+                }
+                cuboidsHLL[i].addHashDirectly(value);
+            }
+        }
 
-    private int countNewSize(int oldSize, int dataSize) {
-        int newSize = oldSize * 2;
-        while (newSize < dataSize) {
-            newSize = newSize * 2;
+        public HLLCounter[] getHLLCounters() {
+            return cuboidsHLL;
         }
-        return newSize;
-    }
+
+        public Long[] getCuboidIds() {
+            return cuboidIds;
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                String[] row = queue.poll();
+                if (row == null && stop) {
+                    logger.info("cuboid stats calculator:" + id + " completed.");
+                    break;
+                } else if (row == null) {
+                    Thread.yield();
+                    continue;
+                }
+                if (isNewAlgorithm) {
+                    putRowKeyToHLLNew(row);
+                } else {
+                    putRowKeyToHLLOld(row);
+                }
+            }
+        }
+    }
 }
-- 
2.5.4 (Apple Git-61)