diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index c4ffb9f..cc4e10b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.llap.io.api.impl;
 
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
 import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat;
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -30,11 +31,14 @@
 import com.google.common.util.concurrent.ListeningExecutorService;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -64,8 +68,10 @@
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -116,6 +122,12 @@
       useLlapIo = ((LlapAwareSplit)split).canUseLlapIo();
     }
     boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(job);
+
+    // validate for supported types. Until we fix HIVE-14089 we need this check.
+    if (useLlapIo) {
+      useLlapIo = Utilities.checkLlapIOSupportedTypes(job);
+    }
+
     if (!useLlapIo) {
       LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
       return sourceInputFormat.getRecordReader(split, job, reporter);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 44a3699..a1f67f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -113,6 +113,7 @@
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -140,6 +141,7 @@
 import org.apache.hadoop.hive.ql.metadata.InputEstimator;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -159,6 +161,7 @@
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.Serializer;
@@ -169,6 +172,8 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
@@ -3667,4 +3672,83 @@ public static String maskIfPassword(String key, String value) {
     }
     return value;
   }
+
+  /**
+   * Check if LLAP IO supports the column types that are being read.
+   * @param conf - configuration
+   * @return false for types not supported by vectorization, true otherwise
+   */
+  public static boolean checkLlapIOSupportedTypes(final Configuration conf) {
+    final String[] readColumnNames = ColumnProjectionUtils.getReadColumnNames(conf);
+    final String columnNames = conf.get(serdeConstants.LIST_COLUMNS);
+    final String columnTypes = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+    if (columnNames == null || columnTypes == null || columnNames.isEmpty() ||
+        columnTypes.isEmpty()) {
+      LOG.warn("Column names ({}) or types ({}) is null. Skipping type checking for LLAP IO.",
+          columnNames, columnTypes);
+      return true;
+    }
+    final List<String> allColumnNames = Lists.newArrayList(columnNames.split(","));
+    final List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
+    final List<String> allColumnTypes = TypeInfoUtils.getTypeStringsFromTypeInfo(typeInfos);
+    return checkLlapIOSupportedTypes(Lists.newArrayList(readColumnNames), allColumnNames,
+        allColumnTypes);
+  }
+
+  /**
+   * Check if LLAP IO supports the column types that are being read.
+   * @param readColumnNames - columns that will be read from the table/partition
+   * @param allColumnNames - all columns
+   * @param allColumnTypes - all column types
+   * @return false for types not supported by vectorization, true otherwise
+   */
+  public static boolean checkLlapIOSupportedTypes(final List<String> readColumnNames,
+      final List<String> allColumnNames, final List<String> allColumnTypes) {
+    final String[] readColumnTypes = getReadColumnTypes(readColumnNames, allColumnNames,
+        allColumnTypes);
+
+    if (readColumnTypes != null) {
+      for (String readColumnType : readColumnTypes) {
+        if (readColumnType != null) {
+          if (!Vectorizer.validateDataType(readColumnType,
+              VectorExpressionDescriptor.Mode.PROJECTION)) {
+            LOG.warn("Unsupported column type encountered ({}). Disabling LLAP IO.",
+                readColumnType);
+            return false;
+          }
+        }
+      }
+    } else {
+      LOG.warn("readColumnTypes is null. Skipping type checking for LLAP IO. "
+          + "readColumnNames: {} allColumnNames: {} allColumnTypes: {} readColumnTypes: {}",
+          readColumnNames, allColumnNames, allColumnTypes, readColumnTypes);
+    }
+    return true;
+  }
+
+  private static String[] getReadColumnTypes(final List<String> readColumnNames,
+      final List<String> allColumnNames, final List<String> allColumnTypes) {
+    if (readColumnNames == null || allColumnNames == null || allColumnTypes == null ||
+        readColumnNames.isEmpty() || allColumnNames.isEmpty() || allColumnTypes.isEmpty()) {
+      return null;
+    }
+    Map<String, String> columnNameToType = new HashMap<>();
+    List<TypeInfo> types = TypeInfoUtils.typeInfosFromTypeNames(allColumnTypes);
+    if (allColumnNames.size() != types.size()) {
+      LOG.warn("Column names count does not match column types count."
+ + " ColumnNames: {} [{}] ColumnTypes: {} [{}]", allColumnNames, allColumnNames.size(), + allColumnTypes, types.size()); + return null; + } + + for (int i = 0; i < allColumnNames.size(); i++) { + columnNameToType.put(allColumnNames.get(i), types.get(i).toString()); + } + + String[] result = new String[readColumnNames.size()]; + for (int i = 0; i < readColumnNames.size(); i++) { + result[i] = columnNameToType.get(readColumnNames.get(i)); + } + return result; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index f51a084..bce3853 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -186,25 +186,9 @@ protected static transient final Logger LOG = LoggerFactory.getLogger(Vectorizer.class); - Pattern supportedDataTypesPattern; - List> vectorizableTasks = - new ArrayList>(); - Set> supportedGenericUDFs = new HashSet>(); - - Set supportedAggregationUdfs = new HashSet(); - - private HiveConf hiveConf; - - private boolean isSpark; - - boolean useVectorizedInputFileFormat; - boolean useVectorDeserialize; - boolean useRowDeserialize; - - boolean isSchemaEvolution; - - public Vectorizer() { + static Pattern supportedDataTypesPattern; + static { StringBuilder patternBuilder = new StringBuilder(); patternBuilder.append("int"); patternBuilder.append("|smallint"); @@ -235,6 +219,25 @@ public Vectorizer() { patternBuilder.append("|varchar.*"); supportedDataTypesPattern = Pattern.compile(patternBuilder.toString()); + } + + List> vectorizableTasks = + new ArrayList>(); + Set> supportedGenericUDFs = new HashSet>(); + + Set supportedAggregationUdfs = new HashSet(); + + private HiveConf hiveConf; + + private boolean isSpark; + + boolean useVectorizedInputFileFormat; + boolean useVectorDeserialize; + boolean useRowDeserialize; + + boolean isSchemaEvolution; + + public Vectorizer() { supportedGenericUDFs.add(GenericUDFOPPlus.class); supportedGenericUDFs.add(GenericUDFOPMinus.class); @@ -1928,7 +1931,7 @@ private boolean validateAggregationIsPrimitive(VectorAggregateExpression vectorA return new Pair(true, outputIsPrimitive); } - private boolean validateDataType(String type, VectorExpressionDescriptor.Mode mode) { + public static boolean validateDataType(String type, VectorExpressionDescriptor.Mode mode) { type = type.toLowerCase(); boolean result = supportedDataTypesPattern.matcher(type).matches(); if (result && mode == VectorExpressionDescriptor.Mode.PROJECTION && type.equals("void")) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index d908d48..8d329d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.plan; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import java.util.ArrayList; @@ -31,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -47,6 +49,9 @@ import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol; import org.apache.hadoop.hive.ql.parse.SplitSample; import org.apache.hadoop.hive.ql.plan.Explain.Level; +import 
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.JobConf;
 
 import com.google.common.collect.Interner;
@@ -243,6 +248,28 @@ public void deriveLlap(Configuration conf, boolean isExecDriver) {
         }
       }
     }
+
+    // check if the column types that are read are supported by LLAP IO
+    for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
+      if (hasLlap) {
+        final String alias = entry.getKey();
+        Operator<? extends OperatorDesc> op = entry.getValue();
+        PartitionDesc partitionDesc = aliasToPartnInfo.get(alias);
+        if (op instanceof TableScanOperator && partitionDesc != null &&
+            partitionDesc.getTableDesc() != null) {
+          final TableScanOperator tsOp = (TableScanOperator) op;
+          final List<String> readColumnNames = tsOp.getNeededColumns();
+          final Properties props = partitionDesc.getTableDesc().getProperties();
+          final List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(
+              props.getProperty(serdeConstants.LIST_COLUMN_TYPES));
+          final List<String> allColumnTypes = TypeInfoUtils.getTypeStringsFromTypeInfo(typeInfos);
+          final List<String> allColumnNames = Utilities.getColumnNames(props);
+          hasLlap = Utilities.checkLlapIOSupportedTypes(readColumnNames, allColumnNames,
+              allColumnTypes);
+        }
+      }
+    }
+
     llapIoDesc = deriveLlapIoDescString(
         isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid);
   }
diff --git a/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out b/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
index 480627f..142bdee 100644
--- a/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
@@ -70,7 +70,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 2 
             Map Operator Tree:
                 TableScan
@@ -195,7 +195,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 2 
             Map Operator Tree:
                 TableScan
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java
index abd2838..8f7b799 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java
@@ -814,15 +814,16 @@ public static TypeInfo getTypeInfoFromObjectInspector(ObjectInspector oi) {
     return parser.parseTypeInfos();
   }
 
-  public static String getTypesString(List<TypeInfo> typeInfos) {
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < typeInfos.size(); i++) {
-      if (i > 0) {
-        sb.append(":");
-      }
-      sb.append(typeInfos.get(i).getTypeName());
+  public static List<String> getTypeStringsFromTypeInfo(List<TypeInfo> typeInfos) {
+    if (typeInfos == null) {
+      return null;
     }
-    return sb.toString();
+
+    List<String> result = new ArrayList<>(typeInfos.size());
+    for (TypeInfo typeInfo : typeInfos) {
+      result.add(typeInfo.toString());
+    }
+    return result;
   }
 
   public static TypeInfo getTypeInfoFromTypeString(String typeString) {
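
Reviewer note: the standalone sketch below is illustrative only. It is not part of the patch, and the class and method names (LlapTypeCheckSketch, allReadColumnsSupported) are invented. It restates the gating logic the patch wires into Utilities.checkLlapIOSupportedTypes and MapWork.deriveLlap: resolve each projected column to its type string, then keep LLAP IO enabled only if every such type matches Vectorizer's supported-type pattern. The regex here is a hand-copied subset of the pattern assembled in the new static initializer.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class LlapTypeCheckSketch {

  // Illustrative subset of Vectorizer.supportedDataTypesPattern ("int|smallint|...|varchar.*").
  private static final Pattern SUPPORTED = Pattern.compile(
      "int|smallint|tinyint|bigint|boolean|timestamp|date|string|float|double"
          + "|decimal.*|char.*|varchar.*");

  // Mirrors the three-argument Utilities.checkLlapIOSupportedTypes: only the columns a
  // TableScan actually reads are checked, and columns with unknown types are skipped.
  static boolean allReadColumnsSupported(List<String> readColumns,
      Map<String, String> columnTypeMap) {
    for (String column : readColumns) {
      String type = columnTypeMap.get(column);
      if (type != null && !SUPPORTED.matcher(type.toLowerCase()).matches()) {
        // e.g. array<string> or map<int,string>: fall back to the source input format
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, String> types = new HashMap<>();
    types.put("id", "int");
    types.put("tags", "array<string>");
    // true: the complex-typed column is not projected, so LLAP IO can stay on
    System.out.println(allReadColumnsSupported(Arrays.asList("id"), types));
    // false: projecting "tags" disables LLAP IO for the scan
    System.out.println(allReadColumnsSupported(Arrays.asList("id", "tags"), types));
  }
}

Because the check runs against TableScanOperator.getNeededColumns() rather than the full table schema, a scan that never projects a complex-typed column keeps LLAP IO enabled, which appears to be why the vector_complex_join.q.out baselines flip from "LLAP IO: no inputs" to "LLAP IO: all inputs".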