diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1b6aff0..916d16c 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -915,6 +915,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { HIVE_SCHEMA_EVOLUTION("hive.exec.schema.evolution", false, "Use schema evolution to convert self-describing file format's data to the schema desired by the reader."), + HIVE_TRANSACTIONAL_TABLE_SCAN("hive.transactional.table.scan", false, + "Do transaction (ACID) table scan."), + HIVESAMPLERANDOMNUM("hive.sample.seednumber", 0, "A number used to percentage sampling. By changing this number, user will change the subsets of data sampled."), diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java index bc56d77..ef7aa48 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler; @@ -130,6 +131,8 @@ public void configureInputJobProperties(TableDesc tableDesc, jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesSb.toString()); jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, typeNamesSb.toString()); + boolean isAcidTable = AcidUtils.isTablePropertyTransactional(tableProperties); + AcidUtils.setTransactionalTableScan(jobProperties, isAcidTable); } } catch (IOException e) { throw new IllegalStateException("Failed to set output path", e); diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index ff2598f..bde78e4 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -55,10 +56,12 @@ import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnInfo; import org.apache.hadoop.hive.metastore.api.TxnState; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.orc.FileDump; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; @@ -464,8 +467,9 
@@ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int JobConf job = new JobConf(); job.set("mapred.input.dir", partitionPath.toString()); job.set("bucket_count", Integer.toString(buckets)); - job.set("columns", "id,msg"); - job.set("columns.types", "bigint:string"); + job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg"); + job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string"); + job.set(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, "true"); job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); InputSplit[] splits = inf.getSplits(job, buckets); Assert.assertEquals(buckets, splits.length); diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 59e9674..d73fcbf 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -8,6 +8,7 @@ import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -19,6 +20,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; @@ -27,6 +29,7 @@ import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcStruct; @@ -949,8 +952,9 @@ public long getHighWatermark() { OrcInputFormat aif = new OrcInputFormat(); Configuration conf = new Configuration(); - conf.set("columns", columnNamesProperty); - conf.set("columns.types", columnTypesProperty); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesProperty); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, columnTypesProperty); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); AcidInputFormat.RawReader reader = aif.getRawReader(conf, false, bucket, txnList, base, deltas); RecordIdentifier identifier = reader.createKey(); diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 7acca77..7c65bef 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -509,7 +509,8 @@ "schema.evolution.columns / schema.evolution.columns.types " + "nor the " + "columns / columns.types " + - "are set. Table schema information is required to read ACID tables") + "are set. 
Table schema information is required to read ACID tables"),
+  ACID_TABLES_MUST_BE_READ_WITH_ACID_READER(30021, "An ORC ACID reader is required to read ACID tables")
   ;

   private int errorCode;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 31aa3dc..b859c5f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
@@ -75,6 +76,8 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
           job, ts.getNeededColumnIDs(), ts.getNeededColumns());
       // push down filters
       HiveInputFormat.pushFilters(job, ts);
+
+      AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable());
     }
     sink = work.getSink();
     fetch = new FetchOperator(work, job, source, getVirtualColumns(source));
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index 390a12e..9fc5e07 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
@@ -197,6 +198,7 @@ public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf,
       // push down filters
       HiveInputFormat.pushFilters(jobClone, ts);
+      AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable());

       ts.passExecContext(getExecContext());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index a5c1463..a07f822 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
@@ -432,6 +433,8 @@ private void initializeOperators(Map<FetchOperator, JobConf> fetchOpJobConfMap)
       // push down filters
       HiveInputFormat.pushFilters(jobClone, ts);
+      AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable());
+
       // create a fetch operator
       FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone);
       fetchOpJobConfMap.put(fetchOp, jobClone);
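Each execution-side call site above (FetchTask, SMBMapJoinOperator, MapredLocalTask) — and HiveInputFormat further below — applies the same two steps to the JobConf it hands to its fetch/record readers: push the scan's filters, then record whether the scan is over an ACID table. A minimal sketch of that pattern, using only calls that appear in this patch; the helper class and method names are hypothetical and not part of the patch:

import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.mapred.JobConf;

final class ScanConfSketch {
  private ScanConfSketch() {}

  // Hypothetical helper mirroring what each call site now does to its (possibly cloned) JobConf.
  static void pushFiltersAndAcidFlag(JobConf job, TableScanOperator ts) {
    // existing behavior: push the scan's filter expression down to the input format
    HiveInputFormat.pushFilters(job, ts);
    // new behavior: record whether this scan reads an ACID table, so ORC picks the ACID reader
    AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable());
  }
}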
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 99c4435..f1ff24c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -26,7 +26,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -599,4 +603,34 @@ public static boolean isTablePropertyTransactional(Map<String, String> parameters) {
     }
     return resultStr != null && resultStr.equalsIgnoreCase("true");
   }
+
+  public static boolean isTablePropertyTransactional(Configuration conf) {
+    String resultStr = conf.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if (resultStr == null) {
+      resultStr = conf.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
+    }
+    return resultStr != null && resultStr.equalsIgnoreCase("true");
+  }
+
+  public static void setTransactionalTableScan(Map<String, String> parameters, boolean isAcidTable) {
+    parameters.put(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, Boolean.toString(isAcidTable));
+  }
+
+  public static void setTransactionalTableScan(Configuration conf, boolean isAcidTable) {
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable);
+  }
+
+  // If someone is trying to read a table with transactional=true, they must already be using the right
+  // TxnManager, so unlike the old SemanticAnalyzer.isAcidTable() we do not check SessionState.get().getTxnMgr().supportsAcid() here.
+  public static boolean isAcidTable(Table table) {
+    if (table == null) {
+      return false;
+    }
+    String tableIsTransactional =
+        table.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if (tableIsTransactional == null) {
+      tableIsTransactional = table.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
+    }
+    return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 181ce84..05239e6 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner;
@@ -586,6 +587,8 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass
           jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns());
       // push down filters
       pushFilters(jobConf, ts);
+
+      AcidUtils.setTransactionalTableScan(jobConf, ts.getConf().isAcidTable());
     }
   }
 }
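The new AcidUtils helpers carry the decision across the planner/runtime boundary: isTablePropertyTransactional reads the transactional table property (checking both lower- and upper-case keys), and the setTransactionalTableScan overloads record the result under hive.transactional.table.scan. A small round-trip sketch, assuming the table property has already been copied into the Configuration (as FosterStorageHandler does for HCatalog reads); the class name is made up:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class TransactionalFlagRoundTrip {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // pretend the table-level "transactional" property was propagated into this conf
    conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");

    boolean isAcidTable = AcidUtils.isTablePropertyTransactional(conf);   // true
    AcidUtils.setTransactionalTableScan(conf, isAcidTable);               // sets hive.transactional.table.scan

    // downstream readers consult the same flag
    boolean acidScan = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
    System.out.println("ACID scan requested: " + acidScan);               // prints true
  }
}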
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index d81a12d..c7e106b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -145,6 +145,35 @@ public boolean shouldSkipCombine(Path path,
     return (conf.get(AcidUtils.CONF_ACID_KEY) != null) || AcidUtils.isAcid(path, conf);
   }
+
+  /**
+   * We can derive whether a split is ACID from the flags encoded in OrcSplit:
+   * if the split is not an instance of OrcSplit, it is definitely not ACID; if it is an
+   * OrcSplit whose flags record a base or deltas, it is definitely ACID.
+   * That check is currently disabled below, so we fall back to the configuration
+   * object / table property (hive.transactional.table.scan).
+   * @param conf the job configuration
+   * @param inputSplit the split being read
+   * @return true if the split must be read with the ACID reader
+   */
+  public boolean isAcidRead(Configuration conf, InputSplit inputSplit) {
+    if (!(inputSplit instanceof OrcSplit)) {
+      return false;
+    }
+
+    /*
+     * If OrcSplit.isAcid returns true, we know for sure it is ACID.
+     */
+    // if (((OrcSplit) inputSplit).isAcid()) {
+    //   return true;
+    // }
+
+    /*
+     * Fallback for the case when OrcSplit flags do not contain hasBase and deltas.
+     */
+    return HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
+  }
+
   private static class OrcRecordReader
       implements org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>,
       StatsProvidingRecordReader {
@@ -227,13 +256,15 @@ public static RecordReader createReaderFromFile(Reader file,
                                                   long offset, long length
                                                   ) throws IOException {
+    boolean isTransactionalTableScan = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
+    if (isTransactionalTableScan) {
+      throw new IOException(ErrorMsg.ACID_TABLES_MUST_BE_READ_WITH_ACID_READER.getErrorCodedMsg());
+    }
+
     /**
      * Do we have schema on read in the configuration variables?
-     *
-     * NOTE: This code path is NOT used by ACID. OrcInputFormat.getRecordReader intercepts for
-     * ACID tables creates raw record merger, etc.
      */
-    TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ false);
+    TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcidRead */ false);

     Reader.Options options = new Reader.Options().range(offset, length);
     options.schema(schema);
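The getRecordReader refactor in the next two hunks reduces reader selection to two inputs, isAcidRead and vectorMode, and drops the old special case for original files with no deltas, so vectorized reads of ACID tables always go through VectorizedOrcAcidRowReader. A simplified, illustrative rendering of that dispatch — not the patch's code, just the decision table it implements:

// Illustration only: the reader selection getRecordReader performs after this patch,
// reduced to its two inputs. The strings name the readers chosen in each branch.
final class OrcReaderDispatchSketch {
  static String choose(boolean isAcidRead, boolean vectorMode) {
    if (!isAcidRead) {
      // plain ORC file: read it directly, no base/delta merging and no generated row ids
      return vectorMode ? "vectorized ORC reader" : "OrcRecordReader";
    }
    // ACID read: always merge base and deltas, even when the split has no deltas yet,
    // so UPDATE/DELETE can rely on row ids being present
    return vectorMode ? "VectorizedOrcAcidRowReader" : "NullKeyRecordReader over the row merger";
  }
}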
@@ -1150,16 +1181,16 @@ private static void cancelFutures(List<Future<?>> futures) {
   getRecordReader(InputSplit inputSplit, JobConf conf,
                   Reporter reporter) throws IOException {
     boolean vectorMode = Utilities.isVectorMode(conf);
+    boolean isAcidRead = isAcidRead(conf, inputSplit);
-    // if HiveCombineInputFormat gives us FileSplits instead of OrcSplits,
-    // we know it is not ACID. (see a check in CombineHiveInputFormat.getSplits() that assures this)
-    if (inputSplit.getClass() == FileSplit.class) {
+    if (!isAcidRead) {
       if (vectorMode) {
         return createVectorizedReader(inputSplit, conf, reporter);
+      } else {
+        return new OrcRecordReader(OrcFile.createReader(
+            ((FileSplit) inputSplit).getPath(),
+            OrcFile.readerOptions(conf)), conf, (FileSplit) inputSplit);
       }
-      return new OrcRecordReader(OrcFile.createReader(
-          ((FileSplit) inputSplit).getPath(),
-          OrcFile.readerOptions(conf)), conf, (FileSplit) inputSplit);
     }

     OrcSplit split = (OrcSplit) inputSplit;
@@ -1168,23 +1199,13 @@ private static void cancelFutures(List<Future<?>> futures) {
     Options options = new Options(conf).reporter(reporter);
     final RowReader<OrcStruct> inner = getReader(inputSplit, options);
-
-    /*Even though there are no delta files, we still need to produce row ids so that an
-     * UPDATE or DELETE statement would work on a table which didn't have any previous updates*/
-    if (split.isOriginal() && split.getDeltas().isEmpty()) {
-      if (vectorMode) {
-        return createVectorizedReader(inputSplit, conf, reporter);
-      } else {
-        return new NullKeyRecordReader(inner, conf);
-      }
-    }
-
     if (vectorMode) {
       return (org.apache.hadoop.mapred.RecordReader)
           new VectorizedOrcAcidRowReader(inner, conf,
               Utilities.getMapWork(conf).getVectorizedRowBatchCtx(), (FileSplit) inputSplit);
+    } else {
+      return new NullKeyRecordReader(inner, conf);
     }
-    return new NullKeyRecordReader(inner, conf);
   }

   /**
    * Return a RecordReader that is compatible with the Hive 0.12 reader
@@ -1241,6 +1262,7 @@ public float getProgress() throws IOException {
   public RowReader<OrcStruct> getReader(InputSplit inputSplit,
                                         Options options) throws IOException {
+    final OrcSplit split = (OrcSplit) inputSplit;
     final Path path = split.getPath();
     Path root;
@@ -1260,10 +1282,7 @@ public float getProgress() throws IOException {
     /**
      * Do we have schema on read in the configuration variables?
      */
-    TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ true);
-    if (schema == null) {
-      throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
-    }
+    TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcidRead */ true);

     final Reader reader;
     final int bucket;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index bad2a4c..58bac6b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -440,10 +440,7 @@ private void discoverKeyBounds(Reader reader,
     this.length = options.getLength();
     this.validTxnList = validTxnList;
-    TypeDescription typeDescr = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ true);
-    if (typeDescr == null) {
-      throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
-    }
+    TypeDescription typeDescr = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcidRead */ true);

     objectInspector = OrcRecordUpdater.createEventSchema
         (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
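With the null checks above gone, the schema requirement for ACID reads is enforced in one place: OrcUtils.getDesiredRowTypeDescr (changed later in this patch) now throws when an ACID read has no schema properties. A hedged sketch of that contract — assuming nothing else in the Configuration supplies a schema, and using the same two IOConstants properties the updated tests set:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.orc.OrcUtils;

public class AcidSchemaRequirementSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    try {
      // no schema.evolution.columns / schema.evolution.columns.types set
      OrcUtils.getDesiredRowTypeDescr(conf, /* isAcidRead */ true);
    } catch (IOException expected) {
      // SCHEMA_REQUIRED_TO_READ_ACID_TABLES: table schema information is required for ACID reads
      System.out.println(expected.getMessage());
    }

    // supplying the schema properties is what the updated tests do to satisfy the requirement
    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");              // example schema
    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
    System.out.println(OrcUtils.getDesiredRowTypeDescr(conf, /* isAcidRead */ true));
  }
}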
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 8cf4cc0..3e74df5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -156,6 +156,15 @@ public boolean hasBase() {
     return deltas;
   }

+  /**
+   * If this method returns true, the split is definitely ACID.
+   * If it returns false, the split may still be ACID or non-ACID: the flags are one-sided.
+   * @return true if the split is known for certain to be ACID
+   */
+  public boolean isAcid() {
+    return hasBase || deltas.size() > 0;
+  }
+
   public long getProjectedColumnsUncompressedSize() {
     return projColsUncompressedSize;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
index 84fd3c3..ea97e06 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;

+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -34,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
@@ -690,7 +692,8 @@ public static int appendOrcTypesRebuildSubtypes(List<OrcProto.Type> result,
     return columnId;
   }

-  public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcid) {
+  public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcid)
+      throws IOException {

     String columnNameProperty = null;
     String columnTypeProperty = null;
@@ -718,6 +721,8 @@ public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean
           haveSchemaEvolutionProperties = false;
         }
       }
+    } else if (isAcid) {
+      throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
+    }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index d90425a..29dbaa0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -26,10 +26,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.io.NullWritable;
@@ -61,18 +64,15 @@ VectorizedOrcRecordReader(Reader file, Configuration conf,
         FileSplit fileSplit) throws IOException {
-      // if HiveCombineInputFormat gives us FileSplits instead of OrcSplits,
-      // we know it is not ACID. (see a check in CombineHiveInputFormat.getSplits() that assures this).
-      //
-      // Why would an ACID table reach here instead of VectorizedOrcAcidRowReader?
-      // OrcInputFormat.getRecordReader will use this reader for original files that have no deltas.
- // - boolean isAcid = (fileSplit instanceof OrcSplit); + boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); + if (isAcidRead) { + throw new IOException(ErrorMsg.ACID_TABLES_MUST_BE_READ_WITH_ACID_READER.getErrorCodedMsg()); + } /** * Do we have schema on read in the configuration variables? */ - TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, isAcid); + TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcidRead */ false); List types = file.getTypes(); Reader.Options options = new Reader.Options(); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java index a090a5b..f2bee21 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java @@ -383,7 +383,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, if(stack.get(0) instanceof TableScanOperator) { TableScanOperator tso = ((TableScanOperator)stack.get(0)); - if(SemanticAnalyzer.isAcidTable(tso.getConf().getTableMetadata())) { + if(AcidUtils.isAcidTable(tso.getConf().getTableMetadata())) { /*ACID tables have complex directory layout and require merging of delta files * on read thus we should not try to read bucket files directly*/ return null; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 789a493..dff8ccd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -1588,7 +1588,7 @@ public void getMetaData(QB qb, ReadEntity parentInput) throws SemanticException } // Disallow INSERT INTO on bucketized tables - boolean isAcid = isAcidTable(tab); + boolean isAcid = AcidUtils.isAcidTable(tab); boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName()); if (isTableWrittenTo && tab.getNumBuckets() > 0 && !isAcid) { @@ -6123,7 +6123,7 @@ private Operator genBucketingSortingDest(String dest, Operator input, QB qb, order.append(sortOrder == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC ? '+' : '-'); } input = genReduceSinkPlan(input, partnCols, sortCols, order.toString(), maxReducers, - (isAcidTable(dest_tab) ? getAcidType() : AcidUtils.Operation.NOT_ACID)); + (AcidUtils.isAcidTable(dest_tab) ? 
getAcidType() : AcidUtils.Operation.NOT_ACID)); reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator)input.getParentOperators().get(0)); ctx.setMultiFileSpray(multiFileSpray); ctx.setNumFiles(numFiles); @@ -6209,7 +6209,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) case QBMetaData.DEST_TABLE: { dest_tab = qbm.getDestTableForAlias(dest); - destTableIsAcid = isAcidTable(dest_tab); + destTableIsAcid = AcidUtils.isAcidTable(dest_tab); destTableIsTemporary = dest_tab.isTemporary(); // Is the user trying to insert into a external tables @@ -6367,7 +6367,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) dest_part = qbm.getDestPartitionForAlias(dest); dest_tab = dest_part.getTable(); - destTableIsAcid = isAcidTable(dest_tab); + destTableIsAcid = AcidUtils.isAcidTable(dest_tab); if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) && dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE)) { throw new SemanticException( @@ -12220,19 +12220,6 @@ else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE : WriteEntity.WriteType.INSERT); } - // Even if the table is of Acid type, if we aren't working with an Acid compliant TxnManager - // then return false. - public static boolean isAcidTable(Table tab) { - if (tab == null) return false; - if (!SessionState.get().getTxnMgr().supportsAcid()) return false; - String tableIsTransactional = - tab.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); - if(tableIsTransactional == null) { - tableIsTransactional = tab.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); - } - return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); - } - private boolean isAcidOutputFormat(Class of) { Class[] interfaces = of.getInterfaces(); for (Class iface : interfaces) { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index 1e7e617..2c2d745 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -24,9 +24,9 @@ import java.util.Map; import org.apache.hadoop.hive.ql.exec.PTFUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; -import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.TableSample; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -98,6 +98,8 @@ private boolean isMetadataOnly = false; + private boolean isAcidTable; + private transient TableSample tableSample; private transient Table tableMetadata; @@ -120,6 +122,7 @@ public TableScanDesc(final String alias, List vcs, Table tblMetad this.alias = alias; this.virtualCols = vcs; this.tableMetadata = tblMetadata; + isAcidTable = AcidUtils.isAcidTable(this.tableMetadata); } @Override @@ -134,7 +137,7 @@ public String getAlias() { } public boolean isAcidTable() { - return SemanticAnalyzer.isAcidTable(this.tableMetadata); + return isAcidTable; } @Explain(displayName = "filterExpr") diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index b9eec92..3c38b74 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -62,9 +62,11 @@ 
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy; import org.apache.hadoop.hive.ql.io.orc.TestOrcRawRecordMerger.MyRow; @@ -193,6 +195,13 @@ public String toString() { return builder.toString(); } + + static String getColumnNamesProperty() { + return "booleanValue,byteValue,shortValue,intValue,longValue,floatValue,doubleValue,stringValue,decimalValue,dateValue,timestampValue"; + } + static String getColumnTypesProperty() { + return "boolean:tinyint:smallint:int:bigint:float:double:string:decimal:date:timestamp"; + } } public static class BigRowField implements StructField { @@ -1144,8 +1153,8 @@ public void testInOutFormat() throws Exception { // read the whole file - conf.set("columns", MyRow.getColumnNamesProperty()); - conf.set("columns.types", MyRow.getColumnTypesProperty()); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty()); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty()); org.apache.hadoop.mapred.RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL); Object key = reader.createKey(); @@ -1657,6 +1666,10 @@ public void testVectorizationWithAcid() throws Exception { InputSplit[] splits = inputFormat.getSplits(conf, 10); assertEquals(1, splits.length); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty()); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, BigRow.getColumnTypesProperty()); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); + org.apache.hadoop.mapred.RecordReader reader = inputFormat.getRecordReader(splits[0], conf, Reporter.NULL); NullWritable key = reader.createKey(); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index bfdc83f..ed31577 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.IOConstants; @@ -357,10 +358,9 @@ public void testOriginalReaderPairNoMin() throws Exception { @Test public void testNewBase() throws Exception { Configuration conf = new Configuration(); - conf.set("columns", "col1"); - conf.set("columns.types", "string"); - conf.set(serdeConstants.LIST_COLUMNS, "col1"); - conf.set(serdeConstants.LIST_COLUMN_TYPES, "string"); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "col1"); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "string"); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); Reader reader = Mockito.mock(Reader.class, settings); RecordReader recordReader = Mockito.mock(RecordReader.class, 
settings); @@ -520,8 +520,9 @@ public void testEmpty() throws Exception { BUCKET); Reader baseReader = OrcFile.createReader(basePath, OrcFile.readerOptions(conf)); - conf.set("columns", MyRow.getColumnNamesProperty()); - conf.set("columns.types", MyRow.getColumnTypesProperty()); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty()); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty()); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, createMaximalTxnList(), new Reader.Options(), @@ -591,8 +592,9 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(), BUCKET); - conf.set("columns", MyRow.getColumnNamesProperty()); - conf.set("columns.types", MyRow.getColumnTypesProperty()); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty()); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty()); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); Reader baseReader = OrcFile.createReader(basePath, OrcFile.readerOptions(conf)); @@ -897,8 +899,9 @@ synchronized void addedRow() throws IOException { InputFormat inf = new OrcInputFormat(); JobConf job = new JobConf(); - job.set("columns", BigRow.getColumnNamesProperty()); - job.set("columns.types", BigRow.getColumnTypesProperty()); + job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty()); + job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, BigRow.getColumnTypesProperty()); + HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); job.set("mapred.min.split.size", "1"); job.set("mapred.max.split.size", "2"); job.set("mapred.input.dir", root.toString()); @@ -1003,8 +1006,9 @@ synchronized void addedRow() throws IOException { job.set("mapred.min.split.size", "1"); job.set("mapred.max.split.size", "2"); job.set("mapred.input.dir", root.toString()); - job.set("columns", BigRow.getColumnNamesProperty()); - job.set("columns.types", BigRow.getColumnTypesProperty()); + job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty()); + job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, BigRow.getColumnTypesProperty()); + HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); InputSplit[] splits = inf.getSplits(job, 5); assertEquals(5, splits.length); org.apache.hadoop.mapred.RecordReader rr; @@ -1075,8 +1079,9 @@ public void testRecordReaderDelta() throws Exception { job.set("mapred.max.split.size", "2"); job.set("mapred.input.dir", root.toString()); job.set("bucket_count", "1"); - job.set("columns", MyRow.getColumnNamesProperty()); - job.set("columns.types", MyRow.getColumnTypesProperty()); + job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty()); + job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty()); + HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); InputSplit[] splits = inf.getSplits(job, 5); assertEquals(1, splits.length); org.apache.hadoop.mapred.RecordReader rr; @@ -1144,8 +1149,9 @@ private void testRecordReaderIncompleteDelta(boolean use130Format) throws Except JobConf job = new JobConf(); job.set("mapred.input.dir", root.toString()); job.set("bucket_count", "2"); - job.set("columns", MyRow.getColumnNamesProperty()); - 
job.set("columns.types", MyRow.getColumnTypesProperty()); + job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty()); + job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty()); + HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); // read the keys before the delta is flushed InputSplit[] splits = inf.getSplits(job, 1);
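Taken together, the test updates show the minimal configuration a standalone ACID read now needs: the schema-evolution column properties (instead of the plain columns / columns.types keys) plus the transactional-scan flag, and a valid-transaction list. A self-contained sketch of that setup; the input directory is a made-up placeholder that must point at a real ACID table or partition directory, and the default ValidReadTxnList stands in for the list the tests obtain from the metastore:

import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

public class AcidReadJobConfSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    job.set("mapred.input.dir", "/tmp/warehouse/acid_table/p=1");         // hypothetical, must exist
    job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");              // reader schema
    job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
    HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
    // the ACID reader also needs a valid-transaction list, as the tests above provide
    job.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());

    InputSplit[] splits = new OrcInputFormat().getSplits(job, 1);
    System.out.println("splits: " + splits.length);
  }
}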