Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java	(revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java	(revision 0)
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+
+/**
+ * The Class HBaseInputStorageDriver enables reading of HBase tables through
+ * HCatalog.
+ */
+public class HBaseInputStorageDriver extends HCatInputStorageDriver {
+    private HCatTableInfo tableInfo;
+    private ResultConverter converter;
+    private HCatSchema outputColSchema;
+    private HCatSchema dataSchema;
+    private Configuration jobConf;
+
+    /*
+     * @param JobContext
+     *
+     * @param hcatProperties
+     *
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+     * #initialize(org.apache.hadoop.mapreduce.JobContext, java.util.Properties)
+     */
+    @Override
+    public void initialize(JobContext context, Properties hcatProperties) {
+        jobConf = context.getConfiguration();
+        try {
+            String jobString = context.getConfiguration().get(
+                    HCatConstants.HCAT_KEY_JOB_INFO);
+            if (jobString == null) {
+                throw new IOException(
+                        "InputJobInfo information not found in JobContext. "
+                                + "HCatInputFormat.setInput() not called?");
+            }
+            InputJobInfo jobInfo = (InputJobInfo) HCatUtil
+                    .deserialize(jobString);
+            tableInfo = jobInfo.getTableInfo();
+            dataSchema = tableInfo.getDataColumns();
+            List<FieldSchema> fields = HCatUtil
+                    .getFieldSchemaList(outputColSchema.getFields());
+            hcatProperties.setProperty(Constants.LIST_COLUMNS,
+                    MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+            hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES,
+                    MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+            converter = new HBaseSerDeResultConverter(dataSchema,
+                    outputColSchema, hcatProperties);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    /*
+     * @param hcatProperties
+     *
+     * @return InputFormat
+     *
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+     * #getInputFormat(java.util.Properties)
+     */
+    @Override
+    public InputFormat<ImmutableBytesWritable, Result> getInputFormat(
+            Properties hcatProperties) {
+        HBaseInputFormat tableInputFormat = new HBaseInputFormat();
+        jobConf.set(TableInputFormat.INPUT_TABLE, tableInfo.getTableName());
+        tableInputFormat.setConf(jobConf);
+        // TODO: Make the caching configurable by the user
+        tableInputFormat.getScan().setCaching(200);
+        tableInputFormat.getScan().setCacheBlocks(false);
+        return tableInputFormat;
+    }
+
+    /*
+     * @param baseKey
+     *
+     * @param baseValue
+     *
+     * @return HCatRecord
+     *
+     * @throws IOException
+     *
+     * @see
+     * org.apache.hcatalog.mapreduce.HCatInputStorageDriver#convertToHCatRecord
+     * (org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
+     */
+    @Override
+    public HCatRecord convertToHCatRecord(WritableComparable baseKey,
+            Writable baseValue) throws IOException {
+        return this.converter.convert((Result) baseValue);
+    }
+
+    /*
+     * @param jobContext
+     *
+     * @param howlSchema
+     *
+     * @throws IOException
+     *
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver#
+     * setOutputSchema(org.apache.hadoop.mapreduce.JobContext,
+     * org.apache.hcatalog.data.schema.HCatSchema)
+     */
+    @Override
+    public void setOutputSchema(JobContext jobContext, HCatSchema howlSchema)
+            throws IOException {
+        outputColSchema = howlSchema;
+    }
+
+    /*
+     * @param jobContext
+     *
+     * @param partitionValues
+     *
+     * @throws IOException
+     *
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+     * #setPartitionValues(org.apache.hadoop.mapreduce.JobContext,
+     * java.util.Map)
+     */
+    @Override
+    public void setPartitionValues(JobContext jobContext,
+            Map<String, String> partitionValues) throws IOException {
+    }
+
+    /*
+     * @param jobContext
+     *
+     * @param hcatSchema
+     *
+     * @throws IOException
+     *
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+     * #setOriginalSchema(org.apache.hadoop.mapreduce.JobContext,
+     * org.apache.hcatalog.data.schema.HCatSchema)
+     */
+    @Override
+    public void setOriginalSchema(JobContext jobContext, HCatSchema hcatSchema)
+            throws IOException {
+        this.dataSchema = hcatSchema;
+    }
+}
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java	(revision 0)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java	(revision 0)
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This class HBaseInputFormat is a wrapper class of TableInputFormat in HBase.
+ */
+class HBaseInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
+
+    private final TableInputFormat inputFormat;
+
+    public HBaseInputFormat() {
+        inputFormat = new TableInputFormat();
+    }
+
+    /*
+     * @param instance of InputSplit
+     *
+     * @param instance of TaskAttemptContext
+     *
+     * @return RecordReader
+     *
+     * @throws IOException
+     *
+     * @throws InterruptedException
+     *
+     * @see
+     * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache
+     * .hadoop.mapreduce.InputSplit,
+     * org.apache.hadoop.mapreduce.TaskAttemptContext)
+     */
+    @Override
+    public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+            InputSplit split, TaskAttemptContext tac) throws IOException,
+            InterruptedException {
+        return inputFormat.createRecordReader(split, tac);
+    }
+
+    /*
+     * @param jobContext
+     *
+     * @return List of InputSplit
+     *
+     * @throws IOException
+     *
+     * @throws InterruptedException
+     *
+     * @see
+     * org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce
+     * .JobContext)
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext jobContext)
+            throws IOException, InterruptedException {
+        return inputFormat.getSplits(jobContext);
+    }
+
+    public void setConf(Configuration conf) {
+        inputFormat.setConf(conf);
+    }
+
+    public Scan getScan() {
+        return inputFormat.getScan();
+    }
+
+    public void setScan(Scan scan) {
+        inputFormat.setScan(scan);
+    }
+
+}
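Review note, not part of the patch: the TODO in HBaseInputStorageDriver.getInputFormat()
("Make the caching configurable by the user") could be addressed by reading the scan
tuning from the storage-driver properties instead of hard-coding it. The sketch below is
only illustrative; the property names hcat.hbase.scan.caching and
hcat.hbase.scan.cacheblocks are hypothetical placeholders, not existing HCatalog keys.

    @Override
    public InputFormat<ImmutableBytesWritable, Result> getInputFormat(
            Properties hcatProperties) {
        HBaseInputFormat tableInputFormat = new HBaseInputFormat();
        jobConf.set(TableInputFormat.INPUT_TABLE, tableInfo.getTableName());
        tableInputFormat.setConf(jobConf);
        // Fall back to the current defaults (caching = 200, block cache off)
        // when the hypothetical properties are not set.
        int caching = Integer.parseInt(
                hcatProperties.getProperty("hcat.hbase.scan.caching", "200"));
        boolean cacheBlocks = Boolean.parseBoolean(
                hcatProperties.getProperty("hcat.hbase.scan.cacheblocks", "false"));
        tableInputFormat.getScan().setCaching(caching);
        tableInputFormat.getScan().setCacheBlocks(cacheBlocks);
        return tableInputFormat;
    }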
Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java
===================================================================
--- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java	(revision 0)
+++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java	(revision 0)
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.junit.Test;
+
+public class TestHBaseInputStorageDriver extends SkeletonHBaseTest {
+
+    private final byte[] FAMILY = Bytes.toBytes("testFamily");
+    private final byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
+    private final byte[] QUALIFIER2 = Bytes.toBytes("testQualifier2");
+    private final String tableName = "mytesttable";
+
+    List<Put> generatePuts(int num) {
+        List<Put> myPuts = new ArrayList<Put>();
+        for (int i = 0; i < num; i++) {
+            Put put = new Put(Bytes.toBytes("testRow" + i));
+            put.add(FAMILY, QUALIFIER1, 0,
+                    Bytes.toBytes("testQualifier1-" + "textValue-" + i));
+            put.add(FAMILY, QUALIFIER2, 0,
+                    Bytes.toBytes("testQualifier2-" + "textValue-" + i));
+            myPuts.add(put);
+        }
+        return myPuts;
+    }
+
+    private void registerHBaseTable(String tableName) throws Exception {
+
+        String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+        HiveMetaStoreClient client = getCluster().getHiveMetaStoreClient();
+        try {
+            client.dropTable(databaseName, tableName);
+        } catch (Exception e) {
+        } // can fail with NoSuchObjectException
+
+        Table tbl = new Table();
+        tbl.setDbName(databaseName);
+        tbl.setTableName(tableName);
+        tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
+        tbl.setPartitionKeys(new ArrayList<FieldSchema>());
+        Map<String, String> tableParams = new HashMap<String, String>();
+        tableParams.put(HCatConstants.HCAT_ISD_CLASS,
+                HBaseInputStorageDriver.class.getName());
+        tableParams.put(HCatConstants.HCAT_OSD_CLASS, "NotRequired");
+        tableParams.put(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY,
+                ":key,testFamily:testQualifier1,testFamily:testQualifier2");
+        tableParams.put(Constants.SERIALIZATION_FORMAT, "9");
+        tableParams.put(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
+        tbl.setParameters(tableParams);
+
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setCols(HCatUtil.getFieldSchemaList(getSchema().getFields()));
+        sd.setBucketCols(new ArrayList<String>(3));
+        sd.setSerdeInfo(new SerDeInfo());
+        sd.getSerdeInfo().setName(tbl.getTableName());
+        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+        sd.getSerdeInfo().getParameters()
+                .put(Constants.SERIALIZATION_FORMAT, "9");
+        sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName());
+        sd.setInputFormat(HBaseInputFormat.class.getName());
+        sd.setOutputFormat("NotRequired");
+
+        tbl.setSd(sd);
+        client.createTable(tbl);
+
+    }
+
+    public void populateTable() throws IOException {
+        List<Put> myPuts = generatePuts(10);
+        HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tableName));
+        table.put(myPuts);
+    }
+
+    @Test
+    public void TestHBaseTableReadMR() throws Exception {
+
+        Configuration conf = new Configuration();
+        // include hbase config in conf file
+        for (Map.Entry<String, String> el : getHbaseConf()) {
+            if (el.getKey().startsWith("hbase.")) {
+                conf.set(el.getKey(), el.getValue());
+            }
+        }
+
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                HCatUtil.serialize(getHiveConf().getAllProperties()));
+
+        // create Hbase table using admin
+        createTable(tableName, new String[] { "testFamily" });
+        registerHBaseTable(tableName);
+        populateTable();
+        // output settings
+        Path outputDir = new Path(getTestDir(), "mapred/testHbaseTableMRRead");
+        FileSystem fs = getFileSystem();
+        if (fs.exists(outputDir)) {
+            fs.delete(outputDir, true);
+        }
+        // create job
+        Job job = new Job(conf, "hbase-mr-read-test");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapReadHTable.class);
+
+        job.getConfiguration().set(TableInputFormat.INPUT_TABLE, tableName);
+
+        job.setInputFormatClass(HCatInputFormat.class);
+        InputJobInfo inputJobInfo = InputJobInfo.create(
+                MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null, null,
+                null);
+        HCatInputFormat.setOutputSchema(job, getSchema());
+        HCatInputFormat.setInput(job, inputJobInfo);
+        job.setOutputFormatClass(TextOutputFormat.class);
+        TextOutputFormat.setOutputPath(job, outputDir);
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(Text.class);
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+        assertTrue(MapReadHTable.error == false);
+    }
+
+    public static class MapReadHTable
+            extends
+            Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable, Text> {
+
+        static boolean error = false;
+
+        @Override
+        public void map(ImmutableBytesWritable key, HCatRecord value,
+                Context context) throws IOException, InterruptedException {
+            boolean correctValues = (value.size() == 3)
+                    && (value.get(0).toString()).startsWith("testRow")
+                    && (value.get(1).toString()).startsWith("testQualifier1")
+                    && (value.get(2).toString()).startsWith("testQualifier2");
+
+            if (correctValues == false) {
+                error = true;
+            }
+        }
+    }
+
+    private HCatSchema getSchema() throws HCatException {
+
+        HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+        schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING,
+                ""));
+        schema.append(new HCatFieldSchema("testqualifier1",
+                HCatFieldSchema.Type.STRING, ""));
+        schema.append(new HCatFieldSchema("testqualifier2",
+                HCatFieldSchema.Type.STRING, ""));
+        return schema;
+    }
+}
Index: src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java	(revision 1178480)
+++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java	(working copy)
@@ -26,6 +26,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -39,172 +40,268 @@
 import org.apache.hcatalog.data.schema.HCatSchema;
 
 /**
- * The Class which handles querying the metadata server using the MetaStoreClient. The list of
- * partitions matching the partition filter is fetched from the server and the information is
- * serialized and written into the JobContext configuration. The inputInfo is also updated with
- * info required in the client process context.
+ * The Class which handles querying the metadata server using the
+ * MetaStoreClient. The list of partitions matching the partition filter is
+ * fetched from the server and the information is serialized and written into
+ * the JobContext configuration. The inputInfo is also updated with info
+ * required in the client process context.
  */
 public class InitializeInput {
-
-  /** The prefix for keys used for storage driver arguments */
-  static final String HCAT_KEY_PREFIX = "hcat.";
-  private static final HiveConf hiveConf = new HiveConf(HCatInputFormat.class);
-
-  private static HiveMetaStoreClient createHiveMetaClient(Configuration conf, InputJobInfo inputJobInfo) throws Exception {
-
-    if (inputJobInfo.getServerUri() != null){
-      hiveConf.set("hive.metastore.local", "false");
-      hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, inputJobInfo.getServerUri());
+
+    /** The prefix for keys used for storage driver arguments */
+    static final String HCAT_KEY_PREFIX = "hcat.";
+
+    private static HiveConf hiveConf;
+
+    private static HiveMetaStoreClient createHiveMetaClient(Configuration conf,
+            InputJobInfo inputJobInfo) throws Exception {
+
+        hiveConf = getHiveConf(inputJobInfo, conf);
+        return new HiveMetaStoreClient(hiveConf, null);
     }
-    String kerberosPrincipal = inputJobInfo.getServerKerberosPrincipal();
-    if(kerberosPrincipal != null){
-      hiveConf.setBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
-      hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, kerberosPrincipal);
+    /**
+     * Set the input to use for the Job. This queries the metadata server with
+     * the specified partition predicates, gets the matching partitions, puts
+     * the information in the configuration object.
+     *
+     * @param job the job object
+     * @param inputJobInfo information on the Input to read
+     * @throws Exception
+     */
+    public static void setInput(Job job, InputJobInfo inputJobInfo)
+            throws Exception {
+
+        // * Create and initialize an InputJobInfo object
+        // * Serialize the InputJobInfo and save in the Job's Configuration
+        // object
+
+        HiveMetaStoreClient client = null;
+
+        try {
+            client = createHiveMetaClient(job.getConfiguration(), inputJobInfo);
+            Table table = client.getTable(inputJobInfo.getDatabaseName(),
+                    inputJobInfo.getTableName());
+
+            List<PartInfo> partInfoList = new ArrayList<PartInfo>();
+
+            if (table.getPartitionKeys().size() != 0) {
+                // Partitioned table
+                List<Partition> parts = client.listPartitionsByFilter(
+                        inputJobInfo.getDatabaseName(),
+                        inputJobInfo.getTableName(), inputJobInfo.getFilter(),
+                        (short) -1);
+
+                // Default to 100,000 partitions if hcat.metastore.maxpartitions
+                // is not defined
+                int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions",
+                        100000);
+                if (parts != null && parts.size() > maxPart) {
+                    throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART,
+                            "total number of partitions is " + parts.size());
+                }
+
+                // populate partition info
+                for (Partition ptn : parts) {
+                    PartInfo partInfo = extractPartInfo(ptn.getSd(),
+                            ptn.getParameters());
+                    partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
+                    partInfoList.add(partInfo);
+                }
+
+            } else {
+                // Non partitioned table
+                PartInfo partInfo = extractPartInfo(table.getSd(),
+                        table.getParameters());
+                partInfo.setPartitionValues(new HashMap<String, String>());
+                partInfoList.add(partInfo);
+            }
+            inputJobInfo.setPartitions(partInfoList);
+            inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
+
+            job.getConfiguration().set(HCatConstants.HCAT_KEY_JOB_INFO,
+                    HCatUtil.serialize(inputJobInfo));
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
-    }
-
-    return new HiveMetaStoreClient(hiveConf,null);
-  }
-
-  /**
-   * Set the input to use for the Job. This queries the metadata server with the specified partition predicates,
-   * gets the matching partitions, puts the information in the configuration object.
-   * @param job the job object
-   * @param inputJobInfo information on the Input to read
-   * @throws Exception
-   */
-  public static void setInput(Job job, InputJobInfo inputJobInfo) throws Exception {
-
-    //* Create and initialize an InputJobInfo object
-    //* Serialize the InputJobInfo and save in the Job's Configuration object
-
-    HiveMetaStoreClient client = null;
-
-    try {
-      client = createHiveMetaClient(job.getConfiguration(),inputJobInfo);
-      Table table = client.getTable(inputJobInfo.getDatabaseName(),
-          inputJobInfo.getTableName());
-
-      List partInfoList = new ArrayList();
-
-      if( table.getPartitionKeys().size() != 0 ) {
-        //Partitioned table
-        List parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(),
-            inputJobInfo.getTableName(),
-            inputJobInfo.getFilter(),
-            (short) -1);
-
-        // Default to 100,000 partitions if hive.metastore.maxpartition is not defined
-        int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
-        if (parts != null && parts.size() > maxPart) {
-          throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART, "total number of partitions is " + parts.size());
-        }
-
-        // populate partition info
-        for (Partition ptn : parts){
-          PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters());
-          partInfo.setPartitionValues(createPtnKeyValueMap(table,ptn));
-          partInfoList.add(partInfo);
-        }
-
-      }else{
-        //Non partitioned table
-        PartInfo partInfo = extractPartInfo(table.getSd(),table.getParameters());
-        partInfo.setPartitionValues(new HashMap());
-        partInfoList.add(partInfo);
-      }
-      inputJobInfo.setPartitions(partInfoList);
-      inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
-
-      job.getConfiguration().set(
-          HCatConstants.HCAT_KEY_JOB_INFO,
-          HCatUtil.serialize(inputJobInfo)
-      );
-    } finally {
-      if (client != null ) {
-        client.close();
-      }
-    }
-  }
-
+
+    private static Map<String, String> createPtnKeyValueMap(Table table,
+            Partition ptn) throws IOException {
+        List<String> values = ptn.getValues();
+        if (values.size() != table.getPartitionKeys().size()) {
+            throw new IOException(
+                    "Partition values in partition inconsistent with table definition, table "
+                            + table.getTableName() + " has "
+                            + table.getPartitionKeys().size()
+                            + " partition keys, partition has " + values.size()
+                            + " partition values");
+        }
+
+        Map<String, String> ptnKeyValues = new HashMap<String, String>();
+
+        int i = 0;
+        for (FieldSchema schema : table.getPartitionKeys()) {
+            // CONCERN : the way this mapping goes, the order *needs* to be
+            // preserved for table.getPartitionKeys() and ptn.getValues()
+            ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
+            i++;
+        }
+
+        return ptnKeyValues;
+    }
-  private static Map createPtnKeyValueMap(Table table, Partition ptn) throws IOException{
-    List values = ptn.getValues();
-    if( values.size() != table.getPartitionKeys().size() ) {
-      throw new IOException("Partition values in partition inconsistent with table definition, table "
-          + table.getTableName() + " has "
-          + table.getPartitionKeys().size()
-          + " partition keys, partition has " + values.size() + "partition values" );
-    }
-
-    Map ptnKeyValues = new HashMap();
-
-    int i = 0;
-    for(FieldSchema schema : table.getPartitionKeys()) {
-      // CONCERN : the way this mapping goes, the order *needs* to be preserved for table.getPartitionKeys() and ptn.getValues()
-      ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
-      i++;
-    }
-
-    return ptnKeyValues;
-  }
-
+
+    static PartInfo extractPartInfo(StorageDescriptor sd,
+            Map<String, String> parameters) throws IOException {
+        HCatSchema schema = HCatUtil.extractSchemaFromStorageDescriptor(sd);
+        String inputStorageDriverClass = null;
+        Properties hcatProperties = new Properties();
+        if (parameters.containsKey(HCatConstants.HCAT_ISD_CLASS)) {
+            inputStorageDriverClass = parameters
+                    .get(HCatConstants.HCAT_ISD_CLASS);
+        } else {
+            // attempt to default to RCFile if the storage descriptor says it's
+            // an RCFile
+            if ((sd.getInputFormat() != null)
+                    && (sd.getInputFormat()
+                            .equals(HCatConstants.HIVE_RCFILE_IF_CLASS))) {
+                inputStorageDriverClass = HCatConstants.HCAT_RCFILE_ISD_CLASS;
+            } else {
+                throw new IOException(
+                        "No input storage driver classname found, cannot read partition");
+            }
+        }
+        for (String key : parameters.keySet()) {
+            if (key.startsWith(HCAT_KEY_PREFIX)) {
+                hcatProperties.put(key, parameters.get(key));
+            }
+        }
+        return new PartInfo(schema, inputStorageDriverClass, sd.getLocation(),
+                hcatProperties);
+    }
-  static PartInfo extractPartInfo(StorageDescriptor sd, Map parameters) throws IOException{
-    HCatSchema schema = HCatUtil.extractSchemaFromStorageDescriptor(sd);
-    String inputStorageDriverClass = null;
-    Properties hcatProperties = new Properties();
-    if (parameters.containsKey(HCatConstants.HCAT_ISD_CLASS)){
-      inputStorageDriverClass = parameters.get(HCatConstants.HCAT_ISD_CLASS);
-    }else{
-      // attempt to default to RCFile if the storage descriptor says it's an RCFile
-      if ((sd.getInputFormat() != null) && (sd.getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS))){
-        inputStorageDriverClass = HCatConstants.HCAT_RCFILE_ISD_CLASS;
-      }else{
-        throw new IOException("No input storage driver classname found, cannot read partition");
-      }
-    }
-    for (String key : parameters.keySet()){
-      if (key.startsWith(HCAT_KEY_PREFIX)){
-        hcatProperties.put(key, parameters.get(key));
-      }
-    }
-    return new PartInfo(schema,inputStorageDriverClass, sd.getLocation(), hcatProperties);
-  }
-
-
-
+
+    static StorerInfo extractStorerInfo(StorageDescriptor sd,
+            Map<String, String> properties) throws IOException {
+        String inputSDClass, outputSDClass;
+
+        if (properties.containsKey(HCatConstants.HCAT_ISD_CLASS)) {
+            inputSDClass = properties.get(HCatConstants.HCAT_ISD_CLASS);
+        } else {
+            // attempt to default to RCFile if the storage descriptor says it's
+            // an RCFile
+            if ((sd.getInputFormat() != null)
+                    && (sd.getInputFormat()
+                            .equals(HCatConstants.HIVE_RCFILE_IF_CLASS))) {
+                inputSDClass = HCatConstants.HCAT_RCFILE_ISD_CLASS;
+            } else {
+                throw new IOException(
+                        "No input storage driver classname found for table, cannot write partition");
+            }
+        }
+
+        if (properties.containsKey(HCatConstants.HCAT_OSD_CLASS)) {
+            outputSDClass = properties.get(HCatConstants.HCAT_OSD_CLASS);
+        } else {
+            // attempt to default to RCFile if the storage descriptor says it's
+            // an RCFile
+            if ((sd.getOutputFormat() != null)
+                    && (sd.getOutputFormat()
+                            .equals(HCatConstants.HIVE_RCFILE_OF_CLASS))) {
+                outputSDClass = HCatConstants.HCAT_RCFILE_OSD_CLASS;
+            } else {
+                throw new IOException(
+                        "No output storage driver classname found for table, cannot write partition");
+            }
+        }
+
+        Properties hcatProperties = new Properties();
+        for (String key : properties.keySet()) {
+            if (key.startsWith(HCAT_KEY_PREFIX)) {
+                hcatProperties.put(key, properties.get(key));
+            }
+        }
+
+        return new StorerInfo(inputSDClass, outputSDClass, hcatProperties);
+    }
-  static StorerInfo extractStorerInfo(StorageDescriptor sd, Map properties) throws IOException {
-    String inputSDClass, outputSDClass;
-
-    if (properties.containsKey(HCatConstants.HCAT_ISD_CLASS)){
-      inputSDClass = properties.get(HCatConstants.HCAT_ISD_CLASS);
-    }else{
-      // attempt to default to RCFile if the storage descriptor says it's an RCFile
-      if ((sd.getInputFormat() != null) && (sd.getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS))){
-        inputSDClass = HCatConstants.HCAT_RCFILE_ISD_CLASS;
-      }else{
-        throw new IOException("No input storage driver classname found for table, cannot write partition");
-      }
-    }
-
-    if (properties.containsKey(HCatConstants.HCAT_OSD_CLASS)){
-      outputSDClass = properties.get(HCatConstants.HCAT_OSD_CLASS);
-    }else{
-      // attempt to default to RCFile if the storage descriptor says it's an RCFile
-      if ((sd.getOutputFormat() != null) && (sd.getOutputFormat().equals(HCatConstants.HIVE_RCFILE_OF_CLASS))){
-        outputSDClass = HCatConstants.HCAT_RCFILE_OSD_CLASS;
-      }else{
-        throw new IOException("No output storage driver classname found for table, cannot write partition");
-      }
-    }
-
-    Properties hcatProperties = new Properties();
-    for (String key : properties.keySet()){
-      if (key.startsWith(HCAT_KEY_PREFIX)){
-        hcatProperties.put(key, properties.get(key));
-      }
-    }
-
-    return new StorerInfo(inputSDClass, outputSDClass, hcatProperties);
-  }
-
+
+    static HiveConf getHiveConf(InputJobInfo iInfo, Configuration conf)
+            throws IOException {
+
+        HiveConf hiveConf = new HiveConf(HCatInputFormat.class);
+
+        if (iInfo.getServerUri() != null) {
+            // User specified a thrift url
+
+            hiveConf.set("hive.metastore.local", "false");
+            hiveConf.set(ConfVars.METASTOREURIS.varname, iInfo.getServerUri());
+
+            String kerberosPrincipal = iInfo.getServerKerberosPrincipal();
+            if (kerberosPrincipal != null) {
+                hiveConf.setBoolean(
+                        HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname,
+                        true);
+                hiveConf.set(
+                        HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+                        kerberosPrincipal);
+            } else {
+
+                kerberosPrincipal = conf
+                        .get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
+
+                if (kerberosPrincipal == null) {
+                    kerberosPrincipal = conf
+                            .get(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
+                }
+                if (kerberosPrincipal != null) {
+                    hiveConf.setBoolean(
+                            ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
+                    hiveConf.set(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+                            kerberosPrincipal);
+                }
+
+                if (conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+                    hiveConf.set("hive.metastore.token.signature",
+                            conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
+                }
+            }
+
+        } else {
+            // Thrift url is null, copy the hive conf into the job conf and
+            // restore it
+            // in the backend context
+
+            if (conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null) {
+                conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                        HCatUtil.serialize(hiveConf.getAllProperties()));
+            } else {
+                // Copy configuration properties into the hive conf
+                Properties properties = (Properties) HCatUtil.deserialize(conf
+                        .get(HCatConstants.HCAT_KEY_HIVE_CONF));
+
+                for (Map.Entry<Object, Object> prop : properties.entrySet()) {
+                    if (prop.getValue() instanceof String) {
+                        hiveConf.set((String) prop.getKey(),
+                                (String) prop.getValue());
+                    } else if (prop.getValue() instanceof Integer) {
+                        hiveConf.setInt((String) prop.getKey(),
+                                (Integer) prop.getValue());
+                    } else if (prop.getValue() instanceof Boolean) {
+                        hiveConf.setBoolean((String) prop.getKey(),
+                                (Boolean) prop.getValue());
+                    } else if (prop.getValue() instanceof Long) {
+                        hiveConf.setLong((String) prop.getKey(),
+                                (Long) prop.getValue());
+                    } else if (prop.getValue() instanceof Float) {
+                        hiveConf.setFloat((String) prop.getKey(),
+                                (Float) prop.getValue());
+                    }
+                }
+            }
+
+        }
+        return hiveConf;
+    }
 }
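
Review note, not part of the patch: the snippet below is a condensed, illustrative sketch of a
standalone client job that reads the HBase-backed table through HCatalog, mirroring
TestHBaseTableReadMR above. The database ("default"), table ("mytesttable"), projection schema,
and the reuse of the test's mapper are assumptions carried over from the test, not anything the
patch itself prescribes; metastore connection settings (for example a serialized HiveConf under
HCatConstants.HCAT_KEY_HIVE_CONF) must be available in the Configuration, as the test arranges.

    package org.apache.hcatalog.hbase;

    import java.util.ArrayList;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hcatalog.data.schema.HCatFieldSchema;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    public class ReadHBaseTableThroughHCat {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "hbase-hcat-read-sketch");
            job.setJarByClass(ReadHBaseTableThroughHCat.class);

            // The table must already be registered in the metastore with
            // HCatConstants.HCAT_ISD_CLASS pointing at HBaseInputStorageDriver,
            // as done in registerHBaseTable() above.
            InputJobInfo inputJobInfo = InputJobInfo.create("default",
                    "mytesttable", null, null, null);
            job.setInputFormatClass(HCatInputFormat.class);

            // Same projection the test uses: row key plus the two mapped qualifiers.
            HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
            schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING, ""));
            schema.append(new HCatFieldSchema("testqualifier1", HCatFieldSchema.Type.STRING, ""));
            schema.append(new HCatFieldSchema("testqualifier2", HCatFieldSchema.Type.STRING, ""));
            HCatInputFormat.setOutputSchema(job, schema);
            HCatInputFormat.setInput(job, inputJobInfo);

            // Reuse the test's mapper purely for illustration; any mapper that
            // accepts (ImmutableBytesWritable, HCatRecord) pairs would do.
            job.setMapperClass(TestHBaseInputStorageDriver.MapReadHTable.class);
            job.setMapOutputKeyClass(BytesWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setNumReduceTasks(0);

            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path(args[0]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }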