diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseStoreIntegration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseStoreIntegration.java index ef53030..2d4e2cb 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseStoreIntegration.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseStoreIntegration.java @@ -58,7 +58,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -67,6 +66,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; +import java.util.TreeSet; /** * Integration tests with HBase Mini-cluster for HBaseStore @@ -521,7 +522,7 @@ public void listPartitions() throws Exception { @Test public void listPartitionsWithPs() throws Exception { String dbName = "default"; - String tableName = "listPartsPs"; + String tableName = "listPartitionsWithPs"; int startTime = (int)(System.currentTimeMillis() / 1000); List cols = new ArrayList(); cols.add(new FieldSchema("col1", "int", "nocomment")); @@ -582,6 +583,91 @@ public void listPartitionsWithPs() throws Exception { "ds=tomorrow/region=europe"}, names); } + + @Test + public void getPartitionsByFilter() throws Exception { + String dbName = "default"; + String tableName = "getPartitionsByFilter"; + int startTime = (int)(System.currentTimeMillis() / 1000); + List cols = new ArrayList(); + cols.add(new FieldSchema("col1", "int", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, emptyParameters); + List partCols = new ArrayList(); + partCols.add(new FieldSchema("ds", "string", "")); + partCols.add(new FieldSchema("region", "string", "")); + Table table = new Table(tableName, dbName, "me", startTime, startTime, 0, sd, partCols, + emptyParameters, null, null, null); + store.createTable(table); + + String[][] partVals = new String[][]{{"20010101", "north america"}, {"20010101", "europe"}, + {"20010102", "north america"}, {"20010102", "europe"}, {"20010103", "north america"}}; + for (String[] pv : partVals) { + List vals = new ArrayList(); + for (String v : pv) vals.add(v); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/ds=" + pv[0] + "/region=" + pv[1]); + Partition part = new Partition(vals, dbName, tableName, startTime, startTime, psd, + emptyParameters); + store.addPartition(part); + } + + // We only test getPartitionsByFilter since it calls same code as getPartitionsByExpr anyway. 
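+    // Only the first partition column (ds) is currently pushed down into the HBase scan
+    // range; conditions on later columns (e.g. region) are not applied yet, which the
+    // expected results below reflect.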
+    // Test range filters on the first partition column (ds)
+    List<Partition> parts;
+    parts = store.getPartitionsByFilter(dbName, tableName, "ds > '20010101'", (short) -1);
+    checkPartVals(parts, "[20010102, north america]", "[20010102, europe]",
+        "[20010103, north america]");
+
+    parts = store.getPartitionsByFilter(dbName, tableName, "ds >= '20010102'", (short) -1);
+    checkPartVals(parts, "[20010102, north america]", "[20010102, europe]",
+        "[20010103, north america]");
+
+    parts = store.getPartitionsByFilter(dbName, tableName,
+        "ds >= '20010102' and region = 'europe' ", (short) -1);
+    // filtering on the first partition column is the only filtering implemented as of now,
+    // so the region condition is not applied
+    checkPartVals(parts, "[20010102, north america]", "[20010102, europe]",
+        "[20010103, north america]");
+
+    parts = store.getPartitionsByFilter(dbName, tableName,
+        "ds >= '20010101' and ds < '20010102'", (short) -1);
+    checkPartVals(parts, "[20010101, north america]", "[20010101, europe]");
+
+    parts = store.getPartitionsByFilter(dbName, tableName,
+        "ds = '20010102' or ds < '20010103'", (short) -1);
+    checkPartVals(parts, "[20010101, north america]", "[20010101, europe]",
+        "[20010102, north america]", "[20010102, europe]");
+
+    // test conversion to DNF
+    parts = store.getPartitionsByFilter(dbName, tableName,
+        "ds = '20010102' and (ds = '20010102' or region = 'europe')", (short) -1);
+    // only the first partition column is filtered on, so the region condition is not applied
+    checkPartVals(parts, "[20010102, north america]", "[20010102, europe]");
+
+    parts = store.getPartitionsByFilter(dbName, tableName,
+        "region = 'europe'", (short) -1);
+    // only the first partition column is filtered on, so the region condition is not applied
+    checkPartVals(parts, "[20010101, north america]", "[20010101, europe]",
+        "[20010102, north america]", "[20010102, europe]", "[20010103, north america]");
+
+  }
+
+  /**
+   * Check that the given partitions have exactly the expected partition values.
+   * @param parts partitions returned by the query
+   * @param expectedPartVals expected string form of each partition's value list
+   */
+  private void checkPartVals(List<Partition> parts, String... expectedPartVals) {
+    Assert.assertEquals("number of partitions", expectedPartVals.length, parts.size());
+    Set<String> partValStrings = new TreeSet<String>();
+    for (Partition part : parts) {
+      partValStrings.add(part.getValues().toString());
+    }
+    Assert.assertEquals(new TreeSet<String>(Arrays.asList(expectedPartVals)), partValStrings);
+  }
+
   @Test
   public void dropPartition() throws Exception {
     String dbName = "default";
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 780613e..821b981 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -274,33 +274,14 @@ private void initialize(Properties dsProps) {
     pm = getPersistenceManager();
     isInitialized = pm != null;
     if (isInitialized) {
-      expressionProxy = createExpressionProxy(hiveConf);
+      expressionProxy = PartFilterExprUtil.createExpressionProxy(hiveConf);
       directSql = new MetaStoreDirectSql(pm, hiveConf);
     }
     LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm +
         " created in the thread with id: " + Thread.currentThread().getId());
   }
 
-  /**
-   * Creates the proxy used to evaluate expressions. This is here to prevent circular
-   * dependency - ql -> metastore client <-> metastore server -> ql. If server and
-   * client are split, this can be removed.
- * @param conf Configuration. - * @return The partition expression proxy. - */ - private static PartitionExpressionProxy createExpressionProxy(Configuration conf) { - String className = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS); - try { - @SuppressWarnings("unchecked") - Class clazz = - (Class)MetaStoreUtils.getClass(className); - return MetaStoreUtils.newInstance( - clazz, new Class[0], new Object[0]); - } catch (MetaException e) { - LOG.error("Error loading PartitionExpressionProxy", e); - throw new RuntimeException("Error loading PartitionExpressionProxy: " + e.getMessage()); - } - } + /** * Properties specified in hive-default.xml override the properties specified @@ -2036,24 +2017,7 @@ protected boolean getPartitionsByExprInternal(String dbName, String tblName, fin final String defaultPartitionName, final short maxParts, List result, boolean allowSql, boolean allowJdo) throws TException { assert result != null; - - // We will try pushdown first, so make the filter. This will also validate the expression, - // if serialization fails we will throw incompatible metastore error to the client. - String filter = null; - try { - filter = expressionProxy.convertExprToFilter(expr); - } catch (MetaException ex) { - throw new IMetaStoreClient.IncompatibleMetastoreException(ex.getMessage()); - } - - // Make a tree out of the filter. - // TODO: this is all pretty ugly. The only reason we need all these transformations - // is to maintain support for simple filters for HCat users that query metastore. - // If forcing everyone to use thick client is out of the question, maybe we could - // parse the filter into standard hive expressions and not all this separate tree - // Filter.g stuff. That way this method and ...ByFilter would just be merged. - final ExpressionTree exprTree = makeExpressionTree(filter); - + final ExpressionTree exprTree = PartFilterExprUtil.makeExpressionTree(expressionProxy, expr); final AtomicBoolean hasUnknownPartitions = new AtomicBoolean(false); result.addAll(new GetListHelper(dbName, tblName, allowSql, allowJdo) { @Override @@ -2093,50 +2057,7 @@ protected boolean getPartitionsByExprInternal(String dbName, String tblName, fin return hasUnknownPartitions.get(); } - private class LikeChecker extends ExpressionTree.TreeVisitor { - private boolean hasLike; - public boolean hasLike() { - return hasLike; - } - - @Override - protected boolean shouldStop() { - return hasLike; - } - - @Override - protected void visit(LeafNode node) throws MetaException { - hasLike = hasLike || (node.operator == Operator.LIKE); - } - } - - /** - * Makes expression tree out of expr. - * @param filter Filter. - * @return Expression tree. Null if there was an error. - */ - private ExpressionTree makeExpressionTree(String filter) throws MetaException { - // TODO: ExprNodeDesc is an expression tree, we could just use that and be rid of Filter.g. - if (filter == null || filter.isEmpty()) { - return ExpressionTree.EMPTY_TREE; - } - LOG.debug("Filter specified is " + filter); - ExpressionTree tree = null; - try { - tree = getFilterParser(filter).tree; - } catch (MetaException ex) { - LOG.info("Unable to make the expression tree from expression string [" - + filter + "]" + ex.getMessage()); // Don't log the stack, this is normal. - } - if (tree == null) { - return null; - } - // We suspect that LIKE pushdown into JDO is invalid; see HIVE-5134. Check for like here. - LikeChecker lc = new LikeChecker(); - tree.accept(lc); - return lc.hasLike() ? 
null : tree; - } /** * Gets the partition names from a table, pruned using an expression. @@ -2491,7 +2412,7 @@ protected String describeResult() { String filter, final short maxParts, boolean allowSql, boolean allowJdo) throws MetaException, NoSuchObjectException { final ExpressionTree tree = (filter != null && !filter.isEmpty()) - ? getFilterParser(filter).tree : ExpressionTree.EMPTY_TREE; + ? PartFilterExprUtil.getFilterParser(filter).tree : ExpressionTree.EMPTY_TREE; return new GetListHelper(dbName, tblName, allowSql, allowJdo) { @Override @@ -2534,24 +2455,6 @@ private Table ensureGetTable( return convertToTable(ensureGetMTable(dbName, tblName)); } - private FilterParser getFilterParser(String filter) throws MetaException { - FilterLexer lexer = new FilterLexer(new ANTLRNoCaseStringStream(filter)); - CommonTokenStream tokens = new CommonTokenStream(lexer); - - FilterParser parser = new FilterParser(tokens); - try { - parser.filter(); - } catch(RecognitionException re) { - throw new MetaException("Error parsing partition filter; lexer error: " - + lexer.errorMsg + "; exception " + re); - } - - if (lexer.errorMsg != null) { - throw new MetaException("Error parsing partition filter : " + lexer.errorMsg); - } - return parser; - } - /** * Makes a JDO query filter string. * Makes a JDO query filter string for tables or partitions. @@ -2565,7 +2468,7 @@ private FilterParser getFilterParser(String filter) throws MetaException { private String makeQueryFilterString(String dbName, MTable mtable, String filter, Map params) throws MetaException { ExpressionTree tree = (filter != null && !filter.isEmpty()) - ? getFilterParser(filter).tree : ExpressionTree.EMPTY_TREE; + ? PartFilterExprUtil.getFilterParser(filter).tree : ExpressionTree.EMPTY_TREE; return makeQueryFilterString(dbName, convertToTable(mtable), tree, params, true); } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/PartFilterExprUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/PartFilterExprUtil.java new file mode 100644 index 0000000..5766bdd --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/PartFilterExprUtil.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore; + +import org.antlr.runtime.CommonTokenStream; +import org.antlr.runtime.RecognitionException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree; +import org.apache.hadoop.hive.metastore.parser.FilterLexer; +import org.apache.hadoop.hive.metastore.parser.FilterParser; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.ANTLRNoCaseStringStream; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator; + +/** + * Utility functions for working with partition filter expressions + */ +public class PartFilterExprUtil { + private static final Log LOG = LogFactory.getLog(PartFilterExprUtil.class.getName()); + + + public static ExpressionTree makeExpressionTree(PartitionExpressionProxy expressionProxy, + byte[] expr) throws MetaException { + // We will try pushdown first, so make the filter. This will also validate the expression, + // if serialization fails we will throw incompatible metastore error to the client. + String filter = null; + try { + filter = expressionProxy.convertExprToFilter(expr); + } catch (MetaException ex) { + throw new IMetaStoreClient.IncompatibleMetastoreException(ex.getMessage()); + } + + // Make a tree out of the filter. + // TODO: this is all pretty ugly. The only reason we need all these transformations + // is to maintain support for simple filters for HCat users that query metastore. + // If forcing everyone to use thick client is out of the question, maybe we could + // parse the filter into standard hive expressions and not all this separate tree + // Filter.g stuff. That way this method and ...ByFilter would just be merged. + return PartFilterExprUtil.makeExpressionTree(filter); + } + + + /** + * Creates the proxy used to evaluate expressions. This is here to prevent circular + * dependency - ql -> metastore client <-> metastore server -> ql. If server and + * client are split, this can be removed. + * @param conf Configuration. + * @return The partition expression proxy. + */ + public static PartitionExpressionProxy createExpressionProxy(Configuration conf) { + String className = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS); + try { + @SuppressWarnings("unchecked") + Class clazz = + (Class)MetaStoreUtils.getClass(className); + return MetaStoreUtils.newInstance( + clazz, new Class[0], new Object[0]); + } catch (MetaException e) { + LOG.error("Error loading PartitionExpressionProxy", e); + throw new RuntimeException("Error loading PartitionExpressionProxy: " + e.getMessage()); + } + } + + /** + * Makes expression tree out of expr. + * @param filter Filter. + * @return Expression tree. Null if there was an error. + */ + private static ExpressionTree makeExpressionTree(String filter) throws MetaException { + // TODO: ExprNodeDesc is an expression tree, we could just use that and be rid of Filter.g. 
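+    // a null or empty filter string means there is no condition that can be pushed down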
+ if (filter == null || filter.isEmpty()) { + return ExpressionTree.EMPTY_TREE; + } + LOG.debug("Filter specified is " + filter); + ExpressionTree tree = null; + try { + tree = getFilterParser(filter).tree; + } catch (MetaException ex) { + LOG.info("Unable to make the expression tree from expression string [" + + filter + "]" + ex.getMessage()); // Don't log the stack, this is normal. + } + if (tree == null) { + return null; + } + // We suspect that LIKE pushdown into JDO is invalid; see HIVE-5134. Check for like here. + LikeChecker lc = new LikeChecker(); + tree.accept(lc); + return lc.hasLike() ? null : tree; + } + + + private static class LikeChecker extends ExpressionTree.TreeVisitor { + private boolean hasLike; + + public boolean hasLike() { + return hasLike; + } + + @Override + protected boolean shouldStop() { + return hasLike; + } + + @Override + protected void visit(LeafNode node) throws MetaException { + hasLike = hasLike || (node.operator == Operator.LIKE); + } + } + + public static FilterParser getFilterParser(String filter) throws MetaException { + FilterLexer lexer = new FilterLexer(new ANTLRNoCaseStringStream(filter)); + CommonTokenStream tokens = new CommonTokenStream(lexer); + + FilterParser parser = new FilterParser(tokens); + try { + parser.filter(); + } catch(RecognitionException re) { + throw new MetaException("Error parsing partition filter; lexer error: " + + lexer.errorMsg + "; exception " + re); + } + + if (lexer.errorMsg != null) { + throw new MetaException("Error parsing partition filter : " + lexer.errorMsg); + } + return parser; + } + + +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java new file mode 100644 index 0000000..5ea5d95 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java @@ -0,0 +1,494 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.hbase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; + + +/** + * Utility function for generating hbase partition filtering plan representation + * from ExpressionTree. 
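+ * For example, an OR of two conditions becomes a MultiScanPlan containing the ScanPlans of
+ * both sides, while an AND is handled by first rewriting the expression into disjunctive
+ * normal form (an OR of ANDs).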
+ * Optimizations to be done - + * - Case where all partition keys are specified. Should use a get + * + * {@link PartitionFilterGenerator} is a visitor on the given filter expression tree. After + * walking it it produces the HBase execution plan represented by {@link FilterPlan}. See + * their javadocs for more details. + */ +class HBaseFilterPlanUtil { + + /** + * Compare two byte arrays. + * + * @param ar1 + * first byte array + * @param ar2 + * second byte array + * @return -1 if ar1 < ar2, 0 if == , 1 if > + */ + static int compare(byte[] ar1, byte[] ar2) { + // null check is not needed, nulls are not passed here + for (int i = 0; i < ar1.length; i++) { + if (i == ar2.length) { + return 1; + } else { + if (ar1[i] == ar2[i]) { + continue; + } else if (ar1[i] > ar2[i]) { + return 1; + } else { + return -1; + } + } + } + // ar2 equal until length of ar1. + if(ar1.length == ar2.length) { + return 0; + } + // ar2 has more bytes + return -1; + } + + /** + * Represents the execution plan for hbase to find the set of partitions that + * match given filter expression. + * If you have an AND or OR of two expressions, you can determine FilterPlan for each + * children and then call lhs.and(rhs) or lhs.or(rhs) respectively + * to generate a new plan for the expression. + * + * The execution plan has one or more ScanPlan objects. To get the results the set union of all + * ScanPlan objects needs to be done. + */ + public static abstract class FilterPlan { + abstract FilterPlan and(FilterPlan other); + abstract FilterPlan or(FilterPlan other); + abstract List getPlans(); + @Override + public String toString() { + return getPlans().toString(); + } + + } + + /** + * Represents a union/OR of single scan plans (ScanPlan). + */ + public static class MultiScanPlan extends FilterPlan { + final ImmutableList scanPlans; + + public MultiScanPlan(List scanPlans){ + this.scanPlans = ImmutableList.copyOf(scanPlans); + } + + @Override + public FilterPlan and(FilterPlan other) { + // Convert to disjunctive normal form (DNF), ie OR of ANDs + // First get a new set of FilterPlans by doing an AND + // on each ScanPlan in this one with the other FilterPlan + List newFPlans = new ArrayList(); + for (ScanPlan splan : getPlans()) { + newFPlans.add(splan.and(other)); + } + //now combine scanPlans in multiple new FilterPlans into one + // MultiScanPlan + List newScanPlans = new ArrayList(); + for (FilterPlan fp : newFPlans) { + newScanPlans.addAll(fp.getPlans()); + } + return new MultiScanPlan(newScanPlans); + } + + @Override + public FilterPlan or(FilterPlan other) { + // just combine the ScanPlans + List newScanPlans = new ArrayList(this.getPlans()); + newScanPlans.addAll(other.getPlans()); + return new MultiScanPlan(newScanPlans); + } + + @Override + public List getPlans() { + return scanPlans; + } + } + + /** + * Represents a single Hbase Scan api call + */ + public static class ScanPlan extends FilterPlan { + + public static class ScanMarker { + final byte[] bytes; + /** + * If inclusive = true, it means that the + * marker includes those bytes. 
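+       * (for example, a start marker over the bytes of 'a' with isInclusive = true starts the
+       * scan at 'a' itself)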
+ * If it is false, it means the marker starts at the next possible byte array + * or ends at the next possible byte array + */ + final boolean isInclusive; + ScanMarker(byte [] b, boolean i){ + this.bytes = b; + this.isInclusive = i; + } + @Override + public String toString() { + return "ScanMarker [bytes=" + Arrays.toString(bytes) + ", isInclusive=" + isInclusive + "]"; + } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(bytes); + result = prime * result + (isInclusive ? 1231 : 1237); + return result; + } + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ScanMarker other = (ScanMarker) obj; + if (!Arrays.equals(bytes, other.bytes)) + return false; + if (isInclusive != other.isInclusive) + return false; + return true; + } + } + // represent Scan start + private ScanMarker startMarker = new ScanMarker(null, false); + // represent Scan end + private ScanMarker endMarker = new ScanMarker(null, false); + + private ScanFilter filter; + + public ScanFilter getFilter() { + return filter; + } + + public void setFilter(ScanFilter filter) { + this.filter = filter; + } + + public ScanMarker getStartMarker() { + return startMarker; + } + + public void setStartMarker(ScanMarker startMarker) { + this.startMarker = startMarker; + } + public void setStartMarker(byte[] start, boolean isInclusive) { + setStartMarker(new ScanMarker(start, isInclusive)); + } + + public ScanMarker getEndMarker() { + return endMarker; + } + + public void setEndMarker(ScanMarker endMarker) { + this.endMarker = endMarker; + } + public void setEndMarker(byte[] end, boolean isInclusive) { + setEndMarker(new ScanMarker(end, isInclusive)); + } + + @Override + public FilterPlan and(FilterPlan other) { + List newSPlans = new ArrayList(); + for (ScanPlan otherSPlan : other.getPlans()) { + newSPlans.add(this.and(otherSPlan)); + } + return new MultiScanPlan(newSPlans); + } + + private ScanPlan and(ScanPlan other) { + // create combined FilterPlan based on existing lhs and rhs plan + ScanPlan newPlan = new ScanPlan(); + + // create new scan start + ScanMarker greaterStartMarker = getComparedMarker(this.getStartMarker(), + other.getStartMarker(), true); + newPlan.setStartMarker(greaterStartMarker); + + // create new scan end + ScanMarker lesserEndMarker = getComparedMarker(this.getEndMarker(), other.getEndMarker(), + false); + newPlan.setEndMarker(lesserEndMarker); + + // create new filter plan + newPlan.setFilter(createCombinedFilter(this.getFilter(), other.getFilter())); + + return newPlan; + } + + private ScanFilter createCombinedFilter(ScanFilter filter1, ScanFilter filter2) { + // TODO create combined filter - filter1 && filter2 + return null; + } + + /** + * @param lStartMarker + * @param rStartMarker + * @param getGreater if true return greater startmarker, else return smaller one + * @return greater/lesser marker depending on value of getGreater + */ + @VisibleForTesting + static ScanMarker getComparedMarker(ScanMarker lStartMarker, ScanMarker rStartMarker, + boolean getGreater) { + // if one of them has null bytes, just return other + if(lStartMarker.bytes == null) { + return rStartMarker; + } else if (rStartMarker.bytes == null) { + return lStartMarker; + } + + int compareRes = compare(lStartMarker.bytes, rStartMarker.bytes); + if (compareRes == 0) { + // bytes are equal, now compare the isInclusive flags + if 
(lStartMarker.isInclusive == rStartMarker.isInclusive) { + // actually equal, so return any one + return lStartMarker; + } + boolean isInclusive = true; + // one that does not include the current bytes is greater + if (getGreater) { + isInclusive = false; + } + // else + return new ScanMarker(lStartMarker.bytes, isInclusive); + } + if (getGreater) { + return compareRes == 1 ? lStartMarker : rStartMarker; + } + // else + return compareRes == -1 ? lStartMarker : rStartMarker; + } + + + @Override + public FilterPlan or(FilterPlan other) { + List plans = new ArrayList(getPlans()); + plans.addAll(other.getPlans()); + return new MultiScanPlan(plans); + } + + @Override + public List getPlans() { + return Arrays.asList(this); + } + + + /** + * @return row suffix - This is appended to db + table, to generate start row for the Scan + */ + public byte[] getStartRowSuffix() { + if (startMarker.isInclusive) { + return startMarker.bytes; + } else { + return HBaseUtils.getEndPrefix(startMarker.bytes); + } + } + + /** + * @return row suffix - This is appended to db + table, to generate end row for the Scan + */ + public byte[] getEndRowSuffix() { + if (endMarker.isInclusive) { + return HBaseUtils.getEndPrefix(endMarker.bytes); + } else { + return endMarker.bytes; + } + } + + @Override + public String toString() { + return "ScanPlan [startMarker=" + startMarker + ", endMarker=" + endMarker + ", filter=" + + filter + "]"; + } + + } + + /** + * represent a plan that can be used to create a hbase filter and then set in + * Scan.setFilter() + */ + public static class ScanFilter { + // TODO: implement this + } + + /** + * Visitor for ExpressionTree. + * It first generates the ScanPlan for the leaf nodes. The higher level nodes are + * either AND or OR operations. It then calls FilterPlan.and and FilterPlan.or with + * the child nodes to generate the plans for higher level nodes. + */ + @VisibleForTesting + static class PartitionFilterGenerator extends TreeVisitor { + private FilterPlan curPlan; + + // this tells us if there is a condition that did not get included in the plan + // such condition would be treated as getting evaluated to TRUE + private boolean hasUnsupportedCondition = false; + + //Need to cache the left plans for the TreeNode. 
Use IdentityHashMap here + // as we don't want to dedupe on two TreeNode that are otherwise considered equal + Map leftPlans = new IdentityHashMap(); + + // temporary params for current left and right side plans, for AND, OR + private FilterPlan rPlan; + + private final String firstPartcolumn; + public PartitionFilterGenerator(String firstPartitionColumn) { + this.firstPartcolumn = firstPartitionColumn; + } + + FilterPlan getPlan() { + return curPlan; + } + + @Override + protected void beginTreeNode(TreeNode node) throws MetaException { + // reset the params + curPlan = rPlan = null; + } + + @Override + protected void midTreeNode(TreeNode node) throws MetaException { + leftPlans.put(node, curPlan); + curPlan = null; + } + + @Override + protected void endTreeNode(TreeNode node) throws MetaException { + rPlan = curPlan; + FilterPlan lPlan = leftPlans.get(node); + leftPlans.remove(node); + + switch (node.getAndOr()) { + case AND: + curPlan = lPlan.and(rPlan); + break; + case OR: + curPlan = lPlan.or(rPlan); + break; + default: + throw new AssertionError("Unexpected logical operation " + node.getAndOr()); + } + + } + + + @Override + public void visit(LeafNode node) throws MetaException { + ScanPlan leafPlan = new ScanPlan(); + curPlan = leafPlan; + if (!isFirstParitionColumn(node.keyName)) { + leafPlan.setFilter(generateScanFilter(node)); + return; + } + + // this is a condition on first partition column, so might influence the + // start and end of the scan + final boolean INCLUSIVE = true; + switch (node.operator) { + case EQUALS: + leafPlan.setStartMarker(toBytes(node.value), INCLUSIVE); + leafPlan.setEndMarker(toBytes(node.value), INCLUSIVE); + break; + case GREATERTHAN: + leafPlan.setStartMarker(toBytes(node.value), !INCLUSIVE); + break; + case GREATERTHANOREQUALTO: + leafPlan.setStartMarker(toBytes(node.value), INCLUSIVE); + break; + case LESSTHAN: + leafPlan.setEndMarker(toBytes(node.value), !INCLUSIVE); + break; + case LESSTHANOREQUALTO: + leafPlan.setEndMarker(toBytes(node.value), INCLUSIVE); + break; + case LIKE: + case NOTEQUALS: + case NOTEQUALS2: + // TODO: create filter plan for these + hasUnsupportedCondition = true; + break; + } + } + + @VisibleForTesting + static byte[] toBytes(Object value) { + // TODO: actually implement this + // We need to determine the actual type and use appropriate + // serialization format for that type + return ((String) value).getBytes(HBaseUtils.ENCODING); + } + + private ScanFilter generateScanFilter(LeafNode node) { + // TODO Auto-generated method stub + hasUnsupportedCondition = true; + return null; + } + + private boolean isFirstParitionColumn(String keyName) { + return keyName.equalsIgnoreCase(firstPartcolumn); + } + + private boolean hasUnsupportedCondition() { + return hasUnsupportedCondition; + } + + } + + public static class PlanResult { + public final FilterPlan plan; + public final boolean hasUnsupportedCondition; + PlanResult(FilterPlan plan, boolean hasUnsupportedCondition) { + this.plan = plan; + this.hasUnsupportedCondition = hasUnsupportedCondition; + } + } + + public static PlanResult getFilterPlan(ExpressionTree exprTree, String firstPartitionColumn) throws MetaException { + if (exprTree == null) { + // TODO: if exprTree is null, we should do what ObjectStore does. 
See HIVE-10102 + return new PlanResult(new ScanPlan(), true); + } + PartitionFilterGenerator pGenerator = new PartitionFilterGenerator(firstPartitionColumn); + exprTree.accept(pGenerator); + return new PlanResult(pGenerator.getPlan(), pGenerator.hasUnsupportedCondition()); + } + +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java index 8db82e6..c83d7c8 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hive.metastore.hbase; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -106,27 +108,27 @@ protected HBaseReadWrite initialValue() { private static boolean tablesCreated = false; private static Configuration staticConf = null; - private Configuration conf; + private final Configuration conf; private HBaseConnection conn; private MessageDigest md; private ObjectCache, Table> tableCache; private ObjectCache sdCache; private PartitionCache partCache; private StatsCache statsCache; - private Counter tableHits; - private Counter tableMisses; - private Counter tableOverflows; - private Counter partHits; - private Counter partMisses; - private Counter partOverflows; - private Counter sdHits; - private Counter sdMisses; - private Counter sdOverflows; - private List counters; + private final Counter tableHits; + private final Counter tableMisses; + private final Counter tableOverflows; + private final Counter partHits; + private final Counter partMisses; + private final Counter partOverflows; + private final Counter sdHits; + private final Counter sdMisses; + private final Counter sdOverflows; + private final List counters; // roleCache doesn't use ObjectCache because I don't want to limit the size. I am assuming // that the number of roles will always be small (< 100) so caching the whole thing should not // be painful. 
- private Map roleCache; + private final Map roleCache; boolean entireRoleTableInCache; /** @@ -310,7 +312,7 @@ Database getDb(String name) throws IOException { filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regex)); } Iterator iter = - scanWithFilter(DB_TABLE, null, CATALOG_CF, CATALOG_COL, filter); + scan(DB_TABLE, CATALOG_CF, CATALOG_COL, filter); List databases = new ArrayList(); while (iter.hasNext()) { Result result = iter.next(); @@ -376,7 +378,7 @@ Function getFunction(String dbName, String functionName) throws IOException { filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regex)); } Iterator iter = - scanWithFilter(FUNC_TABLE, keyPrefix, CATALOG_CF, CATALOG_COL, filter); + scan(FUNC_TABLE, keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), CATALOG_CF, CATALOG_COL, filter); List functions = new ArrayList(); while (iter.hasNext()) { Result result = iter.next(); @@ -584,7 +586,7 @@ void replacePartitions(List oldParts, List newParts) throw : new ArrayList(cached); } byte[] keyPrefix = HBaseUtils.buildKeyWithTrailingSeparator(dbName, tableName); - List parts = scanPartitions(keyPrefix, -1); + List parts = scanPartitionsWithFilter(keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), -1, null); partCache.put(dbName, tableName, parts, true); return maxPartitions < parts.size() ? parts.subList(0, maxPartitions) : parts; } @@ -670,11 +672,37 @@ void replacePartitions(List oldParts, List newParts) throw regex + ">"); } - List parts = scanPartitionsWithFilter(keyPrefix, maxPartitions, filter); + List parts = scanPartitionsWithFilter(keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), maxPartitions, filter); partCache.put(dbName, tableName, parts, false); return parts; } + List scanPartitions(String dbName, String tableName, byte[] keyStart, byte[] keyEnd, + Filter filter, int maxPartitions) throws IOException, NoSuchObjectException { + List keyElements = new ArrayList(); + keyElements.add(dbName); + keyElements.add(tableName); + + byte[] keyPrefix = + HBaseUtils.buildKeyWithTrailingSeparator(keyElements.toArray(new String[keyElements.size()])); + byte[] startRow = ArrayUtils.addAll(keyPrefix, keyStart); + byte[] endRow; + if (keyEnd == null || keyEnd.length == 0) { + // stop when current db+table entries are over + endRow = HBaseUtils.getEndPrefix(keyPrefix); + } else { + endRow = ArrayUtils.addAll(keyPrefix, keyEnd); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Scanning partitions with start row <" + new String(startRow) + "> and end row <" + + new String(endRow) + ">"); + } + return scanPartitionsWithFilter(startRow, endRow, maxPartitions, filter); + } + + + /** * Delete a partition * @param dbName database name that table is in @@ -707,21 +735,17 @@ private Partition getPartition(String dbName, String tableName, List par return sdParts.containingPartition; } - - private List scanPartitions(byte[] keyPrefix, int maxResults) throws IOException { - return scanPartitionsWithFilter(keyPrefix, maxResults, null); - } - - private List scanPartitionsWithFilter(byte[] keyPrefix, int maxResults, Filter filter) + private List scanPartitionsWithFilter(byte[] startRow, byte [] endRow, + int maxResults, Filter filter) throws IOException { Iterator iter = - scanWithFilter(PART_TABLE, keyPrefix, CATALOG_CF, CATALOG_COL, filter); + scan(PART_TABLE, startRow, endRow, CATALOG_CF, CATALOG_COL, filter); List parts = new ArrayList(); int numToFetch = maxResults < 0 ? 
Integer.MAX_VALUE : maxResults; for (int i = 0; i < numToFetch && iter.hasNext(); i++) { Result result = iter.next(); - HBaseUtils.StorageDescriptorParts sdParts = - HBaseUtils.deserializePartition(result.getRow(), result.getValue(CATALOG_CF, CATALOG_COL)); + HBaseUtils.StorageDescriptorParts sdParts = HBaseUtils.deserializePartition(result.getRow(), + result.getValue(CATALOG_CF, CATALOG_COL)); StorageDescriptor sd = getStorageDescriptor(sdParts.sdHash); HBaseUtils.assembleStorageDescriptor(sd, sdParts); parts.add(sdParts.containingPartition); @@ -729,7 +753,6 @@ private Partition getPartition(String dbName, String tableName, List par return parts; } - /********************************************************************************************** * Role related methods *********************************************************************************************/ @@ -818,7 +841,7 @@ private Partition getPartition(String dbName, String tableName, List par Set findAllUsersInRole(String roleName) throws IOException { // Walk the userToRole table and collect every user that matches this role. Set users = new HashSet(); - Iterator iter = scanWithFilter(USER_TO_ROLE_TABLE, null, CATALOG_CF, CATALOG_COL, null); + Iterator iter = scan(USER_TO_ROLE_TABLE, CATALOG_CF, CATALOG_COL); while (iter.hasNext()) { Result result = iter.next(); List roleList = @@ -1066,7 +1089,7 @@ Role getRole(String roleName) throws IOException { * @throws IOException */ List scanRoles() throws IOException { - Iterator iter = scanWithFilter(ROLE_TABLE, null, CATALOG_CF, CATALOG_COL, null); + Iterator iter = scan(ROLE_TABLE, CATALOG_CF, CATALOG_COL); List roles = new ArrayList(); while (iter.hasNext()) { Result result = iter.next(); @@ -1099,7 +1122,7 @@ void deleteRole(String roleName) throws IOException { private void buildRoleCache() throws IOException { if (!entireRoleTableInCache) { - Iterator roles = scanWithFilter(ROLE_TABLE, null, CATALOG_CF, ROLES_COL, null); + Iterator roles = scan(ROLE_TABLE, CATALOG_CF, ROLES_COL); while (roles.hasNext()) { Result res = roles.next(); String roleName = new String(res.getRow(), HBaseUtils.ENCODING); @@ -1196,7 +1219,8 @@ Table getTable(String dbName, String tableName) throws IOException { filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regex)); } Iterator iter = - scanWithFilter(TABLE_TABLE, keyPrefix, CATALOG_CF, CATALOG_COL, filter); + scan(TABLE_TABLE, keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), + CATALOG_CF, CATALOG_COL, filter); List tables = new ArrayList
(); while (iter.hasNext()) { Result result = iter.next(); @@ -1661,23 +1685,36 @@ private void delete(String table, byte[] key, byte[] colFam, byte[] colName) thr htab.delete(d); } - private Iterator scanWithFilter(String table, byte[] keyPrefix, byte[] colFam, + private Iterator scan(String table, byte[] colFam, + byte[] colName) throws IOException { + return scan(table, null, null, colFam, colName, null); + } + + private Iterator scan(String table, byte[] colFam, byte[] colName, + Filter filter) throws IOException { + return scan(table, null, null, colFam, colName, filter); + } + + private Iterator scan(String table, byte[] keyStart, byte[] keyEnd, byte[] colFam, byte[] colName, Filter filter) throws IOException { HTableInterface htab = conn.getHBaseTable(table); - Scan s; - if (keyPrefix == null) { - s = new Scan(); - } else { - byte[] stop = Arrays.copyOf(keyPrefix, keyPrefix.length); - stop[stop.length - 1]++; - s = new Scan(keyPrefix, stop); + Scan s = new Scan(); + if (keyStart != null) { + s.setStartRow(keyStart); + } + if (keyEnd != null) { + s.setStopRow(keyEnd); } s.addColumn(colFam, colName); - if (filter != null) s.setFilter(filter); + if (filter != null) { + s.setFilter(filter); + } ResultScanner scanner = htab.getScanner(s); return scanner.iterator(); } + + /********************************************************************************************** * Testing methods and classes *********************************************************************************************/ diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 159b9e7..4b4cfeb 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.PartFilterExprUtil; +import org.apache.hadoop.hive.metastore.PartitionExpressionProxy; import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; @@ -52,12 +54,14 @@ import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo; import org.apache.hadoop.hive.metastore.api.Role; import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.Type; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.PlanResult; +import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.ScanPlan; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.thrift.TException; @@ -67,6 +71,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; /** @@ -79,6 +84,7 @@ private HBaseReadWrite hbase = null; private Configuration conf; private int txnNestLevel = 0; + private PartitionExpressionProxy expressionProxy = null; public 
HBaseStore() { } @@ -450,17 +456,74 @@ public void alterIndex(String dbname, String baseTblName, String name, Index new public List getPartitionsByFilter(String dbName, String tblName, String filter, short maxParts) throws MetaException, NoSuchObjectException { - // TODO - Needs to wait for ability to push filters into HBase - throw new UnsupportedOperationException(); + final ExpressionTree exprTree = (filter != null && !filter.isEmpty()) ? PartFilterExprUtil + .getFilterParser(filter).tree : ExpressionTree.EMPTY_TREE; + List result = new ArrayList(); + getPartitionsByExprInternal(dbName, tblName, exprTree, maxParts, result); + return result; } @Override public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr, String defaultPartitionName, short maxParts, List result) throws TException { - // TODO for now just return all partitions, need to add real expression parsing later. - result.addAll(getPartitions(dbName, tblName, maxParts)); - return true; + final ExpressionTree exprTree = PartFilterExprUtil.makeExpressionTree(expressionProxy, expr); + // TODO: investigate if there should be any role for defaultPartitionName in this + // implementation. direct sql code path in ObjectStore does not use it. + + return getPartitionsByExprInternal(dbName, tblName, exprTree, maxParts, result); + } + + private boolean getPartitionsByExprInternal(String dbName, String tblName, + ExpressionTree exprTree, short maxParts, List result) throws MetaException, + NoSuchObjectException { + + Table table = getTable(dbName, tblName); + if (table == null) { + throw new NoSuchObjectException("Unable to find table " + dbName + "." + tblName); + } + String firstPartitionColumn = table.getPartitionKeys().get(0).getName(); + // general hbase filter plan from expression tree + PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, firstPartitionColumn); + + if (LOG.isDebugEnabled()) { + LOG.debug("Hbase Filter Plan generated : " + planRes.plan); + } + + // results from scans need to be merged as there can be overlapping results between + // the scans. Use a map of list of partition values to partition for this. + Map, Partition> mergedParts = new HashMap, Partition>(); + for (ScanPlan splan : planRes.plan.getPlans()) { + try { + List parts = getHBase().scanPartitions(dbName, tblName, + splan.getStartRowSuffix(), splan.getEndRowSuffix(), null, -1); + boolean reachedMax = false; + for (Partition part : parts) { + mergedParts.put(part.getValues(), part); + if (mergedParts.size() == maxParts) { + reachedMax = true; + break; + } + } + if (reachedMax) { + break; + } + } catch (IOException e) { + LOG.error("Unable to get partitions", e); + throw new MetaException("Error scanning partitions" + tableNameForErrorMsg(dbName, tblName) + + ": " + e); + } + } + for (Entry, Partition> mp : mergedParts.entrySet()) { + result.add(mp.getValue()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Matched partitions " + result); + } + + // return true if there might be some additional partitions that don't match filter conditions + // being returned + return !planRes.hasUnsupportedCondition; } @Override @@ -1599,6 +1662,12 @@ public void flushCache() { @Override public void setConf(Configuration configuration) { + // initialize expressionProxy. 
Also re-initialize it if + // setConf is being called with new configuration object (though that + // is not expected to happen, doing it just for safety) + if(expressionProxy == null || conf != configuration) { + expressionProxy = PartFilterExprUtil.createExpressionProxy(configuration); + } conf = configuration; } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java index 3f9e1d9..2e1ae27 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java @@ -20,7 +20,6 @@ import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -1064,4 +1063,20 @@ static ColumnStatisticsObj deserializeStatsForOneColumn(ColumnStatistics stats, obj.setStatsData(colData); return obj; } + + /** + * @param keyStart byte array representing the start prefix + * @return byte array corresponding to the next possible prefix + */ + static byte[] getEndPrefix(byte[] keyStart) { + if (keyStart == null) { + return null; + } + // Since this is a prefix and not full key, the usual hbase technique of + // appending 0 byte does not work. Instead of that, increment the last byte. + byte[] keyEnd = Arrays.copyOf(keyStart, keyStart.length); + keyEnd[keyEnd.length - 1]++; + return keyEnd; + } + } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java b/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java index 781ac63..63be7b7 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java @@ -26,9 +26,7 @@ import org.antlr.runtime.ANTLRStringStream; import org.antlr.runtime.CharStream; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.HiveMetaStore; -import org.apache.hadoop.hive.metastore.ObjectStore; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -36,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.serde.serdeConstants; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; /** @@ -535,6 +534,12 @@ public TreeNode getRoot() { return this.root; } + @VisibleForTesting + public void setRootForTest(TreeNode tn) { + this.root = tn; + } + + /** * Adds a intermediate node of either type(AND/OR). 
Pops last two nodes from * the stack and sets them as children of the new node and pushes itself diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/MockUtils.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/MockUtils.java index c5c5b83..e481317 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/MockUtils.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/MockUtils.java @@ -22,13 +22,16 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.PartitionExpressionProxy; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -45,8 +48,28 @@ */ public class MockUtils { + /** + * The default impl is in ql package and is not available in unit tests. + */ + public static class NOOPProxy implements PartitionExpressionProxy { + + @Override + public String convertExprToFilter(byte[] expr) throws MetaException { + return null; + } + + @Override + public boolean filterPartitionsByExpr(List partColumnNames, + List partColumnTypeInfos, byte[] expr, String defaultPartitionName, + List partitionNames) throws MetaException { + return false; + } + + } + static HBaseStore init(Configuration conf, HTableInterface htable, final SortedMap rows) throws IOException { + ((HiveConf)conf).setVar(ConfVars.METASTORE_EXPRESSION_PROXY_CLASS, NOOPProxy.class.getName()); Mockito.when(htable.get(Mockito.any(Get.class))).thenAnswer(new Answer() { @Override public Result answer(InvocationOnMock invocation) throws Throwable { @@ -65,8 +88,9 @@ public Result answer(InvocationOnMock invocation) throws Throwable { public ResultScanner answer(InvocationOnMock invocation) throws Throwable { Scan scan = (Scan)invocation.getArguments()[0]; List results = new ArrayList(); - SortedMap sub = - rows.subMap(new String(scan.getStartRow()), new String(scan.getStopRow())); + String start = new String(scan.getStartRow()); + String stop = new String(scan.getStopRow()); + SortedMap sub = rows.subMap(start, stop); for (Map.Entry e : sub.entrySet()) { results.add(Result.create(new Cell[]{e.getValue()})); } diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java new file mode 100644 index 0000000..5943d14 --- /dev/null +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.hbase; + +import java.util.Arrays; + +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.FilterPlan; +import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.MultiScanPlan; +import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.PartitionFilterGenerator; +import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.PlanResult; +import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.ScanPlan; +import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.ScanPlan.ScanMarker; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LogicalOperator; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode; +import org.junit.Assert; +import org.junit.Test; + +public class TestHBaseFilterPlanUtil { + final boolean INCLUSIVE = true; + + /** + * Test the function that compares byte arrays + */ + @Test + public void testCompare() { + + Assert.assertEquals(-1, HBaseFilterPlanUtil.compare(new byte[] { 1, 2 }, new byte[] { 1, 3 })); + Assert.assertEquals(-1, + HBaseFilterPlanUtil.compare(new byte[] { 1, 2, 3 }, new byte[] { 1, 3 })); + Assert.assertEquals(-1, + HBaseFilterPlanUtil.compare(new byte[] { 1, 2 }, new byte[] { 1, 2, 3 })); + + Assert.assertEquals(0, HBaseFilterPlanUtil.compare(new byte[] { 3, 2 }, new byte[] { 3, 2 })); + + Assert + .assertEquals(1, HBaseFilterPlanUtil.compare(new byte[] { 3, 2, 1 }, new byte[] { 3, 2 })); + Assert + .assertEquals(1, HBaseFilterPlanUtil.compare(new byte[] { 3, 3, 1 }, new byte[] { 3, 2 })); + + } + + /** + * Test function that finds greater/lesser marker + */ + @Test + public void testgetComparedMarker() { + ScanMarker l; + ScanMarker r; + + // equal plans + l = new ScanMarker(new byte[] { 1, 2 }, INCLUSIVE); + r = new ScanMarker(new byte[] { 1, 2 }, INCLUSIVE); + assertFirstGreater(l, r); + + l = new ScanMarker(new byte[] { 1, 2 }, !INCLUSIVE); + r = new ScanMarker(new byte[] { 1, 2 }, !INCLUSIVE); + assertFirstGreater(l, r); + + l = new ScanMarker(null, !INCLUSIVE); + r = new ScanMarker(null, !INCLUSIVE); + assertFirstGreater(l, r); + + // create l is greater because of inclusive flag + l = new ScanMarker(new byte[] { 1, 2 }, !INCLUSIVE); + r = new ScanMarker(null, !INCLUSIVE); + // the rule for null vs non-null is different + // non-null is both smaller and greater than null + Assert.assertEquals(l, ScanPlan.getComparedMarker(l, r, true)); + Assert.assertEquals(l, ScanPlan.getComparedMarker(r, l, true)); + Assert.assertEquals(l, ScanPlan.getComparedMarker(l, r, false)); + Assert.assertEquals(l, ScanPlan.getComparedMarker(r, l, false)); + + // create l that is greater because of the bytes + l = new ScanMarker(new byte[] { 1, 2, 0 }, INCLUSIVE); + r = new ScanMarker(new byte[] { 1, 2 }, INCLUSIVE); + assertFirstGreater(l, r); + + } + + private void 
assertFirstGreater(ScanMarker big, ScanMarker small) { + Assert.assertEquals(big, ScanPlan.getComparedMarker(big, small, true)); + Assert.assertEquals(big, ScanPlan.getComparedMarker(small, big, true)); + Assert.assertEquals(small, ScanPlan.getComparedMarker(big, small, false)); + Assert.assertEquals(small, ScanPlan.getComparedMarker(small, big, false)); + } + + /** + * Test ScanPlan AND operation + */ + @Test + public void testScanPlanAnd() { + ScanPlan l = new ScanPlan(); + ScanPlan r = new ScanPlan(); + l.setStartMarker(new ScanMarker(new byte[] { 10 }, INCLUSIVE)); + r.setStartMarker(new ScanMarker(new byte[] { 10 }, INCLUSIVE)); + + ScanPlan res; + // both equal + res = l.and(r).getPlans().get(0); + Assert.assertEquals(new ScanMarker(new byte[] { 10 }, INCLUSIVE), res.getStartMarker()); + + // add equal end markers as well, and test AND again + l.setEndMarker(new ScanMarker(new byte[] { 20 }, INCLUSIVE)); + r.setEndMarker(new ScanMarker(new byte[] { 20 }, INCLUSIVE)); + res = l.and(r).getPlans().get(0); + Assert.assertEquals(new ScanMarker(new byte[] { 10 }, INCLUSIVE), res.getStartMarker()); + Assert.assertEquals(new ScanMarker(new byte[] { 20 }, INCLUSIVE), res.getEndMarker()); + + l.setEndMarker(new ScanMarker(null, INCLUSIVE)); + r.setStartMarker(new ScanMarker(null, !INCLUSIVE)); + // markers with non null bytes are both lesser and greator + Assert.assertEquals(l.getStartMarker(), res.getStartMarker()); + Assert.assertEquals(r.getEndMarker(), res.getEndMarker()); + + l.setStartMarker(new ScanMarker(new byte[] { 10, 11 }, !INCLUSIVE)); + l.setEndMarker(new ScanMarker(new byte[] { 20, 21 }, INCLUSIVE)); + + r.setStartMarker(new ScanMarker(new byte[] { 10, 10 }, INCLUSIVE)); + r.setEndMarker(new ScanMarker(new byte[] { 15 }, INCLUSIVE)); + res = l.and(r).getPlans().get(0); + // start of l is greater, end of r is smaller + Assert.assertEquals(l.getStartMarker(), res.getStartMarker()); + Assert.assertEquals(r.getEndMarker(), res.getEndMarker()); + + } + + /** + * Test ScanPlan OR operation + */ + @Test + public void testScanPlanOr() { + ScanPlan l = new ScanPlan(); + ScanPlan r = new ScanPlan(); + l.setStartMarker(new ScanMarker(new byte[] { 10 }, INCLUSIVE)); + r.setStartMarker(new ScanMarker(new byte[] { 11 }, INCLUSIVE)); + + FilterPlan res1 = l.or(r); + Assert.assertEquals(2, res1.getPlans().size()); + res1.getPlans().get(0).getStartMarker().equals(l.getStartMarker()); + res1.getPlans().get(1).getStartMarker().equals(r.getStartMarker()); + + FilterPlan res2 = res1.or(r); + Assert.assertEquals(3, res2.getPlans().size()); + } + + /** + * Test MultiScanPlan OR + */ + @Test + public void testMultiScanPlanOr() { + + MultiScanPlan l = createMultiScanPlan(new ScanPlan()); + MultiScanPlan r = createMultiScanPlan(new ScanPlan()); + // verify OR of two multi plans with one plan each + Assert.assertEquals(2, l.or(r).getPlans().size()); + + // verify OR of multi plan with a single scanplan + Assert.assertEquals(2, l.or(new ScanPlan()).getPlans().size()); + Assert.assertEquals(2, (new ScanPlan()).or(l).getPlans().size()); + + // verify or of two multiplans with more than one scan plan + r = createMultiScanPlan(new ScanPlan(), new ScanPlan()); + Assert.assertEquals(3, l.or(r).getPlans().size()); + Assert.assertEquals(3, r.or(l).getPlans().size()); + + } + + private MultiScanPlan createMultiScanPlan(ScanPlan... 
+  private MultiScanPlan createMultiScanPlan(ScanPlan... scanPlans) {
+    return new MultiScanPlan(Arrays.asList(scanPlans));
+  }
+
+  /**
+   * Test MultiScanPlan AND operation
+   */
+  @Test
+  public void testMultiScanPlanAnd() {
+    MultiScanPlan l = createMultiScanPlan(new ScanPlan());
+    MultiScanPlan r = createMultiScanPlan(new ScanPlan());
+
+    // two MultiScanPlans with a single scan plan each should result in a new FilterPlan
+    // with just one scan
+    Assert.assertEquals(1, l.and(r).getPlans().size());
+
+    // l has one ScanPlan, r has two. AND result should have two
+    r = createMultiScanPlan(new ScanPlan(), new ScanPlan());
+    Assert.assertEquals(2, l.and(r).getPlans().size());
+    Assert.assertEquals(2, r.and(l).getPlans().size());
+
+    // l has 2 ScanPlans, r has 3. AND result should have 6
+    l = createMultiScanPlan(new ScanPlan(), new ScanPlan());
+    r = createMultiScanPlan(new ScanPlan(), new ScanPlan(), new ScanPlan());
+    Assert.assertEquals(6, l.and(r).getPlans().size());
+    Assert.assertEquals(6, r.and(l).getPlans().size());
+  }
+
+  /**
+   * Test plan generation from LeafNode
+   *
+   * @throws MetaException
+   */
+  @Test
+  public void testLeafNodePlan() throws MetaException {
+    final String KEY = "k1";
+    final String VAL = "v1";
+    final byte[] VAL_BYTES = PartitionFilterGenerator.toBytes(VAL);
+    LeafNode l = new LeafNode();
+    l.keyName = KEY;
+    l.value = VAL;
+    final ScanMarker DEFAULT_SCANMARKER = new ScanMarker(null, false);
+
+    l.operator = Operator.EQUALS;
+    verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, INCLUSIVE), new ScanMarker(VAL_BYTES, INCLUSIVE));
+
+    l.operator = Operator.GREATERTHAN;
+    verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, !INCLUSIVE), DEFAULT_SCANMARKER);
+
+    l.operator = Operator.GREATERTHANOREQUALTO;
+    verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, INCLUSIVE), DEFAULT_SCANMARKER);
+
+    l.operator = Operator.LESSTHAN;
+    verifyPlan(l, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL_BYTES, !INCLUSIVE));
+
+    l.operator = Operator.LESSTHANOREQUALTO;
+    verifyPlan(l, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL_BYTES, INCLUSIVE));
+
+    // the following leaf node plans should currently have 'has unsupported condition' set to true,
+    // because of the unsupported operator
+    l.operator = Operator.NOTEQUALS;
+    verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
+
+    l.operator = Operator.NOTEQUALS2;
+    verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
+
+    l.operator = Operator.LIKE;
+    verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
+
+    // the following leaf node plans should currently have 'has unsupported condition' set to true,
+    // because the condition is not on the first key
+    l.operator = Operator.EQUALS;
+    verifyPlan(l, "NOT_FIRST_PART", DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
+
+    l.operator = Operator.NOTEQUALS;
+    verifyPlan(l, "NOT_FIRST_PART", DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
+
+    // if the tree is null, it should return the equivalent of a full scan, and true
+    // for 'has unsupported condition'
+    verifyPlan(null, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
+  }
+
+  private void verifyPlan(TreeNode l, String keyName, ScanMarker startMarker, ScanMarker endMarker)
+      throws MetaException {
+    verifyPlan(l, keyName, startMarker, endMarker, false);
+  }
+
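+  /**
+   * Generate a filter plan for the given expression node (null means no filter expression) and
+   * assert that it contains a single scan with the expected start/end markers and the expected
+   * 'has unsupported condition' flag.
+   */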
Assert.assertEquals("Has unsupported condition", hasUnsupportedCondition, + planRes.hasUnsupportedCondition); + Assert.assertEquals(1, plan.getPlans().size()); + ScanPlan splan = plan.getPlans().get(0); + Assert.assertEquals(startMarker, splan.getStartMarker()); + Assert.assertEquals(endMarker, splan.getEndMarker()); + } + + /** + * Test plan generation from TreeNode + * + * @throws MetaException + */ + @Test + public void testTreeNodePlan() throws MetaException { + + final String KEY = "k1"; + final String VAL1 = "10"; + final String VAL2 = "11"; + final byte[] VAL1_BYTES = PartitionFilterGenerator.toBytes(VAL1); + final byte[] VAL2_BYTES = PartitionFilterGenerator.toBytes(VAL2); + LeafNode l = new LeafNode(); + l.keyName = KEY; + l.value = VAL1; + final ScanMarker DEFAULT_SCANMARKER = new ScanMarker(null, false); + + LeafNode r = new LeafNode(); + r.keyName = KEY; + r.value = VAL2; + + TreeNode tn = new TreeNode(l, LogicalOperator.AND, r); + + // verify plan for - k1 >= '10' and k1 < '11' + l.operator = Operator.GREATERTHANOREQUALTO; + r.operator = Operator.LESSTHAN; + verifyPlan(tn, KEY, new ScanMarker(VAL1_BYTES, INCLUSIVE), new ScanMarker(VAL2_BYTES, + !INCLUSIVE)); + + // verify plan for - k1 >= '10' and k1 > '11' + l.operator = Operator.GREATERTHANOREQUALTO; + r.operator = Operator.GREATERTHAN; + verifyPlan(tn, KEY, new ScanMarker(VAL2_BYTES, !INCLUSIVE), DEFAULT_SCANMARKER); + + // verify plan for - k1 >= '10' or k1 > '11' + tn = new TreeNode(l, LogicalOperator.OR, r); + ExpressionTree e = new ExpressionTree(); + e.setRootForTest(tn); + PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY); + Assert.assertEquals(2, planRes.plan.getPlans().size()); + Assert.assertEquals(false, planRes.hasUnsupportedCondition); + + // verify plan for - k1 >= '10' and (k1 >= '10' or k1 > '11') + TreeNode tn2 = new TreeNode(l, LogicalOperator.AND, tn); + e = new ExpressionTree(); + e.setRootForTest(tn2); + planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY); + Assert.assertEquals(2, planRes.plan.getPlans().size()); + Assert.assertEquals(false, planRes.hasUnsupportedCondition); + + // verify plan for (k1 >= '10' and (k1 >= '10' or k1 > '11')) or k1 LIKE '2' + // plan should return true for hasUnsupportedCondition + LeafNode klike = new LeafNode(); + klike.keyName = KEY; + klike.value = VAL1; + klike.operator = Operator.LIKE; + TreeNode tn3 = new TreeNode(tn2, LogicalOperator.OR, klike); + e = new ExpressionTree(); + e.setRootForTest(tn3); + planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY); + Assert.assertEquals(3, planRes.plan.getPlans().size()); + Assert.assertEquals(true, planRes.hasUnsupportedCondition); + + + } + +} diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java index de4e28a..df3cd73 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java @@ -20,16 +20,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import 
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
@@ -59,22 +51,17 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import java.io.IOException;
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
@@ -92,6 +79,8 @@ SortedMap rows = new TreeMap();
   HBaseStore store;
+
+
   @Before
   public void init() throws IOException {
     MockitoAnnotations.initMocks(this);