From 5b6aefd10ccedb1318ffe8840b626bc09b68f028 Mon Sep 17 00:00:00 2001
From: shaofengshi
Date: Mon, 22 Dec 2014 16:56:58 +0800
Subject: [PATCH 01/16] use HCatalog in MR

---
 job/pom.xml                                        |  6 ++++-
 .../job/hadoop/cube/FactDistinctColumnsJob.java    | 26 +++++++++++++++-------
 .../job/hadoop/cube/FactDistinctColumnsMapper.java | 18 +++++++++------
 .../com/kylinolap/job/BuildCubeWithEngineTest.java |  2 +-
 4 files changed, 35 insertions(+), 17 deletions(-)

diff --git a/job/pom.xml b/job/pom.xml
index b039957..e89f2c0 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -169,7 +169,11 @@
             hadoop-hdfs
             provided
-
+
+            org.apache.hive.hcatalog
+            hive-hcatalog-core
+            0.14.0
+

diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java
index 556f690..f6e38a5 100644
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -26,15 +26,15 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
 import com.kylinolap.cube.CubeManager;
 import com.kylinolap.job.constant.BatchConstants;
 import com.kylinolap.job.hadoop.AbstractHadoopJob;
@@ -64,17 +64,19 @@ public int run(String[] args) throws Exception {
         Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
 
         // ----------------------------------------------------------------------------
+        // add metadata to distributed cache
+        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
+        String factTableName = cubeInstance.getDescriptor().getFactTable();
 
         job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
         System.out.println("Starting: " + job.getJobName());
 
-        setupMapInput(input, inputFormat);
+        setupMapInput(input, inputFormat, factTableName);
         setupReduceOutput(output);
 
-        // add metadata to distributed cache
-        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment();
-        attachKylinPropsAndMetadata(cubeMgr.getCube(cubeName), job.getConfiguration());
+        attachKylinPropsAndMetadata(cubeInstance, job.getConfiguration());
 
         return waitForCompletion(job);
 
@@ -86,7 +88,7 @@ public int run(String[] args) throws Exception {
 
     }
 
-    private void setupMapInput(Path input, String inputFormat) throws IOException {
+    private void setupMapInput(Path input, String inputFormat, String factTableName) throws IOException {
         FileInputFormat.setInputPaths(job, input);
 
         File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
@@ -95,12 +97,20 @@ private void setupMapInput(Path input, String inputFormat) throws IOException {
         } else {
             job.setJarByClass(this.getClass());
         }
-
+
+        /*
         if ("text".equalsIgnoreCase(inputFormat) || "textinputformat".equalsIgnoreCase(inputFormat)) {
job.setInputFormatClass(TextInputFormat.class); } else { job.setInputFormatClass(SequenceFileInputFormat.class); } + */ +// HCatInputFormat.setInput(job, "default", +// factTableName); + HCatInputFormat.setInput(job, "default", + factTableName.toLowerCase()); + + job.setInputFormatClass(HCatInputFormat.class); job.setMapperClass(FactDistinctColumnsMapper.class); job.setCombinerClass(FactDistinctColumnsCombiner.class); job.setMapOutputKeyClass(ShortWritable.class); diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java index c95018e..375e9f2 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java @@ -20,9 +20,11 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ShortWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hive.hcatalog.data.HCatRecord; import com.kylinolap.common.KylinConfig; import com.kylinolap.cube.CubeInstance; @@ -41,7 +43,7 @@ /** * @author yangli9 */ -public class FactDistinctColumnsMapper extends Mapper { +public class FactDistinctColumnsMapper extends Mapper { private String cubeName; private CubeInstance cube; @@ -93,18 +95,20 @@ protected void setup(Context context) throws IOException { } @Override - public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException { + public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException { try { - bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter); - intermediateTableDesc.sanityCheck(bytesSplitter); - SplittedBytes[] splitBuffers = bytesSplitter.getSplitBuffers(); +// bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter); +// intermediateTableDesc.sanityCheck(bytesSplitter); +// SplittedBytes[] splitBuffers = bytesSplitter.getSplitBuffers(); int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes(); for (int i : factDictCols) { outputKey.set((short) i); - SplittedBytes bytes = splitBuffers[flatTableIndexes[i]]; - outputValue.set(bytes.value, 0, bytes.length); +// SplittedBytes bytes = splitBuffers[flatTableIndexes[i]]; + String textValue = record.get(flatTableIndexes[i]+1).toString(); + byte[] bytes = Bytes.toBytes(textValue); + outputValue.set(bytes, 0, bytes.length); context.write(outputKey, outputValue); } } catch (Exception ex) { diff --git a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java index dc56ad8..a05caf5 100644 --- a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java +++ b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java @@ -88,7 +88,7 @@ public void testCubes() throws Exception { // keep this order. testLeftJoinCube(); - testInnerJoinCube(); +// testInnerJoinCube(); jobManager.stopJobEngine(); } From 36b950a807ae280ece83352b1fcc1ae6d086b00f Mon Sep 17 00:00:00 2001 From: shaofengshi Date: Tue, 23 Dec 2014 14:40:55 +0800 Subject: [PATCH 02/16] Use hive-hcatalog-core for hadoop2 from hortonworks repository. 
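
In effect, the job stops configuring TextInputFormat/SequenceFileInputFormat over an HDFS path and instead binds the mapper to the Hive intermediate table through HCatalog. A minimal sketch of the input wiring this series arrives at (the "default" database name and the lower-cased table name are exactly what the diff passes; the rest of the job setup is omitted):

    // Resolve the intermediate table via the Hive metastore; HCatalog supplies
    // the location, storage format and schema, so no input path, delimiter or
    // file format needs to be configured on the job.
    HCatInputFormat.setInput(job, "default", intermediateTable.toLowerCase());
    job.setInputFormatClass(HCatInputFormat.class);
    // map() now receives HCatRecord values instead of delimited Text lines.
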
--- examples/test_case_data/sandbox/mapred-site.xml | 13 ++++++++++++- job/pom.xml | 10 +++++----- job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java | 2 ++ .../kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java | 8 +++++--- .../job/hadoop/cube/FactDistinctColumnsMapper.java | 2 +- pom.xml | 1 + 6 files changed, 26 insertions(+), 10 deletions(-) diff --git a/examples/test_case_data/sandbox/mapred-site.xml b/examples/test_case_data/sandbox/mapred-site.xml index 355f64b..fca7076 100644 --- a/examples/test_case_data/sandbox/mapred-site.xml +++ b/examples/test_case_data/sandbox/mapred-site.xml @@ -98,8 +98,19 @@ mapreduce.application.classpath - /tmp/kylin/*,/usr/lib/hbase/lib/* + /tmp/kylin/*,/usr/lib/hive/conf/,/usr/lib/hbase/lib/*,/usr/lib/hive-hcatalog/share/hcatalog/*,/usr/lib/hive/lib/* + + + hive.metastore.uris + thrift://sandbox.hortonworks.com:9083 + + mapreduce.map.output.compress false diff --git a/job/pom.xml b/job/pom.xml index e89f2c0..caa3c5f 100644 --- a/job/pom.xml +++ b/job/pom.xml @@ -169,11 +169,11 @@ hadoop-hdfs provided - - org.apache.hive.hcatalog - hive-hcatalog-core - 0.14.0 - + + org.apache.hive.hcatalog + hive-hcatalog-core + ${hive-hcatalog.version} + diff --git a/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java b/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java index a17c0ff..0e60daa 100644 --- a/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java +++ b/job/src/main/java/com/kylinolap/job/JobInstanceBuilder.java @@ -321,7 +321,9 @@ private JobStep createFactDistinctColumnsStep(JobInstance jobInstance, int stepS factDistinctColumnsStep.setName(JobConstants.STEP_NAME_FACT_DISTINCT_COLUMNS); + JoinedFlatTableDesc intermediateTableDesc = new JoinedFlatTableDesc(cube.getDescriptor(), this.cubeSegment); cmd = appendExecCmdParameters(cmd, "cubename", cubeName); + cmd = appendExecCmdParameters(cmd, "htablename", intermediateTableDesc.getTableName(jobUUID)); cmd = appendExecCmdParameters(cmd, "input", inputLocation); cmd = appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath()); cmd = appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + jobInstance.getRelatedCube() + "_Step_" + stepSeqNum); diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java index f6e38a5..ce9074f 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java @@ -55,6 +55,7 @@ public int run(String[] args) throws Exception { options.addOption(OPTION_INPUT_PATH); options.addOption(OPTION_INPUT_FORMAT); options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_HTABLE_NAME); parseOptions(options, args); job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); @@ -62,6 +63,7 @@ public int run(String[] args) throws Exception { Path input = new Path(getOptionValue(OPTION_INPUT_PATH)); String inputFormat = getOptionValue(OPTION_INPUT_FORMAT); Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + String intermediateTable = getOptionValue(OPTION_HTABLE_NAME); // ---------------------------------------------------------------------------- // add metadata to distributed cache @@ -72,7 +74,7 @@ public int run(String[] args) throws Exception { job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); System.out.println("Starting: " + job.getJobName()); - setupMapInput(input, inputFormat, 
factTableName);
+        setupMapInput(input, inputFormat, intermediateTable);
         setupReduceOutput(output);
 
         // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment();
@@ -88,7 +90,7 @@ public int run(String[] args) throws Exception {
 
     }
 
-    private void setupMapInput(Path input, String inputFormat, String factTableName) throws IOException {
+    private void setupMapInput(Path input, String inputFormat, String intermediateTable) throws IOException {
         FileInputFormat.setInputPaths(job, input);
 
         File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
@@ -108,7 +110,7 @@ private void setupMapInput(Path input, String inputFormat, String factTableName)
 //        HCatInputFormat.setInput(job, "default",
 //                factTableName);
         HCatInputFormat.setInput(job, "default",
-                factTableName.toLowerCase());
+                intermediateTable.toLowerCase());
 
         job.setInputFormatClass(HCatInputFormat.class);
         job.setMapperClass(FactDistinctColumnsMapper.class);
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java
index 375e9f2..91b1d20 100644
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java
@@ -106,7 +106,7 @@ public void map(KEYIN key, HCatRecord record, Context context) throws IOExceptio
             for (int i : factDictCols) {
                 outputKey.set((short) i);
 //                SplittedBytes bytes = splitBuffers[flatTableIndexes[i]];
-                String textValue = record.get(flatTableIndexes[i]+1).toString();
+                String textValue = record.get(flatTableIndexes[i]).toString();
                 byte[] bytes = Bytes.toBytes(textValue);
                 outputValue.set(bytes, 0, bytes.length);
                 context.write(outputKey, outputValue);
diff --git a/pom.xml b/pom.xml
index c38a8cc..65bd6eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,6 +21,7 @@
         0.98.0-hadoop2
         3.4.5
         0.13.0
+        0.13.0.2.1.1.0-385
         3.4

From a9887c4b96b8f4ba43cf095af7f2086fbfe2ef2b Mon Sep 17 00:00:00 2001
From: shaofengshi
Date: Fri, 26 Dec 2014 11:31:28 +0800
Subject: [PATCH 03/16] Try to use local metastore (without metastore server)

---
 examples/test_case_data/sandbox/mapred-site.xml | 24 ++++++++++++++++++++++--
 job/pom.xml                                     |  5 +++++
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/examples/test_case_data/sandbox/mapred-site.xml b/examples/test_case_data/sandbox/mapred-site.xml
index fca7076..f7616b3 100644
--- a/examples/test_case_data/sandbox/mapred-site.xml
+++ b/examples/test_case_data/sandbox/mapred-site.xml
@@ -101,16 +101,36 @@
     /tmp/kylin/*,/usr/lib/hive/conf/,/usr/lib/hbase/lib/*,/usr/lib/hive-hcatalog/share/hcatalog/*,/usr/lib/hive/lib/*
 
+
     javax.jdo.option.ConnectionURL
     jdbc:mysql://sandbox.hortonworks.com/hive?createDatabaseIfNotExist=true
-    -->
+
+    javax.jdo.option.ConnectionDriverName
+    com.mysql.jdbc.Driver
+
+
+    javax.jdo.option.ConnectionUserName
+    hive
+
+
+    javax.jdo.option.ConnectionPassword
+    hive
+    password to use against metastore database
+
+
+    hive.metastore.local
+    false
+    controls whether to connect to remote metastore server or open a new metastore server in Hive Client JVM
+
 
     mapreduce.map.output.compress
     false
diff --git a/job/pom.xml b/job/pom.xml
index caa3c5f..76d7f16 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -174,6 +174,11 @@
             hive-hcatalog-core
             ${hive-hcatalog.version}
 
+
+            mysql
+            mysql-connector-java
+            5.1.34
+

From 14018f29aaa04f8f148865915d7e401d1731ae80 Mon Sep 17 00:00:00 2001
From: shaofengshi
Date: Tue, 30 Dec 2014 11:47:14 +0800
Subject: [PATCH
04/16] Change HiveColumnCardinalityJob to use HCat --- examples/test_case_data/sandbox/mapred-site.xml | 33 +++------------------- job/pom.xml | 5 ---- .../cardinality/ColumnCardinalityMapper.java | 15 ++++++++-- .../cardinality/ColumnCardinalityReducer.java | 2 +- .../cardinality/HiveColumnCardinalityJob.java | 23 ++++++++++++++- .../com/kylinolap/rest/service/CubeService.java | 5 ++-- 6 files changed, 43 insertions(+), 40 deletions(-) diff --git a/examples/test_case_data/sandbox/mapred-site.xml b/examples/test_case_data/sandbox/mapred-site.xml index f7616b3..24e2d7c 100644 --- a/examples/test_case_data/sandbox/mapred-site.xml +++ b/examples/test_case_data/sandbox/mapred-site.xml @@ -101,35 +101,10 @@ /tmp/kylin/*,/usr/lib/hive/conf/,/usr/lib/hbase/lib/*,/usr/lib/hive-hcatalog/share/hcatalog/*,/usr/lib/hive/lib/* - - - javax.jdo.option.ConnectionURL - jdbc:mysql://sandbox.hortonworks.com/hive?createDatabaseIfNotExist=true - - - javax.jdo.option.ConnectionDriverName - com.mysql.jdbc.Driver - - - javax.jdo.option.ConnectionUserName - hive - - - javax.jdo.option.ConnectionPassword - hive - password to use against metastore database - - - hive.metastore.local - false - controls whether to connect to remove metastore server or open a new metastore server in Hive Client JVM - + + hive.metastore.uris + thrift://sandbox.hortonworks.com:9083 + mapreduce.map.output.compress diff --git a/job/pom.xml b/job/pom.xml index 76d7f16..caa3c5f 100644 --- a/job/pom.xml +++ b/job/pom.xml @@ -174,11 +174,6 @@ hive-hcatalog-core ${hive-hcatalog.version} - - mysql - mysql-connector-java - 5.1.34 - diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java index e25ec36..11ebf4c 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java @@ -28,6 +28,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hive.hcatalog.data.HCatRecord; import com.kylinolap.common.hll.HyperLogLogPlusCounter; import com.kylinolap.cube.kv.RowConstants; @@ -36,17 +37,20 @@ * @author Jack * */ -public class ColumnCardinalityMapper extends Mapper { +public class ColumnCardinalityMapper extends Mapper { private Map hllcMap = new HashMap(); public static final String DEFAULT_DELIM = ","; @Override - public void map(T key, Text value, Context context) throws IOException, InterruptedException { + public void map(T key, HCatRecord value, Context context) throws IOException, InterruptedException { + /* String delim = context.getConfiguration().get(HiveColumnCardinalityJob.KEY_INPUT_DELIM); if (delim == null) { delim = DEFAULT_DELIM; } + + String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line, delim); int i = 1; @@ -55,6 +59,13 @@ public void map(T key, Text value, Context context) throws IOException, Interrup getHllc(i).add(Bytes.toBytes(temp)); i++; } + */ + + Integer columnSize = Integer.valueOf(context.getConfiguration().get(HiveColumnCardinalityJob.KEY_TABLE_COLUMN_NUMBER)); + for(int m=0; m values, Context context) throws IOException, InterruptedException { + int skey = key.get(); for (BytesWritable v : values) { - int skey = key.get(); ByteBuffer buffer = ByteBuffer.wrap(v.getBytes()); HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(); hll.readRegisters(buffer); diff 
--git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java index b6ea002..1c6e6c2 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java @@ -30,11 +30,18 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ToolRunner; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import com.kylinolap.job.hadoop.AbstractHadoopJob; public class HiveColumnCardinalityJob extends AbstractHadoopJob { public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job"; + + + @SuppressWarnings("static-access") + protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table"); + @SuppressWarnings("static-access") protected static final Option OPTION_FORMAT = OptionBuilder.withArgName("input format").hasArg().isRequired(true).withDescription("The file format").create("iformat"); @@ -43,6 +50,7 @@ protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("input_dilim").hasArg().isRequired(false).withDescription("Input delim").create("idelim"); public static final String KEY_INPUT_DELIM = "INPUT_DELIM"; + public static final String KEY_TABLE_COLUMN_NUMBER = "TABLE_COLUMN_NUMBER"; public static final String OUTPUT_PATH = "/tmp/cardinality"; /** @@ -50,6 +58,8 @@ */ private String jarPath; private Configuration conf; + + private String table; /** * MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY @@ -114,6 +124,7 @@ public int run(String[] args) throws Exception { Options options = new Options(); try { + options.addOption(OPTION_TABLE); options.addOption(OPTION_INPUT_PATH); options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_FORMAT); @@ -154,7 +165,16 @@ public int run(String[] args) throws Exception { } // Mapper - job.setInputFormatClass(cformat); +// job.setInputFormatClass(cformat); + + this.table = getOptionValue(OPTION_TABLE); + HCatInputFormat.setInput(job, "default", + table); + + HCatSchema tableSchema = HCatInputFormat.getTableSchema(job.getConfiguration()); + job.getConfiguration().set(KEY_TABLE_COLUMN_NUMBER, String.valueOf(tableSchema.size())); + + job.setInputFormatClass(HCatInputFormat.class); job.setMapperClass(ColumnCardinalityMapper.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(BytesWritable.class); @@ -165,6 +185,7 @@ public int run(String[] args) throws Exception { job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(LongWritable.class); job.setNumReduceTasks(1); + this.deletePath(job.getConfiguration(), output); diff --git a/server/src/main/java/com/kylinolap/rest/service/CubeService.java b/server/src/main/java/com/kylinolap/rest/service/CubeService.java index 0215504..b247521 100644 --- a/server/src/main/java/com/kylinolap/rest/service/CubeService.java +++ b/server/src/main/java/com/kylinolap/rest/service/CubeService.java @@ -473,10 +473,11 @@ public void generateCardinality(String tableName, String format, String delimite String outPath = HiveColumnCardinalityJob.OUTPUT_PATH + "/" + tableName; String[] args = null; if (delim == null) { - args = new String[] { "-input", location, "-output", outPath, 
"-iformat", inputFormat }; + args = new String[] {"-table", tableName, "-input", location, "-output", outPath, "-iformat", inputFormat }; } else { - args = new String[] { "-input", location, "-output", outPath, "-iformat", inputFormat, "-idelim", delim }; + args = new String[] {"-table", tableName, "-input", location, "-output", outPath, "-iformat", inputFormat, "-idelim", delim }; } + HiveColumnCardinalityJob job = new HiveColumnCardinalityJob(jarPath, null); int hresult = 0; try { From 3adc69a871009872dc617220d09955b18850136d Mon Sep 17 00:00:00 2001 From: shaofengshi Date: Tue, 30 Dec 2014 15:13:29 +0800 Subject: [PATCH 05/16] Use HCAT to calculate cardinality --- job/pom.xml | 9 +++++---- .../hadoop/cardinality/HiveColumnCardinalityJob.java | 1 + server/pom.xml | 17 ++++++++++++++++- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/job/pom.xml b/job/pom.xml index caa3c5f..5c410af 100644 --- a/job/pom.xml +++ b/job/pom.xml @@ -170,10 +170,11 @@ provided - org.apache.hive.hcatalog - hive-hcatalog-core - ${hive-hcatalog.version} - + org.apache.hive.hcatalog + hive-hcatalog-core + ${hive-hcatalog.version} + provided + diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java index 1c6e6c2..f0b5662 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java @@ -168,6 +168,7 @@ public int run(String[] args) throws Exception { // job.setInputFormatClass(cformat); this.table = getOptionValue(OPTION_TABLE); + System.out.println("Going to start HiveColumnCardinalityJob on table '" + table + "'"); HCatInputFormat.setInput(job, "default", table); diff --git a/server/pom.xml b/server/pom.xml index fea3397..07e4310 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -325,7 +325,22 @@ - + + org.apache.hive.hcatalog + hive-hcatalog-core + ${hive-hcatalog.version} + provided + + + javax.servlet + servlet-api + + + javax.servlet.jsp + jsp-api + + + org.apache.tomcat From 0def453bcc36116aad97222e87f9e0ad28362884 Mon Sep 17 00:00:00 2001 From: shaofengshi Date: Tue, 30 Dec 2014 16:13:18 +0800 Subject: [PATCH 06/16] Use HCat to calculate cardinality --- .../cardinality/ColumnCardinalityMapper.java | 22 +--------- .../cardinality/HiveColumnCardinalityJob.java | 47 +--------------------- .../com/kylinolap/rest/service/CubeService.java | 23 +---------- 3 files changed, 4 insertions(+), 88 deletions(-) diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java index 11ebf4c..17ff6f7 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java @@ -21,12 +21,10 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.StringTokenizer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hive.hcatalog.data.HCatRecord; @@ -44,25 +42,9 @@ @Override public void map(T key, HCatRecord value, Context context) throws IOException, InterruptedException { - /* - String delim = 
context.getConfiguration().get(HiveColumnCardinalityJob.KEY_INPUT_DELIM); - if (delim == null) { - delim = DEFAULT_DELIM; - } - - String line = value.toString(); - StringTokenizer tokenizer = new StringTokenizer(line, delim); - int i = 1; - while (tokenizer.hasMoreTokens()) { - String temp = tokenizer.nextToken(); - getHllc(i).add(Bytes.toBytes(temp)); - i++; - } - */ - - Integer columnSize = Integer.valueOf(context.getConfiguration().get(HiveColumnCardinalityJob.KEY_TABLE_COLUMN_NUMBER)); - for(int m=0; m exd = getMetadataManager().getTableDescExd(tableName); - if (exd == null || !Boolean.valueOf(exd.get(MetadataConstances.TABLE_EXD_STATUS_KEY))) { - throw new IllegalArgumentException("Table " + tableName + " does not exist."); - } - String location = exd.get(MetadataConstances.TABLE_EXD_LOCATION); - if (location == null || MetadataConstances.TABLE_EXD_DEFAULT_VALUE.equals(location)) { - throw new IllegalArgumentException("Cannot get table " + tableName + " location, the location is " + location); - } - String inputFormat = exd.get(MetadataConstances.TABLE_EXD_IF); - if (inputFormat == null || MetadataConstances.TABLE_EXD_DEFAULT_VALUE.equals(inputFormat)) { - throw new IllegalArgumentException("Cannot get table " + tableName + " input format, the format is " + inputFormat); - } - String delim = exd.get(MetadataConstances.TABLE_EXD_DELIM); - if (delimiter != null) { - delim = delimiter; - } String jarPath = getKylinConfig().getKylinJobJarPath(); String outPath = HiveColumnCardinalityJob.OUTPUT_PATH + "/" + tableName; - String[] args = null; - if (delim == null) { - args = new String[] {"-table", tableName, "-input", location, "-output", outPath, "-iformat", inputFormat }; - } else { - args = new String[] {"-table", tableName, "-input", location, "-output", outPath, "-iformat", inputFormat, "-idelim", delim }; - } + String[] args = new String[] {"-table", tableName, "-output", outPath }; HiveColumnCardinalityJob job = new HiveColumnCardinalityJob(jarPath, null); int hresult = 0; From 3de7d5ee64789a436292dda748e47bcf2f112c2d Mon Sep 17 00:00:00 2001 From: shaofengshi Date: Wed, 31 Dec 2014 11:39:35 +0800 Subject: [PATCH 07/16] Add some log to test run Hcat on a big avro table. 
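
For context, the mapper being exercised here keeps one HyperLogLog counter per column and feeds it every field of each HCatRecord; at this point in the series the core loop looks roughly like the following (a sketch, not a verbatim copy; getHllc() is the mapper's per-column counter accessor, and KEY_TABLE_COLUMN_NUMBER is seeded by the job from the HCat table schema size):

    // One probabilistic counter per column index; counters are serialized
    // in cleanup() and merged across mappers by the single reducer.
    int columnSize = context.getConfiguration().getInt(HiveColumnCardinalityJob.KEY_TABLE_COLUMN_NUMBER, 100);
    for (int m = 0; m < columnSize; m++) {
        Object cell = value.get(m);                           // m-th field of the HCatRecord
        getHllc(m).add(Bytes.toBytes(String.valueOf(cell)));  // a null cell hashes as the string "null"
    }
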
--- .../hadoop/cardinality/HiveColumnCardinalityJob.java | 12 ++++++++++++ .../java/com/kylinolap/rest/service/CubeService.java | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java index 92ea49d..14a907c 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java @@ -7,7 +7,9 @@ import java.io.InputStream; import java.io.StringWriter; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; @@ -32,6 +34,9 @@ import org.apache.hive.hcatalog.data.schema.HCatSchema; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; +import com.kylinolap.job.JobDAO; +import com.kylinolap.job.JobInstance; +import com.kylinolap.job.JobInstance.JobStep; import com.kylinolap.job.hadoop.AbstractHadoopJob; public class HiveColumnCardinalityJob extends AbstractHadoopJob { @@ -143,6 +148,7 @@ public int run(String[] args) throws Exception { HCatInputFormat.setInput(job, "default", table); + System.out.println("Set input format as HCat on table '" + table + "'"); HCatSchema tableSchema = HCatInputFormat.getTableSchema(job.getConfiguration()); job.getConfiguration().set(KEY_TABLE_COLUMN_NUMBER, String.valueOf(tableSchema.size())); @@ -161,7 +167,13 @@ public int run(String[] args) throws Exception { this.deletePath(job.getConfiguration(), output); + isAsync = true; + System.out.println("Going to submit HiveColumnCardinalityJob for table '" + table + "'"); int result = waitForCompletion(job); + + System.out.println("Get job track url " + job.getJobID() + "\n"); + System.out.println("Get job track url " + job.getTrackingURL() + "\n"); + return result; } catch (Exception e) { printUsage(options); diff --git a/server/src/main/java/com/kylinolap/rest/service/CubeService.java b/server/src/main/java/com/kylinolap/rest/service/CubeService.java index 1f2fe72..6051d91 100644 --- a/server/src/main/java/com/kylinolap/rest/service/CubeService.java +++ b/server/src/main/java/com/kylinolap/rest/service/CubeService.java @@ -453,6 +453,24 @@ public void generateCardinality(String tableName, String format, String delimite logger.error("Cannot find table descirptor " + tableName, e); throw e; } + /* + Map exd = getMetadataManager().getTableDescExd(tableName); + if (exd == null || !Boolean.valueOf(exd.get(MetadataConstances.TABLE_EXD_STATUS_KEY))) { + throw new IllegalArgumentException("Table " + tableName + " does not exist."); + } + String location = exd.get(MetadataConstances.TABLE_EXD_LOCATION); + if (location == null || MetadataConstances.TABLE_EXD_DEFAULT_VALUE.equals(location)) { + throw new IllegalArgumentException("Cannot get table " + tableName + " location, the location is " + location); + } + String inputFormat = exd.get(MetadataConstances.TABLE_EXD_IF); + if (inputFormat == null || MetadataConstances.TABLE_EXD_DEFAULT_VALUE.equals(inputFormat)) { + throw new IllegalArgumentException("Cannot get table " + tableName + " input format, the format is " + inputFormat); + } + String delim = exd.get(MetadataConstances.TABLE_EXD_DELIM); + if (delimiter != null) { + delim = delimiter; + } + */ String jarPath = getKylinConfig().getKylinJobJarPath(); String outPath = 
HiveColumnCardinalityJob.OUTPUT_PATH + "/" + tableName; String[] args = new String[] {"-table", tableName, "-output", outPath }; From 0776ac8c3b8a7fb35ffc05fa89f5fb3fadd0dfc7 Mon Sep 17 00:00:00 2001 From: shaofengshi Date: Wed, 31 Dec 2014 17:17:37 +0800 Subject: [PATCH 08/16] Add log in ColumnCardinalityMapper --- .../kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java index 17ff6f7..b48e755 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java @@ -45,7 +45,12 @@ public void map(T key, HCatRecord value, Context context) throws IOException, In Integer columnSize = context.getConfiguration().getInt(HiveColumnCardinalityJob.KEY_TABLE_COLUMN_NUMBER, 100); for (int m = 0; m < columnSize; m++) { + int counter = 0; Object cell = value.get(m); + if(counter <5) { + System.out.println("Get col " + m + " row " + counter + " value: " + String.valueOf(cell)); + counter++; + } getHllc(m).add(Bytes.toBytes(String.valueOf(cell))); } } @@ -67,7 +72,7 @@ protected void cleanup(Context context) throws IOException, InterruptedException buf.clear(); hllc.writeRegisters(buf); buf.flip(); - context.write(new IntWritable(key), new BytesWritable(buf.array())); + context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit())); } } From 976efb6b11fcbdae81b0560c68170f3477e67065 Mon Sep 17 00:00:00 2001 From: shaofengshi Date: Wed, 31 Dec 2014 21:17:22 +0800 Subject: [PATCH 09/16] Small update --- .../kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java index b48e755..158ec92 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java @@ -39,20 +39,21 @@ private Map hllcMap = new HashMap(); public static final String DEFAULT_DELIM = ","; + + private int counter = 0; @Override public void map(T key, HCatRecord value, Context context) throws IOException, InterruptedException { Integer columnSize = context.getConfiguration().getInt(HiveColumnCardinalityJob.KEY_TABLE_COLUMN_NUMBER, 100); for (int m = 0; m < columnSize; m++) { - int counter = 0; Object cell = value.get(m); - if(counter <5) { + if(counter <5 && m <3) { System.out.println("Get col " + m + " row " + counter + " value: " + String.valueOf(cell)); - counter++; } getHllc(m).add(Bytes.toBytes(String.valueOf(cell))); } + counter++; } private HyperLogLogPlusCounter getHllc(Integer key) { From dec5e5c17e2c2de93ea3c0e1093bf6a862df8472 Mon Sep 17 00:00:00 2001 From: shaofengshi Date: Wed, 31 Dec 2014 23:10:28 +0800 Subject: [PATCH 10/16] Bug fix --- .../com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java index 158ec92..c7c395f 100644 --- 
a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java @@ -49,9 +49,9 @@ public void map(T key, HCatRecord value, Context context) throws IOException, In for (int m = 0; m < columnSize; m++) { Object cell = value.get(m); if(counter <5 && m <3) { - System.out.println("Get col " + m + " row " + counter + " value: " + String.valueOf(cell)); + System.out.println("Get col " + m + " row " + counter + " value: " + cell.toString()); } - getHllc(m).add(Bytes.toBytes(String.valueOf(cell))); + getHllc(m).add(Bytes.toBytes(cell.toString())); } counter++; } From 7876bac14afa20be46c6791011596886433ce7cb Mon Sep 17 00:00:00 2001 From: shaofengshi Date: Thu, 1 Jan 2015 17:51:07 +0800 Subject: [PATCH 11/16] Log message --- .../com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java index c7c395f..919b682 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java @@ -49,7 +49,7 @@ public void map(T key, HCatRecord value, Context context) throws IOException, In for (int m = 0; m < columnSize; m++) { Object cell = value.get(m); if(counter <5 && m <3) { - System.out.println("Get col " + m + " row " + counter + " value: " + cell.toString()); + System.out.println("Get row " + counter + " column " + m + " value: " + cell.toString()); } getHllc(m).add(Bytes.toBytes(cell.toString())); } From b573e39b3ed0cdfab23073b920c8fa35c34f4931 Mon Sep 17 00:00:00 2001 From: shaofengshi Date: Sat, 3 Jan 2015 20:20:48 +0800 Subject: [PATCH 12/16] Another try: use field schema object to get the field value. 
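
The apparent intent: stop trusting positional indexes into the HCatRecord and resolve each value through the table schema instead. In essence (a sketch mirroring the calls the diff below makes; error handling omitted):

    HCatSchema schema = HCatInputFormat.getTableSchema(context.getConfiguration());
    for (HCatFieldSchema field : schema.getFields()) {
        // get(name, schema) locates the field by its position in the schema,
        // rather than assuming a fixed column order in the record itself
        Object fieldValue = value.get(field.getName(), schema);
    }
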
--- .../cardinality/ColumnCardinalityMapper.java | 24 ++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java index 919b682..c576f2f 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java @@ -27,6 +27,9 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import com.kylinolap.common.hll.HyperLogLogPlusCounter; import com.kylinolap.cube.kv.RowConstants; @@ -45,7 +48,27 @@ @Override public void map(T key, HCatRecord value, Context context) throws IOException, InterruptedException { + HCatSchema schema = HCatInputFormat.getTableSchema(context.getConfiguration()); Integer columnSize = context.getConfiguration().getInt(HiveColumnCardinalityJob.KEY_TABLE_COLUMN_NUMBER, 100); + + Iterator it = schema.getFields().iterator(); + HCatFieldSchema field; + Object fieldValue; + int m = 0; + while(it.hasNext()) { + field = it.next(); + fieldValue = value.get(field.getName(), schema); + if(fieldValue == null) + continue; + + if(counter <5 && m <3) { + System.out.println("Get row " + counter + " column " + m + " value: " + fieldValue.toString()); + } + getHllc(m).add(Bytes.toBytes(fieldValue.toString())); + m++; + } + + /* for (int m = 0; m < columnSize; m++) { Object cell = value.get(m); if(counter <5 && m <3) { @@ -53,6 +76,7 @@ public void map(T key, HCatRecord value, Context context) throws IOException, In } getHllc(m).add(Bytes.toBytes(cell.toString())); } + */ counter++; } From 1f0bb3e182115ff2a1daab4d0ba03a9088dede60 Mon Sep 17 00:00:00 2001 From: shaofengshi Date: Sat, 3 Jan 2015 22:01:08 +0800 Subject: [PATCH 13/16] Enhancement on cardinality calculation --- .../cardinality/ColumnCardinalityMapper.java | 38 +++++++++------------- .../cardinality/HiveColumnCardinalityJob.java | 3 -- .../com/kylinolap/rest/service/CubeService.java | 2 +- 3 files changed, 16 insertions(+), 27 deletions(-) diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java index c576f2f..a93900d 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.util.Bytes; @@ -42,41 +43,32 @@ private Map hllcMap = new HashMap(); public static final String DEFAULT_DELIM = ","; - + private int counter = 0; @Override public void map(T key, HCatRecord value, Context context) throws IOException, InterruptedException { HCatSchema schema = HCatInputFormat.getTableSchema(context.getConfiguration()); - Integer columnSize = context.getConfiguration().getInt(HiveColumnCardinalityJob.KEY_TABLE_COLUMN_NUMBER, 100); - - Iterator it = schema.getFields().iterator(); + + List fieldList = schema.getFields(); HCatFieldSchema field; Object fieldValue; - int m = 0; - 
while(it.hasNext()) { - field = it.next(); + Integer columnSize = fieldList.size(); + for (int m = 0; m < columnSize; m++) { + field = fieldList.get(m); fieldValue = value.get(field.getName(), schema); - if(fieldValue == null) - continue; + if (fieldValue == null) + fieldValue = "NULL"; - if(counter <5 && m <3) { - System.out.println("Get row " + counter + " column " + m + " value: " + fieldValue.toString()); - } - getHllc(m).add(Bytes.toBytes(fieldValue.toString())); - m++; - } - - /* - for (int m = 0; m < columnSize; m++) { - Object cell = value.get(m); - if(counter <5 && m <3) { - System.out.println("Get row " + counter + " column " + m + " value: " + cell.toString()); + if (counter < 5 && m < 10) { + System.out.println("Get row " + counter + " column '" + field.getName() + "' value: " + fieldValue); } - getHllc(m).add(Bytes.toBytes(cell.toString())); + + if (fieldValue != null) + getHllc(m).add(Bytes.toBytes(fieldValue.toString())); } - */ + counter++; } diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java index 14a907c..4280f71 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java @@ -46,7 +46,6 @@ protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table"); public static final String KEY_INPUT_DELIM = "INPUT_DELIM"; - public static final String KEY_TABLE_COLUMN_NUMBER = "TABLE_COLUMN_NUMBER"; public static final String OUTPUT_PATH = "/tmp/cardinality"; /** @@ -149,8 +148,6 @@ public int run(String[] args) throws Exception { table); System.out.println("Set input format as HCat on table '" + table + "'"); - HCatSchema tableSchema = HCatInputFormat.getTableSchema(job.getConfiguration()); - job.getConfiguration().set(KEY_TABLE_COLUMN_NUMBER, String.valueOf(tableSchema.size())); job.setInputFormatClass(HCatInputFormat.class); job.setMapperClass(ColumnCardinalityMapper.class); diff --git a/server/src/main/java/com/kylinolap/rest/service/CubeService.java b/server/src/main/java/com/kylinolap/rest/service/CubeService.java index 6051d91..702d60c 100644 --- a/server/src/main/java/com/kylinolap/rest/service/CubeService.java +++ b/server/src/main/java/com/kylinolap/rest/service/CubeService.java @@ -472,7 +472,7 @@ public void generateCardinality(String tableName, String format, String delimite } */ String jarPath = getKylinConfig().getKylinJobJarPath(); - String outPath = HiveColumnCardinalityJob.OUTPUT_PATH + "/" + tableName; + String outPath = HiveColumnCardinalityJob.OUTPUT_PATH + "/" + tableName.toUpperCase(); String[] args = new String[] {"-table", tableName, "-output", outPath }; HiveColumnCardinalityJob job = new HiveColumnCardinalityJob(jarPath, null); From 201a5047d1c6f9829cb98fc46920c7fca1bc1eee Mon Sep 17 00:00:00 2001 From: shaofengshi Date: Sun, 4 Jan 2015 10:26:53 +0800 Subject: [PATCH 14/16] Change all reading hive table to use the HCat way --- dictionary/pom.xml | 6 + .../java/com/kylinolap/dict/lookup/HiveTable.java | 50 ++---- .../cardinality/ColumnCardinalityMapper.java | 16 +- .../job/hadoop/cube/FactDistinctColumnsMapper.java | 33 ++-- .../hadoop/invertedindex/IIDistinctColumnsJob.java | 8 + .../invertedindex/IIDistinctColumnsMapper.java | 52 ++++-- metadata/pom.xml | 6 + 
.../com/kylinolap/metadata/tool/HiveClient.java | 86 ++++++++++ .../metadata/tool/HiveSourceTableLoader.java | 181 ++++++++------------- 9 files changed, 244 insertions(+), 194 deletions(-) create mode 100644 metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java diff --git a/dictionary/pom.xml b/dictionary/pom.xml index 1d09903..931b913 100644 --- a/dictionary/pom.xml +++ b/dictionary/pom.xml @@ -68,6 +68,12 @@ hbase-client provided + + org.apache.hive.hcatalog + hive-hcatalog-core + ${hive-hcatalog.version} + provided + junit junit diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java index 25d2a87..14e6362 100644 --- a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java +++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java @@ -16,23 +16,16 @@ package com.kylinolap.dict.lookup; -import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; import com.kylinolap.common.KylinConfig; -import com.kylinolap.common.util.HadoopUtil; -import com.kylinolap.common.util.CliCommandExecutor; import com.kylinolap.metadata.MetadataManager; +import com.kylinolap.metadata.tool.HiveClient; /** * @author yangli9 @@ -89,36 +82,17 @@ private String computeHDFSLocation(boolean needFilePath) throws IOException { return override; } - String cmd = "hive -e \"describe extended " + hiveTable + ";\""; - CliCommandExecutor exec = KylinConfig.getInstanceFromEnv().getCliCommandExecutor(); - String output = exec.execute(cmd); - - Pattern ptn = Pattern.compile("location:(.*?),"); - Matcher m = ptn.matcher(output); - if (m.find() == false) - throw new IOException("Failed to find HDFS location for hive table " + hiveTable + " from output -- " + output); - - String hdfsDir = m.group(1); - - if (needFilePath) { - FileSystem fs = HadoopUtil.getFileSystem(hdfsDir); - FileStatus file = findOnlyFile(hdfsDir, fs); - return file.getPath().toString(); - } else { - return hdfsDir; + HiveMetaStoreClient hiveClient = HiveClient.getInstance().getMetaStoreClient(); + Table table = null; + try { + table = hiveClient.getTable(hiveTable); + } catch (Exception e) { + e.printStackTrace(); + throw new IOException(e); } - } + + return table.getSd().getLocation(); - private FileStatus findOnlyFile(String hdfsDir, FileSystem fs) throws FileNotFoundException, IOException { - FileStatus[] files = fs.listStatus(new Path(hdfsDir)); - ArrayList nonZeroFiles = Lists.newArrayList(); - for (FileStatus f : files) { - if (f.getLen() > 0) - nonZeroFiles.add(f); - } - if (nonZeroFiles.size() != 1) - throw new IllegalStateException("Expect 1 and only 1 non-zero file under " + hdfsDir + ", but find " + nonZeroFiles.size()); - return nonZeroFiles.get(0); } @Override diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java index a93900d..daaf709 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java +++ 
b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java @@ -20,7 +20,6 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.util.Bytes; @@ -45,18 +44,23 @@ public static final String DEFAULT_DELIM = ","; private int counter = 0; + + private HCatSchema schema = null; + private int columnSize = 0; + + @Override + protected void setup(Context context) throws IOException { + schema = HCatInputFormat.getTableSchema(context.getConfiguration()); + columnSize = schema.getFields().size(); + } @Override public void map(T key, HCatRecord value, Context context) throws IOException, InterruptedException { - HCatSchema schema = HCatInputFormat.getTableSchema(context.getConfiguration()); - - List fieldList = schema.getFields(); HCatFieldSchema field; Object fieldValue; - Integer columnSize = fieldList.size(); for (int m = 0; m < columnSize; m++) { - field = fieldList.get(m); + field = schema.get(m); fieldValue = value.get(field.getName(), schema); if (fieldValue == null) fieldValue = "NULL"; diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java index 91b1d20..658ea97 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java @@ -25,12 +25,13 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import com.kylinolap.common.KylinConfig; import com.kylinolap.cube.CubeInstance; import com.kylinolap.cube.CubeManager; -import com.kylinolap.cube.common.BytesSplitter; -import com.kylinolap.cube.common.SplittedBytes; import com.kylinolap.cube.cuboid.Cuboid; import com.kylinolap.dict.DictionaryManager; import com.kylinolap.job.constant.BatchConstants; @@ -51,20 +52,16 @@ private int[] factDictCols; private JoinedFlatTableDesc intermediateTableDesc; - private String intermediateTableRowDelimiter; - private byte byteRowDelimiter; - private BytesSplitter bytesSplitter; private ShortWritable outputKey = new ShortWritable(); private Text outputValue = new Text(); private int errorRecordCounter; + private HCatSchema schema = null; + @Override protected void setup(Context context) throws IOException { Configuration conf = context.getConfiguration(); - intermediateTableRowDelimiter = conf.get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER)); - byteRowDelimiter = intermediateTableRowDelimiter.getBytes("UTF-8")[0]; - bytesSplitter = new BytesSplitter(200, 4096); KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf); cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); @@ -92,34 +89,36 @@ protected void setup(Context context) throws IOException { this.factDictCols = new int[factDictCols.size()]; for (int i = 0; i < factDictCols.size(); i++) this.factDictCols[i] = factDictCols.get(i); + + schema = HCatInputFormat.getTableSchema(context.getConfiguration()); } @Override public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException { try { -// 
bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter); -// intermediateTableDesc.sanityCheck(bytesSplitter); -// SplittedBytes[] splitBuffers = bytesSplitter.getSplitBuffers(); int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes(); + HCatFieldSchema fieldSchema = null; for (int i : factDictCols) { outputKey.set((short) i); -// SplittedBytes bytes = splitBuffers[flatTableIndexes[i]]; - String textValue = record.get(flatTableIndexes[i]).toString(); - byte[] bytes = Bytes.toBytes(textValue); + fieldSchema = schema.get(flatTableIndexes[i]); + Object fieldValue = record.get(fieldSchema.getName(), schema); + if (fieldValue == null) + fieldValue = "NULL"; + byte[] bytes = Bytes.toBytes(fieldValue.toString()); outputValue.set(bytes, 0, bytes.length); context.write(outputKey, outputValue); } } catch (Exception ex) { - handleErrorRecord(bytesSplitter, ex); + handleErrorRecord(record, ex); } } - private void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException { + private void handleErrorRecord(HCatRecord record, Exception ex) throws IOException { - System.err.println("Insane record: " + bytesSplitter); + System.err.println("Insane record: " + record.getAll()); ex.printStackTrace(System.err); errorRecordCounter++; diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java index bc12db2..7fa41d5 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.ToolRunner; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,6 +111,7 @@ private void setupMapInput(Path input, String inputFormat, String inputDelim) th job.setJarByClass(this.getClass()); } + /* if ("textinputformat".equalsIgnoreCase(inputFormat) || "text".equalsIgnoreCase(inputFormat)) { job.setInputFormatClass(TextInputFormat.class); } else { @@ -124,6 +126,12 @@ private void setupMapInput(Path input, String inputFormat, String inputDelim) th if (inputDelim != null) { job.getConfiguration().set(BatchConstants.INPUT_DELIM, inputDelim); } + */ + String tableName = job.getConfiguration().get(BatchConstants.TABLE_NAME); + HCatInputFormat.setInput(job, "default", + tableName.toLowerCase()); + + job.setInputFormatClass(HCatInputFormat.class); job.setMapperClass(IIDistinctColumnsMapper.class); job.setCombinerClass(IIDistinctColumnsCombiner.class); diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java index b7456bf..e744544 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java @@ -17,38 +17,44 @@ import java.io.IOException; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ShortWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; - -import com.kylinolap.cube.common.BytesSplitter; -import com.kylinolap.cube.common.SplittedBytes; 
-import com.kylinolap.job.constant.BatchConstants; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; /** * @author yangli9 */ -public class IIDistinctColumnsMapper extends Mapper { +public class IIDistinctColumnsMapper extends Mapper { - private String[] columns; - private int delim; - private BytesSplitter splitter; +// private String[] columns; +// private int delim; +// private BytesSplitter splitter; private ShortWritable outputKey = new ShortWritable(); private Text outputValue = new Text(); + private HCatSchema schema = null; + private int columnSize = 0; @Override protected void setup(Context context) throws IOException { - Configuration conf = context.getConfiguration(); - this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(","); - String inputDelim = conf.get(BatchConstants.INPUT_DELIM); - this.delim = inputDelim == null ? -1 : inputDelim.codePointAt(0); - this.splitter = new BytesSplitter(200, 4096); +// Configuration conf = context.getConfiguration(); +// this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(","); +// String inputDelim = conf.get(BatchConstants.INPUT_DELIM); +// this.delim = inputDelim == null ? -1 : inputDelim.codePointAt(0); +// this.splitter = new BytesSplitter(200, 4096); + + schema = HCatInputFormat.getTableSchema(context.getConfiguration()); + columnSize = schema.getFields().size(); } - + @Override - public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException { + public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException { + /* if (delim == -1) { delim = splitter.detectDelim(value, columns.length); } @@ -65,6 +71,20 @@ public void map(KEYIN key, Text value, Context context) throws IOException, Inte outputValue.set(parts[i].value, 0, parts[i].length); context.write(outputKey, outputValue); } + */ + + HCatFieldSchema fieldSchema = null; + for (short i = 0; i < columnSize; i++) { + outputKey.set(i); + fieldSchema = schema.get(i); + Object fieldValue = record.get(fieldSchema.getName(), schema); + if (fieldValue == null) + fieldValue = "NULL"; + byte[] bytes = Bytes.toBytes(fieldValue.toString()); + outputValue.set(bytes, 0, bytes.length); + context.write(outputKey, outputValue); + } + } } diff --git a/metadata/pom.xml b/metadata/pom.xml index 3e76619..ea255b0 100644 --- a/metadata/pom.xml +++ b/metadata/pom.xml @@ -71,6 +71,12 @@ hbase-client provided + + org.apache.hive.hcatalog + hive-hcatalog-core + ${hive-hcatalog.version} + provided + junit junit diff --git a/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java b/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java new file mode 100644 index 0000000..70f30ad --- /dev/null +++ b/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java @@ -0,0 +1,86 @@ +package com.kylinolap.metadata.tool; + +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; + +/* + * Copyright 2013-2014 eBay Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
diff --git a/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java b/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java
new file mode 100644
index 0000000..70f30ad
--- /dev/null
+++ b/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java
@@ -0,0 +1,86 @@
+package com.kylinolap.metadata.tool;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class HiveClient {
+
+    protected static HiveConf hiveConf = null;
+    protected static Driver driver = null;
+    protected static HiveMetaStoreClient metaStoreClient = null;
+
+    // volatile is required for the double-checked locking below to be safe
+    private static volatile HiveClient instance = null;
+
+    private HiveClient() {
+        setup();
+    }
+
+    public static HiveClient getInstance() {
+
+        if (instance == null) {
+            synchronized (HiveClient.class) {
+                if (instance == null)
+                    instance = new HiveClient();
+            }
+
+        }
+
+        return instance;
+    }
+
+    private void setup() {
+        hiveConf = new HiveConf(HiveSourceTableLoader.class);
+        driver = new Driver(hiveConf);
+        try {
+            metaStoreClient = new HiveMetaStoreClient(hiveConf);
+        } catch (MetaException e) {
+            // fail fast: a HiveClient without a metastore connection is useless
+            throw new IllegalStateException("Cannot connect to Hive metastore", e);
+        }
+        SessionState.start(new CliSessionState(hiveConf));
+    }
+
+    public HiveConf getHiveConf() {
+        return hiveConf;
+    }
+
+    /**
+     * Get the hive ql driver to execute ddl or dml
+     * @return
+     */
+    public Driver getDriver() {
+        return driver;
+    }
+
+
+    /**
+     * Get the Hive Meta store client;
+     * @return
+     */
+    public HiveMetaStoreClient getMetaStoreClient() {
+        return metaStoreClient;
+    }
+
+
+
+}

diff --git a/metadata/src/main/java/com/kylinolap/metadata/tool/HiveSourceTableLoader.java b/metadata/src/main/java/com/kylinolap/metadata/tool/HiveSourceTableLoader.java
index 3e4cca3..17f7089 100644
--- a/metadata/src/main/java/com/kylinolap/metadata/tool/HiveSourceTableLoader.java
+++ b/metadata/src/main/java/com/kylinolap/metadata/tool/HiveSourceTableLoader.java
@@ -16,11 +16,9 @@
  * limitations under the License.
 */
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -28,9 +26,14 @@
 import java.util.Set;
 import java.util.UUID;
 
-import com.kylinolap.metadata.MetadataManager;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,8 +42,8 @@
 import com.google.common.collect.Sets;
 import com.kylinolap.common.KylinConfig;
 import com.kylinolap.common.persistence.ResourceTool;
-import com.kylinolap.common.util.CliCommandExecutor;
 import com.kylinolap.common.util.JsonUtil;
+import com.kylinolap.metadata.MetadataManager;
 import com.kylinolap.metadata.model.schema.ColumnDesc;
 import com.kylinolap.metadata.model.schema.TableDesc;
 
@@ -51,13 +54,17 @@
  * @author jianliu
 */
 public class HiveSourceTableLoader {
+
    private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
 
    public static final String OUTPUT_SURFIX = "json";
    public static final String TABLE_FOLDER_NAME = "table";
    public static final String TABLE_EXD_FOLDER_NAME = "table_exd";
 
+    protected static HiveMetaStoreClient client = HiveClient.getInstance().getMetaStoreClient();
+
    public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
+
        Map<String, Set<String>> db2tables = Maps.newHashMap();
        for (String table : hiveTables) {
            int cut = table.indexOf('.');
@@ -76,8 +83,8 @@
        metaTmpDir.delete();
        metaTmpDir.mkdirs();
 
-        for (String database: db2tables.keySet()) {
-            for (String table: db2tables.get(database)) {
+        for (String database : db2tables.keySet()) {
+            for (String table : db2tables.get(database)) {
                TableDesc tableDesc = MetadataManager.getInstance(config).getTableDesc(table);
                if (tableDesc == null) {
                    continue;
@@ -104,33 +111,6 @@
    }
 
    private static List<String> extractHiveTables(String database, Set<String> tables, File metaTmpDir, KylinConfig config) throws IOException {
-        StringBuilder cmd = new StringBuilder();
-        cmd.append("hive -e \"");
-        if (StringUtils.isEmpty(database) == false) {
-            cmd.append("use " + database + "; ");
-        }
-        for (String table : tables) {
-            cmd.append("show table extended like " + table + "; ");
-        }
-        cmd.append("\"");
-
-        CliCommandExecutor cmdExec = config.getCliCommandExecutor();
-        String output = cmdExec.execute(cmd.toString());
-
-        return extractTableDescFromHiveOutput(database, output, metaTmpDir);
-    }
-
-    private static List<String> extractTableDescFromHiveOutput(String database, String hiveOutput, File metaTmpDir) throws IOException {
-        BufferedReader reader = new BufferedReader(new StringReader(hiveOutput));
-        try {
-            return extractTables(database, reader, metaTmpDir);
-        } finally {
-            IOUtils.closeQuietly(reader);
-        }
-    }
-
-    private static List<String> extractTables(String database, BufferedReader reader, File metaTmpDir) throws IOException {
-
        File tableDescDir = new File(metaTmpDir, TABLE_FOLDER_NAME);
        File tableExdDir = new File(metaTmpDir, TABLE_EXD_FOLDER_NAME);
        mkdirs(tableDescDir);
        mkdirs(tableExdDir);
        List<TableDesc> tableDescList = new ArrayList<TableDesc>();
        List<Map<String, String>> tableAttrsList = new ArrayList<Map<String, String>>();
-        getTables(database, reader, tableDescList, tableAttrsList);
+
+        for (String tableName : tables) {
+            Table table = null;
+            List<FieldSchema> fields = null;
+            try {
+                table = client.getTable(database, tableName);
+                fields = client.getFields(database, tableName);
+            } catch (Exception e) {
+                // surface metastore failures to the caller as IOException
+                e.printStackTrace();
+                throw new IOException(e);
+            }
+
+            TableDesc tableDesc = new TableDesc();
+            tableDesc.setDatabase(database.toUpperCase());
+            tableDesc.setName(tableName.toUpperCase());
+            tableDesc.setUuid(UUID.randomUUID().toString());
+            int columnNumber = fields.size();
+            List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
+            for (int i = 0; i < columnNumber; i++) {
+                FieldSchema field = fields.get(i);
+                ColumnDesc cdesc = new ColumnDesc();
+                cdesc.setName(field.getName().toUpperCase());
+                cdesc.setDatatype(field.getType());
+                cdesc.setId(String.valueOf(i + 1));
+                columns.add(cdesc);
+            }
+            tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
+
+            List<FieldSchema> partitionCols = table.getPartitionKeys();
+            StringBuffer partitionColumnString = new StringBuffer();
+            for (int i = 0, n = partitionCols.size(); i < n; i++) {
+                if (i > 0)
+                    partitionColumnString.append(", ");
+                partitionColumnString.append(partitionCols.get(i).getName().toUpperCase());
+            }
+            tableDescList.add(tableDesc);
+            Map<String, String> map = new HashMap<String, String>(); //table.getParameters();
+            map.put("tableName", table.getTableName());
+            map.put("location", table.getSd().getLocation());
+            map.put("inputformat", table.getSd().getInputFormat());
+            map.put("outputformat", table.getSd().getOutputFormat());
+            map.put("owner", table.getOwner());
+            map.put("lastAccessTime", String.valueOf(table.getLastAccessTime()));
+            map.put("partitionColumns", partitionColumnString.toString());
+            tableAttrsList.add(map);
+        }
 
        List<String> loadedTables = Lists.newArrayList();
 
@@ -152,9 +178,11 @@
            File file = new File(tableExdDir, tableAttrs.get("tableName").toUpperCase() + "." + OUTPUT_SURFIX);
            JsonUtil.writeValueIndent(new FileOutputStream(file), tableAttrs);
        }
+
        return loadedTables;
    }
 
+
    private static void mkdirs(File metaTmpDir) {
        if (!metaTmpDir.exists()) {
            if (!metaTmpDir.mkdirs()) {
@@ -163,87 +191,6 @@ private static void mkdirs(File metaTmpDir) {
        }
    }
 
-    private static void getTables(String database, BufferedReader reader, //
-            List<TableDesc> tableDescList, List<Map<String, String>> tableAttrsList) throws IOException {
-
-        Map<String, String> tableAttrs = new HashMap<String, String>();
-        TableDesc tableDesc = new TableDesc();
-        String line;
-        boolean hit = false;
-
-        while ((line = reader.readLine()) != null) {
-            logger.info(line);
-            int i = line.indexOf(":");
-            if (i == -1) {
-                continue;
-            }
-            String key = line.substring(0, i);
-            String value = line.substring(i + 1, line.length());
-            if (key.equals("tableName")) {// Create a new table object
-                hit = true;
-                tableAttrs = new HashMap<String, String>();
-                tableAttrsList.add(tableAttrs);
-                tableDesc = new TableDesc();
-                tableDescList.add(tableDesc);
-            }
-
-            if (!hit) {
-                continue;
-            }
-
-            if (line.startsWith("columns")) {// generate source table metadata
-                String tname = tableAttrs.get("tableName");
-
-                tableDesc.setDatabase(database.toUpperCase());
-                tableDesc.setName(tname.toUpperCase());
-                tableDesc.setUuid(UUID.randomUUID().toString());
-                addColumns(tableDesc, value);
-            }
-            tableAttrs.put(key, value);
-            if (key.equals("lastUpdateTime")) {
-                hit = false;
-            }
-        }
-
-    }
-
-    private static void addColumns(TableDesc sTable, String value) {
-        List<ColumnDesc> columns = new ArrayList<ColumnDesc>();
-        int i1 = value.indexOf("{");
-        int i2 = value.indexOf("}");
-        if (i1 < 0 || i2 < 0 || i1 > i2) {
-            return;
-        }
-        String temp = value.substring(i1 + 1, i2);
-        String[] strArr = temp.split(", ");
-        for (int i = 0; i < strArr.length; i++) {
-            String t1 = strArr[i].trim();
-            int pos = t1.indexOf(" ");
-            String colType = t1.substring(0, pos).trim();
-            String colName = t1.substring(pos).trim();
-            ColumnDesc cdesc = new ColumnDesc();
-            cdesc.setName(colName.toUpperCase());
-            cdesc.setDatatype(convertType(colType));
-            cdesc.setId(String.valueOf(i + 1));
-            columns.add(cdesc);
-        }
-        sTable.setColumns(columns.toArray(new ColumnDesc[0]));
-    }
-
-    private static String convertType(String colType) {
-        if ("i32".equals(colType)) {
-            return "int";
-        } else if ("i64".equals(colType)) {
-            return "bigint";
-        } else if ("i16".equals(colType)) {
-            return "smallint";
-        } else if ("byte".equals(colType)) {
-            return "tinyint";
-        } else if ("bool".equals(colType))
-            return "boolean";
-        return colType;
-    }
-
    /**
     */
    public static void main(String[] args) {
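
The next patch introduces HiveTableReader on top of HCatalog's data transfer API. The flow has two halves: a master step that plans the read and yields a serializable ReaderContext, and a per-split step that can run in any JVM. A condensed sketch of the round trip, with placeholder database and table names (single-process here; in real use the ReaderContext would be shipped to worker processes):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    import org.apache.hive.hcatalog.data.HCatRecord;
    import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
    import org.apache.hive.hcatalog.data.transfer.HCatReader;
    import org.apache.hive.hcatalog.data.transfer.ReadEntity;
    import org.apache.hive.hcatalog.data.transfer.ReaderContext;

    public class DataTransferSketch {
        public static void main(String[] args) throws Exception {
            // Master side: plan the read; the resulting context knows the number of splits.
            ReadEntity entity = new ReadEntity.Builder().withDatabase("default").withTable("my_table").build();
            Map<String, String> config = new HashMap<String, String>(); // extra Hive/Hadoop settings, if any
            ReaderContext context = DataTransferFactory.getHCatReader(entity, config).prepareRead();

            // Slave side: each split is an independent record iterator.
            for (int split = 0; split < context.numSplits(); split++) {
                HCatReader reader = DataTransferFactory.getHCatReader(context, split);
                Iterator<HCatRecord> records = reader.read();
                while (records.hasNext()) {
                    System.out.println(records.next().getAll());
                }
            }
        }
    }
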
From 7faf279bdbdbe6955ab87b7d783cbda56d91bd99 Mon Sep 17 00:00:00 2001
From: shaofengshi
Date: Sun, 4 Jan 2015 14:11:38 +0800
Subject: [PATCH 15/16] Add HiveTableReader

---
 .../java/com/kylinolap/dict/lookup/HiveTable.java  | 31 ++++++++-
 .../com/kylinolap/dict/lookup/HiveTableReader.java | 81 ++++++++++++++++++++++
 .../com/kylinolap/metadata/tool/HiveClient.java    | 43 +++++++++++-
 3 files changed, 150 insertions(+), 5 deletions(-)
 create mode 100644 dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java

diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
index 14e6362..12f2464 100644
--- a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
@@ -16,14 +16,21 @@
 
 package com.kylinolap.dict.lookup;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
 import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.HadoopUtil;
 import com.kylinolap.metadata.MetadataManager;
 import com.kylinolap.metadata.tool.HiveClient;
 
@@ -91,8 +98,28 @@ private String computeHDFSLocation(boolean needFilePath) throws IOException {
             throw new IOException(e);
         }
 
-        return table.getSd().getLocation();
-
+        String hdfsDir = table.getSd().getLocation();
+        if (needFilePath) {
+            FileSystem fs = HadoopUtil.getFileSystem(hdfsDir);
+            FileStatus file = findOnlyFile(hdfsDir, fs);
+            return file.getPath().toString();
+        } else {
+            return hdfsDir;
+        }
+
+    }
+
+
+    private FileStatus findOnlyFile(String hdfsDir, FileSystem fs) throws FileNotFoundException, IOException {
+        FileStatus[] files = fs.listStatus(new Path(hdfsDir));
+        ArrayList<FileStatus> nonZeroFiles = Lists.newArrayList();
+        for (FileStatus f : files) {
+            if (f.getLen() > 0)
+                nonZeroFiles.add(f);
+        }
+        if (nonZeroFiles.size() != 1)
+            throw new IllegalStateException("Expect 1 and only 1 non-zero file under " + hdfsDir + ", but find " + nonZeroFiles.size());
+        return nonZeroFiles.get(0);
     }
 
     @Override
diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java
new file mode 100644
index 0000000..ee82633
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java
@@ -0,0 +1,81 @@
+package com.kylinolap.dict.lookup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hive.hcatalog.data.transfer.HCatReader;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+
+import com.kylinolap.metadata.tool.HiveClient;
+
+public class HiveTableReader implements TableReader {
+
+    private String dbName;
+    private String tableName;
+    private int currentSplit = -1;
+    private ReaderContext readCntxt;
+    private Iterator<HCatRecord> currentHCatRecordItr = null;
+    private HCatRecord currentHCatRecord;
+    private int numberOfSplits = 0;
+
+    public HiveTableReader(String dbName, String tableName) throws MetaException, ClassNotFoundException, CommandNeedRetryException, IOException {
+        this.dbName = dbName;
+        this.tableName = tableName;
+        this.readCntxt = HiveClient.getInstance().getReaderContext(dbName, tableName);
+        this.numberOfSplits = readCntxt.numSplits();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public boolean next() throws IOException {
+        while (currentHCatRecordItr == null || !currentHCatRecordItr.hasNext()) {
+            currentSplit++;
+            if (currentSplit == numberOfSplits) {
+                return false;
+            }
+
+            currentHCatRecordItr = loadHCatRecordItr(currentSplit);
+        }
+
+        currentHCatRecord = currentHCatRecordItr.next();
+
+        return true;
+    }
+
+    private Iterator<HCatRecord> loadHCatRecordItr(int dataSplit) throws HCatException {
+        HCatReader currentHCatReader = DataTransferFactory.getHCatReader(readCntxt, dataSplit);
+        return currentHCatReader.read();
+    }
+
+    @Override
+    public String[] getRow() {
+        List<Object> allFields = currentHCatRecord.getAll();
+        List<String> rowValues = new ArrayList<String>(allFields.size());
+        for (Object o : allFields) {
+            rowValues.add(o != null ? o.toString() : "NULL");
+        }
+
+        return rowValues.toArray(new String[allFields.size()]);
+    }
+
+    @Override
+    public void setExpectedColumnNumber(int expectedColumnNumber) {
+
+    }
+
+    public String toString() {
+        return "hive table reader for: " + dbName + "." + tableName;
+    }
+
+}

diff --git a/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java b/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java
index 70f30ad..9b53ff0 100644
--- a/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java
+++ b/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java
@@ -1,11 +1,23 @@
 package com.kylinolap.metadata.tool;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hive.hcatalog.data.transfer.HCatReader;
+import org.apache.hive.hcatalog.data.transfer.ReadEntity;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
 
 /*
  * Copyright 2013-2014 eBay Software Foundation
@@ -72,7 +84,6 @@ public Driver getDriver() {
         return driver;
     }
 
-
     /**
      * Get the Hive Meta store client;
     * @return
@@ -81,6 +92,32 @@ public HiveMetaStoreClient getMetaStoreClient() {
         return metaStoreClient;
     }
 
-
-
+    public ReaderContext getReaderContext(String database, String table) throws MetaException, CommandNeedRetryException, IOException, ClassNotFoundException {
+
+        Iterator<Entry<String, String>> itr = hiveConf.iterator();
+        Map<String, String> map = new HashMap<String, String>();
+        while (itr.hasNext()) {
+            Entry<String, String> kv = itr.next();
+            map.put(kv.getKey(), kv.getValue());
+        }
+
+        ReaderContext readCntxt = runsInMaster(map, database, table);
+
+        return readCntxt;
+    }
+
+    private ReaderContext runsInMaster(Map<String, String> config, String database, String table) throws HCatException {
+        ReadEntity entity = new ReadEntity.Builder().withDatabase(database).withTable(table).build();
+        HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
+        ReaderContext cntxt = reader.prepareRead();
+        return cntxt;
+    }
+
+    public HCatReader getHCatReader(ReaderContext cntxt, int slaveNum) throws HCatException {
+
+        HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
+        return reader;
+
+    }
+
 }
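
The last patch hardens HiveTableReader (a constructor that only declares IOException, a close() that releases state) and adds a test case. Callers drive any TableReader through the next()/getRow() contract; a short usage sketch, assuming the sandbox test table used by the test below:

    import java.util.Arrays;

    import com.kylinolap.dict.lookup.HiveTableReader;
    import com.kylinolap.dict.lookup.TableReader;

    public class ReaderLoop {
        public static void main(String[] args) throws Exception {
            TableReader reader = new HiveTableReader("default", "test_kylin_fact");
            try {
                while (reader.next()) {
                    // one String per column; SQL NULL comes back as the literal "NULL"
                    String[] row = reader.getRow();
                    System.out.println(Arrays.toString(row));
                }
            } finally {
                reader.close();
            }
        }
    }
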
From 652f0f75fa344129845067db40efb77fabc4f80d Mon Sep 17 00:00:00 2001
From: shaofengshi
Date: Mon, 5 Jan 2015 11:13:19 +0800
Subject: [PATCH 16/16] Add HiveTableReader and its test case

---
 .../java/com/kylinolap/dict/lookup/HiveTable.java  | 12 ++---
 .../com/kylinolap/dict/lookup/HiveTableReader.java | 34 ++++++++++---
 .../com/kylinolap/dict/HiveTableReaderTest.java    | 57 ++++++++++++++++++++++
 .../invertedindex/IIDistinctColumnsMapper.java     | 39 +++-------------
 4 files changed, 99 insertions(+), 43 deletions(-)
 create mode 100644 dictionary/src/test/java/com/kylinolap/dict/HiveTableReaderTest.java

diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
index 12f2464..36725ec 100644
--- a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
@@ -42,6 +42,7 @@
 
     private static final Logger logger = LoggerFactory.getLogger(HiveTable.class);
 
+    private String database = "default";
     private String hiveTable;
     private int nColumns;
     private String hdfsLocation;
@@ -59,7 +60,7 @@ public String getColumnDelimeter() throws IOException {
 
     @Override
     public TableReader getReader() throws IOException {
-        return getFileTable().getReader();
+        return new HiveTableReader(database, hiveTable);
     }
 
     @Override
@@ -92,12 +93,12 @@ private String computeHDFSLocation(boolean needFilePath) throws IOException {
         HiveMetaStoreClient hiveClient = HiveClient.getInstance().getMetaStoreClient();
         Table table = null;
         try {
-            table = hiveClient.getTable(hiveTable);
+            table = hiveClient.getTable(database, hiveTable);
         } catch (Exception e) {
             e.printStackTrace();
             throw new IOException(e);
         }
-        
+
         String hdfsDir = table.getSd().getLocation();
         if (needFilePath) {
             FileSystem fs = HadoopUtil.getFileSystem(hdfsDir);
@@ -106,9 +107,8 @@ private String computeHDFSLocation(boolean needFilePath) throws IOException {
         } else {
             return hdfsDir;
         }
-        
+
     }
-    
 
     private FileStatus findOnlyFile(String hdfsDir, FileSystem fs) throws FileNotFoundException, IOException {
         FileStatus[] files = fs.listStatus(new Path(hdfsDir));
@@ -124,7 +124,7 @@ private FileStatus findOnlyFile(String hdfsDir, FileSystem fs) throws FileNotFou
 
     @Override
     public String toString() {
-        return "hive:" + hiveTable;
+        return "hive: database=[" + database + "], table=[" + hiveTable + "]";
     }
 
 }
diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java
index ee82633..bae85e9 100644
--- a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package com.kylinolap.dict.lookup;
 
 import java.io.IOException;
@@ -5,8 +21,6 @@
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hive.hcatalog.common.HCatException;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
 import org.apache.hive.hcatalog.data.transfer.HCatReader;
 import org.apache.hive.hcatalog.data.transfer.ReaderContext;
@@ -25,17 +39,25 @@
     private HCatRecord currentHCatRecord;
     private int numberOfSplits = 0;
 
-    public HiveTableReader(String dbName, String tableName) throws MetaException, ClassNotFoundException, CommandNeedRetryException, IOException {
+    public HiveTableReader(String dbName, String tableName) throws IOException {
         this.dbName = dbName;
         this.tableName = tableName;
-        this.readCntxt = HiveClient.getInstance().getReaderContext(dbName, tableName);
+        try {
+            this.readCntxt = HiveClient.getInstance().getReaderContext(dbName, tableName);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new IOException(e);
+        }
+
         this.numberOfSplits = readCntxt.numSplits();
     }
 
     @Override
     public void close() throws IOException {
-        // TODO Auto-generated method stub
-
+        this.readCntxt = null;
+        this.currentHCatRecordItr = null;
+        this.currentHCatRecord = null;
+        this.currentSplit = -1;
     }
 
     @Override

diff --git a/dictionary/src/test/java/com/kylinolap/dict/HiveTableReaderTest.java b/dictionary/src/test/java/com/kylinolap/dict/HiveTableReaderTest.java
new file mode 100644
index 0000000..60fadef
--- /dev/null
+++ b/dictionary/src/test/java/com/kylinolap/dict/HiveTableReaderTest.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.dict;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.kylinolap.common.util.ClasspathUtil;
+import com.kylinolap.dict.lookup.HiveTableReader;
+
+public class HiveTableReaderTest {
+
+    private static HiveTableReader reader = null;
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        ClasspathUtil.addClasspath(new File("../examples/test_case_data/sandbox/").getAbsolutePath());
+
+        reader = new HiveTableReader("default", "test_kylin_fact");
+    }
+
+    // @AfterClass is required; without it JUnit never calls this method and the reader leaks
+    @AfterClass
+    public static void tearDown() throws IOException {
+        reader.close();
+    }
+
+    @Test
+    public void test() throws IOException {
+        int rowNumber = 0;
+        while (reader.next()) {
+            String[] row = reader.getRow();
+            Assert.assertEquals(9, row.length);
+            System.out.println(ArrayUtils.toString(row));
+            rowNumber++;
+        }
+
+        Assert.assertEquals(10000, rowNumber);
+    }
+}

diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
index e744544..f714a12 100644
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
@@ -31,56 +31,33 @@
  */
 public class IIDistinctColumnsMapper<KEYIN> extends Mapper<KEYIN, HCatRecord, ShortWritable, Text> {
 
-//    private String[] columns;
-//    private int delim;
-//    private BytesSplitter splitter;
-
     private ShortWritable outputKey = new ShortWritable();
     private Text outputValue = new Text();
     private HCatSchema schema = null;
     private int columnSize = 0;
+
+    public static final byte[] NULL_VALUE = Bytes.toBytes("NULL");
 
     @Override
     protected void setup(Context context) throws IOException {
-//        Configuration conf = context.getConfiguration();
-//        this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
-//        String inputDelim = conf.get(BatchConstants.INPUT_DELIM);
-//        this.delim = inputDelim == null ? -1 : inputDelim.codePointAt(0);
-//        this.splitter = new BytesSplitter(200, 4096);
-
         schema = HCatInputFormat.getTableSchema(context.getConfiguration());
         columnSize = schema.getFields().size();
     }
 
     @Override
     public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
-        /*
-        if (delim == -1) {
-            delim = splitter.detectDelim(value, columns.length);
-        }
-
-        int nParts = splitter.split(value.getBytes(), value.getLength(), (byte) delim);
-        SplittedBytes[] parts = splitter.getSplitBuffers();
-
-        if (nParts != columns.length) {
-            throw new RuntimeException("Got " + parts.length + " from -- " + value.toString() + " -- but only " + columns.length + " expected");
-        }
-
-        for (short i = 0; i < nParts; i++) {
-            outputKey.set(i);
-            outputValue.set(parts[i].value, 0, parts[i].length);
-            context.write(outputKey, outputValue);
-        }
-        */
 
         HCatFieldSchema fieldSchema = null;
         for (short i = 0; i < columnSize; i++) {
             outputKey.set(i);
             fieldSchema = schema.get(i);
             Object fieldValue = record.get(fieldSchema.getName(), schema);
-            if (fieldValue == null)
-                fieldValue = "NULL";
-            byte[] bytes = Bytes.toBytes(fieldValue.toString());
+            byte[] bytes;
+            if (fieldValue != null) {
+                bytes = Bytes.toBytes(fieldValue.toString());
+            } else {
+                bytes = NULL_VALUE;
+            }
             outputValue.set(bytes, 0, bytes.length);
             context.write(outputKey, outputValue);
         }
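
One caveat in the mapper above: SQL NULL is folded into the literal byte string "NULL", so a string column that genuinely contains the text "NULL" becomes indistinguishable from a missing value downstream. A tiny illustration of the collision, using only plain Java plus the HBase Bytes helper the mapper already depends on:

    import org.apache.hadoop.hbase.util.Bytes;

    public class NullSentinelDemo {
        public static void main(String[] args) {
            Object fromNullCell = null;    // what HCatRecord.get() returns for SQL NULL
            Object fromTextCell = "NULL";  // a column whose actual value is the string "NULL"

            byte[] a = (fromNullCell != null) ? Bytes.toBytes(fromNullCell.toString()) : Bytes.toBytes("NULL");
            byte[] b = Bytes.toBytes(fromTextCell.toString());

            System.out.println(Bytes.equals(a, b)); // prints true: the two cases collide
        }
    }

If that distinction ever matters for dictionary building, a sentinel outside the value domain (for example a zero-length marker plus a flag byte) would be needed; the current choice simply mirrors the behavior of the old text-based flow.
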