// PATCH SUMMARY (reviewer commentary; everything from the first "diff --git" header onward is unchanged)
//
// Purpose: let an HBase column mapping of the form "family:prefix*" select only the qualifiers of
// that family whose names start with "prefix", instead of mapping the whole family.
//
// Files touched:
//   HBaseSerDe.java       - initialize(): when the qualifier part of a mapping ends in '*', stores the
//                           text before the '*' in the new ColumnMapping.qualifierPrefix /
//                           qualifierPrefixBytes fields and sets qualifierName / qualifierNameBytes
//                           to null; otherwise keeps the qualifier as before.
//                           serialize(): only adds braces around the putTimestamp if/else.
//   LazyHBaseCellMap.java - adds a qualPrefix field and a four-argument init(); the three-argument
//                           init() now delegates with a null prefix. parse() skips any entry whose
//                           key does not satisfy Bytes.startsWith(key, qualPrefix) when qualPrefix
//                           is non-null.
//   LazyHBaseRow.java     - uncheckedGetField(): passes colMap.qualifierPrefixBytes to the new init().
//   TestHBaseSerDe.java   - adds testHBaseSerDeWithColumnPrefixes() plus two private helpers.
//
// NOTE(review), HBaseSerDe.initialize():
//   * The regex ".*\\*$" with Matcher.matches() is used only to detect a trailing '*', and the
//     Pattern is compiled every time this branch runs. parts[1].endsWith("*") looks equivalent for
//     qualifiers that contain no line terminators - confirm and simplify; the two new
//     java.util.regex imports would then be unnecessary.
//   * The else branch ends with a stray empty statement (a lone ';').
//   * A qualifier that literally ends in '*' can no longer be mapped as an exact column; a mapping
//     of just "family:*" yields an empty prefix - presumably matching every qualifier, confirm
//     against Bytes.startsWith.
//
// NOTE(review), TestHBaseSerDe:
//   * The local list "qualifiers" is populated but never passed to anything in the visible code.
//   * The helper loop indexes expectedFieldsData[i] and expectedQualifiers.get(j) with j advancing
//     once per map field, so expectedFieldsData[3] and expectedQualifiers.get(2) are never
//     asserted on; only the map-size check covers the third wanted cell.
//   * The result of SerDeUtils.getJSONString(row, soi) is discarded.
//   * String.getBytes() is called without a charset while the row key uses Bytes.toBytes.
//
// NOTE(review), patch text itself:
//   * Line breaks and indentation inside the hunks appear collapsed, and raw types such as
//     "List binaryStorage" / "new ArrayList()" and the column types "string:map:map" look like
//     stripped "<...>" spans - TODO confirm against the original patch before applying.
//   * Any edit to the '+' lines must be accompanied by recomputed "@@ -a,b +c,d @@" counts.
//
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java index 7f37ba5..8c98e4e 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -188,8 +190,27 @@ public void initialize(Configuration conf, Properties tbl) columnMapping.hbaseRowKey = false; if (parts.length == 2) { - columnMapping.qualifierName = parts[1]; - columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]); + + // Regex to test if we have a prefix + String regex=".*\\*$"; + + Pattern inputPattern = Pattern.compile(regex); + Matcher m = inputPattern.matcher(parts[1]); + + if (m.matches()) { + // we have a prefix with a wildcard + columnMapping.qualifierPrefix = parts[1].substring(0, parts[1].length() - 1); + columnMapping.qualifierPrefixBytes = Bytes.toBytes(columnMapping.qualifierPrefix); + // we weren't provided any actual qualifier name. Set these to + // null. 
+ columnMapping.qualifierName = null; + columnMapping.qualifierNameBytes = null; + } else { + // set the regular provided qualifier names + columnMapping.qualifierName = parts[1]; + columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]); + ; + } } else { columnMapping.qualifierName = null; columnMapping.qualifierNameBytes = null; @@ -408,6 +429,8 @@ public static boolean isRowKeyColumn(String hbaseColumnName) { List binaryStorage; boolean hbaseRowKey; String mappingSpec; + String qualifierPrefix; + byte[] qualifierPrefixBytes; } private void initHBaseSerDeParameters( @@ -546,10 +569,11 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throw new SerDeException("HBase row key cannot be NULL"); } - if(putTimestamp >= 0) + if(putTimestamp >= 0) { put = new Put(key,putTimestamp); - else + } else { put = new Put(key); + } // Serialize each field for (int i = 0; i < fields.size(); i++) { diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java index a8ba9d9..cedef10 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java @@ -21,10 +21,11 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.NavigableMap; import java.util.Map.Entry; +import java.util.NavigableMap; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.LazyFactory; import org.apache.hadoop.hive.serde2.lazy.LazyMap; @@ -42,6 +43,7 @@ private Result result; private byte [] columnFamilyBytes; + private byte[] qualPrefix; private List binaryStorage; /** @@ -54,12 +56,21 @@ public LazyHBaseCellMap(LazyMapObjectInspector oi) { public void init( Result r, - byte [] columnFamilyBytes, + byte[] columnFamilyBytes, 
List binaryStorage) { + init(r, columnFamilyBytes, binaryStorage, null); + } + + public void init( + Result r, + byte [] columnFamilyBytes, + List binaryStorage, byte[] qualPrefix) { + result = r; this.columnFamilyBytes = columnFamilyBytes; this.binaryStorage = binaryStorage; + this.qualPrefix = qualPrefix; setParsed(false); } @@ -80,6 +91,12 @@ private void parse() { continue; } + if (qualPrefix != null && !Bytes.startsWith(e.getKey(), qualPrefix)) { + // since we were provided a qualifier prefix, only accept qualifiers that start with this + // prefix + continue; + } + LazyMapObjectInspector lazyMoi = getInspector(); // Keys are always primitive diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java index d35bb52..74aa5a8 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java @@ -143,9 +143,11 @@ private Object uncheckedGetField(int fieldID) { } else { if (colMap.qualifierName == null) { // it is a column family - // primitive type for Map can be stored in binary format + // primitive type for Map can be stored in binary format. Pass in the + // qualifier prefix to cherry pick the qualifiers that match the prefix instead of picking + // up everything ((LazyHBaseCellMap) fields[fieldID]).init( - result, colMap.familyNameBytes, colMap.binaryStorage); + result, colMap.familyNameBytes, colMap.binaryStorage, colMap.qualifierPrefixBytes); } else { // it is a column i.e. 
a column-family with column-qualifier byte [] res = result.getValue(colMap.familyNameBytes, colMap.qualifierNameBytes); diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java index e821282..cda0f1e 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.hbase; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; @@ -688,4 +690,123 @@ private void deserializeAndSerializeHiveMapHBaseColumnFamilyII( Put serializedPut = (Put) hbaseSerDe.serialize(row, soi); assertEquals("Serialized data: ", p.toString(), serializedPut.toString()); } + + public void testHBaseSerDeWithColumnPrefixes() + throws Exception { + byte[] cfa = "cola".getBytes(); + + byte[] qualA = "prefixA_col1".getBytes(); + byte[] qualB = "prefixB_col2".getBytes(); + byte[] qualC = "prefixB_col3".getBytes(); + byte[] qualD = "unwanted_col".getBytes(); + + List qualifiers = new ArrayList(); + qualifiers.add(new Text("prefixA_col1")); + qualifiers.add(new Text("prefixB_col2")); + qualifiers.add(new Text("prefixB_col3")); + qualifiers.add(new Text("unwanted_col")); + + List expectedQualifiers = new ArrayList(); + expectedQualifiers.add(new Text("prefixA_col1")); + expectedQualifiers.add(new Text("prefixB_col2")); + expectedQualifiers.add(new Text("prefixB_col3")); + + byte[] rowKey = Bytes.toBytes("test-row1"); + + // Data + List kvs = 
new ArrayList(); + + byte[] dataA = "This is first test data".getBytes(); + byte[] dataB = "This is second test data".getBytes(); + byte[] dataC = "This is third test data".getBytes(); + byte[] dataD = "Unwanted data".getBytes(); + + kvs.add(new KeyValue(rowKey, cfa, qualA, dataA)); + kvs.add(new KeyValue(rowKey, cfa, qualB, dataB)); + kvs.add(new KeyValue(rowKey, cfa, qualC, dataC)); + kvs.add(new KeyValue(rowKey, cfa, qualD, dataD)); + + Result r = new Result(kvs); + + Put p = new Put(rowKey); + + p.add(new KeyValue(rowKey, cfa, qualA, dataA)); + p.add(new KeyValue(rowKey, cfa, qualB, dataB)); + p.add(new KeyValue(rowKey, cfa, qualC, dataC)); + + Object[] expectedFieldsData = { + new Text("test-row1"), + new String("This is first test data"), + new String("This is second test data"), + new String("This is third test data")}; + + int[] expectedMapSize = new int[] {1, 2}; + + // Create, initialize, and test the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createPropertiesForColumnPrefixes(); + serDe.initialize(conf, tbl); + + Object notPresentKey = new Text("unwanted_col"); + + deserializeAndSerializeHivePrefixColumnFamily(serDe, r, p, expectedFieldsData, expectedMapSize, + expectedQualifiers, + notPresentKey); + } + + private Properties createPropertiesForColumnPrefixes() { + Properties tbl = new Properties(); + tbl.setProperty(serdeConstants.LIST_COLUMNS, + "key,astring,along"); + tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, + "string:map:map"); + tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, + ":key,cola:prefixA_*,cola:prefixB_*"); + + return tbl; + } + + private void deserializeAndSerializeHivePrefixColumnFamily(HBaseSerDe serDe, Result r, Put p, + Object[] expectedFieldsData, int[] expectedMapSize, List expectedQualifiers, + Object notPresentKey) + throws SerDeException, IOException { + StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector(); + + List fieldRefs = 
soi.getAllStructFieldRefs(); + + Object row = serDe.deserialize(r); + + int j = 0; + + for (int i = 0; i < fieldRefs.size(); i++) { + Object fieldData = soi.getStructFieldData(row, fieldRefs.get(i)); + assertNotNull(fieldData); + + if (fieldData instanceof LazyPrimitive) { + assertEquals(expectedFieldsData[i], ((LazyPrimitive) fieldData).getWritableObject()); + } else if (fieldData instanceof LazyHBaseCellMap) { + assertEquals(expectedFieldsData[i], ((LazyHBaseCellMap) fieldData) + .getMapValueElement(expectedQualifiers.get(j)).toString().trim()); + + assertEquals(expectedMapSize[j], ((LazyHBaseCellMap) fieldData).getMapSize()); + // Make sure that the unwanted key is not present in the map + assertNull(((LazyHBaseCellMap) fieldData).getMapValueElement(notPresentKey)); + + j++; + + } else { + fail("Error: field data not an instance of LazyPrimitive or LazyHBaseCellMap"); + } + } + + SerDeUtils.getJSONString(row, soi); + + // Now serialize + Put put = (Put) serDe.serialize(row, soi); + + if (p != null) { + assertEquals("Serialized put:", p.toString(), put.toString()); + } + } }