diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloDefaultIndexScanner.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloDefaultIndexScanner.java
new file mode 100644
index 0000000..0cb12b0
--- /dev/null
+++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloDefaultIndexScanner.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This default index scanner expects indexes to be in the same format as presto's
+ * accumulo index tables defined as:
+ * [rowid=field value] [cf=column_column] [cq=table rowid] [visibility] [value=""]
+ *
+ * This handler looks for the following hive serde properties:
+ * 'accumulo.indextable.name' = 'table_idx' (required - name of the corresponding index table)
+ * 'accumulo.indexed.columns' = 'name,age,phone' (optional - comma separated list of indexed
+ * hive columns if not defined or defined as '*' all columns are
+ * assumed to be indexed )
+ * 'accumulo.max.index.rows' = '20000' (optional - maximum number of match indexes to use
+ *                   before converting to a full table scan default=20000)
+ * 'accumulo.index.scanner' = 'org.apache.accumulo.MyHandler' (optional - name of
+ * the index scanner if you need to use some other scanner)
+ *
+ * To implement your own index table scheme it should be as simple as sub-classing
+ * this class and overriding getIndexRowRanges() and optionally init() if you need more
+ * config settings
+ */
+public class AccumuloDefaultIndexScanner implements AccumuloIndexScanner {
+ private static final Logger LOG = LoggerFactory.getLogger(AccumuloDefaultIndexScanner.class);
+ public static final Range NO_MATCH = new Range("\n");
+
+ protected AccumuloConnectionParameters connectParams;
+ protected AccumuloIndexParameters indexParams;
+ protected int maxRowIds;
+ protected Authorizations auths;
+ protected String indexTable;
+  protected Set<String> indexColumns;
+ private Connector connect;
+
+ /**
+ * Initialize object based on configuration.
+ *
+ * @param conf - Hive configuration
+ */
+ @Override
+ public void init(Configuration conf) {
+ connectParams = new AccumuloConnectionParameters(conf);
+ indexParams = new AccumuloIndexParameters(conf);
+ maxRowIds = indexParams.getMaxIndexRows();
+ auths = indexParams.getTableAuths();
+ indexTable = indexParams.getIndexTable();
+ indexColumns = indexParams.getIndexColumns();
+ }
+
+ /**
+ * Get a list of rowid ranges by scanning a column index.
+ *
+ * @param column - the hive column name
+ * @param indexRange - Key range to scan on the index table
+ * @return List of matching rowid ranges or null if too many matches found
+ * if index values are not found a newline range is added to list to
+ * short-circuit the query
+ */
+ @Override
+  public List<Range> getIndexRowRanges(String column, Range indexRange) {
+    List<Range> rowIds = new ArrayList<Range>();
+ Scanner scan = null;
+ String col = getMappingIndexColumn(column);
+ try {
+ LOG.debug("Searching tab=" + indexTable + " column=" + col + " range=" + indexRange);
+ Connector conn = getConnector();
+ scan = conn.createScanner(indexTable, auths);
+ scan.setRange(indexRange);
+ scan.enableIsolation();
+ Text cf = new Text(col + "_" + col);
+ LOG.debug("Using Column Family=" + cf.toString());
+ scan.fetchColumnFamily(cf);
+
+      for (Map.Entry<Key, Value> entry : scan) {
+
+ rowIds.add(new Range(entry.getKey().getColumnQualifier()));
+
+ // if we have too many results return null for a full scan
+ if (rowIds.size() > maxRowIds) {
+ return null;
+ }
+ }
+
+ // no hits on the index so return a no match range
+ if (rowIds.isEmpty()) {
+ rowIds.add(NO_MATCH);
+ LOG.debug("Found 0 index matches");
+ } else {
+ LOG.debug("Found " + rowIds.size() + " index matches");
+
+ }
+
+ return rowIds;
+ } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+ LOG.error(e.getLocalizedMessage(), e);
+ } finally {
+ if (scan != null) {
+ scan.close();
+ }
+ }
+
+ // assume the index is bad and do a full scan
+ LOG.debug("Index lookup failed for table " + indexTable);
+ return null;
+ }
+
+ /**
+ * Test if column is defined in the index table.
+ *
+ * @param column - hive column name
+ * @return true if the column is defined as part of the index table
+ */
+ @Override
+ public boolean isIndexed(String column) {
+ return indexTable != null
+ && (indexColumns.isEmpty() || indexColumns.contains("*")
+ || this.indexColumns.contains(column.toLowerCase())
+ || this.indexColumns.contains(column.toUpperCase()));
+
+ }
+
+ protected String getMappingIndexColumn(String column) {
+ for (String col : indexColumns) {
+ if (col.equalsIgnoreCase(column)) {
+ return col;
+ }
+ }
+ return column;
+ }
+
+ protected Connector getConnector() throws AccumuloSecurityException, AccumuloException {
+ if (connect == null) {
+ connect = connectParams.getConnector();
+ }
+ return connect;
+ }
+
+}
diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexParameters.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexParameters.java
new file mode 100644
index 0000000..c74277a
--- /dev/null
+++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexParameters.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static java.util.Collections.EMPTY_SET;
+
+/**
+ * Accumulo Index Parameters for Hive tables.
+ */
+public class AccumuloIndexParameters {
+ public static final int DEFAULT_MAX_ROWIDS = 20000;
+ public static final int DEFAULT_WRITE_THREADS = 4;
+ public static final String ACCUMULO_INDEX_SCANNER = "accumulo.index.scanner";
+ public static final String ACCUMULO_MAX_INDEX_ROWS = "accumulo.max.index.rows";
+ public static final String ACCUMULO_INDEXED_COLUMNS = "accumulo.indexed.columns";
+ public static final String ACCUMULO_INDEXTABLE_NAME = "accumulo.indextable.name";
+ private static final Logger LOG = LoggerFactory.getLogger(AccumuloIndexParameters.class);
+ private Configuration conf;
+
+ public AccumuloIndexParameters(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public String getIndexTable() {
+ return this.conf.get(ACCUMULO_INDEXTABLE_NAME);
+ }
+
+ public int getMaxIndexRows() {
+ return this.conf.getInt(ACCUMULO_MAX_INDEX_ROWS, DEFAULT_MAX_ROWIDS);
+ }
+
+  public final Set<String> getIndexColumns() {
+ String colmap = conf.get(ACCUMULO_INDEXED_COLUMNS);
+ if (colmap != null) {
+      return new HashSet<String>(Arrays.asList(colmap.split(",")));
+ }
+ return EMPTY_SET;
+ }
+
+
+ protected final Authorizations getTableAuths() {
+ String auths = conf.get(AccumuloSerDeParameters.AUTHORIZATIONS_KEY);
+ if (auths != null && !auths.isEmpty()) {
+ return new Authorizations(auths.trim().getBytes(StandardCharsets.UTF_8));
+ }
+ return new Authorizations();
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public final AccumuloIndexScanner createScanner() {
+ AccumuloIndexScanner handler = null;
+
+ String classname = conf.get(ACCUMULO_INDEX_SCANNER);
+ if (classname != null) {
+ try {
+ handler = (AccumuloIndexScanner) Class.forName(classname).newInstance();
+ } catch (ClassCastException | InstantiationException | IllegalAccessException
+ | ClassNotFoundException e) {
+ LOG.error(e.getLocalizedMessage(), e);
+ }
+ } else {
+ handler = new AccumuloDefaultIndexScanner();
+ }
+ if (handler != null) {
+ handler.init(conf);
+ }
+ return handler;
+ }
+}
diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScanner.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScanner.java
new file mode 100644
index 0000000..8547f70
--- /dev/null
+++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScanner.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+
+/**
+ * Specification for implementing a AccumuloIndexScanner.
+ */
+public interface AccumuloIndexScanner {
+
+ void init(Configuration conf);
+
+ boolean isIndexed(String columnName);
+
+  List<Range> getIndexRowRanges(String column, Range indexRange);
+
+}
diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
index a7ec7c5..0ff1011 100644
--- accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
+++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
@@ -196,7 +196,8 @@ private AccumuloPredicateHandler() {}
/**
* Loop through search conditions and build ranges for predicates involving rowID column, if any.
*/
-  public List<Range> getRanges(Configuration conf, ColumnMapper columnMapper) throws SerDeException {
+  public List<Range> getRanges(Configuration conf, ColumnMapper columnMapper)
+      throws SerDeException {
if (!columnMapper.hasRowIdMapping()) {
return TOTAL_RANGE;
}
@@ -218,7 +219,7 @@ private AccumuloPredicateHandler() {}
return TOTAL_RANGE;
}
- Object result = generateRanges(columnMapper, hiveRowIdColumnName, root);
+ Object result = generateRanges(conf, columnMapper, hiveRowIdColumnName, root);
if (null == result) {
log.info("Calculated null set of ranges, scanning full table");
@@ -240,6 +241,8 @@ private AccumuloPredicateHandler() {}
* Encapsulates the traversal over some {@link ExprNodeDesc} tree for the generation of Accumuluo
* Ranges using expressions involving the Accumulo rowid-mapped Hive column
*
+ * @param conf
+ * Hadoop configuration
* @param columnMapper
* Mapping of Hive to Accumulo columns for the query
* @param hiveRowIdColumnName
@@ -249,8 +252,9 @@ private AccumuloPredicateHandler() {}
* @return An object representing the result from the ExprNodeDesc tree traversal using the
* AccumuloRangeGenerator
*/
- protected Object generateRanges(ColumnMapper columnMapper, String hiveRowIdColumnName, ExprNodeDesc root) {
- AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler,
+ protected Object generateRanges(Configuration conf, ColumnMapper columnMapper,
+ String hiveRowIdColumnName, ExprNodeDesc root) {
+ AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler,
columnMapper.getRowIdMapping(), hiveRowIdColumnName);
Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
         Collections.<Rule, NodeProcessor> emptyMap(), null);
diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
index 21392d1..020f320 100644
--- accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
+++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
@@ -16,13 +16,10 @@
*/
package org.apache.hadoop.hive.accumulo.predicate;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Stack;
-
import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.AccumuloIndexParameters;
+import org.apache.hadoop.hive.accumulo.AccumuloIndexScanner;
import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping;
import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
import org.apache.hadoop.hive.accumulo.predicate.compare.Equal;
@@ -43,17 +40,16 @@
import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
/**
*
*/
@@ -63,12 +59,14 @@
private final AccumuloPredicateHandler predicateHandler;
private final HiveAccumuloRowIdColumnMapping rowIdMapping;
private final String hiveRowIdColumnName;
+ protected AccumuloIndexScanner indexScanner;
- public AccumuloRangeGenerator(AccumuloPredicateHandler predicateHandler,
+ public AccumuloRangeGenerator(Configuration conf, AccumuloPredicateHandler predicateHandler,
HiveAccumuloRowIdColumnMapping rowIdMapping, String hiveRowIdColumnName) {
this.predicateHandler = predicateHandler;
this.rowIdMapping = rowIdMapping;
this.hiveRowIdColumnName = hiveRowIdColumnName;
+ this.indexScanner = new AccumuloIndexParameters(conf).createScanner();
}
@Override
@@ -234,30 +232,22 @@ protected Object processExpression(ExprNodeGenericFuncDesc func, Object[] nodeOu
return null;
}
- // Reject any clauses that are against a column that isn't the rowId mapping
- if (!this.hiveRowIdColumnName.equals(columnDesc.getColumn())) {
+ ConstantObjectInspector objInspector = constantDesc.getWritableObjectInspector();
+
+ // Reject any clauses that are against a column that isn't the rowId mapping or indexed
+    if (!this.hiveRowIdColumnName.equals(columnDesc.getColumn())) {
+ if (this.indexScanner != null && this.indexScanner.isIndexed(columnDesc.getColumn())) {
+ return getIndexedRowIds(genericUdf, leftHandNode, columnDesc.getColumn(), objInspector);
+ }
return null;
}
- ConstantObjectInspector objInspector = constantDesc.getWritableObjectInspector();
+ Text constText = getConstantText(objInspector);
- Text constText;
- switch (rowIdMapping.getEncoding()) {
- case STRING:
- constText = getUtf8Value(objInspector);
- break;
- case BINARY:
- try {
- constText = getBinaryValue(objInspector);
- } catch (IOException e) {
- throw new SemanticException(e);
- }
- break;
- default:
- throw new SemanticException("Unable to parse unknown encoding: "
- + rowIdMapping.getEncoding());
+ return getRange(genericUdf, leftHandNode, constText);
}
+ private Range getRange(GenericUDF genericUdf, ExprNodeDesc leftHandNode, Text constText) {
+    Class<? extends CompareOp> opClz;
try {
opClz = predicateHandler.getCompareOpClass(genericUdf.getUdfName());
@@ -274,6 +264,26 @@ protected Object processExpression(ExprNodeGenericFuncDesc func, Object[] nodeOu
}
}
+ private Text getConstantText(ConstantObjectInspector objInspector) throws SemanticException {
+ Text constText;
+ switch (rowIdMapping.getEncoding()) {
+ case STRING:
+ constText = getUtf8Value(objInspector);
+ break;
+ case BINARY:
+ try {
+ constText = getBinaryValue(objInspector);
+ } catch (IOException e) {
+ throw new SemanticException(e);
+ }
+ break;
+ default:
+ throw new SemanticException("Unable to parse unknown encoding: "
+ + rowIdMapping.getEncoding());
+ }
+ return constText;
+ }
+
   protected Range getConstantOpColumnRange(Class<? extends CompareOp> opClz, Text constText) {
if (opClz.equals(Equal.class)) {
// 100 == x
@@ -311,6 +321,15 @@ protected Range getColumnOpConstantRange(Class extends CompareOp> opClz, Text
}
}
+
+ protected Object getIndexedRowIds(GenericUDF genericUdf, ExprNodeDesc leftHandNode,
+ String columnName, ConstantObjectInspector objInspector) throws SemanticException {
+ Text constText = getConstantText(objInspector);
+ Range range = getRange(genericUdf, leftHandNode, constText);
+ return indexScanner.getIndexRowRanges(columnName, range);
+ }
+
+
protected Text getUtf8Value(ConstantObjectInspector objInspector) {
// TODO is there a more correct way to get the literal value for the Object?
return new Text(objInspector.getWritableConstantValue().toString());
diff --git accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java
new file mode 100644
index 0000000..8815ce3
--- /dev/null
+++ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestAccumuloDefaultIndexScanner {
+ private static final Logger LOG = LoggerFactory.getLogger(TestAccumuloDefaultIndexScanner.class);
+ private static final Value EMPTY_VALUE = new Value();
+
+ private static void addRow(BatchWriter writer, String rowId, String cf, String cq) throws MutationsRejectedException {
+ Mutation mut = new Mutation(rowId);
+ mut.put(new Text(cf), new Text(cq), EMPTY_VALUE);
+ writer.addMutation(mut);
+ }
+
+ public static AccumuloDefaultIndexScanner buildMockHandler(int maxMatches) {
+ try {
+ String table = "table";
+ Text emptyText = new Text("");
+ Configuration conf = new Configuration();
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEXTABLE_NAME, table);
+ conf.setInt(AccumuloIndexParameters.ACCUMULO_MAX_INDEX_ROWS, maxMatches);
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEXED_COLUMNS, "*");
+ AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+ handler.init(conf);
+
+ MockInstance inst = new MockInstance("test_instance");
+ Connector conn = inst.getConnector("root", new PasswordToken(""));
+ if (!conn.tableOperations().exists(table)) {
+ conn.tableOperations().create(table);
+ BatchWriterConfig batchConfig = new BatchWriterConfig();
+ BatchWriter writer = conn.createBatchWriter(table, batchConfig);
+ addRow(writer, "fred", "name_name", "row1");
+ addRow(writer, "25", "age_age", "row1");
+ addRow(writer, "bill", "name_name", "row2");
+ addRow(writer, "20", "age_age", "row2");
+ addRow(writer, "sally", "name_name", "row3");
+ addRow(writer, "23", "age_age", "row3");
+ addRow(writer, "rob", "name_name", "row4");
+ addRow(writer, "60", "age_age", "row4");
+ writer.close();
+ }
+ AccumuloConnectionParameters connectionParams = Mockito
+ .mock(AccumuloConnectionParameters.class);
+ AccumuloStorageHandler storageHandler = Mockito.mock(AccumuloStorageHandler.class);
+
+ Mockito.when(connectionParams.getConnector()).thenReturn(conn);
+ handler.connectParams = connectionParams;
+ return handler;
+ } catch (AccumuloSecurityException | AccumuloException | TableExistsException | TableNotFoundException e) {
+ LOG.error(e.getLocalizedMessage(), e);
+ }
+ return null;
+ }
+
+ @Test
+ public void testMatchNone() {
+ AccumuloDefaultIndexScanner handler = buildMockHandler(10);
+    List<Range> ranges = handler.getIndexRowRanges("name", new Range("mike"));
+ assertEquals(1, ranges.size());
+ assertTrue("does not contain no-match range", ranges.contains(AccumuloDefaultIndexScanner.NO_MATCH));
+ }
+
+ @Test
+ public void testMatchRange() {
+ AccumuloDefaultIndexScanner handler = buildMockHandler(10);
+    List<Range> ranges = handler.getIndexRowRanges("age", new Range("10", "50"));
+ assertEquals(3, ranges.size());
+ assertTrue("does not contain row1", ranges.contains(new Range("row1")));
+ assertTrue("does not contain row2", ranges.contains(new Range("row2")));
+ assertTrue("does not contain row3", ranges.contains(new Range("row3")));
+ }
+
+  @Test public void testTooManyMatches() {
+ AccumuloDefaultIndexScanner handler = buildMockHandler(2);
+    List<Range> ranges = handler.getIndexRowRanges("age", new Range("10", "50"));
+ assertNull("ranges should be null", ranges);
+ }
+
+ @Test
+ public void testMatchExact() {
+ AccumuloDefaultIndexScanner handler = buildMockHandler(10);
+    List<Range> ranges = handler.getIndexRowRanges("age", new Range("20"));
+ assertEquals(1, ranges.size());
+ assertTrue("does not contain row2", ranges.contains(new Range("row2")));
+ }
+
+ @Test
+ public void testValidIndex() {
+ Configuration conf = new Configuration();
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEXED_COLUMNS, "name,age,phone,email");
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEXTABLE_NAME, "contact");
+ AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+ handler.init(conf);
+ assertTrue("name is not identified as an index", handler.isIndexed("name"));
+ assertTrue("age is not identified as an index", handler.isIndexed("age"));
+ assertTrue("phone is not identified as an index", handler.isIndexed("phone"));
+ assertTrue("email is not identified as an index", handler.isIndexed("email"));
+ }
+
+ @Test
+ public void testInvalidIndex() {
+ Configuration conf = new Configuration();
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEXED_COLUMNS, "name,age,phone,email");
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEXTABLE_NAME, "contact");
+ AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+ handler.init(conf);
+ assertFalse("mobile is identified as an index", handler.isIndexed("mobile"));
+ assertFalse("mail is identified as an index", handler.isIndexed("mail"));
+ }
+
+
+ @Test
+ public void testMissingTable() {
+ Configuration conf = new Configuration();
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEXED_COLUMNS, "name,age,phone,email");
+ AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+ handler.init(conf);
+ assertFalse("name is identified as an index", handler.isIndexed("name"));
+ assertFalse("age is identified as an index", handler.isIndexed("age"));
+ }
+
+ @Test
+ public void testWildcardIndex() {
+ Configuration conf = new Configuration();
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEXED_COLUMNS, "*");
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEXTABLE_NAME, "contact");
+ AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+ handler.init(conf);
+ assertTrue("name is not identified as an index", handler.isIndexed("name"));
+ assertTrue("age is not identified as an index", handler.isIndexed("age"));
+ }
+
+ @Test
+ public void testNullIndex() {
+ Configuration conf = new Configuration();
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEXTABLE_NAME, "contact");
+ AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+ handler.init(conf);
+ assertTrue("name is not identified as an index", handler.isIndexed("name"));
+ }
+
+ @Test
+ public void testEmptyIndex() {
+ Configuration conf = new Configuration();
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEXED_COLUMNS, "");
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEXTABLE_NAME, "contact");
+ AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+ handler.init(conf);
+ assertFalse("name is identified as an index", handler.isIndexed("name"));
+ }
+}
diff --git accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java
new file mode 100644
index 0000000..af279ad
--- /dev/null
+++ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestAccumuloIndexParameters implements AccumuloIndexScanner {
+
+ @Test
+ public void testDefaultScanner() {
+ AccumuloIndexScanner scanner = new AccumuloIndexParameters(new Configuration()).createScanner();
+ assertTrue(scanner instanceof AccumuloDefaultIndexScanner);
+ }
+
+ @Test
+ public void testUserHandler() {
+ Configuration conf = new Configuration();
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEX_SCANNER, this.getClass().getName());
+ AccumuloIndexScanner scanner = new AccumuloIndexParameters(conf).createScanner();
+ assertTrue(scanner instanceof TestAccumuloIndexParameters);
+ }
+
+ @Test
+ public void testBadHandler() {
+ Configuration conf = new Configuration();
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEX_SCANNER, "a.class.does.not.exist.IndexHandler");
+ AccumuloIndexScanner scanner = new AccumuloIndexParameters(conf).createScanner();
+ assertTrue(scanner == null);
+ }
+
+ @Test
+ public void getIndexColumns() {
+ Configuration conf = new Configuration();
+ conf.set(AccumuloIndexParameters.ACCUMULO_INDEXED_COLUMNS, "a,b,c");
+    Set<String> cols = new AccumuloIndexParameters(conf).getIndexColumns();
+ assertEquals(3, cols.size());
+ assertTrue("Missing column a", cols.contains("a"));
+ assertTrue("Missing column b", cols.contains("b"));
+ assertTrue("Missing column c", cols.contains("c"));
+ }
+
+ @Test
+ public void getMaxIndexRows() {
+ Configuration conf = new Configuration();
+ conf.setInt(AccumuloIndexParameters.ACCUMULO_MAX_INDEX_ROWS, 10);
+ int maxRows = new AccumuloIndexParameters(conf).getMaxIndexRows();
+ assertEquals(10, maxRows);
+ }
+
+ @Test
+ public void getAuths() {
+ Configuration conf = new Configuration();
+ conf.set(AccumuloSerDeParameters.AUTHORIZATIONS_KEY, "public,open");
+ Authorizations auths = new AccumuloIndexParameters(conf).getTableAuths();
+ assertEquals(2, auths.size());
+ assertTrue("Missing auth public", auths.contains("public"));
+ assertTrue("Missing auth open", auths.contains("open"));
+ }
+
+
+ //AccumuloIndexScanner Interface methods
+ @Override
+ public void init(Configuration conf) {
+ }
+
+ @Override
+ public boolean isIndexed(String columnName) {
+ return false;
+ }
+
+ @Override
+  public List<Range> getIndexRowRanges(String column, Range indexRange) {
+ return null;
+ }
+}
diff --git accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
index 88e4530..46c5b93 100644
--- accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
+++ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java
@@ -758,7 +758,7 @@ public void testNullRangeGeneratorOutput() throws SerDeException {
String hiveRowIdColumnName = "rid";
Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod();
- Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(null);
+ Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)).thenReturn(null);
Mockito.when(mockHandler.getExpression(conf)).thenReturn(root);
// A null result from AccumuloRangeGenerator is all ranges
@@ -776,7 +776,8 @@ public void testEmptyListRangeGeneratorOutput() throws SerDeException {
String hiveRowIdColumnName = "rid";
Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod();
- Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(Collections.emptyList());
+ Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root))
+ .thenReturn(Collections.emptyList());
Mockito.when(mockHandler.getExpression(conf)).thenReturn(root);
// A null result from AccumuloRangeGenerator is all ranges
@@ -795,7 +796,7 @@ public void testSingleRangeGeneratorOutput() throws SerDeException {
Range r = new Range("a");
Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod();
- Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(r);
+ Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)).thenReturn(r);
Mockito.when(mockHandler.getExpression(conf)).thenReturn(root);
// A null result from AccumuloRangeGenerator is all ranges
@@ -814,7 +815,8 @@ public void testManyRangesGeneratorOutput() throws SerDeException {
Range r1 = new Range("a"), r2 = new Range("z");
Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod();
- Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(Arrays.asList(r1, r2));
+ Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root))
+ .thenReturn(Arrays.asList(r1, r2));
Mockito.when(mockHandler.getExpression(conf)).thenReturn(root);
// A null result from AccumuloRangeGenerator is all ranges
diff --git accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java
index 339da07..49a926c 100644
--- accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java
+++ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java
@@ -27,7 +27,9 @@
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
+import org.apache.hadoop.hive.accumulo.TestAccumuloDefaultIndexScanner;
import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -66,12 +68,14 @@
private AccumuloPredicateHandler handler;
private HiveAccumuloRowIdColumnMapping rowIdMapping;
+ private Configuration conf;
@Before
public void setup() {
handler = AccumuloPredicateHandler.getInstance();
rowIdMapping = new HiveAccumuloRowIdColumnMapping(AccumuloHiveConstants.ROWID,
ColumnEncoding.STRING, "row", TypeInfoFactory.stringTypeInfo.toString());
+ conf = new Configuration(true);
}
@Test
@@ -108,7 +112,7 @@ public void testRangeConjunction() throws Exception {
List expectedRanges = Arrays
.asList(new Range(new Key("f"), true, new Key("m\0"), false));
- AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid");
+ AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
Collections. emptyMap(), null);
GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -163,7 +167,7 @@ public void testRangeDisjunction() throws Exception {
// Should generate (-inf,+inf)
List expectedRanges = Arrays.asList(new Range());
- AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid");
+ AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
Collections. emptyMap(), null);
GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -236,7 +240,7 @@ public void testRangeConjunctionWithDisjunction() throws Exception {
// Should generate ['q', +inf)
List expectedRanges = Arrays.asList(new Range(new Key("q"), true, null, false));
- AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid");
+ AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
Collections. emptyMap(), null);
GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -291,7 +295,7 @@ public void testPartialRangeConjunction() throws Exception {
// Should generate [f,+inf)
List expectedRanges = Arrays.asList(new Range(new Key("f"), true, null, false));
- AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid");
+ AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
Collections. emptyMap(), null);
GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -349,7 +353,7 @@ public void testDateRangeConjunction() throws Exception {
List expectedRanges = Arrays.asList(new Range(new Key("2014-01-01"), true, new Key(
"2014-07-01"), false));
- AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid");
+ AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
Collections. emptyMap(), null);
GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -397,7 +401,7 @@ public void testCastExpression() throws Exception {
ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPEqualOrGreaterThan(), Arrays.asList(key, cast));
- AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "key");
+ AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "key");
Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
Collections. emptyMap(), null);
GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -446,7 +450,7 @@ public void testRangeOverNonRowIdField() throws Exception {
ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
new GenericUDFOPAnd(), bothFilters);
- AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid");
+ AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
Collections. emptyMap(), null);
GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -464,4 +468,59 @@ public void testRangeOverNonRowIdField() throws Exception {
Object result = nodeOutput.get(both);
Assert.assertNull(result);
}
+
+ @Test
+ public void testRangeOverIndexedField() throws Exception {
+ // age >= '10'
+ ExprNodeDesc column = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "age", null, false);
+ ExprNodeDesc constant = new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "10");
+ List children = Lists.newArrayList();
+ children.add(column);
+ children.add(constant);
+ ExprNodeDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
+ new GenericUDFOPEqualOrGreaterThan(), children);
+ assertNotNull(node);
+
+ // age <= '50'
+ ExprNodeDesc column2 = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "age", null,
+ false);
+ ExprNodeDesc constant2 = new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "50");
+ List children2 = Lists.newArrayList();
+ children2.add(column2);
+ children2.add(constant2);
+ ExprNodeDesc node2 = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
+ new GenericUDFOPEqualOrLessThan(), children2);
+ assertNotNull(node2);
+
+ // And UDF
+ List bothFilters = Lists.newArrayList();
+ bothFilters.add(node);
+ bothFilters.add(node2);
+ ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo,
+ new GenericUDFOPAnd(), bothFilters);
+
+ AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid");
+ rangeGenerator.indexScanner = TestAccumuloDefaultIndexScanner.buildMockHandler(10);
+ Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
+ Collections. emptyMap(), null);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+ ArrayList topNodes = new ArrayList();
+ topNodes.add(both);
+ HashMap nodeOutput = new HashMap();
+
+ try {
+ ogw.startWalking(topNodes, nodeOutput);
+ } catch (SemanticException ex) {
+ throw new RuntimeException(ex);
+ }
+
+ // Filters use the index, which should match 3 rows
+ List result = (List)nodeOutput.get(both);
+
+ Assert.assertEquals( 3, result.size());
+ Assert.assertTrue( "does not contain row1", result.contains( new Range( "row1")));
+ Assert.assertTrue( "does not contain row2", result.contains( new Range( "row2")));
+ Assert.assertTrue( "does not contain row3", result.contains( new Range( "row3")));
+ }
+
}