From 652f0f75fa344129845067db40efb77fabc4f80d Mon Sep 17 00:00:00 2001 From: shaofengshi Date: Mon, 5 Jan 2015 11:13:19 +0800 Subject: [PATCH 1/2] Add HiveTableReader and its test case --- .../java/com/kylinolap/dict/lookup/HiveTable.java | 12 ++--- .../com/kylinolap/dict/lookup/HiveTableReader.java | 34 ++++++++++--- .../com/kylinolap/dict/HiveTableReaderTest.java | 57 ++++++++++++++++++++++ .../invertedindex/IIDistinctColumnsMapper.java | 39 +++------------ 4 files changed, 99 insertions(+), 43 deletions(-) create mode 100644 dictionary/src/test/java/com/kylinolap/dict/HiveTableReaderTest.java diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java index 12f2464..36725ec 100644 --- a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java +++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java @@ -42,6 +42,7 @@ private static final Logger logger = LoggerFactory.getLogger(HiveTable.class); + private String database = "default"; private String hiveTable; private int nColumns; private String hdfsLocation; @@ -59,7 +60,7 @@ public String getColumnDelimeter() throws IOException { @Override public TableReader getReader() throws IOException { - return getFileTable().getReader(); + return new HiveTableReader(database, hiveTable); } @Override @@ -92,12 +93,12 @@ private String computeHDFSLocation(boolean needFilePath) throws IOException { HiveMetaStoreClient hiveClient = HiveClient.getInstance().getMetaStoreClient(); Table table = null; try { - table = hiveClient.getTable(hiveTable); + table = hiveClient.getTable(database, hiveTable); } catch (Exception e) { e.printStackTrace(); throw new IOException(e); } - + String hdfsDir = table.getSd().getLocation(); if (needFilePath) { FileSystem fs = HadoopUtil.getFileSystem(hdfsDir); @@ -106,9 +107,8 @@ private String computeHDFSLocation(boolean needFilePath) throws IOException { } else { return hdfsDir; } - + } - private FileStatus findOnlyFile(String hdfsDir, FileSystem fs) throws FileNotFoundException, IOException { FileStatus[] files = fs.listStatus(new Path(hdfsDir)); @@ -124,7 +124,7 @@ private FileStatus findOnlyFile(String hdfsDir, FileSystem fs) throws FileNotFou @Override public String toString() { - return "hive:" + hiveTable; + return "hive: database=[" + database + "], table=[" + hiveTable + "]"; } } diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java index ee82633..bae85e9 100644 --- a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java +++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java @@ -1,3 +1,19 @@ +/* + * Copyright 2013-2014 eBay Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.kylinolap.dict.lookup; import java.io.IOException; @@ -5,8 +21,6 @@ import java.util.Iterator; import java.util.List; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.data.transfer.DataTransferFactory; @@ -25,17 +39,25 @@ private HCatRecord currentHCatRecord; private int numberOfSplits = 0; - public HiveTableReader(String dbName, String tableName) throws MetaException, ClassNotFoundException, CommandNeedRetryException, IOException { + public HiveTableReader(String dbName, String tableName) throws IOException { this.dbName = dbName; this.tableName = tableName; - this.readCntxt = HiveClient.getInstance().getReaderContext(dbName, tableName); + try { + this.readCntxt = HiveClient.getInstance().getReaderContext(dbName, tableName); + } catch (Exception e) { + e.printStackTrace(); + throw new IOException(e); + } + this.numberOfSplits = readCntxt.numSplits(); } @Override public void close() throws IOException { - // TODO Auto-generated method stub - + this.readCntxt = null; + this.currentHCatRecordItr = null; + this.currentHCatRecord = null; + this.currentSplit = -1; } @Override diff --git a/dictionary/src/test/java/com/kylinolap/dict/HiveTableReaderTest.java b/dictionary/src/test/java/com/kylinolap/dict/HiveTableReaderTest.java new file mode 100644 index 0000000..60fadef --- /dev/null +++ b/dictionary/src/test/java/com/kylinolap/dict/HiveTableReaderTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2013-2014 eBay Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.kylinolap.dict; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.lang.ArrayUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.kylinolap.common.util.ClasspathUtil; +import com.kylinolap.dict.lookup.HiveTableReader; + +public class HiveTableReaderTest { + + private static HiveTableReader reader = null; + + @BeforeClass + public static void setup() throws Exception { + ClasspathUtil.addClasspath(new File("../examples/test_case_data/sandbox/").getAbsolutePath()); + + reader = new HiveTableReader("default", "test_kylin_fact"); + } + + public static void tearDown() throws IOException { + reader.close(); + } + + @Test + public void test() throws IOException { + int rowNumber = 0; + while (reader.next()) { + String[] row = reader.getRow(); + Assert.assertEquals(9, row.length); + System.out.println(ArrayUtils.toString(row)); + rowNumber++; + } + + Assert.assertEquals(10000, rowNumber); + } +} diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java index e744544..f714a12 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java @@ -31,56 +31,33 @@ */ public class IIDistinctColumnsMapper extends Mapper { -// private String[] columns; -// private int delim; -// private BytesSplitter splitter; - private ShortWritable outputKey = new ShortWritable(); private Text outputValue = new Text(); private HCatSchema schema = null; private int columnSize = 0; + + public static final byte[] NULL_VALUE = Bytes.toBytes("NULL"); @Override protected void setup(Context context) throws IOException { -// Configuration conf = context.getConfiguration(); -// this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(","); -// String inputDelim = conf.get(BatchConstants.INPUT_DELIM); -// this.delim = inputDelim == null ? -1 : inputDelim.codePointAt(0); -// this.splitter = new BytesSplitter(200, 4096); - schema = HCatInputFormat.getTableSchema(context.getConfiguration()); columnSize = schema.getFields().size(); } @Override public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException { - /* - if (delim == -1) { - delim = splitter.detectDelim(value, columns.length); - } - - int nParts = splitter.split(value.getBytes(), value.getLength(), (byte) delim); - SplittedBytes[] parts = splitter.getSplitBuffers(); - - if (nParts != columns.length) { - throw new RuntimeException("Got " + parts.length + " from -- " + value.toString() + " -- but only " + columns.length + " expected"); - } - - for (short i = 0; i < nParts; i++) { - outputKey.set(i); - outputValue.set(parts[i].value, 0, parts[i].length); - context.write(outputKey, outputValue); - } - */ HCatFieldSchema fieldSchema = null; for (short i = 0; i < columnSize; i++) { outputKey.set(i); fieldSchema = schema.get(i); Object fieldValue = record.get(fieldSchema.getName(), schema); - if (fieldValue == null) - fieldValue = "NULL"; - byte[] bytes = Bytes.toBytes(fieldValue.toString()); + byte[] bytes; + if (fieldValue != null) { + bytes = Bytes.toBytes(fieldValue.toString()); + } else { + bytes = NULL_VALUE; + } outputValue.set(bytes, 0, bytes.length); context.write(outputKey, outputValue); } From bfffde49241fec2702bf7b875f02f9ee36d35559 Mon Sep 17 00:00:00 2001 From: shaofengshi Date: Tue, 6 Jan 2015 22:44:06 +0800 Subject: [PATCH 2/2] add partition filter in HiveColumnCardinalityJob to test the function on Avro --- .../job/hadoop/cardinality/HiveColumnCardinalityJob.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java index 4280f71..ba6eba0 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java @@ -7,9 +7,7 @@ import java.io.InputStream; import java.io.StringWriter; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; @@ -31,12 +29,8 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ToolRunner; -import org.apache.hive.hcatalog.data.schema.HCatSchema; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; -import com.kylinolap.job.JobDAO; -import com.kylinolap.job.JobInstance; -import com.kylinolap.job.JobInstance.JobStep; import com.kylinolap.job.hadoop.AbstractHadoopJob; public class HiveColumnCardinalityJob extends AbstractHadoopJob { @@ -144,8 +138,9 @@ public int run(String[] args) throws Exception { // Mapper this.table = getOptionValue(OPTION_TABLE); System.out.println("Going to start HiveColumnCardinalityJob on table '" + table + "'"); + String filter = "\"dt\"=\"20150101\" and \"hour\"=\"00\""; HCatInputFormat.setInput(job, "default", - table); + table, filter); System.out.println("Set input format as HCat on table '" + table + "'");