diff --git accumulo-handler/pom.xml accumulo-handler/pom.xml index f1acb44..2fdf6e5 100644 --- accumulo-handler/pom.xml +++ accumulo-handler/pom.xml @@ -62,6 +62,16 @@ org.apache.hive hive-common ${project.version} + + + org.eclipse.jetty.aggregate + jetty-all + + + org.eclipse.jetty.orbit + javax.servlet + + org.apache.hive @@ -77,6 +87,16 @@ org.apache.hive hive-service ${project.version} + + + org.eclipse.jetty.aggregate + jetty-all + + + org.eclipse.jetty.orbit + javax.servlet + + org.apache.hive diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloDefaultIndexScanner.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloDefaultIndexScanner.java new file mode 100644 index 0000000..427a6c7 --- /dev/null +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloDefaultIndexScanner.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.accumulo; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters; +import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.util.Collections.EMPTY_SET; + +/** + * This default index scanner expects indexes to be in the same format as presto's + * accumulo index tables defined as: + * [rowid=field value] [cf=cfname_cqname] [cq=rowid] [visibility] [value=""] + *
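+ * For example (values hypothetical), a base-table entry
+ * [rowid="row1"] [cf="personal"] [cq="name"] [value="Alice"]
+ * produces the index entry
+ * [rowid="Alice"] [cf="personal_name"] [cq="row1"] [value=""].
+ *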

+ * This handler looks for the following hive serde properties: + * 'accumulo.indextable.name' = 'table_idx' (required - name of the corresponding index table) + * 'accumulo.indexed.columns' = 'name,age,phone' (optional - comma separated list of indexed + * hive columns if not defined or defined as '*' all columns are + * assumed to be indexed ) + * 'accumulo.index.rows.max' = '20000' (optional - maximum number of match indexes to use + * before converting to a full table scan default=20000' + * Note: This setting controls the size of the in-memory list of rowids + * each search predicate. Using large values for this setting or having + * very large rowid values may require additional memory to prevent + * out of memory errors + * 'accumulo.index.scanner' = 'org.apache.hadoop.hive.accumulo.AccumuloDefaultIndexScanner' + * (optional - name of the index scanner) + *

+ * To implement your own index table scheme it should be as simple as sub-classing + * this class and overriding getIndexRowRanges() and optionally init() if you need more + * config settings + */ +public class AccumuloDefaultIndexScanner implements AccumuloIndexScanner { + private static final Logger LOG = LoggerFactory.getLogger(AccumuloDefaultIndexScanner.class); + + private AccumuloConnectionParameters connectParams; + private AccumuloIndexParameters indexParams; + private int maxRowIds; + private Authorizations auths; + private String indexTable; + private Set indexColumns = EMPTY_SET; + private Connector connect; + private Map colMap; + + /** + * Initialize object based on configuration. + * + * @param conf - Hive configuration + */ + @Override + public void init(Configuration conf) { + connectParams = new AccumuloConnectionParameters(conf); + indexParams = new AccumuloIndexParameters(conf); + maxRowIds = indexParams.getMaxIndexRows(); + auths = indexParams.getTableAuths(); + indexTable = indexParams.getIndexTable(); + indexColumns = indexParams.getIndexColumns(); + colMap = createColumnMap(conf); + + } + + /** + * Get a list of rowid ranges by scanning a column index. + * + * @param column - the hive column name + * @param indexRange - Key range to scan on the index table + * @return List of matching rowid ranges or null if too many matches found + * if index values are not found a newline range is added to list to + * short-circuit the query + */ + @Override + public List getIndexRowRanges(String column, Range indexRange) { + List rowIds = new ArrayList(); + Scanner scan = null; + String col = this.colMap.get(column); + + if (col != null) { + + try { + LOG.debug("Searching tab=" + indexTable + " column=" + column + " range=" + indexRange); + Connector conn = getConnector(); + scan = conn.createScanner(indexTable, auths); + scan.setRange(indexRange); + Text cf = new Text(col); + LOG.debug("Using Column Family=" + toString()); + scan.fetchColumnFamily(cf); + + for (Map.Entry entry : scan) { + + rowIds.add(new Range(entry.getKey().getColumnQualifier())); + + // if we have too many results return null for a full scan + if (rowIds.size() > maxRowIds) { + return null; + } + } + + // no hits on the index so return a no match range + if (rowIds.isEmpty()) { + LOG.debug("Found 0 index matches"); + } else { + LOG.debug("Found " + rowIds.size() + " index matches"); + } + + return rowIds; + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + LOG.error("Failed to scan index table: " + indexTable, e); + } finally { + if (scan != null) { + scan.close(); + } + } + } + + // assume the index is bad and do a full scan + LOG.debug("Index lookup failed for table " + indexTable); + return null; + } + + /** + * Test if column is defined in the index table. 
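+ * <p>
+ * A minimal usage sketch (the column name and range value are illustrative only,
+ * not part of this patch):
+ * <pre>
+ *   AccumuloIndexScanner scanner = new AccumuloDefaultIndexScanner();
+ *   scanner.init(conf);
+ *   if (scanner.isIndexed("name")) {
+ *     // a null result means too many matches; callers fall back to a full scan
+ *     List&lt;Range&gt; rowIds = scanner.getIndexRowRanges("name", new Range("Alice"));
+ *   }
+ * </pre>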
+ * + * @param column - hive column name + * @return true if the column is defined as part of the index table + */ + @Override + public boolean isIndexed(String column) { + return indexTable != null + && (indexColumns.isEmpty() || indexColumns.contains("*") + || this.indexColumns.contains(column.toLowerCase()) + || this.indexColumns.contains(column.toUpperCase())); + + } + + protected Map createColumnMap(Configuration conf) { + Map colsMap = new HashMap(); + String accColString = conf.get(AccumuloSerDeParameters.COLUMN_MAPPINGS); + if (accColString != null && !accColString.trim().isEmpty()) { + String[] accCols = accColString.split(","); + String[] hiveCols = conf.get(serdeConstants.LIST_COLUMNS).split(","); + for (int i = 0; i < accCols.length; i++) { + colsMap.put(hiveCols[i], accCols[i].replace(':', '_')); + } + } + return colsMap; + } + + protected Connector getConnector() throws AccumuloSecurityException, AccumuloException { + if (connect == null) { + connect = connectParams.getConnector(); + } + return connect; + } + + public void setConnectParams(AccumuloConnectionParameters connectParams) { + this.connectParams = connectParams; + } + + public AccumuloConnectionParameters getConnectParams() { + return connectParams; + } + + public AccumuloIndexParameters getIndexParams() { + return indexParams; + } + + public int getMaxRowIds() { + return maxRowIds; + } + + public Authorizations getAuths() { + return auths; + } + + public String getIndexTable() { + return indexTable; + } + + public Set getIndexColumns() { + return indexColumns; + } + + public Connector getConnect() { + return connect; + } +} diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexLexicoder.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexLexicoder.java new file mode 100644 index 0000000..6703570 --- /dev/null +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexLexicoder.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.accumulo; + +import org.apache.accumulo.core.client.lexicoder.BigIntegerLexicoder; +import org.apache.accumulo.core.client.lexicoder.DoubleLexicoder; +import org.apache.accumulo.core.client.lexicoder.IntegerLexicoder; +import org.apache.accumulo.core.client.lexicoder.LongLexicoder; +import org.apache.hadoop.hive.serde.serdeConstants; + +import java.math.BigInteger; +import java.nio.ByteBuffer; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Utility class to encode index values for accumulo. 
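+ * <p>
+ * Encoding uses Accumulo lexicoders, whose byte output sorts in the same order as
+ * the original values, so range scans over index keys stay correct. For example
+ * (value and type are illustrative):
+ * <pre>
+ *   // string-encoded hive "int" value "42" becomes order-preserving bytes
+ *   byte[] encoded = AccumuloIndexLexicoder.encodeValue("42".getBytes(UTF_8), "int", true);
+ * </pre>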
+ */ +public final class AccumuloIndexLexicoder { + private static final IntegerLexicoder INTEGER_LEXICODER = new IntegerLexicoder(); + private static final DoubleLexicoder DOUBLE_LEXICODER = new DoubleLexicoder(); + private static final LongLexicoder LONG_LEXICODER = new LongLexicoder(); + private static final BigIntegerLexicoder BIG_INTEGER_LEXICODER = new BigIntegerLexicoder(); + private static final String DIM_PAT = "[(]+.*"; + + + private AccumuloIndexLexicoder() { + // hide constructor + } + + public static String getRawType(String hiveType) { + if (hiveType != null) { + return hiveType.toLowerCase().replaceFirst(DIM_PAT, "").trim(); + } + return hiveType; + } + + public static byte[] encodeValue(byte[] value, String hiveType, boolean stringEncoded) { + if (stringEncoded) { + return encodeStringValue(value, hiveType); + } else { + return encodeBinaryValue(value, hiveType); + } + } + + public static byte[] encodeStringValue(byte[] value, String hiveType) { + String rawType = getRawType(hiveType); + + switch(rawType) { + case serdeConstants.BOOLEAN_TYPE_NAME: + return Boolean.valueOf(new String(value)).toString().getBytes(UTF_8); + case serdeConstants.SMALLINT_TYPE_NAME : + case serdeConstants.TINYINT_TYPE_NAME : + case serdeConstants.INT_TYPE_NAME : + return INTEGER_LEXICODER.encode(Integer.valueOf(new String(value))); + case serdeConstants.FLOAT_TYPE_NAME : + case serdeConstants.DOUBLE_TYPE_NAME : + return DOUBLE_LEXICODER.encode(Double.valueOf(new String(value))); + case serdeConstants.BIGINT_TYPE_NAME : + return LONG_LEXICODER.encode(Long.valueOf(new String(value))); + case serdeConstants.DECIMAL_TYPE_NAME : + return BIG_INTEGER_LEXICODER.encode(new BigInteger(new String(value), 10)); + default : + // return the passed in string value + return value; + } + } + + public static byte[] encodeBinaryValue(byte[] value, String hiveType) { + String rawType = getRawType(hiveType); + + switch(rawType) { + case serdeConstants.BOOLEAN_TYPE_NAME : + return String.valueOf(value[0] == 1).getBytes(); + case serdeConstants.INT_TYPE_NAME : + return INTEGER_LEXICODER.encode(ByteBuffer.wrap(value).asIntBuffer().get()); + case serdeConstants.SMALLINT_TYPE_NAME : + return INTEGER_LEXICODER.encode((int)(ByteBuffer.wrap(value).asShortBuffer().get())); + case serdeConstants.TINYINT_TYPE_NAME : + return INTEGER_LEXICODER.encode((int)value[0]); + case serdeConstants.FLOAT_TYPE_NAME : + return DOUBLE_LEXICODER.encode((double)ByteBuffer.wrap(value).asFloatBuffer().get()); + case serdeConstants.DOUBLE_TYPE_NAME : + return DOUBLE_LEXICODER.encode(ByteBuffer.wrap(value).asDoubleBuffer().get()); + case serdeConstants.BIGINT_TYPE_NAME : + return LONG_LEXICODER.encode(ByteBuffer.wrap(value).asLongBuffer().get()); + case serdeConstants.DECIMAL_TYPE_NAME : + return BIG_INTEGER_LEXICODER.encode(new BigInteger(value)); + default : + return value; + } + } +} diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScanner.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScanner.java new file mode 100644 index 0000000..8029f3c --- /dev/null +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScanner.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.accumulo; + +import org.apache.accumulo.core.data.Range; +import org.apache.hadoop.conf.Configuration; + +import java.util.List; + +/** + * Specification for implementing a AccumuloIndexScanner. + */ +public interface AccumuloIndexScanner { + + /** + * Initialize the index scanner implementation with the runtime configuration. + * + * @param conf - the hadoop configuration + */ + void init(Configuration conf); + + /** + * Check if column is defined as being indexed. + * + * @param columnName - the hive column name + * @return true if the column is indexed + */ + boolean isIndexed(String columnName); + + /** + * Get a list of rowid ranges by scanning a column index. + * + * @param column - the hive column name + * @param indexRange - Key range to scan on the index table + * @return List of matching rowid ranges or null if too many matches found + * + */ + List getIndexRowRanges(String column, Range indexRange); + +} diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScannerException.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScannerException.java new file mode 100644 index 0000000..c50b606 --- /dev/null +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloIndexScannerException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.accumulo; + +/** + * Exception class for AccumuloIndexScanner operations. 
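+ * <p>
+ * Typically raised when an index scanner implementation cannot be created or
+ * initialized (for example, from the 'accumulo.index.scanner' setting).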
+ */ +public class AccumuloIndexScannerException extends Exception { + + private static final long serialVersionUID = 1L; + + public AccumuloIndexScannerException() { + super(); + } + + public AccumuloIndexScannerException(String msg) { + super(msg); + } + + public AccumuloIndexScannerException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java index cdbc7f2..62524e8 100644 --- accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java @@ -1,10 +1,11 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -17,10 +18,6 @@ package org.apache.hadoop.hive.accumulo; -import java.io.IOException; -import java.util.Map; -import java.util.Properties; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; @@ -39,6 +36,7 @@ import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableInputFormat; import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableOutputFormat; import org.apache.hadoop.hive.accumulo.predicate.AccumuloPredicateHandler; +import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters; import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe; import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; import org.apache.hadoop.hive.metastore.HiveMetaHook; @@ -52,13 +50,13 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; -import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; @@ -66,12 +64,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + /** * Create table mapping to Accumulo for Hive. 
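 * With this change an optional secondary index table, configured through
 * 'accumulo.indextable.name', is created on demand and maintained on writes.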
Handle predicate pushdown if necessary. */ public class AccumuloStorageHandler extends DefaultStorageHandler implements HiveMetaHook, HiveStoragePredicateHandler { - private static final Logger log = LoggerFactory.getLogger(AccumuloStorageHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(AccumuloStorageHandler.class); private static final String DEFAULT_PREFIX = "default"; protected AccumuloPredicateHandler predicateHandler = AccumuloPredicateHandler.getInstance(); @@ -88,7 +92,7 @@ * Properties that will be added to the JobConf by Hive */ @Override - public void configureTableJobProperties(TableDesc desc, Map jobProps) { + public void configureTableJobProperties(TableDesc desc, Map jobProps) { // Should not be getting invoked, configureInputJobProperties or configureOutputJobProperties // should be invoked instead. configureInputJobProperties(desc, jobProps); @@ -119,6 +123,21 @@ protected String getTableName(Table table) throws MetaException { } } + protected String getIndexTableName(Table table) { + // Use TBLPROPERTIES + String idxTableName = table.getParameters().get(AccumuloIndexParameters.INDEXTABLE_NAME); + + if (null != idxTableName) { + return idxTableName; + } + + // Then try SERDEPROPERTIES + idxTableName = table.getSd().getSerdeInfo().getParameters() + .get(AccumuloIndexParameters.INDEXTABLE_NAME); + + return idxTableName; + } + protected String getTableName(TableDesc tableDesc) { Properties props = tableDesc.getProperties(); String tableName = props.getProperty(AccumuloSerDeParameters.TABLE_NAME); @@ -135,6 +154,18 @@ protected String getTableName(TableDesc tableDesc) { return tableName; } + protected String getColumnTypes(TableDesc tableDesc) { + Properties props = tableDesc.getProperties(); + String columnTypes = props.getProperty(serdeConstants.LIST_COLUMN_TYPES); + return columnTypes; + } + + protected String getIndexTableName(TableDesc tableDesc) { + Properties props = tableDesc.getProperties(); + String tableName = props.getProperty(AccumuloIndexParameters.INDEXTABLE_NAME); + return tableName; + } + @Override public Configuration getConf() { return conf; @@ -163,7 +194,7 @@ public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException } @Override - public void configureInputJobProperties(TableDesc tableDesc, Map jobProperties) { + public void configureInputJobProperties(TableDesc tableDesc, Map jobProperties) { Properties props = tableDesc.getProperties(); jobProperties.put(AccumuloSerDeParameters.COLUMN_MAPPINGS, @@ -178,7 +209,7 @@ public void configureInputJobProperties(TableDesc tableDesc, Map String useIterators = props.getProperty(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY); if (useIterators != null) { - if (!useIterators.equalsIgnoreCase("true") && !useIterators.equalsIgnoreCase("false")) { + if (!"true".equalsIgnoreCase(useIterators) && !"false".equalsIgnoreCase(useIterators)) { throw new IllegalArgumentException("Expected value of true or false for " + AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY); } @@ -196,15 +227,15 @@ public void configureInputJobProperties(TableDesc tableDesc, Map jobProperties.put(AccumuloSerDeParameters.AUTHORIZATIONS_KEY, authValue); } - log.info("Computed input job properties of " + jobProperties); + LOG.info("Computed input job properties of " + jobProperties); } @Override - public void configureOutputJobProperties(TableDesc tableDesc, Map jobProperties) { + public void configureOutputJobProperties(TableDesc tableDesc, Map jobProperties) { Properties props = tableDesc.getProperties(); 
// Adding these job properties will make them available to the OutputFormat in checkOutputSpecs - jobProperties.put(AccumuloSerDeParameters.COLUMN_MAPPINGS, - props.getProperty(AccumuloSerDeParameters.COLUMN_MAPPINGS)); + String colMap = props.getProperty(AccumuloSerDeParameters.COLUMN_MAPPINGS); + jobProperties.put(AccumuloSerDeParameters.COLUMN_MAPPINGS, colMap); String tableName = props.getProperty(AccumuloSerDeParameters.TABLE_NAME); if (null == tableName) { @@ -212,6 +243,19 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map } jobProperties.put(AccumuloSerDeParameters.TABLE_NAME, tableName); + String indexTable = props.getProperty(AccumuloIndexParameters.INDEXTABLE_NAME); + if (null == indexTable) { + indexTable = getIndexTableName(tableDesc); + } + + if ( null != indexTable) { + jobProperties.put(AccumuloIndexParameters.INDEXTABLE_NAME, indexTable); + + String indexColumns = props.getProperty(AccumuloIndexParameters.INDEXED_COLUMNS); + jobProperties.put(AccumuloIndexParameters.INDEXED_COLUMNS, + getIndexedColFamQuals(tableDesc, indexColumns, colMap)); + } + if (props.containsKey(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE)) { jobProperties.put(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE, props.getProperty(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE)); @@ -223,6 +267,42 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map } } + private String getIndexedColFamQuals(TableDesc tableDesc, String indexColumns, String colMap) { + StringBuilder sb = new StringBuilder(); + + String cols = indexColumns; + + + String hiveColString = tableDesc.getProperties().getProperty(serdeConstants.LIST_COLUMNS); + // if there are actual accumulo index columns defined then build + // the comma separated list of accumulo columns + if (cols == null || cols.isEmpty() || "*".equals(indexColumns)) { + // skip rowid + cols = hiveColString.substring(hiveColString.indexOf(',')+1); + } + + String[] hiveTypes = tableDesc.getProperties() + .getProperty(serdeConstants.LIST_COLUMN_TYPES).split(":"); + String[] accCols = colMap.split(","); + String[] hiveCols = hiveColString.split(","); + Set indexSet = new HashSet(); + + for (String idx : cols.split(",")) { + indexSet.add(idx.trim()); + } + + for (int i = 0; i < hiveCols.length; i++) { + if (indexSet.contains(hiveCols[i].trim())) { + if (sb.length() > 0) { + sb.append(","); + } + sb.append(accCols[i].trim() + ":" + AccumuloIndexLexicoder.getRawType(hiveTypes[i])); + } + } + + return sb.toString(); + } + @SuppressWarnings("rawtypes") @Override public Class getInputFormatClass() { @@ -242,7 +322,7 @@ public void preCreateTable(Table table) throws MetaException { throw new MetaException("Location can't be specified for Accumulo"); } - Map serdeParams = table.getSd().getSerdeInfo().getParameters(); + Map serdeParams = table.getSd().getSerdeInfo().getParameters(); String columnMapping = serdeParams.get(AccumuloSerDeParameters.COLUMN_MAPPINGS); if (columnMapping == null) { throw new MetaException(AccumuloSerDeParameters.COLUMN_MAPPINGS @@ -268,6 +348,16 @@ public void preCreateTable(Table table) throws MetaException { + " already exists in Accumulo. 
Use CREATE EXTERNAL TABLE to register with Hive."); } } + + String idxTable = getIndexTableName(table); + + if (idxTable != null && !idxTable.isEmpty()) { + + // create the index table if it does not exist + if (!tableOpts.exists(idxTable)) { + tableOpts.create(idxTable); + } + } } catch (AccumuloSecurityException e) { throw new MetaException(StringUtils.stringifyException(e)); } catch (TableExistsException e) { @@ -336,7 +426,7 @@ public DecomposedPredicate decomposePredicate(JobConf conf, Deserializer deseria if (serDe.getIteratorPushdown()) { return predicateHandler.decompose(conf, desc); } else { - log.info("Set to ignore Accumulo iterator pushdown, skipping predicate handler."); + LOG.info("Set to ignore Accumulo iterator pushdown, skipping predicate handler."); return null; } } @@ -348,22 +438,24 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { Utils.addDependencyJars(jobConf, Tracer.class, Fate.class, Connector.class, Main.class, ZooKeeper.class, AccumuloStorageHandler.class); } catch (IOException e) { - log.error("Could not add necessary Accumulo dependencies to classpath", e); + LOG.error("Could not add necessary Accumulo dependencies to classpath", e); } Properties tblProperties = tableDesc.getProperties(); AccumuloSerDeParameters serDeParams = null; try { - serDeParams = new AccumuloSerDeParameters(jobConf, tblProperties, AccumuloSerDe.class.getName()); + serDeParams = + new AccumuloSerDeParameters(jobConf, tblProperties, AccumuloSerDe.class.getName()); } catch (SerDeException e) { - log.error("Could not instantiate AccumuloSerDeParameters", e); + LOG.error("Could not instantiate AccumuloSerDeParameters", e); return; } try { serDeParams.getRowIdFactory().addDependencyJars(jobConf); } catch (IOException e) { - log.error("Could not add necessary dependencies for " + serDeParams.getRowIdFactory().getClass(), e); + LOG.error("Could not add necessary dependencies for " + + serDeParams.getRowIdFactory().getClass(), e); } // When Kerberos is enabled, we have to add the Accumulo delegation token to the @@ -383,25 +475,26 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { connectionParams.getAccumuloUserName(), token); } catch (IllegalStateException e) { // The implementation balks when this method is invoked multiple times - log.debug("Ignoring IllegalArgumentException about re-setting connector information"); + LOG.debug("Ignoring IllegalArgumentException about re-setting connector information"); } try { OutputConfigurator.setConnectorInfo(AccumuloOutputFormat.class, jobConf, connectionParams.getAccumuloUserName(), token); } catch (IllegalStateException e) { // The implementation balks when this method is invoked multiple times - log.debug("Ignoring IllegalArgumentException about re-setting connector information"); + LOG.debug("Ignoring IllegalArgumentException about re-setting connector information"); } // Convert the Accumulo token in a Hadoop token Token accumuloToken = helper.getHadoopToken(token); - log.info("Adding Hadoop Token for Accumulo to Job's Credentials"); + LOG.info("Adding Hadoop Token for Accumulo to Job's Credentials"); // Add the Hadoop token to the JobConf helper.mergeTokenIntoJobConf(jobConf, accumuloToken); } catch (Exception e) { - throw new RuntimeException("Failed to obtain DelegationToken for " + connectionParams.getAccumuloUserName(), e); + throw new RuntimeException("Failed to obtain DelegationToken for " + + connectionParams.getAccumuloUserName(), e); } } } diff --git 
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexDefinition.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexDefinition.java new file mode 100644 index 0000000..51531d6 --- /dev/null +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexDefinition.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.accumulo.mr; + +import java.util.HashMap; +import java.util.Map; + +/** + * Index table definition. + */ +public class AccumuloIndexDefinition { + private final String baseTable; + private final String indexTable; + private final Map colMap; + + + public AccumuloIndexDefinition(String baseTable, String indexTable) { + this.colMap = new HashMap(); + this.baseTable = baseTable; + this.indexTable = indexTable; + } + + public String getBaseTable() { + return baseTable; + } + + + public String getIndexTable() { + return indexTable; + } + + public void addIndexCol(String cf, String cq, String colType) { + colMap.put(encode(cf, cq), colType); + } + + public Map getColumnMap() { + return colMap; + } + + public void setColumnTuples(String columns) { + if (columns != null) { + String cols = columns.trim(); + if (!cols.isEmpty() && !"*".equals(cols)) { + for (String col : cols.split(",")) { + String[] cfcqtp = col.trim().split(":"); + addIndexCol(cfcqtp[0], cfcqtp[1], cfcqtp[2]); + } + } + } + } + + public boolean contains(String cf, String cq) { + return colMap.containsKey(encode(cf, cq)); + } + + public String getColType(String cf, String cq) { + return colMap.get(encode(cf, cq)); + } + + private String encode(String cf, String cq) { + return cq + ":" + cq; + } +} diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexedOutputFormat.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexedOutputFormat.java new file mode 100644 index 0000000..a055233 --- /dev/null +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexedOutputFormat.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.accumulo.mr; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat; +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.accumulo.AccumuloIndexLexicoder; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Extension of AccumuloOutputFormat to support indexing. + */ +public class AccumuloIndexedOutputFormat extends AccumuloOutputFormat { + private static final Logger LOG = Logger.getLogger(AccumuloIndexedOutputFormat.class); + private static final Class CLASS = AccumuloOutputFormat.class; + private static final byte[] EMPTY_BYTES = new byte[0]; + + public static void setIndexTableName(JobConf job, String tableName) { + IndexOutputConfigurator.setIndexTableName(CLASS, job, tableName); + } + + protected static String getIndexTableName(JobConf job) { + return IndexOutputConfigurator.getIndexTableName(CLASS, job); + } + + public static void setIndexColumns(JobConf job, String fields) { + IndexOutputConfigurator.setIndexColumns(CLASS, job, fields); + } + + protected static String getIndexColumns(JobConf job) { + return IndexOutputConfigurator.getIndexColumns(CLASS, job); + } + + public static void setStringEncoding(JobConf job, Boolean isStringEncoding) { + IndexOutputConfigurator.setRecordEncoding(CLASS, job, isStringEncoding); + } + + protected static Boolean getStringEncoding(JobConf job) { + return IndexOutputConfigurator.getRecordEncoding(CLASS, job); + } + + public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, + String name, Progressable progress) throws IOException { + try { + return new AccumuloIndexedOutputFormat.AccumuloRecordWriter(job); + } catch (Exception e) { + throw new IOException(e); + } + } + + protected static class AccumuloRecordWriter implements RecordWriter { + private MultiTableBatchWriter mtbw = null; + private Map bws = null; + private Text defaultTableName = null; + private Text indexTableName = null; + private boolean simulate = false; + private boolean createTables = false; + private boolean isStringEncoded = true; + private long mutCount = 0L; + private long valCount = 0L; + private Connector conn; + private AccumuloIndexDefinition indexDef = null; + + protected AccumuloRecordWriter(JobConf job) + throws 
AccumuloException, AccumuloSecurityException, IOException { + Level l = AccumuloIndexedOutputFormat.getLogLevel(job); + if (l != null) { + LOG.setLevel(AccumuloIndexedOutputFormat.getLogLevel(job)); + } + this.isStringEncoded = AccumuloIndexedOutputFormat.getStringEncoding(job).booleanValue(); + this.simulate = AccumuloIndexedOutputFormat.getSimulationMode(job).booleanValue(); + this.createTables = AccumuloIndexedOutputFormat.canCreateTables(job).booleanValue(); + if (this.simulate) { + LOG.info("Simulating output only. No writes to tables will occur"); + } + + this.bws = new HashMap(); + String tname = AccumuloIndexedOutputFormat.getDefaultTableName(job); + this.defaultTableName = tname == null ? null : new Text(tname); + + String iname = AccumuloIndexedOutputFormat.getIndexTableName(job); + if (iname != null) { + LOG.info("Index Table = " + iname); + this.indexTableName = new Text(iname); + this.indexDef = createIndexDefinition(job, tname, iname); + } + if (!this.simulate) { + this.conn = AccumuloIndexedOutputFormat.getInstance(job) + .getConnector(AccumuloIndexedOutputFormat.getPrincipal(job), + AccumuloIndexedOutputFormat.getAuthenticationToken(job)); + this.mtbw = this.conn.createMultiTableBatchWriter( + AccumuloIndexedOutputFormat.getBatchWriterOptions(job)); + } + } + + AccumuloIndexDefinition createIndexDefinition(JobConf job, String tname, String iname) { + AccumuloIndexDefinition def = new AccumuloIndexDefinition(tname, iname); + String cols = AccumuloIndexedOutputFormat.getIndexColumns(job); + LOG.info("Index Cols = " + cols); + def.setColumnTuples(cols); + return def; + } + + public void write(Text table, Mutation mutation) throws IOException { + if(table == null || table.toString().isEmpty()) { + table = this.defaultTableName; + } + + if(!this.simulate && table == null) { + throw new IOException("No table or default table specified. 
Try simulation mode next time"); + } else { + ++this.mutCount; + this.valCount += (long)mutation.size(); + this.printMutation(table, mutation); + if(!this.simulate) { + if(!this.bws.containsKey(table)) { + try { + this.addTable(table); + } catch (Exception var5) { + LOG.error(var5); + throw new IOException(var5); + } + } + if(indexTableName != null && !this.bws.containsKey(indexTableName)) { + try { + this.addTable(indexTableName); + } catch (Exception var6) { + LOG.error(var6); + throw new IOException(var6); + } + } + + try { + ((BatchWriter)this.bws.get(table)).addMutation(mutation); + } catch (MutationsRejectedException var4) { + throw new IOException(var4); + } + + // if this table has an associated index table then attempt to build + // index mutations + if (indexTableName != null) { + List idxMuts = getIndexMutations(mutation); + if (!idxMuts.isEmpty()) { + try { + BatchWriter writer = this.bws.get(indexTableName); + for (Mutation m : idxMuts) { + writer.addMutation(m); + } + } catch (MutationsRejectedException var4) { + throw new IOException(var4); + } + } + } + } + } + } + + public void addTable(Text tableName) throws AccumuloException, AccumuloSecurityException { + if(this.simulate) { + LOG.info("Simulating adding table: " + tableName); + } else { + LOG.debug("Adding table: " + tableName); + BatchWriter bw = null; + String table = tableName.toString(); + if(this.createTables && !this.conn.tableOperations().exists(table)) { + try { + this.conn.tableOperations().create(table); + } catch (AccumuloSecurityException var8) { + LOG.error("Accumulo security violation creating " + table, var8); + throw var8; + } catch (TableExistsException var9) { + LOG.warn("Table Exists " + table, var9); + } + } + + try { + bw = this.mtbw.getBatchWriter(table); + } catch (TableNotFoundException var5) { + LOG.error("Accumulo table " + table + " doesn't exist and cannot be created.", var5); + throw new AccumuloException(var5); + } + + if(bw != null) { + this.bws.put(tableName, bw); + } + + } + } + + private int printMutation(Text table, Mutation m) { + if(LOG.isTraceEnabled()) { + LOG.trace(String.format("Table %s row key: %s", + new Object[]{table, this.hexDump(m.getRow())})); + Iterator itr = m.getUpdates().iterator(); + + while(itr.hasNext()) { + ColumnUpdate cu = (ColumnUpdate)itr.next(); + LOG.trace(String.format("Table %s column: %s:%s", + new Object[]{table, this.hexDump(cu.getColumnFamily()), + this.hexDump(cu.getColumnQualifier())})); + LOG.trace(String.format("Table %s security: %s", + new Object[]{table, (new ColumnVisibility(cu.getColumnVisibility())).toString()})); + LOG.trace(String.format("Table %s value: %s", + new Object[]{table, this.hexDump(cu.getValue())})); + } + } + + return m.getUpdates().size(); + } + + private List getIndexMutations(Mutation baseMut) { + List indexMuts = new ArrayList(); + + // nothing to do if there is not a index definition for this table + if (null != indexDef) { + + byte[] rowId = baseMut.getRow(); + + + for (ColumnUpdate cu : baseMut.getUpdates()) { + String cf = new String(cu.getColumnFamily()); + String cq = new String(cu.getColumnQualifier()); + + // if this columnFamily/columnQualifier pair is defined in the index build a new mutation + // so key=value, cf=columnFamily_columnQualifer, cq=rowKey, cv=columnVisibility value=[] + String colType = indexDef.getColType(cf, cq); + if (colType != null) { + LOG.trace(String.format("Building index for column %s:%s", new Object[]{cf, cq})); + Mutation m = new Mutation(AccumuloIndexLexicoder.encodeValue(cu.getValue(), 
colType, + isStringEncoded)); + String colFam = cf + "_" + cq; + m.put(colFam.getBytes(), rowId, new ColumnVisibility(cu.getColumnVisibility()), + EMPTY_BYTES); + indexMuts.add(m); + } + } + } + return indexMuts; + } + + private String hexDump(byte[] ba) { + StringBuilder sb = new StringBuilder(); + byte[] arr = ba; + int len = ba.length; + + for(int i = 0; i < len; ++i) { + byte b = arr[i]; + if(b > 32 && b < 126) { + sb.append((char)b); + } else { + sb.append(String.format("x%02x", new Object[]{Byte.valueOf(b)})); + } + } + + return sb.toString(); + } + + public void close(Reporter reporter) throws IOException { + LOG.debug("mutations written: " + this.mutCount + ", values written: " + this.valCount); + if(!this.simulate) { + try { + this.mtbw.close(); + } catch (MutationsRejectedException var7) { + if(var7.getAuthorizationFailuresMap().size() >= 0) { + Map tables = new HashMap(); + + Map.Entry ke; + Object secCodes; + for(Iterator itr = var7.getAuthorizationFailuresMap().entrySet().iterator(); + itr.hasNext(); ((Set)secCodes).addAll((Collection)ke.getValue())) { + ke = (Map.Entry)itr.next(); + secCodes = (Set)tables.get(((KeyExtent)ke.getKey()).getTableId().toString()); + if(secCodes == null) { + secCodes = new HashSet(); + tables.put(((KeyExtent)ke.getKey()).getTableId().toString(), secCodes); + } + } + + LOG.error("Not authorized to write to tables : " + tables); + } + + if(var7.getConstraintViolationSummaries().size() > 0) { + LOG.error("Constraint violations : " + var7.getConstraintViolationSummaries().size()); + } + throw new IOException(var7); + } + } + } + } +} diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java index 3ae5431..bfa764a 100644 --- accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java @@ -1,10 +1,11 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -14,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.hadoop.hive.accumulo.mr; import java.io.IOException; @@ -27,8 +29,11 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters; +import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding; +import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters; import org.apache.hadoop.hive.accumulo.HiveAccumuloHelper; import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; @@ -42,7 +47,7 @@ /** * */ -public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat { +public class HiveAccumuloTableOutputFormat extends AccumuloIndexedOutputFormat { protected final HiveAccumuloHelper helper = new HiveAccumuloHelper(); @@ -54,7 +59,8 @@ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException } @Override - public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { + public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, + Progressable progress) throws IOException { configureAccumuloOutputFormat(job); return super.getRecordWriter(ignored, job, name, progress); @@ -117,6 +123,16 @@ protected void configureAccumuloOutputFormat(JobConf job) throws IOException { // Set the table where we're writing this data setDefaultAccumuloTableName(job, tableName); + + // Set the index table information + final String indexTableName = job.get(AccumuloIndexParameters.INDEXTABLE_NAME); + final String indexedColumns = job.get(AccumuloIndexParameters.INDEXED_COLUMNS); + final String columnTypes = job.get(serdeConstants.LIST_COLUMN_TYPES); + final boolean binaryEncoding = ColumnEncoding.BINARY.getName() + .equalsIgnoreCase(job.get(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE)); + setAccumuloIndexTableName(job, indexTableName); + setAccumuloIndexColumns(job, indexedColumns); + setAccumuloStringEncoding(job, !binaryEncoding); } catch (AccumuloSecurityException e) { log.error("Could not connect to Accumulo with provided credentials", e); throw new IOException(e); @@ -125,10 +141,10 @@ protected void configureAccumuloOutputFormat(JobConf job) throws IOException { // Non-static methods to wrap the static AccumuloOutputFormat methods to enable testing - protected void setConnectorInfoWithErrorChecking(JobConf conf, String username, AuthenticationToken token) - throws AccumuloSecurityException { + protected void setConnectorInfoWithErrorChecking(JobConf conf, String username, + AuthenticationToken token) throws AccumuloSecurityException { try { - AccumuloOutputFormat.setConnectorInfo(conf, username, token); + AccumuloIndexedOutputFormat.setConnectorInfo(conf, username, token); } catch (IllegalStateException e) { // AccumuloOutputFormat complains if you re-set an already set value. We just don't care. 
log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e); @@ -136,8 +152,8 @@ protected void setConnectorInfoWithErrorChecking(JobConf conf, String username, } @SuppressWarnings("deprecation") - protected void setZooKeeperInstanceWithErrorChecking(JobConf conf, String instanceName, String zookeepers, - boolean isSasl) throws IOException { + protected void setZooKeeperInstanceWithErrorChecking(JobConf conf, String instanceName, + String zookeepers, boolean isSasl) throws IOException { try { if (isSasl) { // Reflection to support Accumulo 1.5. Remove when Accumulo 1.5 support is dropped @@ -146,7 +162,7 @@ protected void setZooKeeperInstanceWithErrorChecking(JobConf conf, String instan getHelper().setZooKeeperInstance(conf, AccumuloOutputFormat.class, zookeepers, instanceName, isSasl); } else { - AccumuloOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers); + AccumuloIndexedOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers); } } catch (IllegalStateException ise) { // AccumuloOutputFormat complains if you re-set an already set value. We just don't care. @@ -157,7 +173,7 @@ protected void setZooKeeperInstanceWithErrorChecking(JobConf conf, String instan protected void setMockInstanceWithErrorChecking(JobConf conf, String instanceName) { try { - AccumuloOutputFormat.setMockInstance(conf, instanceName); + AccumuloIndexedOutputFormat.setMockInstance(conf, instanceName); } catch (IllegalStateException e) { // AccumuloOutputFormat complains if you re-set an already set value. We just don't care. log.debug("Ignoring exception setting mock instance of " + instanceName, e); @@ -165,7 +181,19 @@ protected void setMockInstanceWithErrorChecking(JobConf conf, String instanceNam } protected void setDefaultAccumuloTableName(JobConf conf, String tableName) { - AccumuloOutputFormat.setDefaultTableName(conf, tableName); + AccumuloIndexedOutputFormat.setDefaultTableName(conf, tableName); + } + + protected void setAccumuloIndexTableName(JobConf conf, String indexTableName) { + AccumuloIndexedOutputFormat.setIndexTableName(conf, indexTableName); + } + + protected void setAccumuloIndexColumns(JobConf conf, String indexColumns) { + AccumuloIndexedOutputFormat.setIndexColumns(conf, indexColumns); + } + + protected void setAccumuloStringEncoding(JobConf conf, Boolean isStringEncoded) { + AccumuloIndexedOutputFormat.setStringEncoding(conf, isStringEncoded); } HiveAccumuloHelper getHelper() { diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/IndexOutputConfigurator.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/IndexOutputConfigurator.java new file mode 100644 index 0000000..98294bb --- /dev/null +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/IndexOutputConfigurator.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.accumulo.mr; + +import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator; +import org.apache.hadoop.conf.Configuration; + +/** + * Extension of OutputConfigurtion to support indexing. + */ +public class IndexOutputConfigurator extends OutputConfigurator { + /** + * Accumulo Write options. + */ + public static enum WriteOpts { + DEFAULT_TABLE_NAME, + INDEX_TABLE_NAME, + INDEX_COLUMNS, + COLUMN_TYPES, + BINARY_ENCODING, + BATCH_WRITER_CONFIG; + + private WriteOpts() { + } + } + + public static void setIndexTableName(Class implementingClass, Configuration conf, + String tableName) { + if(tableName != null) { + conf.set(enumToConfKey(implementingClass, WriteOpts.INDEX_TABLE_NAME), tableName); + } + } + + public static String getIndexTableName(Class implementingClass, Configuration conf) { + return conf.get(enumToConfKey(implementingClass, WriteOpts.INDEX_TABLE_NAME)); + } + + public static void setIndexColumns(Class implementingClass, Configuration conf, + String tableName) { + if(tableName != null) { + conf.set(enumToConfKey(implementingClass, WriteOpts.INDEX_COLUMNS), tableName); + } + } + + public static String getIndexColumns(Class implementingClass, Configuration conf) { + return conf.get(enumToConfKey(implementingClass, WriteOpts.INDEX_COLUMNS)); + } + + + public static void setRecordEncoding(Class implementingClass, Configuration conf, + Boolean isBinary) { + conf.set(enumToConfKey(implementingClass, WriteOpts.BINARY_ENCODING), isBinary.toString()); + } + + public static Boolean getRecordEncoding(Class implementingClass, Configuration conf) { + return Boolean.valueOf(conf.get(enumToConfKey(implementingClass, WriteOpts.BINARY_ENCODING))); + } + +} diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/package-info.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/package-info.java new file mode 100644 index 0000000..599b1ea --- /dev/null +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/package-info.java @@ -0,0 +1,4 @@ +/** + * map reduce and supporting classes + */ +package org.apache.hadoop.hive.accumulo.mr; \ No newline at end of file diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java index a7ec7c5..718a5c5 100644 --- accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java @@ -1,10 +1,11 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -29,6 +30,7 @@ import org.apache.accumulo.core.data.Range; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding; import org.apache.hadoop.hive.accumulo.columns.ColumnMapper; import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping; import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp; @@ -86,13 +88,13 @@ private static final List TOTAL_RANGE = Collections.singletonList(new Range()); private static AccumuloPredicateHandler handler = new AccumuloPredicateHandler(); - private static Map> compareOps = Maps.newHashMap(); - private static Map> pComparisons = Maps.newHashMap(); + private static Map> compareOps = Maps.newHashMap(); + private static Map> pComparisons = Maps.newHashMap(); // Want to start sufficiently "high" enough in the iterator stack private static int iteratorCount = 50; - private static final Logger log = LoggerFactory.getLogger(AccumuloPredicateHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(AccumuloPredicateHandler.class); static { compareOps.put(GenericUDFOPEqual.class.getName(), Equal.class); compareOps.put(GenericUDFOPNotEqual.class.getName(), NotEqual.class); @@ -136,8 +138,9 @@ public static AccumuloPredicateHandler getInstance() { */ public Class getCompareOpClass(String udfType) throws NoSuchCompareOpException { - if (!compareOps.containsKey(udfType)) + if (!compareOps.containsKey(udfType)) { throw new NoSuchCompareOpException("Null compare op for specified key: " + udfType); + } return compareOps.get(udfType); } @@ -167,9 +170,10 @@ public CompareOp getCompareOp(String udfType, IndexSearchCondition sc) */ public Class getPrimitiveComparisonClass(String type) throws NoSuchPrimitiveComparisonException { - if (!pComparisons.containsKey(type)) + if (!pComparisons.containsKey(type)) { throw new NoSuchPrimitiveComparisonException("Null primitive comparison for specified key: " + type); + } return pComparisons.get(type); } @@ -196,7 +200,8 @@ private AccumuloPredicateHandler() {} /** * Loop through search conditions and build ranges for predicates involving rowID column, if any. 
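 * For example, a predicate such as {@code rowid >= 'a' AND rowid < 'f'} on the
 * rowid-mapped column would typically collapse to the single Range ['a', 'f').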
@@ -196,7 +200,8 @@ private AccumuloPredicateHandler() {}
   /**
    * Loop through search conditions and build ranges for predicates involving rowID column, if any.
    */
-  public List<Range> getRanges(Configuration conf, ColumnMapper columnMapper) throws SerDeException {
+  public List<Range> getRanges(Configuration conf, ColumnMapper columnMapper)
+      throws SerDeException {
     if (!columnMapper.hasRowIdMapping()) {
       return TOTAL_RANGE;
     }
@@ -218,16 +223,16 @@ private AccumuloPredicateHandler() {}
       return TOTAL_RANGE;
     }
 
-    Object result = generateRanges(columnMapper, hiveRowIdColumnName, root);
+    Object result = generateRanges(conf, columnMapper, hiveRowIdColumnName, root);
 
     if (null == result) {
-      log.info("Calculated null set of ranges, scanning full table");
+      LOG.info("Calculated null set of ranges, scanning full table");
       return TOTAL_RANGE;
     } else if (result instanceof Range) {
-      log.info("Computed a single Range for the query: " + result);
+      LOG.info("Computed a single Range for the query: " + result);
       return Collections.singletonList((Range) result);
    } else if (result instanceof List) {
-      log.info("Computed a collection of Ranges for the query: " + result);
+      LOG.info("Computed a collection of Ranges for the query: " + result);
       @SuppressWarnings("unchecked")
       List<Range> ranges = (List<Range>) result;
       return ranges;
@@ -237,9 +242,11 @@ private AccumuloPredicateHandler() {}
   }
 
   /**
-   * Encapsulates the traversal over some {@link ExprNodeDesc} tree for the generation of Accumuluo
-   * Ranges using expressions involving the Accumulo rowid-mapped Hive column
+   * Encapsulates the traversal over some {@link ExprNodeDesc} tree for the generation of Accumulo
+   * Ranges using expressions involving the Accumulo rowid-mapped Hive column.
    *
+   * @param conf
+   *          Hadoop configuration
    * @param columnMapper
    *          Mapping of Hive to Accumulo columns for the query
    * @param hiveRowIdColumnName
@@ -249,15 +256,16 @@ private AccumuloPredicateHandler() {}
    * @return An object representing the result from the ExprNodeDesc tree traversal using the
    *         AccumuloRangeGenerator
    */
-  protected Object generateRanges(ColumnMapper columnMapper, String hiveRowIdColumnName, ExprNodeDesc root) {
-    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler,
+  protected Object generateRanges(Configuration conf, ColumnMapper columnMapper,
+      String hiveRowIdColumnName, ExprNodeDesc root) {
+    AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler,
         columnMapper.getRowIdMapping(), hiveRowIdColumnName);
     Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator,
-        Collections.<Rule,NodeProcessor> emptyMap(), null);
+        Collections.<Rule, NodeProcessor> emptyMap(), null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
-    ArrayList<Node> roots = new ArrayList<Node>();
+    List<Node> roots = new ArrayList<Node>();
     roots.add(root);
-    HashMap<Node,Object> nodeOutput = new HashMap<Node,Object>();
+    HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
 
     try {
       ogw.startWalking(roots, nodeOutput);
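As a worked example of what this walk produces: for a conjunction such as rid >= 'f' AND rid <= 'm' over a string-encoded rowid, the generator collapses both bounds into a single Accumulo Range, which getRanges then returns as a singleton list (the same expectation appears in testRangeConjunction later in this patch):

    // Start key 'f' inclusive; end key 'm\0' exclusive, which makes 'm' itself inclusive.
    Range expected = new Range(new Key("f"), true, new Key("m\0"), false);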
@@ -282,10 +290,13 @@ protected Object generateRanges(ColumnMapper columnMapper, String hiveRowIdColum
     boolean shouldPushdown = conf.getBoolean(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY,
         AccumuloSerDeParameters.ITERATOR_PUSHDOWN_DEFAULT);
     if (!shouldPushdown) {
-      log.info("Iterator pushdown is disabled for this table");
+      LOG.info("Iterator pushdown is disabled for this table");
       return itrs;
     }
 
+    boolean binaryEncodedRow = ColumnEncoding.BINARY.getName()
+        .equalsIgnoreCase(conf.get(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE));
+
     int rowIdOffset = columnMapper.getRowIdOffset();
     String[] hiveColumnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS);
@@ -306,11 +317,12 @@ protected Object generateRanges(ColumnMapper columnMapper, String hiveRowIdColum
       if (hiveRowIdColumnName == null || !hiveRowIdColumnName.equals(col)) {
         HiveAccumuloColumnMapping mapping = (HiveAccumuloColumnMapping) columnMapper
             .getColumnMappingForHiveColumn(hiveColumnNames, col);
-        itrs.add(toSetting(mapping, sc));
+        itrs.add(toSetting(mapping, sc, binaryEncodedRow));
       }
     }
-    if (log.isInfoEnabled())
-      log.info("num iterators = " + itrs.size());
+    if (LOG.isInfoEnabled()) {
+      LOG.info("num iterators = " + itrs.size());
+    }
     return itrs;
   }
@@ -322,15 +334,19 @@ protected Object generateRanges(ColumnMapper columnMapper, String hiveRowIdColum
    *          ColumnMapping to filter
    * @param sc
    *          IndexSearchCondition
+   * @param binaryEncodedValues
+   *          flag indicating whether values are binary encoded
    * @return IteratorSetting
    * @throws SerDeException
    */
   public IteratorSetting toSetting(HiveAccumuloColumnMapping accumuloColumnMapping,
-      IndexSearchCondition sc) throws SerDeException {
+      IndexSearchCondition sc, boolean binaryEncodedValues) throws SerDeException {
     iteratorCount++;
     final IteratorSetting is = new IteratorSetting(iteratorCount,
-        PrimitiveComparisonFilter.FILTER_PREFIX + iteratorCount, PrimitiveComparisonFilter.class);
-    final String type = sc.getColumnDesc().getTypeString();
+        PrimitiveComparisonFilter.FILTER_PREFIX + iteratorCount,
+        PrimitiveComparisonFilter.class);
+    final String type = binaryEncodedValues ? sc.getColumnDesc().getTypeString()
+        : ColumnEncoding.STRING.getName();
     final String comparisonOpStr = sc.getComparisonOp();
 
     PushdownTuple tuple;
@@ -355,8 +371,9 @@ public IteratorSetting toSetting(HiveAccumuloColumnMapping accumuloColumnMapping
   public ExprNodeDesc getExpression(Configuration conf) {
     String filteredExprSerialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
-    if (filteredExprSerialized == null)
+    if (filteredExprSerialized == null) {
       return null;
+    }
 
     return SerializationUtilities.deserializeExpression(filteredExprSerialized);
   }
@@ -375,8 +392,9 @@ public ExprNodeDesc getExpression(Configuration conf) {
     }
     IndexPredicateAnalyzer analyzer = newAnalyzer(conf);
     ExprNodeDesc residual = analyzer.analyzePredicate(filterExpr, sConditions);
-    if (residual != null)
+    if (residual != null) {
       throw new RuntimeException("Unexpected residual predicate: " + residual.getExprString());
+    }
     return sConditions;
   }
@@ -394,8 +412,7 @@ public DecomposedPredicate decompose(Configuration conf, ExprNodeDesc desc) {
     ExprNodeDesc residualPredicate = analyzer.analyzePredicate(desc, sConditions);
 
     if (sConditions.size() == 0) {
-      if (log.isInfoEnabled())
-        log.info("nothing to decompose. Returning");
+      LOG.info("nothing to decompose. Returning");
       return null;
     }
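For context, a sketch of how a storage-handler caller consumes the two halves of this class. getRanges is shown above; the iterator-building method whose hunks also appear above is assumed to be named getIterators (the name is not visible in these hunks):

    List<Range> ranges = AccumuloPredicateHandler.getInstance().getRanges(conf, columnMapper);
    List<IteratorSetting> iterators =
        AccumuloPredicateHandler.getInstance().getIterators(conf, columnMapper);
    // ranges restrict the scan by rowid; iterators push the remaining predicates server-side.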
diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
index 21392d1..afdc647 100644
--- accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
+++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java
@@ -1,10 +1,11 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,15 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.accumulo.predicate;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Stack;
+package org.apache.hadoop.hive.accumulo.predicate;
 
 import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters;
+import org.apache.hadoop.hive.accumulo.AccumuloIndexScanner;
+import org.apache.hadoop.hive.accumulo.AccumuloIndexScannerException;
+import org.apache.hadoop.hive.accumulo.AccumuloIndexLexicoder;
 import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping;
 import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp;
 import org.apache.hadoop.hive.accumulo.predicate.compare.Equal;
@@ -43,17 +44,19 @@
 import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 /**
  *
  */
@@ -63,12 +66,27 @@
   private final AccumuloPredicateHandler predicateHandler;
   private final HiveAccumuloRowIdColumnMapping rowIdMapping;
   private final String hiveRowIdColumnName;
+  private AccumuloIndexScanner indexScanner;
 
-  public AccumuloRangeGenerator(AccumuloPredicateHandler predicateHandler,
+  public AccumuloRangeGenerator(Configuration conf, AccumuloPredicateHandler predicateHandler,
       HiveAccumuloRowIdColumnMapping rowIdMapping, String hiveRowIdColumnName) {
     this.predicateHandler = predicateHandler;
     this.rowIdMapping =
rowIdMapping; this.hiveRowIdColumnName = hiveRowIdColumnName; + try { + this.indexScanner = new AccumuloIndexParameters(conf).createScanner(); + } catch (AccumuloIndexScannerException e) { + LOG.error(e.getLocalizedMessage(), e); + this.indexScanner = null; + } + } + + public AccumuloIndexScanner getIndexScanner() { + return indexScanner; + } + + public void setIndexScanner(AccumuloIndexScanner indexScanner) { + this.indexScanner = indexScanner; } @Override @@ -234,13 +252,39 @@ protected Object processExpression(ExprNodeGenericFuncDesc func, Object[] nodeOu return null; } - // Reject any clauses that are against a column that isn't the rowId mapping + ConstantObjectInspector objInspector = constantDesc.getWritableObjectInspector(); + + // Reject any clauses that are against a column that isn't the rowId mapping or indexed if (!this.hiveRowIdColumnName.equals(columnDesc.getColumn())) { + if (this.indexScanner != null && this.indexScanner.isIndexed(columnDesc.getColumn())) { + return getIndexedRowIds(genericUdf, leftHandNode, columnDesc.getColumn(), objInspector); + } return null; } - ConstantObjectInspector objInspector = constantDesc.getWritableObjectInspector(); + Text constText = getConstantText(objInspector); + + return getRange(genericUdf, leftHandNode, constText); + } + + private Range getRange(GenericUDF genericUdf, ExprNodeDesc leftHandNode, Text constText) { + Class opClz; + try { + opClz = predicateHandler.getCompareOpClass(genericUdf.getUdfName()); + } catch (NoSuchCompareOpException e) { + throw new IllegalArgumentException("Unhandled UDF class: " + genericUdf.getUdfName()); + } + + if (leftHandNode instanceof ExprNodeConstantDesc) { + return getConstantOpColumnRange(opClz, constText); + } else if (leftHandNode instanceof ExprNodeColumnDesc) { + return getColumnOpConstantRange(opClz, constText); + } else { + throw new IllegalStateException("Expected column or constant on LHS of expression"); + } + } + private Text getConstantText(ConstantObjectInspector objInspector) throws SemanticException { Text constText; switch (rowIdMapping.getEncoding()) { case STRING: @@ -257,21 +301,7 @@ protected Object processExpression(ExprNodeGenericFuncDesc func, Object[] nodeOu throw new SemanticException("Unable to parse unknown encoding: " + rowIdMapping.getEncoding()); } - - Class opClz; - try { - opClz = predicateHandler.getCompareOpClass(genericUdf.getUdfName()); - } catch (NoSuchCompareOpException e) { - throw new IllegalArgumentException("Unhandled UDF class: " + genericUdf.getUdfName()); - } - - if (leftHandNode instanceof ExprNodeConstantDesc) { - return getConstantOpColumnRange(opClz, constText); - } else if (leftHandNode instanceof ExprNodeColumnDesc) { - return getColumnOpConstantRange(opClz, constText); - } else { - throw new IllegalStateException("Expected column or constant on LHS of expression"); - } + return constText; } protected Range getConstantOpColumnRange(Class opClz, Text constText) { @@ -311,6 +341,21 @@ protected Range getColumnOpConstantRange(Class opClz, Text } } + + protected Object getIndexedRowIds(GenericUDF genericUdf, ExprNodeDesc leftHandNode, + String columnName, ConstantObjectInspector objInspector) + throws SemanticException { + Text constText = getConstantText(objInspector); + byte[] value = constText.toString().getBytes(UTF_8); + byte[] encoded = AccumuloIndexLexicoder.encodeValue(value, objInspector.getTypeName(), true); + Range range = getRange(genericUdf, leftHandNode, new Text(encoded)); + if (indexScanner != null) { + return 
indexScanner.getIndexRowRanges(columnName, range); + } + return null; + } + + protected Text getUtf8Value(ConstantObjectInspector objInspector) { // TODO is there a more correct way to get the literal value for the Object? return new Text(objInspector.getWritableConstantValue().toString()); @@ -327,7 +372,7 @@ protected Text getBinaryValue(ConstantObjectInspector objInspector) throws IOExc ByteArrayOutputStream out = new ByteArrayOutputStream(); if (objInspector instanceof PrimitiveObjectInspector) { LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(), - (PrimitiveObjectInspector) objInspector); + (PrimitiveObjectInspector) objInspector); } else { return getUtf8Value(objInspector); } diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java index 17d5529..5121ea3 100644 --- accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -37,7 +37,6 @@ import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp; import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison; import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +53,7 @@ */ public class PrimitiveComparisonFilter extends WholeRowIterator { @SuppressWarnings("unused") - private static final Logger log = LoggerFactory.getLogger(PrimitiveComparisonFilter.class); + private static final Logger LOG = LoggerFactory.getLogger(PrimitiveComparisonFilter.class); public static final String FILTER_PREFIX = "accumulo.filter.compare.iterator."; public static final String P_COMPARE_CLASS = "accumulo.filter.iterator.p.compare.class"; @@ -68,7 +67,7 @@ @Override protected boolean filter(Text currentRow, List keys, List values) { - SortedMap items; + SortedMap items; boolean allow; try { // if key doesn't contain CF, it's an encoded value from a previous iterator. 
      while (keys.get(0).getColumnFamily().getBytes().length == 0) {
@@ -103,11 +102,11 @@ private boolean matchQualAndFam(Key k) {
   }
 
   @Override
-  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+  public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
       IteratorEnvironment env) throws IOException {
     super.init(source, options, env);
     String serializedColumnMapping = options.get(COLUMN);
-    Entry<byte[],byte[]> pair = ColumnMappingFactory.parseMapping(serializedColumnMapping);
+    Entry<byte[], byte[]> pair = ColumnMappingFactory.parseMapping(serializedColumnMapping);
 
     // The ColumnEncoding, column name and type are all irrelevant at this point, just need the
     // cf:[cq]
@@ -135,7 +134,7 @@ public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> op
     }
   }
 
-  protected byte[] getConstant(Map<String,String> options) {
+  protected byte[] getConstant(Map<String, String> options) {
     String b64Const = options.get(CONST_VAL);
     return Base64.decodeBase64(b64Const.getBytes());
   }
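Putting the filter's contract together: toSetting (earlier in this patch) assigns each condition a unique priority and name, and the options read back in init above carry the comparison classes, the Base64-encoded constant, and the serialized column mapping. A hedged client-side sketch; StringCompare is a comparison class from this package, and the options beyond P_COMPARE_CLASS are abbreviated because their keys are defined outside these hunks:

    // 'scanner' is assumed to be an open Accumulo Scanner over the data table.
    IteratorSetting is = new IteratorSetting(50, PrimitiveComparisonFilter.FILTER_PREFIX + 50,
        PrimitiveComparisonFilter.class);
    is.addOption(PrimitiveComparisonFilter.P_COMPARE_CLASS, StringCompare.class.getName());
    // ... plus the CompareOp class, Base64-encoded constant, and cf:cq column mapping options
    scanner.addScanIterator(is);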
diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloIndexParameters.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloIndexParameters.java
new file mode 100644
index 0000000..d295c7b
--- /dev/null
+++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloIndexParameters.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo.serde;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.AccumuloDefaultIndexScanner;
+import org.apache.hadoop.hive.accumulo.AccumuloIndexScanner;
+import org.apache.hadoop.hive.accumulo.AccumuloIndexScannerException;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Accumulo index parameters for Hive tables.
+ */
+public class AccumuloIndexParameters {
+  public static final int DEFAULT_MAX_ROWIDS = 20000;
+  public static final String INDEX_SCANNER = "accumulo.index.scanner";
+  public static final String MAX_INDEX_ROWS = "accumulo.index.rows.max";
+  public static final String INDEXED_COLUMNS = "accumulo.indexed.columns";
+  public static final String INDEXTABLE_NAME = "accumulo.indextable.name";
+  private static final Set<String> EMPTY_SET = new HashSet<String>();
+  private Configuration conf;
+
+  public AccumuloIndexParameters(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public String getIndexTable() {
+    return this.conf.get(INDEXTABLE_NAME);
+  }
+
+  public int getMaxIndexRows() {
+    return this.conf.getInt(MAX_INDEX_ROWS, DEFAULT_MAX_ROWIDS);
+  }
+
+  public final Set<String> getIndexColumns() {
+    String colmap = conf.get(INDEXED_COLUMNS);
+    if (colmap != null) {
+      Set<String> cols = new HashSet<String>();
+      for (String col : colmap.split(",")) {
+        cols.add(col.trim());
+      }
+      return cols;
+    }
+    return EMPTY_SET;
+  }
+
+  public final Authorizations getTableAuths() {
+    String auths = conf.get(AccumuloSerDeParameters.AUTHORIZATIONS_KEY);
+    if (auths != null && !auths.isEmpty()) {
+      return new Authorizations(auths.trim().getBytes(StandardCharsets.UTF_8));
+    }
+    return new Authorizations();
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public final AccumuloIndexScanner createScanner() throws AccumuloIndexScannerException {
+    AccumuloIndexScanner handler;
+
+    String classname = conf.get(INDEX_SCANNER);
+    if (classname != null) {
+      try {
+        handler = (AccumuloIndexScanner) Class.forName(classname).newInstance();
+      } catch (ClassCastException | InstantiationException | IllegalAccessException
+          | ClassNotFoundException e) {
+        throw new AccumuloIndexScannerException("Cannot use index scanner class: " + classname, e);
+      }
+    } else {
+      handler = new AccumuloDefaultIndexScanner();
+    }
+    handler.init(conf);
+    return handler;
+  }
+}
diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java
index 09c5f24..ef454f0 100644
--- accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java
+++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java
@@ -17,9 +17,11 @@
 package org.apache.hadoop.hive.accumulo.serde;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
@@ -58,12 +60,21 @@
   public static final String COMPOSITE_ROWID_FACTORY = "accumulo.composite.rowid.factory";
   public static final String COMPOSITE_ROWID_CLASS = "accumulo.composite.rowid";
 
+  public static final int DEFAULT_MAX_ROWIDS = 20000;
+  public static final String INDEX_SCANNER = "accumulo.index.scanner";
+  public static final String MAX_INDEX_ROWS = "accumulo.index.rows.max";
+  public static final String INDEXED_COLUMNS = "accumulo.indexed.columns";
+  public static final String INDEXTABLE_NAME = "accumulo.indextable.name";
+  private static final Set<String> EMPTY_SET = new HashSet<String>();
+
 protected final ColumnMapper columnMapper;
   private Properties tableProperties;
   private String serdeName;
   private LazySerDeParameters lazySerDeParameters;
+  private AccumuloIndexParameters indexParams;
  private
AccumuloRowIdFactory rowIdFactory; public AccumuloSerDeParameters(Configuration conf, Properties tableProperties, String serdeName) @@ -73,6 +84,7 @@ public AccumuloSerDeParameters(Configuration conf, Properties tableProperties, S this.serdeName = serdeName; lazySerDeParameters = new LazySerDeParameters(conf, tableProperties, serdeName); + indexParams = new AccumuloIndexParameters(conf); // The default encoding for this table when not otherwise specified String defaultStorage = tableProperties.getProperty(DEFAULT_STORAGE_TYPE); @@ -135,10 +147,17 @@ protected AccumuloRowIdFactory createRowIdFactory(Configuration job, Properties return new DefaultAccumuloRowIdFactory(); } + public AccumuloIndexParameters getIndexParams() { + return indexParams; + } + public LazySerDeParameters getSerDeParameters() { + return lazySerDeParameters; } + + public Properties getTableProperties() { return tableProperties; } diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/package-info.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/package-info.java new file mode 100644 index 0000000..7311e87 --- /dev/null +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/package-info.java @@ -0,0 +1,4 @@ +/** + * accumulo serde classes + */ +package org.apache.hadoop.hive.accumulo.serde; \ No newline at end of file diff --git accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java new file mode 100644 index 0000000..7d6cc0e --- /dev/null +++ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestAccumuloDefaultIndexScanner {
+  private static final Logger LOG = LoggerFactory.getLogger(TestAccumuloDefaultIndexScanner.class);
+  private static final Value EMPTY_VALUE = new Value();
+
+  private static void addRow(BatchWriter writer, String rowId, String cf, String cq)
+      throws MutationsRejectedException {
+    Mutation mut = new Mutation(rowId);
+    mut.put(new Text(cf), new Text(cq), EMPTY_VALUE);
+    writer.addMutation(mut);
+  }
+
+  private static void addRow(BatchWriter writer, Integer rowId, String cf, String cq)
+      throws MutationsRejectedException {
+    Mutation mut = new Mutation(AccumuloIndexLexicoder.encodeValue(String.valueOf(rowId).getBytes(),
+        "int", true));
+    mut.put(new Text(cf), new Text(cq), EMPTY_VALUE);
+    writer.addMutation(mut);
+  }
+
+  private static void addRow(BatchWriter writer, boolean rowId, String cf, String cq)
+      throws MutationsRejectedException {
+    Mutation mut = new Mutation(String.valueOf(rowId));
+    mut.put(new Text(cf), new Text(cq), EMPTY_VALUE);
+    writer.addMutation(mut);
+  }
+
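Each helper above writes one index entry: the indexed value becomes the index-table row id, the column family is the data table's family and qualifier joined with an underscore, and the column qualifier is the data-table row id. Expanded, the first call made in buildMockHandler below behaves like:

    // Equivalent to addRow(writer, "fred", "name_name", "row1"):
    Mutation mut = new Mutation("fred");                           // indexed value as index rowid
    mut.put(new Text("name_name"), new Text("row1"), EMPTY_VALUE); // cf = field, cq = data rowid
    writer.addMutation(mut);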
+  public static AccumuloDefaultIndexScanner buildMockHandler(int maxMatches) {
+    try {
+      String table = "table";
+      Configuration conf = new Configuration();
+      conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, table);
+      conf.setInt(AccumuloIndexParameters.MAX_INDEX_ROWS, maxMatches);
+      conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "*");
+      conf.set(serdeConstants.LIST_COLUMNS, "rid,name,age,cars,mgr");
+      conf.set(AccumuloSerDeParameters.COLUMN_MAPPINGS, ":rowId,name:name,age:age,cars:cars,mgr:mgr");
+      AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+      handler.init(conf);
+
+      MockInstance inst = new MockInstance("test_instance");
+      Connector conn = inst.getConnector("root", new PasswordToken(""));
+      if (!conn.tableOperations().exists(table)) {
+        conn.tableOperations().create(table);
+        BatchWriterConfig batchConfig = new BatchWriterConfig();
+        BatchWriter writer = conn.createBatchWriter(table, batchConfig);
+        addRow(writer, "fred", "name_name", "row1");
+        addRow(writer, "25", "age_age", "row1");
+        addRow(writer, 5, "cars_cars", "row1");
+        addRow(writer, true, "mgr_mgr", "row1");
+        addRow(writer, "bill", "name_name", "row2");
+        addRow(writer, "20", "age_age", "row2");
+        addRow(writer, 2, "cars_cars", "row2");
+        addRow(writer, false, "mgr_mgr", "row2");
+        addRow(writer, "sally", "name_name", "row3");
+        addRow(writer, "23", "age_age", "row3");
+        addRow(writer, 6, "cars_cars", "row3");
+        addRow(writer, true, "mgr_mgr", "row3");
+        addRow(writer, "rob", "name_name", "row4");
+        addRow(writer, "60", "age_age", "row4");
+        addRow(writer, 1, "cars_cars", "row4");
+        addRow(writer, false, "mgr_mgr", "row4");
+        writer.close();
+      }
+      AccumuloConnectionParameters connectionParams = Mockito
+          .mock(AccumuloConnectionParameters.class);
+
+      Mockito.when(connectionParams.getConnector()).thenReturn(conn);
+      handler.setConnectParams(connectionParams);
+      return handler;
+    } catch (AccumuloSecurityException | AccumuloException | TableExistsException | TableNotFoundException e) {
+      LOG.error(e.getLocalizedMessage(), e);
+    }
+    return null;
+  }
+
+  @Test
+  public void testMatchNone() {
+    AccumuloDefaultIndexScanner handler = buildMockHandler(10);
+    List ranges = handler.getIndexRowRanges("name", new Range("mike"));
+    assertEquals(0, ranges.size());
+  }
+
+  @Test
+  public void testMatchRange() {
+    AccumuloDefaultIndexScanner handler = buildMockHandler(10);
+    List ranges = handler.getIndexRowRanges("age", new Range("10", "50"));
+    assertEquals(3, ranges.size());
+    assertTrue("does not contain row1", ranges.contains(new Range("row1")));
+    assertTrue("does not contain row2", ranges.contains(new Range("row2")));
+    assertTrue("does not contain row3", ranges.contains(new Range("row3")));
+  }
+
+  @Test
+  public void testTooManyMatches() {
+    AccumuloDefaultIndexScanner handler = buildMockHandler(2);
+    List ranges = handler.getIndexRowRanges("age", new Range("10", "50"));
+    assertNull("ranges should be null", ranges);
+  }
+
+  @Test
+  public void testMatchExact() {
+    AccumuloDefaultIndexScanner handler = buildMockHandler(10);
+    List ranges = handler.getIndexRowRanges("age", new Range("20"));
+    assertEquals(1, ranges.size());
+    assertTrue("does not contain row2", ranges.contains(new Range("row2")));
+  }
+
+  @Test
+  public void testValidIndex() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "name,age,phone,email");
+    conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact");
+    AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+    handler.init(conf);
+    assertTrue("name is not identified as an index", handler.isIndexed("name"));
+    assertTrue("age is not identified as an index", handler.isIndexed("age"));
+    assertTrue("phone is not identified as an index", handler.isIndexed("phone"));
+    assertTrue("email is not identified as an index", handler.isIndexed("email"));
+  }
+
+  @Test
+  public void testInvalidIndex() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "name,age,phone,email");
+    conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact");
+    AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+    handler.init(conf);
+    assertFalse("mobile is identified as an index", handler.isIndexed("mobile"));
+    assertFalse("mail is identified as an index", handler.isIndexed("mail"));
+  }
+
+  @Test
+  public void testMissingTable() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "name,age,phone,email");
+    AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+    handler.init(conf);
+    assertFalse("name is identified as an index", handler.isIndexed("name"));
+    assertFalse("age is identified as an index", handler.isIndexed("age"));
+  }
+
+  @Test
+  public void testWildcardIndex() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "*");
+    conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact");
+    AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+    handler.init(conf);
+    assertTrue("name is not identified as an index", handler.isIndexed("name"));
+    assertTrue("age is not identified as an index", handler.isIndexed("age"));
+  }
+
+  @Test
+  public void testNullIndex() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact");
+    AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+    handler.init(conf);
+    assertTrue("name is not identified as an index", handler.isIndexed("name"));
+  }
+
+  @Test
+  public void testEmptyIndex() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "");
+    conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact");
+    AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner();
+    handler.init(conf);
+    assertFalse("name is identified as an index", handler.isIndexed("name"));
+  }
+}
diff --git accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexLexicoder.java accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexLexicoder.java
new file mode 100644
index 0000000..1eda364
--- /dev/null
+++ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexLexicoder.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.client.lexicoder.BigIntegerLexicoder;
+import org.apache.accumulo.core.client.lexicoder.DoubleLexicoder;
+import org.apache.accumulo.core.client.lexicoder.IntegerLexicoder;
+import org.apache.accumulo.core.client.lexicoder.LongLexicoder;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.junit.Test;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests AccumuloIndexLexicoder's encoding of string and binary Hive values.
+ */
+public class TestAccumuloIndexLexicoder {
+
+  @Test
+  public void testBooleanString() {
+    byte[] value = Boolean.TRUE.toString().getBytes(UTF_8);
+    assertArrayEquals(AccumuloIndexLexicoder.encodeValue(value, serdeConstants.BOOLEAN_TYPE_NAME,
+        true), value);
+  }
+
+  @Test
+  public void testBooleanBinary() {
+    byte[] value = new byte[] { 1 };
+    assertArrayEquals(AccumuloIndexLexicoder.encodeValue(value, serdeConstants.BOOLEAN_TYPE_NAME,
+        false), Boolean.TRUE.toString().getBytes(UTF_8));
+  }
+
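These encodings matter because Accumulo sorts keys bytewise: the lexicoders emit bytes whose unsigned comparison matches numeric order, so index ranges built from them scan correctly. A small sketch of the property the integer tests that follow rely on:

    IntegerLexicoder lexicoder = new IntegerLexicoder();
    byte[] two = lexicoder.encode(2);
    byte[] ten = lexicoder.encode(10);
    // Bytewise, encode(2) sorts before encode(10); the raw strings "2" and "10" would not.
    assert new Text(two).compareTo(new Text(ten)) < 0;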
+  @Test
+  public void testIntString() {
+    byte[] value = "10".getBytes(UTF_8);
+    byte[] encoded = new IntegerLexicoder().encode(10);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.INT_TYPE_NAME, true);
+    assertArrayEquals(lex, encoded);
+
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.SMALLINT_TYPE_NAME, true);
+    assertArrayEquals(lex, encoded);
+
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.TINYINT_TYPE_NAME, true);
+    assertArrayEquals(lex, encoded);
+  }
+
+  @Test
+  public void testIntBinary() {
+    byte[] value = ByteBuffer.allocate(4).putInt(10).array();
+    byte[] encoded = new IntegerLexicoder().encode(10);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.INT_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+
+    value = ByteBuffer.allocate(2).putShort((short) 10).array();
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.SMALLINT_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+
+    value = ByteBuffer.allocate(1).put((byte) 10).array();
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.TINYINT_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+  }
+
+  @Test
+  public void testFloatBinary() {
+    byte[] value = ByteBuffer.allocate(4).putFloat(10.55f).array();
+    byte[] encoded = new DoubleLexicoder().encode((double) 10.55f);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.FLOAT_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+
+    value = ByteBuffer.allocate(8).putDouble(10.55).array();
+    encoded = new DoubleLexicoder().encode(10.55);
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DOUBLE_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+  }
+
+  @Test
+  public void testFloatString() {
+    byte[] value = "10.55".getBytes(UTF_8);
+    byte[] encoded = new DoubleLexicoder().encode(10.55);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.FLOAT_TYPE_NAME, true);
+    assertArrayEquals(lex, encoded);
+
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DOUBLE_TYPE_NAME, true);
+    assertArrayEquals(lex, encoded);
+  }
+
+  @Test
+  public void testBigIntBinary() {
+    byte[] value = ByteBuffer.allocate(8).putLong(1232322323).array();
+    byte[] encoded = new LongLexicoder().encode(1232322323L);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.BIGINT_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+
+    value = new BigInteger("1232322323", 10).toByteArray();
+    encoded = new BigIntegerLexicoder().encode(new BigInteger("1232322323", 10));
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DECIMAL_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+  }
+
+  @Test
+  public void testDecimalString() {
+    String strVal = "12323232233434";
+    byte[] value = strVal.getBytes(UTF_8);
+    byte[] encoded = new BigIntegerLexicoder().encode(new BigInteger(strVal, 10));
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DECIMAL_TYPE_NAME, true);
+    assertArrayEquals(lex, encoded);
+
+    lex = AccumuloIndexLexicoder.encodeValue(value, "DECIMAL (10,3)", true);
+    assertArrayEquals(lex, encoded);
+  }
+
+  @Test
+  public void testDecimalBinary() {
+    BigInteger value = new BigInteger("12323232233434", 10);
+    byte[] encoded = new BigIntegerLexicoder().encode(value);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value.toByteArray(), serdeConstants.DECIMAL_TYPE_NAME, false);
+    assertArrayEquals(lex, encoded);
+  }
+
+  @Test
+  public void testDateString() {
+    String date = "2016-02-22";
+    byte[] value = date.getBytes(UTF_8);
+    assertArrayEquals(AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DATE_TYPE_NAME,
+        true), value);
+  }
+
+  @Test
+  public void
testDateTimeString() { + String timestamp = "2016-02-22 12:12:06.000000005"; + byte[] value = timestamp.getBytes(UTF_8); + assertArrayEquals(AccumuloIndexLexicoder.encodeValue(value, serdeConstants.TIMESTAMP_TYPE_NAME, + true), value); + } + + @Test + public void testString() { + String strVal = "The quick brown fox"; + byte[] value = strVal.getBytes(UTF_8); + assertArrayEquals(AccumuloIndexLexicoder.encodeValue(value, serdeConstants.STRING_TYPE_NAME, + true), value); + assertArrayEquals(AccumuloIndexLexicoder.encodeValue(value, "varChar(20)", + true), value); + assertArrayEquals(AccumuloIndexLexicoder.encodeValue(value, "CHAR (20)", + true), value); + } +} diff --git accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java new file mode 100644 index 0000000..976fd27 --- /dev/null +++ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.accumulo; + +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters; +import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; +import org.junit.Test; + +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestAccumuloIndexParameters { + + public static class MockAccumuloIndexScanner implements AccumuloIndexScanner { + + @Override + public void init(Configuration conf) { + } + + @Override + public boolean isIndexed(String columnName) { + return false; + } + + @Override + public List getIndexRowRanges(String column, Range indexRange) { + return null; + } + } + + @Test + public void testDefaultScanner() { + try { + AccumuloIndexScanner scanner = new AccumuloIndexParameters(new Configuration()).createScanner(); + assertTrue(scanner instanceof AccumuloDefaultIndexScanner); + } catch (AccumuloIndexScannerException e) { + fail("Unexpected exception thrown"); + } + } + + @Test + public void testUserHandler() throws AccumuloIndexScannerException { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.INDEX_SCANNER, MockAccumuloIndexScanner.class.getName()); + AccumuloIndexScanner scanner = new AccumuloIndexParameters(conf).createScanner(); + assertTrue(scanner instanceof MockAccumuloIndexScanner); + } + + @Test + public void testBadHandler() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.INDEX_SCANNER, "a.class.does.not.exist.IndexHandler"); + try { + AccumuloIndexScanner scanner = new AccumuloIndexParameters(conf).createScanner(); + } catch (AccumuloIndexScannerException e) { + return; + } + fail("Failed to throw exception for class not found"); + } + + @Test + public void getIndexColumns() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "a,b,c"); + Set cols = new AccumuloIndexParameters(conf).getIndexColumns(); + assertEquals(3, cols.size()); + assertTrue("Missing column a", cols.contains("a")); + assertTrue("Missing column b", cols.contains("b")); + assertTrue("Missing column c", cols.contains("c")); + } + + @Test + public void getMaxIndexRows() { + Configuration conf = new Configuration(); + conf.setInt(AccumuloIndexParameters.MAX_INDEX_ROWS, 10); + int maxRows = new AccumuloIndexParameters(conf).getMaxIndexRows(); + assertEquals(10, maxRows); + } + + @Test + public void getAuths() { + Configuration conf = new Configuration(); + conf.set(AccumuloSerDeParameters.AUTHORIZATIONS_KEY, "public,open"); + Authorizations auths = new AccumuloIndexParameters(conf).getTableAuths(); + assertEquals(2, auths.size()); + assertTrue("Missing auth public", auths.contains("public")); + assertTrue("Missing auth open", auths.contains("open")); + } + +} diff --git accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java index 0aaa782..8d195ee 100644 --- accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java +++ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import 
org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde.serdeConstants; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -59,6 +60,8 @@ public void testTablePropertiesPassedToOutputJobProperties() { Map jobProperties = new HashMap(); props.setProperty(AccumuloSerDeParameters.COLUMN_MAPPINGS, "cf:cq1,cf:cq2,cf:cq3"); + props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string:int:string"); + props.setProperty(serdeConstants.LIST_COLUMNS, "name,age,email"); props.setProperty(AccumuloSerDeParameters.TABLE_NAME, "table"); props.setProperty(AccumuloSerDeParameters.VISIBILITY_LABEL_KEY, "foo"); diff --git accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java index 88e4530..0bb50e8 100644 --- accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java +++ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java @@ -488,6 +488,7 @@ public void testCreateIteratorSettings() throws Exception { TypeInfoFactory.intTypeInfo, TypeInfoFactory.stringTypeInfo); conf.set(serdeConstants.LIST_COLUMNS, Joiner.on(',').join(columnNames)); conf.set(serdeConstants.LIST_COLUMN_TYPES, "string,int,string"); + conf.set(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE, ColumnEncoding.BINARY.getName()); String columnMappingStr = "cf:f1,cf:f2,:rowID"; conf.set(AccumuloSerDeParameters.COLUMN_MAPPINGS, columnMappingStr); columnMapper = new ColumnMapper(columnMappingStr, ColumnEncoding.STRING.getName(), columnNames, @@ -758,7 +759,7 @@ public void testNullRangeGeneratorOutput() throws SerDeException { String hiveRowIdColumnName = "rid"; Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod(); - Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(null); + Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)).thenReturn(null); Mockito.when(mockHandler.getExpression(conf)).thenReturn(root); // A null result from AccumuloRangeGenerator is all ranges @@ -776,7 +777,8 @@ public void testEmptyListRangeGeneratorOutput() throws SerDeException { String hiveRowIdColumnName = "rid"; Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod(); - Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(Collections.emptyList()); + Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)) + .thenReturn(Collections.emptyList()); Mockito.when(mockHandler.getExpression(conf)).thenReturn(root); // A null result from AccumuloRangeGenerator is all ranges @@ -795,7 +797,7 @@ public void testSingleRangeGeneratorOutput() throws SerDeException { Range r = new Range("a"); Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod(); - Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(r); + Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)).thenReturn(r); Mockito.when(mockHandler.getExpression(conf)).thenReturn(root); // A null result from AccumuloRangeGenerator is all ranges @@ -814,7 +816,8 @@ public void testManyRangesGeneratorOutput() throws SerDeException { Range r1 = new Range("a"), r2 = new Range("z"); Mockito.when(mockHandler.getRanges(conf, 
columnMapper)).thenCallRealMethod(); - Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(Arrays.asList(r1, r2)); + Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)) + .thenReturn(Arrays.asList(r1, r2)); Mockito.when(mockHandler.getExpression(conf)).thenReturn(root); // A null result from AccumuloRangeGenerator is all ranges diff --git accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java index 339da07..5f3baab 100644 --- accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java +++ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloRangeGenerator.java @@ -16,20 +16,15 @@ */ package org.apache.hadoop.hive.accumulo.predicate; -import static org.junit.Assert.assertNotNull; - -import java.sql.Date; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; - +import com.google.common.collect.Lists; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants; +import org.apache.hadoop.hive.accumulo.TestAccumuloDefaultIndexScanner; import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding; import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping; +import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -42,22 +37,29 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; -import org.apache.hadoop.hive.ql.udf.UDFLike; import org.apache.hadoop.hive.ql.udf.UDFToString; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPPlus; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.google.common.collect.Lists; +import java.sql.Date; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.assertNotNull; /** * @@ -66,12 +68,14 @@ private AccumuloPredicateHandler handler; private HiveAccumuloRowIdColumnMapping rowIdMapping; + private Configuration conf; @Before public void setup() { handler = AccumuloPredicateHandler.getInstance(); rowIdMapping = new HiveAccumuloRowIdColumnMapping(AccumuloHiveConstants.ROWID, - ColumnEncoding.STRING, "row", TypeInfoFactory.stringTypeInfo.toString()); + 
ColumnEncoding.STRING,"row", TypeInfoFactory.stringTypeInfo.toString()); + conf = new Configuration(true); } @Test @@ -108,7 +112,7 @@ public void testRangeConjunction() throws Exception { List expectedRanges = Arrays .asList(new Range(new Key("f"), true, new Key("m\0"), false)); - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid"); + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -163,7 +167,7 @@ public void testRangeDisjunction() throws Exception { // Should generate (-inf,+inf) List expectedRanges = Arrays.asList(new Range()); - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid"); + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -236,7 +240,7 @@ public void testRangeConjunctionWithDisjunction() throws Exception { // Should generate ['q', +inf) List expectedRanges = Arrays.asList(new Range(new Key("q"), true, null, false)); - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid"); + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -291,7 +295,7 @@ public void testPartialRangeConjunction() throws Exception { // Should generate [f,+inf) List expectedRanges = Arrays.asList(new Range(new Key("f"), true, null, false)); - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid"); + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -349,7 +353,7 @@ public void testDateRangeConjunction() throws Exception { List expectedRanges = Arrays.asList(new Range(new Key("2014-01-01"), true, new Key( "2014-07-01"), false)); - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid"); + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -397,7 +401,7 @@ public void testCastExpression() throws Exception { ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPEqualOrGreaterThan(), Arrays.asList(key, cast)); - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "key"); + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "key"); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. 
emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -446,7 +450,7 @@ public void testRangeOverNonRowIdField() throws Exception { ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPAnd(), bothFilters); - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, rowIdMapping, "rid"); + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, Collections. emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -464,4 +468,161 @@ public void testRangeOverNonRowIdField() throws Exception { Object result = nodeOutput.get(both); Assert.assertNull(result); } + + @Test + public void testRangeOverStringIndexedField() throws Exception { + // age >= '10' + ExprNodeDesc column = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "age", null, false); + ExprNodeDesc constant = new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "10"); + List children = Lists.newArrayList(); + children.add(column); + children.add(constant); + ExprNodeDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, + new GenericUDFOPEqualOrGreaterThan(), children); + assertNotNull(node); + + // age <= '50' + ExprNodeDesc column2 = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "age", null, + false); + ExprNodeDesc constant2 = new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "50"); + List children2 = Lists.newArrayList(); + children2.add(column2); + children2.add(constant2); + ExprNodeDesc node2 = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, + new GenericUDFOPEqualOrLessThan(), children2); + assertNotNull(node2); + + // And UDF + List bothFilters = Lists.newArrayList(); + bothFilters.add(node); + bothFilters.add(node2); + ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, + new GenericUDFOPAnd(), bothFilters); + + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); + rangeGenerator.setIndexScanner(TestAccumuloDefaultIndexScanner.buildMockHandler(10)); + Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, + Collections. 
emptyMap(), null); + GraphWalker ogw = new DefaultGraphWalker(disp); + ArrayList topNodes = new ArrayList(); + topNodes.add(both); + HashMap nodeOutput = new HashMap(); + + try { + ogw.startWalking(topNodes, nodeOutput); + } catch (SemanticException ex) { + throw new RuntimeException(ex); + } + + // Filters are using an index which should match 3 rows + Object result = nodeOutput.get(both); + if ( result instanceof List) { + List results = (List) result; + Assert.assertEquals(3, results.size()); + Assert.assertTrue("does not contain row1", results.contains(new Range("row1"))); + Assert.assertTrue("does not contain row2", results.contains(new Range("row2"))); + Assert.assertTrue("does not contain row3", results.contains(new Range("row3"))); + } else { + Assert.fail("Results not a list"); + } + } + + @Test + public void testRangeOverIntegerIndexedField() throws Exception { + // cars >= 2 + ExprNodeDesc column = new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, "cars", null, false); + ExprNodeDesc constant = new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, 2); + List children = Lists.newArrayList(); + children.add(column); + children.add(constant); + ExprNodeDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, + new GenericUDFOPEqualOrGreaterThan(), children); + assertNotNull(node); + + // cars <= 9 + ExprNodeDesc column2 = new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, "cars", null, + false); + ExprNodeDesc constant2 = new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, 9); + List children2 = Lists.newArrayList(); + children2.add(column2); + children2.add(constant2); + ExprNodeDesc node2 = new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, + new GenericUDFOPEqualOrLessThan(), children2); + assertNotNull(node2); + + // And UDF + List bothFilters = Lists.newArrayList(); + bothFilters.add(node); + bothFilters.add(node2); + ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, + new GenericUDFOPAnd(), bothFilters); + + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, rowIdMapping, "rid"); + rangeGenerator.setIndexScanner(TestAccumuloDefaultIndexScanner.buildMockHandler(10)); + Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, + Collections. 
diff --git accumulo-handler/src/test/queries/positive/accumulo_index.q accumulo-handler/src/test/queries/positive/accumulo_index.q
new file mode 100644
index 0000000..52a33af
--- /dev/null
+++ accumulo-handler/src/test/queries/positive/accumulo_index.q
@@ -0,0 +1,44 @@
+DROP TABLE accumulo_index_test;
+
+CREATE TABLE accumulo_index_test (
+  rowid string,
+  active boolean,
+  num_offices tinyint,
+  num_personel smallint,
+  total_manhours int,
+  num_shareholders bigint,
+  eff_rating float,
+  err_rating double,
+  yearly_production decimal,
+  start_date date,
+  address varchar(100),
+  phone char(13),
+  last_update timestamp )
+ROW FORMAT SERDE 'org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe'
+STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
+WITH SERDEPROPERTIES (
+  "accumulo.columns.mapping" = ":rowID,a:act,a:off,a:per,a:mhs,a:shs,a:eff,a:err,a:yp,a:sd,a:addr,a:ph,a:lu",
+  "accumulo.table.name"="accumulo_index_test",
+  "accumulo.indexed.columns"="*",
+  "accumulo.indextable.name"="accumulo_index_idx"
+  );
+
+
+insert into accumulo_index_test values( "row1", true, 55, 107, 555555, 1223232332,
+                                        4.5, 0.8, 1232223, "2001-10-10", "123 main street",
+                                        "555-555-5555", "2016-02-22 12:45:07.000000000");
+
+select * from accumulo_index_test where active = 'true';
+select * from accumulo_index_test where num_offices = 55;
+select * from accumulo_index_test where num_personel = 107;
+select * from accumulo_index_test where total_manhours < 555556;
+select * from accumulo_index_test where num_shareholders >= 1223232331;
+select * from accumulo_index_test where eff_rating <= 4.5;
+select * from accumulo_index_test where err_rating >= 0.8;
+select * from accumulo_index_test where yearly_production = 1232223;
+select * from accumulo_index_test where start_date = "2001-10-10";
+select * from accumulo_index_test where address >= "100 main street";
+select * from accumulo_index_test where phone <= "555-555-5555";
+select * from accumulo_index_test where last_update >= "2016-02-22 12:45:07";
+
+DROP TABLE accumulo_index_test;
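
On the write side, every indexed cell of the single row inserted above becomes one entry in accumulo_index_idx, keyed by the cell's string-encoded value. A minimal sketch of two such mutations, assuming the cfname_cqname column-family encoding and the a:off / a:ph mappings declared in the qfile; this is illustrative only, not the handler's actual write path.

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;

public class IndexWriteSketch {
  public static void main(String[] args) {
    Value empty = new Value(new byte[0]);

    // Data row "row1" carries a:off = 55 and a:ph = "555-555-5555" (values from the
    // insert above). Each indexed cell yields one index entry keyed by the cell value.
    Mutation byOffices = new Mutation(new Text("55"));
    byOffices.put(new Text("a_off"), new Text("row1"), empty);

    Mutation byPhone = new Mutation(new Text("555-555-5555"));
    byPhone.put(new Text("a_ph"), new Text("row1"), empty);

    // Against a live instance these would be written to accumulo_index_idx through a
    // BatchWriter, e.g. writer.addMutation(byOffices).
  }
}
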
12:45:07.000000000"); + +select * from accumulo_index_test where active = 'true'; +select * from accumulo_index_test where num_offices = 55; +select * from accumulo_index_test where num_personel = 107; +select * from accumulo_index_test where total_manhours < 555556; +select * from accumulo_index_test where num_shareholders >= 1223232331; +select * from accumulo_index_test where eff_rating <= 4.5; +select * from accumulo_index_test where err_rating >= 0.8; +select * from accumulo_index_test where yearly_production = 1232223; +select * from accumulo_index_test where start_date = "2001-10-10"; +select * from accumulo_index_test where address >= "100 main street"; +select * from accumulo_index_test where phone <= "555-555-5555"; +select * from accumulo_index_test where last_update >= "2016-02-22 12:45:07"; + +DROP TABLE accumulo_index_test; diff --git accumulo-handler/src/test/results/positive/accumulo_index.q.out accumulo-handler/src/test/results/positive/accumulo_index.q.out new file mode 100644 index 0000000..5cb3d73 --- /dev/null +++ accumulo-handler/src/test/results/positive/accumulo_index.q.out @@ -0,0 +1,180 @@ +PREHOOK: query: DROP TABLE accumulo_index_test +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE accumulo_index_test +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE accumulo_index_test ( + rowid string, + active boolean, + num_offices tinyint, + num_personel smallint, + total_manhours int, + num_shareholders bigint, + eff_rating float, + err_rating double, + yearly_production decimal, + start_date date, + address varchar(100), + phone char(13), + last_update timestamp ) +ROW FORMAT SERDE 'org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe' +STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler' +WITH SERDEPROPERTIES ( + "accumulo.columns.mapping" = ":rowID,a:act,a:off,a:per,a:mhs,a:shs,a:eff,a:err,a:yp,a:sd,a:addr,a:ph,a:lu", + "accumulo.table.name"="accumulo_index_test", + "accumulo.indexed.columns"="*", + "accumulo.indextable.name"="accumulo_index_idx" + ) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@accumulo_index_test +POSTHOOK: query: CREATE TABLE accumulo_index_test ( + rowid string, + active boolean, + num_offices tinyint, + num_personel smallint, + total_manhours int, + num_shareholders bigint, + eff_rating float, + err_rating double, + yearly_production decimal, + start_date date, + address varchar(100), + phone char(13), + last_update timestamp ) +ROW FORMAT SERDE 'org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe' +STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler' +WITH SERDEPROPERTIES ( + "accumulo.columns.mapping" = ":rowID,a:act,a:off,a:per,a:mhs,a:shs,a:eff,a:err,a:yp,a:sd,a:addr,a:ph,a:lu", + "accumulo.table.name"="accumulo_index_test", + "accumulo.indexed.columns"="*", + "accumulo.indextable.name"="accumulo_index_idx" + ) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@accumulo_index_test +PREHOOK: query: insert into accumulo_index_test values( "row1", true, 55, 107, 555555, 1223232332, + 4.5, 0.8, 1232223, "2001-10-10", "123 main street", + "555-555-5555", "2016-02-22 12:45:07.000000000") +PREHOOK: type: QUERY +PREHOOK: Output: default@accumulo_index_test +POSTHOOK: query: insert into accumulo_index_test values( "row1", true, 55, 107, 555555, 1223232332, + 4.5, 0.8, 1232223, "2001-10-10", "123 main street", + "555-555-5555", "2016-02-22 12:45:07.000000000") +POSTHOOK: type: QUERY +POSTHOOK: Output: default@accumulo_index_test 
+PREHOOK: query: select * from accumulo_index_test where active = 'true'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where active = 'true'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where num_offices = 55
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where num_offices = 55
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where num_personel = 107
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where num_personel = 107
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where total_manhours < 555556
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where total_manhours < 555556
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where num_shareholders >= 1223232331
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where num_shareholders >= 1223232331
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where eff_rating <= 4.5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where eff_rating <= 4.5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where err_rating >= 0.8
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where err_rating >= 0.8
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where yearly_production = 1232223
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where yearly_production = 1232223
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where start_date = "2001-10-10"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where start_date = "2001-10-10"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where address >= "100 main street"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where address >= "100 main street"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where phone <= "555-555-5555"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where phone <= "555-555-5555"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555	2016-02-22 12:45:07
+PREHOOK: query: select * from accumulo_index_test where last_update >= "2016-02-22 12:45:07"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from accumulo_index_test where last_update >= "2016-02-22 12:45:07"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@accumulo_index_test
+#### A masked pattern was here ####
+row1	true	55	107	555555	1223232332	4.5	0.8	1232223	2001-10-10	123 main street	555-555-5555	2016-02-22 12:45:07
+PREHOOK: query: DROP TABLE accumulo_index_test
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@accumulo_index_test
+PREHOOK: Output: default@accumulo_index_test
+POSTHOOK: query: DROP TABLE accumulo_index_test
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@accumulo_index_test
+POSTHOOK: Output: default@accumulo_index_test