diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
new file mode 100644
index 0000000..cfa3826
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
@@ -0,0 +1,69 @@
+package org.apache.hadoop.hive.hbase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+
+/**
+ * HBaseCompositeKey extension of LazyStruct. All complex composite keys should extend this class
+ * and override the {@link LazyStruct#getField(int)} method where fieldID corresponds to the ID of a
+ * key in the composite key.
+ *
+ * For example, for a composite key "/part1/part2/part3", part1 will have an id
+ * of 0, part2 an id of 1, and part3 an id of 2. Custom implementations of
+ * getField(fieldID) should return the value corresponding to that fieldID. So,
+ * for the above example, getField(0) should return part1, getField(1) should
+ * return part2, and getField(2) should return part3.
+ *
+ */
+public class HBaseCompositeKey extends LazyStruct {
+
+ public HBaseCompositeKey(LazySimpleStructObjectInspector oi) {
+ super(oi);
+ }
+
+ @Override
+ public ArrayList<Object> getFieldsAsList() {
+ ArrayList<Object> allFields = new ArrayList<Object>();
+
+ List<? extends StructField> fields = oi.getAllStructFieldRefs();
+
+ for (int i = 0; i < fields.size(); i++) {
+ allFields.add(getField(i));
+ }
+
+ return allFields;
+ }
+
+ /**
+ * Create and initialize a {@link LazyObject} with the given bytes for the given fieldID.
+ *
+ * @param fieldID
+ * field for which the object is to be created
+ * @param bytes
+ * value with which the object is to be initialized
+ * @return the initialized {@link LazyObject}
+ */
+ public LazyObject<? extends ObjectInspector> toLazyObject(int fieldID, byte[] bytes) {
+ ObjectInspector fieldOI = oi.getAllStructFieldRefs().get(fieldID).getFieldObjectInspector();
+
+ LazyObject<? extends ObjectInspector> lazyObject = LazyFactory
+ .createLazyObject(fieldOI);
+
+ ByteArrayRef ref = new ByteArrayRef();
+
+ ref.setData(bytes);
+
+ // initialize the lazy object
+ lazyObject.init(ref, 0, ref.getData().length);
+
+ return lazyObject;
+ }
+}
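(Aside, not part of the patch: a minimal sketch of a concrete subclass, assuming a hypothetical key layout "part1_part2_part3" split on an underscore. The class name, delimiter, and parsing strategy are illustrative only; the one hard requirement, per HBaseSerDe below, is the two-argument constructor.)

    import java.util.Properties;

    import org.apache.hadoop.hive.hbase.HBaseCompositeKey;
    import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
    import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;

    public class UnderscoreCompositeKey extends HBaseCompositeKey {

      private String[] parts;

      // HBaseSerDe instantiates this class reflectively with exactly this signature
      public UnderscoreCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl) {
        super(oi);
      }

      @Override
      public void init(ByteArrayRef bytes, int start, int length) {
        // split the raw row key into its individual parts
        parts = new String(bytes.getData(), start, length).split("_");
      }

      @Override
      public Object getField(int fieldID) {
        // getField(0) -> part1, getField(1) -> part2, getField(2) -> part3
        return toLazyObject(fieldID, parts[fieldID].getBytes());
      }
    }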
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
index 4900a41..c6a816e 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.hbase;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -65,11 +66,12 @@
public static final String HBASE_TABLE_DEFAULT_STORAGE_TYPE = "hbase.table.default.storage.type";
public static final String HBASE_KEY_COL = ":key";
public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp";
+ public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class";
public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";
-
- /** Determines whether a regex matching should be done on the columns or not. Defaults to true.
+
+ /** Determines whether a regex matching should be done on the columns or not. Defaults to true.
* WARNING: Note that currently this only supports the suffix wildcard .* **/
public static final String HBASE_COLUMNS_REGEX_MATCHING = "hbase.columns.mapping.regex.matching";
@@ -85,6 +87,8 @@
private final ByteStream.Output serializeStream = new ByteStream.Output();
private int iKey;
private long putTimestamp;
+ private Class<?> compositeKeyClass;
+ private Object compositeKeyObj;
// used for serializing a field
private byte [] separators; // the separators array
@@ -131,6 +135,11 @@ public void initialize(Configuration conf, Properties tbl)
cachedHBaseRow = new LazyHBaseRow(
(LazySimpleStructObjectInspector) cachedObjectInspector);
+ if (compositeKeyClass != null) {
+ // instantiate the composite key class, passing it the key's object inspector
+ initCompositeKeyClass(tbl);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("HBaseSerDe initialized with : columnNames = "
+ serdeParams.getColumnNames()
@@ -460,6 +469,16 @@ private void initHBaseSerDeParameters(
doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBASE_COLUMNS_REGEX_MATCHING, "true"));
+ String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
+
+ if (compKeyClass != null) {
+ try {
+ compositeKeyClass = job.getClassByName(compKeyClass);
+ } catch (ClassNotFoundException e) {
+ throw new SerDeException(e);
+ }
+ }
+
// Parse and initialize the HBase columns mapping
columnsMapping = parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
@@ -543,7 +562,7 @@ public Object deserialize(Writable result) throws SerDeException {
throw new SerDeException(getClass().getName() + ": expects Result!");
}
- cachedHBaseRow.init((Result) result, columnsMapping);
+ cachedHBaseRow.init((Result) result, columnsMapping, compositeKeyObj);
return cachedHBaseRow;
}
@@ -808,6 +827,47 @@ private boolean serialize(
throw new RuntimeException("Unknown category type: " + objInspector.getCategory());
}
+ /**
+ * Initialize the composite key class with the object inspector for the key
+ *
+ * @throws SerDeException
+ */
+ private void initCompositeKeyClass(Properties tbl) throws SerDeException {
+
+ int i = 0;
+
+ // find the hbase row key
+ for (ColumnMapping colMap : columnsMapping) {
+ if (colMap.hbaseRowKey) {
+ break;
+ }
+ i++;
+ }
+
+ ObjectInspector keyObjectInspector = ((LazySimpleStructObjectInspector) cachedObjectInspector)
+ .getAllStructFieldRefs().get(i).getFieldObjectInspector();
+
+ try {
+ compositeKeyObj = compositeKeyClass.getDeclaredConstructor(
+ LazySimpleStructObjectInspector.class, Properties.class)
+ .newInstance(
+ ((LazySimpleStructObjectInspector) keyObjectInspector), tbl);
+ } catch (IllegalArgumentException e) {
+ throw new SerDeException(e);
+ } catch (SecurityException e) {
+ throw new SerDeException(e);
+ } catch (InstantiationException e) {
+ throw new SerDeException(e);
+ } catch (IllegalAccessException e) {
+ throw new SerDeException(e);
+ } catch (InvocationTargetException e) {
+ throw new SerDeException(e);
+ } catch (NoSuchMethodException e) {
+ // the constructor wasn't defined in the implementation class. Flag error
+ throw new SerDeException("Constructor not defined in composite key class ["
+ + compositeKeyClass.getName() + "]", e);
+ }
+ }
/**
* @return the useJSONSerialize
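(The reflective lookup above fixes the constructor contract: an implementation must expose a (LazySimpleStructObjectInspector, Properties) constructor, or initialization fails with the SerDeException from the NoSuchMethodException branch. A sketch of wiring a custom class in through table properties, modeled on the tests further down; the implementation class name is hypothetical.)

    Properties tbl = new Properties();
    tbl.setProperty(serdeConstants.LIST_COLUMNS, "key,astring");
    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES,
        "struct<col1:string,col2:string,col3:string>,string");
    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key,cola:struct");
    tbl.setProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS,
        "com.example.UnderscoreCompositeKey"); // hypothetical implementation

    HBaseSerDe serDe = new HBaseSerDe();
    serDe.initialize(new Configuration(), tbl); // resolves and instantiates the key class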
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
index b254b0d..fc40195 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
@@ -45,6 +45,7 @@
*/
private Result result;
private List<ColumnMapping> columnsMapping;
+ private Object compositeKeyObj;
private ArrayList<Object> cachedList;
/**
@@ -59,9 +60,22 @@ public LazyHBaseRow(LazySimpleStructObjectInspector oi) {
* @see LazyHBaseRow#init(Result)
*/
public void init(Result r, List<ColumnMapping> columnsMapping) {
+ init(r, columnsMapping, null);
+ }
+
+ /**
+ * Set the HBase row data (a Result writable) for this LazyStruct.
+ *
+ * @see LazyHBaseRow#init(Result)
+ *
+ * @param compositeKeyObj
+ * custom implementation used to interpret the composite key
+ */
+ public void init(Result r, List<ColumnMapping> columnsMapping, Object compositeKeyObj) {
result = r;
this.columnsMapping = columnsMapping;
+ this.compositeKeyObj = compositeKeyObj;
setParsed(false);
}
@@ -117,7 +131,13 @@ public Object getField(int fieldID) {
parse();
}
- return uncheckedGetField(fieldID);
+ Object value = uncheckedGetField(fieldID);
+
+ if (columnsMapping.get(fieldID).hbaseRowKey && compositeKeyObj != null) {
+ return compositeKeyObj;
+ } else {
+ return value;
+ }
}
/**
@@ -162,6 +182,12 @@ private Object uncheckedGetField(int fieldID) {
if (ref != null) {
fields[fieldID].init(ref, 0, ref.getData().length);
+
+ // if it was a row key and we have been provided a custom composite key class, initialize it
+ // with the bytes for the row key
+ if (colMap.hbaseRowKey && compositeKeyObj != null) {
+ ((LazyStruct) compositeKeyObj).init(ref, 0, ref.getData().length);
+ }
}
}
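(Net effect of the two hooks above, sketched with the names used in the tests below: once a composite key object is configured, fetching the key field through the struct object inspector yields that object instead of the raw lazy field.)

    StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector();
    Object row = serDe.deserialize(result); // result is an HBase Result

    // Field 0 is the :key mapping. With a composite key class configured, this
    // returns the HBaseCompositeKey instance, whose getField(int) decides how
    // each part of the row key is materialized.
    Object key = soi.getStructFieldData(row, soi.getAllStructFieldRefs().get(0));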
diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java
new file mode 100644
index 0000000..e95bd0e
--- /dev/null
+++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java
@@ -0,0 +1,36 @@
+package org.apache.hadoop.hive.hbase;
+
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+
+public class HBaseTestCompositeKey extends HBaseCompositeKey {
+
+ byte[] bytes;
+ String bytesAsString;
+ Properties tbl;
+
+ public HBaseTestCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl) {
+ super(oi);
+ this.tbl = tbl;
+ }
+
+ @Override
+ public void init(ByteArrayRef bytes, int start, int length) {
+ this.bytes = bytes.getData();
+ }
+
+ @Override
+ public Object getField(int fieldID) {
+ if (bytesAsString == null) {
+ bytesAsString = Bytes.toString(bytes).trim();
+ }
+
+ // Pick the character corresponding to the field id and convert it to a byte array
+ byte[] fieldBytes = new byte[] {(byte) bytesAsString.charAt(fieldID)};
+
+ return toLazyObject(fieldID, fieldBytes);
+ }
+}
diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
index d25c731..c7589ad 100644
--- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
+++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.hbase;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -45,6 +46,7 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
/**
* Tests the HBaseSerDe class.
@@ -809,4 +811,172 @@ private void deserializeAndSerializeHivePrefixColumnFamily(HBaseSerDe serDe, Res
assertEquals("Serialized put:", p.toString(), put.toString());
}
}
+
+ public void testHBaseSerDeCompositeKeyWithSeparator() throws SerDeException, TException,
+ IOException {
+ byte[] cfa = "cola".getBytes();
+
+ byte[] qualStruct = "struct".getBytes();
+
+ TestStruct testStruct = new TestStruct("A", "B", "C", true, (byte) 45);
+
+ byte[] rowKey = testStruct.getBytes();
+
+ // Data
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+ byte[] testData = "This is a test data".getBytes();
+
+ kvs.add(new KeyValue(rowKey, cfa, qualStruct, testData));
+
+ Result r = new Result(kvs);
+
+ Put p = new Put(rowKey);
+
+ // Post serialization, separators are automatically inserted between different fields in the
+ // struct. Currently there is no way to disable that. So the workaround here is to pad the
+ // data with the separator bytes before creating a "Put" object
+ p.add(new KeyValue(rowKey, cfa, qualStruct, testData));
+
+ // Create, initialize, and test the SerDe
+ HBaseSerDe serDe = new HBaseSerDe();
+ Configuration conf = new Configuration();
+ Properties tbl = createPropertiesForCompositeKeyWithSeparator();
+ serDe.initialize(conf, tbl);
+
+ deserializeAndSerializeHBaseCompositeKey(serDe, r, p);
+ }
+
+ private Properties createPropertiesForCompositeKeyWithSeparator() {
+ Properties tbl = new Properties();
+ tbl.setProperty(serdeConstants.LIST_COLUMNS,
+ "key,astring");
+ tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES,
+ "struct,string");
+ tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ ":key,cola:struct");
+ tbl.setProperty(serdeConstants.COLLECTION_DELIM, "-");
+
+ return tbl;
+ }
+
+ public void testHBaseSerDeCompositeKeyWithoutSeparator() throws SerDeException, TException,
+ IOException {
+ byte[] cfa = "cola".getBytes();
+
+ byte[] qualStruct = "struct".getBytes();
+
+ TestStruct testStruct = new TestStruct("A", "B", "C", false, (byte) 0);
+
+ byte[] rowKey = testStruct.getBytes();
+
+ // Data
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+ byte[] testData = "This is a test data".getBytes();
+
+ kvs.add(new KeyValue(rowKey, cfa, qualStruct, testData));
+
+ Result r = new Result(kvs);
+
+ byte[] putRowKey = testStruct.getBytesWithDelimiters();
+
+ Put p = new Put(putRowKey);
+
+ // Post serialization, separators are automatically inserted between different fields in the
+ // struct. Currently there is no way to disable that. So the workaround here is to pad the
+ // data with the separator bytes before creating a "Put" object
+ p.add(new KeyValue(putRowKey, cfa, qualStruct, testData));
+
+ // Create, initialize, and test the SerDe
+ HBaseSerDe serDe = new HBaseSerDe();
+ Configuration conf = new Configuration();
+ Properties tbl = createPropertiesForCompositeKeyWithoutSeparator();
+ serDe.initialize(conf, tbl);
+
+ deserializeAndSerializeHBaseCompositeKey(serDe, r, p);
+ }
+
+ private Properties createPropertiesForCompositeKeyWithoutSeparator() {
+ Properties tbl = new Properties();
+ tbl.setProperty(serdeConstants.LIST_COLUMNS,
+ "key,astring");
+ tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES,
+ "struct,string");
+ tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ ":key,cola:struct");
+ tbl.setProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS,
+ "org.apache.hadoop.hive.hbase.HBaseTestCompositeKey");
+
+ return tbl;
+ }
+
+ private void deserializeAndSerializeHBaseCompositeKey(HBaseSerDe serDe, Result r, Put p)
+ throws SerDeException, IOException {
+ StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector();
+
+ List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+
+ Object row = serDe.deserialize(r);
+
+ for (int j = 0; j < fieldRefs.size(); j++) {
+ Object fieldData = soi.getStructFieldData(row, fieldRefs.get(j));
+ assertNotNull(fieldData);
+ }
+
+ assertEquals(
+ "{\"key\":{\"col1\":\"A\",\"col2\":\"B\",\"col3\":\"C\"},\"astring\":\"This is a test data\"}",
+ SerDeUtils.getJSONString(row, soi));
+
+ // Now serialize
+ Put put = (Put) serDe.serialize(row, soi);
+
+ assertEquals("Serialized put:", p.toString(), put.toString());
+ }
+
+ class TestStruct {
+ String f1;
+ String f2;
+ String f3;
+ boolean hasSeparator;
+ byte separator;
+
+ TestStruct(String f1, String f2, String f3, boolean hasSeparator, byte separator) {
+ this.f1 = f1;
+ this.f2 = f2;
+ this.f3 = f3;
+ this.hasSeparator = hasSeparator;
+ this.separator = separator;
+ }
+
+ public byte[] getBytes() throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+ bos.write(f1.getBytes());
+ if (hasSeparator) {
+ bos.write(separator); // Add field separator
+ }
+ bos.write(f2.getBytes());
+ if (hasSeparator) {
+ bos.write(separator);
+ }
+ bos.write(f3.getBytes());
+
+ return bos.toByteArray();
+ }
+
+ public byte[] getBytesWithDelimiters() throws IOException {
+ // Add a Ctrl-B delimiter between the fields. This is necessary because, when no delimiter
+ // is provided for a struct, Hive automatically inserts Ctrl-B as the default field delimiter
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+ bos.write(f1.getBytes());
+ bos.write("\002".getBytes("UTF8"));
+ bos.write(f2.getBytes());
+ bos.write("\002".getBytes("UTF8"));
+ bos.write(f3.getBytes());
+
+ return bos.toByteArray();
+ }
+ }
}
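(For concreteness: Hive's default separators are \001 (Ctrl-A) between top-level fields and \002 (Ctrl-B) one nesting level down, which is why getBytesWithDelimiters() above produces, for the parts "A", "B", "C":)

    // the bytes Hive writes back for a struct row key with default separators
    byte[] expected = "A\002B\002C".getBytes("UTF8");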
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java
index 08400f1..df7c8db 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java
@@ -33,10 +33,10 @@
/**
* LazySimpleStructObjectInspector works on struct data that is stored in
* LazyStruct.
- *
+ *
* The names of the struct fields and the internal structure of the struct
* fields are specified in the ctor of the LazySimpleStructObjectInspector.
- *
+ *
* Always use the ObjectInspectorFactory to create new ObjectInspector objects,
* instead of directly creating an instance of this class.
*/
@@ -193,8 +193,15 @@ public Object getStructFieldData(Object data, StructField fieldRef) {
if (data == null) {
return null;
}
- LazyStruct struct = (LazyStruct) data;
- return struct.getFieldsAsList();
+
+ // Fetch each field via getStructFieldData so that LazyStruct.getField overrides are honored
+ List<Object> result = new ArrayList<Object>(fields.size());
+
+ for (MyField myField : fields) {
+ result.add(getStructFieldData(data, myField));
+ }
+
+ return result;
}
// For LazyStruct
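(The behavioral point of this last change: the old body returned ((LazyStruct) data).getFieldsAsList() directly, so a LazyStruct subclass overriding getField(int), such as HBaseCompositeKey, was bypassed whenever fields were fetched in bulk. Routing each field through getStructFieldData keeps the override in the loop. A sketch of the consumer-visible difference, with hypothetical variable names:)

    // Each field now goes through getStructFieldData -> LazyStruct.getField(i),
    // so HBaseCompositeKey.getField(int) is consulted per field:
    List<Object> allFields = soi.getStructFieldsDataAsList(row);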