diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index ff9604e..bf17b27 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.llap.io.api.impl;
 
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
 import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat;
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 
@@ -30,10 +31,14 @@
 import com.google.common.util.concurrent.ListeningExecutorService;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
@@ -60,8 +65,10 @@
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -112,6 +119,11 @@
       useLlapIo = ((LlapAwareSplit)split).canUseLlapIo();
     }
     boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(job);
+
+    // validate for supported types. Until we fix HIVE-14089 we need this check.
+    boolean supportedTypes = checkSupportedTypes(job);
+    useLlapIo = supportedTypes ? useLlapIo : false;
+
     if (!useLlapIo) {
       LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
       return sourceInputFormat.getRecordReader(split, job, reporter);
@@ -133,6 +145,65 @@
     }
   }
 
+  private boolean checkSupportedTypes(final JobConf job) {
+    final String[] readColumnNames = ColumnProjectionUtils.getReadColumnNames(job);
+    final String allColumnNames = job.get(serdeConstants.LIST_COLUMNS);
+    final String allColumnTypes = job.get(serdeConstants.LIST_COLUMN_TYPES);
+    final String[] readColumnTypes = getReadColumnTypes(readColumnNames, allColumnNames,
+        allColumnTypes);
+
+    if (LlapIoImpl.LOG.isDebugEnabled()) {
+      final boolean readAllColumns = ColumnProjectionUtils.isReadAllColumns(job);
+      LlapIoImpl.LOG.debug("readAllColumns: {} readColumnNames: {} allColumnNames: {}" +
+          " allColumnTypes: {} readColumnTypes: {}", readAllColumns,
+          Arrays.toString(readColumnNames), allColumnNames, allColumnTypes,
+          Arrays.toString(readColumnTypes));
+    }
+
+    if (readColumnTypes != null) {
+      for (String readColumnType : readColumnTypes) {
+        if (readColumnType != null) {
+          if (!Vectorizer.validateDataType(readColumnType,
+              VectorExpressionDescriptor.Mode.PROJECTION)) {
+            LlapIoImpl.LOG.warn("Unsupported column type encountered ({}). Disabling LLAP IO.",
+                readColumnType);
+            return false;
+          }
+        }
+      }
+    } else {
+      LlapIoImpl.LOG.warn("readColumnTypes is null. Skipping type checking for LLAP IO.");
+    }
+    return true;
+  }
+
+  private String[] getReadColumnTypes(final String[] readColumnNames, final String allColumnNames,
+      final String allColumnTypes) {
+    if (readColumnNames == null || allColumnNames == null || allColumnTypes == null ||
+        readColumnNames.length == 0 || allColumnNames.isEmpty() || allColumnTypes.isEmpty()) {
+      return null;
+    }
+    Map<String, String> columnNameToType = new HashMap<>();
+    String[] names = allColumnNames.split(",");
+    List<TypeInfo> types = TypeInfoUtils.getTypeInfosFromTypeString(allColumnTypes);
+    if (names.length != types.size()) {
+      LlapIoImpl.LOG.warn("Column names count does not match column types count." +
+          " ColumnNames: {} [{}] ColumnTypes: {} [{}]", allColumnNames, names.length,
+          allColumnTypes, types.size());
+      return null;
+    }
+
+    for (int i = 0; i < names.length; i++) {
+      columnNameToType.put(names[i], types.get(i).toString());
+    }
+
+    String[] result = new String[readColumnNames.length];
+    for (int i = 0; i < readColumnNames.length; i++) {
+      result[i] = columnNameToType.get(readColumnNames[i]);
+    }
+    return result;
+  }
+
   // Returning either a vectorized or non-vectorized reader from the same call requires breaking
   // generics... this is how vectorization currently works.
   @SuppressWarnings("unchecked")
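Reviewer note (not part of the patch): getReadColumnTypes() cannot simply split allColumnTypes on commas, because complex types such as map<int,string> embed commas of their own; that is why it delegates parsing to TypeInfoUtils.getTypeInfosFromTypeString(). Below is a minimal, JDK-only sketch of the same name-to-type resolution, with a naive depth-aware splitter standing in for TypeInfoUtils. Class, column, and type names are hypothetical.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ReadColumnTypesSketch {
      // Split a Hive type-string list on top-level commas only, so that
      // "int,map<int,string>,string" yields three entries, not four.
      static List<String> splitTopLevelTypes(String allColumnTypes) {
        List<String> types = new ArrayList<>();
        int depth = 0, start = 0;
        for (int i = 0; i < allColumnTypes.length(); i++) {
          char c = allColumnTypes.charAt(i);
          if (c == '<') {
            depth++;
          } else if (c == '>') {
            depth--;
          } else if (c == ',' && depth == 0) {
            types.add(allColumnTypes.substring(start, i));
            start = i + 1;
          }
        }
        types.add(allColumnTypes.substring(start));
        return types;
      }

      public static void main(String[] args) {
        String allColumnNames = "id,tags,name";               // plays the role of serdeConstants.LIST_COLUMNS
        String allColumnTypes = "int,map<int,string>,string"; // plays the role of serdeConstants.LIST_COLUMN_TYPES
        String[] readColumnNames = {"tags", "id"};            // the projected columns

        // Same shape as the patch: build name -> type, then resolve each projected column.
        Map<String, String> columnNameToType = new HashMap<>();
        String[] names = allColumnNames.split(",");
        List<String> types = splitTopLevelTypes(allColumnTypes);
        for (int i = 0; i < names.length; i++) {
          columnNameToType.put(names[i], types.get(i));
        }
        for (String col : readColumnNames) {
          System.out.println(col + " -> " + columnNameToType.get(col));
          // prints: tags -> map<int,string>, then: id -> int
        }
      }
    }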
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index f51a084..bce3853 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -186,25 +186,9 @@
 
   protected static transient final Logger LOG = LoggerFactory.getLogger(Vectorizer.class);
 
-  Pattern supportedDataTypesPattern;
-  List<Task<? extends Serializable>> vectorizableTasks =
-      new ArrayList<Task<? extends Serializable>>();
-  Set<Class<?>> supportedGenericUDFs = new HashSet<Class<?>>();
-
-  Set<String> supportedAggregationUdfs = new HashSet<String>();
-
-  private HiveConf hiveConf;
-
-  private boolean isSpark;
-
-  boolean useVectorizedInputFileFormat;
-  boolean useVectorDeserialize;
-  boolean useRowDeserialize;
-
-  boolean isSchemaEvolution;
-
-  public Vectorizer() {
+  static Pattern supportedDataTypesPattern;
+  static {
     StringBuilder patternBuilder = new StringBuilder();
     patternBuilder.append("int");
     patternBuilder.append("|smallint");
@@ -235,6 +219,25 @@ public Vectorizer() {
     patternBuilder.append("|varchar.*");
 
     supportedDataTypesPattern = Pattern.compile(patternBuilder.toString());
+  }
+
+  List<Task<? extends Serializable>> vectorizableTasks =
+      new ArrayList<Task<? extends Serializable>>();
+  Set<Class<?>> supportedGenericUDFs = new HashSet<Class<?>>();
+
+  Set<String> supportedAggregationUdfs = new HashSet<String>();
+
+  private HiveConf hiveConf;
+
+  private boolean isSpark;
+
+  boolean useVectorizedInputFileFormat;
+  boolean useVectorDeserialize;
+  boolean useRowDeserialize;
+
+  boolean isSchemaEvolution;
+
+  public Vectorizer() {
 
     supportedGenericUDFs.add(GenericUDFOPPlus.class);
     supportedGenericUDFs.add(GenericUDFOPMinus.class);
@@ -1928,7 +1931,7 @@ private boolean validateAggregationIsPrimitive(VectorAggregateExpression vectorA
     return new Pair<Boolean, Boolean>(true, outputIsPrimitive);
   }
 
-  private boolean validateDataType(String type, VectorExpressionDescriptor.Mode mode) {
+  public static boolean validateDataType(String type, VectorExpressionDescriptor.Mode mode) {
     type = type.toLowerCase();
     boolean result = supportedDataTypesPattern.matcher(type).matches();
     if (result && mode == VectorExpressionDescriptor.Mode.PROJECTION && type.equals("void")) {
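Reviewer note (not part of the patch): the Vectorizer change above is a plain static-initializer refactor. The regex moves out of the constructor so it is compiled once at class load instead of once per Vectorizer instance, and so the now-static validateDataType() can consult it without an instance. A trimmed, self-contained illustration of the idiom follows; the real pattern lists many more types, and the names here are hypothetical.

    import java.util.regex.Pattern;

    public class SupportedTypesSketch {
      static Pattern supportedPattern;   // mirrors supportedDataTypesPattern

      static {
        // Compiled once when the class loads, not once per constructed instance.
        supportedPattern = Pattern.compile("int|smallint|bigint|string|decimal.*|char.*|varchar.*");
      }

      public static boolean validateDataType(String type) {
        // A static method can now use the pattern with no instance in scope.
        return supportedPattern.matcher(type.toLowerCase()).matches();
      }

      public static void main(String[] args) {
        System.out.println(validateDataType("VARCHAR(10)"));     // true, matches varchar.*
        System.out.println(validateDataType("map<int,string>")); // false, complex types are absent
      }
    }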
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index d908d48..e77cf4d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -243,6 +243,7 @@ public void deriveLlap(Configuration conf, boolean isExecDriver) {
         }
       }
     }
+
     llapIoDesc = deriveLlapIoDescString(
         isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid);
   }
diff --git a/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out b/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out
index 08d49bc..0c3d170 100644
--- a/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out
@@ -96,7 +96,7 @@ STAGE PLANS:
                   output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
 
   Stage: Stage-0
     Fetch Operator
diff --git a/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out b/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
index 480627f..52b4d33 100644
--- a/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
@@ -70,7 +70,7 @@ STAGE PLANS:
                   output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 2
             Map Operator Tree:
                 TableScan
@@ -90,7 +90,7 @@
                     Statistics: Num rows: 1 Data size: 190 Basic stats: COMPLETE Column stats: NONE
                     value expressions: _col1 (type: map)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
 
   Stage: Stage-0
     Fetch Operator
@@ -195,7 +195,7 @@ STAGE PLANS:
                   output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 2
             Map Operator Tree:
                 TableScan
@@ -211,7 +211,7 @@
                     Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                     value expressions: a (type: array)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
 
   Stage: Stage-0
     Fetch Operator
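Reviewer note (not part of the patch): the q.out plans now report "LLAP IO: all inputs" for these complex-type tables because the restriction moves from plan time to runtime; checkSupportedTypes() still falls back to the source input format when a projected type fails validation. A hedged usage sketch of the now-public entry point, assuming the patched Hive jars on the classpath; the concrete type strings are illustrative.

    import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
    import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;

    public class ValidateDataTypeDemo {
      public static void main(String[] args) {
        // Primitive column types should pass and keep LLAP IO enabled.
        System.out.println(Vectorizer.validateDataType("bigint",
            VectorExpressionDescriptor.Mode.PROJECTION));          // expected: true
        // Complex types should fail until HIVE-14089 lands, which is what makes
        // checkSupportedTypes() disable LLAP IO at runtime even though the
        // plans above now report "LLAP IO: all inputs".
        System.out.println(Vectorizer.validateDataType("map<int,string>",
            VectorExpressionDescriptor.Mode.PROJECTION));          // expected: false
      }
    }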