diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloDefaultIndexScanner.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloDefaultIndexScanner.java new file mode 100644 index 0000000..0cb12b0 --- /dev/null +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloDefaultIndexScanner.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.accumulo; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * This default index scanner expects indexes to be in the same format as presto's + * accumulo index tables defined as: + * [rowid=field value] [cf=column_column] [cq=table rowid] [visibility] [value=""] + *

+ * This handler looks for the following hive serde properties: + * 'accumulo.indextable.name' = 'table_idx' (required - name of the corresponding index table) + * 'accumulo.indexed.columns' = 'name,age,phone' (optional - comma separated list of indexed + * hive columns; if not defined, or defined as '*', all columns are + * assumed to be indexed) + * 'accumulo.max.index.rows' = '20000' (optional - maximum number of index matches to use + * before converting to a full table scan; default=20000) + * 'accumulo.index.scanner' = 'org.apache.accumulo.MyHandler' (optional - name of + * the index scanner if you need to use some other scanner) + *

+ * To implement your own index table scheme it should be as simple as sub-classing + * this class and overriding getIndexRowRanges() and optionally init() if you need more + * config settings + */ +public class AccumuloDefaultIndexScanner implements AccumuloIndexScanner { + private static final Logger LOG = LoggerFactory.getLogger(AccumuloDefaultIndexScanner.class); + public static final Range NO_MATCH = new Range("\n"); + + protected AccumuloConnectionParameters connectParams; + protected AccumuloIndexParameters indexParams; + protected int maxRowIds; + protected Authorizations auths; + protected String indexTable; + protected Set indexColumns; + private Connector connect; + + /** + * Initialize object based on configuration. + * + * @param conf - Hive configuration + */ + @Override + public void init(Configuration conf) { + connectParams = new AccumuloConnectionParameters(conf); + indexParams = new AccumuloIndexParameters(conf); + maxRowIds = indexParams.getMaxIndexRows(); + auths = indexParams.getTableAuths(); + indexTable = indexParams.getIndexTable(); + indexColumns = indexParams.getIndexColumns(); + } + + /** + * Get a list of rowid ranges by scanning a column index. 
+ * + * @param column - the hive column name + * @param indexRange - Key range to scan on the index table + * @return List of matching rowid ranges or null if too many matches found + * if index values are not found a newline range is added to list to + * short-circuit the query + */ + @Override + public List getIndexRowRanges(String column, Range indexRange) { + List rowIds = new ArrayList(); + Scanner scan = null; + String col = getMappingIndexColumn(column); + try { + LOG.debug("Searching tab=" + indexTable + " column=" + col + " range=" + indexRange); + Connector conn = getConnector(); + scan = conn.createScanner(indexTable, auths); + scan.setRange(indexRange); + scan.enableIsolation(); + Text cf = new Text(col + "_" + col); + LOG.debug("Using Column Family=" + cf.toString()); + scan.fetchColumnFamily(cf); + + for (Map.Entry entry : scan) { + + rowIds.add(new Range(entry.getKey().getColumnQualifier())); + + // if we have too many results return null for a full scan + if (rowIds.size() > maxRowIds) { + return null; + } + } + + // no hits on the index so return a no match range + if (rowIds.isEmpty()) { + rowIds.add(NO_MATCH); + LOG.debug("Found 0 index matches"); + } else { + LOG.debug("Found " + rowIds.size() + " index matches"); + + } + + return rowIds; + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + LOG.error(e.getLocalizedMessage(), e); + } finally { + if (scan != null) { + scan.close(); + } + } + + // assume the index is bad and do a full scan + LOG.debug("Index lookup failed for table " + indexTable); + return null; + } + + /** + * Test if column is defined in the index table. 
+ * + * @param column - hive column name + * @return true if the column is defined as part of the index table + */ + @Override + public boolean isIndexed(String column) { + return indexTable != null + && (indexColumns.isEmpty() || indexColumns.contains("*") + || this.indexColumns.contains(column.toLowerCase()) + || this.indexColumns.contains(column.toUpperCase())); + + } + + protected String getMappingIndexColumn(String column) { + for (String col : indexColumns) { + if (col.equalsIgnoreCase(column)) { + return col; + } + } + return column; + } + + protected Connector getConnector() throws AccumuloSecurityException, AccumuloException { + if (connect == null) { + connect = connectParams.getConnector(); + } + return connect; + } + +} diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexParameters.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexParameters.java new file mode 100644 index 0000000..c74277a --- /dev/null +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexParameters.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.accumulo; + +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import static java.util.Collections.EMPTY_SET; + +/** + * Accumulo Index Parameters for Hive tables. + */ +public class AccumuloIndexParameters { + public static final int DEFAULT_MAX_ROWIDS = 20000; + public static final int DEFAULT_WRITE_THREADS = 4; + public static final String ACCUMULO_INDEX_SCANNER = "accumulo.index.scanner"; + public static final String ACCUMULO_MAX_INDEX_ROWS = "accumulo.max.index.rows"; + public static final String ACCUMULO_INDEXED_COLUMNS = "accumulo.indexed.columns"; + public static final String ACCUMULO_INDEXTABLE_NAME = "accumulo.indextable.name"; + private static final Logger LOG = LoggerFactory.getLogger(AccumuloIndexParameters.class); + private Configuration conf; + + public AccumuloIndexParameters(Configuration conf) { + this.conf = conf; + } + + public String getIndexTable() { + return this.conf.get(ACCUMULO_INDEXTABLE_NAME); + } + + public int getMaxIndexRows() { + return this.conf.getInt(ACCUMULO_MAX_INDEX_ROWS, DEFAULT_MAX_ROWIDS); + } + + public final Set getIndexColumns() { + String colmap = conf.get(ACCUMULO_INDEXED_COLUMNS); + if (colmap != null) { + return new HashSet(Arrays.asList(colmap.split(","))); + } + return EMPTY_SET; + } + + + protected final Authorizations getTableAuths() { + String auths = conf.get(AccumuloSerDeParameters.AUTHORIZATIONS_KEY); + if (auths != null && !auths.isEmpty()) { + return new Authorizations(auths.trim().getBytes(StandardCharsets.UTF_8)); + } + return new Authorizations(); + } + + public Configuration getConf() { + return conf; + } + + public final AccumuloIndexScanner createScanner() { + 
AccumuloIndexScanner handler = null; + + String classname = conf.get(ACCUMULO_INDEX_SCANNER); + if (classname != null) { + try { + handler = (AccumuloIndexScanner) Class.forName(classname).newInstance(); + } catch (ClassCastException | InstantiationException | IllegalAccessException + | ClassNotFoundException e) { + LOG.error(e.getLocalizedMessage(), e); + } + } else { + handler = new AccumuloDefaultIndexScanner(); + } + if (handler != null) { + handler.init(conf); + } + return handler; + } +} diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScanner.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScanner.java new file mode 100644 index 0000000..8547f70 --- /dev/null +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScanner.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.accumulo; + +import org.apache.accumulo.core.data.Range; +import org.apache.hadoop.conf.Configuration; + +import java.util.List; + +/** + * Specification for implementing a AccumuloIndexScanner. 
+ */ +public interface AccumuloIndexScanner { + + void init(Configuration conf); + + boolean isIndexed(String columnName); + + List getIndexRowRanges(String column, Range indexRange); + +} diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java index a7ec7c5..0ff1011 100644 --- accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java @@ -196,7 +196,8 @@ private AccumuloPredicateHandler() {} /** * Loop through search conditions and build ranges for predicates involving rowID column, if any. */ - public List getRanges(Configuration conf, ColumnMapper columnMapper) throws SerDeException { + public List getRanges(Configuration conf, ColumnMapper columnMapper) + throws SerDeException { if (!columnMapper.hasRowIdMapping()) { return TOTAL_RANGE; } @@ -218,7 +219,7 @@ private AccumuloPredicateHandler() {} return TOTAL_RANGE; } - Object result = generateRanges(columnMapper, hiveRowIdColumnName, root); + Object result = generateRanges(conf, columnMapper, hiveRowIdColumnName, root); if (null == result) { log.info("Calculated null set of ranges, scanning full table"); @@ -240,6 +241,8 @@ private AccumuloPredicateHandler() {} * Encapsulates the traversal over some {@link ExprNodeDesc} tree for the generation of Accumuluo * Ranges using expressions involving the Accumulo rowid-mapped Hive column * + * @param conf + * Hadoop configuration * @param columnMapper * Mapping of Hive to Accumulo columns for the query * @param hiveRowIdColumnName @@ -249,8 +252,9 @@ private AccumuloPredicateHandler() {} * @return An object representing the result from the ExprNodeDesc tree traversal using the * AccumuloRangeGenerator */ - protected Object generateRanges(ColumnMapper columnMapper, String 
hiveRowIdColumnName, ExprNodeDesc root) { - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, + protected Object generateRanges(Configuration conf, ColumnMapper columnMapper, + String hiveRowIdColumnName, ExprNodeDesc root) { + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, columnMapper.getRowIdMapping(), hiveRowIdColumnName); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. emptyMap(), null); diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java index 21392d1..020f320 100644 --- accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java @@ -16,13 +16,10 @@ */ package org.apache.hadoop.hive.accumulo.predicate; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Stack; - import org.apache.accumulo.core.data.Range; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.accumulo.AccumuloIndexParameters; +import org.apache.hadoop.hive.accumulo.AccumuloIndexScanner; import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping; import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp; import org.apache.hadoop.hive.accumulo.predicate.compare.Equal; @@ -43,17 +40,16 @@ import org.apache.hadoop.hive.serde2.lazy.LazyUtils; import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector; 
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Stack; + /** * */ @@ -63,12 +59,14 @@ private final AccumuloPredicateHandler predicateHandler; private final HiveAccumuloRowIdColumnMapping rowIdMapping; private final String hiveRowIdColumnName; + protected AccumuloIndexScanner indexScanner; - public AccumuloRangeGenerator(AccumuloPredicateHandler predicateHandler, + public AccumuloRangeGenerator(Configuration conf, AccumuloPredicateHandler predicateHandler, HiveAccumuloRowIdColumnMapping rowIdMapping, String hiveRowIdColumnName) { this.predicateHandler = predicateHandler; this.rowIdMapping = rowIdMapping; this.hiveRowIdColumnName = hiveRowIdColumnName; + this.indexScanner = new AccumuloIndexParameters(conf).createScanner(); } @Override @@ -234,30 +232,22 @@ protected Object processExpression(ExprNodeGenericFuncDesc func, Object[] nodeOu return null; } - // Reject any clauses that are against a column that isn't the rowId mapping - if (!this.hiveRowIdColumnName.equals(columnDesc.getColumn())) { + ConstantObjectInspector objInspector = constantDesc.getWritableObjectInspector(); + + // Reject any clauses that are against a column that isn't the rowId mapping or indexed + if (!this.hiveRowIdColumnName.equals(columnDesc.getColumn())){ + if (this.indexScanner != null && 
this.indexScanner.isIndexed(columnDesc.getColumn())) { + return getIndexedRowIds(genericUdf, leftHandNode, columnDesc.getColumn(), objInspector); + } return null; } - ConstantObjectInspector objInspector = constantDesc.getWritableObjectInspector(); + Text constText = getConstantText(objInspector); - Text constText; - switch (rowIdMapping.getEncoding()) { - case STRING: - constText = getUtf8Value(objInspector); - break; - case BINARY: - try { - constText = getBinaryValue(objInspector); - } catch (IOException e) { - throw new SemanticException(e); - } - break; - default: - throw new SemanticException("Unable to parse unknown encoding: " - + rowIdMapping.getEncoding()); + return getRange(genericUdf, leftHandNode, constText); } + private Range getRange(GenericUDF genericUdf, ExprNodeDesc leftHandNode, Text constText) { Class opClz; try { opClz = predicateHandler.getCompareOpClass(genericUdf.getUdfName()); @@ -274,6 +264,26 @@ protected Object processExpression(ExprNodeGenericFuncDesc func, Object[] nodeOu } } + private Text getConstantText(ConstantObjectInspector objInspector) throws SemanticException { + Text constText; + switch (rowIdMapping.getEncoding()) { + case STRING: + constText = getUtf8Value(objInspector); + break; + case BINARY: + try { + constText = getBinaryValue(objInspector); + } catch (IOException e) { + throw new SemanticException(e); + } + break; + default: + throw new SemanticException("Unable to parse unknown encoding: " + + rowIdMapping.getEncoding()); + } + return constText; + } + protected Range getConstantOpColumnRange(Class opClz, Text constText) { if (opClz.equals(Equal.class)) { // 100 == x @@ -311,6 +321,15 @@ protected Range getColumnOpConstantRange(Class opClz, Text } } + + protected Object getIndexedRowIds(GenericUDF genericUdf, ExprNodeDesc leftHandNode, + String columnName, ConstantObjectInspector objInspector) throws SemanticException { + Text constText = getConstantText(objInspector); + Range range = getRange(genericUdf, leftHandNode, 
constText); + return indexScanner.getIndexRowRanges(columnName, range); + } + + protected Text getUtf8Value(ConstantObjectInspector objInspector) { // TODO is there a more correct way to get the literal value for the Object? return new Text(objInspector.getWritableConstantValue().toString()); diff --git accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java new file mode 100644 index 0000000..8815ce3 --- /dev/null +++ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.accumulo; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestAccumuloDefaultIndexScanner { + private static final Logger LOG = LoggerFactory.getLogger(TestAccumuloDefaultIndexScanner.class); + private static final Value EMPTY_VALUE = new Value(); + + private static void addRow(BatchWriter writer, String rowId, String cf, String cq) throws MutationsRejectedException { + Mutation mut = new Mutation(rowId); + mut.put(new Text(cf), new Text(cq), EMPTY_VALUE); + writer.addMutation(mut); + } + + public static 
AccumuloDefaultIndexScanner buildMockHandler(int maxMatches) { + // Builds a handler backed by an Accumulo MockInstance whose index table is pre-loaded + // with name_name/age_age entries for four rows; maxMatches caps index hits before + // the scanner falls back to a full table scan. + try { + String table = "table"; + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEXTABLE_NAME, table); + conf.setInt(AccumuloIndexParameters.ACCUMULO_MAX_INDEX_ROWS, maxMatches); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEXED_COLUMNS, "*"); + AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner(); + handler.init(conf); + + MockInstance inst = new MockInstance("test_instance"); + Connector conn = inst.getConnector("root", new PasswordToken("")); + if (!conn.tableOperations().exists(table)) { + conn.tableOperations().create(table); + BatchWriterConfig batchConfig = new BatchWriterConfig(); + BatchWriter writer = conn.createBatchWriter(table, batchConfig); + addRow(writer, "fred", "name_name", "row1"); + addRow(writer, "25", "age_age", "row1"); + addRow(writer, "bill", "name_name", "row2"); + addRow(writer, "20", "age_age", "row2"); + addRow(writer, "sally", "name_name", "row3"); + addRow(writer, "23", "age_age", "row3"); + addRow(writer, "rob", "name_name", "row4"); + addRow(writer, "60", "age_age", "row4"); + writer.close(); + } + AccumuloConnectionParameters connectionParams = Mockito + .mock(AccumuloConnectionParameters.class); + + Mockito.when(connectionParams.getConnector()).thenReturn(conn); + handler.connectParams = connectionParams; + return handler; + } catch (AccumuloSecurityException | AccumuloException | TableExistsException | TableNotFoundException e) { + LOG.error(e.getLocalizedMessage(), e); + } + return null; + } + + @Test + public void testMatchNone() { + AccumuloDefaultIndexScanner handler = buildMockHandler(10); + List ranges = handler.getIndexRowRanges("name", new Range("mike")); + assertEquals(1, ranges.size()); + assertTrue("does not contain no-match range", 
ranges.contains(AccumuloDefaultIndexScanner.NO_MATCH)); + } + + @Test + public void testMatchRange() { + AccumuloDefaultIndexScanner handler = buildMockHandler(10); + List ranges = handler.getIndexRowRanges("age", new Range("10", "50")); + assertEquals(3, ranges.size()); + assertTrue("does not contain row1", ranges.contains(new Range("row1"))); + assertTrue("does not contain row2", ranges.contains(new Range("row2"))); + assertTrue("does not contain row3", ranges.contains(new Range("row3"))); + } + + // Exceeding the max-match budget must return null so the caller reverts to a full scan. + @Test + public void testTooManyMatches() { + AccumuloDefaultIndexScanner handler = buildMockHandler(2); + List ranges = handler.getIndexRowRanges("age", new Range("10", "50")); + assertNull("ranges should be null", ranges); + } + + @Test + public void testMatchExact() { + AccumuloDefaultIndexScanner handler = buildMockHandler(10); + List ranges = handler.getIndexRowRanges("age", new Range("20")); + assertEquals(1, ranges.size()); + assertTrue("does not contain row2", ranges.contains(new Range("row2"))); + } + + @Test + public void testValidIndex() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEXED_COLUMNS, "name,age,phone,email"); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEXTABLE_NAME, "contact"); + AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner(); + handler.init(conf); + assertTrue("name is not identified as an index", handler.isIndexed("name")); + assertTrue("age is not identified as an index", handler.isIndexed("age")); + assertTrue("phone is not identified as an index", handler.isIndexed("phone")); + assertTrue("email is not identified as an index", handler.isIndexed("email")); + } + + @Test + public void testInvalidIndex() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEXED_COLUMNS, "name,age,phone,email"); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEXTABLE_NAME, "contact"); + AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner(); + 
handler.init(conf); + assertFalse("mobile is identified as an index", handler.isIndexed("mobile")); + assertFalse("mail is identified as an index", handler.isIndexed("mail")); + } + + + @Test + public void testMissingTable() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEXED_COLUMNS, "name,age,phone,email"); + AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner(); + handler.init(conf); + assertFalse("name is identified as an index", handler.isIndexed("name")); + assertFalse("age is identified as an index", handler.isIndexed("age")); + } + + @Test + public void testWildcardIndex() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEXED_COLUMNS, "*"); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEXTABLE_NAME, "contact"); + AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner(); + handler.init(conf); + assertTrue("name is not identified as an index", handler.isIndexed("name")); + assertTrue("age is not identified as an index", handler.isIndexed("age")); + } + + @Test + public void testNullIndex() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEXTABLE_NAME, "contact"); + AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner(); + handler.init(conf); + assertTrue("name is not identified as an index", handler.isIndexed("name")); + } + + @Test + public void testEmptyIndex() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEXED_COLUMNS, ""); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEXTABLE_NAME, "contact"); + AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner(); + handler.init(conf); + assertFalse("name is identified as an index", handler.isIndexed("name")); + } +} diff --git accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java 
accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java new file mode 100644 index 0000000..af279ad --- /dev/null +++ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.accumulo; + +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; +import org.junit.Test; + +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestAccumuloIndexParameters implements AccumuloIndexScanner { + + @Test + public void testDefaultScanner() { + AccumuloIndexScanner scanner = new AccumuloIndexParameters(new Configuration()).createScanner(); + assertTrue(scanner instanceof AccumuloDefaultIndexScanner); + } + + @Test + public void testUserHandler() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEX_SCANNER, this.getClass().getName()); + AccumuloIndexScanner scanner = new AccumuloIndexParameters(conf).createScanner(); + assertTrue(scanner instanceof TestAccumuloIndexParameters); + } + + @Test + public void testBadHandler() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEX_SCANNER, "a.class.does.not.exist.IndexHandler"); + AccumuloIndexScanner scanner = new AccumuloIndexParameters(conf).createScanner(); + assertTrue(scanner == null); + } + + @Test + public void getIndexColumns() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.ACCUMULO_INDEXED_COLUMNS, "a,b,c"); + Set cols = new AccumuloIndexParameters(conf).getIndexColumns(); + assertEquals(3, cols.size()); + assertTrue("Missing column a", cols.contains("a")); + assertTrue("Missing column b", cols.contains("b")); + assertTrue("Missing column c", cols.contains("c")); + } + + @Test + public void getMaxIndexRows() { + Configuration conf = new Configuration(); + conf.setInt(AccumuloIndexParameters.ACCUMULO_MAX_INDEX_ROWS, 10); + int maxRows = new 
AccumuloIndexParameters(conf).getMaxIndexRows(); + assertEquals(10, maxRows); + } + + @Test + public void getAuths() { + Configuration conf = new Configuration(); + conf.set(AccumuloSerDeParameters.AUTHORIZATIONS_KEY, "public,open"); + Authorizations auths = new AccumuloIndexParameters(conf).getTableAuths(); + assertEquals(2, auths.size()); + assertTrue("Missing auth public", auths.contains("public")); + assertTrue("Missing auth open", auths.contains("open")); + } + + + //AccumuloIndexScanner Interface methods + @Override + public void init(Configuration conf) { + } + + @Override + public boolean isIndexed(String columnName) { + return false; + } + + @Override + public List getIndexRowRanges(String column, Range indexRange) { + return null; + } +} diff --git accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java index 88e4530..46c5b93 100644 --- accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java +++ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java @@ -758,7 +758,7 @@ public void testNullRangeGeneratorOutput() throws SerDeException { String hiveRowIdColumnName = "rid"; Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod(); - Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(null); + Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)).thenReturn(null); Mockito.when(mockHandler.getExpression(conf)).thenReturn(root); // A null result from AccumuloRangeGenerator is all ranges @@ -776,7 +776,8 @@ public void testEmptyListRangeGeneratorOutput() throws SerDeException { String hiveRowIdColumnName = "rid"; Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod(); - 
Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(Collections.emptyList()); + Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)) + .thenReturn(Collections.emptyList()); Mockito.when(mockHandler.getExpression(conf)).thenReturn(root); // A null result from AccumuloRangeGenerator is all ranges @@ -795,7 +796,7 @@ public void testSingleRangeGeneratorOutput() throws SerDeException { Range r = new Range("a"); Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod(); - Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(r); + Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)).thenReturn(r); Mockito.when(mockHandler.getExpression(conf)).thenReturn(root); // A null result from AccumuloRangeGenerator is all ranges @@ -814,7 +815,8 @@ public void testManyRangesGeneratorOutput() throws SerDeException { Range r1 = new Range("a"), r2 = new Range("z"); Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod(); - Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(Arrays.asList(r1, r2)); + Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)) + .thenReturn(Arrays.asList(r1, r2)); Mockito.when(mockHandler.getExpression(conf)).thenReturn(root); // A null result from AccumuloRangeGenerator is all ranges diff --git accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java index 339da07..49a926c 100644 --- accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java +++ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java @@ -27,7 +27,9 @@ import org.apache.accumulo.core.data.Key; import 
org.apache.accumulo.core.data.Range; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants; +import org.apache.hadoop.hive.accumulo.TestAccumuloDefaultIndexScanner; import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding; import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; @@ -66,12 +68,14 @@ private AccumuloPredicateHandler handler; private HiveAccumuloRowIdColumnMapping rowIdMapping; + private Configuration conf; @Before public void setup() { handler = AccumuloPredicateHandler.getInstance(); rowIdMapping = new HiveAccumuloRowIdColumnMapping(AccumuloHiveConstants.ROWID, ColumnEncoding.STRING, "row", TypeInfoFactory.stringTypeInfo.toString()); + conf = new Configuration(true); } @Test @@ -108,7 +112,7 @@ public void testRangeConjunction() throws Exception { List expectedRanges = Arrays .asList(new Range(new Key("f"), true, new Key("m\0"), false)); - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid"); + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -163,7 +167,7 @@ public void testRangeDisjunction() throws Exception { // Should generate (-inf,+inf) List expectedRanges = Arrays.asList(new Range()); - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid"); + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. 
emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -236,7 +240,7 @@ public void testRangeConjunctionWithDisjunction() throws Exception { // Should generate ['q', +inf) List expectedRanges = Arrays.asList(new Range(new Key("q"), true, null, false)); - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid"); + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -291,7 +295,7 @@ public void testPartialRangeConjunction() throws Exception { // Should generate [f,+inf) List expectedRanges = Arrays.asList(new Range(new Key("f"), true, null, false)); - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid"); + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -349,7 +353,7 @@ public void testDateRangeConjunction() throws Exception { List expectedRanges = Arrays.asList(new Range(new Key("2014-01-01"), true, new Key( "2014-07-01"), false)); - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid"); + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. 
emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -397,7 +401,7 @@ public void testCastExpression() throws Exception { ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPEqualOrGreaterThan(), Arrays.asList(key, cast)); - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "key"); + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "key"); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -446,7 +450,7 @@ public void testRangeOverNonRowIdField() throws Exception { ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPAnd(), bothFilters); - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid"); + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. 
emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -464,4 +468,59 @@ public void testRangeOverNonRowIdField() throws Exception { Object result = nodeOutput.get(both); Assert.assertNull(result); } + + @Test + public void testRangeOverIndexedField() throws Exception { + // age >= '10' + ExprNodeDesc column = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "age", null, false); + ExprNodeDesc constant = new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "10"); + List children = Lists.newArrayList(); + children.add(column); + children.add(constant); + ExprNodeDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, + new GenericUDFOPEqualOrGreaterThan(), children); + assertNotNull(node); + + // age <= '50' + ExprNodeDesc column2 = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "age", null, + false); + ExprNodeDesc constant2 = new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "50"); + List children2 = Lists.newArrayList(); + children2.add(column2); + children2.add(constant2); + ExprNodeDesc node2 = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, + new GenericUDFOPEqualOrLessThan(), children2); + assertNotNull(node2); + + // And UDF + List bothFilters = Lists.newArrayList(); + bothFilters.add(node); + bothFilters.add(node2); + ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, + new GenericUDFOPAnd(), bothFilters); + + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); + rangeGenerator.indexScanner = TestAccumuloDefaultIndexScanner.buildMockHandler(10); + Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, + Collections. 
emptyMap(), null); + GraphWalker ogw = new DefaultGraphWalker(disp); + ArrayList topNodes = new ArrayList(); + topNodes.add(both); + HashMap nodeOutput = new HashMap(); + + try { + ogw.startWalking(topNodes, nodeOutput); + } catch (SemanticException ex) { + throw new RuntimeException(ex); + } + + // Filters are using an index which should match 3 rows + List result = (List)nodeOutput.get(both); + + Assert.assertEquals( 3, result.size()); + Assert.assertTrue( "does not contain row1", result.contains( new Range( "row1"))); + Assert.assertTrue( "does not contain row2", result.contains( new Range( "row2"))); + Assert.assertTrue( "does not contain row3", result.contains( new Range( "row3"))); + } + }