vIter = values.iterator();
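+ // Walk the key and value iterators in lockstep; each key/value pair contributes one column to the row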
+ while (kIter.hasNext()) {
+ Key k = kIter.next();
+ Value v = vIter.next();
+ row.add(k.getColumnFamily().toString(), k.getColumnQualifier().toString(), v.get());
+ }
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java
new file mode 100644
index 0000000..530f232
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo.mr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.mapred.RangeInputSplit;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Wraps RangeInputSplit into a FileSplit so Hadoop won't complain when it tries to make its own
+ * Path.
+ *
+ * If the {@link RangeInputSplit} is used directly, it will hit a branch of code in
+ * {@link HiveInputSplit} which generates an invalid Path. Wrap it ourselves so that it doesn't
+ * error out.
+ */
+public class HiveAccumuloSplit extends FileSplit implements InputSplit {
+ private static final Logger log = Logger.getLogger(HiveAccumuloSplit.class);
+
+ private RangeInputSplit split;
+
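+ // No-arg constructor required so Hadoop can instantiate the split reflectively before calling readFields()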
+ public HiveAccumuloSplit() {
+ super((Path) null, 0, 0, (String[]) null);
+ split = new RangeInputSplit();
+ }
+
+ public HiveAccumuloSplit(RangeInputSplit split, Path dummyPath) {
+ super(dummyPath, 0, 0, (String[]) null);
+ this.split = split;
+ }
+
+ public RangeInputSplit getSplit() {
+ return this.split;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ split.readFields(in);
+ }
+
+ @Override
+ public String toString() {
+ return "HiveAccumuloSplit: " + split;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ split.write(out);
+ }
+
+ @Override
+ public long getLength() {
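+ // Delegate to the wrapped RangeInputSplit, falling back to 0 if the length can't be computed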
+ int len = 0;
+ try {
+ return split.getLength();
+ } catch (IOException e) {
+ log.error("Error getting length for split: " + StringUtils.stringifyException(e));
+ }
+ return len;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return split.getLocations();
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java
new file mode 100644
index 0000000..385b2f4
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java
@@ -0,0 +1,485 @@
+package org.apache.hadoop.hive.accumulo.mr;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapred.AccumuloRowInputFormat;
+import org.apache.accumulo.core.client.mapred.RangeInputSplit;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters;
+import org.apache.hadoop.hive.accumulo.AccumuloHiveRow;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMapper;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloMapColumnMapping;
+import org.apache.hadoop.hive.accumulo.predicate.AccumuloPredicateHandler;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.hive.accumulo.serde.TooManyAccumuloColumnsException;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps older InputFormat for use with Hive.
+ *
+ * Configure input scan with proper ranges, iterators, and columns based on serde properties for
+ * Hive table.
+ */
+public class HiveAccumuloTableInputFormat implements
+ org.apache.hadoop.mapred.InputFormat<Text, AccumuloHiveRow> {
+ private static final Logger log = LoggerFactory.getLogger(HiveAccumuloTableInputFormat.class);
+
+ // Visible for testing
+ protected AccumuloRowInputFormat accumuloInputFormat = new AccumuloRowInputFormat();
+ protected AccumuloPredicateHandler predicateHandler = AccumuloPredicateHandler.getInstance();
+
+ @Override
+ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
+ final AccumuloConnectionParameters accumuloParams = new AccumuloConnectionParameters(jobConf);
+ final Instance instance = accumuloParams.getInstance();
+ final ColumnMapper columnMapper;
+ try {
+ columnMapper = getColumnMapper(jobConf);
+ } catch (TooManyAccumuloColumnsException e) {
+ throw new IOException(e);
+ }
+
+ JobContext context = ShimLoader.getHadoopShims().newJobContext(Job.getInstance(jobConf));
+ Path[] tablePaths = FileInputFormat.getInputPaths(context);
+
+ try {
+ final Connector connector = accumuloParams.getConnector(instance);
+ final List<ColumnMapping> columnMappings = columnMapper.getColumnMappings();
+ final List<IteratorSetting> iterators = predicateHandler.getIterators(jobConf, columnMapper);
+ final Collection<Range> ranges = predicateHandler.getRanges(jobConf, columnMapper);
+
+ // Setting an empty collection of ranges will, unexpectedly, scan all data
+ // We don't want that.
+ if (null != ranges && ranges.isEmpty()) {
+ return new InputSplit[0];
+ }
+
+ // Set the relevant information in the Configuration for the AccumuloInputFormat
+ configure(jobConf, instance, connector, accumuloParams, columnMapper, iterators, ranges);
+
+ int numColumns = columnMappings.size();
+
+ List<Integer> readColIds = ColumnProjectionUtils.getReadColumnIDs(jobConf);
+
+ // Sanity check
+ if (numColumns < readColIds.size())
+ throw new IOException("Number of column mappings (" + numColumns + ")"
+ + " is less than the number of Hive table columns (" + readColIds.size() + ")");
+
+ // get splits from Accumulo
+ InputSplit[] splits = accumuloInputFormat.getSplits(jobConf, numSplits);
+
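+ // Wrap each RangeInputSplit in a HiveAccumuloSplit so Hive's path handling sees a valid FileSplit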
+ HiveAccumuloSplit[] hiveSplits = new HiveAccumuloSplit[splits.length];
+ for (int i = 0; i < splits.length; i++) {
+ RangeInputSplit ris = (RangeInputSplit) splits[i];
+ hiveSplits[i] = new HiveAccumuloSplit(ris, tablePaths[0]);
+ }
+
+ return hiveSplits;
+ } catch (AccumuloException e) {
+ log.error("Could not configure AccumuloInputFormat", e);
+ throw new IOException(StringUtils.stringifyException(e));
+ } catch (AccumuloSecurityException e) {
+ log.error("Could not configure AccumuloInputFormat", e);
+ throw new IOException(StringUtils.stringifyException(e));
+ } catch (SerDeException e) {
+ log.error("Could not configure AccumuloInputFormat", e);
+ throw new IOException(StringUtils.stringifyException(e));
+ }
+ }
+
+ /**
+ * Setup accumulo input format from conf properties. Delegates to final RecordReader from mapred
+ * package.
+ *
+ * @param inputSplit
+ * @param jobConf
+ * @param reporter
+ * @return RecordReader
+ * @throws IOException
+ */
+ @Override
+ public RecordReader<Text, AccumuloHiveRow> getRecordReader(InputSplit inputSplit,
+ final JobConf jobConf, final Reporter reporter) throws IOException {
+ final ColumnMapper columnMapper;
+ try {
+ columnMapper = getColumnMapper(jobConf);
+ } catch (TooManyAccumuloColumnsException e) {
+ throw new IOException(e);
+ }
+
+ try {
+ final List<IteratorSetting> iterators = predicateHandler.getIterators(jobConf, columnMapper);
+
+ HiveAccumuloSplit hiveSplit = (HiveAccumuloSplit) inputSplit;
+ RangeInputSplit rangeSplit = hiveSplit.getSplit();
+
+ log.info("Split: " + rangeSplit);
+
+ // The RangeInputSplit *should* have all of the necessary information contained in it
+ // which saves us from re-parsing our configuration from the AccumuloStorageHandler
+ // and re-setting it into the Configuration (like we did in getSplits(...)). Thus, it should
+ // be unnecessary to re-invoke configure(...)
+
+ // ACCUMULO-2962 Iterators weren't getting serialized into the InputSplit, but we can
+ // compensate because we still have that info.
+ // Should be fixed in Accumulo 1.5.2 and 1.6.1
+ if (null == rangeSplit.getIterators()
+ || (rangeSplit.getIterators().isEmpty() && !iterators.isEmpty())) {
+ log.debug("Re-setting iterators on InputSplit due to Accumulo bug.");
+ rangeSplit.setIterators(iterators);
+ }
+
+ // ACCUMULO-3015 Like the above, RangeInputSplit should have the table name
+ // but it might not, so just re-set it if it's null.
+ if (null == getTableName(rangeSplit)) {
+ final AccumuloConnectionParameters accumuloParams = new AccumuloConnectionParameters(
+ jobConf);
+ log.debug("Re-setting table name on InputSplit due to Accumulo bug.");
+ setTableName(rangeSplit, accumuloParams.getAccumuloTableName());
+ }
+
+ final RecordReader<Text, PeekingIterator<Map.Entry<Key, Value>>> recordReader = accumuloInputFormat
+ .getRecordReader(rangeSplit, jobConf, reporter);
+
+ return new HiveAccumuloRecordReader(recordReader, iterators.size());
+ } catch (SerDeException e) {
+ throw new IOException(StringUtils.stringifyException(e));
+ }
+ }
+
+ protected ColumnMapper getColumnMapper(Configuration conf) throws IOException,
+ TooManyAccumuloColumnsException {
+ final String defaultStorageType = conf.get(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE);
+
+ String[] columnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS);
+ if (null == columnNamesArr) {
+ throw new IOException(
+ "Hive column names must be provided to InputFormat in the Configuration");
+ }
+ List<String> columnNames = Arrays.asList(columnNamesArr);
+
+ String serializedTypes = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+ if (null == serializedTypes) {
+ throw new IOException(
+ "Hive column types must be provided to InputFormat in the Configuration");
+ }
+ ArrayList<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(serializedTypes);
+
+ return new ColumnMapper(conf.get(AccumuloSerDeParameters.COLUMN_MAPPINGS), defaultStorageType,
+ columnNames, columnTypes);
+ }
+
+ /**
+ * Configure the underlying AccumuloInputFormat
+ *
+ * @param conf
+ * Job configuration
+ * @param instance
+ * Accumulo instance
+ * @param connector
+ * Accumulo connector
+ * @param accumuloParams
+ * Connection information to the Accumulo instance
+ * @param columnMapper
+ * Configuration of Hive to Accumulo columns
+ * @param iterators
+ * Any iterators to be configured server-side
+ * @param ranges
+ * Accumulo ranges for the query
+ * @throws AccumuloSecurityException
+ * @throws AccumuloException
+ * @throws SerDeException
+ */
+ protected void configure(JobConf conf, Instance instance, Connector connector,
+ AccumuloConnectionParameters accumuloParams, ColumnMapper columnMapper,
+ List<IteratorSetting> iterators, Collection<Range> ranges) throws AccumuloSecurityException,
+ AccumuloException, SerDeException {
+
+ // Handle implementation of Instance and invoke appropriate InputFormat method
+ if (instance instanceof MockInstance) {
+ setMockInstance(conf, instance.getInstanceName());
+ } else {
+ setZooKeeperInstance(conf, instance.getInstanceName(), instance.getZooKeepers());
+ }
+
+ // Set the username/passwd for the Accumulo connection
+ setConnectorInfo(conf, accumuloParams.getAccumuloUserName(),
+ new PasswordToken(accumuloParams.getAccumuloPassword()));
+
+ // Read from the given Accumulo table
+ setInputTableName(conf, accumuloParams.getAccumuloTableName());
+
+ // Check Configuration for any user-provided Authorization definition
+ Authorizations auths = AccumuloSerDeParameters.getAuthorizationsFromConf(conf);
+
+ if (null == auths) {
+ // Default to all of user's authorizations when no configuration is provided
+ auths = connector.securityOperations().getUserAuthorizations(
+ accumuloParams.getAccumuloUserName());
+ }
+
+ // Implicitly handles users providing invalid authorizations
+ setScanAuthorizations(conf, auths);
+
+ // restrict with any filters found from WHERE predicates.
+ addIterators(conf, iterators);
+
+ // restrict with any ranges found from WHERE predicates.
+ // not setting ranges scans the entire table
+ if (null != ranges) {
+ log.info("Setting ranges: " + ranges);
+ setRanges(conf, ranges);
+ }
+
+ // Restrict the set of columns that we want to read from the Accumulo table
+ HashSet<Pair<Text, Text>> pairs = getPairCollection(columnMapper.getColumnMappings());
+ if (null != pairs && !pairs.isEmpty()) {
+ fetchColumns(conf, pairs);
+ }
+ }
+
+ // Wrap the static AccumuloInputFormat methods with methods that we can
+ // verify were correctly called via Mockito
+
+ protected void setMockInstance(JobConf conf, String instanceName) {
+ try {
+ AccumuloInputFormat.setMockInstance(conf, instanceName);
+ } catch (IllegalStateException e) {
+ // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+ log.debug("Ignoring exception setting mock instance of " + instanceName, e);
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ protected void setZooKeeperInstance(JobConf conf, String instanceName, String zkHosts) {
+ // To support builds against 1.5, we can't use the new 1.6 setZooKeeperInstance which
+ // takes a ClientConfiguration class that only exists in 1.6
+ try {
+ AccumuloInputFormat.setZooKeeperInstance(conf, instanceName, zkHosts);
+ } catch (IllegalStateException ise) {
+ // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+ log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at "
+ + zkHosts, ise);
+ }
+ }
+
+ protected void setConnectorInfo(JobConf conf, String user, AuthenticationToken token)
+ throws AccumuloSecurityException {
+ try {
+ AccumuloInputFormat.setConnectorInfo(conf, user, token);
+ } catch (IllegalStateException e) {
+ // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+ log.debug("Ignoring exception setting Accumulo Connector instance for user " + user, e);
+ }
+ }
+
+ protected void setInputTableName(JobConf conf, String tableName) {
+ AccumuloInputFormat.setInputTableName(conf, tableName);
+ }
+
+ protected void setScanAuthorizations(JobConf conf, Authorizations auths) {
+ AccumuloInputFormat.setScanAuthorizations(conf, auths);
+ }
+
+ protected void addIterators(JobConf conf, List<IteratorSetting> iterators) {
+ for (IteratorSetting is : iterators) {
+ AccumuloInputFormat.addIterator(conf, is);
+ }
+ }
+
+ protected void setRanges(JobConf conf, Collection<Range> ranges) {
+ AccumuloInputFormat.setRanges(conf, ranges);
+ }
+
+ protected void fetchColumns(JobConf conf, Set<Pair<Text, Text>> cfCqPairs) {
+ AccumuloInputFormat.fetchColumns(conf, cfCqPairs);
+ }
+
+ /**
+ * Create col fam/qual pairs from pipe separated values, usually from config object. Ignores
+ * rowID.
+ *
+ * @param columnMappings
+ * The list of ColumnMappings for the given query
+ * @return a Set of Pairs of colfams and colquals
+ */
+ protected HashSet<Pair<Text, Text>> getPairCollection(List<ColumnMapping> columnMappings) {
+ final HashSet<Pair<Text, Text>> pairs = new HashSet<Pair<Text, Text>>();
+
+ for (ColumnMapping columnMapping : columnMappings) {
+ if (columnMapping instanceof HiveAccumuloColumnMapping) {
+ HiveAccumuloColumnMapping accumuloColumnMapping = (HiveAccumuloColumnMapping) columnMapping;
+
+ Text cf = new Text(accumuloColumnMapping.getColumnFamily());
+ Text cq = null;
+
+ // A null cq implies an empty column qualifier
+ if (null != accumuloColumnMapping.getColumnQualifier()) {
+ cq = new Text(accumuloColumnMapping.getColumnQualifier());
+ }
+
+ pairs.add(new Pair<Text, Text>(cf, cq));
+ } else if (columnMapping instanceof HiveAccumuloMapColumnMapping) {
+ HiveAccumuloMapColumnMapping mapMapping = (HiveAccumuloMapColumnMapping) columnMapping;
+
+ // Can't fetch prefix on colqual, must pull the entire qualifier
+ // TODO use an iterator to do the filter, server-side.
+ pairs.add(new Pair<Text, Text>(new Text(mapMapping.getColumnFamily()), null));
+ }
+ }
+
+ log.info("Computed columns to fetch (" + pairs + ") from " + columnMappings);
+
+ return pairs;
+ }
+
+ /**
+ * Reflection to work around Accumulo 1.5 and 1.6 incompatibilities. Throws an {@link IOException}
+ * for any reflection related exceptions
+ *
+ * @param split
+ * A RangeInputSplit
+ * @return The name of the table from the split
+ * @throws IOException
+ */
+ protected String getTableName(RangeInputSplit split) throws IOException {
+ // ACCUMULO-3017 shenanigans with method names changing without deprecation
+ Method getTableName = null;
+ try {
+ getTableName = RangeInputSplit.class.getMethod("getTableName");
+ } catch (SecurityException e) {
+ log.debug("Could not get getTableName method from RangeInputSplit", e);
+ } catch (NoSuchMethodException e) {
+ log.debug("Could not get getTableName method from RangeInputSplit", e);
+ }
+
+ if (null != getTableName) {
+ try {
+ return (String) getTableName.invoke(split);
+ } catch (IllegalArgumentException e) {
+ log.debug("Could not invoke getTableName method from RangeInputSplit", e);
+ } catch (IllegalAccessException e) {
+ log.debug("Could not invoke getTableName method from RangeInputSplit", e);
+ } catch (InvocationTargetException e) {
+ log.debug("Could not invoke getTableName method from RangeInputSplit", e);
+ }
+ }
+
+ Method getTable;
+ try {
+ getTable = RangeInputSplit.class.getMethod("getTable");
+ } catch (SecurityException e) {
+ throw new IOException("Could not get table name from RangeInputSplit", e);
+ } catch (NoSuchMethodException e) {
+ throw new IOException("Could not get table name from RangeInputSplit", e);
+ }
+
+ try {
+ return (String) getTable.invoke(split);
+ } catch (IllegalArgumentException e) {
+ throw new IOException("Could not get table name from RangeInputSplit", e);
+ } catch (IllegalAccessException e) {
+ throw new IOException("Could not get table name from RangeInputSplit", e);
+ } catch (InvocationTargetException e) {
+ throw new IOException("Could not get table name from RangeInputSplit", e);
+ }
+ }
+
+ /**
+ * Sets the table name on a RangeInputSplit, accounting for change in method name. Any reflection
+ * related exception is wrapped in an {@link IOException}
+ *
+ * @param split
+ * The RangeInputSplit to operate on
+ * @param tableName
+ * The name of the table to set
+ * @throws IOException
+ */
+ protected void setTableName(RangeInputSplit split, String tableName) throws IOException {
+ // ACCUMULO-3017 shenanigans with method names changing without deprecation
+ Method setTableName = null;
+ try {
+ setTableName = RangeInputSplit.class.getMethod("setTableName", String.class);
+ } catch (SecurityException e) {
+ log.debug("Could not get getTableName method from RangeInputSplit", e);
+ } catch (NoSuchMethodException e) {
+ log.debug("Could not get getTableName method from RangeInputSplit", e);
+ }
+
+ if (null != setTableName) {
+ try {
+ setTableName.invoke(split, tableName);
+ return;
+ } catch (IllegalArgumentException e) {
+ log.debug("Could not invoke getTableName method from RangeInputSplit", e);
+ } catch (IllegalAccessException e) {
+ log.debug("Could not invoke getTableName method from RangeInputSplit", e);
+ } catch (InvocationTargetException e) {
+ log.debug("Could not invoke getTableName method from RangeInputSplit", e);
+ }
+ }
+
+ Method setTable;
+ try {
+ setTable = RangeInputSplit.class.getMethod("setTable", String.class);
+ } catch (SecurityException e) {
+ throw new IOException("Could not set table name from RangeInputSplit", e);
+ } catch (NoSuchMethodException e) {
+ throw new IOException("Could not set table name from RangeInputSplit", e);
+ }
+
+ try {
+ setTable.invoke(split, tableName);
+ } catch (IllegalArgumentException e) {
+ throw new IOException("Could not set table name from RangeInputSplit", e);
+ } catch (IllegalAccessException e) {
+ throw new IOException("Could not set table name from RangeInputSplit", e);
+ } catch (InvocationTargetException e) {
+ throw new IOException("Could not set table name from RangeInputSplit", e);
+ }
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
new file mode 100644
index 0000000..5cf008e
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.mr;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.mapred.JobConf;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An {@link AccumuloOutputFormat} which configures the Accumulo connection and output table
+ * from the Hive job configuration before delegating.
+ */
+public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat {
+
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+ configureAccumuloOutputFormat(job);
+
+ super.checkOutputSpecs(ignored, job);
+ }
+
+ protected void configureAccumuloOutputFormat(JobConf job) throws IOException {
+ AccumuloConnectionParameters cnxnParams = new AccumuloConnectionParameters(job);
+
+ final String tableName = job.get(AccumuloSerDeParameters.TABLE_NAME);
+
+ // Make sure we actually got the table name
+ Preconditions.checkNotNull(tableName,
+ "Expected Accumulo table name to be provided in job configuration");
+
+ // Set the necessary Accumulo information
+ try {
+ // Username/passwd for Accumulo
+ setAccumuloConnectorInfo(job, cnxnParams.getAccumuloUserName(),
+ new PasswordToken(cnxnParams.getAccumuloPassword()));
+
+ if (cnxnParams.useMockInstance()) {
+ setAccumuloMockInstance(job, cnxnParams.getAccumuloInstanceName());
+ } else {
+ // Accumulo instance name with ZK quorum
+ setAccumuloZooKeeperInstance(job, cnxnParams.getAccumuloInstanceName(),
+ cnxnParams.getZooKeepers());
+ }
+
+ // Set the table where we're writing this data
+ setDefaultAccumuloTableName(job, tableName);
+ } catch (AccumuloSecurityException e) {
+ log.error("Could not connect to Accumulo with provided credentials", e);
+ throw new IOException(e);
+ }
+ }
+
+ // Non-static methods to wrap the static AccumuloOutputFormat methods to enable testing
+
+ protected void setAccumuloConnectorInfo(JobConf conf, String username, AuthenticationToken token)
+ throws AccumuloSecurityException {
+ AccumuloOutputFormat.setConnectorInfo(conf, username, token);
+ }
+
+ @SuppressWarnings("deprecation")
+ protected void setAccumuloZooKeeperInstance(JobConf conf, String instanceName, String zookeepers) {
+ AccumuloOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers);
+ }
+
+ protected void setAccumuloMockInstance(JobConf conf, String instanceName) {
+ AccumuloOutputFormat.setMockInstance(conf, instanceName);
+ }
+
+ protected void setDefaultAccumuloTableName(JobConf conf, String tableName) {
+ AccumuloOutputFormat.setDefaultTableName(conf, tableName);
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java
new file mode 100644
index 0000000..4fd6ba7
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Serde and InputFormat support for connecting Hive to Accumulo tables.
+ */
+package org.apache.hadoop.hive.accumulo;
\ No newline at end of file
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
new file mode 100644
index 0000000..5edc9b5
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java
@@ -0,0 +1,408 @@
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Range;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMapper;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;
+import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
+import org.apache.hadoop.hive.accumulo.predicate.compare.DoubleCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.Equal;
+import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThan;
+import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThanOrEqual;
+import org.apache.hadoop.hive.accumulo.predicate.compare.IntCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LessThan;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LessThanOrEqual;
+import org.apache.hadoop.hive.accumulo.predicate.compare.Like;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LongCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.NotEqual;
+import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
+import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.udf.UDFLike;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ *
+ * Supporting operations for Hive predicate pushdown to Accumulo iterators and ranges.
+ *
+ * See {@link PrimitiveComparisonFilter}
+ *
+ */
+public class AccumuloPredicateHandler {
+ private static final List<Range> TOTAL_RANGE = Collections.singletonList(new Range());
+
+ private static AccumuloPredicateHandler handler = new AccumuloPredicateHandler();
+ private static Map<String, Class<? extends CompareOp>> compareOps = Maps.newHashMap();
+ private static Map<String, Class<? extends PrimitiveComparison>> pComparisons = Maps.newHashMap();
+
+ // Want to start sufficiently "high" in the iterator priority stack
+ private static int iteratorCount = 50;
+
+ private static final Logger log = Logger.getLogger(AccumuloPredicateHandler.class);
+ static {
+ compareOps.put(GenericUDFOPEqual.class.getName(), Equal.class);
+ compareOps.put(GenericUDFOPNotEqual.class.getName(), NotEqual.class);
+ compareOps.put(GenericUDFOPGreaterThan.class.getName(), GreaterThan.class);
+ compareOps.put(GenericUDFOPEqualOrGreaterThan.class.getName(), GreaterThanOrEqual.class);
+ compareOps.put(GenericUDFOPEqualOrLessThan.class.getName(), LessThanOrEqual.class);
+ compareOps.put(GenericUDFOPLessThan.class.getName(), LessThan.class);
+ compareOps.put(UDFLike.class.getName(), Like.class);
+
+ pComparisons.put("bigint", LongCompare.class);
+ pComparisons.put("int", IntCompare.class);
+ pComparisons.put("double", DoubleCompare.class);
+ pComparisons.put("string", StringCompare.class);
+ }
+
+ public static AccumuloPredicateHandler getInstance() {
+ return handler;
+ }
+
+ /**
+ *
+ * @return set of all UDF class names with matching CompareOp implementations.
+ */
+ public Set<String> cOpKeyset() {
+ return compareOps.keySet();
+ }
+
+ /**
+ *
+ * @return set of all Hive data types with matching PrimitiveComparison implementations.
+ */
+ public Set<String> pComparisonKeyset() {
+ return pComparisons.keySet();
+ }
+
+ /**
+ *
+ * @param udfType
+ * GenericUDF classname to look up the matching CompareOp
+ * @return {@code Class<? extends CompareOp>}
+ */
+ public Class<? extends CompareOp> getCompareOpClass(String udfType)
+ throws NoSuchCompareOpException {
+ if (!compareOps.containsKey(udfType))
+ throw new NoSuchCompareOpException("Null compare op for specified key: " + udfType);
+ return compareOps.get(udfType);
+ }
+
+ public CompareOp getCompareOp(String udfType, IndexSearchCondition sc)
+ throws NoSuchCompareOpException, SerDeException {
+ Class<? extends CompareOp> clz = getCompareOpClass(udfType);
+
+ try {
+ return clz.newInstance();
+ } catch (ClassCastException e) {
+ throw new SerDeException("Column type mismatch in WHERE clause "
+ + sc.getComparisonExpr().getExprString() + " found type "
+ + sc.getConstantDesc().getTypeString() + " instead of "
+ + sc.getColumnDesc().getTypeString());
+ } catch (IllegalAccessException e) {
+ throw new SerDeException("Could not instantiate class for WHERE clause", e);
+ } catch (InstantiationException e) {
+ throw new SerDeException("Could not instantiate class for WHERE clause", e);
+ }
+ }
+
+ /**
+ *
+ * @param type
+ * Hive column type used to look up the matching PrimitiveComparison
+ * @return {@code Class<? extends PrimitiveComparison>}
+ */
+ public Class<? extends PrimitiveComparison> getPrimitiveComparisonClass(String type)
+ throws NoSuchPrimitiveComparisonException {
+ if (!pComparisons.containsKey(type))
+ throw new NoSuchPrimitiveComparisonException("Null primitive comparison for specified key: "
+ + type);
+ return pComparisons.get(type);
+ }
+
+ public PrimitiveComparison getPrimitiveComparison(String type, IndexSearchCondition sc)
+ throws NoSuchPrimitiveComparisonException, SerDeException {
+ Class<? extends PrimitiveComparison> clz = getPrimitiveComparisonClass(type);
+
+ try {
+ return clz.newInstance();
+ } catch (ClassCastException e) {
+ throw new SerDeException("Column type mismatch in WHERE clause "
+ + sc.getComparisonExpr().getExprString() + " found type "
+ + sc.getConstantDesc().getTypeString() + " instead of "
+ + sc.getColumnDesc().getTypeString());
+ } catch (IllegalAccessException e) {
+ throw new SerDeException("Could not instantiate class for WHERE clause", e);
+ } catch (InstantiationException e) {
+ throw new SerDeException("Could not instantiate class for WHERE clause", e);
+ }
+ }
+
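+ // Private constructor: state is static and instances are obtained via getInstance()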
+ private AccumuloPredicateHandler() {}
+
+ /**
+ * Loop through search conditions and build ranges for predicates involving rowID column, if any.
+ */
+ public List<Range> getRanges(Configuration conf, ColumnMapper columnMapper) throws SerDeException {
+ if (!columnMapper.hasRowIdMapping()) {
+ return TOTAL_RANGE;
+ }
+
+ int rowIdOffset = columnMapper.getRowIdOffset();
+ String[] hiveColumnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS);
+
+ if (null == hiveColumnNamesArr) {
+ throw new IllegalArgumentException("Could not find Hive columns in configuration");
+ }
+
+ // Already verified that we should have the rowId mapping
+ String hiveRowIdColumnName = hiveColumnNamesArr[rowIdOffset];
+
+ ExprNodeDesc root = this.getExpression(conf);
+
+ // No expression, therefore scan the whole table
+ if (null == root) {
+ return TOTAL_RANGE;
+ }
+
+ Object result = generateRanges(columnMapper, hiveRowIdColumnName, root);
+
+ if (null == result) {
+ log.info("Calculated null set of ranges, scanning full table");
+ return TOTAL_RANGE;
+ } else if (result instanceof Range) {
+ log.info("Computed a single Range for the query: " + result);
+ return Collections.singletonList((Range) result);
+ } else if (result instanceof List) {
+ log.info("Computed a collection of Ranges for the query: " + result);
+ @SuppressWarnings("unchecked")
+ List<Range> ranges = (List<Range>) result;
+ return ranges;
+ } else {
+ throw new IllegalArgumentException("Unhandled return from Range generation: " + result);
+ }
+ }
+
+ /**
+ * Encapsulates the traversal over some {@link ExprNodeDesc} tree for the generation of Accumulo
+ * Ranges using expressions involving the Accumulo rowid-mapped Hive column
+ *
+ * @param columnMapper
+ * Mapping of Hive to Accumulo columns for the query
+ * @param hiveRowIdColumnName
+ * Name of the hive column mapped to the Accumulo rowid
+ * @param root
+ * Root of some ExprNodeDesc tree to traverse, the WHERE clause
+ * @return An object representing the result from the ExprNodeDesc tree traversal using the
+ * AccumuloRangeGenerator
+ */
+ protected Object generateRanges(ColumnMapper columnMapper, String hiveRowIdColumnName, ExprNodeDesc root) {
+ AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler,
+ columnMapper.getRowIdMapping(), hiveRowIdColumnName);
+ Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
+ Collections.<Rule, NodeProcessor> emptyMap(), null);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+ ArrayList<Node> roots = new ArrayList<Node>();
+ roots.add(root);
+ HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+
+ try {
+ ogw.startWalking(roots, nodeOutput);
+ } catch (SemanticException ex) {
+ throw new RuntimeException(ex);
+ }
+
+ return nodeOutput.get(root);
+ }
+
+ /**
+ * Loop through search conditions and build iterator settings for predicates involving columns
+ * other than rowID, if any.
+ *
+ * @param conf
+ * Configuration
+ * @throws SerDeException
+ */
+ public List<IteratorSetting> getIterators(Configuration conf, ColumnMapper columnMapper)
+ throws SerDeException {
+ List<IteratorSetting> itrs = Lists.newArrayList();
+ boolean shouldPushdown = conf.getBoolean(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY,
+ AccumuloSerDeParameters.ITERATOR_PUSHDOWN_DEFAULT);
+ if (!shouldPushdown) {
+ log.info("Iterator pushdown is disabled for this table");
+ return itrs;
+ }
+
+ int rowIdOffset = columnMapper.getRowIdOffset();
+ String[] hiveColumnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS);
+
+ if (null == hiveColumnNamesArr) {
+ throw new IllegalArgumentException("Could not find Hive columns in configuration");
+ }
+
+ String hiveRowIdColumnName = null;
+
+ if (rowIdOffset >= 0 && rowIdOffset < hiveColumnNamesArr.length) {
+ hiveRowIdColumnName = hiveColumnNamesArr[rowIdOffset];
+ }
+
+ List<String> hiveColumnNames = Arrays.asList(hiveColumnNamesArr);
+
+ for (IndexSearchCondition sc : getSearchConditions(conf)) {
+ String col = sc.getColumnDesc().getColumn();
+ if (hiveRowIdColumnName == null || !hiveRowIdColumnName.equals(col)) {
+ HiveAccumuloColumnMapping mapping = (HiveAccumuloColumnMapping) columnMapper
+ .getColumnMappingForHiveColumn(hiveColumnNames, col);
+ itrs.add(toSetting(mapping, sc));
+ }
+ }
+ if (log.isInfoEnabled())
+ log.info("num iterators = " + itrs.size());
+ return itrs;
+ }
+
+ /**
+ * Create an IteratorSetting for the right qualifier, constant, CompareOp, and
+ * PrimitiveComparison type.
+ *
+ * @param accumuloColumnMapping
+ * ColumnMapping to filter
+ * @param sc
+ * IndexSearchCondition
+ * @return IteratorSetting
+ * @throws SerDeException
+ */
+ public IteratorSetting toSetting(HiveAccumuloColumnMapping accumuloColumnMapping,
+ IndexSearchCondition sc) throws SerDeException {
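+ // Each pushed-down filter gets a unique priority and name in the scan's iterator stack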
+ iteratorCount++;
+ final IteratorSetting is = new IteratorSetting(iteratorCount,
+ PrimitiveComparisonFilter.FILTER_PREFIX + iteratorCount, PrimitiveComparisonFilter.class);
+ final String type = sc.getColumnDesc().getTypeString();
+ final String comparisonOpStr = sc.getComparisonOp();
+
+ PushdownTuple tuple;
+ try {
+ tuple = new PushdownTuple(sc, getPrimitiveComparison(type, sc), getCompareOp(comparisonOpStr,
+ sc));
+ } catch (NoSuchPrimitiveComparisonException e) {
+ throw new SerDeException("No configured PrimitiveComparison class for " + type, e);
+ } catch (NoSuchCompareOpException e) {
+ throw new SerDeException("No configured CompareOp class for " + comparisonOpStr, e);
+ }
+
+ is.addOption(PrimitiveComparisonFilter.P_COMPARE_CLASS, tuple.getpCompare().getClass()
+ .getName());
+ is.addOption(PrimitiveComparisonFilter.COMPARE_OPT_CLASS, tuple.getcOpt().getClass().getName());
+ is.addOption(PrimitiveComparisonFilter.CONST_VAL,
+ new String(Base64.encodeBase64(tuple.getConstVal())));
+ is.addOption(PrimitiveComparisonFilter.COLUMN, accumuloColumnMapping.serialize());
+
+ return is;
+ }
+
+ public ExprNodeDesc getExpression(Configuration conf) {
+ String filteredExprSerialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+ if (filteredExprSerialized == null)
+ return null;
+
+ return Utilities.deserializeExpression(filteredExprSerialized);
+ }
+
+ /**
+ *
+ * @param conf
+ * Configuration
+ * @return list of IndexSearchConditions from the filter expression.
+ */
+ public List<IndexSearchCondition> getSearchConditions(Configuration conf) {
+ final List<IndexSearchCondition> sConditions = Lists.newArrayList();
+ ExprNodeDesc filterExpr = getExpression(conf);
+ if (null == filterExpr) {
+ return sConditions;
+ }
+ IndexPredicateAnalyzer analyzer = newAnalyzer(conf);
+ ExprNodeDesc residual = analyzer.analyzePredicate(filterExpr, sConditions);
+ if (residual != null)
+ throw new RuntimeException("Unexpected residual predicate: " + residual.getExprString());
+ return sConditions;
+ }
+
+ /**
+ *
+ * @param conf
+ * Configuration
+ * @param desc
+ * predicate expression node.
+ * @return DecomposedPredicate containing translated search conditions the analyzer can support.
+ */
+ public DecomposedPredicate decompose(Configuration conf, ExprNodeDesc desc) {
+ IndexPredicateAnalyzer analyzer = newAnalyzer(conf);
+ List<IndexSearchCondition> sConditions = new ArrayList<IndexSearchCondition>();
+ ExprNodeDesc residualPredicate = analyzer.analyzePredicate(desc, sConditions);
+
+ if (sConditions.size() == 0) {
+ if (log.isInfoEnabled())
+ log.info("nothing to decompose. Returning");
+ return null;
+ }
+
+ DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
+ decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(sConditions);
+ decomposedPredicate.residualPredicate = (ExprNodeGenericFuncDesc) residualPredicate;
+ return decomposedPredicate;
+ }
+
+ /**
+ * Build an analyzer that allows comparison ops from the compareOps map, and all columns from the table
+ * definition.
+ */
+ private IndexPredicateAnalyzer newAnalyzer(Configuration conf) {
+ IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
+ analyzer.clearAllowedColumnNames();
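+ // Start from a clean slate, then explicitly whitelist the supported comparison ops and columns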
+ for (String op : cOpKeyset()) {
+ analyzer.addComparisonOp(op);
+ }
+
+ String[] hiveColumnNames = conf.getStrings(serdeConstants.LIST_COLUMNS);
+ for (String col : hiveColumnNames) {
+ analyzer.allowColumnName(col);
+ }
+
+ return analyzer;
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
new file mode 100644
index 0000000..d794e94
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping;
+import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
+import org.apache.hadoop.hive.accumulo.predicate.compare.Equal;
+import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThan;
+import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThanOrEqual;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LessThan;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LessThanOrEqual;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates Accumulo Ranges from a Hive filter expression tree over the rowid-mapped column.
+ */
+public class AccumuloRangeGenerator implements NodeProcessor {
+ private static final Logger log = LoggerFactory.getLogger(AccumuloRangeGenerator.class);
+
+ private final AccumuloPredicateHandler predicateHandler;
+ private final HiveAccumuloRowIdColumnMapping rowIdMapping;
+ private final String hiveRowIdColumnName;
+
+ public AccumuloRangeGenerator(AccumuloPredicateHandler predicateHandler,
+ HiveAccumuloRowIdColumnMapping rowIdMapping, String hiveRowIdColumnName) {
+ this.predicateHandler = predicateHandler;
+ this.rowIdMapping = rowIdMapping;
+ this.hiveRowIdColumnName = hiveRowIdColumnName;
+ }
+
+ @Override
+ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+ throws SemanticException {
+ // If it's not some operator, pass it back
+ if (!(nd instanceof ExprNodeGenericFuncDesc)) {
+ return nd;
+ }
+
+ ExprNodeGenericFuncDesc func = (ExprNodeGenericFuncDesc) nd;
+
+ // 'and' nodes need to be intersected
+ if (FunctionRegistry.isOpAnd(func)) {
+ return processAndOpNode(nd, nodeOutputs);
+ // 'or' nodes need to be merged
+ } else if (FunctionRegistry.isOpOr(func)) {
+ return processOrOpNode(nd, nodeOutputs);
+ } else if (FunctionRegistry.isOpNot(func)) {
+ // TODO handle negations
+ throw new IllegalArgumentException("Negations not yet implemented");
+ } else {
+ return processExpression(func, nodeOutputs);
+ }
+ }
+
+ protected Object processAndOpNode(Node nd, Object[] nodeOutputs) {
+ // We might have multiple ranges coming from children
+ List<Range> andRanges = null;
+
+ for (Object nodeOutput : nodeOutputs) {
+ // null signifies nodes that are irrelevant to the generation
+ // of Accumulo Ranges
+ if (null == nodeOutput) {
+ continue;
+ }
+
+ // Two cases must be kept separate: an AND with no range-producing children (a conjunction
+ // over a field that isn't the column mapped to the Accumulo rowid), and a conjunction whose
+ // generated Ranges are empty (its children are disjoint).
+ //
+ // A null `andRanges` implies that ranges couldn't be computed, while an empty List
+ // of Ranges implies that there are no possible Ranges to lookup.
+ if (null == andRanges) {
+ andRanges = new ArrayList<Range>();
+ }
+
+ // The child is a single Range
+ if (nodeOutput instanceof Range) {
+ Range childRange = (Range) nodeOutput;
+
+ // No existing ranges, just accept the current
+ if (andRanges.isEmpty()) {
+ andRanges.add(childRange);
+ } else {
+ // For each range we have, intersect them. If they don't overlap
+ // the range can be discarded
+ List<Range> newRanges = new ArrayList<Range>();
+ for (Range andRange : andRanges) {
+ Range intersectedRange = andRange.clip(childRange, true);
+ if (null != intersectedRange) {
+ newRanges.add(intersectedRange);
+ }
+ }
+
+ // Set the newly-constructed ranges as the current state
+ andRanges = newRanges;
+ }
+ } else if (nodeOutput instanceof List) {
+ @SuppressWarnings("unchecked")
+ List<Range> childRanges = (List<Range>) nodeOutput;
+
+ // No ranges, use the ranges from the child
+ if (andRanges.isEmpty()) {
+ andRanges.addAll(childRanges);
+ } else {
+ List<Range> newRanges = new ArrayList<Range>();
+
+ // Cartesian product of our ranges, to the child ranges
+ for (Range andRange : andRanges) {
+ for (Range childRange : childRanges) {
+ Range intersectedRange = andRange.clip(childRange, true);
+
+ // Retain only valid intersections (discard disjoint ranges)
+ if (null != intersectedRange) {
+ newRanges.add(intersectedRange);
+ }
+ }
+ }
+
+ // Set the newly-constructed ranges as the current state
+ andRanges = newRanges;
+ }
+ } else {
+ log.error("Expected Range from {} but got {}", nd, nodeOutput);
+ throw new IllegalArgumentException("Expected Range but got "
+ + nodeOutput.getClass().getName());
+ }
+ }
+
+ return andRanges;
+ }
+
+ protected Object processOrOpNode(Node nd, Object[] nodeOutputs) {
+ List<Range> orRanges = new ArrayList<Range>(nodeOutputs.length);
+ for (Object nodeOutput : nodeOutputs) {
+ if (nodeOutput instanceof Range) {
+ orRanges.add((Range) nodeOutput);
+ } else if (nodeOutput instanceof List) {
+ @SuppressWarnings("unchecked")
+ List<Range> childRanges = (List<Range>) nodeOutput;
+ orRanges.addAll(childRanges);
+ } else {
+ log.error("Expected Range from " + nd + " but got " + nodeOutput);
+ throw new IllegalArgumentException("Expected Range but got "
+ + nodeOutput.getClass().getName());
+ }
+ }
+
+ // Try to merge multiple ranges together
+ if (orRanges.size() > 1) {
+ return Range.mergeOverlapping(orRanges);
+ } else if (1 == orRanges.size()) {
+ // Return just the single Range
+ return orRanges.get(0);
+ } else {
+ // No ranges, just return the empty list
+ return orRanges;
+ }
+ }
+
+ protected Object processExpression(ExprNodeGenericFuncDesc func, Object[] nodeOutputs)
+ throws SemanticException {
+ // a binary operator (gt, lt, ge, le, eq, ne)
+ GenericUDF genericUdf = func.getGenericUDF();
+
+ // Find the argument to the operator which is a constant
+ ExprNodeConstantDesc constantDesc = null;
+ ExprNodeColumnDesc columnDesc = null;
+ ExprNodeDesc leftHandNode = null;
+ for (Object nodeOutput : nodeOutputs) {
+ if (nodeOutput instanceof ExprNodeConstantDesc) {
+ // Ordering of constant and column in expression is important in correct range generation
+ if (null == leftHandNode) {
+ leftHandNode = (ExprNodeDesc) nodeOutput;
+ }
+
+ constantDesc = (ExprNodeConstantDesc) nodeOutput;
+ } else if (nodeOutput instanceof ExprNodeColumnDesc) {
+ // Ordering of constant and column in expression is important in correct range generation
+ if (null == leftHandNode) {
+ leftHandNode = (ExprNodeDesc) nodeOutput;
+ }
+
+ columnDesc = (ExprNodeColumnDesc) nodeOutput;
+ }
+ }
+
+ // If it's constant = constant or column = column, we can't fetch any ranges
+ // TODO We can try to be smarter and push up the value to some node which
+ // we can generate ranges from e.g. rowid > (4 + 5)
+ if (null == constantDesc || null == columnDesc) {
+ return null;
+ }
+
+ // Reject any clauses that are against a column that isn't the rowId mapping
+ if (!this.hiveRowIdColumnName.equals(columnDesc.getColumn())) {
+ return null;
+ }
+
+ ConstantObjectInspector objInspector = constantDesc.getWritableObjectInspector();
+
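+ // Render the constant as a Text row key using the encoding declared on the rowid mapping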
+ Text constText;
+ switch (rowIdMapping.getEncoding()) {
+ case STRING:
+ constText = getUtf8Value(objInspector);
+ break;
+ case BINARY:
+ try {
+ constText = getBinaryValue(objInspector);
+ } catch (IOException e) {
+ throw new SemanticException(e);
+ }
+ break;
+ default:
+ throw new SemanticException("Unable to parse unknown encoding: "
+ + rowIdMapping.getEncoding());
+ }
+
+ Class<? extends CompareOp> opClz;
+ try {
+ opClz = predicateHandler.getCompareOpClass(genericUdf.getUdfName());
+ } catch (NoSuchCompareOpException e) {
+ throw new IllegalArgumentException("Unhandled UDF class: " + genericUdf.getUdfName());
+ }
+
+ if (leftHandNode instanceof ExprNodeConstantDesc) {
+ return getConstantOpColumnRange(opClz, constText);
+ } else if (leftHandNode instanceof ExprNodeColumnDesc) {
+ return getColumnOpConstantRange(opClz, constText);
+ } else {
+ throw new IllegalStateException("Expected column or constant on LHS of expression");
+ }
+ }
+
+ protected Range getConstantOpColumnRange(Class<? extends CompareOp> opClz, Text constText) {
+ if (opClz.equals(Equal.class)) {
+ // 100 == x
+ return new Range(constText); // single row
+ } else if (opClz.equals(GreaterThanOrEqual.class)) {
+ // 100 >= x
+ return new Range(null, constText); // neg-infinity to end inclusive
+ } else if (opClz.equals(GreaterThan.class)) {
+ // 100 > x
+ return new Range(null, false, constText, false); // neg-infinity to end exclusive
+ } else if (opClz.equals(LessThanOrEqual.class)) {
+ // 100 <= x
+ return new Range(constText, true, null, false); // start inclusive to infinity
+ } else if (opClz.equals(LessThan.class)) {
+ // 100 < x
+ return new Range(constText, false, null, false); // start exclusive to infinity
+ } else {
+ throw new IllegalArgumentException("Could not process " + opClz);
+ }
+ }
+
+ protected Range getColumnOpConstantRange(Class<? extends CompareOp> opClz, Text constText) {
+ if (opClz.equals(Equal.class)) {
+ return new Range(constText); // start inclusive to end inclusive
+ } else if (opClz.equals(GreaterThanOrEqual.class)) {
+ return new Range(constText, null); // start inclusive to infinity inclusive
+ } else if (opClz.equals(GreaterThan.class)) {
+ return new Range(constText, false, null, false); // start exclusive to infinity inclusive
+ } else if (opClz.equals(LessThanOrEqual.class)) {
+ return new Range(null, false, constText, true); // neg-infinity to start inclusive
+ } else if (opClz.equals(LessThan.class)) {
+ return new Range(null, false, constText, false); // neg-infinity to start exclusive
+ } else {
+ throw new IllegalArgumentException("Could not process " + opClz);
+ }
+ }
+
+ protected Text getUtf8Value(ConstantObjectInspector objInspector) {
+ // TODO is there a more correct way to get the literal value for the Object?
+ return new Text(objInspector.getWritableConstantValue().toString());
+ }
+
+ /**
+ * Attempts to construct the binary value from the given inspector. Falls back to UTF8 encoding
+ * when the value cannot be coerced into binary.
+ *
+ * @return Binary value when possible, utf8 otherwise
+ * @throws IOException
+ */
+ protected Text getBinaryValue(ConstantObjectInspector objInspector) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
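+ // Write the constant's raw primitive bytes so it compares correctly against binary-encoded row keys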
+ if (objInspector instanceof WritableConstantBooleanObjectInspector) {
+ LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+ (WritableConstantBooleanObjectInspector) objInspector);
+ } else if (objInspector instanceof WritableConstantByteObjectInspector) {
+ LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+ (WritableConstantByteObjectInspector) objInspector);
+ } else if (objInspector instanceof WritableConstantShortObjectInspector) {
+ LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+ (WritableConstantShortObjectInspector) objInspector);
+ } else if (objInspector instanceof WritableConstantIntObjectInspector) {
+ LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+ (WritableConstantIntObjectInspector) objInspector);
+ } else if (objInspector instanceof WritableConstantLongObjectInspector) {
+ LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+ (WritableConstantLongObjectInspector) objInspector);
+ } else if (objInspector instanceof WritableConstantDoubleObjectInspector) {
+ LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+ (WritableConstantDoubleObjectInspector) objInspector);
+ } else if (objInspector instanceof WritableConstantFloatObjectInspector) {
+ LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(),
+ (WritableConstantFloatObjectInspector) objInspector);
+ } else {
+ return getUtf8Value(objInspector);
+ }
+
+ out.close();
+ return new Text(out.toByteArray());
+ }
+}
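Taken together, these helpers reduce a binary rowid predicate to a single Accumulo Range, with the operand order deciding which side of the constant is kept. A minimal sketch of the two orderings, assuming only accumulo-core on the classpath; the WHERE clauses are illustrative:

import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.io.Text;

public class RowIdRangeSketch {
  public static void main(String[] args) {
    Text constText = new Text("f");
    // column OP constant, e.g. WHERE rowid > 'f': scan ('f', +infinity)
    Range columnOpConstant = new Range(constText, false, null, false);
    // constant OP column, e.g. WHERE 'f' > rowid: scan (-infinity, 'f')
    Range constantOpColumn = new Range(null, false, constText, false);
    System.out.println(columnOpConstant + " / " + constantOpColumn);
  }
}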
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchCompareOpException.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchCompareOpException.java
new file mode 100644
index 0000000..962185c
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchCompareOpException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
+
+/**
+ * Used when a CompareOp was specified but one with that name cannot be found
+ */
+public class NoSuchCompareOpException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public NoSuchCompareOpException() {
+ super();
+ }
+
+ public NoSuchCompareOpException(String msg) {
+ super(msg);
+ }
+
+ public NoSuchCompareOpException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchPrimitiveComparisonException.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchPrimitiveComparisonException.java
new file mode 100644
index 0000000..c305a9e
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/NoSuchPrimitiveComparisonException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
+
+/**
+ * Used when a {@link PrimitiveComparison} was specified but one with that name cannot be found
+ */
+public class NoSuchPrimitiveComparisonException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public NoSuchPrimitiveComparisonException() {
+ super();
+ }
+
+ public NoSuchPrimitiveComparisonException(String msg) {
+ super(msg);
+ }
+
+ public NoSuchPrimitiveComparisonException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java
new file mode 100644
index 0000000..c303d49
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java
@@ -0,0 +1,123 @@
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
+import org.apache.hadoop.hive.accumulo.columns.ColumnMappingFactory;
+import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping;
+import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
+import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Operates over a single qualifier.
+ *
+ * Delegates to PrimitiveComparison and CompareOp instances for value acceptance.
+ *
+ * The PrimitiveComparison strategy assumes a consistent value type for the same column family and
+ * qualifier.
+ */
+public class PrimitiveComparisonFilter extends WholeRowIterator {
+ @SuppressWarnings("unused")
+ private static final Logger log = Logger.getLogger(PrimitiveComparisonFilter.class);
+
+ public static final String FILTER_PREFIX = "accumulo.filter.compare.iterator.";
+ public static final String P_COMPARE_CLASS = "accumulo.filter.iterator.p.compare.class";
+ public static final String COMPARE_OPT_CLASS = "accumulo.filter.iterator.compare.opt.class";
+ public static final String CONST_VAL = "accumulo.filter.iterator.const.val";
+ public static final String COLUMN = "accumulo.filter.iterator.qual";
+
+ private Text cfHolder, cqHolder, columnMappingFamily, columnMappingQualifier;
+ private HiveAccumuloColumnMapping columnMapping;
+ private CompareOp compOpt;
+
+ @Override
+ protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) {
+ SortedMap<Key,Value> items;
+ boolean allow;
+ try { // if key doesn't contain CF, it's an encoded value from a previous iterator.
+ while (keys.get(0).getColumnFamily().getBytes().length == 0) {
+ items = decodeRow(keys.get(0), values.get(0));
+ keys = Lists.newArrayList(items.keySet());
+ values = Lists.newArrayList(items.values());
+ }
+ allow = accept(keys, values);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return allow;
+ }
+
+ private boolean accept(Collection<Key> keys, Collection<Value> values) {
+ Iterator<Key> kIter = keys.iterator();
+ Iterator<Value> vIter = values.iterator();
+ while (kIter.hasNext()) {
+ Key k = kIter.next();
+ Value v = vIter.next();
+ if (matchQualAndFam(k)) {
+ return compOpt.accept(v.get());
+ }
+ }
+ return false;
+ }
+
+ private boolean matchQualAndFam(Key k) {
+ k.getColumnFamily(cfHolder);
+ k.getColumnQualifier(cqHolder);
+ return cfHolder.equals(columnMappingFamily) && cqHolder.equals(columnMappingQualifier);
+ }
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+ IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ String serializedColumnMapping = options.get(COLUMN);
+ Entry<String,String> pair = ColumnMappingFactory.parseMapping(serializedColumnMapping);
+
+ // The ColumnEncoding, column name and type are all irrelevant at this point, just need the
+ // cf:[cq]
+ columnMapping = new HiveAccumuloColumnMapping(pair.getKey(), pair.getValue(),
+ ColumnEncoding.STRING, "column", "string");
+ columnMappingFamily = new Text(columnMapping.getColumnFamily());
+ columnMappingQualifier = new Text(columnMapping.getColumnQualifier());
+ cfHolder = new Text();
+ cqHolder = new Text();
+
+ try {
+ Class<?> pClass = Class.forName(options.get(P_COMPARE_CLASS));
+ Class<?> cClazz = Class.forName(options.get(COMPARE_OPT_CLASS));
+ PrimitiveComparison pCompare = pClass.asSubclass(PrimitiveComparison.class).newInstance();
+ compOpt = cClazz.asSubclass(CompareOp.class).newInstance();
+ byte[] constant = getConstant(options);
+ pCompare.init(constant);
+ compOpt.setPrimitiveCompare(pCompare);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ } catch (InstantiationException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ }
+ }
+
+ protected byte[] getConstant(Map<String,String> options) {
+ String b64Const = options.get(CONST_VAL);
+ return Base64.decodeBase64(b64Const.getBytes());
+ }
+}
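For context, a hedged sketch of attaching this filter to a plain Accumulo Scanner using the option keys defined above. The priority, iterator name, and the cf:cq form of the COLUMN option are illustrative assumptions; in the handler this setting is built from the search conditions rather than by hand:

import java.nio.ByteBuffer;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter;
import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThan;
import org.apache.hadoop.hive.accumulo.predicate.compare.IntCompare;

public class FilterSetupSketch {
  static void attach(Scanner scanner) {
    IteratorSetting is = new IteratorSetting(30, "primCmp", PrimitiveComparisonFilter.class);
    is.addOption(PrimitiveComparisonFilter.P_COMPARE_CLASS, IntCompare.class.getName());
    is.addOption(PrimitiveComparisonFilter.COMPARE_OPT_CLASS, GreaterThan.class.getName());
    // The constant travels Base64-encoded, mirroring getConstant() above
    byte[] constant = ByteBuffer.allocate(4).putInt(100).array();
    is.addOption(PrimitiveComparisonFilter.CONST_VAL, new String(Base64.encodeBase64(constant)));
    is.addOption(PrimitiveComparisonFilter.COLUMN, "cf:count"); // assumed cf:cq serialized form
    scanner.addScanIterator(is);
  }
}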
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PushdownTuple.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PushdownTuple.java
new file mode 100644
index 0000000..32d143a
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PushdownTuple.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
+import org.apache.hadoop.hive.accumulo.predicate.compare.DoubleCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.IntCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.LongCompare;
+import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;
+import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare;
+import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * For use in IteratorSetting construction.
+ *
+ * Encapsulates a constant byte[], a PrimitiveComparison instance, and a CompareOp instance.
+ */
+public class PushdownTuple {
+ private static final Logger log = Logger.getLogger(PushdownTuple.class);
+
+ private byte[] constVal;
+ private PrimitiveComparison pCompare;
+ private CompareOp cOpt;
+
+ public PushdownTuple(IndexSearchCondition sc, PrimitiveComparison pCompare, CompareOp cOpt)
+ throws SerDeException {
+ ExprNodeConstantEvaluator eval = new ExprNodeConstantEvaluator(sc.getConstantDesc());
+
+ try {
+ this.pCompare = pCompare;
+ this.cOpt = cOpt;
+ Writable writable = (Writable) eval.evaluate(null);
+ constVal = getConstantAsBytes(writable);
+ } catch (ClassCastException cce) {
+ log.info(StringUtils.stringifyException(cce));
+ throw new SerDeException(" Column type mismatch in where clause "
+ + sc.getComparisonExpr().getExprString() + " found type "
+ + sc.getConstantDesc().getTypeString() + " instead of "
+ + sc.getColumnDesc().getTypeString());
+ } catch (HiveException e) {
+ throw new SerDeException(e);
+ }
+ }
+
+ public byte[] getConstVal() {
+ return constVal;
+ }
+
+ public PrimitiveComparison getpCompare() {
+ return pCompare;
+ }
+
+ public CompareOp getcOpt() {
+ return cOpt;
+ }
+
+ /**
+ * @param writable the constant value evaluated from the predicate
+ * @return byte[] encoding of the writable, matching this tuple's PrimitiveComparison type
+ * @throws SerDeException if the writable cannot be encoded for the comparison type
+ */
+ public byte[] getConstantAsBytes(Writable writable) throws SerDeException {
+ if (pCompare instanceof StringCompare) {
+ return writable.toString().getBytes();
+ } else if (pCompare instanceof DoubleCompare) {
+ byte[] bts = new byte[8];
+ double val = ((DoubleWritable) writable).get();
+ ByteBuffer.wrap(bts).putDouble(val);
+ return bts;
+ } else if (pCompare instanceof IntCompare) {
+ byte[] bts = new byte[4];
+ int val = ((IntWritable) writable).get();
+ ByteBuffer.wrap(bts).putInt(val);
+ return bts;
+ } else if (pCompare instanceof LongCompare) {
+ byte[] bts = new byte[8];
+ long val = ((LongWritable) writable).get();
+ ByteBuffer.wrap(bts).putLong(val);
+ return bts;
+ } else {
+ throw new SerDeException("Unsupported primitive category: " + pCompare.getClass().getName());
+ }
+ }
+
+}
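Worth noting: getConstantAsBytes writes numeric constants through a big-endian ByteBuffer, which is exactly how the numeric PrimitiveComparison implementations below decode them. A self-contained round-trip check of that symmetry:

import java.nio.ByteBuffer;

public class ConstantEncodingSketch {
  public static void main(String[] args) {
    // PushdownTuple side: encode the constant (ByteBuffer defaults to big-endian)
    byte[] bts = new byte[8];
    ByteBuffer.wrap(bts).putDouble(2.5);
    // DoubleCompare side: decode through the matching view buffer
    double decoded = ByteBuffer.wrap(bts).asDoubleBuffer().get();
    System.out.println(decoded == 2.5); // true
  }
}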
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/CompareOp.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/CompareOp.java
new file mode 100644
index 0000000..0585333
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/CompareOp.java
@@ -0,0 +1,26 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Handles different types of comparisons in Hive predicates. The filter iterator delegates value
+ * acceptance to the CompareOp.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}. Works with
+ * {@link PrimitiveComparison}
+ */
+public interface CompareOp {
+ /**
+ * Sets the PrimitiveComparison for this CompareOp
+ */
+ public void setPrimitiveCompare(PrimitiveComparison comp);
+
+ /**
+ * @return The PrimitiveComparison this CompareOp is a part of
+ */
+ public PrimitiveComparison getPrimitiveCompare();
+
+ /**
+ * @param val The bytes from the Accumulo Value
+ * @return true if the value is accepted by this CompareOp
+ */
+ public boolean accept(byte[] val);
+}
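As a usage sketch, the pairing that PrimitiveComparisonFilter.init() builds reflectively can be composed directly from the types in this patch; here a pushed-down "count > 100" becomes a GreaterThan delegating to an IntCompare primed with the constant:

import java.nio.ByteBuffer;

import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThan;
import org.apache.hadoop.hive.accumulo.predicate.compare.IntCompare;
import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison;

public class CompareOpSketch {
  public static void main(String[] args) {
    PrimitiveComparison cmp = new IntCompare();
    cmp.init(ByteBuffer.allocate(4).putInt(100).array()); // the pushed-down constant
    CompareOp op = new GreaterThan(cmp);
    System.out.println(op.accept(ByteBuffer.allocate(4).putInt(150).array())); // true
    System.out.println(op.accept(ByteBuffer.allocate(4).putInt(99).array()));  // false
  }
}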
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/DoubleCompare.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/DoubleCompare.java
new file mode 100644
index 0000000..210ad72
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/DoubleCompare.java
@@ -0,0 +1,90 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+/**
+ * Set of comparison operations over a double constant. Used for Hive predicates involving double
+ * comparison.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class DoubleCompare implements PrimitiveComparison {
+
+ private BigDecimal constant;
+
+ /**
+ * Initializes the constant from its byte[] encoding.
+ */
+ public void init(byte[] constant) {
+ this.constant = serialize(constant);
+ }
+
+ /**
+ * @return BigDecimal holding the double decoded from the byte[] value
+ */
+ public BigDecimal serialize(byte[] value) {
+ try {
+ return new BigDecimal(ByteBuffer.wrap(value).asDoubleBuffer().get());
+ } catch (Exception e) {
+ throw new RuntimeException(e.toString() + " occurred trying to build double value. "
+ + "Make sure the value type for the byte[] is double.");
+ }
+ }
+
+ /**
+ * @return true if double value is equal to constant, false otherwise.
+ */
+ @Override
+ public boolean isEqual(byte[] value) {
+ return serialize(value).compareTo(constant) == 0;
+ }
+
+ /**
+ * @return true if double value not equal to constant, false otherwise.
+ */
+ @Override
+ public boolean isNotEqual(byte[] value) {
+ return serialize(value).compareTo(constant) != 0;
+ }
+
+ /**
+ * @return true if value greater than or equal to constant, false otherwise.
+ */
+ @Override
+ public boolean greaterThanOrEqual(byte[] value) {
+ return serialize(value).compareTo(constant) >= 0;
+ }
+
+ /**
+ * @return true if value greater than constant, false otherwise.
+ */
+ @Override
+ public boolean greaterThan(byte[] value) {
+ return serialize(value).compareTo(constant) > 0;
+ }
+
+ /**
+ * @return true if value less than or equal to constant, false otherwise.
+ */
+ @Override
+ public boolean lessThanOrEqual(byte[] value) {
+ return serialize(value).compareTo(constant) <= 0;
+ }
+
+ /**
+ * @return true if value less than constant, false otherwise.
+ */
+ @Override
+ public boolean lessThan(byte[] value) {
+ return serialize(value).compareTo(constant) < 0;
+ }
+
+ /**
+ * Not supported for this PrimitiveComparison implementation.
+ */
+ @Override
+ public boolean like(byte[] value) {
+ throw new UnsupportedOperationException("Like not supported for " + getClass().getName());
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Equal.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Equal.java
new file mode 100644
index 0000000..3a34f12
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Equal.java
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to isEqual() over PrimitiveCompare instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class Equal implements CompareOp {
+
+ private PrimitiveComparison comp;
+
+ public Equal() {}
+
+ public Equal(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public void setPrimitiveCompare(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public PrimitiveComparison getPrimitiveCompare() {
+ return comp;
+ }
+
+ @Override
+ public boolean accept(byte[] val) {
+ return comp.isEqual(val);
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThan.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThan.java
new file mode 100644
index 0000000..a47b2a3
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThan.java
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to greaterThan over {@link PrimitiveComparison} instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class GreaterThan implements CompareOp {
+
+ private PrimitiveComparison comp;
+
+ public GreaterThan() {}
+
+ public GreaterThan(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public void setPrimitiveCompare(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public PrimitiveComparison getPrimitiveCompare() {
+ return this.comp;
+ }
+
+ @Override
+ public boolean accept(byte[] val) {
+ return comp.greaterThan(val);
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThanOrEqual.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThanOrEqual.java
new file mode 100644
index 0000000..c502a45
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/GreaterThanOrEqual.java
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to greaterThanOrEqual over {@link PrimitiveComparison} instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class GreaterThanOrEqual implements CompareOp {
+
+ private PrimitiveComparison comp;
+
+ public GreaterThanOrEqual() {}
+
+ public GreaterThanOrEqual(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public void setPrimitiveCompare(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public PrimitiveComparison getPrimitiveCompare() {
+ return comp;
+ }
+
+ @Override
+ public boolean accept(byte[] val) {
+ return comp.greaterThanOrEqual(val);
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/IntCompare.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/IntCompare.java
new file mode 100644
index 0000000..d7de1ff
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/IntCompare.java
@@ -0,0 +1,63 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Set of comparison operations over an integer constant. Used for Hive predicates involving int
+ * comparison.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class IntCompare implements PrimitiveComparison {
+
+ private int constant;
+
+ @Override
+ public void init(byte[] constant) {
+ this.constant = serialize(constant);
+ }
+
+ @Override
+ public boolean isEqual(byte[] value) {
+ return serialize(value) == constant;
+ }
+
+ @Override
+ public boolean isNotEqual(byte[] value) {
+ return serialize(value) != constant;
+ }
+
+ @Override
+ public boolean greaterThanOrEqual(byte[] value) {
+ return serialize(value) >= constant;
+ }
+
+ @Override
+ public boolean greaterThan(byte[] value) {
+ return serialize(value) > constant;
+ }
+
+ @Override
+ public boolean lessThanOrEqual(byte[] value) {
+ return serialize(value) <= constant;
+ }
+
+ @Override
+ public boolean lessThan(byte[] value) {
+ return serialize(value) < constant;
+ }
+
+ @Override
+ public boolean like(byte[] value) {
+ throw new UnsupportedOperationException("Like not supported for " + getClass().getName());
+ }
+
+ public Integer serialize(byte[] value) {
+ try {
+ return ByteBuffer.wrap(value).asIntBuffer().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e.toString() + " occurred trying to build int value. "
+ + "Make sure the value type for the byte[] is int ");
+ }
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThan.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThan.java
new file mode 100644
index 0000000..2933131
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThan.java
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to lessThan over {@link PrimitiveComparison} instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class LessThan implements CompareOp {
+
+ private PrimitiveComparison comp;
+
+ public LessThan() {}
+
+ public LessThan(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public void setPrimitiveCompare(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public PrimitiveComparison getPrimitiveCompare() {
+ return comp;
+ }
+
+ @Override
+ public boolean accept(byte[] val) {
+ return comp.lessThan(val);
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThanOrEqual.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThanOrEqual.java
new file mode 100644
index 0000000..86acb73
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LessThanOrEqual.java
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to lessThanOrEqual over {@link PrimitiveComparison} instance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class LessThanOrEqual implements CompareOp {
+
+ private PrimitiveComparison comp;
+
+ public LessThanOrEqual() {}
+
+ public LessThanOrEqual(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public void setPrimitiveCompare(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public PrimitiveComparison getPrimitiveCompare() {
+ return comp;
+ }
+
+ @Override
+ public boolean accept(byte[] val) {
+ return comp.lessThanOrEqual(val);
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Like.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Like.java
new file mode 100644
index 0000000..612641d
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/Like.java
@@ -0,0 +1,33 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to like over {@link PrimitiveComparison} instance. Currently only supported by
+ * StringCompare.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class Like implements CompareOp {
+
+ PrimitiveComparison comp;
+
+ public Like() {}
+
+ public Like(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public void setPrimitiveCompare(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public PrimitiveComparison getPrimitiveCompare() {
+ return comp;
+ }
+
+ @Override
+ public boolean accept(byte[] val) {
+ return comp.like(val);
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LongCompare.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LongCompare.java
new file mode 100644
index 0000000..b32874f
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/LongCompare.java
@@ -0,0 +1,64 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Set of comparison operations over a long constant. Used for Hive predicates involving long
+ * comparison.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class LongCompare implements PrimitiveComparison {
+
+ private long constant;
+
+ @Override
+ public void init(byte[] constant) {
+ this.constant = serialize(constant);
+ }
+
+ @Override
+ public boolean isEqual(byte[] value) {
+ long longVal = serialize(value);
+ return longVal == constant;
+ }
+
+ @Override
+ public boolean isNotEqual(byte[] value) {
+ return serialize(value) != constant;
+ }
+
+ @Override
+ public boolean greaterThanOrEqual(byte[] value) {
+ return serialize(value) >= constant;
+ }
+
+ @Override
+ public boolean greaterThan(byte[] value) {
+ return serialize(value) > constant;
+ }
+
+ @Override
+ public boolean lessThanOrEqual(byte[] value) {
+ return serialize(value) <= constant;
+ }
+
+ @Override
+ public boolean lessThan(byte[] value) {
+ return serialize(value) < constant;
+ }
+
+ @Override
+ public boolean like(byte[] value) {
+ throw new UnsupportedOperationException("Like not supported for " + getClass().getName());
+ }
+
+ public Long serialize(byte[] value) {
+ try {
+ return ByteBuffer.wrap(value).asLongBuffer().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e.toString() + " occurred trying to build long value. "
+ + "Make sure the value type for the byte[] is long ");
+ }
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/NotEqual.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/NotEqual.java
new file mode 100644
index 0000000..22b84ba
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/NotEqual.java
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps call to isEqual over {@link PrimitiveComparison} instance and returns the negation.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class NotEqual implements CompareOp {
+
+ private PrimitiveComparison comp;
+
+ public NotEqual() {}
+
+ public NotEqual(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public void setPrimitiveCompare(PrimitiveComparison comp) {
+ this.comp = comp;
+ }
+
+ @Override
+ public PrimitiveComparison getPrimitiveCompare() {
+ return comp;
+ }
+
+ @Override
+ public boolean accept(byte[] val) {
+ return !comp.isEqual(val);
+ }
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/PrimitiveComparison.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/PrimitiveComparison.java
new file mode 100644
index 0000000..26e194f
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/PrimitiveComparison.java
@@ -0,0 +1,32 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+/**
+ * Wraps type-specific comparison operations over a constant value. Methods take raw bytes from
+ * incoming Accumulo values.
+ *
+ * The CompareOp instance in the iterator uses one or more methods from a PrimitiveComparison
+ * implementation to perform type-specific comparisons and determine acceptance.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}. Works with
+ * {@link CompareOp}
+ */
+public interface PrimitiveComparison {
+
+ public boolean isEqual(byte[] value);
+
+ public boolean isNotEqual(byte[] value);
+
+ public boolean greaterThanOrEqual(byte[] value);
+
+ public boolean greaterThan(byte[] value);
+
+ public boolean lessThanOrEqual(byte[] value);
+
+ public boolean lessThan(byte[] value);
+
+ public boolean like(byte[] value);
+
+ public Object serialize(byte[] value);
+
+ public void init(byte[] constant);
+}
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/StringCompare.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/StringCompare.java
new file mode 100644
index 0000000..b71b8a8
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/StringCompare.java
@@ -0,0 +1,65 @@
+package org.apache.hadoop.hive.accumulo.predicate.compare;
+
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Set of comparison operations over a string constant. Used for Hive predicates involving string
+ * comparison.
+ *
+ * Used by {@link org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter}
+ */
+public class StringCompare implements PrimitiveComparison {
+ @SuppressWarnings("unused")
+ private static final Logger log = Logger.getLogger(StringCompare.class);
+
+ private String constant;
+
+ @Override
+ public void init(byte[] constant) {
+ this.constant = serialize(constant);
+ }
+
+ @Override
+ public boolean isEqual(byte[] value) {
+ return serialize(value).equals(constant);
+ }
+
+ @Override
+ public boolean isNotEqual(byte[] value) {
+ return !isEqual(value);
+ }
+
+ @Override
+ public boolean greaterThanOrEqual(byte[] value) {
+ return serialize(value).compareTo(constant) >= 0;
+ }
+
+ @Override
+ public boolean greaterThan(byte[] value) {
+ return serialize(value).compareTo(constant) > 0;
+ }
+
+ @Override
+ public boolean lessThanOrEqual(byte[] value) {
+ return serialize(value).compareTo(constant) <= 0;
+ }
+
+ @Override
+ public boolean lessThan(byte[] value) {
+ return serialize(value).compareTo(constant) < 0;
+ }
+
+ @Override
+ public boolean like(byte[] value) {
+ // The constant holds the SQL LIKE pattern; translate its wildcards to a regex
+ // and match the incoming cell value against it, not the other way around.
+ String regex = constant.replaceAll("%", "[\\\\\\w]+?");
+ Pattern pattern = Pattern.compile(regex);
+ return pattern.matcher(serialize(value)).matches();
+ }
+
+ public String serialize(byte[] value) {
+ return new String(value);
+ }
+}
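A short usage sketch of the LIKE handling above: the constant holds the SQL pattern and the cell value is the match subject. Note the % translation above only spans word characters, so this is narrower than full SQL LIKE semantics:

import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare;

public class LikeSketch {
  public static void main(String[] args) {
    StringCompare cmp = new StringCompare();
    cmp.init("abc%".getBytes()); // the SQL LIKE pattern from the predicate
    System.out.println(cmp.like("abcdef".getBytes())); // true
    System.out.println(cmp.like("xyzabc".getBytes())); // false
  }
}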
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/package-info.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/package-info.java
new file mode 100644
index 0000000..875fad2
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/compare/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * PrimitiveComparison and CompareOp implementations for use in the PrimitiveComparisonFilter iterator
+ */
+package org.apache.hadoop.hive.accumulo.predicate.compare;
\ No newline at end of file
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/package-info.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/package-info.java
new file mode 100644
index 0000000..419ce01
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Predicate pushdown to Accumulo filter iterators.
+ */
+package org.apache.hadoop.hive.accumulo.predicate;
\ No newline at end of file
diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloCompositeRowId.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloCompositeRowId.java
new file mode 100644
index 0000000..f3ebbd1
--- /dev/null
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloCompositeRowId.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo.serde;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+
+/**
+ * AccumuloCompositeRowId extension of LazyStruct. All complex composite keys should extend this
+ * class and override the {@link LazyStruct#getField(int)} method where fieldID corresponds to the
+ * ID of a key in the composite key.
+ * <p>
+ * For example, for a composite key <i>"/part1/part2/part3"</i>, <i>part1</i> will have an id
+ * <i>0</i>, <i>part2</i> will have an id <i>1</i> and <i>part3</i> will have an id <i>2</i>.
+ * Custom implementations of getField(fieldID) should return the value corresponding to that
+ * fieldID. So, for the above example, the value returned for <i>getField(0)</i> should be part1,
+ * <i>getField(1)</i> should be part2 and <i>getField(2)</i> should be part3.
+ * <p>
+ * All custom implementations are expected to have a constructor of the form:
+ *
+ * <pre>
+ * MyCustomCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl, Configuration conf)
+ * </pre>
+ */
+public class AccumuloCompositeRowId extends LazyStruct {
+
+ public AccumuloCompositeRowId(LazySimpleStructObjectInspector oi) {
+ super(oi);
+ }
+
+ @Override
+ public ArrayList