diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java new file mode 100644 index 0000000..5008f15 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyStruct; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; + +/** + * HBaseCompositeKey extension of LazyStruct. All complex composite keys should extend this class + * and override the {@link LazyStruct#getField(int)} method where fieldID corresponds to the ID of a + * key in the composite key. + *

+ * For example, for a composite key "/part1/part2/part3", part1 will have an id + * 0, part2 will have an id 1 and part3 will have an id 2. Custom + * implementations of getField(fieldID) should return the value corresponding to that fieldID. So, + * for the above example, the value returned for getField(0) should be part1, + * getField(1) should be part2 and getField(2) should be part3. + *

+ * + *

+ * All custom implementations are expected to have a constructor of the form: + * + *

+ * MyCustomCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl, Configuration conf)
+ * 
+ * + *

+ * + * */ +public class HBaseCompositeKey extends LazyStruct { + + public HBaseCompositeKey(LazySimpleStructObjectInspector oi) { + super(oi); + } + + @Override + public ArrayList getFieldsAsList() { + ArrayList allFields = new ArrayList(); + + List fields = oi.getAllStructFieldRefs(); + + for (int i = 0; i < fields.size(); i++) { + allFields.add(getField(i)); + } + + return allFields; + } + + /** + * Create and initialize a {@link LazyObject} with the given bytes for the given fieldID. + * + * @param fieldID + * field for which the object is to be created + * @param bytes + * value with which the object is to be initialized + * @return initialized {@link LazyObject} + * */ + public LazyObject toLazyObject(int fieldID, byte[] bytes) { + ObjectInspector fieldOI = oi.getAllStructFieldRefs().get(fieldID).getFieldObjectInspector(); + + LazyObject lazyObject = LazyFactory + .createLazyObject(fieldOI); + + ByteArrayRef ref = new ByteArrayRef(); + + ref.setData(bytes); + + // initialize the lazy object + lazyObject.init(ref, 0, ref.getData().length); + + return lazyObject; + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java index 8e4505d..2cd65cb 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java @@ -19,16 +19,13 @@ package org.apache.hadoop.hive.hbase; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; import 
org.apache.hadoop.hive.serde2.ByteStream; @@ -51,6 +48,11 @@ import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; /** @@ -64,11 +66,12 @@ public static final String HBASE_TABLE_DEFAULT_STORAGE_TYPE = "hbase.table.default.storage.type"; public static final String HBASE_KEY_COL = ":key"; public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp"; + public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class"; public static final String HBASE_SCAN_CACHE = "hbase.scan.cache"; public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock"; public static final String HBASE_SCAN_BATCH = "hbase.scan.batch"; - - /** Determines whether a regex matching should be done on the columns or not. Defaults to true. + + /** Determines whether a regex matching should be done on the columns or not. Defaults to true. 
* WARNING: Note that currently this only supports the suffix wildcard .* **/ public static final String HBASE_COLUMNS_REGEX_MATCHING = "hbase.columns.mapping.regex.matching"; @@ -84,6 +87,8 @@ private final ByteStream.Output serializeStream = new ByteStream.Output(); private int iKey; private long putTimestamp; + private Class compositeKeyClass; + private Object compositeKeyObj; // used for serializing a field private byte [] separators; // the separators array @@ -130,6 +135,11 @@ public void initialize(Configuration conf, Properties tbl) cachedHBaseRow = new LazyHBaseRow( (LazySimpleStructObjectInspector) cachedObjectInspector); + if (compositeKeyClass != null) { + // initialize the constructor of the composite key class with its object inspector + initCompositeKeyClass(conf,tbl); + } + if (LOG.isDebugEnabled()) { LOG.debug("HBaseSerDe initialized with : columnNames = " + serdeParams.getColumnNames() @@ -459,6 +469,16 @@ private void initHBaseSerDeParameters( doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBASE_COLUMNS_REGEX_MATCHING, "true")); + String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS); + + if (compKeyClass != null) { + try { + compositeKeyClass = job.getClassByName(compKeyClass); + } catch (ClassNotFoundException e) { + throw new SerDeException(e); + } + } + // Parse and initialize the HBase columns mapping columnsMapping = parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); @@ -542,7 +562,7 @@ public Object deserialize(Writable result) throws SerDeException { throw new SerDeException(getClass().getName() + ": expects ResultWritable!"); } - cachedHBaseRow.init(((ResultWritable) result).getResult(), columnsMapping); + cachedHBaseRow.init(((ResultWritable) result).getResult(), columnsMapping, compositeKeyObj); return cachedHBaseRow; } @@ -807,6 +827,47 @@ private boolean serialize( throw new RuntimeException("Unknown category type: " + objInspector.getCategory()); } + /** + * Initialize the composite key 
class with the objectinspector for the key + * + * @throws SerDeException + * */ + private void initCompositeKeyClass(Configuration conf,Properties tbl) throws SerDeException { + + int i = 0; + + // find the hbase row key + for (ColumnMapping colMap : columnsMapping) { + if (colMap.hbaseRowKey) { + break; + } + i++; + } + + ObjectInspector keyObjectInspector = ((LazySimpleStructObjectInspector) cachedObjectInspector) + .getAllStructFieldRefs().get(i).getFieldObjectInspector(); + + try { + compositeKeyObj = compositeKeyClass.getDeclaredConstructor( + LazySimpleStructObjectInspector.class, Properties.class, Configuration.class) + .newInstance( + ((LazySimpleStructObjectInspector) keyObjectInspector), tbl, conf); + } catch (IllegalArgumentException e) { + throw new SerDeException(e); + } catch (SecurityException e) { + throw new SerDeException(e); + } catch (InstantiationException e) { + throw new SerDeException(e); + } catch (IllegalAccessException e) { + throw new SerDeException(e); + } catch (InvocationTargetException e) { + throw new SerDeException(e); + } catch (NoSuchMethodException e) { + // the constructor wasn't defined in the implementation class. 
Flag error + throw new SerDeException("Constructor not defined in composite key class [" + + compositeKeyClass.getName() + "]", e); + } + } /** * @return the useJSONSerialize diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java index b254b0d..fc40195 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java @@ -45,6 +45,7 @@ */ private Result result; private List columnsMapping; + private Object compositeKeyObj; private ArrayList cachedList; /** @@ -59,9 +60,22 @@ public LazyHBaseRow(LazySimpleStructObjectInspector oi) { * @see LazyHBaseRow#init(Result) */ public void init(Result r, List columnsMapping) { + init(r, columnsMapping, null); + } + + /** + * Set the HBase row data(a Result writable) for this LazyStruct. + * + * @see LazyHBaseRow#init(Result) + * + * @param compositeKeyObj + * custom implementation to interpret the composite key + */ + public void init(Result r, List columnsMapping, Object compositeKeyObj) { result = r; this.columnsMapping = columnsMapping; + this.compositeKeyObj = compositeKeyObj; setParsed(false); } @@ -117,7 +131,13 @@ public Object getField(int fieldID) { parse(); } - return uncheckedGetField(fieldID); + Object value = uncheckedGetField(fieldID); + + if (columnsMapping.get(fieldID).hbaseRowKey && compositeKeyObj != null) { + return compositeKeyObj; + } else { + return value; + } } /** @@ -162,6 +182,12 @@ private Object uncheckedGetField(int fieldID) { if (ref != null) { fields[fieldID].init(ref, 0, ref.getData().length); + + // if it was a row key and we have been provided a custom composite key class, initialize it + // with the bytes for the row key + if (colMap.hbaseRowKey && compositeKeyObj != null) { + ((LazyStruct) compositeKeyObj).init(ref, 0, ref.getData().length); + } } } diff --git 
hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java new file mode 100644 index 0000000..13c344b --- /dev/null +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.hbase; + +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; + +public class HBaseTestCompositeKey extends HBaseCompositeKey { + + byte[] bytes; + String bytesAsString; + Properties tbl; + Configuration conf; + + public HBaseTestCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl, Configuration conf) { + super(oi); + this.tbl = tbl; + this.conf = conf; + } + + @Override + public void init(ByteArrayRef bytes, int start, int length) { + this.bytes = bytes.getData(); + } + + @Override + public Object getField(int fieldID) { + if (bytesAsString == null) { + bytesAsString = Bytes.toString(bytes).trim(); + } + + // Randomly pick the character corresponding to the field id and convert it to byte array + byte[] fieldBytes = new byte[] {(byte) bytesAsString.charAt(fieldID)}; + + return toLazyObject(fieldID, fieldBytes); + } +} diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java index 6b95be5..089a31a 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.hbase; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -45,6 +46,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.thrift.TException; /** * Tests the HBaseSerDe class. 
@@ -809,4 +811,172 @@ private void deserializeAndSerializeHivePrefixColumnFamily(HBaseSerDe serDe, Res assertEquals("Serialized put:", p.toString(), put.toString()); } } + + public void testHBaseSerDeCompositeKeyWithSeparator() throws SerDeException, TException, + IOException { + byte[] cfa = "cola".getBytes(); + + byte[] qualStruct = "struct".getBytes(); + + TestStruct testStruct = new TestStruct("A", "B", "C", true, (byte) 45); + + byte[] rowKey = testStruct.getBytes(); + + // Data + List kvs = new ArrayList(); + + byte[] testData = "This is a test data".getBytes(); + + kvs.add(new KeyValue(rowKey, cfa, qualStruct, testData)); + + Result r = new Result(kvs); + + Put p = new Put(rowKey); + + // Post serialization, separators are automatically inserted between different fields in the + // struct. Currently there is not way to disable that. So the work around here is to pad the + // data with the separator bytes before creating a "Put" object + p.add(new KeyValue(rowKey, cfa, qualStruct, testData)); + + // Create, initialize, and test the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createPropertiesForCompositeKeyWithSeparator(); + serDe.initialize(conf, tbl); + + deserializeAndSerializeHBaseCompositeKey(serDe, r, p); + } + + private Properties createPropertiesForCompositeKeyWithSeparator() { + Properties tbl = new Properties(); + tbl.setProperty(serdeConstants.LIST_COLUMNS, + "key,astring"); + tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, + "struct,string"); + tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, + ":key,cola:struct"); + tbl.setProperty(serdeConstants.COLLECTION_DELIM, "-"); + + return tbl; + } + + public void testHBaseSerDeCompositeKeyWithoutSeparator() throws SerDeException, TException, + IOException { + byte[] cfa = "cola".getBytes(); + + byte[] qualStruct = "struct".getBytes(); + + TestStruct testStruct = new TestStruct("A", "B", "C", false, (byte) 0); + + byte[] rowKey = 
testStruct.getBytes(); + + // Data + List kvs = new ArrayList(); + + byte[] testData = "This is a test data".getBytes(); + + kvs.add(new KeyValue(rowKey, cfa, qualStruct, testData)); + + Result r = new Result(kvs); + + byte[] putRowKey = testStruct.getBytesWithDelimiters(); + + Put p = new Put(putRowKey); + + // Post serialization, separators are automatically inserted between different fields in the + // struct. Currently there is not way to disable that. So the work around here is to pad the + // data with the separator bytes before creating a "Put" object + p.add(new KeyValue(putRowKey, cfa, qualStruct, testData)); + + // Create, initialize, and test the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createPropertiesForCompositeKeyWithoutSeparator(); + serDe.initialize(conf, tbl); + + deserializeAndSerializeHBaseCompositeKey(serDe, r, p); + } + + private Properties createPropertiesForCompositeKeyWithoutSeparator() { + Properties tbl = new Properties(); + tbl.setProperty(serdeConstants.LIST_COLUMNS, + "key,astring"); + tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, + "struct,string"); + tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, + ":key,cola:struct"); + tbl.setProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS, + "org.apache.hadoop.hive.hbase.HBaseTestCompositeKey"); + + return tbl; + } + + private void deserializeAndSerializeHBaseCompositeKey(HBaseSerDe serDe, Result r, Put p) + throws SerDeException, IOException { + StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector(); + + List fieldRefs = soi.getAllStructFieldRefs(); + + Object row = serDe.deserialize(new ResultWritable(r)); + + for (int j = 0; j < fieldRefs.size(); j++) { + Object fieldData = soi.getStructFieldData(row, fieldRefs.get(j)); + assertNotNull(fieldData); + } + + assertEquals( + "{\"key\":{\"col1\":\"A\",\"col2\":\"B\",\"col3\":\"C\"},\"astring\":\"This is a test data\"}", + SerDeUtils.getJSONString(row, 
soi)); + + // Now serialize + Put put = ((PutWritable) serDe.serialize(row, soi)).getPut(); + + assertEquals("Serialized put:", p.toString(), put.toString()); + } + + class TestStruct { + String f1; + String f2; + String f3; + boolean hasSeparator; + byte separator; + + TestStruct(String f1, String f2, String f3, boolean hasSeparator, byte separator) { + this.f1 = f1; + this.f2 = f2; + this.f3 = f3; + this.hasSeparator = hasSeparator; + this.separator = separator; + } + + public byte[] getBytes() throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + bos.write(f1.getBytes()); + if (hasSeparator) { + bos.write(separator); // Add field separator + } + bos.write(f2.getBytes()); + if (hasSeparator) { + bos.write(separator); + } + bos.write(f3.getBytes()); + + return bos.toByteArray(); + } + + public byte[] getBytesWithDelimiters() throws IOException { + // Add Ctrl-B delimiter between the fields. This is necessary because for structs in case no + // delimiter is provided, hive automatically adds Ctrl-B as a default delimiter between fields + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + bos.write(f1.getBytes()); + bos.write("\002".getBytes("UTF8")); + bos.write(f2.getBytes()); + bos.write("\002".getBytes("UTF8")); + bos.write(f3.getBytes()); + + return bos.toByteArray(); + } + } } diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java index db08282..8a5386a 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java @@ -198,8 +198,15 @@ public Object getStructFieldData(Object data, StructField fieldRef) { if (data == null) { return null; } - LazyStruct struct = (LazyStruct) data; - return 
struct.getFieldsAsList(); + + // Iterate over all the fields picking up the nested structs within them + List result = new ArrayList(fields.size()); + + for (MyField myField : fields) { + result.add(getStructFieldData(data, myField)); + } + + return result; } // For LazyStruct