diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bae1825..9a78a06 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1169,6 +1169,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
     HIVE_SCHEMA_EVOLUTION("hive.exec.schema.evolution", true,
         "Use schema evolution to convert self-describing file format's data to the schema desired by the reader."),
 
+    HIVE_ORC_READER_PROPERTIES_SET("hive.orc.reader.properties.set", false,
+        "internal usage only -- indicates that schema evolution, transactional table scan,\n" +
+        "predicate pushdown and related reader properties have already been set.", true),
+
     HIVE_TRANSACTIONAL_TABLE_SCAN("hive.transactional.table.scan", false,
         "internal usage only -- do transaction (ACID) table scan.", true),
 
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/FileFormatProxy.java metastore/src/java/org/apache/hadoop/hive/metastore/FileFormatProxy.java
index 14ff187..800a2b3 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/FileFormatProxy.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/FileFormatProxy.java
@@ -31,13 +31,20 @@
  */
 public interface FileFormatProxy {
 
+  public static class ApplySargToMetadataContext {
+    public ApplySargToMetadataContext() {
+    }
+  }
+
   /**
    * Applies SARG to file metadata, and produces some result for this file.
+   * @param context File format specific context for evaluating the function.
    * @param sarg SARG
    * @param fileMetadata File metadata from metastore cache.
    * @return The result to return to client for this file, or null if file is eliminated.
    */
-  SplitInfos applySargToMetadata(SearchArgument sarg, ByteBuffer fileMetadata) throws IOException;
+  SplitInfos applySargToMetadata(ApplySargToMetadataContext context,
+      SearchArgument sarg, ByteBuffer fileMetadata) throws IOException;
 
   /**
    * @param fs The filesystem of the file.
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
index 832daec..95a76e0 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.FileFormatProxy.ApplySargToMetadataContext;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
 import org.apache.hadoop.hive.metastore.hbase.MetadataStore;
 
@@ -44,11 +45,22 @@
   private FileFormatProxy fileFormatProxy;
   private MetadataStore store;
 
+  public static class GetFileMetadataByExprContext {
+    private final ApplySargToMetadataContext applySargToMetadataContext;
+    public GetFileMetadataByExprContext(ApplySargToMetadataContext applySargToMetadataContext) {
+      this.applySargToMetadataContext = applySargToMetadataContext;
+    }
+    public ApplySargToMetadataContext getApplySargToMetadataContext() {
+      return applySargToMetadataContext;
+    }
+  }
+
   /**
    * Same as RawStore.getFileMetadataByExpr.
*/ - public abstract void getFileMetadataByExpr(List fileIds, byte[] expr, - ByteBuffer[] metadatas, ByteBuffer[] results, boolean[] eliminated) throws IOException; + public abstract void getFileMetadataByExpr(GetFileMetadataByExprContext context, + List fileIds, byte[] expr, ByteBuffer[] metadatas, ByteBuffer[] results, + boolean[] eliminated) throws IOException; protected abstract FileMetadataExprType getType(); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index c6c1e11..56aa7e9 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -6259,7 +6259,9 @@ public GetFileMetadataByExprResult get_file_metadata_by_expr(GetFileMetadataByEx ByteBuffer[] ppdResults = new ByteBuffer[fileIds.size()]; boolean[] eliminated = new boolean[fileIds.size()]; - getMS().getFileMetadataByExpr(fileIds, type, req.getExpr(), metadatas, ppdResults, eliminated); + // UNDONE: Need to pass readerTypes ... + getMS().getFileMetadataByExpr(null, fileIds, type, req.getExpr(), metadatas, ppdResults, + eliminated); for (int i = 0; i < fileIds.size(); ++i) { if (!eliminated[i] && ppdResults[i] == null) continue; // No metadata => no ppd. MetadataPpdResult mpr = new MetadataPpdResult(); @@ -6792,7 +6794,7 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, LOG.info("Starting DB backed MetaStore Server"); } } - + TServerTransport serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(port) : new TServerSocket(port); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 5adfa02..bc7025c 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -55,6 +55,7 @@ import javax.jdo.identity.IntIdentity; import com.google.common.collect.Maps; + import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -69,6 +70,7 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.FileMetadataHandler.GetFileMetadataByExprContext; import org.apache.hadoop.hive.metastore.MetaStoreDirectSql.SqlFilterForPushdown; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; @@ -8332,8 +8334,9 @@ public void putFileMetadata( } @Override - public void getFileMetadataByExpr(List fileIds, FileMetadataExprType type, byte[] expr, - ByteBuffer[] metadatas, ByteBuffer[] stripeBitsets, boolean[] eliminated) { + public void getFileMetadataByExpr(GetFileMetadataByExprContext context,List fileIds, + FileMetadataExprType type, byte[] expr, ByteBuffer[] metadatas, ByteBuffer[] stripeBitsets, + boolean[] eliminated) { throw new UnsupportedOperationException(); } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java index bbd47b8..070790f 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurable; import 
org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.metastore.FileMetadataHandler.GetFileMetadataByExprContext; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; @@ -630,6 +631,7 @@ void putFileMetadata(List fileIds, List metadata, /** * Gets file metadata from cache after applying a format-specific expression that can * produce additional information based on file metadata and also filter the file list. + * @param context File format specific context. * @param fileIds List of file IDs from the filesystem. * @param expr Format-specific serialized expression applicable to the files' metadatas. * @param type Expression type; used to determine the class that handles the metadata. @@ -640,8 +642,9 @@ void putFileMetadata(List fileIds, List metadata, * @param eliminated Output parameter; fileIds-sized array to receive the indication of whether * the corresponding files are entirely eliminated by the expression. */ - void getFileMetadataByExpr(List fileIds, FileMetadataExprType type, byte[] expr, - ByteBuffer[] metadatas, ByteBuffer[] exprResults, boolean[] eliminated) + void getFileMetadataByExpr(GetFileMetadataByExprContext context, List fileIds, + FileMetadataExprType type, byte[] expr, ByteBuffer[] metadatas, ByteBuffer[] exprResults, + boolean[] eliminated) throws MetaException; /** Gets file metadata handler for the corresponding type. */ diff --git metastore/src/java/org/apache/hadoop/hive/metastore/filemeta/OrcFileMetadataHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/filemeta/OrcFileMetadataHandler.java index 3bca85d..ecc5781 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/filemeta/OrcFileMetadataHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/filemeta/OrcFileMetadataHandler.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.List; +import org.apache.hadoop.hive.metastore.FileFormatProxy.ApplySargToMetadataContext; import org.apache.hadoop.hive.metastore.FileMetadataHandler; import org.apache.hadoop.hive.metastore.Metastore.SplitInfos; import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; @@ -35,8 +36,9 @@ protected FileMetadataExprType getType() { } @Override - public void getFileMetadataByExpr(List fileIds, byte[] expr, - ByteBuffer[] metadatas, ByteBuffer[] results, boolean[] eliminated) throws IOException { + public void getFileMetadataByExpr(GetFileMetadataByExprContext context, List fileIds, + byte[] expr, ByteBuffer[] metadatas, ByteBuffer[] results, boolean[] eliminated) + throws IOException { SearchArgument sarg = getExpressionProxy().createSarg(expr); // For now, don't push anything into HBase, nor store anything special in HBase if (metadatas == null) { @@ -51,7 +53,9 @@ public void getFileMetadataByExpr(List fileIds, byte[] expr, ByteBuffer metadata = metadatas[i].duplicate(); // Duplicate to avoid modification. 
SplitInfos result = null; try { - result = getFileFormatProxy().applySargToMetadata(sarg, metadata); + result = + getFileFormatProxy().applySargToMetadata( + context.getApplySargToMetadataContext(), sarg, metadata); } catch (IOException ex) { LOG.error("Failed to apply SARG to metadata", ex); metadatas[i] = null; diff --git metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index c65c7a4..04e4a5a 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.FileMetadataHandler; +import org.apache.hadoop.hive.metastore.FileMetadataHandler.GetFileMetadataByExprContext; import org.apache.hadoop.hive.metastore.HiveMetaStore; import org.apache.hadoop.hive.metastore.PartFilterExprUtil; import org.apache.hadoop.hive.metastore.PartitionExpressionProxy; @@ -2643,12 +2644,13 @@ public boolean isFileMetadataSupported() { } @Override - public void getFileMetadataByExpr(List fileIds, FileMetadataExprType type, byte[] expr, - ByteBuffer[] metadatas, ByteBuffer[] results, boolean[] eliminated) throws MetaException { + public void getFileMetadataByExpr(GetFileMetadataByExprContext context, List fileIds, + FileMetadataExprType type, byte[] expr, ByteBuffer[] metadatas, ByteBuffer[] results, + boolean[] eliminated) throws MetaException { FileMetadataHandler fmh = fmHandlers.get(type); boolean commit = true; try { - fmh.getFileMetadataByExpr(fileIds, expr, metadatas, results, eliminated); + fmh.getFileMetadataByExpr(context, fileIds, expr, metadatas, results, eliminated); } catch (IOException e) { LOG.error("Unable to get file metadata by expr", e); commit = false; diff --git metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index 1ea72a0..bdb514d 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.FileMetadataHandler.GetFileMetadataByExprContext; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; @@ -800,8 +801,9 @@ public boolean isFileMetadataSupported() { @Override - public void getFileMetadataByExpr(List fileIds, FileMetadataExprType type, byte[] expr, - ByteBuffer[] metadatas, ByteBuffer[] stripeBitsets, boolean[] eliminated) { + public void getFileMetadataByExpr(GetFileMetadataByExprContext context, List fileIds, + FileMetadataExprType type, byte[] expr, ByteBuffer[] metadatas, ByteBuffer[] stripeBitsets, + boolean[] eliminated) { } @Override diff --git metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java index 3e6acc7..c7bd9f1 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java +++ 
metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.FileMetadataHandler.GetFileMetadataByExprContext;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
@@ -816,8 +817,9 @@ public boolean isFileMetadataSupported() {
   }
 
   @Override
-  public void getFileMetadataByExpr(List fileIds, FileMetadataExprType type, byte[] expr,
-      ByteBuffer[] metadatas, ByteBuffer[] stripeBitsets, boolean[] eliminated) {
+  public void getFileMetadataByExpr(GetFileMetadataByExprContext context, List fileIds,
+      FileMetadataExprType type, byte[] expr, ByteBuffer[] metadatas, ByteBuffer[] stripeBitsets,
+      boolean[] eliminated) {
   }
 
   @Override
diff --git orc/src/java/org/apache/orc/impl/SchemaEvolution.java orc/src/java/org/apache/orc/impl/SchemaEvolution.java
index a6c1d60..6388eed 100644
--- orc/src/java/org/apache/orc/impl/SchemaEvolution.java
+++ orc/src/java/org/apache/orc/impl/SchemaEvolution.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,14 +35,20 @@
  */
 public class SchemaEvolution {
   private final Map readerToFile;
+  private final TypeDescription[] readerFileTypes;
   private final boolean[] included;
   private final TypeDescription readerSchema;
+  private boolean hasConversion;
   private static final Log LOG = LogFactory.getLog(SchemaEvolution.class);
 
   public SchemaEvolution(TypeDescription readerSchema, boolean[] included) {
     this.included = included;
     readerToFile = null;
     this.readerSchema = readerSchema;
+
+    hasConversion = false;
+
+    readerFileTypes = flattenReaderIncluded();
   }
 
   public SchemaEvolution(TypeDescription fileSchema,
@@ -55,12 +62,24 @@ public SchemaEvolution(TypeDescription fileSchema,
       this.readerSchema = readerSchema;
     }
     buildMapping(fileSchema, this.readerSchema);
+
+    hasConversion = false;
+
+    readerFileTypes = flattenReaderIncluded();
   }
 
   public TypeDescription getReaderSchema() {
     return readerSchema;
   }
 
+  /**
+   * Is there Schema Evolution data type conversion?
+   * @return true if the reader schema requires a data type conversion from the file schema
+   */
+  public boolean hasConversion() {
+    return hasConversion;
+  }
+
   public TypeDescription getFileType(TypeDescription readerType) {
     TypeDescription result;
     if (readerToFile == null) {
@@ -75,6 +94,15 @@ public TypeDescription getFileType(TypeDescription readerType) {
     return result;
   }
 
+  /**
+   * Get the file type by reader type id.
+   * @param id the reader type id
+   * @return the file type mapped to that reader column, or null if there is none
+   */
+  public TypeDescription getFileType(int id) {
+    return readerFileTypes[id];
+  }
+
   void buildMapping(TypeDescription fileType,
                     TypeDescription readerType) throws IOException {
     // if the column isn't included, don't map it
@@ -101,9 +129,16 @@ void buildMapping(TypeDescription fileType,
       case CHAR:
       case VARCHAR:
         // We do conversion when same CHAR/VARCHAR type but different maxLength.
+        if (fileType.getMaxLength() != readerType.getMaxLength()) {
+          hasConversion = true;
+        }
         break;
       case DECIMAL:
         // We do conversion when same DECIMAL type but different precision/scale.
+ if (fileType.getPrecision() != readerType.getPrecision() || + fileType.getScale() != readerType.getScale()) { + hasConversion = true; + } break; case UNION: case MAP: @@ -124,6 +159,9 @@ void buildMapping(TypeDescription fileType, // allow either side to have fewer fields than the other List fileChildren = fileType.getChildren(); List readerChildren = readerType.getChildren(); + if (fileChildren.size() != readerChildren.size()) { + hasConversion = true; + } int jointSize = Math.min(fileChildren.size(), readerChildren.size()); for(int i=0; i < jointSize; ++i) { buildMapping(fileChildren.get(i), readerChildren.get(i)); @@ -139,6 +177,7 @@ void buildMapping(TypeDescription fileType, */ isOk = ConvertTreeReaderFactory.canConvert(fileType, readerType); + hasConversion = true; } if (isOk) { readerToFile.put(readerType, fileType); @@ -151,6 +190,26 @@ void buildMapping(TypeDescription fileType, } } + TypeDescription[] flattenReaderIncluded() { + TypeDescription[] result = new TypeDescription[readerSchema.getMaximumId() + 1]; + flattenReaderIncludedRecurse(readerSchema, result); + return result; + } + + void flattenReaderIncludedRecurse(TypeDescription readerType, TypeDescription[] readerFileTypes) { + TypeDescription fileSchema = getFileType(readerType); + if (fileSchema == null) { + return; + } + readerFileTypes[readerType.getId()] = fileSchema; + List children = readerType.getChildren(); + if (children != null) { + for (TypeDescription child : children) { + flattenReaderIncludedRecurse(child, readerFileTypes); + } + } + } + private static boolean checkAcidSchema(TypeDescription type) { if (type.getCategory().equals(TypeDescription.Category.STRUCT)) { List rootFields = type.getFieldNames(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index dff1815..e790370 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -76,7 +76,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext ColumnProjectionUtils.appendReadColumns( job, ts.getNeededColumnIDs(), ts.getNeededColumns()); // push down filters - HiveInputFormat.pushFilters(job, ts); + HiveInputFormat.pushFiltersAndMore(job, ts); AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java index 9035723..c800a21 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java @@ -510,11 +510,12 @@ public void unregisterFunction(String functionName) throws HiveException { private void removePersistentFunctionUnderLock(FunctionInfo fi) { Class functionClass = getPermanentUdfClass(fi); Integer refCount = persistent.get(functionClass); - assert refCount != null; - if (refCount == 1) { - persistent.remove(functionClass); - } else { - persistent.put(functionClass, Integer.valueOf(refCount - 1)); + if (refCount != null) { + if (refCount == 1) { + persistent.remove(functionClass); + } else { + persistent.put(functionClass, Integer.valueOf(refCount - 1)); + } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 57b6c67..83fdf6b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java 
@@ -209,9 +209,7 @@ public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf, ColumnProjectionUtils.appendReadColumns( jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns()); // push down filters - HiveInputFormat.pushFilters(jobClone, ts); - - AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable()); + HiveInputFormat.pushFiltersAndMore(jobClone, ts); ts.passExecContext(getExecContext()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 23a13d6..d75899d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -469,9 +469,7 @@ private void initializeOperators(Map fetchOpJobConfMap) ColumnProjectionUtils.appendReadColumns( jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns()); // push down filters - HiveInputFormat.pushFilters(jobClone, ts); - - AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); + HiveInputFormat.pushFiltersAndMore(jobClone, ts); // create a fetch operator FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 227a051..bbb4bb8 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -311,28 +311,33 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, return rr; } - protected void init(JobConf job) { - if (mrwork == null || pathToPartitionInfo == null) { - if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { - mrwork = (MapWork) Utilities.getMergeWork(job); - if (mrwork == null) { - mrwork = Utilities.getMapWork(job); - } - } else { - mrwork = Utilities.getMapWork(job); + public static MapWork getMapWork(JobConf jobConf) { + MapWork mwork; + if (HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + mwork = (MapWork) Utilities.getMergeWork(jobConf); + if (mwork == null) { + mwork = Utilities.getMapWork(jobConf); } + } else { + mwork = Utilities.getMapWork(jobConf); + } - // Prune partitions - if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark") - && HiveConf.getBoolVar(job, HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) { - SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner(); - try { - pruner.prune(mrwork, job); - } catch (Exception e) { - throw new RuntimeException(e); - } + // Prune partitions + if (HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark") + && HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) { + SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner(); + try { + pruner.prune(mwork, jobConf); + } catch (Exception e) { + throw new RuntimeException(e); } + } + return mwork; + } + protected void init(JobConf job) { + if (mrwork == null || pathToPartitionInfo == null) { + mrwork = getMapWork(job); pathToPartitionInfo = mrwork.getPathToPartitionInfo(); } } @@ -350,7 +355,7 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job Utilities.copyTablePropertiesToConf(table, conf); if (tableScan != null) { - pushFilters(conf, tableScan); + pushFiltersAndMore(conf, tableScan); } FileInputFormat.setInputPaths(conf, dirs.toArray(new Path[dirs.size()])); @@ 
-442,7 +447,7 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job tableScan.getNeededColumnIDs(), tableScan.getNeededColumns()); pushDownProjection = true; // push down filters - pushFilters(newjob, tableScan); + pushFiltersAndMore(newjob, tableScan); } } else { if (LOG.isDebugEnabled()) { @@ -533,7 +538,13 @@ protected static PartitionDesc getPartitionDescFromPath( return partDesc; } - public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) { + public static void pushFiltersAndMore(JobConf jobConf, TableScanOperator tableScan) { + + if (HiveConf.getBoolVar(jobConf, ConfVars.HIVE_ORC_READER_PROPERTIES_SET)) { + return; + } + + HiveConf.setBoolVar(jobConf, ConfVars.HIVE_ORC_READER_PROPERTIES_SET, true); // ensure filters are not set from previous pushFilters jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR); @@ -548,6 +559,8 @@ public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) { Utilities.addTableSchemaToConf(jobConf, tableScan); + AcidUtils.setTransactionalTableScan(jobConf, tableScan.getConf().isAcidTable()); + // construct column name list and types for reference by filter push down Utilities.setColumnNameList(jobConf, tableScan); Utilities.setColumnTypeList(jobConf, tableScan); @@ -608,18 +621,15 @@ private static boolean isMatch(String splitPath, String key) { return splitPath.substring(index).equals(key) || splitPath.charAt(index+key.length()) == '/'; } - protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass, + protected static void pushProjectionsAndFilters(MapWork mWork, JobConf jobConf, String splitPath, String splitPathWithNoSchema, boolean nonNative) { - if (this.mrwork == null) { - init(job); - } - if(this.mrwork.getPathToAliases() == null) { + if(mWork.getPathToAliases() == null) { return; } ArrayList aliases = new ArrayList(); - Iterator>> iterator = this.mrwork + Iterator>> iterator = mWork .getPathToAliases().entrySet().iterator(); while (iterator.hasNext()) { @@ -648,7 +658,7 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass } for (String alias : aliases) { - Operator op = this.mrwork.getAliasToWork().get( + Operator op = mWork.getAliasToWork().get( alias); if (op instanceof TableScanOperator) { TableScanOperator ts = (TableScanOperator) op; @@ -656,10 +666,31 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass ColumnProjectionUtils.appendReadColumns( jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns()); // push down filters - pushFilters(jobConf, ts); - - AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); + pushFiltersAndMore(jobConf, ts); } } } + + protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass, + String splitPath, String splitPathWithNoSchema, boolean nonNative) { + if (this.mrwork == null) { + init(job); + } + + pushProjectionsAndFilters(mrwork, jobConf, splitPath, splitPathWithNoSchema, + nonNative); + } + + public static void pushProjectionsAndFilters(MapWork mWork, JobConf jobConf, Path path) { + String pathString = path.toString(); + + boolean nonNative = false; + PartitionDesc part = mWork.getPathToPartitionInfo().get(pathString); + if ((part != null) && (part.getTableDesc() != null)) { + Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), jobConf); + nonNative = part.getTableDesc().isNonNative(); + } + + pushProjectionsAndFilters(mWork, jobConf, pathString, path.toUri().getPath(), nonNative); + } } diff --git 
ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java index 9299306..ea4ac74 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; +import org.apache.orc.OrcProto; import org.apache.orc.impl.OrcTail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,12 +106,13 @@ public void configure(HiveConf queryConfig) { } @Override - public void getAndValidate(List files, boolean isOriginal, - OrcTail[] result, ByteBuffer[] ppdResult) throws IOException, HiveException { + public void getAndValidate(List files, List readerTypes, + boolean isOriginal, OrcTail[] result, ByteBuffer[] ppdResult) + throws IOException, HiveException { assert result.length == files.size(); assert ppdResult == null || ppdResult.length == files.size(); // First, check the local cache. - localCache.getAndValidate(files, isOriginal, result, ppdResult); + localCache.getAndValidate(files, readerTypes, isOriginal, result, ppdResult); // posMap is an unfortunate consequence of batching/iterating thru MS results. HashMap posMap = new HashMap(); @@ -124,7 +126,7 @@ public void getAndValidate(List files, boolean isOriginal, } if (serializedSarg != null) { Iterator> iter = cache.getFileMetadataByExpr( - fileIds, serializedSarg, false); // don't fetch the footer, PPD happens in MS. + fileIds, serializedSarg, readerTypes, false); // don't fetch the footer, PPD happens in MS. while (iter.hasNext()) { Entry e = iter.next(); int ix = getAndVerifyIndex(posMap, files, result, e.getKey()); @@ -322,7 +324,8 @@ public ByteBuffer get() { public interface ExternalFooterCachesByConf { public interface Cache { Iterator> getFileMetadataByExpr(List fileIds, - ByteBuffer serializedSarg, boolean doGetFooters) throws HiveException; + ByteBuffer serializedSarg, List readerTypes, boolean doGetFooters) + throws HiveException; void clearFileMetadata(List fileIds) throws HiveException; Iterator> getFileMetadata(List fileIds) throws HiveException; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java index 88b65dc..eebe820 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.orc.OrcProto; import org.apache.orc.impl.OrcTail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,7 +65,7 @@ public OrcTail get(Path path) { @Override public void getAndValidate(final List files, - final boolean isOriginal, + List readerTypes, boolean isOriginal, final OrcTail[] result, final ByteBuffer[] ppdResult) throws IOException, HiveException { // TODO: should local cache also be by fileId? Preserve the original logic for now. 
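The FooterCache/ExternalCache/LocalCache changes above thread the reader's ORC types through getAndValidate() so that metastore-side PPD can detect schema evolution before filtering stripes. Below is a minimal sketch of how a caller can derive that list from the reader schema, mirroring the generateSplitsInfo() changes later in this diff; the helper class and method names are illustrative only and are not part of the patch.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.orc.OrcProto;
import org.apache.orc.OrcUtils;
import org.apache.orc.TypeDescription;

// Illustrative helper: builds the readerTypes argument for
// FooterCache.getAndValidate(files, readerTypes, isOriginal, result, ppdResult)
// from the reader schema that pushFiltersAndMore() places in the job conf.
final class ReaderTypesSketch {
  static List<OrcProto.Type> computeReaderTypes(Configuration conf) {
    boolean isAcidScan = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
    TypeDescription readerSchema =
        OrcInputFormat.getDesiredRowTypeDescr(conf, isAcidScan, Integer.MAX_VALUE);
    // With no reader schema in the conf there is nothing to compare against the
    // file schema, so pass null and let the schema-evolution check be skipped.
    return readerSchema == null ? null : OrcUtils.getOrcTypes(readerSchema);
  }
}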
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetastoreExternalCachesByConf.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetastoreExternalCachesByConf.java index ad8f4ef..9af7113 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetastoreExternalCachesByConf.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetastoreExternalCachesByConf.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.orc.OrcProto; /** * An implementation of external cache and factory based on metastore. @@ -45,7 +46,8 @@ public HBaseCache(Hive hive) { @Override public Iterator> getFileMetadataByExpr( - List fileIds, ByteBuffer sarg, boolean doGetFooters) throws HiveException { + List fileIds, ByteBuffer sarg, List readerTypes, boolean doGetFooters) + throws HiveException { return hive.getFileMetadataByExpr(fileIds, sarg, doGetFooters).iterator(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileFormatProxy.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileFormatProxy.java index 6d9f653..f1d54ef 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileFormatProxy.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileFormatProxy.java @@ -24,13 +24,17 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.FileFormatProxy; +import org.apache.hadoop.hive.metastore.FileFormatProxy.ApplySargToMetadataContext; import org.apache.hadoop.hive.metastore.Metastore.SplitInfo; import org.apache.hadoop.hive.metastore.Metastore.SplitInfos; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.orc.OrcProto; +import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; import org.apache.orc.StripeStatistics; +import org.apache.orc.TypeDescription; import org.apache.orc.impl.OrcTail; +import org.apache.orc.impl.SchemaEvolution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,20 +42,54 @@ public class OrcFileFormatProxy implements FileFormatProxy { private static final Logger LOG = LoggerFactory.getLogger(OrcFileFormatProxy.class); + public static class OrcApplySargToMetadataContext extends ApplySargToMetadataContext { + private final List readerTypes; + public OrcApplySargToMetadataContext(List readerTypes) { + super(); + this.readerTypes = readerTypes; + } + public List getReaderTypes() { + return readerTypes; + } + } + @Override - public SplitInfos applySargToMetadata( - SearchArgument sarg, ByteBuffer fileMetadata) throws IOException { + public SplitInfos applySargToMetadata(ApplySargToMetadataContext context, + SearchArgument sarg, ByteBuffer fileMetadata) + throws IOException { // TODO: ideally we should store shortened representation of only the necessary fields // in HBase; it will probably require custom SARG application code. OrcTail orcTail = ReaderImpl.extractFileTail(fileMetadata); OrcProto.Footer footer = orcTail.getFooter(); int stripeCount = footer.getStripesCount(); + SplitInfos.Builder sb = SplitInfos.newBuilder(); + List stripes = orcTail.getStripes(); + + OrcApplySargToMetadataContext orcContext = (OrcApplySargToMetadataContext) context; + List readerTypes = orcContext.getReaderTypes(); + + // Before we do PPD, we need to see if this file has Schema Evolution. 
+ if (readerTypes != null) { + TypeDescription readerSchema = OrcUtils.convertTypeFromProtobuf(readerTypes, 0); + TypeDescription fileSchema = OrcUtils.convertTypeFromProtobuf(footer.getTypesList(), 0); + // UNDONE: Pass in included, too? + SchemaEvolution evolution = new SchemaEvolution(fileSchema, readerSchema, null); + if (evolution.hasConversion()) { + + // We currently do not support PPD when a column has changed type. + for (int i = 0; i < stripeCount; i++) { + StripeInformation si = stripes.get(i); + sb.addInfos(SplitInfo.newBuilder().setIndex(i) + .setOffset(si.getOffset()).setLength(si.getLength())); + } + return sb.build(); + } + } + boolean[] result = OrcInputFormat.pickStripesViaTranslatedSarg( sarg, orcTail.getWriterVersion(), footer.getTypesList(), orcTail.getStripeStatistics(), stripeCount); // For ORC case, send the boundaries of the stripes so we don't have to send the footer. - SplitInfos.Builder sb = SplitInfos.newBuilder(); - List stripes = orcTail.getStripes(); boolean isEliminated = true; for (int i = 0; i < result.length; ++i) { if (result != null && !result[i]) continue; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 69d58d6..23ec2fa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io.orc; import org.apache.orc.impl.InStream; +import org.apache.orc.impl.SchemaEvolution; import java.io.IOException; import java.nio.ByteBuffer; @@ -98,6 +99,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -655,19 +657,21 @@ public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo, private final FileSystem fs; private final HdfsFileStatusWithId fileWithId; private final OrcTail orcTail; + private final List readerTypes; private final boolean isOriginal; private final List deltas; private final boolean hasBase; private final ByteBuffer ppdResult; SplitInfo(Context context, FileSystem fs, HdfsFileStatusWithId fileWithId, OrcTail orcTail, - boolean isOriginal, List deltas, boolean hasBase, Path dir, - boolean[] covered, ByteBuffer ppdResult) throws IOException { + List readerTypes, boolean isOriginal, List deltas, + boolean hasBase, Path dir, boolean[] covered, ByteBuffer ppdResult) throws IOException { super(dir, context.numBuckets, deltas, covered); this.context = context; this.fs = fs; this.fileWithId = fileWithId; this.orcTail = orcTail; + this.readerTypes = readerTypes; this.isOriginal = isOriginal; this.deltas = deltas; this.hasBase = hasBase; @@ -676,10 +680,10 @@ public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo, @VisibleForTesting public SplitInfo(Context context, FileSystem fs, FileStatus fileStatus, OrcTail orcTail, - boolean isOriginal, ArrayList deltas, boolean hasBase, Path dir, - boolean[] covered) throws IOException { + List readerTypes, boolean isOriginal, ArrayList deltas, + boolean hasBase, Path dir, boolean[] covered) throws IOException { this(context, fs, AcidUtils.createOriginalObj(null, fileStatus), - orcTail, isOriginal, deltas, 
hasBase, dir, covered, null); + orcTail, readerTypes, isOriginal, deltas, hasBase, dir, covered, null); } } @@ -707,6 +711,7 @@ public ETLDir(Path dir, FileSystem fs, int fileCount) { private final List deltas; private final boolean[] covered; final boolean isOriginal; + final List readerTypes; // References to external fields for async SplitInfo generation. private List>> splitFuturesRef = null; private List splitsRef = null; @@ -714,13 +719,14 @@ public ETLDir(Path dir, FileSystem fs, int fileCount) { private final boolean allowSyntheticFileIds; public ETLSplitStrategy(Context context, FileSystem fs, Path dir, - List children, boolean isOriginal, List deltas, - boolean[] covered, UserGroupInformation ugi, boolean allowSyntheticFileIds) { + List children, List readerTypes, boolean isOriginal, + List deltas, boolean[] covered, UserGroupInformation ugi, boolean allowSyntheticFileIds) { assert !children.isEmpty(); this.context = context; this.dirs = Lists.newArrayList(new ETLDir(dir, fs, children.size())); this.files = children; this.isOriginal = isOriginal; + this.readerTypes = readerTypes; this.deltas = deltas; this.covered = covered; this.ugi = ugi; @@ -740,7 +746,7 @@ public ETLSplitStrategy(Context context, FileSystem fs, Path dir, ppdResults = new ByteBuffer[files.size()]; } try { - cache.getAndValidate(files, isOriginal, orcTails, ppdResults); + cache.getAndValidate(files, readerTypes, isOriginal, orcTails, ppdResults); } catch (HiveException e) { throw new IOException(e); } @@ -760,7 +766,7 @@ public ETLSplitStrategy(Context context, FileSystem fs, Path dir, } // Ignore files eliminated by PPD, or of 0 length. if (ppdResult != FooterCache.NO_SPLIT_AFTER_PPD && file.getFileStatus().getLen() > 0) { - result.add(new SplitInfo(context, dir.fs, file, orcTail, + result.add(new SplitInfo(context, dir.fs, file, orcTail, readerTypes, isOriginal, deltas, true, dir.dir, covered, ppdResult)); } } @@ -774,7 +780,7 @@ public ETLSplitStrategy(Context context, FileSystem fs, Path dir, } // ignore files of 0 length if (file.getFileStatus().getLen() > 0) { - result.add(new SplitInfo(context, dir.fs, file, null, + result.add(new SplitInfo(context, dir.fs, file, null, readerTypes, isOriginal, deltas, true, dir.dir, covered, null)); } } @@ -1061,10 +1067,11 @@ private AcidDirInfo callInternal() throws IOException { private final long blockSize; private final TreeMap locations; private OrcTail orcTail; + private final List readerTypes; private List stripes; private List stripeStats; - private List types; - private boolean[] includedCols; + private List fileTypes; + private boolean[] readerIncluded; private final boolean isOriginal; private final List deltas; private final boolean hasBase; @@ -1074,6 +1081,7 @@ private AcidDirInfo callInternal() throws IOException { private final ByteBuffer ppdResult; private final UserGroupInformation ugi; private final boolean allowSyntheticFileIds; + private SchemaEvolution evolution; public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi, boolean allowSyntheticFileIds) throws IOException { @@ -1084,6 +1092,7 @@ public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi, this.fsFileId = splitInfo.fileWithId.getFileId(); this.blockSize = this.file.getBlockSize(); this.orcTail = splitInfo.orcTail; + this.readerTypes = splitInfo.readerTypes; // TODO: potential DFS call this.locations = SHIMS.getLocationsWithOffset(fs, file); this.isOriginal = splitInfo.isOriginal; @@ -1255,7 +1264,9 @@ public String toString() { // We can't eliminate stripes if there 
are deltas because the // deltas may change the rows making them match the predicate. if ((deltas == null || deltas.isEmpty()) && context.sarg != null) { - String[] colNames = extractNeededColNames(types, context.conf, includedCols, isOriginal); + String[] colNames = + extractNeededColNames((readerTypes == null ? fileTypes : readerTypes), + context.conf, readerIncluded, isOriginal); if (colNames == null) { LOG.warn("Skipping split elimination for {} as column names is null", file.getPath()); } else { @@ -1374,29 +1385,51 @@ private void populateAndCacheStripeDetails() throws IOException { } stripes = orcTail.getStripes(); stripeStats = orcTail.getStripeStatistics(); - types = orcTail.getTypes(); + fileTypes = orcTail.getTypes(); + TypeDescription fileSchema = OrcUtils.convertTypeFromProtobuf(fileTypes, 0); + if (readerTypes == null) { + readerIncluded = genIncludedColumns(fileTypes, context.conf, isOriginal); + evolution = new SchemaEvolution(fileSchema, readerIncluded); + } else { + readerIncluded = genIncludedColumns(readerTypes, context.conf, isOriginal); + TypeDescription readerSchema = OrcUtils.convertTypeFromProtobuf(readerTypes, 0); + String neededColumnNames = getNeededColumnNamesString(context.conf); + evolution = new SchemaEvolution(fileSchema, readerSchema, readerIncluded); + } writerVersion = orcTail.getWriterVersion(); - includedCols = genIncludedColumns(types, context.conf, isOriginal); List fileColStats = orcTail.getFooter().getStatisticsList(); - projColsUncompressedSize = computeProjectionSize(types, fileColStats, includedCols, + boolean[] fileIncluded; + if (readerTypes == null) { + fileIncluded = readerIncluded; + } else { + fileIncluded = new boolean[fileTypes.size()]; + final int readerSchemaSize = readerTypes.size(); + for (int i = 0; i < readerSchemaSize; i++) { + TypeDescription fileType = evolution.getFileType(i); + if (fileType != null) { + fileIncluded[fileType.getId()] = true; + } + } + } + projColsUncompressedSize = computeProjectionSize(fileTypes, fileColStats, fileIncluded, isOriginal); if (!context.footerInSplits) { orcTail = null; } } - private long computeProjectionSize(List types, - List stats, boolean[] includedCols, boolean isOriginal) { + private long computeProjectionSize(List fileTypes, + List stats, boolean[] fileIncluded, boolean isOriginal) { final int rootIdx = getRootColumn(isOriginal); List internalColIds = Lists.newArrayList(); - if (includedCols != null) { - for (int i = 0; i < includedCols.length; i++) { - if (includedCols[i]) { + if (fileIncluded != null) { + for (int i = 0; i < fileIncluded.length; i++) { + if (fileIncluded[i]) { internalColIds.add(rootIdx + i); } } } - return ReaderImpl.getRawDataSizeFromColIndices(internalColIds, types, stats); + return ReaderImpl.getRawDataSizeFromColIndices(internalColIds, fileTypes, stats); } } @@ -1431,6 +1464,31 @@ private long computeProjectionSize(List types, pathFutures.add(ecs.submit(fileGenerator)); } + LOG.info("*** DEBUG *** generateSplitsInfo paths " + Arrays.toString(paths)); + boolean isOrcReaderPropertiesSet = + HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_READER_PROPERTIES_SET); + if (!isOrcReaderPropertiesSet) { + JobConf jobConf = (JobConf) conf; + MapWork mWork = HiveInputFormat.getMapWork(jobConf); + HiveInputFormat.pushProjectionsAndFilters(mWork, jobConf, paths[0]); + isOrcReaderPropertiesSet = + HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_READER_PROPERTIES_SET); + } + boolean isTransactionalTableScan = + HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); + boolean 
isSchemaEvolution = HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION); + TypeDescription readerSchema = + OrcInputFormat.getDesiredRowTypeDescr(conf, isTransactionalTableScan, Integer.MAX_VALUE); + LOG.info("*** DEBUG *** generateSplitsInfo isOrcReaderPropertiesSet " + isOrcReaderPropertiesSet); + LOG.info("*** DEBUG *** generateSplitsInfo isTransactionalTableScan " + isTransactionalTableScan); + LOG.info("*** DEBUG *** generateSplitsInfo isSchemaEvolution " + isSchemaEvolution); + LOG.info("*** DEBUG *** generateSplitsInfo readerSchema " + + (readerSchema == null ? "NULL" : readerSchema.toString())); + List readerTypes = null; + if (readerSchema != null) { + readerTypes = OrcUtils.getOrcTypes(readerSchema); + } + // complete path futures and schedule split generation try { CombinedCtx combinedCtx = (context.splitStrategyBatchMs > 0) ? new CombinedCtx() : null; @@ -1459,7 +1517,8 @@ private long computeProjectionSize(List types, // We have received a new directory information, make a split strategy. --resultsLeft; SplitStrategy splitStrategy = determineSplitStrategy(combinedCtx, context, adi.fs, - adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, ugi, allowSyntheticFileIds); + adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, readerTypes, ugi, + allowSyntheticFileIds); if (splitStrategy == null) continue; // Combined. if (isDebugEnabled) { @@ -1537,14 +1596,16 @@ private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context conte private static SplitStrategy combineOrCreateETLStrategy(CombinedCtx combinedCtx, Context context, FileSystem fs, Path dir, List files, - List deltas, boolean[] covered, boolean isOriginal, - UserGroupInformation ugi, boolean allowSyntheticFileIds) { + List deltas, boolean[] covered, List readerTypes, + boolean isOriginal, UserGroupInformation ugi, boolean allowSyntheticFileIds) { if (!deltas.isEmpty() || combinedCtx == null) { return new ETLSplitStrategy( - context, fs, dir, files, isOriginal, deltas, covered, ugi, allowSyntheticFileIds); + context, fs, dir, files, readerTypes, isOriginal, deltas, covered, ugi, + allowSyntheticFileIds); } else if (combinedCtx.combined == null) { combinedCtx.combined = new ETLSplitStrategy( - context, fs, dir, files, isOriginal, deltas, covered, ugi, allowSyntheticFileIds); + context, fs, dir, files, readerTypes, isOriginal, deltas, covered, ugi, + allowSyntheticFileIds); combinedCtx.combineStartUs = System.nanoTime(); return null; } else { @@ -1554,11 +1615,13 @@ private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context conte case YES: return null; case NO_AND_CONTINUE: return new ETLSplitStrategy( - context, fs, dir, files, isOriginal, deltas, covered, ugi, allowSyntheticFileIds); + context, fs, dir, files, readerTypes, isOriginal, deltas, covered, ugi, + allowSyntheticFileIds); case NO_AND_SWAP: { ETLSplitStrategy oldBase = combinedCtx.combined; combinedCtx.combined = new ETLSplitStrategy( - context, fs, dir, files, isOriginal, deltas, covered, ugi, allowSyntheticFileIds); + context, fs, dir, files, readerTypes, isOriginal, deltas, covered, ugi, + allowSyntheticFileIds); combinedCtx.combineStartUs = System.nanoTime(); return oldBase; } @@ -1868,8 +1931,8 @@ private static boolean isStripeSatisfyPredicate( @VisibleForTesting static SplitStrategy determineSplitStrategy(CombinedCtx combinedCtx, Context context, FileSystem fs, Path dir, AcidUtils.Directory dirInfo, - List baseOrOriginalFiles, UserGroupInformation ugi, - boolean allowSyntheticFileIds) { + List baseOrOriginalFiles, List 
readerTypes, + UserGroupInformation ugi, boolean allowSyntheticFileIds) { Path base = dirInfo.getBaseDirectory(); List original = dirInfo.getOriginalFiles(); List deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories()); @@ -1902,12 +1965,12 @@ private static boolean isStripeSatisfyPredicate( case ETL: // ETL strategy requested through config return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseOrOriginalFiles, - deltas, covered, isOriginal, ugi, allowSyntheticFileIds); + deltas, covered, readerTypes, isOriginal, ugi, allowSyntheticFileIds); default: // HYBRID strategy if (avgFileSize > context.maxSize || totalFiles <= context.etlFileThreshold) { return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseOrOriginalFiles, - deltas, covered, isOriginal, ugi, allowSyntheticFileIds); + deltas, covered, readerTypes, isOriginal, ugi, allowSyntheticFileIds); } else { return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered, allowSyntheticFileIds); @@ -1950,8 +2013,9 @@ private static boolean isStripeSatisfyPredicate( public interface FooterCache { ByteBuffer NO_SPLIT_AFTER_PPD = ByteBuffer.wrap(new byte[0]); - void getAndValidate(List files, boolean isOriginal, - OrcTail[] result, ByteBuffer[] ppdResult) throws IOException, HiveException; + void getAndValidate(List files, List readerTypes, + boolean isOriginal, OrcTail[] result, ByteBuffer[] ppdResult) + throws IOException, HiveException; boolean hasPpd(); boolean isBlocking(); void put(FooterCacheKey cacheKey, OrcTail orcTail) throws IOException; diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 3c89dd6..cddd8d5 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -768,7 +768,7 @@ public void testEtlCombinedStrategy() throws Exception { MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException { OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path); return OrcInputFormat.determineSplitStrategy(combineCtx, context, - adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null, true); + adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null, null, true); } public OrcInputFormat.AcidDirInfo createAdi( @@ -781,7 +781,7 @@ public void testEtlCombinedStrategy() throws Exception { OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException { OrcInputFormat.AcidDirInfo adi = gen.call(); return OrcInputFormat.determineSplitStrategy( - null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null, true); + null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null, null, true); } public static class MockBlock { @@ -1389,7 +1389,7 @@ public void testAddSplit() throws Exception { OrcInputFormat.Context context = new OrcInputFormat.Context(conf); OrcInputFormat.SplitGenerator splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, - fs.getFileStatus(new Path("/a/file")), null, true, + fs.getFileStatus(new Path("/a/file")), null, null, true, new ArrayList(), true, null, null), null, true); OrcSplit result = splitter.createSplit(0, 200, null); assertEquals(0, result.getStart()); @@ -1430,7 +1430,7 @@ public void testSplitGenerator() throws Exception { OrcInputFormat.Context context = new OrcInputFormat.Context(conf); 
OrcInputFormat.SplitGenerator splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, - fs.getFileStatus(new Path("/a/file")), null, true, + fs.getFileStatus(new Path("/a/file")), null, null, true, new ArrayList(), true, null, null), null, true); List results = splitter.call(); OrcSplit result = results.get(0); @@ -1453,7 +1453,7 @@ public void testSplitGenerator() throws Exception { HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZE, 0); context = new OrcInputFormat.Context(conf); splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, - fs.getFileStatus(new Path("/a/file")), null, true, + fs.getFileStatus(new Path("/a/file")), null, null, true, new ArrayList(), true, null, null), null, true); results = splitter.call(); for(int i=0; i < stripeSizes.length; ++i) { @@ -1481,7 +1481,7 @@ public void testProjectedColumnSize() throws Exception { OrcInputFormat.Context context = new OrcInputFormat.Context(conf); OrcInputFormat.SplitGenerator splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, - fs.getFileStatus(new Path("/a/file")), null, true, + fs.getFileStatus(new Path("/a/file")), null, null, true, new ArrayList(), true, null, null), null, true); List results = splitter.call(); OrcSplit result = results.get(0); @@ -1503,7 +1503,7 @@ public void testProjectedColumnSize() throws Exception { HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZE, 0); context = new OrcInputFormat.Context(conf); splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, - fs.getFileStatus(new Path("/a/file")), null, true, + fs.getFileStatus(new Path("/a/file")), null, null, true, new ArrayList(), true, null, null), null, true); results = splitter.call(); @@ -1523,7 +1523,7 @@ public void testProjectedColumnSize() throws Exception { HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZE, 100000); context = new OrcInputFormat.Context(conf); splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, - fs.getFileStatus(new Path("/a/file")), null, true, + fs.getFileStatus(new Path("/a/file")), null, null, true, new ArrayList(), true, null, null), null, true); results = splitter.call(); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java index 62a0ab0..d8f7fba 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java @@ -63,6 +63,7 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.orc.OrcProto; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -432,7 +433,8 @@ public Cache getCache(HiveConf conf) throws IOException { @Override public Iterator> getFileMetadataByExpr( - List fileIds, ByteBuffer sarg, boolean doGetFooters) throws HiveException { + List fileIds, ByteBuffer sarg, List readerTypes, boolean doGetFooters) + throws HiveException { getByExprCount.incrementAndGet(); ByteBuffer[] metadatas = new ByteBuffer[fileIds.size()]; ByteBuffer[] ppdResults = new ByteBuffer[fileIds.size()]; @@ -440,7 +442,8 @@ public Cache getCache(HiveConf conf) throws IOException { try { byte[] bb = new byte[sarg.remaining()]; System.arraycopy(sarg.array(), sarg.arrayOffset(), bb, 0, sarg.remaining()); - 
handler.getFileMetadataByExpr(fileIds, bb, metadatas, ppdResults, eliminated);
+        // UNDONE: Add the context to MetadataPpdResult thrift ...
+        handler.getFileMetadataByExpr(null, fileIds, bb, metadatas, ppdResults, eliminated);
       } catch (IOException e) {
         throw new HiveException(e);
       }
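
On the metastore server path, HiveMetaStore.get_file_metadata_by_expr() and the test cache above still pass a null context (see the UNDONE notes), while OrcFileMetadataHandler and OrcFileFormatProxy dereference the context unconditionally. The following is a null-tolerant sketch of the ORC schema-evolution guard, assuming the OrcApplySargToMetadataContext and SchemaEvolution.hasConversion() pieces introduced by this patch and the imports already added to OrcFileFormatProxy; the helper method names are illustrative only.

  // Sketch: tolerate callers that cannot supply reader types yet. When the
  // context (or its reader type list) is null, no schema-evolution check is
  // possible and the existing SARG-based stripe picking proceeds unchanged.
  private static List<OrcProto.Type> readerTypesOrNull(ApplySargToMetadataContext context) {
    if (!(context instanceof OrcApplySargToMetadataContext)) {
      return null;  // e.g. the current HiveMetaStore call site, which passes null
    }
    return ((OrcApplySargToMetadataContext) context).getReaderTypes();
  }

  // Sketch: true when any column's type differs between the file and reader
  // schemas, in which case the patch keeps all stripes instead of applying PPD.
  private static boolean hasTypeConversion(List<OrcProto.Type> readerTypes,
      OrcProto.Footer footer) throws IOException {
    if (readerTypes == null) {
      return false;
    }
    TypeDescription readerSchema = OrcUtils.convertTypeFromProtobuf(readerTypes, 0);
    TypeDescription fileSchema = OrcUtils.convertTypeFromProtobuf(footer.getTypesList(), 0);
    return new SchemaEvolution(fileSchema, readerSchema, null).hasConversion();
  }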