diff --git hbase-handler/pom.xml hbase-handler/pom.xml index 7c3524c..9aca819 100644 --- hbase-handler/pom.xml +++ hbase-handler/pom.xml @@ -222,6 +222,19 @@ ${basedir}/src/java ${basedir}/src/test + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java index 5008f15..b033b5b 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java @@ -18,18 +18,31 @@ package org.apache.hadoop.hive.hbase; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.LazyFactory; import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; import org.apache.hadoop.hive.serde2.lazy.LazyStruct; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.mapred.JobConf; + +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; /** * HBaseCompositeKey extension of LazyStruct. All complex composite keys should extend this class @@ -52,9 +65,11 @@ * *

* - * */ + */ public class HBaseCompositeKey extends LazyStruct { + protected static final Log LOG = LogFactory.getLog(HBaseCompositeKey.class); + public HBaseCompositeKey(LazySimpleStructObjectInspector oi) { super(oi); } @@ -96,4 +111,101 @@ public HBaseCompositeKey(LazySimpleStructObjectInspector oi) { return lazyObject; } + + protected DecomposedPredicate decomposePredicate(JobConf jobConf, + Deserializer deserializer, ExprNodeDesc predicate) throws Exception { + return null; + } + + /** + * Returns the optional {@link Filter row filter} to be applied to the + * {@link org.apache.hadoop.hbase.client.Scan scans} to + * filter out unwanted row keys + * + *

+ * Note 1: If the keys are salted, it is highly recommended to provide a custom filter,
+ * because a scan over salted keys requires multiple scans and the current HBase
+ * integration does not support that.
+ *
+ * Note 2: Hive currently supports predicate pushdown only for the first field of the
+ * struct, so the value of fieldID will always be 0. Support for additional fields will
+ * be added in the future.

+ * + * @param fieldName + * id for the value + * @param values + * values that are parts of the row key + * @return {@link Filter filter} that can be applied to the {@link Scan scans}. By default, + * returns {@code null}. Can be overridden to provide a custom {@link Filter filter} + * */ + protected Filter getFilter(String fieldName, Object... values) { + return null; + } + + /** + * HBaseKeyFactory implementation providing HBaseCompositeKey as a row key + */ + public static class HBaseCompositeKeyFactory + implements HBaseKeyFactory, HiveStoragePredicateHandler, Configurable { + + private final Class keyClass; + private final Constructor constructor; + + private Configuration conf; + private Properties properties; + private SerDeParameters parameters; + + public HBaseCompositeKeyFactory(Class keyClass) throws Exception { + // see javadoc of HBaseCompositeKey + this.keyClass = keyClass; + this.constructor = keyClass.getDeclaredConstructor( + LazySimpleStructObjectInspector.class, Properties.class, Configuration.class); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void init(SerDeParameters parameters, Properties properties) { + this.parameters = parameters; + this.properties = properties; + } + + @Override + public T createObject(ObjectInspector inspector) throws SerDeException { + try { + return (T) constructor.newInstance(inspector, properties, conf); + } catch (Exception e) { + throw new SerDeException(e); + } + } + + @Override + public ObjectInspector createObjectInspector(TypeInfo type) + throws SerDeException { + return LazyFactory.createLazyObjectInspector(type, + parameters.getSeparators(), 1, parameters.getNullSequence(), + parameters.isEscaped(), parameters.getEscapeChar()); + } + + @Override + public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, + ExprNodeDesc predicate) { + try { + return createObject(null).decomposePredicate(jobConf, deserializer, predicate); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + } + } } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java new file mode 100644 index 0000000..80834eb --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.Properties;
+
+/**
+ * Provides custom implementations of the object and object inspector for the HBase key.
+ * The key object should implement LazyObjectBase.
+ */
+public interface HBaseKeyFactory {
+
+  /**
+   * Initializes the factory with serde parameters and table properties.
+   */
+  void init(SerDeParameters parameters, Properties properties) throws SerDeException;
+
+  /**
+   * Creates a custom object inspector for the HBase key.
+   * @param type type information
+   */
+  ObjectInspector createObjectInspector(TypeInfo type) throws SerDeException;
+
+  /**
+   * Creates a custom object for the HBase key.
+   * @param inspector OI created by {@link HBaseKeyFactory#createObjectInspector}
+   */
+  LazyObjectBase createObject(ObjectInspector inspector) throws SerDeException;
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java
new file mode 100644
index 0000000..d466ff6
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.Text;
+
+import java.util.ArrayList;
+import java.util.List;
+
+// Does the same thing as LazyFactory#createLazyObjectInspector, except that it replaces the
+// original key OI with the OI created by the HBaseKeyFactory configured in the HBase serde properties.
+public class HBaseLazyObjectFactory {
+
+  public static ObjectInspector createLazyHBaseStructInspector(
+      List<String> columnNames, List<TypeInfo> typeInfos, byte[] separators,
+      Text nullSequence, boolean lastColumnTakesRest, boolean escaped,
+      byte escapeChar, int index, HBaseKeyFactory factory) throws SerDeException {
+    ArrayList<ObjectInspector> columnObjectInspectors = new ArrayList<ObjectInspector>(
+        typeInfos.size());
+    for (int i = 0; i < typeInfos.size(); i++) {
+      if (i == index && factory != null) {
+        columnObjectInspectors.add(factory.createObjectInspector(typeInfos.get(i)));
+      } else {
+        columnObjectInspectors.add(LazyFactory.createLazyObjectInspector(
+            typeInfos.get(i), separators, 1, nullSequence, escaped, escapeChar));
+      }
+    }
+    return LazyObjectInspectorFactory.getLazySimpleStructObjectInspector(
+        columnNames, columnObjectInspectors, separators[0], nullSequence,
+        lastColumnTakesRest, escaped, escapeChar);
+  }
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java
new file mode 100644
index 0000000..f5d1c27
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.filter.ColumnRangeFilter; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.TimestampsFilter; +import org.apache.hadoop.hbase.filter.ValueFilter; +import org.apache.hadoop.io.BytesWritable; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class HBaseScanRange implements Serializable { + + private byte[] startRow; + private byte[] stopRow; + + private List filterDescs = new ArrayList(); + + public byte[] getStartRow() { + return startRow; + } + + public void setStartRow(byte[] startRow) { + this.startRow = startRow; + } + + public byte[] getStopRow() { + return stopRow; + } + + public void setStopRow(byte[] stopRow) { + this.stopRow = stopRow; + } + + public void addFilter(Filter filter) throws IOException { + filterDescs.add(new FilterDesc(filter.getClass().getName(), filter.toByteArray())); + } + + public void setup(Scan scan) throws DeserializationException { + if (startRow != null) { + scan.setStartRow(startRow); + } + if (stopRow != null) { + scan.setStopRow(stopRow); + } + if (filterDescs.isEmpty()) { + return; + } + if (filterDescs.size() == 1) { + scan.setFilter(filterDescs.get(0).toFilter()); + return; + } + List filters = new ArrayList(); + for (FilterDesc filter : filterDescs) { + filters.add(filter.toFilter()); + } + scan.setFilter(new FilterList(filters)); + } + + public String toString() { + return (startRow == null ? "" : new BytesWritable(startRow).toString()) + " ~ " + + (stopRow == null ? 
"" : new BytesWritable(stopRow).toString()); + } + + private static class FilterDesc implements Serializable { + + private String className; + private byte[] binary; + + public FilterDesc(String className, byte[] binary) { + this.className = className; + this.binary = binary; + } + + public Filter toFilter() throws DeserializationException { + if (className.endsWith("ValueFilter")) { + return ValueFilter.parseFrom(binary); + } else if (className.endsWith("TimestampsFilter")) { + return TimestampsFilter.parseFrom(binary); + } else if (className.endsWith("ColumnRangeFilter")) { + return ColumnRangeFilter.parseFrom(binary); + } else if (className.endsWith("FamilyFilter")) { + return FamilyFilter.parseFrom(binary); + } + throw new IllegalArgumentException("Invalid type " + className); + } + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java index 2cd65cb..bfad694 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java @@ -19,20 +19,19 @@ package org.apache.hadoop.hive.hbase; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.lazy.LazyFactory; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; import org.apache.hadoop.hive.serde2.lazy.LazyUtils; @@ -54,6 +53,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; /** * HBaseSerDe can be used to serialize object into an HBase table and @@ -67,6 +67,7 @@ public static final String HBASE_KEY_COL = ":key"; public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp"; public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class"; + public static final String HBASE_COMPOSITE_KEY_FACTORY = "hbase.composite.key.factory"; public static final String HBASE_SCAN_CACHE = "hbase.scan.cache"; public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock"; public static final String HBASE_SCAN_BATCH = "hbase.scan.batch"; @@ -87,8 +88,7 @@ private final ByteStream.Output serializeStream = new ByteStream.Output(); private int iKey; private long putTimestamp; - private Class compositeKeyClass; - private Object compositeKeyObj; + private HBaseKeyFactory keyFactory; // used for serializing a field private byte [] separators; // the separators array @@ -123,22 +123,19 @@ public void initialize(Configuration conf, Properties tbl) initHBaseSerDeParameters(conf, tbl, getClass().getName()); - cachedObjectInspector = LazyFactory.createLazyStructInspector( + cachedObjectInspector = HBaseLazyObjectFactory.createLazyHBaseStructInspector( serdeParams.getColumnNames(), serdeParams.getColumnTypes(), serdeParams.getSeparators(), serdeParams.getNullSequence(), 
serdeParams.isLastColumnTakesRest(), serdeParams.isEscaped(), - serdeParams.getEscapeChar()); + serdeParams.getEscapeChar(), + iKey, keyFactory + ); cachedHBaseRow = new LazyHBaseRow( - (LazySimpleStructObjectInspector) cachedObjectInspector); - - if (compositeKeyClass != null) { - // initialize the constructor of the composite key class with its object inspector - initCompositeKeyClass(conf,tbl); - } + (LazySimpleStructObjectInspector) cachedObjectInspector, iKey, keyFactory); if (LOG.isDebugEnabled()) { LOG.debug("HBaseSerDe initialized with : columnNames = " @@ -440,8 +437,7 @@ public static boolean isRowKeyColumn(String hbaseColumnName) { return hbaseColumnName.equals(HBASE_KEY_COL); } - - static class ColumnMapping { + public static class ColumnMapping { ColumnMapping() { binaryStorage = new ArrayList(2); @@ -469,16 +465,6 @@ private void initHBaseSerDeParameters( doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBASE_COLUMNS_REGEX_MATCHING, "true")); - String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS); - - if (compKeyClass != null) { - try { - compositeKeyClass = job.getClassByName(compKeyClass); - } catch (ClassNotFoundException e) { - throw new SerDeException(e); - } - } - // Parse and initialize the HBase columns mapping columnsMapping = parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); @@ -547,6 +533,29 @@ private void initHBaseSerDeParameters( String hbaseTableStorageType = tbl.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE); parseColumnStorageTypes(hbaseTableStorageType); setKeyColumnOffset(); + + try { + keyFactory = createKeyFactory(job, tbl); + if (keyFactory != null) { + keyFactory.init(serdeParams, tbl); + } + } catch (Exception e) { + throw new SerDeException(e); + } + } + + private static HBaseKeyFactory createKeyFactory(Configuration job, Properties tbl) throws Exception { + String factoryClassName = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_FACTORY); + if (factoryClassName != null) { + Class factoryClazz = Class.forName(factoryClassName); + return (HBaseKeyFactory) ReflectionUtils.newInstance(factoryClazz, job); + } + String keyClassName = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS); + if (keyClassName != null) { + Class keyClass = Class.forName(keyClassName); + return new HBaseCompositeKey.HBaseCompositeKeyFactory(keyClass); + } + return null; } /** @@ -562,7 +571,7 @@ public Object deserialize(Writable result) throws SerDeException { throw new SerDeException(getClass().getName() + ": expects ResultWritable!"); } - cachedHBaseRow.init(((ResultWritable) result).getResult(), columnsMapping, compositeKeyObj); + cachedHBaseRow.init(((ResultWritable) result).getResult(), columnsMapping); return cachedHBaseRow; } @@ -692,8 +701,11 @@ public Writable serialize(Object obj, ObjectInspector objInspector) // the data to JSON string. Otherwise serialize the data in the // delimited way. 
serializeStream.reset(); + boolean isNotNull; - if (!foi.getCategory().equals(Category.PRIMITIVE) + if (i == iKey && usesCustomWritableKey()) { + isNotNull = ((HBaseWritableKeyFactory) keyFactory).serialize(f, foi, serializeStream); + } else if (!foi.getCategory().equals(Category.PRIMITIVE) && (declaredFields == null || declaredFields.get(i).getFieldObjectInspector().getCategory() .equals(Category.PRIMITIVE) || useJSONSerialize)) { @@ -722,6 +734,22 @@ public Writable serialize(Object obj, ObjectInspector objInspector) return null; } + public boolean usesCustomWritableKey() { + return keyFactory instanceof HBaseWritableKeyFactory; + } + + public boolean usesCustomPredicateAnalyzer() { + return keyFactory instanceof HiveStoragePredicateHandler; + } + + public HBaseKeyFactory getKeyFactory() { + return keyFactory; + } + + public SerDeParameters getSerdeParams() { + return serdeParams; + } + /* * Serialize the row into a ByteStream. * @@ -828,48 +856,6 @@ private boolean serialize( } /** - * Initialize the composite key class with the objectinspector for the key - * - * @throws SerDeException - * */ - private void initCompositeKeyClass(Configuration conf,Properties tbl) throws SerDeException { - - int i = 0; - - // find the hbase row key - for (ColumnMapping colMap : columnsMapping) { - if (colMap.hbaseRowKey) { - break; - } - i++; - } - - ObjectInspector keyObjectInspector = ((LazySimpleStructObjectInspector) cachedObjectInspector) - .getAllStructFieldRefs().get(i).getFieldObjectInspector(); - - try { - compositeKeyObj = compositeKeyClass.getDeclaredConstructor( - LazySimpleStructObjectInspector.class, Properties.class, Configuration.class) - .newInstance( - ((LazySimpleStructObjectInspector) keyObjectInspector), tbl, conf); - } catch (IllegalArgumentException e) { - throw new SerDeException(e); - } catch (SecurityException e) { - throw new SerDeException(e); - } catch (InstantiationException e) { - throw new SerDeException(e); - } catch (IllegalAccessException e) { - throw new SerDeException(e); - } catch (InvocationTargetException e) { - throw new SerDeException(e); - } catch (NoSuchMethodException e) { - // the constructor wasn't defined in the implementation class. 
Flag error - throw new SerDeException("Constructor not defined in composite key class [" - + compositeKeyClass.getName() + "]", e); - } - } - - /** * @return the useJSONSerialize */ public boolean isUseJSONSerialize() { @@ -918,4 +904,15 @@ public static int getRowKeyColumnOffset(List columnsMapping) throw new SerDeException("HBaseSerDe Error: columns mapping list does not contain" + " row key column."); } + + public static TypeInfo getTypeForName(SerDeParameters parameters, String columnName) { + List columnNames = parameters.getColumnNames(); + for (int i = 0; i < columnNames.size(); i++) { + if (columnName.equals(columnNames.get(i))) { + return parameters.getColumnTypes().get(i); + } + } + throw new IllegalArgumentException("Invalid column name " + columnName); + } + } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java index 29e5da5..3028348 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java @@ -403,12 +403,17 @@ public DecomposedPredicate decomposePredicate( Deserializer deserializer, ExprNodeDesc predicate) { + HBaseSerDe hbaseSerde = (HBaseSerDe) deserializer; + if (hbaseSerde.usesCustomPredicateAnalyzer()) { + HiveStoragePredicateHandler handler = (HiveStoragePredicateHandler) hbaseSerde.getKeyFactory(); + return handler.decomposePredicate(jobConf, deserializer, predicate); + } + String columnNameProperty = jobConf.get( org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS); List columnNames = Arrays.asList(columnNameProperty.split(",")); - HBaseSerDe hbaseSerde = (HBaseSerDe) deserializer; int keyColPos = hbaseSerde.getKeyColumnOffset(); String keyColType = jobConf.get(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES). split(",")[keyColPos]; diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseWritableKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseWritableKeyFactory.java new file mode 100644 index 0000000..95b0d71 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseWritableKeyFactory.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +import java.io.IOException; + +public interface HBaseWritableKeyFactory extends HBaseKeyFactory { + + /** + * serialize hive object in internal format of custom key + * + * @param object + * @param inspector + * @param output + * + * @return true if it's not null + * @throws IOException + */ + boolean serialize(Object object, ObjectInspector inspector, ByteStream.Output output) throws IOException; +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java index 704fcb9..49dea7c 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapred.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase; @@ -254,6 +255,18 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) throws IOException { Scan scan = new Scan(); + String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR); + if (filterObjectSerialized != null) { + HBaseScanRange range = Utilities.deserializeObject(filterObjectSerialized, + HBaseScanRange.class); + try { + range.setup(scan); + } catch (DeserializationException e) { + throw new IOException(e); + } + return scan; + } + String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR); if (filterExprSerialized == null) { return scan; @@ -325,6 +338,10 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) } scan.setStartRow(startRow); scan.setStopRow(stopRow); + + if (LOG.isDebugEnabled()) { + LOG.debug(Bytes.toStringBinary(startRow) + " ~ " + Bytes.toStringBinary(stopRow)); + } return scan; } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java index fc40195..27b063e 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java @@ -24,15 +24,14 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping; +import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.LazyFactory; -import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; import org.apache.hadoop.hive.serde2.lazy.LazyStruct; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; /** * LazyObject for storing an HBase row. 
The field of an HBase row can be @@ -45,72 +44,47 @@ */ private Result result; private List columnsMapping; - private Object compositeKeyObj; private ArrayList cachedList; - /** - * Construct a LazyHBaseRow object with the ObjectInspector. - */ + private final int iKey; + private final HBaseKeyFactory keyFactory; + public LazyHBaseRow(LazySimpleStructObjectInspector oi) { - super(oi); + this(oi, -1, null); } /** - * Set the HBase row data(a Result writable) for this LazyStruct. - * @see LazyHBaseRow#init(Result) + * Construct a LazyHBaseRow object with the ObjectInspector. */ - public void init(Result r, List columnsMapping) { - init(r, columnsMapping, null); + public LazyHBaseRow(LazySimpleStructObjectInspector oi, + int iKey, HBaseKeyFactory keyFactory) { + super(oi); + this.iKey = iKey; + this.keyFactory = keyFactory; } /** * Set the HBase row data(a Result writable) for this LazyStruct. - * * @see LazyHBaseRow#init(Result) - * - * @param compositeKeyClass - * custom implementation to interpret the composite key */ - public void init(Result r, List columnsMapping, Object compositeKeyObj) { - - result = r; + public void init(Result r, List columnsMapping) { + this.result = r; this.columnsMapping = columnsMapping; - this.compositeKeyObj = compositeKeyObj; setParsed(false); } - /** - * Parse the Result and fill each field. - * @see LazyStruct#parse() - */ - private void parse() { - - if (getFields() == null) { - List fieldRefs = - ((StructObjectInspector)getInspector()).getAllStructFieldRefs(); - LazyObject [] fields = new LazyObject[fieldRefs.size()]; - - for (int i = 0; i < fields.length; i++) { - ColumnMapping colMap = columnsMapping.get(i); - - if (colMap.qualifierName == null && !colMap.hbaseRowKey) { - // a column family - fields[i] = new LazyHBaseCellMap( - (LazyMapObjectInspector) fieldRefs.get(i).getFieldObjectInspector()); - continue; - } - - fields[i] = LazyFactory.createLazyObject( - fieldRefs.get(i).getFieldObjectInspector(), - colMap.binaryStorage.get(0)); - } - - setFields(fields); - setFieldInited(new boolean[fields.length]); + @Override + protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef) throws SerDeException { + if (keyFactory != null && fieldID == iKey) { + return keyFactory.createObject(fieldRef.getFieldObjectInspector()); } - - Arrays.fill(getFieldInited(), false); - setParsed(true); + ColumnMapping colMap = columnsMapping.get(fieldID); + if (colMap.qualifierName == null && !colMap.hbaseRowKey) { + // a column family + return new LazyHBaseCellMap((LazyMapObjectInspector) fieldRef.getFieldObjectInspector()); + } + return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector(), + colMap.binaryStorage.get(0)); } /** @@ -127,16 +101,17 @@ private void parse() { */ @Override public Object getField(int fieldID) { - if (!getParsed()) { - parse(); - } - - Object value = uncheckedGetField(fieldID); + initFields(); + return uncheckedGetField(fieldID); + } - if (columnsMapping.get(fieldID).hbaseRowKey && compositeKeyObj != null) { - return compositeKeyObj; - } else { - return value; + private void initFields() { + if (getFields() == null) { + initLazyFields(oi.getAllStructFieldRefs()); + } + if (!getParsed()) { + Arrays.fill(getFieldInited(), false); + setParsed(true); } } @@ -149,7 +124,7 @@ public Object getField(int fieldID) { */ private Object uncheckedGetField(int fieldID) { - LazyObject [] fields = getFields(); + LazyObjectBase[] fields = getFields(); boolean [] fieldsInited = getFieldInited(); if (!fieldsInited[fieldID]) { @@ -182,12 
+157,6 @@ private Object uncheckedGetField(int fieldID) { if (ref != null) { fields[fieldID].init(ref, 0, ref.getData().length); - - // if it was a row key and we have been provided a custom composite key class, initialize it - // with the bytes for the row key - if (colMap.hbaseRowKey && compositeKeyObj != null) { - ((LazyStruct) compositeKeyObj).init(ref, 0, ref.getData().length); - } } } @@ -203,9 +172,7 @@ private Object uncheckedGetField(int fieldID) { */ @Override public ArrayList getFieldsAsList() { - if (!getParsed()) { - parse(); - } + initFields(); if (cachedList == null) { cachedList = new ArrayList(); } else { diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java index 13c344b..a44f3d3 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java @@ -18,12 +18,30 @@ package org.apache.hadoop.hive.hbase; -import java.util.Properties; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; +import org.apache.hadoop.hive.ql.index.IndexSearchCondition; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.mapred.JobConf; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; public class HBaseTestCompositeKey extends HBaseCompositeKey { @@ -54,4 +72,76 @@ public Object getField(int fieldID) { return toLazyObject(fieldID, fieldBytes); } + + @Override + public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, + ExprNodeDesc predicate) throws Exception { + HBaseSerDe baseSerDe = (HBaseSerDe) deserializer; + LazySimpleSerDe.SerDeParameters params = baseSerDe.getSerdeParams(); + String keyColName = params.getColumnNames().get(baseSerDe.getKeyColumnOffset()); + + IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(true); + analyzer.allowColumnName(keyColName); + analyzer.setAcceptsFields(true); + analyzer.setFieldValidator(new Validator()); + + DecomposedPredicate decomposed = new DecomposedPredicate(); + + List conditions = new ArrayList(); + decomposed.residualPredicate = + (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, conditions); + if (!conditions.isEmpty()) { + decomposed.pushedPredicate = analyzer.translateSearchConditions(conditions); + decomposed.pushedPredicateObject = setupFilter(keyColName, conditions); + } + return decomposed; + } + + private Serializable setupFilter(String keyColName, List conditions) + throws 
Exception { + HBaseScanRange scanRange = new HBaseScanRange(); + for (IndexSearchCondition condition : conditions) { + if (condition.getFields() == null) { + continue; + } + Object value = condition.getConstantDesc().getValue(); + Filter filter = getFilter(condition.getFields()[0], value); + if (filter != null) { + scanRange.addFilter(filter); + } + } + return scanRange; + } + + @Override + protected Filter getFilter(String fieldName, Object... values) { + return new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(fieldName.getBytes())); + } + + private static class Validator implements IndexPredicateAnalyzer.FieldValidator { + + public boolean validate(ExprNodeFieldDesc fieldDesc) { + String fieldName = fieldDesc.getFieldName(); + + ExprNodeDesc nodeDesc = fieldDesc.getDesc(); + + TypeInfo typeInfo = nodeDesc.getTypeInfo(); + + if (!(typeInfo instanceof StructTypeInfo)) { + // since we are working off a ExprNodeFieldDesc which represents a field within a struct, this + // should never happen + throw new AssertionError("Expected StructTypeInfo. Found:" + typeInfo.getTypeName()); + } + + List allFieldNames = ((StructTypeInfo) typeInfo).getAllStructFieldNames(); + + if (allFieldNames == null || allFieldNames.size() == 0) { + return false; + } + + String firstElement = allFieldNames.get(0); + + return firstElement.equals(fieldName); + } + } } diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory.java new file mode 100644 index 0000000..5e9c421 --- /dev/null +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.StructObjectBaseInspector; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +public class TestHBaseKeyFactory implements HBaseWritableKeyFactory { + + private static final String DELIMITER_PATTERN = "\\$\\$"; + private static final byte[] DELIMITER_BINARY = "$$".getBytes(); + + @Override + public void init(LazySimpleSerDe.SerDeParameters parameters, Properties properties) throws SerDeException { + } + + @Override + public ObjectInspector createObjectInspector(TypeInfo type) { + return new SlashSeparatedOI((StructTypeInfo)type); + } + + @Override + public LazyObjectBase createObject(ObjectInspector inspector) throws SerDeException { + return new DoubleDollarSeparated(); + } + + + @Override + public boolean serialize(Object object, ObjectInspector inspector, ByteStream.Output output) throws IOException { + if (inspector.getCategory() != ObjectInspector.Category.STRUCT) { + throw new IllegalStateException("invalid type value " + inspector.getTypeName()); + } + int current = output.getCount(); + for (Object element : ((StructObjectInspector)inspector).getStructFieldsDataAsList(object)) { + if (output.getCount() > current) { + output.write(DELIMITER_BINARY); + } + output.write(String.valueOf(element).getBytes()); + } + return output.getCount() > current; + } + + @Override + public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { + return null; + } + + private static class DoubleDollarSeparated implements LazyObjectBase { + + private Object[] fields; + + @Override + public void init(ByteArrayRef bytes, int start, int length) { + fields = new String(bytes.getData(), start, length).split(DELIMITER_PATTERN); + } + + @Override + public Object getObject() { + return this; + } + } + + private static class SlashSeparatedOI extends StructObjectBaseInspector { + + private int length; + + private SlashSeparatedOI(StructTypeInfo type) { + List names = type.getAllStructFieldNames(); + List ois = new ArrayList(); + for (int i = 0; i < names.size(); i++) { + ois.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); + } + init(names, ois, null); + } + + @Override + public Object getStructFieldData(Object data, StructField fieldRef) { + return ((DoubleDollarSeparated)data).fields[((MyField)fieldRef).getFieldID()]; + } + + @Override + public List getStructFieldsDataAsList(Object data) { + return Arrays.asList(((DoubleDollarSeparated)data).fields); + } + } +} diff --git 
hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java new file mode 100644 index 0000000..00651ce --- /dev/null +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; +import org.apache.hadoop.hive.ql.index.IndexSearchCondition; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.StructObjectBaseInspector; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +public class TestHBaseKeyFactory2 implements HBaseWritableKeyFactory, HiveStoragePredicateHandler { + + private static final int FIXED_LENGTH = 10; + + private SerDeParameters parameters; + + @Override + public void init(SerDeParameters parameters, Properties properties) throws SerDeException { + this.parameters = parameters; + } + + @Override + public ObjectInspector createObjectInspector(TypeInfo type) { + return new StringArrayOI((StructTypeInfo)type); + } + + @Override + public LazyObjectBase createObject(ObjectInspector inspector) throws SerDeException { + return new FixedLengthed(FIXED_LENGTH); + } + + @Override + public boolean serialize(Object object, ObjectInspector inspector, ByteStream.Output output) throws IOException { + if (inspector.getCategory() != ObjectInspector.Category.STRUCT) { + throw new 
IllegalStateException("invalid type value " + inspector.getTypeName()); + } + int current = output.getCount(); + for (Object element : ((StructObjectInspector)inspector).getStructFieldsDataAsList(object)) { + output.write(toBinary(String.valueOf(element).getBytes(), FIXED_LENGTH, false, false)); + } + return output.getCount() > current; + } + + private byte[] toBinary(String value, int max, boolean end, boolean nextBA) { + return toBinary(value.getBytes(), max, end, nextBA); + } + + private byte[] toBinary(byte[] value, int max, boolean end, boolean nextBA) { + byte[] bytes = new byte[max + 1]; + System.arraycopy(value, 0, bytes, 0, Math.min(value.length, max)); + if (end) { + Arrays.fill(bytes, value.length, max, (byte)0xff); + } + if (nextBA) { + bytes[max] = 0x01; + } + return bytes; + } + + private byte[] getNextBA(Text value) { + return Arrays.copyOfRange(value.getBytes(), 0, value.getLength() + 1); + } + + @Override + public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, + ExprNodeDesc predicate) { + HBaseSerDe baseSerDe = (HBaseSerDe) deserializer; + LazySimpleSerDe.SerDeParameters params = baseSerDe.getSerdeParams(); + String keyColName = params.getColumnNames().get(baseSerDe.getKeyColumnOffset()); + + IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(false); + analyzer.allowColumnName(keyColName); + analyzer.setAcceptsFields(true); + + DecomposedPredicate decomposed = new DecomposedPredicate(); + + List searchConditions = new ArrayList(); + decomposed.residualPredicate = + (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, searchConditions); + if (!searchConditions.isEmpty()) { + decomposed.pushedPredicate = analyzer.translateSearchConditions(searchConditions); + try { + decomposed.pushedPredicateObject = setupFilter(keyColName, searchConditions); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return decomposed; + } + + private HBaseScanRange setupFilter(String keyColName, List conditions) + throws IOException { + Map> fieldConds = + new HashMap>(); + for (IndexSearchCondition condition : conditions) { + assert keyColName.equals(condition.getColumnDesc().getColumn()); + String fieldName = condition.getFields()[0]; + List fieldCond = fieldConds.get(fieldName); + if (fieldCond == null) { + fieldConds.put(fieldName, fieldCond = new ArrayList()); + } + fieldCond.add(condition); + } + HBaseScanRange range = new HBaseScanRange(); + + ByteArrayOutputStream startRow = new ByteArrayOutputStream(); + ByteArrayOutputStream stopRow = new ByteArrayOutputStream(); + + StructTypeInfo type = (StructTypeInfo) HBaseSerDe.getTypeForName(parameters, keyColName); + for (String name : type.getAllStructFieldNames()) { + List fieldCond = fieldConds.get(name); + if (fieldCond == null || fieldCond.size() > 2) { + continue; + } + byte[] startElement = null; + byte[] stopElement = null; + for (IndexSearchCondition condition : fieldCond) { + if (condition.getConstantDesc().getValue() == null) { + continue; + } + String comparisonOp = condition.getComparisonOp(); + String constantVal = String.valueOf(condition.getConstantDesc().getValue()); + + if (comparisonOp.endsWith("UDFOPEqual")) { + startElement = toBinary(constantVal, FIXED_LENGTH, false, false); + stopElement = toBinary(constantVal, FIXED_LENGTH, true, true); + } else if (comparisonOp.endsWith("UDFOPEqualOrGreaterThan")) { + startElement = toBinary(constantVal, FIXED_LENGTH, false, false); + } else if (comparisonOp.endsWith("UDFOPGreaterThan")) { + startElement 
= toBinary(constantVal, FIXED_LENGTH, false, true); + } else if (comparisonOp.endsWith("UDFOPEqualOrLessThan")) { + stopElement = toBinary(constantVal, FIXED_LENGTH, true, false); + } else if (comparisonOp.endsWith("UDFOPLessThan")) { + stopElement = toBinary(constantVal, FIXED_LENGTH, true, true); + } else { + throw new IOException(comparisonOp + " is not a supported comparison operator"); + } + } + if (startRow != null) { + if (startElement != null) { + startRow.write(startElement); + } else { + if (startRow.size() > 0) { + range.setStartRow(startRow.toByteArray()); + } + startRow = null; + } + } + if (stopRow != null) { + if (stopElement != null) { + stopRow.write(stopElement); + } else { + if (stopRow.size() > 0) { + range.setStopRow(stopRow.toByteArray()); + } + stopRow = null; + } + } + if (startElement == null && stopElement == null) { + break; + } + } + if (startRow != null && startRow.size() > 0) { + range.setStartRow(startRow.toByteArray()); + } + if (stopRow != null && stopRow.size() > 0) { + range.setStopRow(stopRow.toByteArray()); + } + return range; + } + + private static class FixedLengthed implements LazyObjectBase { + + private final int fixedLength; + private final List fields = new ArrayList(); + + public FixedLengthed(int fixedLength) { + this.fixedLength = fixedLength; + } + + @Override + public void init(ByteArrayRef bytes, int start, int length) { + fields.clear(); + byte[] data = bytes.getData(); + int rowStart = start; + int rowStop = rowStart + fixedLength; + for (; rowStart < length; rowStart = rowStop + 1, rowStop = rowStart + fixedLength) { + fields.add(new String(data, rowStart, rowStop - rowStart).trim()); + } + } + + @Override + public Object getObject() { + return this; + } + } + + private static class StringArrayOI extends StructObjectBaseInspector { + + private int length; + + private StringArrayOI(StructTypeInfo type) { + List names = type.getAllStructFieldNames(); + List ois = new ArrayList(); + for (int i = 0; i < names.size(); i++) { + ois.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); + } + init(names, ois, null); + } + + @Override + public Object getStructFieldData(Object data, StructField fieldRef) { + return ((FixedLengthed)data).fields.get(((MyField)fieldRef).getFieldID()); + } + + @Override + public List getStructFieldsDataAsList(Object data) { + return ((FixedLengthed)data).fields; + } + } +} diff --git hbase-handler/src/test/queries/positive/hbase_custom_key.q hbase-handler/src/test/queries/positive/hbase_custom_key.q new file mode 100644 index 0000000..f66081f --- /dev/null +++ hbase-handler/src/test/queries/positive/hbase_custom_key.q @@ -0,0 +1,26 @@ +CREATE TABLE hbase_ck_1(key struct, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.class"="org.apache.hadoop.hive.hbase.HBaseTestCompositeKey"); + +CREATE TABLE hbase_ck_2(key struct, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom", + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory"); + +CREATE EXTERNAL TABLE hbase_ck_3(key string, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom", + "hbase.columns.mapping" = ":key,cf:string"); + +from src tablesample (1 rows) +insert into table hbase_ck_1 select 
struct('1000','2000','3000'),'value' +insert into table hbase_ck_2 select struct('1000','2000','3000'),'value'; + +select * from hbase_ck_1; +select * from hbase_ck_2; +select * from hbase_ck_3; diff --git hbase-handler/src/test/queries/positive/hbase_custom_key2.q hbase-handler/src/test/queries/positive/hbase_custom_key2.q new file mode 100644 index 0000000..866b9ef --- /dev/null +++ hbase-handler/src/test/queries/positive/hbase_custom_key2.q @@ -0,0 +1,42 @@ +CREATE TABLE hbase_ck_4(key struct, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom2", + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory2"); + +CREATE EXTERNAL TABLE hbase_ck_5(key string, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom2", + "hbase.columns.mapping" = ":key,cf:string"); + +from src tablesample (5 rows) +insert into table hbase_ck_4 select +struct( + cast(key as string), + cast(cast(key + 1000 as int) as string), + cast(cast(key + 2000 as int) as string)), +value; + +set hive.fetch.task.conversion=more; + +-- 165,238,27,311,86 +select * from hbase_ck_4; +select * from hbase_ck_5; + +-- 238 +explain +select * from hbase_ck_4 where key.col1 = '238' AND key.col2 = '1238'; +select * from hbase_ck_4 where key.col1 = '238' AND key.col2 = '1238'; + +-- 165,238 +explain +select * from hbase_ck_4 where key.col1 >= '165' AND key.col1 < '27'; +select * from hbase_ck_4 where key.col1 >= '165' AND key.col1 < '27'; + +-- 238,311 +explain +select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238'; +select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238'; + diff --git hbase-handler/src/test/results/positive/hbase_custom_key.q.out hbase-handler/src/test/results/positive/hbase_custom_key.q.out new file mode 100644 index 0000000..653f6b4 --- /dev/null +++ hbase-handler/src/test/results/positive/hbase_custom_key.q.out @@ -0,0 +1,88 @@ +PREHOOK: query: CREATE TABLE hbase_ck_1(key struct, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.class"="org.apache.hadoop.hive.hbase.HBaseTestCompositeKey") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: CREATE TABLE hbase_ck_1(key struct, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.class"="org.apache.hadoop.hive.hbase.HBaseTestCompositeKey") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_ck_1 +PREHOOK: query: CREATE TABLE hbase_ck_2(key struct, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom", + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: CREATE TABLE hbase_ck_2(key struct, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom", + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory") +POSTHOOK: 
type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_ck_2 +PREHOOK: query: CREATE EXTERNAL TABLE hbase_ck_3(key string, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom", + "hbase.columns.mapping" = ":key,cf:string") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: CREATE EXTERNAL TABLE hbase_ck_3(key string, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom", + "hbase.columns.mapping" = ":key,cf:string") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_ck_3 +PREHOOK: query: from src tablesample (1 rows) +insert into table hbase_ck_1 select struct('1000','2000','3000'),'value' +insert into table hbase_ck_2 select struct('1000','2000','3000'),'value' +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@hbase_ck_1 +PREHOOK: Output: default@hbase_ck_2 +POSTHOOK: query: from src tablesample (1 rows) +insert into table hbase_ck_1 select struct('1000','2000','3000'),'value' +insert into table hbase_ck_2 select struct('1000','2000','3000'),'value' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@hbase_ck_1 +POSTHOOK: Output: default@hbase_ck_2 +PREHOOK: query: select * from hbase_ck_1 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_ck_1 +#### A masked pattern was here #### +POSTHOOK: query: select * from hbase_ck_1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_ck_1 +#### A masked pattern was here #### +{"col1":"1","col2":"0","col3":"0"} value +PREHOOK: query: select * from hbase_ck_2 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_ck_2 +#### A masked pattern was here #### +POSTHOOK: query: select * from hbase_ck_2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_ck_2 +#### A masked pattern was here #### +{"col1":"1000","col2":"2000","col3":"3000"} value +PREHOOK: query: select * from hbase_ck_3 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_ck_3 +#### A masked pattern was here #### +POSTHOOK: query: select * from hbase_ck_3 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_ck_3 +#### A masked pattern was here #### +1000$$2000$$3000 value diff --git hbase-handler/src/test/results/positive/hbase_custom_key2.q.out hbase-handler/src/test/results/positive/hbase_custom_key2.q.out new file mode 100644 index 0000000..136c023 Binary files /dev/null and hbase-handler/src/test/results/positive/hbase_custom_key2.q.out differ diff --git itests/util/pom.xml itests/util/pom.xml index 9885c53..695320e 100644 --- itests/util/pom.xml +++ itests/util/pom.xml @@ -46,6 +46,12 @@ org.apache.hive + hive-hbase-handler + ${project.version} + tests + + + org.apache.hive hive-metastore ${project.version} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 5995c14..5bc8f3c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -72,7 +72,6 @@ import java.util.UUID; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; -import java.util.zip.Inflater; import java.util.zip.InflaterInputStream; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -661,11 +660,7 @@ public static Path getPlanPath(Configuration conf) { * @return Bytes. 
*/ public static byte[] serializeExpressionToKryo(ExprNodeGenericFuncDesc expr) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - Output output = new Output(baos); - runtimeSerializationKryo.get().writeObject(output, expr); - output.close(); - return baos.toByteArray(); + return serializeObjectToKryo(expr); } /** @@ -674,11 +669,7 @@ public static Path getPlanPath(Configuration conf) { * @return Expression; null if deserialization succeeded, but the result type is incorrect. */ public static ExprNodeGenericFuncDesc deserializeExpressionFromKryo(byte[] bytes) { - Input inp = new Input(new ByteArrayInputStream(bytes)); - ExprNodeGenericFuncDesc func = runtimeSerializationKryo.get(). - readObject(inp,ExprNodeGenericFuncDesc.class); - inp.close(); - return func; + return deserializeObjectFromKryo(bytes, ExprNodeGenericFuncDesc.class); } public static String serializeExpression(ExprNodeGenericFuncDesc expr) { @@ -699,6 +690,37 @@ public static ExprNodeGenericFuncDesc deserializeExpression(String s) { return deserializeExpressionFromKryo(bytes); } + public static byte[] serializeObjectToKryo(Serializable object) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Output output = new Output(baos); + runtimeSerializationKryo.get().writeObject(output, object); + output.close(); + return baos.toByteArray(); + } + + public static T deserializeObjectFromKryo(byte[] bytes, Class clazz) { + Input inp = new Input(new ByteArrayInputStream(bytes)); + T func = runtimeSerializationKryo.get().readObject(inp, clazz); + inp.close(); + return func; + } + + public static String serializeObject(Serializable expr) { + try { + return new String(Base64.encodeBase64(serializeObjectToKryo(expr)), "UTF-8"); + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException("UTF-8 support required", ex); + } + } + + public static T deserializeObject(String s, Class clazz) { + try { + return deserializeObjectFromKryo(Base64.decodeBase64(s.getBytes("UTF-8")), clazz); + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException("UTF-8 support required", ex); + } + } + public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate { @Override protected Expression instantiate(Object oldInstance, Encoder out) { diff --git ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java index d39ee2e..683618f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; +import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; @@ -57,12 +58,19 @@ private final Set udfNames; private final Set allowedColumnNames; + private FieldValidator fieldValidator; + + private boolean acceptsFields; public IndexPredicateAnalyzer() { udfNames = new HashSet(); allowedColumnNames = new HashSet(); } + public void setFieldValidator(FieldValidator fieldValidator) { + this.fieldValidator = fieldValidator; + } + /** * Registers a comparison operator as one which can be satisfied * by an index search. 
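The refactored Kryo helpers above generalize the old expression-only path into serializeObject/deserializeObject, which is what later lets HiveInputFormat ship an arbitrary Serializable filter object through the job conf. Below is a minimal standalone sketch of the same round trip, not the patch itself; it assumes Kryo 2.x and Apache commons-codec are on the classpath, and the class name and payload are invented for illustration.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.commons.codec.binary.Base64;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;

// Hypothetical demo class; mirrors the serializeObject()/deserializeObject() helpers above.
public class KryoBase64RoundTrip {

  private static final Kryo KRYO = new Kryo();

  // Kryo bytes, then Base64, then a UTF-8 string suitable for a job conf value.
  static String serialize(Serializable object) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    Output output = new Output(baos);
    KRYO.writeObject(output, object);
    output.close();
    try {
      return new String(Base64.encodeBase64(baos.toByteArray()), "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new RuntimeException("UTF-8 support required", e);
    }
  }

  // Decode Base64 and let Kryo rebuild the typed object.
  static <T> T deserialize(String encoded, Class<T> clazz) {
    try {
      Input input = new Input(
          new ByteArrayInputStream(Base64.decodeBase64(encoded.getBytes("UTF-8"))));
      T result = KRYO.readObject(input, clazz);
      input.close();
      return result;
    } catch (UnsupportedEncodingException e) {
      throw new RuntimeException("UTF-8 support required", e);
    }
  }

  public static void main(String[] args) {
    ArrayList<String> filter = new ArrayList<String>();
    filter.add("col1 >= '165'");                       // stand-in for a handler-specific filter object
    String wire = serialize(filter);                    // what would go into the job conf
    ArrayList restored = deserialize(wire, ArrayList.class);
    System.out.println(wire);
    System.out.println(restored);
  }
}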
Unless this is called, analyzePredicate @@ -175,11 +183,19 @@ private ExprNodeDesc analyzeExpr( ExprNodeDesc expr1 = (ExprNodeDesc) nodeOutputs[0]; ExprNodeDesc expr2 = (ExprNodeDesc) nodeOutputs[1]; ExprNodeDesc[] extracted = ExprNodeDescUtils.extractComparePair(expr1, expr2); - if (extracted == null) { + if (extracted == null || (extracted.length > 2 && !acceptsFields)) { return expr; } - if (extracted.length > 2) { + + ExprNodeColumnDesc columnDesc; + ExprNodeConstantDesc constantDesc; + if (extracted[0] instanceof ExprNodeConstantDesc) { genericUDF = genericUDF.flip(); + columnDesc = (ExprNodeColumnDesc) extracted[1]; + constantDesc = (ExprNodeConstantDesc) extracted[0]; + } else { + columnDesc = (ExprNodeColumnDesc) extracted[0]; + constantDesc = (ExprNodeConstantDesc) extracted[1]; } String udfName = genericUDF.getUdfName(); @@ -187,22 +203,34 @@ private ExprNodeDesc analyzeExpr( return expr; } - ExprNodeColumnDesc columnDesc = (ExprNodeColumnDesc) extracted[0]; - ExprNodeConstantDesc constantDesc = (ExprNodeConstantDesc) extracted[1]; if (!allowedColumnNames.contains(columnDesc.getColumn())) { return expr; } + String[] fields = null; + if (extracted.length > 2) { + ExprNodeFieldDesc fieldDesc = (ExprNodeFieldDesc) extracted[2]; + if (!isValidField(fieldDesc)) { + return expr; + } + fields = ExprNodeDescUtils.extractFields(fieldDesc); + } + searchConditions.add( new IndexSearchCondition( columnDesc, udfName, constantDesc, - expr)); + expr, + fields)); // we converted the expression to a search condition, so // remove it from the residual predicate - return null; + return fields == null ? null : expr; + } + + private boolean isValidField(ExprNodeFieldDesc field) { + return fieldValidator == null || fieldValidator.validate(field); } /** @@ -232,4 +260,27 @@ public ExprNodeGenericFuncDesc translateSearchConditions( } return expr; } + + public void setAcceptsFields(boolean acceptsFields) { + this.acceptsFields = acceptsFields; + } + + public static interface FieldValidator { + boolean validate(ExprNodeFieldDesc exprNodeDesc); + } + + public static IndexPredicateAnalyzer createAnalyzer(boolean equalOnly) { + + IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer(); + analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual"); + if (equalOnly) { + return analyzer; + } + analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan"); + analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan"); + analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan"); + analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan"); + + return analyzer; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java index 5f1329c..3a2ecb7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java @@ -33,6 +33,16 @@ private ExprNodeConstantDesc constantDesc; private ExprNodeGenericFuncDesc comparisonExpr; + private String[] fields; + + public IndexSearchCondition( + ExprNodeColumnDesc columnDesc, + String comparisonOp, + ExprNodeConstantDesc constantDesc, + ExprNodeGenericFuncDesc comparisonExpr) { + this(columnDesc, comparisonOp, constantDesc, comparisonExpr, null); + } + /** * Constructs a search condition, which takes the form *
<pre>column-ref comparison-op constant-value</pre>
. @@ -50,12 +60,14 @@ public IndexSearchCondition( ExprNodeColumnDesc columnDesc, String comparisonOp, ExprNodeConstantDesc constantDesc, - ExprNodeGenericFuncDesc comparisonExpr) { + ExprNodeGenericFuncDesc comparisonExpr, + String[] fields) { this.columnDesc = columnDesc; this.comparisonOp = comparisonOp; this.constantDesc = constantDesc; this.comparisonExpr = comparisonExpr; + this.fields = fields; } public void setColumnDesc(ExprNodeColumnDesc columnDesc) { @@ -90,6 +102,10 @@ public ExprNodeGenericFuncDesc getComparisonExpr() { return comparisonExpr; } + public String[] getFields() { + return fields; + } + @Override public String toString() { return comparisonExpr.getExprString(); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 647a9a6..6276680 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -413,6 +414,13 @@ public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) { return; } + Serializable filterObject = scanDesc.getFilterObject(); + if (filterObject != null) { + jobConf.set( + TableScanDesc.FILTER_OBJECT_CONF_STR, + Utilities.serializeObject(filterObject)); + } + String filterText = filterExpr.getExprString(); String filterExprSerialized = Utilities.serializeExpression(filterExpr); if (LOG.isDebugEnabled()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java index 9f35575..7d7c764 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.mapred.JobConf; +import java.io.Serializable; + /** * HiveStoragePredicateHandler is an optional companion to {@link * HiveStorageHandler}; it should only be implemented by handlers which @@ -69,6 +71,11 @@ public DecomposedPredicate decomposePredicate( public ExprNodeGenericFuncDesc pushedPredicate; /** + * Serialized format for filter + */ + public Serializable pushedPredicateObject; + + /** * Portion of predicate to be post-evaluated by Hive for any rows * which are returned by storage handler. */ diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java index e50026b..b6234f0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java @@ -260,12 +260,46 @@ private static ExprNodeDesc backtrack(ExprNodeColumnDesc column, Operator cur return new ExprNodeDesc[] {expr1, expr2}; } if (expr1 instanceof ExprNodeConstantDesc && expr2 instanceof ExprNodeColumnDesc) { - return new ExprNodeDesc[] {expr2, expr1, null}; // add null as a marker (inverted order) + return new ExprNodeDesc[] {expr1, expr2}; + } + if (expr1 instanceof ExprNodeFieldDesc && expr2 instanceof ExprNodeConstantDesc) { + ExprNodeColumnDesc columnDesc = extractColumn(expr1); + return columnDesc != null ? 
new ExprNodeDesc[] {columnDesc, expr2, expr1} : null; + } + if (expr1 instanceof ExprNodeConstantDesc && expr2 instanceof ExprNodeFieldDesc) { + ExprNodeColumnDesc columnDesc = extractColumn(expr2); + return columnDesc != null ? new ExprNodeDesc[] {expr1, columnDesc, expr2} : null; } // todo: constant op constant return null; } + public static String[] extractFields(ExprNodeFieldDesc expr) { + return extractFields(expr, new ArrayList()).toArray(new String[0]); + } + + private static List extractFields(ExprNodeDesc expr, List fields) { + if (expr instanceof ExprNodeFieldDesc) { + ExprNodeFieldDesc field = (ExprNodeFieldDesc)expr; + fields.add(field.getFieldName()); + return extractFields(field.getDesc(), fields); + } + if (expr instanceof ExprNodeColumnDesc) { + return fields; + } + throw new IllegalStateException(); + } + + private static ExprNodeColumnDesc extractColumn(ExprNodeDesc expr) { + if (expr instanceof ExprNodeColumnDesc) { + return (ExprNodeColumnDesc)expr; + } + if (expr instanceof ExprNodeFieldDesc) { + return extractColumn(((ExprNodeFieldDesc)expr).getDesc()); + } + return null; + } + // from IndexPredicateAnalyzer private static ExprNodeDesc extractConstant(ExprNodeDesc expr) { if (!(expr instanceof ExprNodeGenericFuncDesc)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index 10bae4d..8acb39b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.plan; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -61,6 +62,7 @@ private int maxStatsKeyPrefixLength = -1; private ExprNodeGenericFuncDesc filterExpr; + private transient Serializable filterObject; public static final String FILTER_EXPR_CONF_STR = "hive.io.filter.expr.serialized"; @@ -68,6 +70,9 @@ public static final String FILTER_TEXT_CONF_STR = "hive.io.filter.text"; + public static final String FILTER_OBJECT_CONF_STR = + "hive.io.filter.object"; + // input file name (big) to bucket number private Map bucketFileNameMapping; @@ -110,6 +115,14 @@ public void setFilterExpr(ExprNodeGenericFuncDesc filterExpr) { this.filterExpr = filterExpr; } + public Serializable getFilterObject() { + return filterObject; + } + + public void setFilterObject(Serializable filterObject) { + this.filterObject = filterObject; + } + public void setAlias(String alias) { this.alias = alias; } diff --git ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java index 40298e1..9813900 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java @@ -836,6 +836,8 @@ private static ExprNodeGenericFuncDesc pushFilterToStorageHandler( } } tableScanDesc.setFilterExpr(decomposed.pushedPredicate); + tableScanDesc.setFilterObject(decomposed.pushedPredicateObject); + return (ExprNodeGenericFuncDesc)decomposed.residualPredicate; } diff --git serde/src/java/org/apache/hadoop/hive/serde2/StructObject.java serde/src/java/org/apache/hadoop/hive/serde2/StructObject.java new file mode 100644 index 0000000..b7efff0 --- /dev/null +++ serde/src/java/org/apache/hadoop/hive/serde2/StructObject.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
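The changes above thread field-aware search conditions and an optional Serializable filter object from the predicate analyzer down to TableScanDesc and the job conf. The sketch below shows how a storage handler might wire these pieces together; it assumes the patched ql classes are on the classpath, and the class name, column name, and logging are hypothetical, not part of the patch.

import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: decompose a predicate over a struct-typed "key" column,
// keeping track of which struct fields each pushed condition touches.
public class FieldAwarePredicateDecomposer {

  public DecomposedPredicate decompose(ExprNodeDesc predicate) {
    // equalOnly=false registers =, >=, <=, <, > comparison operators.
    IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(false);
    analyzer.allowColumnName("key");
    analyzer.setAcceptsFields(true);  // allow key.col1 / key.col2 style references

    List<IndexSearchCondition> conditions = new ArrayList<IndexSearchCondition>();
    ExprNodeDesc residual = analyzer.analyzePredicate(predicate, conditions);

    for (IndexSearchCondition condition : conditions) {
      // For field-level conditions getFields() returns e.g. ["col1"]; null otherwise.
      System.out.println(condition + " -> fields "
          + Arrays.toString(condition.getFields()));
    }

    DecomposedPredicate decomposed = new DecomposedPredicate();
    decomposed.pushedPredicate = analyzer.translateSearchConditions(conditions);
    decomposed.residualPredicate = (ExprNodeGenericFuncDesc) residual;
    // decomposed.pushedPredicateObject could carry a handler-specific Serializable
    // filter here, which HiveInputFormat would serialize into hive.io.filter.object.
    return decomposed;
  }
}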
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2; + +import java.util.List; + +public interface StructObject { + + Object getField(int fieldID); + + List getFieldsAsList(); +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/StructObjectBaseInspector.java serde/src/java/org/apache/hadoop/hive/serde2/StructObjectBaseInspector.java new file mode 100644 index 0000000..9a1a3b2 --- /dev/null +++ serde/src/java/org/apache/hadoop/hive/serde2/StructObjectBaseInspector.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.serde2; + +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; + +import java.util.ArrayList; +import java.util.List; + +public abstract class StructObjectBaseInspector extends StructObjectInspector { + + protected static class MyField implements StructField { + + protected final int fieldID; + protected final String fieldName; + protected final String fieldComment; + protected final ObjectInspector fieldObjectInspector; + + public MyField(int fieldID, String fieldName, + ObjectInspector fieldObjectInspector, String fieldComment) { + this.fieldID = fieldID; + this.fieldName = fieldName.toLowerCase(); + this.fieldObjectInspector = fieldObjectInspector; + this.fieldComment = fieldComment; + } + + public int getFieldID() { + return fieldID; + } + + public String getFieldName() { + return fieldName; + } + + public ObjectInspector getFieldObjectInspector() { + return fieldObjectInspector; + } + + public String getFieldComment() { + return fieldComment; + } + @Override + public String toString() { + return "" + fieldID + ":" + fieldName; + } + } + + private List fields; + + protected StructObjectBaseInspector() { + super(); + } + /** + * Call ObjectInspectorFactory.getLazySimpleStructObjectInspector instead. 
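The new StructObject interface above decouples struct field access from LazyStruct, which is what lets LazySimpleStructObjectInspector further down operate on any struct-like object. A self-contained sketch of the idea follows; it uses a local copy of the two-method interface and an invented list-backed implementation rather than the Hive classes themselves.

import java.util.ArrayList;
import java.util.List;

// Local stand-in for org.apache.hadoop.hive.serde2.StructObject (same two methods).
interface StructObject {
  Object getField(int fieldID);
  List<Object> getFieldsAsList();
}

// Hypothetical eagerly-materialized struct; anything exposing fields by index qualifies.
class ListBackedStruct implements StructObject {
  private final List<Object> fields;

  ListBackedStruct(List<Object> fields) {
    this.fields = fields;
  }

  public Object getField(int fieldID) {
    return fields.get(fieldID);
  }

  public List<Object> getFieldsAsList() {
    return fields;
  }
}

public class StructObjectDemo {
  public static void main(String[] args) {
    List<Object> values = new ArrayList<Object>();
    values.add("1000");
    values.add("2000");
    values.add("3000");
    StructObject key = new ListBackedStruct(values);
    // An inspector only needs the interface, e.g. getStructFieldData -> getField(fieldID).
    System.out.println(key.getField(0));        // 1000
    System.out.println(key.getFieldsAsList());  // [1000, 2000, 3000]
  }
}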
+ */ + public StructObjectBaseInspector(List structFieldNames, + List structFieldObjectInspectors) { + init(structFieldNames, structFieldObjectInspectors, null); + } + + public StructObjectBaseInspector(List structFieldNames, + List structFieldObjectInspectors, + List structFieldComments) { + init(structFieldNames, structFieldObjectInspectors, structFieldComments); + } + + protected void init(List structFieldNames, + List structFieldObjectInspectors, + List structFieldComments) { + assert (structFieldNames.size() == structFieldObjectInspectors.size()); + assert (structFieldComments == null || + (structFieldNames.size() == structFieldComments.size())); + + fields = new ArrayList(structFieldNames.size()); + for (int i = 0; i < structFieldNames.size(); i++) { + fields.add(createField(i, + structFieldNames.get(i), structFieldObjectInspectors.get(i), + structFieldComments == null ? null : structFieldComments.get(i))); + } + } + + protected MyField createField(int index, String fieldName, ObjectInspector fieldOI, String comment) { + return new MyField(index, fieldName, fieldOI, comment); + } + + @Override + public String getTypeName() { + return ObjectInspectorUtils.getStandardStructTypeName(this); + } + + @Override + public final Category getCategory() { + return Category.STRUCT; + } + + @Override + public StructField getStructFieldRef(String fieldName) { + return ObjectInspectorUtils.getStandardStructFieldRef(fieldName, fields); + } + + @Override + public List getAllStructFieldRefs() { + return fields; + } +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java index 1fd6853..fd06f58 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java +++ serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java @@ -23,13 +23,14 @@ import java.util.List; import org.apache.hadoop.hive.serde2.SerDeStatsStruct; +import org.apache.hadoop.hive.serde2.StructObject; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -public abstract class ColumnarStructBase implements SerDeStatsStruct { +public abstract class ColumnarStructBase implements StructObject, SerDeStatsStruct { class FieldInfo { LazyObjectBase field; diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java index 10f4c05..9b5ccbe 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java @@ -25,7 +25,7 @@ * A LazyObject can represent any primitive object or hierarchical object like * array, map or struct. 
*/ -public abstract class LazyObject extends LazyObjectBase { +public abstract class LazyObject implements LazyObjectBase { protected OI oi; diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java index 3334dff..7e42b3f 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hive.serde2.lazy; -public abstract class LazyObjectBase { +public interface LazyObjectBase { /** * Set the data for this LazyObjectBase. We take ByteArrayRef instead of byte[] so @@ -33,12 +33,12 @@ * The length of the data, starting from "start" * @see ByteArrayRef */ - public abstract void init(ByteArrayRef bytes, int start, int length); + void init(ByteArrayRef bytes, int start, int length); /** * If the LazyObjectBase is a primitive Object, then deserialize it and return the * actual primitive Object. Otherwise (array, map, struct), return this. */ - public abstract Object getObject(); + Object getObject(); } diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java index 8a1ea46..a01cd66 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java @@ -23,10 +23,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStatsStruct; +import org.apache.hadoop.hive.serde2.StructObject; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.Text; /** @@ -36,8 +37,8 @@ * LazyStruct does not deal with the case of a NULL struct. That is handled by * the parent LazyObject. */ -public class LazyStruct extends LazyNonPrimitive implements - SerDeStatsStruct { +public class LazyStruct extends LazyNonPrimitive + implements StructObject, SerDeStatsStruct { private static Log LOG = LogFactory.getLog(LazyStruct.class.getName()); @@ -62,7 +63,7 @@ /** * The fields of the struct. */ - LazyObject[] fields; + LazyObjectBase[] fields; /** * Whether init() has been called on the field or not. */ @@ -101,17 +102,7 @@ private void parse() { byte escapeChar = oi.getEscapeChar(); if (fields == null) { - List fieldRefs = ((StructObjectInspector) oi) - .getAllStructFieldRefs(); - fields = new LazyObject[fieldRefs.size()]; - for (int i = 0; i < fields.length; i++) { - fields[i] = LazyFactory.createLazyObject(fieldRefs.get(i) - .getFieldObjectInspector()); - } - fieldInited = new boolean[fields.length]; - // Extra element to make sure we have the same formula to compute the - // length of each element of the array. 
- startPosition = new int[fields.length + 1]; + initLazyFields(oi.getAllStructFieldRefs()); } int structByteEnd = start + length; @@ -172,6 +163,25 @@ private void parse() { parsed = true; } + protected final void initLazyFields(List fieldRefs) { + fields = new LazyObjectBase[fieldRefs.size()]; + for (int i = 0; i < fields.length; i++) { + try { + fields[i] = createLazyField(i, fieldRefs.get(i)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + fieldInited = new boolean[fields.length]; + // Extra element to make sure we have the same formula to compute the + // length of each element of the array. + startPosition = new int[fields.length + 1]; + } + + protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef) throws SerDeException { + return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector()); + } + /** * Get one field out of the struct. * @@ -221,14 +231,14 @@ private Object uncheckedGetField(int fieldID) { return fields[fieldID].getObject(); } - ArrayList cachedList; + List cachedList; /** * Get the values of the fields as an ArrayList. * * @return The values of the fields as an ArrayList. */ - public ArrayList getFieldsAsList() { + public List getFieldsAsList() { if (!parsed) { parse(); } @@ -256,7 +266,7 @@ protected void setParsed(boolean parsed) { this.parsed = parsed; } - protected LazyObject[] getFields() { + protected LazyObjectBase[] getFields() { return fields; } diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java index 8a5386a..0813700 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java @@ -23,7 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.serde2.lazy.LazyStruct; +import org.apache.hadoop.hive.serde2.StructObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.StructField; @@ -184,7 +184,7 @@ public Object getStructFieldData(Object data, StructField fieldRef) { if (data == null) { return null; } - LazyStruct struct = (LazyStruct) data; + StructObject struct = (StructObject) data; MyField f = (MyField) fieldRef; int fieldID = f.getFieldID(); @@ -198,15 +198,8 @@ public Object getStructFieldData(Object data, StructField fieldRef) { if (data == null) { return null; } - - // Iterate over all the fields picking up the nested structs within them - List result = new ArrayList(fields.size()); - - for (MyField myField : fields) { - result.add(getStructFieldData(data, myField)); - } - - return result; + StructObject struct = (StructObject) data; + return struct.getFieldsAsList(); } // For LazyStruct diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java index 598683f..b3625b3 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java @@ -27,7 +27,7 @@ * A LazyBinaryObject can represent any primitive object or hierarchical object * like string, list, map or struct. 
*/ -public abstract class LazyBinaryObject extends LazyObjectBase { +public abstract class LazyBinaryObject implements LazyObjectBase { OI oi; diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java index caf3517..98a35c7 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.serde2.SerDeStatsStruct; +import org.apache.hadoop.hive.serde2.StructObject; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.RecordInfo; import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStructObjectInspector; @@ -42,8 +43,8 @@ * Following B, there is another section A and B. This pattern repeats until the * all struct fields are serialized. */ -public class LazyBinaryStruct extends - LazyBinaryNonPrimitive implements SerDeStatsStruct { +public class LazyBinaryStruct extends LazyBinaryNonPrimitive + implements StructObject, SerDeStatsStruct { private static Log LOG = LogFactory.getLog(LazyBinaryStruct.class.getName());
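With LazyStruct refactored as above, createLazyField becomes the extension point through which a subclass can supply custom per-field lazy objects. A minimal subclass sketch follows; it assumes the patched hive-serde classes are on the classpath, and the class name and the logging behavior are invented for illustration, not part of the patch.

import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;

// Hypothetical struct subclass: logs which field is being materialized, then
// falls back to the default lazy object for that field's inspector.
public class LoggingLazyStruct extends LazyStruct {

  public LoggingLazyStruct(LazySimpleStructObjectInspector oi) {
    super(oi);
  }

  @Override
  protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef)
      throws SerDeException {
    System.out.println("creating lazy field #" + fieldID
        + " (" + fieldRef.getFieldName() + ")");
    return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector());
  }
}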