diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
new file mode 100644
index 0000000..7d88bc7
--- /dev/null
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hcatalog.common.HCatConstants;
+
+/**
+ * Constants used in the HBase storage driver module
+ */
+class HBaseConstants {
+
+    /** key used to define the version number the HBase output storage driver should use when writing out data for a job */
+    public static final String PROPERTY_OUTPUT_VERSION_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputVersion";
+
+    /** key used to define the name of the table to write to */
+    public static final String PROPERTY_OUTPUT_TABLE_NAME_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputTableName";
+
+    /** key used to define the column mapping of hbase to hcatalog schema */
+    public static final String PROPERTY_COLUMN_MAPPING_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+"."+ HBaseSerDe.HBASE_COLUMNS_MAPPING;
+
+}
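For orientation, a quick sketch of how these keys might be consumed when configuring a write job. This is illustrative only and not part of the patch: the class name and all property values are made-up placeholders, and it is placed in org.apache.hcatalog.hbase because the constants class is package-private.

    package org.apache.hcatalog.hbase;

    import java.util.Properties;

    // Hypothetical job-setup snippet; values are placeholders.
    public class HBaseDriverConfigSketch {
        public static Properties exampleProperties() {
            Properties p = new Properties();
            // write all cells with an explicit version instead of the region server clock
            p.setProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "42");
            // name of the HBase table to write to
            p.setProperty(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, "my_table");
            // ":key" binds the row key, "family:qualifier" binds a column
            p.setProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY,
                    ":key,my_family:my_qualifier1");
            return p;
        }
    }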
diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
new file mode 100644
index 0000000..23450c2
--- /dev/null
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.hbase.LazyHBaseRow;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of ResultConverter using Hive's HBaseSerDe. The mapping between
+ * the HBase schema and the HCatRecord schema is defined by
+ * {@link HBaseConstants#PROPERTY_COLUMN_MAPPING_KEY}
+ */
+class HBaseSerDeResultConverter implements ResultConverter {
+    private HBaseSerDe serDe;
+    private HCatSchema schema;
+    private HCatSchema outputSchema;
+    private StructObjectInspector hCatRecordOI;
+    private StructObjectInspector lazyHBaseRowOI;
+    private final Long outputVersion;
+
+    /**
+     * @param schema table schema
+     * @param outputSchema schema of projected output
+     * @param hcatProperties table properties
+     * @throws IOException thrown if hive's HBaseSerDe couldn't be initialized
+     */
+    public HBaseSerDeResultConverter(HCatSchema schema,
+                                     HCatSchema outputSchema,
+                                     Properties hcatProperties) throws IOException {
+
+        hcatProperties.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+                hcatProperties.getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY));
+
+        if(hcatProperties.containsKey(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY))
+            outputVersion = Long.parseLong(hcatProperties.getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY));
+        else
+            outputVersion = null;
+
+        this.schema = schema;
+        if(outputSchema == null) {
+            this.outputSchema = schema;
+        }
+        else {
+            this.outputSchema = outputSchema;
+        }
+
+        hCatRecordOI = createStructObjectInspector();
+        try {
+            serDe = new HBaseSerDe();
+            serDe.initialize(new Configuration(),hcatProperties);
+            lazyHBaseRowOI = (StructObjectInspector) serDe.getObjectInspector();
+        } catch (SerDeException e) {
+            throw new IOException("SerDe initialization failed",e);
+        }
+    }
+
+    @Override
+    public Put convert(HCatRecord record) throws IOException {
+        try {
+            //small hack to explicitly specify the timestamp/version number to use,
+            //since HBaseSerDe does not support specifying one; we still have to
+            //decide whether to write our own SerDe or contribute the code back to Hive
+            Put put = (Put)serDe.serialize(record.getAll(),hCatRecordOI);
+            Put res;
+            if(outputVersion == null) {
+                res = put;
+            }
+            else {
+                res = new Put(put.getRow(),outputVersion.longValue());
+                for(List<KeyValue> row: put.getFamilyMap().values()) {
+                    for(KeyValue el: row) {
+                        res.add(el.getFamily(),el.getQualifier(),el.getValue());
+                    }
+                }
+            }
+            return res;
+        } catch (SerDeException e) {
+            throw new IOException("serialization failed",e);
+        }
+    }
+
+    @Override
+    public HCatRecord convert(Result result) throws IOException {
+        // Deserialize the Result into a lazy struct and then convert that struct
+        // into an HCatRecord.
+        LazyHBaseRow struct;
+        try {
+            struct = (LazyHBaseRow)serDe.deserialize(result);
+        } catch (SerDeException e) {
+            throw new IOException(e);
+        }
+
+        List<Object> outList = new ArrayList<Object>(outputSchema.size());
+
+        String colName;
+        Integer index;
+
+        for(HCatFieldSchema col : outputSchema.getFields()){
+
+            colName = col.getName().toLowerCase();
+            index = outputSchema.getPosition(colName);
+
+            if(index != null){
+                StructField field = lazyHBaseRowOI.getStructFieldRef(colName);
+                outList.add(getTypedObj(lazyHBaseRowOI.getStructFieldData(struct, field), field.getFieldObjectInspector()));
+            }
+        }
+        return new DefaultHCatRecord(outList);
+    }
+
+    private Object getTypedObj(Object data, ObjectInspector oi) throws IOException{
+        // The real work-horse method. We give up the laziness benefits of Hive's
+        // LazyHBaseRow by deserializing everything and building an HCatRecord of
+        // plain Java objects, because higher layers may not know how to handle
+        // the lazy types.
+        //TODO leverage laziness of the SerDe
+        switch(oi.getCategory()){
+
+        case PRIMITIVE:
+            return ((PrimitiveObjectInspector)oi).getPrimitiveJavaObject(data);
+
+        case MAP:
+            MapObjectInspector moi = (MapObjectInspector)oi;
+            Map<?,?> lazyMap = moi.getMap(data);
+            ObjectInspector keyOI = moi.getMapKeyObjectInspector();
+            ObjectInspector valOI = moi.getMapValueObjectInspector();
+            Map<Object,Object> typedMap = new HashMap<Object,Object>(lazyMap.size());
+            for(Map.Entry<?,?> e : lazyMap.entrySet()){
+                typedMap.put(getTypedObj(e.getKey(), keyOI), getTypedObj(e.getValue(), valOI));
+            }
+            return typedMap;
+
+        case LIST:
+            ListObjectInspector loi = (ListObjectInspector)oi;
+            List<?> lazyList = loi.getList(data);
+            ObjectInspector elemOI = loi.getListElementObjectInspector();
+            List<Object> typedList = new ArrayList<Object>(lazyList.size());
+            Iterator<?> itr = lazyList.listIterator();
+            while(itr.hasNext()){
+                typedList.add(getTypedObj(itr.next(),elemOI));
+            }
+            return typedList;
+
+        case STRUCT:
+            StructObjectInspector soi = (StructObjectInspector)oi;
+            List<? extends StructField> fields = soi.getAllStructFieldRefs();
+            List<Object> typedStruct = new ArrayList<Object>(fields.size());
+            for(StructField field : fields){
+                typedStruct.add(getTypedObj(soi.getStructFieldData(data, field), field.getFieldObjectInspector()));
+            }
+            return typedStruct;
+
+        default:
+            throw new IOException("Don't know how to deserialize: "+oi.getCategory());
+
+        }
+    }
+
+    private StructObjectInspector createStructObjectInspector() throws IOException {
+
+        if( outputSchema == null ) {
+            throw new IOException("Invalid output schema specified");
+        }
+
+        List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+        List<String> fieldNames = new ArrayList<String>();
+
+        for(HCatFieldSchema hcatFieldSchema : outputSchema.getFields()) {
+            TypeInfo type = TypeInfoUtils.getTypeInfoFromTypeString(hcatFieldSchema.getTypeString());
+
+            fieldNames.add(hcatFieldSchema.getName());
+            fieldInspectors.add(getObjectInspector(type));
+        }
+
+        StructObjectInspector structInspector = ObjectInspectorFactory.
+                getStandardStructObjectInspector(fieldNames, fieldInspectors);
+        return structInspector;
+    }
+
+    private ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
+
+        switch(type.getCategory()) {
+
+        case PRIMITIVE :
+            PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
+            return PrimitiveObjectInspectorFactory.
+                    getPrimitiveJavaObjectInspector(primitiveType.getPrimitiveCategory());
+
+        case MAP :
+            MapTypeInfo mapType = (MapTypeInfo) type;
+            MapObjectInspector mapInspector = ObjectInspectorFactory.getStandardMapObjectInspector(
+                    getObjectInspector(mapType.getMapKeyTypeInfo()), getObjectInspector(mapType.getMapValueTypeInfo()));
+            return mapInspector;
+
+        case LIST :
+            ListTypeInfo listType = (ListTypeInfo) type;
+            ListObjectInspector listInspector = ObjectInspectorFactory.getStandardListObjectInspector(
+                    getObjectInspector(listType.getListElementTypeInfo()));
+            return listInspector;
+
+        case STRUCT :
+            StructTypeInfo structType = (StructTypeInfo) type;
+            List<TypeInfo> fieldTypes = structType.getAllStructFieldTypeInfos();
+
+            List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+            for(TypeInfo fieldType : fieldTypes) {
+                fieldInspectors.add(getObjectInspector(fieldType));
+            }
+
+            StructObjectInspector structInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+                    structType.getAllStructFieldNames(), fieldInspectors);
+            return structInspector;
+
+        default :
+            throw new IOException("Unknown field schema type");
+        }
+    }
+}
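The timestamp-override "hack" in convert(HCatRecord) above is worth isolating: HBaseSerDe always builds Puts with the default timestamp, so the cells are copied into a new Put that carries the desired version. A self-contained sketch of just that trick, assuming the HBase 0.90-era client API used elsewhere in this patch:

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    import java.util.List;

    public class PutVersionOverrideSketch {
        static Put withVersion(Put original, long version) {
            // new Put created with an explicit timestamp; add(family, qualifier,
            // value) makes every copied cell inherit that timestamp
            Put rewritten = new Put(original.getRow(), version);
            for (List<KeyValue> cells : original.getFamilyMap().values()) {
                for (KeyValue kv : cells) {
                    rewritten.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
                }
            }
            return rewritten;
        }

        public static void main(String[] args) {
            Put p = new Put(Bytes.toBytes("row"));
            p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
            System.out.println(withVersion(p, 42L)
                    .get(Bytes.toBytes("f"), Bytes.toBytes("q"))
                    .get(0).getTimestamp()); // prints 42
        }
    }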
diff --git storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java
new file mode 100644
index 0000000..235e5d2
--- /dev/null
+++ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+
+/**
+ * Interface defining the conversion of HCatRecord to and from the native HBase
+ * write (Put) and read (Result) objects. How the actual mapping between an HBase
+ * table's schema and an HCatalog table's schema is defined is up to the
+ * underlying implementation.
+ */
+interface ResultConverter {
+
+    /**
+     * convert an HCatRecord instance to an HBase Put, used when writing out data.
+     * @param record instance to convert
+     * @return converted Put instance
+     * @throws IOException
+     */
+    Put convert(HCatRecord record) throws IOException;
+
+    /**
+     * convert an HBase Result to an HCatRecord instance, used when reading data.
+     * @param result instance to convert
+     * @return converted HCatRecord instance
+     * @throws IOException
+     */
+    HCatRecord convert(Result result) throws IOException;
+
+}
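To make the contract concrete, here is a minimal hypothetical implementation — illustration only, not part of the patch, and deliberately simpler than the SerDe-backed converter above. It hard-codes a single family/qualifier and assumes a two-field record (row key, one string value); it lives in org.apache.hcatalog.hbase because the interface is package-private.

    package org.apache.hcatalog.hbase;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hcatalog.data.DefaultHCatRecord;
    import org.apache.hcatalog.data.HCatRecord;

    import java.io.IOException;

    class SingleColumnResultConverter implements ResultConverter {
        private static final byte[] FAMILY = Bytes.toBytes("f");
        private static final byte[] QUALIFIER = Bytes.toBytes("q");

        @Override
        public Put convert(HCatRecord record) throws IOException {
            // field 0 becomes the row key, field 1 the single cell value
            Put put = new Put(Bytes.toBytes(record.get(0).toString()));
            put.add(FAMILY, QUALIFIER, Bytes.toBytes(record.get(1).toString()));
            return put;
        }

        @Override
        public HCatRecord convert(Result result) throws IOException {
            HCatRecord record = new DefaultHCatRecord(2);
            record.set(0, Bytes.toString(result.getRow()));
            record.set(1, Bytes.toString(result.getValue(FAMILY, QUALIFIER)));
            return record;
        }
    }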
diff --git storage-drivers/hbase/src/test/all-tests storage-drivers/hbase/src/test/all-tests
new file mode 100644
index 0000000..a945de6
--- /dev/null
+++ storage-drivers/hbase/src/test/all-tests
@@ -0,0 +1 @@
+**/Test*.java
diff --git storage-drivers/hbase/src/test/excluded-tests storage-drivers/hbase/src/test/excluded-tests
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ storage-drivers/hbase/src/test/excluded-tests
@@ -0,0 +1 @@
+
diff --git storage-drivers/hbase/src/test/log4j.xml storage-drivers/hbase/src/test/log4j.xml
new file mode 100644
index 0000000..8fd4fc9
--- /dev/null
+++ storage-drivers/hbase/src/test/log4j.xml
@@ -0,0 +1,39 @@
+<!-- log4j configuration for the test suite (XML content not recoverable) -->
\ No newline at end of file
diff --git storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
new file mode 100644
index 0000000..45cf37d
--- /dev/null
+++ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test HBaseSerDeResultConverter by manually creating records to convert to and from HBase objects
+ */
+public class TestHBaseSerDeResultConverter {
+
+    private Properties createProperties() {
+        Properties tbl = new Properties();
+        // Set the configuration parameters
+        tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+        tbl.setProperty("columns", "key,aint,astring,amap");
+        tbl.setProperty("columns.types", "string:int:string:map<string,int>");
+        tbl.setProperty(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+"."+ HBaseSerDe.HBASE_COLUMNS_MAPPING,
+                ":key,my_family:my_qualifier1,my_family:my_qualifier2,my_family2:");
+        tbl.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
+        tbl.setProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
+        return tbl;
+    }
+
+    private HCatSchema createHCatSchema() throws HCatException {
+        HCatSchema subSchema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+        subSchema.append(new HCatFieldSchema(null, HCatFieldSchema.Type.INT, ""));
+
+        HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+        schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING, ""));
+        schema.append(new HCatFieldSchema("aint", HCatFieldSchema.Type.INT, ""));
+        schema.append(new HCatFieldSchema("astring", HCatFieldSchema.Type.STRING, ""));
+        schema.append(new HCatFieldSchema("amap", HCatFieldSchema.Type.MAP, HCatFieldSchema.Type.STRING, subSchema, ""));
+        return schema;
+    }
+
+    @Test
+    public void testDeserialize() throws IOException {
+        HBaseSerDeResultConverter converter = new HBaseSerDeResultConverter(createHCatSchema(),
+                null,
+                createProperties());
+        //test integer
+        Result result = new Result(new KeyValue[]{new KeyValue(Bytes.toBytes("row"),
+                Bytes.toBytes("my_family"),
+                Bytes.toBytes("my_qualifier1"),
+                0,
+                //This is how Hive's SerDe serializes numbers
+                Bytes.toBytes("123")),
+                //test string
+                new KeyValue(Bytes.toBytes("row"),
+                        Bytes.toBytes("my_family"),
+                        Bytes.toBytes("my_qualifier2"),
+                        0,
+                        Bytes.toBytes("onetwothree")),
+                //test family map
+                new KeyValue(Bytes.toBytes("row"),
+                        Bytes.toBytes("my_family2"),
+                        Bytes.toBytes("one"),
+                        0,
+                        Bytes.toBytes("1")),
+                new KeyValue(Bytes.toBytes("row"),
+                        Bytes.toBytes("my_family2"),
+                        Bytes.toBytes("two"),
+                        0,
+                        Bytes.toBytes("2"))});
+
+        HCatRecord record = converter.convert(result);
+
+        assertEquals(Bytes.toString(result.getRow()), record.get(0).toString());
+        assertEquals(Integer.valueOf(
+                Bytes.toString(
+                        result.getValue(Bytes.toBytes("my_family"), Bytes.toBytes("my_qualifier1")))),
+                record.get(1));
+        assertEquals(Bytes.toString(
+                result.getValue(Bytes.toBytes("my_family"), Bytes.toBytes("my_qualifier2"))),
+                record.get(2).toString());
+        Map<String,Integer> recordMap = (Map<String,Integer>) record.get(3);
+        Map<byte[],byte[]> familyMap = result.getFamilyMap(Bytes.toBytes("my_family2"));
+        assertEquals(Integer.valueOf(
+                Bytes.toString(
+                        familyMap.get(Bytes.toBytes("one")))),
+                recordMap.get("one"));
+        assertEquals(Integer.valueOf(
+                Bytes.toString(
+                        familyMap.get(Bytes.toBytes("two")))),
+                recordMap.get("two"));
+    }
+
+    @Test
+    public void testSerialize() throws IOException {
+        HCatSchema schema = createHCatSchema();
+        HBaseSerDeResultConverter converter = new HBaseSerDeResultConverter(schema,
+                null,
+                createProperties());
+        HCatRecord in = new DefaultHCatRecord(4);
+        //row key
+        in.set(0, "row");
+        //test integer
+        in.set(1, 123);
+        //test string
+        in.set(2, "onetwothree");
+        //test map
+        Map<String,Integer> map = new HashMap<String,Integer>();
+        map.put("one", 1);
+        map.put("two", 2);
+        in.set(3, map);
+
+        Put put = converter.convert(in);
+
+        assertEquals(in.get(0).toString(), Bytes.toString(put.getRow()));
+        assertEquals(in.get(1),
+                Integer.valueOf(
+                        Bytes.toString(
+                                put.get(Bytes.toBytes("my_family"),
+                                        Bytes.toBytes("my_qualifier1")).get(0).getValue())));
+        assertEquals(1L,
+                put.get(Bytes.toBytes("my_family"),
+                        Bytes.toBytes("my_qualifier1")).get(0).getTimestamp());
+        assertEquals(in.get(2),
+                Bytes.toString(
+                        put.get(Bytes.toBytes("my_family"),
+                                Bytes.toBytes("my_qualifier2")).get(0).getValue()));
+        assertEquals(1L,
+                put.get(Bytes.toBytes("my_family"),
+                        Bytes.toBytes("my_qualifier2")).get(0).getTimestamp());
+        assertEquals(map.get("one"),
+                Integer.valueOf(
+                        Bytes.toString(
+                                put.get(Bytes.toBytes("my_family2"),
+                                        Bytes.toBytes("one")).get(0).getValue())));
+        assertEquals(1L,
+                put.get(Bytes.toBytes("my_family2"),
+                        Bytes.toBytes("one")).get(0).getTimestamp());
+        assertEquals(map.get("two"),
+                Integer.valueOf(Bytes.toString(
+                        put.get(Bytes.toBytes("my_family2"),
+                                Bytes.toBytes("two")).get(0).getValue())));
+        assertEquals(1L,
+                put.get(Bytes.toBytes("my_family2"),
+                        Bytes.toBytes("two")).get(0).getTimestamp());
+    }
+}
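For reference, the hbase.columns.mapping string used by these tests pairs up with the HCatalog columns positionally, following standard Hive HBaseSerDe semantics:

    HCatalog column   HBase target
    ---------------   ---------------------------------------------
    key               :key (the row key itself)
    aint              my_family:my_qualifier1
    astring           my_family:my_qualifier2
    amap              my_family2: (a whole column family; map keys
                      become qualifiers, map values become cell values)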