From 658a1f20c49481063858f77f69b330ba4ca6a78f Mon Sep 17 00:00:00 2001
From: kangkaisen
Date: Wed, 26 Oct 2016 19:35:20 +0800
Subject: [PATCH] KYLIN-2135 Enlarge FactDistinctColumns reducer number

---
 .../org/apache/kylin/common/KylinConfigBase.java   |  5 ++
 .../java/org/apache/kylin/cube/CubeManager.java    | 35 +++++++++++++
 .../apache/kylin/engine/mr/DFSFileTableReader.java | 57 ++++++++++++++++++----
 .../kylin/engine/mr/common/BatchConstants.java     |  5 ++
 .../mr/steps/FactDistinctColumnPartitioner.java    | 11 +----
 .../engine/mr/steps/FactDistinctColumnsJob.java    | 18 ++++++-
 .../mr/steps/FactDistinctColumnsMapperBase.java    | 17 ++++++-
 .../mr/steps/FactDistinctColumnsReducer.java       | 36 ++++++++++++--
 .../mr/steps/FactDistinctHiveColumnsMapper.java    | 12 ++++-
 9 files changed, 170 insertions(+), 26 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index acc4eb1..9ac8142 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -374,6 +374,11 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true"));
     }
 
+    //UHC: ultra high cardinality columns, i.e. the ShardByColumns and the GlobalDictionaryColumns
+    public int getUHCReducerCount() {
+        return Integer.parseInt(getOptional("kylin.job.uhc.reducer.count", "3"));
+    }
+
     public String getOverrideHiveTableLocation(String table) {
         return getOptional("hive.table.location." + table.toUpperCase());
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index a53849e..738ef21 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -31,6 +31,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -45,6 +46,7 @@ import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DictionaryDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
@@ -1100,4 +1102,37 @@ public class CubeManager implements IRealizationProvider {
         }
         return holes;
     }
+
+    private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
+
+    //UHC (ultra high cardinality columns): the ShardByColumns and the GlobalDictionaryColumns
+    public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
+        List<TblColRef> factDictCols = getAllDictColumnsOnFact(cubeDesc);
+        int[] uhcIndex = new int[factDictCols.size()];
+
+        //add GlobalDictionaryColumns
+        List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
+        if (dictionaryDescList != null) {
+            for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
+                if (dictionaryDesc.getBuilderClass() != null && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) {
+                    for (int i = 0; i < factDictCols.size(); i++) {
+                        if (factDictCols.get(i).equals(dictionaryDesc.getColumnRef())) {
+                            uhcIndex[i] = 1;
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        //add ShardByColumns
+        Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns();
+        for (int i = 0; i < factDictCols.size(); i++) {
+            if (shardByColumns.contains(factDictCols.get(i))) {
+                uhcIndex[i] = 1;
+            }
+        }
+
+        return uhcIndex;
+    }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
index 300b123..3763698 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -23,10 +23,14 @@ import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -53,7 +57,7 @@ public class DFSFileTableReader implements TableReader {
 
     private String filePath;
     private String delim;
-    private RowReader reader;
+    private List<RowReader> readerList;
 
     private String curLine;
     private String[] curColumns;
@@ -68,17 +72,33 @@
         this.filePath = filePath;
         this.delim = delim;
         this.expectedColumnNumber = expectedColumnNumber;
+        this.readerList = new ArrayList<RowReader>();
 
         FileSystem fs = HadoopUtil.getFileSystem(filePath);
 
-        try {
-            this.reader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath);
+        ArrayList<FileStatus> allFiles = new ArrayList<>();
+        FileStatus status = fs.getFileStatus(new Path(filePath));
+        if (status.isFile()) {
+            allFiles.add(status);
+        } else {
+            FileStatus[] listStatus = fs.listStatus(new Path(filePath));
+            allFiles.addAll(Arrays.asList(listStatus));
+        }
 
+        try {
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         } catch (IOException e) {
             if (isExceptionSayingNotSeqFile(e) == false)
                 throw e;
 
-            this.reader = new CsvRowReader(fs, filePath);
+            this.readerList = new ArrayList<RowReader>();
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         }
     }
 
@@ -94,9 +114,20 @@
 
     @Override
     public boolean next() throws IOException {
-        curLine = reader.nextLine();
-        curColumns = null;
-        return curLine != null;
+        int curReaderIndex = -1;
+        RowReader curReader;
+
+        while (++curReaderIndex < readerList.size()) {
+            curReader = readerList.get(curReaderIndex);
+            curLine = curReader.nextLine();
+            curColumns = null;
+
+            if (curLine != null) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
     public String getLine() {
@@ -145,9 +176,15 @@
     }
 
     @Override
-    public void close() throws IOException {
-        if (reader != null)
-            reader.close();
+    public void close() {
+        for (RowReader reader : readerList) {
+            try {
+                if (reader != null)
+                    reader.close();
+            } catch (IOException e) {
+                logger.warn("close file failed:", e);
+            }
+        }
     }
 
     private String autoDetectDelim(String line) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index e4a8808..078d80f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -81,4 +81,9 @@ public interface BatchConstants {
     String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder";
     int NORMAL_RECORD_LOG_THRESHOLD = 100000;
     int ERROR_RECORD_LOG_THRESHOLD = 100;
+
+    /**
+     * global dictionary builder class
+     */
+    String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 6973c4b..b36e422 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.kylin.common.util.BytesUtil;
@@ -26,22 +25,16 @@ import org.apache.kylin.common.util.BytesUtil;
 /**
  */
 public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> {
-    private Configuration conf;
-
     @Override
     public int getPartition(Text key, Text value, int numReduceTasks) {
-
         if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) {
             // the last reducer is for merging hll
             return numReduceTasks - 1;
         } else if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_PARTITION_COL) {
-            // the last reducer is for merging hll
+            // the last but one reducer is for partition col
             return numReduceTasks - 2;
         } else {
-            int colIndex = BytesUtil.readUnsigned(key.getBytes(), 0, 1);
-            return colIndex;
+            return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
         }
-
     }
-
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 6603728..0802d2f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -78,11 +78,27 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             CubeInstance cube = cubeMgr.getCube(cubeName);
             List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
 
+            int reducerCount = columnsNeedDict.size();
+            int uhcReducerCount = cube.getConfig().getUHCReducerCount();
+
+            int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
+            for (int index : uhcIndex) {
+                if (index == 1) {
+                    reducerCount += uhcReducerCount - 1;
+                }
+            }
+
+            if (reducerCount > 255) {
+                throw new IOException("The max reducer number for FactDistinctColumnsJob is 255, please decrease 'kylin.job.uhc.reducer.count'");
+            }
+
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
+
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());
@@ -101,7 +117,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
                 System.out.println("Found segment " + segment);
             }
             setupMapper(cube.getSegmentById(segmentID));
-            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 2 : columnsNeedDict.size());
+            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
 
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 3fa966d..196bf1e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -20,7 +20,9 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -58,6 +60,10 @@ public class FactDistinctColumnsMapperBase extends KylinMapper
+    protected Map<Integer, Integer> columnIndexToReducerBeginId = new HashMap<>();
+
     @Override
     protected void setup(Context context) throws IOException {
         Configuration conf = context.getConfiguration();
@@ -73,7 +79,7 @@ public class FactDistinctColumnsMapperBase extends KylinMapper
 extends KylinMapper
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+    private Map<Integer, Integer> ReducerIdToColumnIndex = new HashMap<>();
+    private int taskId;
+
+    protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
@@ -83,7 +88,10 @@ public class FactDistinctColumnsReducer extends KylinReducer
 values, Context context) throws IOException, InterruptedException {
@@ -153,10 +175,16 @@ public class FactDistinctColumnsReducer extends KylinReducer
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
 extends FactDistinctColumnsMap
                 if (fieldValue == null)
                     continue;
 
                 int offset = keyBuffer.position();
-                keyBuffer.put(Bytes.toBytes(i)[3]); // one byte is enough
+
+                int reducerIndex;
+                if (uhcIndex[i] == 0) {
+                    //for the normal dictionary column
+                    reducerIndex = columnIndexToReducerBeginId.get(i);
+                } else {
+                    //for the uhc column
+                    reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
+                }
+
+                keyBuffer.put(Bytes.toBytes(reducerIndex)[3]);
                 keyBuffer.put(Bytes.toBytes(fieldValue));
                 outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
                 context.write(outputKey, EMPTY_TEXT);
-- 
2.9.3 (Apple Git-75)
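
For readers of this patch, the routing scheme it introduces can be summarized as follows: every dictionary column on the fact table owns a block of reducer ids (a single id for a normal column, kylin.job.uhc.reducer.count ids for a UHC column), the mapper picks an id inside that block by hashing the field value, writes it as the first byte of the output key, and FactDistinctColumnPartitioner simply reads that byte back. The self-contained sketch below illustrates the arithmetic only; it is not Kylin code, the class name and sample values are made up, and the contiguous block layout of columnIndexToReducerBeginId is an assumption consistent with the mapper and job changes above (its actual initialization is not visible in this copy of the patch).

    import java.util.HashMap;
    import java.util.Map;

    public class UhcReducerRoutingSketch {

        public static void main(String[] args) {
            // Hypothetical setup: 4 dictionary columns on the fact table, columns 1 and 3
            // are UHC (shard-by or global-dictionary columns), kylin.job.uhc.reducer.count = 3.
            int[] uhcIndex = {0, 1, 0, 1};
            int uhcReducerCount = 3;

            // Assumed block layout for columnIndexToReducerBeginId: each column gets a
            // contiguous range of reducer ids, one id for a normal column and
            // uhcReducerCount ids for a UHC column.
            Map<Integer, Integer> columnIndexToReducerBeginId = new HashMap<>();
            int nextReducerId = 0;
            for (int i = 0; i < uhcIndex.length; i++) {
                columnIndexToReducerBeginId.put(i, nextReducerId);
                nextReducerId += (uhcIndex[i] == 1) ? uhcReducerCount : 1;
            }

            // Per-value routing, mirroring the added logic in FactDistinctHiveColumnsMapper:
            // a normal column always maps to its single reducer, a UHC column spreads its
            // values over its block by hashing the field value.
            String[] sampleValues = {"2016-10-26", "user_12345", "CN", "device_67890"};
            for (int i = 0; i < uhcIndex.length; i++) {
                String fieldValue = sampleValues[i];
                int reducerIndex;
                if (uhcIndex[i] == 0) {
                    reducerIndex = columnIndexToReducerBeginId.get(i);
                } else {
                    reducerIndex = columnIndexToReducerBeginId.get(i)
                            + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
                }
                // This index becomes the first byte of the mapper output key, which
                // FactDistinctColumnPartitioner reads back with BytesUtil.readUnsigned.
                System.out.println("column " + i + " value '" + fieldValue + "' -> reducer " + reducerIndex);
            }
        }
    }

With this example's uhcIndex = {0, 1, 0, 1} and uhcReducerCount = 3, FactDistinctColumnsJob above would compute reducerCount = 4 + 2 * (3 - 1) = 8, and when statistics are enabled it reserves two extra reducers: the last one merges the HLL counters and the one before it handles the partition column.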