diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
index bc56d77..c33e48e 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
@@ -130,6 +131,8 @@ public void configureInputJobProperties(TableDesc tableDesc,
         jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesSb.toString());
         jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, typeNamesSb.toString());
+        boolean isAcidTable = AcidUtils.isTablePropertyTransactional(tableProperties);
+        AcidUtils.setTablePropertyTransactional(jobProperties, isAcidTable);
       }
     } catch (IOException e) {
       throw new IllegalStateException("Failed to set output path", e);
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index ff2598f..f4e142a 100644
--- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -55,10 +55,12 @@
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
 import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.orc.FileDump;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -464,8 +466,9 @@ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int
     JobConf job = new JobConf();
     job.set("mapred.input.dir", partitionPath.toString());
     job.set("bucket_count", Integer.toString(buckets));
-    job.set("columns", "id,msg");
-    job.set("columns.types", "bigint:string");
+    job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
+    job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
+    job.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     InputSplit[] splits = inf.getSplits(job, buckets);
     Assert.assertEquals(buckets, splits.length);
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 59e9674..826adaf 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -19,6 +19,7 @@
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
@@ -27,6 +28,7 @@
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
@@ -949,8 +951,9 @@ public long getHighWatermark() {
     OrcInputFormat aif = new OrcInputFormat();

     Configuration conf = new Configuration();
-    conf.set("columns", columnNamesProperty);
-    conf.set("columns.types", columnTypesProperty);
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesProperty);
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, columnTypesProperty);
+    conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
     AcidInputFormat.RawReader reader =
         aif.getRawReader(conf, false, bucket, txnList, base, deltas);
     RecordIdentifier identifier = reader.createKey();
diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 7acca77..7c65bef 100644
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -509,7 +509,8 @@
       "schema.evolution.columns / schema.evolution.columns.types " +
       "nor the " +
       "columns / columns.types " +
-      "are set. Table schema information is required to read ACID tables")
+      "are set. Table schema information is required to read ACID tables"),
+  ACID_TABLES_MUST_BE_READ_WITH_ACID_READER(30021, "An ORC ACID reader required to read ACID tables")
   ;

   private int errorCode;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 31aa3dc..3691167 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
@@ -75,6 +76,8 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
           job, ts.getNeededColumnIDs(), ts.getNeededColumns());
       // push down filters
       HiveInputFormat.pushFilters(job, ts);
+
+      AcidUtils.setTablePropertyTransactional(job, ts.getConf().isAcidTable());
     }
     sink = work.getSink();
     fetch = new FetchOperator(work, job, source, getVirtualColumns(source));
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index 390a12e..d339e64 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
@@ -197,6 +198,7 @@ public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf,
       // push down filters
       HiveInputFormat.pushFilters(jobClone, ts);
+      AcidUtils.setTablePropertyTransactional(jobClone, ts.getConf().isAcidTable());

       ts.passExecContext(getExecContext());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index a5c1463..6530c12 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
@@ -432,6 +433,8 @@ private void initializeOperators(Map<FetchOperator, JobConf> fetchOpJobConfMap)
       // push down filters
       HiveInputFormat.pushFilters(jobClone, ts);
+      AcidUtils.setTablePropertyTransactional(job, ts.getConf().isAcidTable());
+
       // create a fetch operator
       FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone);
       fetchOpJobConfMap.put(fetchOp, jobClone);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 99c4435..b94ccd3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -599,4 +599,24 @@ public static boolean isTablePropertyTransactional(Map<String, String> parameter
     }
     return resultStr != null && resultStr.equalsIgnoreCase("true");
   }
+
+  public static boolean isTablePropertyTransactional(Configuration conf) {
+    String resultStr = conf.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if (resultStr == null) {
+      resultStr = conf.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
+    }
+    return resultStr != null && resultStr.equalsIgnoreCase("true");
+  }
+
+  public static void setTablePropertyTransactional(Properties props, boolean isAcidTable) {
+    props.setProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.toString(isAcidTable));
+  }
+
+  public static void setTablePropertyTransactional(Map<String, String> parameters, boolean isAcidTable) {
+    parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.toString(isAcidTable));
+  }
+
+  public static void setTablePropertyTransactional(Configuration conf, boolean isAcidTable) {
+    conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.toString(isAcidTable));
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 181ce84..795bf9b 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -586,6 +586,8 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass
           jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns());
         // push down filters
         pushFilters(jobConf, ts);
+
+        AcidUtils.setTablePropertyTransactional(job, ts.getConf().isAcidTable());
       }
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index d81a12d..d9341e0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -227,11 +227,13 @@ public static RecordReader createReaderFromFile(Reader file,
                                                   long offset, long length
                                                   ) throws IOException {
+    boolean isAcid = AcidUtils.isTablePropertyTransactional(conf);
+    if (isAcid) {
+      throw new IOException(ErrorMsg.ACID_TABLES_MUST_BE_READ_WITH_ACID_READER.getErrorCodedMsg());
+    }
+
     /**
      * Do we have schema on read in the configuration variables?
-     *
-     * NOTE: This code path is NOT used by ACID. OrcInputFormat.getRecordReader intercepts for
-     * ACID tables creates raw record merger, etc.
      */
     TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ false);
@@ -1150,10 +1152,9 @@ private static void cancelFutures(List<Future<?>> futures) {
   getRecordReader(InputSplit inputSplit, JobConf conf,
                   Reporter reporter) throws IOException {
     boolean vectorMode = Utilities.isVectorMode(conf);
+    boolean isAcidTable = AcidUtils.isTablePropertyTransactional(conf);

-    // if HiveCombineInputFormat gives us FileSplits instead of OrcSplits,
-    // we know it is not ACID. (see a check in CombineHiveInputFormat.getSplits() that assures this)
-    if (inputSplit.getClass() == FileSplit.class) {
+    if (!isAcidTable) {
       if (vectorMode) {
         return createVectorizedReader(inputSplit, conf, reporter);
       }
@@ -1168,17 +1169,6 @@ private static void cancelFutures(List<Future<?>> futures) {
     Options options = new Options(conf).reporter(reporter);
     final RowReader<OrcStruct> inner = getReader(inputSplit, options);
-
-    /*Even though there are no delta files, we still need to produce row ids so that an
-    * UPDATE or DELETE statement would work on a table which didn't have any previous updates*/
-    if (split.isOriginal() && split.getDeltas().isEmpty()) {
-      if (vectorMode) {
-        return createVectorizedReader(inputSplit, conf, reporter);
-      } else {
-        return new NullKeyRecordReader(inner, conf);
-      }
-    }
-
     if (vectorMode) {
       return (org.apache.hadoop.mapred.RecordReader)
           new VectorizedOrcAcidRowReader(inner, conf,
@@ -1241,6 +1231,7 @@ public float getProgress() throws IOException {
   public RowReader<OrcStruct> getReader(InputSplit inputSplit,
                                         Options options) throws IOException {
+    final OrcSplit split = (OrcSplit) inputSplit;
     final Path path = split.getPath();
     Path root;
@@ -1261,9 +1252,6 @@ public float getProgress() throws IOException {
      * Do we have schema on read in the configuration variables?
      */
     TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ true);
-    if (schema == null) {
-      throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
-    }

     final Reader reader;
     final int bucket;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index bad2a4c..e71ed3f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -441,9 +441,6 @@ private void discoverKeyBounds(Reader reader,
     this.validTxnList = validTxnList;

     TypeDescription typeDescr = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ true);
-    if (typeDescr == null) {
-      throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
-    }

     objectInspector = OrcRecordUpdater.createEventSchema
         (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
index 84fd3c3..ea97e06 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;

+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -34,6 +35,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
@@ -690,7 +692,8 @@ public static int appendOrcTypesRebuildSubtypes(List<OrcProto.Type> result,
     return columnId;
   }

-  public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcid) {
+  public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcid)
+      throws IOException {

     String columnNameProperty = null;
     String columnTypeProperty = null;
@@ -718,6 +721,8 @@ public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean
           haveSchemaEvolutionProperties = false;
         }
       }
+    } else if (isAcid) {
+      throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index d90425a..aeb191f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -26,10 +26,12 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.io.NullWritable;
@@ -61,18 +63,15 @@ VectorizedOrcRecordReader(Reader file, Configuration conf,
         FileSplit fileSplit) throws IOException {
-      // if HiveCombineInputFormat gives us FileSplits instead of OrcSplits,
-      // we know it is not ACID. (see a check in CombineHiveInputFormat.getSplits() that assures this).
-      //
-      // Why would an ACID table reach here instead of VectorizedOrcAcidRowReader?
-      //   OrcInputFormat.getRecordReader will use this reader for original files that have no deltas.
-      //
-      boolean isAcid = (fileSplit instanceof OrcSplit);
+      boolean isAcid = AcidUtils.isTablePropertyTransactional(conf);
+      if (isAcid) {
+        throw new IOException(ErrorMsg.ACID_TABLES_MUST_BE_READ_WITH_ACID_READER.getErrorCodedMsg());
+      }

       /**
        * Do we have schema on read in the configuration variables?
        */
-      TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, isAcid);
+      TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ false);

       List<OrcProto.Type> types = file.getTypes();
       Reader.Options options = new Reader.Options();
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 1e7e617..3a10f8e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -98,6 +98,8 @@
   private boolean isMetadataOnly = false;

+  private boolean isAcidTable;
+
   private transient TableSample tableSample;

   private transient Table tableMetadata;
@@ -120,6 +122,7 @@ public TableScanDesc(final String alias, List<VirtualColumn> vcs, Table tblMetad
     this.alias = alias;
     this.virtualCols = vcs;
     this.tableMetadata = tblMetadata;
+    isAcidTable = SemanticAnalyzer.isAcidTable(this.tableMetadata);
   }

   @Override
@@ -134,7 +137,7 @@ public String getAlias() {
   }

   public boolean isAcidTable() {
-    return SemanticAnalyzer.isAcidTable(this.tableMetadata);
+    return isAcidTable;
   }

   @Explain(displayName = "filterExpr")
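Note: the sketch below is illustrative and not part of the patch. It mirrors, in dependency-free Java, the round-trip the patch sets up: the storage handler and execution tasks copy the table's "transactional" flag into the job configuration via the new AcidUtils helpers, and the ORC readers consult that flag instead of inferring ACID-ness from the split class. It assumes hive_metastoreConstants.TABLE_IS_TRANSACTIONAL resolves to the string "transactional"; the Hadoop Configuration overload is omitted to keep the sketch self-contained.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TransactionalFlagSketch {

  // Assumed value of hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.
  static final String TABLE_IS_TRANSACTIONAL = "transactional";

  // Mirrors AcidUtils.isTablePropertyTransactional(Properties):
  // case-insensitive "true" check, with an upper-case key fallback.
  static boolean isTablePropertyTransactional(Properties props) {
    String resultStr = props.getProperty(TABLE_IS_TRANSACTIONAL);
    if (resultStr == null) {
      resultStr = props.getProperty(TABLE_IS_TRANSACTIONAL.toUpperCase());
    }
    return resultStr != null && resultStr.equalsIgnoreCase("true");
  }

  // Mirrors AcidUtils.setTablePropertyTransactional(Map, boolean):
  // propagate the flag from table properties into the job properties
  // handed to the input format.
  static void setTablePropertyTransactional(Map<String, String> jobProperties, boolean isAcidTable) {
    jobProperties.put(TABLE_IS_TRANSACTIONAL, Boolean.toString(isAcidTable));
  }

  public static void main(String[] args) {
    // Table-level metadata, as FosterStorageHandler would see it.
    Properties tableProperties = new Properties();
    tableProperties.setProperty(TABLE_IS_TRANSACTIONAL, "true");

    // Copy the flag into the per-job properties.
    Map<String, String> jobProperties = new HashMap<>();
    setTablePropertyTransactional(jobProperties, isTablePropertyTransactional(tableProperties));

    // A reader can now decide up front whether it is looking at an ACID table
    // (and fail fast if a non-ACID reader was chosen), rather than guessing
    // from FileSplit vs. OrcSplit.
    System.out.println("transactional = " + jobProperties.get(TABLE_IS_TRANSACTIONAL));
  }
}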