From 13a4ca8bdd5efcbaf5a8b5d64909a905d4bc0c4a Mon Sep 17 00:00:00 2001 From: kangkaisen Date: Sat, 17 Dec 2016 14:12:48 +0800 Subject: [PATCH] KYLIN-2242 write multiple files in FactDistinctColumnsReducer with MultipleOutputs --- .../kylin/engine/mr/common/BatchConstants.java | 6 + .../kylin/engine/mr/steps/CreateDictionaryJob.java | 23 +++- .../engine/mr/steps/FactDistinctColumnsJob.java | 23 +++- .../mr/steps/FactDistinctColumnsReducer.java | 136 +++++++-------------- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 21 +++- 5 files changed, 103 insertions(+), 106 deletions(-) diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index 078d80f..a77714c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -56,6 +56,12 @@ public interface BatchConstants { String CFG_STATISTICS_CUBE_ESTIMATION_FILENAME = "cube_statistics.txt"; String CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME = "cuboid_statistics.seq"; + String CFG_MAPRED_OUTPUT_COMPRESS = "mapred.output.compress"; + + String CFG_OUTPUT_COLUMN = "column"; + String CFG_OUTPUT_DICT = "dict"; + String CFG_OUTPUT_PARTITION= "partition"; + /** * command line ARGuments */ diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java index 4985503..a7f6537 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java @@ -23,8 +23,10 @@ import java.io.IOException; import org.apache.commons.cli.Options; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; @@ -64,11 +66,15 @@ public class CreateDictionaryJob extends AbstractHadoopJob { @Override public Dictionary getDictionary(TblColRef col) throws IOException { Path colDir = new Path(factColumnsInputPath, col.getName()); - Path dictFile = new Path(colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX); - FileSystem fs = HadoopUtil.getFileSystem(dictFile.toString()); - if (fs.exists(dictFile) == false) + FileSystem fs = HadoopUtil.getFileSystem(colDir.toString()); + FileStatus[] fileStatus = getFilterPath(fs, colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX); + Path dictFile; + if (fileStatus.length == 1) { + dictFile = fileStatus[0].getPath(); + } else { return null; - + } + FSDataInputStream is = null; try { is = fs.open(dictFile); @@ -86,6 +92,15 @@ public class CreateDictionaryJob extends AbstractHadoopJob { return 0; } + private FileStatus[] getFilterPath(FileSystem fs, Path baseDir, final String filter) throws IOException { + return fs.listStatus(baseDir, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(filter); + } + }); + } + public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new CreateDictionaryJob(), args); System.exit(exitCode); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java index 2eb694e..c2c8b69 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java @@ -23,11 +23,15 @@ import java.util.List; import org.apache.commons.cli.Options; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; @@ -82,8 +86,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { int uhcReducerCount = cube.getConfig().getUHCReducerCount(); int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor()); - for(int index : uhcIndex) { - if(index == 1) { + for (int index : uhcIndex) { + if (index == 1) { reducerCount += uhcReducerCount - 1; } } @@ -92,7 +96,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { throw new IllegalArgumentException("The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 'kylin.engine.mr.uhc-reducer-count'"); } - job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled); @@ -118,6 +121,12 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { attachKylinPropsAndMetadata(cube, job.getConfiguration()); + /** + * don't compress the reducer output so that {@link CreateDictionaryJob} and {@link UpdateCubeInfoAfterBuildStep} + * could read the reducer file directly + */ + job.getConfiguration().set(BatchConstants.CFG_MAPRED_OUTPUT_COMPRESS, "false"); + return waitForCompletion(job); } finally { @@ -139,12 +148,14 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { private void setupReducer(Path output, int numberOfReducers) throws IOException { job.setReducerClass(FactDistinctColumnsReducer.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(Text.class); job.setPartitionerClass(FactDistinctColumnPartitioner.class); job.setNumReduceTasks(numberOfReducers); + //make each reducer output to respective dir + MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class); + MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, TextOutputFormat.class, NullWritable.class, BytesWritable.class); + MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class); + FileOutputFormat.setOutputPath(job, output); job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 3115fe4..a94d3ca 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -18,23 +18,26 @@ package org.apache.kylin.engine.mr.steps; +import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.Dictionary; @@ -69,7 +72,6 @@ public class FactDistinctColumnsReducer extends KylinReducer colValues; private TblColRef col = null; private boolean isStatistics = false; private KylinConfig cubeConfig; @@ -86,10 +88,14 @@ public class FactDistinctColumnsReducer extends KylinReducer values, Context context) throws IOException { - final Configuration conf = context.getConfiguration(); - final FileSystem fs = FileSystem.get(conf); - final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH); - final Path colDir = new Path(outputPath, col.getName()); - final String fileName = col.getName() + "-" + taskId % uhcReducerCount; - final Path outputFile = new Path(colDir, fileName); - - FSDataOutputStream out = null; - try { - if (!fs.exists(colDir)) { - fs.mkdirs(colDir); - } - - if (fs.exists(outputFile)) { - out = fs.append(outputFile); - logger.info("append file " + outputFile); - } else { - out = fs.create(outputFile); - logger.info("create file " + outputFile); + byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1); + // output written to baseDir/colName/-r-00000 (etc) + String fileName = col.getName() + "/"; + mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName); } - - for (ByteArray value : values) { - out.write(value.array(), value.offset(), value.length()); - out.write('\n'); - } - } finally { - IOUtils.closeQuietly(out); - } - } - - private void outputDict(TblColRef col, Dictionary dict, Context context) throws IOException { - final String fileName = col.getName() + DICT_FILE_POSTFIX; - FSDataOutputStream out = getOutputStream(context, fileName); - try { - String dictClassName = dict.getClass().getName(); - out.writeUTF(dictClassName); - dict.write(out); - logger.info("reducer id is:+" + taskId + " colName:" + col.getName() + " writing dict at file : " + fileName + " dict class:" + dictClassName); - } finally { - IOUtils.closeQuietly(out); } } - private void outputPartitionInfo(Context context) throws IOException { - final String fileName = col.getName() + PARTITION_COL_INFO_FILE_POSTFIX; - FSDataOutputStream out = getOutputStream(context, fileName); - try { - out.writeLong(timeMinValue); - out.writeLong(timeMaxValue); - logger.info("write partition info for col : " + col.getName() + " minValue:" + timeMinValue + " maxValue:" + timeMaxValue); - } finally { - IOUtils.closeQuietly(out); - } - } - - private FSDataOutputStream getOutputStream(Context context, String outputFileName) throws IOException { - final Configuration conf = context.getConfiguration(); - final FileSystem fs = FileSystem.get(conf); - final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH); - final Path colDir = new Path(outputPath, col.getName()); - final Path outputFile = new Path(colDir, outputFileName); - FSDataOutputStream out = null; - if (!fs.exists(colDir)) { - fs.mkdirs(colDir); - } - fs.deleteOnExit(outputFile); - out = fs.create(outputFile); - return out; - } - @Override protected void doCleanup(Context context) throws IOException, InterruptedException { if (isStatistics) { - // output the hll info + //output the hll info; long grandTotal = 0; for (HLLCounter hll : cuboidHLLMap.values()) { grandTotal += hll.getCountEstimate(); @@ -282,21 +211,40 @@ public class FactDistinctColumnsReducer extends KylinReducer dict = builder.build(); - outputDict(col, dict, context); - } else { - if (colValues.size() > 0) { - outputDistinctValues(col, colValues, context); - colValues.clear(); - } + outputDict(col, dict); } } + + mos.close(); + } + + private void outputPartitionInfo() throws IOException, InterruptedException { + if (col != null) { + // output written to baseDir/colName/colName.pci-r-00000 (etc) + String partitionFileName = col.getName() + "/" + col.getName() + PARTITION_COL_INFO_FILE_POSTFIX; + + mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue), partitionFileName); + mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue), partitionFileName); + logger.info("write partition info for col : " + col.getName() + " minValue:" + timeMinValue + " maxValue:" + timeMaxValue); + } + } + + private void outputDict(TblColRef col, Dictionary dict) throws IOException, InterruptedException { + // output written to baseDir/colName/colName.rldict-r-00000 (etc) + String dictFileName = col.getName() + "/" + col.getName() + DICT_FILE_POSTFIX; + + mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(dict.getClass().getName().getBytes()), dictFileName); + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos);) { + dict.write(outputStream); + + mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(baos.toByteArray()), dictFileName); + } } private void writeMapperAndCuboidStatistics(Context context) throws IOException { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index d3becfe..eebc523 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -22,8 +22,10 @@ import java.io.IOException; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -82,8 +84,15 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); Path colDir = new Path(factColumnsInputPath, partitionCol.getName()); - Path outputFile = new Path(colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX); - FileSystem fs = HadoopUtil.getFileSystem(outputFile.toString()); + FileSystem fs = HadoopUtil.getFileSystem(colDir.toString()); + FileStatus[] fileStatus = getFilterPath(fs, colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX); + Path outputFile; + if (fileStatus.length == 1) { + outputFile = fileStatus[0].getPath(); + } else { + throw new IOException("fail to find the partition file in base dir: " + colDir); + } + FSDataInputStream is = null; long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE; try { @@ -100,4 +109,12 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { segment.setDateRangeEnd(maxValue); } + private FileStatus[] getFilterPath(FileSystem fs, Path baseDir, final String filter) throws IOException { + return fs.listStatus(baseDir, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(filter); + } + }); + } } -- 2.10.1 (Apple Git-78)