Index: contrib/build.xml
===================================================================
--- contrib/build.xml (revision 806941)
+++ contrib/build.xml (working copy)
@@ -24,6 +24,7 @@
+
@@ -66,6 +67,8 @@
+
+
+
+ columnFamilies = new HashSet();
+ // Check the hbase columns and get all the families
+      Map<String, String> serdeParam = tbl.getSd().getSerdeInfo().getParameters();
+ String hbaseColumnStr = serdeParam.get(HBaseSerDe.HBASE_SCHEMA_MAPPING);
+ if (hbaseColumnStr == null)
+ throw new MetaException("No schema mapping defined in Serde.");
+ String[] hbaseColumns = hbaseColumnStr.split(",");
+ for(String hbaseColumn : hbaseColumns) {
+ int idx = hbaseColumn.indexOf(":");
+ if (idx < 0)
+ throw new MetaException(hbaseColumn + " is not a qualified hbase column.");
+ columnFamilies.add(hbaseColumn.substring(0, idx + 1));
+ }
+
+      // Check if the given hbase table exists
+ HTableDescriptor tblDesc;
+
+ if (!getHBaseAdmin().tableExists(tblName)) {
+        // it is not an external table, so create one
+ if (!isExternal) {
+ // Create the all column descriptors
+ tblDesc = new HTableDescriptor(tblName);
+ for (String cf : columnFamilies) {
+ tblDesc.addFamily(new HColumnDescriptor(cf));
+ }
+
+ getHBaseAdmin().createTable(tblDesc);
+ } else { // an external table
+ throw new MetaException("HBase table " + tblName +
+ " doesn't exist while the table is declared as an external table.");
+ }
+
+ } else { // make sure the schema mapping is right
+ tblDesc = getHBaseAdmin().getTableDescriptor(tblName);
+ for (String cf : columnFamilies) {
+ if(!tblDesc.hasFamily(Bytes.toBytes(cf)))
+ throw new MetaException("Column Family " + cf + " is not defined in hbase table " + tblName);
+ }
+ }
+ // ensure the table is online
+ new HTable(hbaseConf, tblDesc.getName());
+ } catch (MasterNotRunningException mnre) {
+ throw new MetaException(StringUtils.stringifyException(mnre));
+ } catch (IOException ie) {
+ throw new MetaException(StringUtils.stringifyException(ie));
+ }
+ }
+
+  /**
+   * Rollback hook for a failed add-partition; intentionally a no-op for now.
+   */
+  @Override
+  public void rollback_addPartition(Partition part) throws MetaException {
+    // do nothing now
+  }
+
+  /**
+   * Rollback hook for a failed alter-partition; intentionally a no-op for now.
+   */
+  @Override
+  public void rollback_alterPartition(Table tbl, Partition newPart)
+      throws MetaException {
+    // do nothing now
+  }
+
+  /**
+   * Rollback hook for a failed alter-table; intentionally a no-op for now.
+   */
+  @Override
+  public void rollback_alterTable(Table oldTable, Table newTable)
+      throws MetaException {
+    // do nothing now
+  }
+
+ @Override
+ public void rollback_createTable(Table table, boolean isExternal) throws MetaException {
+ try {
+ if (!isExternal && getHBaseAdmin().tableExists(table.getTableName())) {
+ // we have create an hbase table, so we delete it to roll back;
+ if (getHBaseAdmin().isTableEnabled(table.getTableName()))
+ getHBaseAdmin().disableTable(table.getTableName());
+ getHBaseAdmin().deleteTable(table.getTableName());
+ }
+ } catch (IOException ie) {
+ throw new MetaException(StringUtils.stringifyException(ie));
+ }
+ }
+
+  /** Returns the HBase configuration held by this handler. */
+  @Override
+  public Configuration getConf() {
+    return hbaseConf;
+  }
+
+  /**
+   * Stores the given configuration, wrapped as an HBaseConfiguration.
+   * NOTE(review): getConf() returns the wrapper, not the object passed in —
+   * confirm callers do not rely on identity round-tripping.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    hbaseConf = new HBaseConfiguration(conf);
+  }
+
+}
Index: contrib/src/java/org/apache/hadoop/hive/contrib/hbase/HBaseSerDe.java
===================================================================
--- contrib/src/java/org/apache/hadoop/hive/contrib/hbase/HBaseSerDe.java (revision 0)
+++ contrib/src/java/org/apache/hadoop/hive/contrib/hbase/HBaseSerDe.java (revision 0)
@@ -0,0 +1,457 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * HBaseSerDe can be used to serialize object into hbase table and
+ * deserialize object from hbase table.
+ */
+public class HBaseSerDe implements SerDe {
+
+ public static final String HBASE_SCHEMA_MAPPING = "hbase.columns.mapping";
+
+ public static final Log LOG = LogFactory.getLog(
+ HBaseSerDe.class.getName());
+
+  /**
+   * Bundles the parsed SerDe configuration: the HBase column mapping plus the
+   * underlying LazySimpleSerDe parameters.
+   *
+   * FIX: restored the stripped generic type arguments — the field was a raw
+   * List, but callers assign String elements (from split(",")) and read them
+   * back as String, which requires List&lt;String&gt; to compile.
+   */
+  public static class HBaseSerDeParameters {
+    // HBase columns ("family:qualifier", or "family:" for whole-family maps),
+    // in declaration order.
+    List<String> hbaseColumnNames;
+    // Delegate parameters parsed by LazySimpleSerDe (column names, types,
+    // separators, null sequence, ...).
+    SerDeParameters serdeParams;
+
+    public List<String> getHBaseColumnNames() {
+      return hbaseColumnNames;
+    }
+
+    public SerDeParameters getSerDeParameters() {
+      return serdeParams;
+    }
+  }
+
+ private ObjectInspector mCachedObjectInspector;
+ HBaseSerDeParameters mHBaseSerDeParameters = null;
+ private boolean useJSONSerialize; // use json to serialize
+
+ public String toString() {
+ return getClass().toString()
+ + "["
+ + mHBaseSerDeParameters.hbaseColumnNames
+ + ":"
+ + ((StructTypeInfo) mHBaseSerDeParameters.serdeParams.getRowTypeInfo())
+ .getAllStructFieldNames()
+ + ":"
+ + ((StructTypeInfo) mHBaseSerDeParameters.serdeParams.getRowTypeInfo())
+ .getAllStructFieldTypeInfos() + "]";
+ }
+
+  /**
+   * Default constructor; all real setup happens in
+   * {@link #initialize(Configuration, Properties)}.
+   */
+  public HBaseSerDe() throws SerDeException {
+  }
+
+ /**
+ * Initialize the SerDe given parameters.
+ * @see SerDe#initialize(Configuration, Properties)
+ */
+  /**
+   * Initialize the SerDe given parameters.
+   * @see SerDe#initialize(Configuration, Properties)
+   */
+  public void initialize(Configuration conf, Properties tbl)
+      throws SerDeException {
+    mHBaseSerDeParameters = HBaseSerDe.initHBaseSerDeParameters(conf, tbl,
+        getClass().getName());
+
+    SerDeParameters lazyParams = mHBaseSerDeParameters.serdeParams;
+
+    // Only the column names/types (plus lazy-parsing options) are needed to
+    // build the row object inspector.
+    mCachedObjectInspector = LazyFactory.createLazyStructInspector(
+        lazyParams.getColumnNames(),
+        lazyParams.getColumnTypes(),
+        lazyParams.getSeparators(),
+        lazyParams.getNullSequence(),
+        lazyParams.isLastColumnTakesRest(),
+        lazyParams.isEscaped(),
+        lazyParams.getEscapeChar());
+
+    mCachedHBaseRow = new LazyHBaseRow((LazySimpleStructObjectInspector) mCachedObjectInspector);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("HBaseSerDe initialized with : columnNames = "
+          + lazyParams.getColumnNames() + " columnTypes = "
+          + lazyParams.getColumnTypes() + " hbaseColumnMapping = "
+          + mHBaseSerDeParameters.hbaseColumnNames);
+    }
+  }
+
+ public static HBaseSerDeParameters initHBaseSerDeParameters(
+ Configuration job, Properties tbl, String serdeName)
+ throws SerDeException {
+ HBaseSerDeParameters serdeParams = new HBaseSerDeParameters();
+
+ // Read Configuration Parameter
+ String hbaseColumnNameProperty = tbl.getProperty(HBaseSerDe.HBASE_SCHEMA_MAPPING);
+ String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+
+ // Initial the hbase column list
+ if (hbaseColumnNameProperty != null && hbaseColumnNameProperty.length() > 0) {
+ serdeParams.hbaseColumnNames = Arrays.asList(hbaseColumnNameProperty.split(","));
+ } else {
+ serdeParams.hbaseColumnNames = new ArrayList();
+ }
+
+ // Add the hbase key to the columnNameList and columnTypeList
+
+ // Build the type property string
+ if (columnTypeProperty == null) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(Constants.STRING_TYPE_NAME);
+
+ for (int i = 0; i < serdeParams.hbaseColumnNames.size(); i++) {
+ String colName = serdeParams.hbaseColumnNames.get(i);
+ if(colName.endsWith(":"))
+ sb.append(":").append(Constants.MAP_TYPE_NAME + "<" +
+ Constants.STRING_TYPE_NAME + "," + Constants.STRING_TYPE_NAME + ">");
+ else
+ sb.append(":").append(Constants.STRING_TYPE_NAME);
+ }
+ tbl.setProperty(Constants.LIST_COLUMN_TYPES, sb.toString());
+ }
+
+ serdeParams.serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName);
+
+ if (serdeParams.hbaseColumnNames.size() + 1 != serdeParams.serdeParams.getColumnNames().size()) {
+ throw new SerDeException(serdeName + ": columns has " +
+ serdeParams.serdeParams.getColumnNames().size() +
+ " elements while hbase.columns.mapping has " +
+ serdeParams.hbaseColumnNames.size() + " elements!");
+ }
+
+ // check the mapping schema is right?
+ // we just can make sure that "columnfamily:" is mapped to MAP
+ for (int i = 0; i < serdeParams.hbaseColumnNames.size(); i++) {
+ String hbaseColName = serdeParams.hbaseColumnNames.get(i);
+ if(hbaseColName.endsWith(":")) {
+ TypeInfo typeInfo = serdeParams.serdeParams.getColumnTypes().get(i+1);
+ if(typeInfo.getCategory() == Category.MAP &&
+ ((MapTypeInfo)typeInfo).getMapKeyTypeInfo().getTypeName() != Constants.STRING_TYPE_NAME) {
+ throw new SerDeException(serdeName + ": hbase column family '" +
+ hbaseColName + "' should be mapped to Map while being mapped to " +
+ ((MapTypeInfo)typeInfo).getMapKeyTypeInfo().getTypeName());
+ }
+ }
+ }
+
+ return serdeParams;
+ }
+
+ // The object for storing hbase row data.
+ LazyHBaseRow mCachedHBaseRow;
+
+  /**
+   * Deserializes one HBase row into the cached lazy row object.
+   *
+   * @param rowResult expected to be an HBase RowResult writable
+   * @return the (reused) LazyHBaseRow initialized from rowResult
+   * @throws SerDeException if the writable is not a RowResult
+   * @see SerDe#deserialize(Writable)
+   */
+  public Object deserialize(Writable rowResult) throws SerDeException {
+    if (!(rowResult instanceof RowResult)) {
+      throw new SerDeException(getClass().getName() + ": expects RowResult!");
+    }
+    mCachedHBaseRow.init((RowResult) rowResult, mHBaseSerDeParameters.hbaseColumnNames);
+    return mCachedHBaseRow;
+  }
+
+  /** Returns the cached lazy-struct inspector built in initialize(). */
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return mCachedObjectInspector;
+  }
+
+ BatchUpdate serializeCache = null;
+ ByteStream.Output serializeStream = new ByteStream.Output();
+
+ @Override
+ public Class extends Writable> getSerializedClass() {
+ return BatchUpdate.class;
+ }
+
+ @Override
+ public Writable serialize(Object obj, ObjectInspector objInspector)
+ throws SerDeException {
+ if (objInspector.getCategory() != Category.STRUCT) {
+ throw new SerDeException(getClass().toString()
+ + " can only serialize struct types, but we got: "
+ + objInspector.getTypeName());
+ }
+
+ // Prepare the field ObjectInspectors
+ StructObjectInspector soi = (StructObjectInspector)objInspector;
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ List