Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java	(revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java	(revision 0)
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+
+/**
+ * The class HBaseInputStorageDriver enables reading of HBase tables through
+ * HCatalog.
+ */
+public class HBaseInputStorageDriver extends HCatInputStorageDriver {
+    private HCatTableInfo tableInfo;
+    private ResultConverter converter;
+    private HCatSchema outputColSchema;
+    private HCatSchema dataSchema;
+    private Configuration jobConf;
+
+    /*
+     * @param JobContext
+     *
+     * @param hcatProperties
+     *
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+     * #initialize(org.apache.hadoop.mapreduce.JobContext, java.util.Properties)
+     */
+    @Override
+    public void initialize(JobContext context, Properties hcatProperties) {
+        jobConf = context.getConfiguration();
+        try {
+            String jobString = context.getConfiguration().get(
+                    HCatConstants.HCAT_KEY_JOB_INFO);
+            if (jobString == null) {
+                throw new IOException(
+                        "InputJobInfo information not found in JobContext. "
+                                + "HCatInputFormat.setInput() not called?");
+            }
+            InputJobInfo jobInfo = (InputJobInfo) HCatUtil
+                    .deserialize(jobString);
+            tableInfo = jobInfo.getTableInfo();
+            dataSchema = tableInfo.getDataColumns();
+            List<FieldSchema> fields = HCatUtil
+                    .getFieldSchemaList(outputColSchema.getFields());
+            hcatProperties.setProperty(Constants.LIST_COLUMNS,
+                    MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+            hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES,
+                    MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+            converter = new HBaseSerDeResultConverter(dataSchema,
+                    outputColSchema, hcatProperties);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    /*
+     * @param hcatProperties
+     *
+     * @return InputFormat
+     *
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+     * #getInputFormat(java.util.Properties)
+     */
+    @Override
+    public InputFormat<ImmutableBytesWritable, Result> getInputFormat(
+            Properties hcatProperties) {
+        HBaseInputFormat tableInputFormat = new HBaseInputFormat();
+        jobConf.set(TableInputFormat.INPUT_TABLE, tableInfo.getTableName());
+        tableInputFormat.setConf(jobConf);
+        // TODO: Make the caching configurable by the user
+        tableInputFormat.getScan().setCaching(200);
+        tableInputFormat.getScan().setCacheBlocks(false);
+        return tableInputFormat;
+    }
+
+    /*
+     * @param baseKey
+     *
+     * @param baseValue
+     *
+     * @return HCatRecord
+     *
+     * @throws IOException
+     *
+     * @see
+     * org.apache.hcatalog.mapreduce.HCatInputStorageDriver#convertToHCatRecord
+     * (org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
+     */
+    @Override
+    public HCatRecord convertToHCatRecord(WritableComparable baseKey,
+            Writable baseValue) throws IOException {
+        return this.converter.convert((Result) baseValue);
+    }
+
+    /*
+     * @param jobContext
+     *
+     * @param howlSchema
+     *
+     * @throws IOException
+     *
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver#
+     * setOutputSchema(org.apache.hadoop.mapreduce.JobContext,
+     * org.apache.hcatalog.data.schema.HCatSchema)
+     */
+    @Override
+    public void setOutputSchema(JobContext jobContext, HCatSchema howlSchema)
+            throws IOException {
+        outputColSchema = howlSchema;
+    }
+
+    /*
+     * @param jobContext
+     *
+     * @param partitionValues
+     *
+     * @throws IOException
+     *
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+     * #setPartitionValues(org.apache.hadoop.mapreduce.JobContext,
+     * java.util.Map)
+     */
+    @Override
+    public void setPartitionValues(JobContext jobContext,
+            Map<String, String> partitionValues) throws IOException {
+    }
+
+    /*
+     * @param jobContext
+     *
+     * @param hcatSchema
+     *
+     * @throws IOException
+     *
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+     * #setOriginalSchema(org.apache.hadoop.mapreduce.JobContext,
+     * org.apache.hcatalog.data.schema.HCatSchema)
+     */
+    @Override
+    public void setOriginalSchema(JobContext jobContext, HCatSchema hcatSchema)
+            throws IOException {
+        this.dataSchema = hcatSchema;
+    }
+}
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java	(revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java	(revision 0)
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * HBaseInputFormat is a thin wrapper around HBase's TableInputFormat.
+ */
+class HBaseInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
+
+    private final TableInputFormat inputFormat;
+
+    public HBaseInputFormat() {
+        inputFormat = new TableInputFormat();
+    }
+
+    /*
+     * @param instance of InputSplit
+     *
+     * @param instance of TaskAttemptContext
+     *
+     * @return RecordReader
+     *
+     * @throws IOException
+     *
+     * @throws InterruptedException
+     *
+     * @see
+     * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache
+     * .hadoop.mapreduce.InputSplit,
+     * org.apache.hadoop.mapreduce.TaskAttemptContext)
+     */
+    @Override
+    public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+            InputSplit split, TaskAttemptContext tac) throws IOException,
+            InterruptedException {
+        return inputFormat.createRecordReader(split, tac);
+    }
+
+    /*
+     * @param jobContext
+     *
+     * @return List of InputSplit
+     *
+     * @throws IOException
+     *
+     * @throws InterruptedException
+     *
+     * @see
+     * org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce
+     * .JobContext)
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext jobContext)
+            throws IOException, InterruptedException {
+        return inputFormat.getSplits(jobContext);
+    }
+
+    public void setConf(Configuration conf) {
+        inputFormat.setConf(conf);
+    }
+
+    public Scan getScan() {
+        return inputFormat.getScan();
+    }
+
+    public void setScan(Scan scan) {
+        inputFormat.setScan(scan);
+    }
+
+}
Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java
===================================================================
--- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java	(revision 0)
+++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java	(revision 0)
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.junit.Test;
+
+public class TestHBaseInputStorageDriver extends SkeletonHBaseTest {
+
+    private final byte[] FAMILY     = Bytes.toBytes("testFamily");
+    private final byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
+    private final byte[] QUALIFIER2 = Bytes.toBytes("testQualifier2");
+    private final String tableName  = "mytesttable";
+
+    List<Put> generatePuts(int num) {
+        List<Put> myPuts = new ArrayList<Put>();
+        for (int i = 0; i < num; i++) {
+            Put put = new Put(Bytes.toBytes("testRow" + i));
+            put.add(FAMILY, QUALIFIER1, 0,
+                    Bytes.toBytes("testQualifier1-" + "textValue-" + i));
+            put.add(FAMILY, QUALIFIER2, 0,
+                    Bytes.toBytes("testQualifier2-" + "textValue-" + i));
+            myPuts.add(put);
+        }
+        return myPuts;
+    }
+
+    private void registerHBaseTable(String tableName) throws Exception {
+
+        String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+        HiveMetaStoreClient client = getCluster().getHiveMetaStoreClient();
+        try {
+            client.dropTable(databaseName, tableName);
+        } catch (Exception e) {
+        } // can fail with NoSuchObjectException
+
+        Table tbl = new Table();
+        tbl.setDbName(databaseName);
+        tbl.setTableName(tableName);
+        tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
+        tbl.setPartitionKeys(new ArrayList<FieldSchema>());
+        Map<String, String> tableParams = new HashMap<String, String>();
+        tableParams.put(HCatConstants.HCAT_ISD_CLASS,
+                HBaseInputStorageDriver.class.getName());
+        tableParams.put(HCatConstants.HCAT_OSD_CLASS, "NotRequired");
+        tableParams.put(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY,
+                ":key,testFamily:testQualifier1,testFamily:testQualifier2");
+        tableParams.put(Constants.SERIALIZATION_FORMAT, "9");
+        tableParams.put(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
+        tbl.setParameters(tableParams);
+
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setCols(HCatUtil.getFieldSchemaList(getSchema().getFields()));
+        sd.setBucketCols(new ArrayList<String>(3));
+        sd.setSerdeInfo(new SerDeInfo());
+        sd.getSerdeInfo().setName(tbl.getTableName());
+        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+        sd.getSerdeInfo().getParameters()
+                .put(Constants.SERIALIZATION_FORMAT, "9");
+        sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName());
+        sd.setInputFormat(HBaseInputFormat.class.getName());
+        sd.setOutputFormat("NotRequired");
+
+        tbl.setSd(sd);
+        client.createTable(tbl);
+
+    }
+
+    public void populateTable() throws IOException {
+        List<Put> myPuts = generatePuts(10);
+        HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tableName));
+        table.put(myPuts);
+    }
+
+    @Test
+    public void TestHBaseTableReadMR() throws Exception {
+
+        Configuration conf = new Configuration();
+        // include hbase config in conf file
+        for (Map.Entry<String, String> el : getHbaseConf()) {
+            if (el.getKey().startsWith("hbase.")) {
+                conf.set(el.getKey(), el.getValue());
+            }
+        }
+
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                HCatUtil.serialize(getHiveConf().getAllProperties()));
+
+        // create HBase table using admin
+        createTable(tableName, new String[] { "testFamily" });
+        registerHBaseTable(tableName);
+        populateTable();
+        // output settings
+        Path outputDir = new Path(getTestDir(), "mapred/testHbaseTableMRRead");
+        FileSystem fs = getFileSystem();
+        if (fs.exists(outputDir)) {
+            fs.delete(outputDir, true);
+        }
+        // create job
+        Job job = new Job(conf, "hbase-mr-read-test");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapReadHTable.class);
+
+        job.getConfiguration().set(TableInputFormat.INPUT_TABLE, tableName);
+
+        job.setInputFormatClass(HCatInputFormat.class);
+        InputJobInfo inputJobInfo = InputJobInfo.create(
+                MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null, null,
+                null);
+        HCatInputFormat.setOutputSchema(job, getSchema());
+        HCatInputFormat.setInput(job, inputJobInfo);
+        job.setOutputFormatClass(TextOutputFormat.class);
+        TextOutputFormat.setOutputPath(job, outputDir);
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(Text.class);
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+        assertTrue(MapReadHTable.error == false);
+    }
+
+    public static class MapReadHTable
+            extends
+            Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable, Text> {
+
+        static boolean error = false;
+
+        @Override
+        public void map(ImmutableBytesWritable key, HCatRecord value,
+                Context context) throws IOException, InterruptedException {
+            boolean correctValues = (value.size() == 3)
+                    && (value.get(0).toString()).startsWith("testRow")
+                    && (value.get(1).toString()).startsWith("testQualifier1")
+                    && (value.get(2).toString()).startsWith("testQualifier2");
+
+            if (correctValues == false) {
+                error = true;
+            }
+        }
+    }
+
+    private HCatSchema getSchema() throws HCatException {
+
+        HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+        schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING,
+                ""));
+        schema.append(new HCatFieldSchema("testqualifier1",
+                HCatFieldSchema.Type.STRING, ""));
+        schema.append(new HCatFieldSchema("testqualifier2",
+                HCatFieldSchema.Type.STRING, ""));
+        return schema;
+    }
+}
Index: src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java	(revision 1178480)
+++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java	(working copy)
@@ -26,6 +26,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -48,22 +49,12 @@
   /** The prefix for keys used for storage driver arguments */
   static final String HCAT_KEY_PREFIX = "hcat.";
-  private static final HiveConf hiveConf = new HiveConf(HCatInputFormat.class);
+  private static HiveConf hiveConf;
 
   private static HiveMetaStoreClient createHiveMetaClient(Configuration conf,
       InputJobInfo inputJobInfo) throws Exception {
-    if (inputJobInfo.getServerUri() != null){
-      hiveConf.set("hive.metastore.local", "false");
-      hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, inputJobInfo.getServerUri());
-    }
-
-    String kerberosPrincipal = inputJobInfo.getServerKerberosPrincipal();
-    if(kerberosPrincipal != null){
-      hiveConf.setBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
-      hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, kerberosPrincipal);
-    }
-
-    return new HiveMetaStoreClient(hiveConf,null);
+    hiveConf = getHiveConf(inputJobInfo, conf);
+    return new HiveMetaStoreClient(hiveConf, null);
   }
@@ -207,4 +198,82 @@
     return new StorerInfo(inputSDClass, outputSDClass, hcatProperties);
   }
 
+    static HiveConf getHiveConf(InputJobInfo iInfo, Configuration conf)
+            throws IOException {
+
+        HiveConf hiveConf = new HiveConf(HCatInputFormat.class);
+
+        if (iInfo.getServerUri() != null) {
+            // User specified a thrift url
+
+            hiveConf.set("hive.metastore.local", "false");
+            hiveConf.set(ConfVars.METASTOREURIS.varname, iInfo.getServerUri());
+
+            String kerberosPrincipal = iInfo.getServerKerberosPrincipal();
+            if (kerberosPrincipal != null) {
+                hiveConf.setBoolean(
+                        HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname,
+                        true);
+                hiveConf.set(
+                        HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+                        kerberosPrincipal);
+            } else {
+
+                kerberosPrincipal = conf
+                        .get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
+
+                if (kerberosPrincipal == null) {
+                    kerberosPrincipal = conf
+                            .get(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
+                }
+                if (kerberosPrincipal != null) {
+                    hiveConf.setBoolean(
+                            ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
+                    hiveConf.set(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+                            kerberosPrincipal);
+                }
+
+                if (conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+                    hiveConf.set("hive.metastore.token.signature",
+                            conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
+                }
+            }
+
+        } else {
+            // Thrift url is null, copy the hive conf into the job conf and
+            // restore it
+            // in the backend context
+
+            if (conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null) {
+                conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                        HCatUtil.serialize(hiveConf.getAllProperties()));
+            } else {
+                // Copy configuration properties into the hive conf
+                Properties properties = (Properties) HCatUtil.deserialize(conf
+                        .get(HCatConstants.HCAT_KEY_HIVE_CONF));
+
+                for (Map.Entry<Object, Object> prop : properties.entrySet()) {
+                    if (prop.getValue() instanceof String) {
+                        hiveConf.set((String) prop.getKey(),
+                                (String) prop.getValue());
+                    } else if (prop.getValue() instanceof Integer) {
+                        hiveConf.setInt((String) prop.getKey(),
+                                (Integer) prop.getValue());
+                    } else if (prop.getValue() instanceof Boolean) {
+                        hiveConf.setBoolean((String) prop.getKey(),
+                                (Boolean) prop.getValue());
+                    } else if (prop.getValue() instanceof Long) {
+                        hiveConf.setLong((String) prop.getKey(),
+                                (Long) prop.getValue());
+                    } else if (prop.getValue() instanceof Float) {
+                        hiveConf.setFloat((String) prop.getKey(),
+                                (Float) prop.getValue());
+                    }
+                }
+            }
+
+        }
+        return hiveConf;
+    }
+
 }
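The InitializeInput change above centers on getHiveConf(): when a metastore URI is given, the job talks to a remote (optionally Kerberized) metastore; when it is not, the local HiveConf is serialized into the job configuration under HCAT_KEY_HIVE_CONF so the backend can rebuild it. Below is a minimal sketch of that hand-off, using only the HCatUtil and HiveConf calls the patch itself relies on; the class and method names are illustrative, not part of the patch.

// Sketch of the HiveConf hand-off implemented by getHiveConf() above.
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;

class ExampleHiveConfHandoff {
    // Front end (submission time): stash the client's HiveConf in the job conf.
    static void stash(HiveConf hiveConf, Configuration jobConf) throws Exception {
        if (jobConf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null) {
            jobConf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
                    HCatUtil.serialize(hiveConf.getAllProperties()));
        }
    }

    // Back end (task side): rebuild a HiveConf from the stashed properties.
    static HiveConf restore(Configuration jobConf) throws Exception {
        HiveConf hiveConf = new HiveConf(ExampleHiveConfHandoff.class);
        Properties properties = (Properties) HCatUtil.deserialize(
                jobConf.get(HCatConstants.HCAT_KEY_HIVE_CONF));
        for (Map.Entry<Object, Object> prop : properties.entrySet()) {
            // the patch also replays Integer/Boolean/Long/Float values;
            // String covers the common case for this sketch
            if (prop.getValue() instanceof String) {
                hiveConf.set((String) prop.getKey(), (String) prop.getValue());
            }
        }
        return hiveConf;
    }
}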