From 201a5047d1c6f9829cb98fc46920c7fca1bc1eee Mon Sep 17 00:00:00 2001
From: shaofengshi
Date: Sun, 4 Jan 2015 10:26:53 +0800
Subject: [PATCH 1/2] Change all reading hive table to use the HCat way

---
 dictionary/pom.xml                                 |   6 +
 .../java/com/kylinolap/dict/lookup/HiveTable.java  |  50 ++----
 .../cardinality/ColumnCardinalityMapper.java       |  16 +-
 .../job/hadoop/cube/FactDistinctColumnsMapper.java |  33 ++--
 .../hadoop/invertedindex/IIDistinctColumnsJob.java |   8 +
 .../invertedindex/IIDistinctColumnsMapper.java     |  52 ++++--
 metadata/pom.xml                                   |   6 +
 .../com/kylinolap/metadata/tool/HiveClient.java    |  86 ++++++++++
 .../metadata/tool/HiveSourceTableLoader.java       | 181 ++++++++-------------
 9 files changed, 244 insertions(+), 194 deletions(-)
 create mode 100644 metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java

diff --git a/dictionary/pom.xml b/dictionary/pom.xml
index 1d09903..931b913 100644
--- a/dictionary/pom.xml
+++ b/dictionary/pom.xml
@@ -68,6 +68,12 @@
             <artifactId>hbase-client</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-hcatalog-core</artifactId>
+            <version>${hive-hcatalog.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
index 25d2a87..14e6362 100644
--- a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
@@ -16,23 +16,16 @@
 
 package com.kylinolap.dict.lookup;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
 import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.HadoopUtil;
-import com.kylinolap.common.util.CliCommandExecutor;
 import com.kylinolap.metadata.MetadataManager;
+import com.kylinolap.metadata.tool.HiveClient;
 
 /**
  * @author yangli9
@@ -89,36 +82,17 @@ private String computeHDFSLocation(boolean needFilePath) throws IOException {
             return override;
         }
 
-        String cmd = "hive -e \"describe extended " + hiveTable + ";\"";
-        CliCommandExecutor exec = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
-        String output = exec.execute(cmd);
-
-        Pattern ptn = Pattern.compile("location:(.*?),");
-        Matcher m = ptn.matcher(output);
-        if (m.find() == false)
-            throw new IOException("Failed to find HDFS location for hive table " + hiveTable + " from output -- " + output);
-
-        String hdfsDir = m.group(1);
-
-        if (needFilePath) {
-            FileSystem fs = HadoopUtil.getFileSystem(hdfsDir);
-            FileStatus file = findOnlyFile(hdfsDir, fs);
-            return file.getPath().toString();
-        } else {
-            return hdfsDir;
+        HiveMetaStoreClient hiveClient = HiveClient.getInstance().getMetaStoreClient();
+        Table table = null;
+        try {
+            table = hiveClient.getTable(hiveTable);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new IOException(e);
         }
-    }
+
+        return table.getSd().getLocation();
 
-    private FileStatus findOnlyFile(String hdfsDir, FileSystem fs) throws FileNotFoundException, IOException {
-        FileStatus[] files = fs.listStatus(new Path(hdfsDir));
-        ArrayList<FileStatus> nonZeroFiles = Lists.newArrayList();
-        for (FileStatus f : files) {
-            if (f.getLen() > 0)
-                nonZeroFiles.add(f);
-        }
-        if (nonZeroFiles.size() != 1)
-            throw new IllegalStateException("Expect 1 and only 1 non-zero file under " + hdfsDir + ", but find " + nonZeroFiles.size());
-        return nonZeroFiles.get(0);
     }
 
     @Override
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
index a93900d..daaf709 100644
--- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
@@ -20,7 +20,6 @@
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.util.Bytes;
@@ -45,18 +44,23 @@
     public static final String DEFAULT_DELIM = ",";
 
     private int counter = 0;
+
+    private HCatSchema schema = null;
+    private int columnSize = 0;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
+        columnSize = schema.getFields().size();
+    }
 
     @Override
     public void map(T key, HCatRecord value, Context context) throws IOException, InterruptedException {
-        HCatSchema schema = HCatInputFormat.getTableSchema(context.getConfiguration());
-
-        List<HCatFieldSchema> fieldList = schema.getFields();
         HCatFieldSchema field;
         Object fieldValue;
-        Integer columnSize = fieldList.size();
         for (int m = 0; m < columnSize; m++) {
-            field = fieldList.get(m);
+            field = schema.get(m);
             fieldValue = value.get(field.getName(), schema);
             if (fieldValue == null)
                 fieldValue = "NULL";
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java
index 91b1d20..658ea97 100644
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java
@@ -25,12 +25,13 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 
 import com.kylinolap.common.KylinConfig;
 import com.kylinolap.cube.CubeInstance;
 import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.common.BytesSplitter;
-import com.kylinolap.cube.common.SplittedBytes;
 import com.kylinolap.cube.cuboid.Cuboid;
 import com.kylinolap.dict.DictionaryManager;
 import com.kylinolap.job.constant.BatchConstants;
@@ -51,20 +52,16 @@
     private int[] factDictCols;
     private JoinedFlatTableDesc intermediateTableDesc;
-    private String intermediateTableRowDelimiter;
-    private byte byteRowDelimiter;
-    private BytesSplitter bytesSplitter;
 
     private ShortWritable outputKey = new ShortWritable();
     private Text outputValue = new Text();
     private int errorRecordCounter;
 
+    private HCatSchema schema = null;
+
     @Override
     protected void setup(Context context) throws IOException {
         Configuration conf = context.getConfiguration();
-        intermediateTableRowDelimiter = conf.get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
-        byteRowDelimiter = intermediateTableRowDelimiter.getBytes("UTF-8")[0];
-        bytesSplitter = new BytesSplitter(200, 4096);
 
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
 
         cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
@@ -92,34 +89,36 @@ protected void setup(Context context) throws IOException {
         this.factDictCols = new int[factDictCols.size()];
         for (int i = 0; i < factDictCols.size(); i++)
             this.factDictCols[i] = factDictCols.get(i);
+
+        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
     }
 
     @Override
     public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
         try {
-//            bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
-//            intermediateTableDesc.sanityCheck(bytesSplitter);
-//            SplittedBytes[] splitBuffers = bytesSplitter.getSplitBuffers();
 
             int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+            HCatFieldSchema fieldSchema = null;
             for (int i : factDictCols) {
                 outputKey.set((short) i);
-//                SplittedBytes bytes = splitBuffers[flatTableIndexes[i]];
-                String textValue = record.get(flatTableIndexes[i]).toString();
-                byte[] bytes = Bytes.toBytes(textValue);
+                fieldSchema = schema.get(flatTableIndexes[i]);
+                Object fieldValue = record.get(fieldSchema.getName(), schema);
+                if (fieldValue == null)
+                    fieldValue = "NULL";
+                byte[] bytes = Bytes.toBytes(fieldValue.toString());
                 outputValue.set(bytes, 0, bytes.length);
                 context.write(outputKey, outputValue);
             }
         } catch (Exception ex) {
-            handleErrorRecord(bytesSplitter, ex);
+            handleErrorRecord(record, ex);
         }
     }
 
-    private void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException {
+    private void handleErrorRecord(HCatRecord record, Exception ex) throws IOException {
 
-        System.err.println("Insane record: " + bytesSplitter);
+        System.err.println("Insane record: " + record.getAll());
         ex.printStackTrace(System.err);
 
         errorRecordCounter++;
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java
index bc12db2..7fa41d5 100644
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,6 +111,7 @@ private void setupMapInput(Path input, String inputFormat, String inputDelim) th
             job.setJarByClass(this.getClass());
         }
 
+        /*
         if ("textinputformat".equalsIgnoreCase(inputFormat) || "text".equalsIgnoreCase(inputFormat)) {
             job.setInputFormatClass(TextInputFormat.class);
         } else {
@@ -124,6 +126,12 @@ private void setupMapInput(Path input, String inputFormat, String inputDelim) th
         if (inputDelim != null) {
             job.getConfiguration().set(BatchConstants.INPUT_DELIM, inputDelim);
         }
+        */
+        String tableName = job.getConfiguration().get(BatchConstants.TABLE_NAME);
+        HCatInputFormat.setInput(job, "default",
+                tableName.toLowerCase());
+
+        job.setInputFormatClass(HCatInputFormat.class);
 
         job.setMapperClass(IIDistinctColumnsMapper.class);
         job.setCombinerClass(IIDistinctColumnsCombiner.class);
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
index b7456bf..e744544 100644
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
@@ -17,38 +17,44 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
-
-import com.kylinolap.cube.common.BytesSplitter;
-import com.kylinolap.cube.common.SplittedBytes;
-import com.kylinolap.job.constant.BatchConstants;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 
 /**
  * @author yangli9
 */
-public class IIDistinctColumnsMapper<KEYIN> extends Mapper<KEYIN, Text, ShortWritable, Text> {
+public class IIDistinctColumnsMapper<KEYIN> extends Mapper<KEYIN, HCatRecord, ShortWritable, Text> {
 
-    private String[] columns;
-    private int delim;
-    private BytesSplitter splitter;
+//    private String[] columns;
+//    private int delim;
+//    private BytesSplitter splitter;
 
     private ShortWritable outputKey = new ShortWritable();
     private Text outputValue = new Text();
+    private HCatSchema schema = null;
+    private int columnSize = 0;
 
     @Override
     protected void setup(Context context) throws IOException {
-        Configuration conf = context.getConfiguration();
-        this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
-        String inputDelim = conf.get(BatchConstants.INPUT_DELIM);
-        this.delim = inputDelim == null ? -1 : inputDelim.codePointAt(0);
-        this.splitter = new BytesSplitter(200, 4096);
+//        Configuration conf = context.getConfiguration();
+//        this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
+//        String inputDelim = conf.get(BatchConstants.INPUT_DELIM);
+//        this.delim = inputDelim == null ? -1 : inputDelim.codePointAt(0);
+//        this.splitter = new BytesSplitter(200, 4096);
+
+        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
+        columnSize = schema.getFields().size();
     }
-
+
     @Override
-    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
+    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
+        /*
         if (delim == -1) {
             delim = splitter.detectDelim(value, columns.length);
         }
@@ -65,6 +71,20 @@ public void map(KEYIN key, Text value, Context context) throws IOException, Inte
             outputValue.set(parts[i].value, 0, parts[i].length);
             context.write(outputKey, outputValue);
         }
+        */
+
+        HCatFieldSchema fieldSchema = null;
+        for (short i = 0; i < columnSize; i++) {
+            outputKey.set(i);
+            fieldSchema = schema.get(i);
+            Object fieldValue = record.get(fieldSchema.getName(), schema);
+            if (fieldValue == null)
+                fieldValue = "NULL";
+            byte[] bytes = Bytes.toBytes(fieldValue.toString());
+            outputValue.set(bytes, 0, bytes.length);
+            context.write(outputKey, outputValue);
+        }
+
     }
 
 }
diff --git a/metadata/pom.xml b/metadata/pom.xml
index 3e76619..ea255b0 100644
--- a/metadata/pom.xml
+++ b/metadata/pom.xml
@@ -71,6 +71,12 @@
             <artifactId>hbase-client</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-hcatalog-core</artifactId>
+            <version>${hive-hcatalog.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
diff --git a/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java b/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java
new file mode 100644
index 0000000..70f30ad
--- /dev/null
+++ b/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java
@@ -0,0 +1,86 @@
+package com.kylinolap.metadata.tool;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class HiveClient {
+
+    protected static HiveConf hiveConf = null;
+    protected static Driver driver = null;
+    protected static HiveMetaStoreClient metaStoreClient = null;
+
+    private static HiveClient instance = null;
+
+    private HiveClient() {
+        setup();
+    }
+
+    public static HiveClient getInstance() {
+
+        if (instance == null) {
+            synchronized (HiveClient.class) {
+                if (instance == null)
+                    instance = new HiveClient();
+            }
+
+        }
+
+        return instance;
+    }
+
+    private void setup() {
+        hiveConf = new HiveConf(HiveSourceTableLoader.class);
+        driver = new Driver(hiveConf);
+        try {
+            metaStoreClient = new HiveMetaStoreClient(hiveConf);
+        } catch (MetaException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+        SessionState.start(new CliSessionState(hiveConf));
+    }
+
+    public HiveConf getHiveConf() {
+        return hiveConf;
+    }
+
+    /**
+     * Get the hive ql driver to execute ddl or dml
+     * @return
+     */
+    public Driver getDriver() {
+        return driver;
+    }
+
+
+    /**
+     * Get the Hive Meta store client;
+     * @return
+     */
+    public HiveMetaStoreClient getMetaStoreClient() {
+        return metaStoreClient;
+    }
+
+
+
+}
diff --git a/metadata/src/main/java/com/kylinolap/metadata/tool/HiveSourceTableLoader.java b/metadata/src/main/java/com/kylinolap/metadata/tool/HiveSourceTableLoader.java
index 3e4cca3..17f7089 100644
--- a/metadata/src/main/java/com/kylinolap/metadata/tool/HiveSourceTableLoader.java
+++ b/metadata/src/main/java/com/kylinolap/metadata/tool/HiveSourceTableLoader.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -28,9 +26,14 @@
 import java.util.Set;
 import java.util.UUID;
 
-import com.kylinolap.metadata.MetadataManager;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,8 +42,8 @@
 import com.google.common.collect.Sets;
 import com.kylinolap.common.KylinConfig;
 import com.kylinolap.common.persistence.ResourceTool;
-import com.kylinolap.common.util.CliCommandExecutor;
 import com.kylinolap.common.util.JsonUtil;
+import com.kylinolap.metadata.MetadataManager;
 import com.kylinolap.metadata.model.schema.ColumnDesc;
 import com.kylinolap.metadata.model.schema.TableDesc;
@@ -51,13 +54,17 @@
  * @author jianliu
  */
 public class HiveSourceTableLoader {
+
     private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
 
     public static final String OUTPUT_SURFIX = "json";
     public static final String TABLE_FOLDER_NAME = "table";
     public static final String TABLE_EXD_FOLDER_NAME = "table_exd";
 
+    protected static HiveMetaStoreClient client = HiveClient.getInstance().getMetaStoreClient();
+
     public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
+
         Map<String, Set<String>> db2tables = Maps.newHashMap();
         for (String table : hiveTables) {
             int cut = table.indexOf('.');
@@ -76,8 +83,8 @@
         metaTmpDir.delete();
         metaTmpDir.mkdirs();
 
-        for (String database: db2tables.keySet()) {
-            for (String table: db2tables.get(database)) {
+        for (String database : db2tables.keySet()) {
+            for (String table : db2tables.get(database)) {
                 TableDesc tableDesc = MetadataManager.getInstance(config).getTableDesc(table);
                 if (tableDesc == null) {
                     continue;
@@ -104,33 +111,6 @@
     }
 
     private static List<TableDesc> extractHiveTables(String database, Set<String> tables, File metaTmpDir, KylinConfig config) throws IOException {
-        StringBuilder cmd = new StringBuilder();
-        cmd.append("hive -e \"");
-        if (StringUtils.isEmpty(database) == false) {
-            cmd.append("use " + database + "; ");
-        }
-        for (String table : tables) {
-            cmd.append("show table extended like " + table + "; ");
-        }
-        cmd.append("\"");
-
-        CliCommandExecutor cmdExec = config.getCliCommandExecutor();
-        String output = cmdExec.execute(cmd.toString());
-
-        return extractTableDescFromHiveOutput(database, output, metaTmpDir);
-    }
-
-    private static List<TableDesc> extractTableDescFromHiveOutput(String database, String hiveOutput, File metaTmpDir) throws IOException {
-        BufferedReader reader = new BufferedReader(new StringReader(hiveOutput));
-        try {
-            return extractTables(database, reader, metaTmpDir);
-        } finally {
-            IOUtils.closeQuietly(reader);
-        }
-    }
-
-    private static List<TableDesc> extractTables(String database, BufferedReader reader, File metaTmpDir) throws IOException {
-
         File tableDescDir = new File(metaTmpDir, TABLE_FOLDER_NAME);
         File tableExdDir = new File(metaTmpDir, TABLE_EXD_FOLDER_NAME);
         mkdirs(tableDescDir);
@@ -138,7 +118,53 @@
 
         List<TableDesc> tableDescList = new ArrayList<TableDesc>();
         List<Map<String, String>> tableAttrsList = new ArrayList<Map<String, String>>();
-        getTables(database, reader, tableDescList, tableAttrsList);
+
+        for (String tableName : tables) {
+            Table table = null;
+            List<FieldSchema> fields = null;
+            try {
+                table = client.getTable(database, tableName);
+                fields = client.getFields(database, tableName);
+            } catch (Exception e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+                throw new IOException(e);
+            }
+
+            TableDesc tableDesc = new TableDesc();
+            tableDesc.setDatabase(database.toUpperCase());
+            tableDesc.setName(tableName.toUpperCase());
+            tableDesc.setUuid(UUID.randomUUID().toString());
+            int columnNumber = fields.size();
+            List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
+            for (int i = 0; i < columnNumber; i++) {
+                FieldSchema field = fields.get(i);
+                ColumnDesc cdesc = new ColumnDesc();
+                cdesc.setName(field.getName().toUpperCase());
+                cdesc.setDatatype(field.getType());
+                cdesc.setId(String.valueOf(i + 1));
+                columns.add(cdesc);
+            }
+            tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
+
+            List<FieldSchema> partitionCols = table.getPartitionKeys();
+            StringBuffer partitionColumnString = new StringBuffer();
+            for (int i = 0, n = partitionCols.size(); i < n; i++) {
+                if (i > 0)
+                    partitionColumnString.append(", ");
+                partitionColumnString.append(partitionCols.get(i).getName().toUpperCase());
+            }
+            tableDescList.add(tableDesc);
+            Map<String, String> map = new HashMap<String, String>(); //table.getParameters();
+            map.put("tableName", table.getTableName());
+            map.put("location", table.getSd().getLocation());
+            map.put("inputformat", table.getSd().getInputFormat());
+            map.put("outputformat", table.getSd().getOutputFormat());
+            map.put("owner", table.getOwner());
+            map.put("lastAccessTime", String.valueOf(table.getLastAccessTime()));
+            map.put("partitionColumns", partitionColumnString.toString());
+            tableAttrsList.add(map);
+        }
 
         List<String> loadedTables = Lists.newArrayList();
 
@@ -152,9 +178,11 @@
             File file = new File(tableExdDir, tableAttrs.get("tableName").toUpperCase() + "." + OUTPUT_SURFIX);
             JsonUtil.writeValueIndent(new FileOutputStream(file), tableAttrs);
         }
+
         return loadedTables;
     }
 
+
     private static void mkdirs(File metaTmpDir) {
         if (!metaTmpDir.exists()) {
             if (!metaTmpDir.mkdirs()) {
@@ -163,87 +191,6 @@ private static void mkdirs(File metaTmpDir) {
         }
     }
 
-    private static void getTables(String database, BufferedReader reader, //
-            List<TableDesc> tableDescList, List<Map<String, String>> tableAttrsList) throws IOException {
-
-        Map<String, String> tableAttrs = new HashMap<String, String>();
-        TableDesc tableDesc = new TableDesc();
-        String line;
-        boolean hit = false;
-
-        while ((line = reader.readLine()) != null) {
-            logger.info(line);
-            int i = line.indexOf(":");
-            if (i == -1) {
-                continue;
-            }
-            String key = line.substring(0, i);
-            String value = line.substring(i + 1, line.length());
-            if (key.equals("tableName")) {// Create a new table object
-                hit = true;
-                tableAttrs = new HashMap<String, String>();
-                tableAttrsList.add(tableAttrs);
-                tableDesc = new TableDesc();
-                tableDescList.add(tableDesc);
-            }
-
-            if (!hit) {
-                continue;
-            }
-
-            if (line.startsWith("columns")) {// geneate source table metadata
-                String tname = tableAttrs.get("tableName");
-
-                tableDesc.setDatabase(database.toUpperCase());
-                tableDesc.setName(tname.toUpperCase());
-                tableDesc.setUuid(UUID.randomUUID().toString());
-                addColumns(tableDesc, value);
-            }
-            tableAttrs.put(key, value);
-            if (key.equals("lastUpdateTime")) {
-                hit = false;
-            }
-        }
-
-    }
-
-    private static void addColumns(TableDesc sTable, String value) {
-        List<ColumnDesc> columns = new ArrayList<ColumnDesc>();
-        int i1 = value.indexOf("{");
-        int i2 = value.indexOf("}");
-        if (i1 < 0 || i2 < 0 || i1 > i2) {
-            return;
-        }
-        String temp = value.substring(i1 + 1, i2);
-        String[] strArr = temp.split(", ");
-        for (int i = 0; i < strArr.length; i++) {
-            String t1 = strArr[i].trim();
-            int pos = t1.indexOf(" ");
-            String colType = t1.substring(0, pos).trim();
-            String colName = t1.substring(pos).trim();
-            ColumnDesc cdesc = new ColumnDesc();
-            cdesc.setName(colName.toUpperCase());
-            cdesc.setDatatype(convertType(colType));
-            cdesc.setId(String.valueOf(i + 1));
-            columns.add(cdesc);
-        }
-        sTable.setColumns(columns.toArray(new ColumnDesc[0]));
-    }
-
-    private static String convertType(String colType) {
-        if ("i32".equals(colType)) {
-            return "int";
-        } else if ("i64".equals(colType)) {
-            return "bigint";
-        } else if ("i16".equals(colType)) {
-            return "smallint";
-        } else if ("byte".equals(colType)) {
-            return "tinyint";
-        } else if ("bool".equals(colType))
-            return "boolean";
-        return colType;
-    }
-
     /**
     */
     public static void main(String[] args) {
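Note: patch 1 converts three mappers (ColumnCardinalityMapper, FactDistinctColumnsMapper, IIDistinctColumnsMapper) to the same HCatalog read pattern: the job side registers the table with HCatInputFormat.setInput(...) and each mapper resolves the table schema once in setup() instead of per record. A minimal sketch of that shared pattern follows; the class name ExampleHCatMapper and the ShortWritable/Text output types are illustrative only, not part of the patch:

    import java.io.IOException;

    import org.apache.hadoop.io.ShortWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hive.hcatalog.data.HCatRecord;
    import org.apache.hive.hcatalog.data.schema.HCatSchema;
    import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;

    public class ExampleHCatMapper<KEYIN> extends Mapper<KEYIN, HCatRecord, ShortWritable, Text> {

        private HCatSchema schema = null;

        @Override
        protected void setup(Context context) throws IOException {
            // Resolve the table schema once per task; resolving it per record, as the
            // old ColumnCardinalityMapper did, re-reads it from the job configuration
            // on every map() call.
            schema = HCatInputFormat.getTableSchema(context.getConfiguration());
        }

        @Override
        public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
            for (short i = 0; i < schema.getFields().size(); i++) {
                // HCatRecord hands back Objects; nulls are mapped to the literal "NULL",
                // the convention shared by all three patched mappers.
                Object fieldValue = record.get(schema.get(i).getName(), schema);
                String text = fieldValue == null ? "NULL" : fieldValue.toString();
                context.write(new ShortWritable(i), new Text(text));
            }
        }
    }

On the job side the pairing is HCatInputFormat.setInput(job, "default", tableName.toLowerCase()) followed by job.setInputFormatClass(HCatInputFormat.class), exactly as IIDistinctColumnsJob does above.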
From 7faf279bdbdbe6955ab87b7d783cbda56d91bd99 Mon Sep 17 00:00:00 2001
From: shaofengshi
Date: Sun, 4 Jan 2015 14:11:38 +0800
Subject: [PATCH 2/2] Add HiveTableReader

---
 .../java/com/kylinolap/dict/lookup/HiveTable.java  | 31 ++++++++-
 .../com/kylinolap/dict/lookup/HiveTableReader.java | 81 ++++++++++++++++++++++
 .../com/kylinolap/metadata/tool/HiveClient.java    | 43 +++++++++++-
 3 files changed, 150 insertions(+), 5 deletions(-)
 create mode 100644 dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java

diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
index 14e6362..12f2464 100644
--- a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
@@ -16,14 +16,21 @@
 
 package com.kylinolap.dict.lookup;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
 import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.HadoopUtil;
 import com.kylinolap.metadata.MetadataManager;
 import com.kylinolap.metadata.tool.HiveClient;
@@ -91,8 +98,28 @@ private String computeHDFSLocation(boolean needFilePath) throws IOException {
             throw new IOException(e);
         }
 
-        return table.getSd().getLocation();
-
+        String hdfsDir = table.getSd().getLocation();
+        if (needFilePath) {
+            FileSystem fs = HadoopUtil.getFileSystem(hdfsDir);
+            FileStatus file = findOnlyFile(hdfsDir, fs);
+            return file.getPath().toString();
+        } else {
+            return hdfsDir;
+        }
+
+    }
+
+
+    private FileStatus findOnlyFile(String hdfsDir, FileSystem fs) throws FileNotFoundException, IOException {
+        FileStatus[] files = fs.listStatus(new Path(hdfsDir));
+        ArrayList<FileStatus> nonZeroFiles = Lists.newArrayList();
+        for (FileStatus f : files) {
+            if (f.getLen() > 0)
+                nonZeroFiles.add(f);
+        }
+        if (nonZeroFiles.size() != 1)
+            throw new IllegalStateException("Expect 1 and only 1 non-zero file under " + hdfsDir + ", but find " + nonZeroFiles.size());
+        return nonZeroFiles.get(0);
     }
 
     @Override
diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java
new file mode 100644
index 0000000..ee82633
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTableReader.java
@@ -0,0 +1,81 @@
+package com.kylinolap.dict.lookup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hive.hcatalog.data.transfer.HCatReader;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+
+import com.kylinolap.metadata.tool.HiveClient;
+
+public class HiveTableReader implements TableReader {
+
+    private String dbName;
+    private String tableName;
+    private int currentSplit = -1;
+    private ReaderContext readCntxt;
+    private Iterator<HCatRecord> currentHCatRecordItr = null;
+    private HCatRecord currentHCatRecord;
+    private int numberOfSplits = 0;
+
+    public HiveTableReader(String dbName, String tableName) throws MetaException, ClassNotFoundException, CommandNeedRetryException, IOException {
+        this.dbName = dbName;
+        this.tableName = tableName;
+        this.readCntxt = HiveClient.getInstance().getReaderContext(dbName, tableName);
+        this.numberOfSplits = readCntxt.numSplits();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public boolean next() throws IOException {
+        while (currentHCatRecordItr == null || !currentHCatRecordItr.hasNext()) {
+            currentSplit++;
+            if (currentSplit == numberOfSplits) {
+                return false;
+            }
+
+            currentHCatRecordItr = loadHCatRecordItr(currentSplit);
+        }
+
+        currentHCatRecord = currentHCatRecordItr.next();
+
+        return true;
+    }
+
+    private Iterator<HCatRecord> loadHCatRecordItr(int dataSplit) throws HCatException {
+        HCatReader currentHCatReader = DataTransferFactory.getHCatReader(readCntxt, dataSplit);
+        return currentHCatReader.read();
+    }
+
+    @Override
+    public String[] getRow() {
+        List<Object> allFields = currentHCatRecord.getAll();
+        List<String> rowValues = new ArrayList<String>(allFields.size());
+        for (Object o : allFields) {
+            rowValues.add(o != null ? o.toString() : "NULL");
+        }
+
+        return rowValues.toArray(new String[allFields.size()]);
+    }
+
+    @Override
+    public void setExpectedColumnNumber(int expectedColumnNumber) {
+
+    }
+
+    public String toString() {
+        return "hive table reader for: " + dbName + "." + tableName;
+    }
+
+}
diff --git a/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java b/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java
index 70f30ad..9b53ff0 100644
--- a/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java
+++ b/metadata/src/main/java/com/kylinolap/metadata/tool/HiveClient.java
@@ -1,11 +1,23 @@
 package com.kylinolap.metadata.tool;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hive.hcatalog.data.transfer.HCatReader;
+import org.apache.hive.hcatalog.data.transfer.ReadEntity;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
 
 /*
  * Copyright 2013-2014 eBay Software Foundation
@@ -72,7 +84,6 @@ public Driver getDriver() {
         return driver;
     }
 
-
     /**
      * Get the Hive Meta store client;
      * @return
@@ -81,6 +92,32 @@ public HiveMetaStoreClient getMetaStoreClient() {
         return metaStoreClient;
     }
 
-
-
+    public ReaderContext getReaderContext(String database, String table) throws MetaException, CommandNeedRetryException, IOException, ClassNotFoundException {
+
+        Iterator<Entry<String, String>> itr = hiveConf.iterator();
+        Map<String, String> map = new HashMap<String, String>();
+        while (itr.hasNext()) {
+            Entry<String, String> kv = itr.next();
+            map.put(kv.getKey(), kv.getValue());
+        }
+
+        ReaderContext readCntxt = runsInMaster(map, database, table);
+
+        return readCntxt;
+    }
+
+    private ReaderContext runsInMaster(Map<String, String> config, String database, String table) throws HCatException {
+        ReadEntity entity = new ReadEntity.Builder().withDatabase(database).withTable(table).build();
+        HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
+        ReaderContext cntxt = reader.prepareRead();
+        return cntxt;
+    }
+
+    public HCatReader getHCatReader(ReaderContext cntxt, int slaveNum) throws HCatException {
+
+        HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
+        return reader;
+
+    }
+
 }
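Note: a minimal usage sketch for the new HiveTableReader, assuming the table already exists in the metastore; the database and table names below are placeholders, and the wrapper class and try/finally are illustrative, not part of the patch:

    import java.util.Arrays;

    import com.kylinolap.dict.lookup.HiveTableReader;

    public class HiveTableReaderExample {
        public static void main(String[] args) throws Exception {
            // Streams the table split by split through HCatalog's data-transfer API;
            // "default" and "test_table" are placeholder names.
            HiveTableReader reader = new HiveTableReader("default", "test_table");
            try {
                while (reader.next()) {
                    String[] row = reader.getRow(); // one String per column; nulls come back as "NULL"
                    System.out.println(Arrays.toString(row));
                }
            } finally {
                reader.close(); // a no-op in this patch, but keeps call sites correct if close() gains work
            }
        }
    }

next() advances lazily: whenever the current split's iterator is exhausted it calls DataTransferFactory.getHCatReader(readCntxt, split) for the next split, so rows are streamed rather than loaded all at once.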