commit 2791655d802168098d6acb867b82f6ba098482b9 Author: Mithun RK Date: Fri Aug 11 10:50:12 2017 -0700 HIVE-11548: HCatLoader must support predicate pushdown (Mithun Radhakrishnan, reviewed by Thejas M Nair) diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java index 1ef454572c..0bbc25fa7d 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -214,22 +215,28 @@ private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean return scan; } + String columnString = jobConf.get(serdeConstants.LIST_COLUMNS); String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR); - if (filterExprSerialized == null) { + if (filterExprSerialized == null || StringUtils.isBlank(columnString) ) { + LOG.warn("Abandoning filter push-down for HBase table."); + if (LOG.isDebugEnabled()) { + LOG.debug("filterExprSerialized == " + filterExprSerialized + + ", columnString == " + columnString); + } return scan; } ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized); - String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey]; + String keyColName = columnString.split(",")[iKey]; ArrayList cols = TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES)); String colType = cols.get(iKey).getTypeName(); boolean isKeyComparable = isKeyBinary || colType.equalsIgnoreCase("string"); String tsColName = null; if (iTimestamp >= 0) { - tsColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iTimestamp]; + tsColName = columnString.split(",")[iTimestamp]; } IndexPredicateAnalyzer analyzer = diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java index 3998407803..d0f4131798 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java @@ -43,6 +43,8 @@ public static final String HCAT_PIG_INNER_TUPLE_NAME_DEFAULT = "innertuple"; public static final String HCAT_PIG_INNER_FIELD_NAME = "hcat.pig.inner.field.name"; public static final String HCAT_PIG_INNER_FIELD_NAME_DEFAULT = "innerfield"; + public static final String HCAT_PIG_LOADER_PREDICATE_PUSHDOWN_ENABLED + = HCAT_PIG_LOADER + ".predicate.pushdown.enabled"; /** * {@value} (default: null) diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java index ec1e1ca1bb..0f771fcac7 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java @@ -29,7 +29,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import 
org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; @@ -86,6 +90,30 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema) HCatUtil.serialize(hcatSchema)); } + /** + * Set expression for predicate pushdown. + * @param job the job object + * @param serializedPredicateExpression Serialized ExprNodeGenericFuncDesc object representing the pushdown-predicate. + * @throws IOException + */ + public static void setPushdownPredicate(Job job, String serializedPredicateExpression) + throws IOException { + // TODO: Consider new HCatConstants constant for the conf-label. + job.getConfiguration().set(TableScanDesc.FILTER_EXPR_CONF_STR, + serializedPredicateExpression); + } + + /** + * Set expression for predicate pushdown. + * @param job the job object + * @param predicateExpression ExprNodeGenericFuncDesc object representing the pushdown-predicate. + * @throws IOException + */ + public static void setPushdownPredicate(Job job, ExprNodeGenericFuncDesc predicateExpression) + throws IOException { + setPushdownPredicate(job, SerializationUtilities.serializeExpression(predicateExpression)); + } + protected static org.apache.hadoop.mapred.InputFormat getMapRedInputFormat(JobConf job, Class inputFormatClass) throws IOException { return ( diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniCluster.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniCluster.java index d9d8251c85..b44e87c246 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniCluster.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniCluster.java @@ -21,183 +21,70 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.pig.ExecType; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; -/** - * This class builds a single instance of itself with the Singleton - * design pattern. While building the single instance, it sets up a - * mini cluster that actually consists of a mini DFS cluster and a - * mini MapReduce cluster on the local machine and also sets up the - * environment for Pig to run on top of the mini cluster. 
- */ -public class MiniCluster { - private MiniDFSCluster m_dfs = null; - private MiniMRCluster m_mr = null; - private FileSystem m_fileSys = null; - private JobConf m_conf = null; +public class MiniCluster extends MiniGenericCluster { + private static final File CONF_DIR = new File("target/test/pigtest/conf/"); + private static final File CONF_FILE = new File(CONF_DIR, "hadoop-site.xml"); - private final static MiniCluster INSTANCE = new MiniCluster(); - private static boolean isSetup = true; + private MiniMRCluster m_mr = null; - private MiniCluster() { - setupMiniDfsAndMrClusters(); + @Override + public ExecType getExecType() { + return ExecType.MAPREDUCE; } - private void setupMiniDfsAndMrClusters() { + @Override + protected void setupMiniDfsAndMrClusters(Configuration config) { try { - final int dataNodes = 1; // There will be 4 data nodes - final int taskTrackers = 1; // There will be 4 task tracker nodes - Configuration config = new Configuration(); + System.setProperty("hadoop.log.dir", "target/test/logs"); + final int dataNodes = 1; + final int taskTrackers = 1; + + // Create the dir that holds hadoop-site.xml file + // Delete if hadoop-site.xml exists already + CONF_DIR.mkdirs(); + if(CONF_FILE.exists()) { + CONF_FILE.delete(); + } // Builds and starts the mini dfs and mapreduce clusters - if(System.getProperty("hadoop.log.dir") == null) { - System.setProperty("hadoop.log.dir", "target/tmp/logs/"); - } m_dfs = new MiniDFSCluster(config, dataNodes, true, null); - m_fileSys = m_dfs.getFileSystem(); m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1); - // Create the configuration hadoop-site.xml file - File conf_dir = new File(System.getProperty("user.home"), "pigtest/conf/"); - conf_dir.mkdirs(); - File conf_file = new File(conf_dir, "hadoop-site.xml"); - // Write the necessary config info to hadoop-site.xml m_conf = m_mr.createJobConf(); - m_conf.setInt("mapred.submit.replication", 1); + m_conf.setInt(MRConfiguration.SUMIT_REPLICATION, 2); + m_conf.setInt(MRConfiguration.MAP_MAX_ATTEMPTS, 2); + m_conf.setInt(MRConfiguration.REDUCE_MAX_ATTEMPTS, 2); m_conf.set("dfs.datanode.address", "0.0.0.0:0"); m_conf.set("dfs.datanode.http.address", "0.0.0.0:0"); - m_conf.writeXml(new FileOutputStream(conf_file)); + m_conf.set("pig.jobcontrol.sleep", "100"); + m_conf.writeXml(new FileOutputStream(CONF_FILE)); // Set the system properties needed by Pig - System.setProperty("cluster", m_conf.get("mapred.job.tracker")); + System.setProperty("cluster", m_conf.get(MRConfiguration.JOB_TRACKER)); System.setProperty("namenode", m_conf.get("fs.default.name")); - System.setProperty("junit.hadoop.conf", conf_dir.getPath()); + System.setProperty("junit.hadoop.conf", CONF_DIR.getPath()); } catch (IOException e) { throw new RuntimeException(e); } } - /** - * Returns the single instance of class MiniClusterBuilder that - * represents the resouces for a mini dfs cluster and a mini - * mapreduce cluster. 
- */ - public static MiniCluster buildCluster() { - if (!isSetup) { - INSTANCE.setupMiniDfsAndMrClusters(); - isSetup = true; - } - return INSTANCE; - } - - public void shutDown() { - INSTANCE.shutdownMiniDfsAndMrClusters(); - } - @Override - protected void finalize() { - shutdownMiniDfsAndMrClusters(); - } - - private void shutdownMiniDfsAndMrClusters() { - isSetup = false; - try { - if (m_fileSys != null) { - m_fileSys.close(); - } - } catch (IOException e) { - e.printStackTrace(); - } - if (m_dfs != null) { - m_dfs.shutdown(); + protected void shutdownMiniMrClusters() { + // Delete hadoop-site.xml on shutDown + if(CONF_FILE.exists()) { + CONF_FILE.delete(); } - if (m_mr != null) { - m_mr.shutdown(); - } - m_fileSys = null; - m_dfs = null; + if (m_mr != null) { m_mr.shutdown(); } m_mr = null; } - - public Properties getProperties() { - errorIfNotSetup(); - Properties properties = new Properties(); - assert m_conf != null; - Iterator> iter = m_conf.iterator(); - while (iter.hasNext()) { - Map.Entry entry = iter.next(); - properties.put(entry.getKey(), entry.getValue()); - } - return properties; - } - - public void setProperty(String name, String value) { - errorIfNotSetup(); - m_conf.set(name, value); - } - - public FileSystem getFileSystem() { - errorIfNotSetup(); - return m_fileSys; - } - - /** - * Throw RunTimeException if isSetup is false - */ - private void errorIfNotSetup() { - if (isSetup) { - return; - } - String msg = "function called on MiniCluster that has been shutdown"; - throw new RuntimeException(msg); - } - - static public void createInputFile(MiniCluster miniCluster, String fileName, - String[] inputData) - throws IOException { - FileSystem fs = miniCluster.getFileSystem(); - createInputFile(fs, fileName, inputData); - } - - static public void createInputFile(FileSystem fs, String fileName, - String[] inputData) throws IOException { - Path path = new Path(fileName); - if (fs.exists(path)) { - throw new IOException("File " + fileName + " already exists on the minicluster"); - } - FSDataOutputStream stream = fs.create(path); - PrintWriter pw = new PrintWriter(new OutputStreamWriter(stream, "UTF-8")); - for (int i = 0; i < inputData.length; i++) { - pw.println(inputData[i]); - } - pw.close(); - - } - - /** - * Helper to remove a dfs file from the minicluster DFS - * - * @param miniCluster reference to the Minicluster where the file should be deleted - * @param fileName pathname of the file to be deleted - * @throws IOException - */ - static public void deleteFile(MiniCluster miniCluster, String fileName) - throws IOException { - FileSystem fs = miniCluster.getFileSystem(); - fs.delete(new Path(fileName), true); - } } + diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniGenericCluster.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniGenericCluster.java new file mode 100644 index 0000000000..b5974a88f4 --- /dev/null +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniGenericCluster.java @@ -0,0 +1,130 @@ +package org.apache.hive.hcatalog; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.pig.ExecType; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; + +import java.io.IOException; +import java.util.Properties; + +abstract public class MiniGenericCluster { + protected MiniDFSCluster m_dfs = null; + protected FileSystem m_fileSys = null; + protected Configuration m_conf = null; + + protected static 
MiniGenericCluster INSTANCE = null; + protected static boolean isSetup = false; + + public static String EXECTYPE_MR = "mr"; + + /** + * Returns the single instance of class MiniGenericCluster that represents + * the resources for a mini dfs cluster and a mini mr (or tez) cluster. The + * system property "test.exec.type" is used to decide whether a mr or tez mini + * cluster will be returned. + */ + public static MiniGenericCluster buildCluster() { + return buildCluster(new Configuration()); + } + + /** + * Returns the single instance of class MiniGenericCluster that represents + * the resources for a mini dfs cluster and a mini mr (or tez) cluster. The + * system property "test.exec.type" is used to decide whether a mr or tez mini + * cluster will be returned. + */ + public static MiniGenericCluster buildCluster(Configuration configuration) { + if (INSTANCE == null) { + String execType = System.getProperty("test.exec.type"); + if (execType == null) { + // Default to MR + System.setProperty("test.exec.type", EXECTYPE_MR); + return buildCluster(configuration, EXECTYPE_MR); + } + + return buildCluster(configuration, execType); + } + return INSTANCE; + } + + public static MiniGenericCluster buildCluster(Configuration configuration, String execType) { + if (INSTANCE == null) { + if (execType.equalsIgnoreCase(EXECTYPE_MR)) { + INSTANCE = new MiniCluster(); + } + // TODO: Add support for TezMiniCluster. + else { + throw new RuntimeException("Unknown test.exec.type: " + execType); + } + } + if (!isSetup) { + INSTANCE.setupMiniDfsAndMrClusters(configuration); + isSetup = true; + } + return INSTANCE; + } + + abstract public ExecType getExecType(); + + abstract protected void setupMiniDfsAndMrClusters(Configuration configuration); + + public void shutDown(){ + INSTANCE.shutdownMiniDfsAndMrClusters(); + } + + @Override + protected void finalize() { + shutdownMiniDfsAndMrClusters(); + } + + protected void shutdownMiniDfsAndMrClusters() { + isSetup = false; + shutdownMiniDfsClusters(); + shutdownMiniMrClusters(); + m_conf = null; + } + + protected void shutdownMiniDfsClusters() { + try { + if (m_fileSys != null) { m_fileSys.close(); } + } catch (IOException e) { + e.printStackTrace(); + } + if (m_dfs != null) { m_dfs.shutdown(); } + m_fileSys = null; + m_dfs = null; + } + + abstract protected void shutdownMiniMrClusters(); + + public Properties getProperties() { + errorIfNotSetup(); + return ConfigurationUtil.toProperties(m_conf); + } + + public Configuration getConfiguration() { + return new Configuration(m_conf); + } + + public void setProperty(String name, String value) { + errorIfNotSetup(); + m_conf.set(name, value); + } + + public FileSystem getFileSystem() { + errorIfNotSetup(); + return m_fileSys; + } + + /** + * Throw RunTimeException if isSetup is false + */ + private void errorIfNotSetup(){ + if(isSetup) + return; + String msg = "function called on MiniCluster that has been shutdown"; + throw new RuntimeException(msg); + } +} diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java index ff56234cc1..c04bbaff26 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java @@ -55,15 +55,6 @@ public static Driver instantiateDriver(MiniCluster cluster) { return driver; } - public static void generateDataFile(MiniCluster cluster, String fileName) throws 
IOException { - MiniCluster.deleteFile(cluster, fileName); - String[] input = new String[50]; - for (int i = 0; i < 50; i++) { - input[i] = (i % 5) + "\t" + i + "\t" + "_S" + i + "S_"; - } - MiniCluster.createInputFile(cluster, fileName, input); - } - public static void createTable(Driver driver, String tableName, String createTableArgs) throws CommandNeedRetryException, IOException { String createTable = "create table " + tableName + createTableArgs; diff --git a/hcatalog/hcatalog-pig-adapter/pom.xml b/hcatalog/hcatalog-pig-adapter/pom.xml index c50a4d5b09..f8a0ab0dc0 100644 --- a/hcatalog/hcatalog-pig-adapter/pom.xml +++ b/hcatalog/hcatalog-pig-adapter/pom.xml @@ -112,6 +112,17 @@ org.apache.hadoop + hadoop-minicluster + ${hadoop.version} + test + + + com.sun.jersey + jersey-servlet + test + + + org.apache.hadoop hadoop-mapreduce-client-common ${hadoop.version} true diff --git a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java index b6746e8d80..4caeb081f4 100644 --- a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java +++ b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java @@ -19,21 +19,58 @@ package org.apache.hive.hcatalog.pig; import java.io.IOException; -import java.util.ArrayList; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Timestamp; import java.util.Enumeration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import java.util.Set; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import 
org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.security.Credentials; import org.apache.hive.hcatalog.common.HCatConstants; @@ -43,15 +80,18 @@ import org.apache.hive.hcatalog.data.Pair; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.apache.hive.hcatalog.mapreduce.InputJobInfo; import org.apache.hive.hcatalog.mapreduce.SpecialCases; import org.apache.pig.Expression; import org.apache.pig.Expression.BinaryExpression; import org.apache.pig.Expression.Const; +import org.apache.pig.LoadPredicatePushdown; import org.apache.pig.PigException; import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceStatistics; +import org.apache.pig.data.DataType; import org.apache.pig.impl.util.UDFContext; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; @@ -63,7 +103,7 @@ */ @InterfaceAudience.Public @InterfaceStability.Stable -public class HCatLoader extends HCatBaseLoader { +public class HCatLoader extends HCatBaseLoader implements LoadPredicatePushdown { private static final Logger LOG = LoggerFactory.getLogger(HCatLoader.class); private static final String PARTITION_FILTER = "partition.filter"; // for future use @@ -74,10 +114,14 @@ private String hcatServerUri; private String partitionFilterString; private final PigHCatUtil phutil = new PigHCatUtil(); + private Map typeInfoMap = null; + private Job localJobClone = null; // Signature for wrapped loader, see comments in LoadFuncBasedInputDriver.initialize - final public static String INNER_SIGNATURE = "hcatloader.inner.signature"; - final public static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature"; + final private static String INNER_SIGNATURE = "hcatloader.inner.signature"; + final private static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature"; + final private static String PREDICATE_FOR_PUSHDOWN_SUFFIX = TableScanDesc.FILTER_EXPR_CONF_STR; + private boolean predicatePushdownEnabled = true; // A hash map which stores job credentials. The key is a signature passed by Pig, which is //unique to the load func and input file name (table, in our case). 
private static Map jobCredentials = new HashMap(); @@ -95,6 +139,103 @@ public String relativeToAbsolutePath(String location, Path curDir) throws IOExce return location; } + private static void restoreLoaderSpecificStateFromUDFContext(Job job, Properties udfProps) throws IOException { + for (Enumeration emr = udfProps.keys(); emr.hasMoreElements(); ) { + PigHCatUtil.getConfigFromUDFProperties(udfProps, + job.getConfiguration(), emr.nextElement().toString()); + } + } + + private void setProjectionSchemaInfoInUDFContext(Job job, Properties udfProps) throws IOException { + Job localJob = getLocalJobClone(job); + RequiredFieldList requiredFieldsInfo = (RequiredFieldList) udfProps.get(PRUNE_PROJECTION_INFO); + boolean localJobConfHasChanged = false; + final String PROJECTIONS_PUSHED_DOWN_TO_JOB_CONF = "hcat.loader.projections.pushed.down.to.job.conf"; + if (requiredFieldsInfo != null) { + // pushProjection() was called. + if (!udfProps.containsKey(PROJECTIONS_PUSHED_DOWN_TO_JOB_CONF)) { // Protect against pushing projections twice. + // Required-fields were never set. + // Store projection information in local job-instance. + ArrayList columnIds = Lists.newArrayListWithExpectedSize(requiredFieldsInfo.getFields().size()); + ArrayList columnNames = Lists.newArrayListWithExpectedSize(requiredFieldsInfo.getFields().size()); + for (RequiredField rf : requiredFieldsInfo.getFields()) { + columnIds.add(rf.getIndex()); + columnNames.add(rf.getAlias()); + } + ColumnProjectionUtils.appendReadColumns( + localJob.getConfiguration(), + columnIds, + columnNames, + null // TODO: Implement push-down for nested structures later. + ); + outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(), signature, this.getClass()); + HCatInputFormat.setOutputSchema(localJob, outputSchema); + udfProps.put(PROJECTIONS_PUSHED_DOWN_TO_JOB_CONF, true); + localJobConfHasChanged = true; + } + else { + // OutputSchema was already serialized. Skip serialization. Restore from requiredFieldsInfo. + outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(), signature, this.getClass()); + } + } + else { + // pushProjection() hasn't been called yet. + // If this is the Pig backend, no projections were ever pushed. Assume all columns have to be read. + if (HCatUtil.checkJobContextIfRunningFromBackend(job)) { + ColumnProjectionUtils.setReadAllColumns(localJob.getConfiguration()); + outputSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA); + HCatInputFormat.setOutputSchema(localJob, outputSchema); + localJobConfHasChanged = true; + } + // If this is the Pig frontend, pushProjection() might still be called later. + } + + LOG.debug("outputSchema=" + outputSchema); + + // Store modified localJobConf settings to UDFContext. + if (localJobConfHasChanged) { + storeDifferenceToUDFProperties(localJob.getConfiguration(), job.getConfiguration(), udfProps); + } + } + + private void setPushdownPredicateInfoInUDFContext(Job job, Properties udfProperties) throws IOException { + String pushdownPredicate = udfProperties.getProperty(signature + PREDICATE_FOR_PUSHDOWN_SUFFIX); + if (StringUtils.isNotBlank(pushdownPredicate)) { + LOG.info("Pushing down predicate."); + Job localJob = getLocalJobClone(job); + HCatInputFormat.setPushdownPredicate(localJob, pushdownPredicate); + storeDifferenceToUDFProperties(localJob.getConfiguration(), job.getConfiguration(), udfProperties); + } + else { + LOG.info("Predicate is empty/blank. 
Skipping pushdown-predicate."); + } + } + + /** + * (localConf - jobConf) -> udfProperties + */ + private static void storeDifferenceToUDFProperties(Configuration localConf, + Configuration jobConf, + Properties udfProperties) { + for (Entry localJobKeyValue : localConf) { + String jobConfValue = jobConf.getRaw(localJobKeyValue.getKey()); + if ( jobConfValue==null || !localJobKeyValue.getValue().equals(jobConfValue) ) { + udfProperties.put(localJobKeyValue.getKey(), localJobKeyValue.getValue()); + } + } + } + + /** + * Get clone of the current Job, local to this HCatLoader instance. + */ + private Job getLocalJobClone(Job job) throws IOException { + if (localJobClone == null) { + localJobClone = new Job(job.getConfiguration()); + localJobClone.getCredentials().addAll(job.getCredentials()); + } + return localJobClone; + } + @Override public void setLocation(String location, Job job) throws IOException { HCatContext.INSTANCE.setConf(job.getConfiguration()).getConf().get() @@ -108,44 +249,36 @@ public void setLocation(String location, Job job) throws IOException { dbName = dbTablePair.first; tableName = dbTablePair.second; - RequiredFieldList requiredFieldsInfo = (RequiredFieldList) udfProps - .get(PRUNE_PROJECTION_INFO); - // get partitionFilterString stored in the UDFContext - it would have - // been stored there by an earlier call to setPartitionFilter - // call setInput on HCatInputFormat only in the frontend because internally - // it makes calls to the hcat server - we don't want these to happen in - // the backend - // in the hadoop front end mapred.task.id property will not be set in - // the Configuration if (udfProps.containsKey(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET)) { - for (Enumeration emr = udfProps.keys(); emr.hasMoreElements(); ) { - PigHCatUtil.getConfigFromUDFProperties(udfProps, - job.getConfiguration(), emr.nextElement().toString()); - } + + // setLocation() has been called on this Loader before. + // Don't call HCatInputFormat.setInput(). Why? + // 1. Can be expensive. E.g. Metastore.getPartitions(). + // 2. Can't call getPartitions() in backend. + // Instead, restore settings from UDFContext. + restoreLoaderSpecificStateFromUDFContext(job, udfProps); + if (!HCatUtil.checkJobContextIfRunningFromBackend(job)) { //Combine credentials and credentials from job takes precedence for freshness Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + signature); job.getCredentials().addAll(crd); } } else { - Job clone = new Job(job.getConfiguration()); - HCatInputFormat.setInput(job, dbName, tableName, getPartitionFilterString()); + // This is the first time setLocation() was called. + // Must bear the full cost of setInput(). + Job localJobClone = getLocalJobClone(job); + HCatInputFormat.setInput(localJobClone, dbName, tableName, getPartitionFilterString()); InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize( - job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO)); + localJobClone.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO)); - SpecialCases.addSpecialCasesParametersForHCatLoader(job.getConfiguration(), + SpecialCases.addSpecialCasesParametersForHCatLoader(localJobClone.getConfiguration(), inputJobInfo.getTableInfo()); - // We will store all the new /changed properties in the job in the - // udf context, so the the HCatInputFormat.setInput method need not - //be called many times. 
- for (Entry keyValue : job.getConfiguration()) { - String oldValue = clone.getConfiguration().getRaw(keyValue.getKey()); - if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) { - udfProps.put(keyValue.getKey(), keyValue.getValue()); - } - } + // Store changed properties in localJob into UDF-properties. + storeDifferenceToUDFProperties(localJobClone.getConfiguration(), job.getConfiguration(), udfProps); + + // ... and that's all she wrote. udfProps.put(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET, true); //Store credentials in a private hash map and not the udf context to @@ -155,49 +288,22 @@ public void setLocation(String location, Job job) throws IOException { jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + signature, crd); } - // Need to also push projections by calling setOutputSchema on - // HCatInputFormat - we have to get the RequiredFields information - // from the UdfContext, translate it to an Schema and then pass it - // The reason we do this here is because setLocation() is called by - // Pig runtime at InputFormat.getSplits() and - // InputFormat.createRecordReader() time - we are not sure when - // HCatInputFormat needs to know about pruned projections - so doing it - // here will ensure we communicate to HCatInputFormat about pruned - // projections at getSplits() and createRecordReader() time + // setLocation() could be called several times, before and after pushProjection() is called. + // Must check and store projection-columns to UDFContext. + setProjectionSchemaInfoInUDFContext(job, udfProps); - if (requiredFieldsInfo != null) { - // convert to hcatschema and pass to HCatInputFormat - try { - //push down projections to columnar store works for RCFile and ORCFile - ArrayList list = new ArrayList(requiredFieldsInfo.getFields().size()); - for (RequiredField rf : requiredFieldsInfo.getFields()) { - list.add(rf.getIndex()); - } - ColumnProjectionUtils.setReadColumns(job.getConfiguration(), list); - outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(), signature, this.getClass()); - HCatInputFormat.setOutputSchema(job, outputSchema); - } catch (Exception e) { - throw new IOException(e); - } - } else { - // else - this means pig's optimizer never invoked the pushProjection - // method - so we need all fields and hence we should not call the - // setOutputSchema on HCatInputFormat - ColumnProjectionUtils.setReadAllColumns(job.getConfiguration()); - if (HCatUtil.checkJobContextIfRunningFromBackend(job)) { - try { - HCatSchema hcatTableSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA); - outputSchema = hcatTableSchema; - HCatInputFormat.setOutputSchema(job, outputSchema); - } catch (Exception e) { - throw new IOException(e); - } - } + // Handle pushdown predicate. 
+ predicatePushdownEnabled = job.getConfiguration() + .getBoolean(HCatConstants.HCAT_PIG_LOADER_PREDICATE_PUSHDOWN_ENABLED, + true); + + if (predicatePushdownEnabled) { + LOG.info("Predicate push-down is enabled for HCatLoader."); + setPushdownPredicateInfoInUDFContext(job, udfProps); } - if(LOG.isDebugEnabled()) { - LOG.debug("outputSchema=" + outputSchema); + else { + LOG.info("Predicate push-down is disabled for HCatLoader."); } - } @Override @@ -261,7 +367,8 @@ public ResourceStatistics getStatistics(String location, Job job) throws IOExcep try { ResourceStatistics stats = new ResourceStatistics(); InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize( - job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO)); + UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{signature}) + .getProperty(HCatConstants.HCAT_KEY_JOB_INFO)); stats.setSizeInBytes(getSizeInBytes(inputJobInfo)); return stats; } catch (Exception e) { @@ -333,4 +440,205 @@ private String getHCatComparisonString(Expression expr) { } } + private static final Set SUPPORTED_PREDICATE_PUSHDOWN_DATA_TYPES = Sets.newHashSet( + DataType.BOOLEAN, + DataType.INTEGER, + DataType.LONG, + DataType.FLOAT, + DataType.DOUBLE, + DataType.DATETIME, + DataType.CHARARRAY, + DataType.BIGINTEGER, + DataType.BIGDECIMAL + ); + + @Override + public List getPredicateFields(String location, Job job) throws IOException { + List allFields = Arrays.asList(getSchema(location, job).getFields()); + Iterable filteredPredicateFields + = Iterables.filter(allFields, new Predicate(){ + @Override + public boolean apply(ResourceSchema.ResourceFieldSchema input) { + return SUPPORTED_PREDICATE_PUSHDOWN_DATA_TYPES.contains(input.getType()); + } + }); + + // Return the names of the filtered predicate-fields. + return Lists.newArrayList( + Iterables.transform(filteredPredicateFields, new Function(){ + @Override + public String apply(ResourceSchema.ResourceFieldSchema input) { + return input.getName(); + } + })); + + } + + @Override + public List getSupportedExpressionTypes() { + return Arrays.asList( + Expression.OpType.OP_EQ, + Expression.OpType.OP_NE, + Expression.OpType.OP_GT, + Expression.OpType.OP_GE, + Expression.OpType.OP_LT, + Expression.OpType.OP_LE, + Expression.OpType.OP_IN, + Expression.OpType.OP_BETWEEN, + Expression.OpType.OP_NULL, + Expression.OpType.OP_NOT, + Expression.OpType.OP_AND, + Expression.OpType.OP_OR + ); + } + + @Override + public void setPushdownPredicate(Expression predicate) throws IOException { + LOG.info("HCatLoader::setPushdownPredicate(). Predicate == " + predicate); + ExprNodeDesc hiveExpression = getHiveExpressionFor(predicate); + try { + LOG.debug("HiveExpression: " + hiveExpression); + ExprNodeGenericFuncDesc genericFuncDesc = (ExprNodeGenericFuncDesc)hiveExpression; + UDFContext.getUDFContext() + .getUDFProperties(getClass(), new String[]{signature}) + .setProperty(signature + PREDICATE_FOR_PUSHDOWN_SUFFIX, + SerializationUtilities.serializeExpression(genericFuncDesc)); + } + catch (Exception exception) { + throw new IOException("Invalid pushdown-predicate received: " + + "PigExpr == (" + predicate + "). 
" + + "HiveExpr == (" + hiveExpression + ")", exception); + } + } + + private ExprNodeDesc getHiveExpressionFor(Expression expression) throws IOException { + + if (expression instanceof BinaryExpression) { + return getHiveExpressionFor((BinaryExpression)expression); + } + else + if (expression instanceof Expression.UnaryExpression) { + return getHiveExpressionFor((Expression.UnaryExpression)expression); + } + else + if (expression instanceof Expression.Const) { + assert expression.getOpType().equals(Expression.OpType.TERM_CONST); + return new ExprNodeConstantDescConstructor().apply(((Expression.Const)expression).getValue()); + } + else + if (expression instanceof Expression.Column) { + assert expression.getOpType().equals(Expression.OpType.TERM_COL); + Expression.Column columnExpression = (Expression.Column) expression; + return new ExprNodeColumnDesc(getTypeInfoMap().get(columnExpression.getName()), + columnExpression.getName(), + "TableNameNotSet!", // Table-name isn't required, for predicate-pushdown. + false); + } + else { + throw new IOException("Could not convert pig's push-down predicate " + + "(" + expression + ") into a Hive expression."); + } + } + + private ExprNodeGenericFuncDesc getHiveExpressionFor(BinaryExpression binaryPredicate) throws IOException { + + List arguments = Lists.newArrayList(); + // Add LHS column expression. + arguments.add(getHiveExpressionFor(binaryPredicate.getLhs())); + + Expression.OpType opType = binaryPredicate.getOpType(); + if (opType.equals(Expression.OpType.OP_IN)) { + // Add RHS value-list, as constant values. + // TODO: Short circuit for values that aren't BigInt/BigDecimal/DateTime. + arguments.addAll( + Lists.newArrayList( + Iterators.transform( + ((Expression.InExpression) binaryPredicate.getRhs()).getValues().iterator(), + new ExprNodeConstantDescConstructor()) + ) + ); + } + else { + arguments.add(getHiveExpressionFor(binaryPredicate.getRhs())); + } + + try { + return ExprNodeGenericFuncDesc.newInstance(getHiveFunctionFor(opType), arguments); + } + catch (UDFArgumentException exception) { + throw new IOException("Could not convert pig's push-down predicate " + + "(" + binaryPredicate + ") into a Hive expression.", exception); + } + + } + + private static class ExprNodeConstantDescConstructor implements Function { + + @Override + public ExprNodeConstantDesc apply(Object input) { + if (input instanceof BigInteger) { + input = new BigDecimal((BigInteger)input); + } + else + if (input instanceof DateTime) { + input = new Timestamp(((DateTime)input).getMillis()); + } + + return new ExprNodeConstantDesc(input); + } + } + + private ExprNodeGenericFuncDesc getHiveExpressionFor(Expression.UnaryExpression unaryPredicate) throws IOException { + + try { + return ExprNodeGenericFuncDesc.newInstance( + getHiveFunctionFor(unaryPredicate.getOpType()), + Collections.singletonList(getHiveExpressionFor(unaryPredicate.getExpression())) + ); + } + catch (UDFArgumentException exception) { + throw new IOException("Could not convert pig's push-down predicate " + + "(" + unaryPredicate + ") into a Hive expression.", exception); + } + + } + + private GenericUDF getHiveFunctionFor(Expression.OpType operator) throws IOException { + switch (operator) { + + // + case OP_AND: return new GenericUDFOPAnd(); + case OP_OR: return new GenericUDFOPOr(); + case OP_EQ: return new GenericUDFOPEqual(); + case OP_NE: return new GenericUDFOPNotEqual(); + case OP_LT: return new GenericUDFOPLessThan(); + case OP_LE: return new GenericUDFOPEqualOrLessThan(); + case OP_GT: return new 
GenericUDFOPGreaterThan(); + case OP_GE: return new GenericUDFOPEqualOrGreaterThan(); + case OP_BETWEEN: return new GenericUDFBetween(); + case OP_IN: return new GenericUDFIn(); + // + + // + case OP_NOT: return new GenericUDFOPNot(); + case OP_NULL: return new GenericUDFOPNull(); + // + + default: + throw new IOException("Unsupported operator for predicate push-down: " + operator); + } + } + + private Map getTypeInfoMap() { + if (typeInfoMap == null) { + + typeInfoMap = Maps.newHashMapWithExpectedSize(outputSchema.size()); + + for (FieldSchema field : HCatSchemaUtils.getFieldSchemas(outputSchema.getFields())) { + typeInfoMap.put(field.getName(), TypeInfoUtils.getTypeInfoFromTypeString(field.getType())); + } + } + + return typeInfoMap; + } } diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/AbstractHCatLoaderTest.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/AbstractHCatLoaderTest.java index 59d2efb156..f67c418800 100644 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/AbstractHCatLoaderTest.java +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/AbstractHCatLoaderTest.java @@ -40,6 +40,7 @@ import java.util.Properties; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.CommandNeedRetryException; @@ -61,6 +62,7 @@ import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; +import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; import org.joda.time.DateTime; @@ -516,8 +518,11 @@ public void testColumnarStorePushdown() throws Exception { //Single MapReduce job is launched OutputStats outstats = stats.getOutputStats().get(0); assertTrue(outstats!= null); - assertEquals(expectedCols,outstats.getConf() - .get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); + Configuration conf = outstats.getConf(); + Map udfContext = (Map) ObjectSerializer.deserialize(conf.get("pig.udf.context")); + assertTrue("UDFProperties should not be empty.", !udfContext.isEmpty()); + Properties udfProperties = udfContext.entrySet().iterator().next().getValue(); + assertEquals(expectedCols, udfProperties.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); //delete output file on exit FileSystem fs = FileSystem.get(outstats.getConf()); if (fs.exists(new Path(PIGOUTPUT_DIR))) { diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderPredicatePushDown.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderPredicatePushDown.java new file mode 100644 index 0000000000..7872be834c --- /dev/null +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderPredicatePushDown.java @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.pig; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.hcatalog.HcatTestUtils; +import org.apache.hive.hcatalog.MiniGenericCluster; +import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecJob; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigImplConstants; +import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.pig.tools.pigstats.JobStats; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; + +@RunWith(Parameterized.class) +public class TestHCatLoaderPredicatePushDown { + private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoaderPredicatePushDown.class); + private static final String TEST_DATA_DIR = HCatUtil.makePathASafeFileName(System.getProperty("java.io.tmpdir") + + File.separator + + TestHCatLoaderPredicatePushDown.class.getCanonicalName() + + "-" + System.currentTimeMillis()); + private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; + private static final String TEXT_DATA_FILE = TEST_DATA_DIR + "/basic.input.data"; + private static final String COLUMNAR_TABLE_NAME_PREFIX = "columnar_table_"; + + private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); + private static FileSystem clusterFS = cluster.getFileSystem(); + private static Driver driver; + private static Random random = new Random(); + private static Set tablesCreated = Sets.newHashSet(); + + private String tableName; + private String storageFormat; + private String tblProperties; + + @Parameterized.Parameters + public static Collection storageFormatsThatSupportPPD() { + ArrayList list = Lists.newArrayList(); + list.add(new String[]{ + IOConstants.ORCFILE, + "(\"orc.stripe.size\"=\"100000\", " + + "\"orc.row.index.stride\"=\"1000\", " + + "\"orc.compress\"=\"NONE\" " + + ")" + }); + return list; + } + + public TestHCatLoaderPredicatePushDown(String storageFormat, String tblProperties) { + this.tableName = COLUMNAR_TABLE_NAME_PREFIX + storageFormat; + this.storageFormat = storageFormat; + this.tblProperties = 
tblProperties; + } + + @BeforeClass + public static void setupAllTests() throws Exception { + setUpCluster(); + setUpLocalFileSystemDirectories(); + setUpClusterFileSystemDirectories(); + setUpHiveDriver(); + createTextData(); + } + + @Before + public void setupSingleTest() throws Exception { + if (!tablesCreated.contains(tableName)) { + createColumnarTable(); + convertTextDataToColumnarStorage(); + tablesCreated.add(tableName); + } + } + + @AfterClass + public static void tearDownAllTests() throws Exception { + for (String table : tablesCreated) { + dropTable(table); + } + tearDownCluster(); + clearUpLocalFileSystemDirectories(); + } + + private static void setUpCluster() throws Exception { + cluster = MiniGenericCluster.buildCluster(); + clusterFS = cluster.getFileSystem(); + } + + private static void tearDownCluster() throws Exception { + cluster.shutDown(); + } + + private static void setUpLocalFileSystemDirectories() { + File f = new File(TEST_DATA_DIR); + if (f.exists()) { + FileUtil.fullyDelete(f); + } + if(!(new File(TEST_DATA_DIR).mkdirs())) { + throw new RuntimeException("Could not create test-directory " + TEST_DATA_DIR + " on local filesystem."); + } + } + + private static void clearUpLocalFileSystemDirectories() { + File f = new File(TEST_DATA_DIR); + if (f.exists()) { + FileUtil.fullyDelete(f); + } + } + + private static void setUpClusterFileSystemDirectories() throws IOException { + FileSystem clusterFS = cluster.getFileSystem(); + Path warehouseDir = new Path(TEST_WAREHOUSE_DIR); + if (clusterFS.exists(warehouseDir)) { + clusterFS.delete(warehouseDir, true); + } + clusterFS.mkdirs(warehouseDir); + } + + private static void setUpHiveDriver() throws IOException { + HiveConf hiveConf = createHiveConf(); + driver = new Driver(hiveConf); + driver.setMaxRows(1000); + SessionState.start(new CliSessionState(hiveConf)); + } + + private static HiveConf createHiveConf() { + HiveConf hiveConf = new HiveConf(cluster.getConfiguration(), TestHCatLoaderPredicatePushDown.class); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); + hiveConf.set("hive.security.authorization.manager", ""); + return hiveConf; + } + + /** + * Create data with schema: + * number \t string \t filler_string + * @throws Exception + */ + private static void createTextData() throws Exception { + int LOOP_SIZE = 1000; + ArrayList input = Lists.newArrayListWithExpectedSize((LOOP_SIZE+1) * LOOP_SIZE); + for (int i = 1; i <= LOOP_SIZE; i++) { + String si = i + ""; + for (int j = 1; j <= LOOP_SIZE; j++) { + String sj = "S" + j + "S"; + input.add(si + "\t" + (i*j) + "\t" + sj); + } + } + + // Add nulls. 
+ for (int i=0; i results = Lists.newArrayList(); + if (driver.getResults(results)) { + System.out.println("Got results: "); + for (String result : results) { + System.out.println(result); + } + } + else { + System.out.println("Got no results!"); + } + } + } + + private void convertTextDataToColumnarStorage() throws IOException { + PigServer server = getPigServer(); + server.setBatchOn(); + int i = 0; + server.registerQuery("A = load '" + TEXT_DATA_FILE + "' as (a:int, b:chararray, c:chararray);", ++i); + + server.registerQuery("store A into '" + tableName + "' using org.apache.hive.hcatalog.pig.HCatStorer();", ++i); + server.executeBatch(); + } + + private static void dropTable(String tablename) throws IOException, CommandNeedRetryException { + driver.run("drop table if exists " + tablename); + } + + private long runQueryAndGetHdfsBytesReadForAlias(PigServer server, String query, String resultAlias) throws IOException { + Path outputPath = new Path("/tmp/output_" + random.nextInt()); + if (clusterFS.exists(outputPath)) { + Assert.assertTrue("Couldn't delete outputPath: " + outputPath, clusterFS.delete(outputPath, true)); + } + + for (String line : query.split("\n")) { + server.registerQuery(line); + } + ExecJob pigJob = server.store(resultAlias, outputPath.getName()); + JobStats jobStats = (JobStats)pigJob.getStatistics().getJobGraph().getSources().get(0); + Assert.assertTrue("Query should have succeeded.", jobStats.isSuccessful()); + return jobStats.getHdfsBytesRead(); + } + + private Iterator getResultsForAlias(PigServer server, String query, String resultAlias) throws IOException { + for (String line : query.split("\n")) { + server.registerQuery(line); + } + return server.openIterator(resultAlias); + } + + private PigServer getPigServer() throws IOException { + return new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + } + + private PigServer getPigServerWithPPDDisabled() throws IOException { + PigServer server = getPigServer(); + server.getPigContext() + .getProperties() + .setProperty( + PigImplConstants.PIG_OPTIMIZER_RULES_KEY, + ObjectSerializer.serialize(Sets.newHashSet("PredicatePushdownOptimizer"))); + return server; + } + + private static final String PPD_LOADER = "org.apache.hive.hcatalog.pig.HCatLoader"; + + private void compareResultsFor(String query, String resultAlias) throws IOException { + // Compare HDFS Bytes read. + long bytesReadWithPPD = runQueryAndGetHdfsBytesReadForAlias(getPigServer(), query, resultAlias); + long bytesReadWithoutPPD = runQueryAndGetHdfsBytesReadForAlias(getPigServerWithPPDDisabled(), query, resultAlias); + + Assert.assertTrue("HDFSBytesRead with " + PPD_LOADER + "(" + bytesReadWithPPD + ") should not exceed " + + PPD_LOADER + " with PPD disabled(" + bytesReadWithoutPPD + ").", + bytesReadWithPPD <= bytesReadWithoutPPD); + + // Compare results. 
+ Iterator resultsWithPPD = getResultsForAlias(getPigServer(), query, resultAlias); + Iterator resultsWithoutPPD = getResultsForAlias(getPigServerWithPPDDisabled(), query, resultAlias); + + while (resultsWithPPD.hasNext() && resultsWithoutPPD.hasNext()) { + Tuple ppdLoaderTuple = resultsWithPPD.next(); + Tuple ppdDisabledTuple = resultsWithoutPPD.next(); + Assert.assertEquals("Results don't match, with and without PPD.", ppdLoaderTuple, ppdDisabledTuple); + + } + + Assert.assertFalse("Query with HCatLoaderPPD returned more results than expected.", resultsWithPPD.hasNext()); + Assert.assertFalse("Query without PPD returned more results than expected.", resultsWithoutPPD.hasNext()); + } + + private void test(String projection, String filter) throws IOException { + String query = "X = LOAD '" + tableName + "' USING " + PPD_LOADER + "();\n" + + "X = FOREACH X generate " + projection + " ;\n" + + "X = FILTER X by " + filter + " ;"; + compareResultsFor(query, "X"); + } + + @Test + public void testEmptySelect() throws IOException { + test("a", "a < 0"); + } + + @Test + public void testLessThan() throws IOException { + test("a", "a < 10"); + } + + @Test + public void testGreaterThanEquals() throws IOException { + test("a", "a >= 990"); + } + + @Test + public void testCompoundPredicate() throws IOException { + test("a,b", "a < 10 AND (b < '20' OR b > '1000')"); + } +} diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java index 78e767e7fc..c805348829 100644 --- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java @@ -1030,12 +1030,16 @@ public void testTableSchemaPropagation() throws Exception { assertEquals("Table after deserialization should have been identical to sourceTable.", HCatTable.NO_DIFF, sourceTable.diff(targetTable)); + EnumSet ignoreTableProperties + = EnumSet.copyOf(HCatTable.DEFAULT_COMPARISON_ATTRIBUTES); + ignoreTableProperties.remove(HCatTable.TableAttribute.TABLE_PROPERTIES); + // Create table on Target. targetMetaStore().createTable(HCatCreateTableDesc.create(targetTable).build()); // Verify that the created table is identical to sourceTable. targetTable = targetMetaStore().getTable(dbName, tableName); assertEquals("Table after deserialization should have been identical to sourceTable.", - HCatTable.NO_DIFF, sourceTable.diff(targetTable)); + HCatTable.NO_DIFF, sourceTable.diff(targetTable, ignoreTableProperties)); // Ignore differences in table-properties. // Modify sourceTable. List newColumnSchema = new ArrayList(columnSchema);
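Reviewer note (illustration only, not part of the patch): the sketch below shows how a standalone MapReduce driver might hand a pushdown predicate to HCat through the new HCatBaseInputFormat.setPushdownPredicate() hook added above. The database ("mydb"), table ("mytable"), column ("a") and the class name are hypothetical; the Hive plan/UDF classes are the same ones this patch already imports in HCatLoader. When loading through Pig none of this is needed, since HCatLoader.setPushdownPredicate() performs the Expression-to-ExprNodeGenericFuncDesc translation itself, and pushdown should be switchable off via the new hcat.pig.loader.predicate.pushdown.enabled property (for example "SET hcat.pig.loader.predicate.pushdown.enabled false;" in a Pig script), assuming the property reaches the job configuration.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.mapreduce.HCatBaseInputFormat;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;

public class PushdownPredicateSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());

    // Point HCatInputFormat at the (hypothetical) source table; null => no partition filter.
    HCatInputFormat.setInput(job, "mydb", "mytable", null);

    // Build the Hive equivalent of the Pig filter "a < 10".
    ExprNodeDesc column = new ExprNodeColumnDesc(
        TypeInfoFactory.intTypeInfo, "a", "table-alias-not-used", false);
    ExprNodeDesc constant = new ExprNodeConstantDesc(10);
    ExprNodeGenericFuncDesc lessThan = ExprNodeGenericFuncDesc.newInstance(
        new GenericUDFOPLessThan(), Arrays.asList(column, constant));

    // Serialize the predicate into the job conf under TableScanDesc.FILTER_EXPR_CONF_STR,
    // where a PPD-aware storage format (e.g. ORC) can pick it up at split/read time.
    HCatBaseInputFormat.setPushdownPredicate(job, lessThan);
  }
}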