diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
index 65c81bf..6832543 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
@@ -33,7 +33,6 @@
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -69,11 +68,19 @@
   public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
   public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
   public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";
+
+  /**
+   * Determines whether regex matching should be done on the column qualifiers. Defaults to true.
+   * Note that full regex matching is not supported yet: only a qualifier of the form
+   * {@code prefix.*} is recognized, and it selects every qualifier starting with {@code prefix}.
+   */
+  public static final String HBASE_COLUMNS_REGEX_MATCHING = "hbase.columns.mapping.regex.matching";
 
   public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
 
   private ObjectInspector cachedObjectInspector;
   private String hbaseColumnsMapping;
+  private boolean doColumnRegexMatching;
   private List<ColumnMapping> columnsMapping;
   private SerDeParameters serdeParams;
   private boolean useJSONSerialize;
@@ -148,6 +155,21 @@ public void initialize(Configuration conf, Properties tbl)
    */
   public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec)
       throws SerDeException {
+    return parseColumnsMapping(columnsMappingSpec, true);
+  }
+
+  /**
+   * Parses the HBase columns mapping specifier to identify the column families, qualifiers
+   * and also caches the byte arrays corresponding to them. One of the Hive table
+   * columns maps to the HBase row key, by default the first column.
+   *
+   * @param columnsMappingSpec string hbase.columns.mapping specified when creating table
+   * @param doColumnRegexMatching whether a qualifier ending in ".*" is treated as a prefix match
+   * @return List which contains the column mapping information by position
+   * @throws SerDeException if columnsMappingSpec is null or otherwise invalid
+   */
+  public static List<ColumnMapping> parseColumnsMapping(
+      String columnsMappingSpec, boolean doColumnRegexMatching) throws SerDeException {
 
     if (columnsMappingSpec == null) {
       throw new SerDeException("Error: hbase.columns.mapping missing for this HBase table.");
@@ -193,8 +215,18 @@ public void initialize(Configuration conf, Properties tbl)
       columnMapping.hbaseRowKey = false;
 
       if (parts.length == 2) {
-        columnMapping.qualifierName = parts[1];
-        columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]);
+        if (doColumnRegexMatching && parts[1].endsWith(".*")) {
+          // we have a prefix with a wildcard
+          columnMapping.qualifierPrefix = parts[1].substring(0, parts[1].length() - 2);
+          columnMapping.qualifierPrefixBytes = Bytes.toBytes(columnMapping.qualifierPrefix);
+          // no concrete qualifier; the column is read as a map of all matching qualifiers
+          columnMapping.qualifierName = null;
+          columnMapping.qualifierNameBytes = null;
+        } else {
+          // set the regular provided qualifier names
+          columnMapping.qualifierName = parts[1];
+          columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]);
+        }
       } else {
         columnMapping.qualifierName = null;
         columnMapping.qualifierNameBytes = null;
@@ -413,6 +445,8 @@ public static boolean isRowKeyColumn(String hbaseColumnName) {
     List<Boolean> binaryStorage;
     boolean hbaseRowKey;
     String mappingSpec;
+    String qualifierPrefix;
+    byte[] qualifierPrefixBytes;
   }
 
   private void initHBaseSerDeParameters(
@@ -424,8 +458,10 @@ private void initHBaseSerDeParameters(
     String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
     putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,"-1"));
 
+    doColumnRegexMatching = Boolean.parseBoolean(tbl.getProperty(HBASE_COLUMNS_REGEX_MATCHING, "true"));
+
     // Parse and initialize the HBase columns mapping
-    columnsMapping = parseColumnsMapping(hbaseColumnsMapping);
+    columnsMapping = parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
 
     // Build the type property string if not supplied
     if (columnTypeProperty == null) {
@@ -798,6 +834,7 @@ int getKeyColumnOffset() {
     return columnsMapping.get(colPos).binaryStorage;
   }
 
+  @Override
   public SerDeStats getSerDeStats() {
     // no support for statistics
     return null;
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
index b550f45..c6e3356 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
@@ -39,9 +39,9 @@
 import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
 import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
 import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
@@ -279,6 +279,8 @@ public void configureTableJobProperties(
     jobProperties.put(
       HBaseSerDe.HBASE_COLUMNS_MAPPING,
       tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING));
+    jobProperties.put(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING,
+        tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, "true"));
     jobProperties.put(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE,
       tableProperties.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE,"string"));
     String scanCache = tableProperties.getProperty(HBaseSerDe.HBASE_SCAN_CACHE);
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 01938a7..e3a8133 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -91,11 +91,12 @@
     String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
     setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
     String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+    boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
     List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
 
     List<ColumnMapping> columnsMapping = null;
     try {
-      columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+      columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
     } catch (SerDeException e) {
       throw new IOException(e);
     }
@@ -434,6 +435,7 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
     String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
     setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
     String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+    boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
 
     if (hbaseColumnsMapping == null) {
       throw new IOException("hbase.columns.mapping required for HBase Table.");
@@ -441,7 +443,7 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
 
     List<ColumnMapping> columnsMapping = null;
     try {
-      columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+      columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
     } catch (SerDeException e) {
       throw new IOException(e);
     }
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
index a8ba9d9..cedef10 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
@@ -21,10 +21,11 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Map.Entry;
+import java.util.NavigableMap;
 
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
 import org.apache.hadoop.hive.serde2.lazy.LazyMap;
@@ -42,6 +43,7 @@
 
   private Result result;
   private byte [] columnFamilyBytes;
+  private byte[] qualPrefix;
   private List<Boolean> binaryStorage;
 
   /**
@@ -54,12 +56,30 @@ public LazyHBaseCellMap(LazyMapObjectInspector oi) {
 
   public void init(
       Result r,
-      byte [] columnFamilyBytes,
+      byte[] columnFamilyBytes,
       List<Boolean> binaryStorage) {
 
+    init(r, columnFamilyBytes, binaryStorage, null);
+  }
+
+  /**
+   * Initializes this map over the cells of one column family of the given row.
+   *
+   * @param r the HBase row result to read cells from
+   * @param columnFamilyBytes the column family whose cells are exposed as map entries
+   * @param binaryStorage the column mapping's binary-storage flags
+   * @param qualPrefix if non-null, only qualifiers starting with these bytes are exposed;
+   *        if null, no prefix filtering is applied
+   */
+  public void init(
+      Result r,
+      byte[] columnFamilyBytes,
+      List<Boolean> binaryStorage, byte[] qualPrefix) {
+
     result = r;
     this.columnFamilyBytes = columnFamilyBytes;
     this.binaryStorage = binaryStorage;
+    this.qualPrefix = qualPrefix;
     setParsed(false);
   }
 
@@ -80,6 +100,12 @@ private void parse() {
         continue;
       }
 
+      if (qualPrefix != null && !Bytes.startsWith(e.getKey(), qualPrefix)) {
+        // since we were provided a qualifier prefix, only accept qualifiers that start with this
+        // prefix
+        continue;
+      }
+
       LazyMapObjectInspector lazyMoi = getInspector();
 
       // Keys are always primitive
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
index 10a9207..b254b0d 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
@@ -142,9 +142,11 @@ private Object uncheckedGetField(int fieldID) {
       } else {
         if (colMap.qualifierName == null) {
           // it is a column family
-          // primitive type for Map<Key, Value> can be stored in binary format
+          // primitive type for Map<Key, Value> can be stored in binary format. The qualifier
+          // prefix (null when none was mapped) restricts the map to qualifiers starting with it
+          // instead of exposing every qualifier of the family
           ((LazyHBaseCellMap) fields[fieldID]).init(
-              result, colMap.familyNameBytes, colMap.binaryStorage);
+              result, colMap.familyNameBytes, colMap.binaryStorage, colMap.qualifierPrefixBytes);
         } else {
           // it is a column i.e. a column-family with column-qualifier
           byte [] res = result.getValue(colMap.familyNameBytes, colMap.qualifierNameBytes);
diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
index e821282..d25c731 100644
--- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
+++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.hbase;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -32,6 +33,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -688,4 +690,132 @@ private void deserializeAndSerializeHiveMapHBaseColumnFamilyII(
     Put serializedPut = (Put) hbaseSerDe.serialize(row, soi);
     assertEquals("Serialized data: ", p.toString(), serializedPut.toString());
   }
+
+  public void testHBaseSerDeWithColumnPrefixes()
+      throws Exception {
+    byte[] cfa = Bytes.toBytes("cola");
+
+    byte[] qualA = Bytes.toBytes("prefixA_col1");
+    byte[] qualB = Bytes.toBytes("prefixB_col2");
+    byte[] qualC = Bytes.toBytes("prefixB_col3");
+    byte[] qualD = Bytes.toBytes("unwanted_col");
+
+    List<Object> expectedQualifiers = new ArrayList<Object>();
+    expectedQualifiers.add(new Text("prefixA_col1"));
+    expectedQualifiers.add(new Text("prefixB_col2"));
+    expectedQualifiers.add(new Text("prefixB_col3"));
+
+    byte[] rowKey = Bytes.toBytes("test-row1");
+
+    // Data: three qualifiers match a mapped prefix, one does not
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+    byte[] dataA = Bytes.toBytes("This is first test data");
+    byte[] dataB = Bytes.toBytes("This is second test data");
+    byte[] dataC = Bytes.toBytes("This is third test data");
+    byte[] dataD = Bytes.toBytes("Unwanted data");
+
+    kvs.add(new KeyValue(rowKey, cfa, qualA, dataA));
+    kvs.add(new KeyValue(rowKey, cfa, qualB, dataB));
+    kvs.add(new KeyValue(rowKey, cfa, qualC, dataC));
+    kvs.add(new KeyValue(rowKey, cfa, qualD, dataD));
+
+    Result r = new Result(kvs);
+
+    // Only the qualifiers that match a mapped prefix are expected to be written back
+    Put p = new Put(rowKey);
+
+    p.add(new KeyValue(rowKey, cfa, qualA, dataA));
+    p.add(new KeyValue(rowKey, cfa, qualB, dataB));
+    p.add(new KeyValue(rowKey, cfa, qualC, dataC));
+
+    Object[] expectedFieldsData = {
+        new Text("test-row1"),
+        "This is first test data",
+        "This is second test data",
+        "This is third test data"};
+
+    int[] expectedMapSize = new int[] {1, 2};
+
+    // Create, initialize, and test the SerDe
+    HBaseSerDe serDe = new HBaseSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl = createPropertiesForColumnPrefixes();
+    serDe.initialize(conf, tbl);
+
+    Object notPresentKey = new Text("unwanted_col");
+
+    deserializeAndSerializeHivePrefixColumnFamily(serDe, r, p, expectedFieldsData, expectedMapSize,
+        expectedQualifiers, notPresentKey);
+  }
+
+  public void testParseColumnsMappingWithPrefix() throws SerDeException {
+    List<HBaseSerDe.ColumnMapping> mappings =
+        HBaseSerDe.parseColumnsMapping(":key,cola:prefixA_.*", true);
+    HBaseSerDe.ColumnMapping prefixed = mappings.get(1);
+    assertEquals("prefixA_", prefixed.qualifierPrefix);
+    assertNull(prefixed.qualifierName);
+
+    // With regex matching disabled the qualifier is taken literally
+    mappings = HBaseSerDe.parseColumnsMapping(":key,cola:prefixA_.*", false);
+    prefixed = mappings.get(1);
+    assertNull(prefixed.qualifierPrefix);
+    assertEquals("prefixA_.*", prefixed.qualifierName);
+  }
+
+  private Properties createPropertiesForColumnPrefixes() {
+    Properties tbl = new Properties();
+    tbl.setProperty(serdeConstants.LIST_COLUMNS,
+        "key,astring,along");
+    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES,
+        "string:map<string,string>:map<string,string>");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+        ":key,cola:prefixA_.*,cola:prefixB_.*");
+
+    return tbl;
+  }
+
+  private void deserializeAndSerializeHivePrefixColumnFamily(HBaseSerDe serDe, Result r, Put p,
+      Object[] expectedFieldsData, int[] expectedMapSize, List<Object> expectedQualifiers,
+      Object notPresentKey)
+      throws SerDeException, IOException {
+    StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector();
+
+    List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+
+    Object row = serDe.deserialize(r);
+
+    int j = 0;
+
+    for (int i = 0; i < fieldRefs.size(); i++) {
+      Object fieldData = soi.getStructFieldData(row, fieldRefs.get(i));
+      assertNotNull(fieldData);
+
+      if (fieldData instanceof LazyPrimitive<?, ?>) {
+        assertEquals(expectedFieldsData[i], ((LazyPrimitive<?, ?>) fieldData).getWritableObject());
+      } else if (fieldData instanceof LazyHBaseCellMap) {
+        assertEquals(expectedFieldsData[i], ((LazyHBaseCellMap) fieldData)
+            .getMapValueElement(expectedQualifiers.get(j)).toString().trim());
+
+        assertEquals(expectedMapSize[j], ((LazyHBaseCellMap) fieldData).getMapSize());
+        // Make sure that the unwanted key is not present in the map
+        assertNull(((LazyHBaseCellMap) fieldData).getMapValueElement(notPresentKey));
+
+        j++;
+
+      } else {
+        fail("Error: field data not an instance of LazyPrimitive or LazyHBaseCellMap");
+      }
+    }
+
+    // Smoke-test JSON rendering of the deserialized row; the result itself is not checked
+    SerDeUtils.getJSONString(row, soi);
+
+    // Now serialize
+    Put put = (Put) serDe.serialize(row, soi);
+
+    if (p != null) {
+      assertEquals("Serialized put:", p.toString(), put.toString());
+    }
+  }
 }