Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java (revision 1182480)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java (working copy)
@@ -63,6 +63,7 @@
     private HCatSchema outputSchema;
     private StructObjectInspector hCatRecordOI;
     private StructObjectInspector lazyHBaseRowOI;
+    private String hbaseColumnMapping;
    private final Long outputVersion;

     /**
@@ -75,8 +76,8 @@
                                      HCatSchema outputSchema,
                                      Properties hcatProperties) throws IOException {
-        hcatProperties.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
-                hcatProperties.getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY));
+        hbaseColumnMapping = hcatProperties.getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
+        hcatProperties.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,hbaseColumnMapping);
         if(hcatProperties.containsKey(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY))
             outputVersion = Long.parseLong(hcatProperties.getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY));
@@ -264,4 +265,46 @@
             throw new IOException("Unknown field schema type");
         }
     }
+
+    @SuppressWarnings("static-access")
+    public String getHBaseScanColumnsInfo() throws IOException {
+        StringBuilder sb = new StringBuilder();
+        if(hbaseColumnMapping == null){
+            throw new IOException("HBase column mapping found to be null.");
+        }
+
+        List<String> outputFieldNames = this.outputSchema.getFieldNames();
+        List<Integer> outputColumnMapping = new ArrayList<Integer>();
+        for(String fieldName: outputFieldNames){
+            int position = schema.getPosition(fieldName);
+            outputColumnMapping.add(position);
+        }
+
+        try {
+            List<String> columnFamilies = new ArrayList<String>();
+            List<String> columnQualifiers = new ArrayList<String>();
+            serDe.parseColumnMapping(hbaseColumnMapping, columnFamilies, null, columnQualifiers, null);
+            for(int i = 0; i < outputColumnMapping.size(); i++){
+                int cfIndex = outputColumnMapping.get(i);
+                // We skip the index 0, which is the key column. We get that anyway
+                // using the scan.
+                if (cfIndex != 0) {
+                    String cf = columnFamilies.get(cfIndex);
+                    String qualifier = columnQualifiers.get(i);
+                    sb.append(cf);
+                    sb.append(":");
+                    if (qualifier != null) {
+                        sb.append(qualifier);
+                    }
+                    sb.append(" ");
+                }
+            }
+
+        } catch (SerDeException e) {
+
+            throw new IOException(e.getMessage());
+        }
+
+        return sb.toString();
+    }
 }
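Note on the format produced by getHBaseScanColumnsInfo(): for each projected non-key field it appends "columnFamily:qualifier" followed by a space, and HBaseInputStorageDriver later hands that string to TableInputFormat.SCAN_COLUMNS. A minimal sketch of how such a string could be applied to a plain HBase Scan; the variable scanColumns and the sample value "testFamily:testQualifier1 " are assumptions for illustration, not part of the patch (Scan is org.apache.hadoop.hbase.client.Scan, Bytes is org.apache.hadoop.hbase.util.Bytes):

    // sketch only: split the space-separated "family:qualifier" tokens onto a Scan
    String scanColumns = "testFamily:testQualifier1 ";   // assumed sample value
    Scan scan = new Scan();
    for (String column : scanColumns.trim().split(" ")) {
        String[] parts = column.split(":", 2);
        if (parts.length == 2 && !parts[1].isEmpty()) {
            scan.addColumn(Bytes.toBytes(parts[0]), Bytes.toBytes(parts[1]));
        } else {
            scan.addFamily(Bytes.toBytes(parts[0]));     // bare family, no qualifier
        }
    }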
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java (revision 1182480)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java (working copy)
@@ -37,6 +37,7 @@
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
 import org.apache.hcatalog.mapreduce.HCatTableInfo;
@@ -52,12 +53,14 @@
     private HCatSchema outputColSchema;
     private HCatSchema dataSchema;
     private Configuration jobConf;
-
+    private String scanColumns;
+    private int scanRowCaching;
+
     /*
      * @param JobContext
-     *
+     *
      * @param hcatProperties
-     *
+     *
      * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
      * #initialize(org.apache.hadoop.mapreduce.JobContext, java.util.Properties)
      */
@@ -65,8 +68,7 @@
     public void initialize(JobContext context, Properties hcatProperties) {
         jobConf = context.getConfiguration();
         try {
-            String jobString = context.getConfiguration().get(
-                    HCatConstants.HCAT_KEY_JOB_INFO);
+            String jobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
             if (jobString == null) {
                 throw new IOException(
                         "InputJobInfo information not found in JobContext. "
" @@ -77,24 +79,27 @@ tableInfo = jobInfo.getTableInfo(); dataSchema = tableInfo.getDataColumns(); List fields = HCatUtil - .getFieldSchemaList(outputColSchema.getFields()); + .getFieldSchemaList(dataSchema.getFields()); hcatProperties.setProperty(Constants.LIST_COLUMNS, MetaStoreUtils.getColumnNamesFromFieldSchema(fields)); hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES, MetaStoreUtils.getColumnTypesFromFieldSchema(fields)); converter = new HBaseSerDeResultConverter(dataSchema, outputColSchema, hcatProperties); + scanColumns = ((HBaseSerDeResultConverter)converter).getHBaseScanColumnsInfo(); + scanRowCaching = jobConf.getInt(HBaseConstants.PROPERTY_SCAN_ROW_CACHING, HBaseConstants.DEFAULT_SCAN_ROW_CACHING); + } catch (Exception e) { e.printStackTrace(); } - + } - + /* * @param hcatProperties - * + * * @return InputFormat - * + * * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver * #getInputFormat(java.util.Properties) */ @@ -103,22 +108,22 @@ Properties hcatProperties) { HBaseInputFormat tableInputFormat = new HBaseInputFormat(); jobConf.set(TableInputFormat.INPUT_TABLE, tableInfo.getTableName()); + jobConf.set(TableInputFormat.SCAN_COLUMNS, scanColumns); tableInputFormat.setConf(jobConf); - // TODO: Make the caching configurable by the user - tableInputFormat.getScan().setCaching(200); + tableInputFormat.getScan().setCaching(scanRowCaching); tableInputFormat.getScan().setCacheBlocks(false); return tableInputFormat; } - + /* * @param baseKey - * + * * @param baseValue - * + * * @return HCatRecord - * + * * @throws IOException - * + * * @see * org.apache.hcatalog.mapreduce.HCatInputStorageDriver#convertToHCatRecord * (org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable) @@ -128,14 +133,14 @@ Writable baseValue) throws IOException { return this.converter.convert((Result) baseValue); } - + /* * @param jobContext - * + * * @param howlSchema - * + * * @throws IOException - * + * * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver# * setOutputSchema(org.apache.hadoop.mapreduce.JobContext, * org.apache.hcatalog.data.schema.HCatSchema) @@ -143,16 +148,16 @@ @Override public void setOutputSchema(JobContext jobContext, HCatSchema howlSchema) throws IOException { - outputColSchema = howlSchema; + this.outputColSchema = howlSchema; } - + /* * @param jobContext - * + * * @param partitionValues - * + * * @throws IOException - * + * * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver * #setPartitionValues(org.apache.hadoop.mapreduce.JobContext, * java.util.Map) @@ -161,14 +166,14 @@ public void setPartitionValues(JobContext jobContext, Map partitionValues) throws IOException { } - + /* * @param jobContext - * + * * @param hcatSchema - * + * * @throws IOException - * + * * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver * #setOriginalSchema(org.apache.hadoop.mapreduce.JobContext, * org.apache.hcatalog.data.schema.HCatSchema) Index: storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java =================================================================== --- storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java (revision 1182480) +++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java (working copy) @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.hbase.HBaseSerDe;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -59,12 +58,11 @@
 import org.junit.Test;

 public class TestHBaseInputStorageDriver extends SkeletonHBaseTest {
-
+
     private final byte[] FAMILY = Bytes.toBytes("testFamily");
     private final byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
     private final byte[] QUALIFIER2 = Bytes.toBytes("testQualifier2");
-    private final String tableName = "mytesttable";
-
+
     List<Put> generatePuts(int num) {
         List<Put> myPuts = new ArrayList<Put>();
         for (int i = 0; i < num; i++) {
@@ -77,16 +75,16 @@
         }
         return myPuts;
     }
-
+
     private void registerHBaseTable(String tableName) throws Exception {
-
+
         String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
         HiveMetaStoreClient client = getCluster().getHiveMetaStoreClient();
         try {
             client.dropTable(databaseName, tableName);
         } catch (Exception e) {
         } // can fail with NoSuchObjectException
-
+
         Table tbl = new Table();
         tbl.setDbName(databaseName);
         tbl.setTableName(tableName);
@@ -101,7 +99,7 @@
         tableParams.put(Constants.SERIALIZATION_FORMAT, "9");
         tableParams.put(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
         tbl.setParameters(tableParams);
-
+
         StorageDescriptor sd = new StorageDescriptor();
         sd.setCols(HCatUtil.getFieldSchemaList(getSchema().getFields()));
         sd.setBucketCols(new ArrayList<String>(3));
@@ -113,21 +111,21 @@
         sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName());
         sd.setInputFormat(HBaseInputFormat.class.getName());
         sd.setOutputFormat("NotRequired");
-
+
         tbl.setSd(sd);
         client.createTable(tbl);
-
+
     }
-
-    public void populateTable() throws IOException {
+
+    public void populateTable(String tableName) throws IOException {
         List<Put> myPuts = generatePuts(10);
         HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tableName));
         table.put(myPuts);
     }
-
+
     @Test
     public void TestHBaseTableReadMR() throws Exception {
-
+        String tableName = "testtableone";
         Configuration conf = new Configuration();
         // include hbase config in conf file
         for (Map.Entry<String, String> el : getHbaseConf()) {
@@ -135,14 +133,14 @@
                 conf.set(el.getKey(), el.getValue());
             }
         }
-
+
         conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
                 HCatUtil.serialize(getHiveConf().getAllProperties()));
-
+
         // create Hbase table using admin
         createTable(tableName, new String[] { "testFamily" });
         registerHBaseTable(tableName);
-        populateTable();
+        populateTable(tableName);
         // output settings
         Path outputDir = new Path(getTestDir(), "mapred/testHbaseTableMRRead");
         FileSystem fs = getFileSystem();
@@ -153,14 +151,11 @@
         Job job = new Job(conf, "hbase-mr-read-test");
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapReadHTable.class);
-
-        job.getConfiguration().set(TableInputFormat.INPUT_TABLE, tableName);
-
         job.setInputFormatClass(HCatInputFormat.class);
         InputJobInfo inputJobInfo = InputJobInfo.create(
                 MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null, null,
                 null);
-        HCatInputFormat.setOutputSchema(job, getSchema());
+        //HCatInputFormat.setOutputSchema(job, getSchema());
         HCatInputFormat.setInput(job, inputJobInfo);
         job.setOutputFormatClass(TextOutputFormat.class);
         TextOutputFormat.setOutputPath(job, outputDir);
@@ -172,13 +167,60 @@
         assertTrue(job.waitForCompletion(true));
         assertTrue(MapReadHTable.error == false);
     }
-
-    public static class MapReadHTable
+
+    @Test
+    public void TestHBaseTableProjectionReadMR() throws Exception {
+
+        String tableName = "testtabletwo";
+        Configuration conf = new Configuration();
+        // include hbase config in conf file
+        for (Map.Entry<String, String> el : getHbaseConf()) {
+            if (el.getKey().startsWith("hbase.")) {
+                conf.set(el.getKey(), el.getValue());
+            }
+        }
+
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                HCatUtil.serialize(getHiveConf().getAllProperties()));
+
+        // create Hbase table using admin
+        createTable(tableName, new String[] { "testFamily" });
+        registerHBaseTable(tableName);
+        populateTable(tableName);
+        // output settings
+        Path outputDir = new Path(getTestDir(), "mapred/testHBaseTableProjectionReadMR");
+        FileSystem fs = getFileSystem();
+        if (fs.exists(outputDir)) {
+            fs.delete(outputDir, true);
+        }
+        // create job
+        Job job = new Job(conf, "hbase-column-projection");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapReadProjHTable.class);
+        job.setInputFormatClass(HCatInputFormat.class);
+        InputJobInfo inputJobInfo = InputJobInfo.create(
+                MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null, null,
+                null);
+        HCatInputFormat.setOutputSchema(job, getProjectionSchema());
+        HCatInputFormat.setInput(job, inputJobInfo);
+        job.setOutputFormatClass(TextOutputFormat.class);
+        TextOutputFormat.setOutputPath(job, outputDir);
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(Text.class);
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+        assertTrue(MapReadHTable.error == false);
+    }
+
+
+    static class MapReadHTable
             extends
             Mapper {
-
+
         static boolean error = false;
-
+
         @Override
         public void map(ImmutableBytesWritable key, HCatRecord value,
                 Context context) throws IOException, InterruptedException {
@@ -186,15 +228,34 @@
                     && (value.get(0).toString()).startsWith("testRow")
                     && (value.get(1).toString()).startsWith("testQualifier1")
                     && (value.get(2).toString()).startsWith("testQualifier2");
-
+
             if (correctValues == false) {
                 error = true;
             }
         }
     }
-
+
+    static class MapReadProjHTable
+            extends
+            Mapper {
+
+        static boolean error = false;
+
+        @Override
+        public void map(ImmutableBytesWritable key, HCatRecord value,
+                Context context) throws IOException, InterruptedException {
+            boolean correctValues = (value.size() == 2)
+                    && (value.get(0).toString()).startsWith("testRow")
+                    && (value.get(1).toString()).startsWith("testQualifier1");
+
+            if (correctValues == false) {
+                error = true;
+            }
+        }
+    }
+
     private HCatSchema getSchema() throws HCatException {
-
+
         HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
         schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING,
                 ""));
@@ -204,4 +265,14 @@
                 HCatFieldSchema.Type.STRING, ""));
         return schema;
     }
+
+    private HCatSchema getProjectionSchema() throws HCatException {
+
+        HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+        schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING,
+                ""));
+        schema.append(new HCatFieldSchema("testqualifier1",
+                HCatFieldSchema.Type.STRING, ""));
+        return schema;
+    }
 }
Index: storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
===================================================================
--- storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java (revision 1182480)
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java (working copy)
@@ -26,7 +26,7 @@
  */
 class HBaseConstants {

-    /** key used to define th version number HBaseOutputStorage driver to use when writing out data for a job */
+    /** key used to define the version number HBaseOutputStorage driver to use when writing out data for a job */
     public static final String PROPERTY_OUTPUT_VERSION_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputVersion";

     /** key used to define the name of the table to write to */
@@ -35,7 +35,12 @@
     /** key used to define the column mapping of hbase to hcatalog schema */
     public static final String PROPERTY_COLUMN_MAPPING_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+"."+ HBaseSerDe.HBASE_COLUMNS_MAPPING;

-    /** key used to define wether bulk storage driver will be used or not */
+    /** key used to define whether bulk storage driver will be used or not */
     public static final String PROPERTY_OSD_BULK_MODE_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.output.bulkMode";
+
+    /** key used to set the number of rows for caching that will be passed to hbase scanners */
+    public static final String PROPERTY_SCAN_ROW_CACHING = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.scan.row.caching";
+
+    /** Default number of rows for caching that will be passed to hbase scanners */
+    static final int DEFAULT_SCAN_ROW_CACHING = 200;
 }
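The two constants added above replace the hard-coded setCaching(200) in HBaseInputStorageDriver.getInputFormat(). A minimal sketch of hypothetical client code overriding the default; the literal key assumes HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX resolves to "hcat", and the job name and the value 1000 are illustrative only:

    // sketch only: raise scan row caching for a read-heavy HCatInputFormat job
    Configuration conf = new Configuration();
    Job job = new Job(conf, "hbase-read-test");
    job.getConfiguration().setInt("hcat.hbase.scan.row.caching", 1000);
    // HBaseInputStorageDriver.initialize() reads this via
    // jobConf.getInt(PROPERTY_SCAN_ROW_CACHING, DEFAULT_SCAN_ROW_CACHING)
    // and applies it with tableInputFormat.getScan().setCaching(scanRowCaching).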