diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
index bc56d77..13dcbd1 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
@@ -130,6 +131,8 @@ public void configureInputJobProperties(TableDesc tableDesc,
         jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesSb.toString());
         jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, typeNamesSb.toString());

+        AcidUtils.setTablePropertyTransactional(jobProperties,
+            AcidUtils.isTablePropertyTransactional(tableProperties));
       }
     } catch (IOException e) {
       throw new IllegalStateException("Failed to set output path", e);
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index ff2598f..f4e142a 100644
--- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -55,10 +55,12 @@
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
 import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.orc.FileDump;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -464,8 +466,9 @@ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int
     JobConf job = new JobConf();
     job.set("mapred.input.dir", partitionPath.toString());
     job.set("bucket_count", Integer.toString(buckets));
-    job.set("columns", "id,msg");
-    job.set("columns.types", "bigint:string");
+    job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
+    job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
+    job.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     InputSplit[] splits = inf.getSplits(job, buckets);
     Assert.assertEquals(buckets, splits.length);
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
index 339e9ef..34f9ae3 100644
--- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -32,8 +32,10 @@
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -128,8 +130,9 @@ public void assertMaxTransactionId(long expectedMaxTransactionId) {
     JobConf job = new JobConf();
     job.set("mapred.input.dir", partitionLocation.toString());
     job.set("bucket_count", Integer.toString(table.getSd().getNumBuckets()));
-    job.set("columns", "id,msg");
-    job.set("columns.types", "bigint:string");
+    job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
+    job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
+    job.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     InputSplit[] splits = inputFormat.getSplits(job, 1);
     assertEquals(1, splits.length);
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index da367ca..fd246da 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
@@ -44,6 +45,7 @@
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
@@ -967,8 +969,9 @@ public long getHighWatermark() {
     OrcInputFormat aif = new OrcInputFormat();

     Configuration conf = new Configuration();
-    conf.set("columns", columnNamesProperty);
-    conf.set("columns.types", columnTypesProperty);
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesProperty);
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, columnTypesProperty);
+    conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
     AcidInputFormat.RawReader<OrcStruct> reader =
         aif.getRawReader(conf, false, bucket, txnList, base, deltas);
     RecordIdentifier identifier = reader.createKey();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 4415328..a889a16 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
@@ -77,6 +78,8 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx,
           job, ts.getNeededColumnIDs(), ts.getNeededColumns());
       // push down filters
       HiveInputFormat.pushFilters(job, ts);
+
+      AcidUtils.setTablePropertyTransactional(job, ts.getConf().isAcidTable());
     }
     sink = work.getSink();
     fetch = new FetchOperator(work, job, source, getVirtualColumns(source));
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index 7cc534b..22e578c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
@@ -203,6 +204,7 @@ public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf,
       // push down filters
       HiveInputFormat.pushFilters(jobClone, ts);
+      AcidUtils.setTablePropertyTransactional(jobClone, ts.getConf().isAcidTable());

       ts.passExecContext(getExecContext());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 1d97a44..2fb380d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -57,6 +57,7 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
@@ -458,6 +459,8 @@ private void initializeOperators(Map<FetchOperator, JobConf> fetchOpJobConfMap)
       // push down filters
       HiveInputFormat.pushFilters(jobClone, ts);

+      AcidUtils.setTablePropertyTransactional(jobClone, ts.getConf().isAcidTable());
+
       // create a fetch operator
       FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone);
       fetchOpJobConfMap.put(fetchOp, jobClone);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 14f7374..cd92988 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -682,4 +682,24 @@ public static boolean isTablePropertyTransactional(Map<String, String> parameter
     }
     return resultStr != null && resultStr.equalsIgnoreCase("true");
   }
+
+  public static boolean isTablePropertyTransactional(Configuration conf) {
+    String resultStr = conf.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if (resultStr == null) {
+      resultStr = conf.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
+    }
+    return resultStr != null && resultStr.equalsIgnoreCase("true");
+  }
+
+  public static void setTablePropertyTransactional(Properties props, boolean isAcidTable) {
+    props.setProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.toString(isAcidTable));
+  }
+
+  public static void setTablePropertyTransactional(Map<String, String> parameters, boolean isAcidTable) {
+    parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.toString(isAcidTable));
+  }
+
+  public static void setTablePropertyTransactional(Configuration conf, boolean isAcidTable) {
+    conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.toString(isAcidTable));
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 1f262d0..d163467 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -634,11 +634,14 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass
             alias);
         if (op instanceof TableScanOperator) {
           TableScanOperator ts = (TableScanOperator) op;
+          // push down projections.
           ColumnProjectionUtils.appendReadColumns(
               jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns());
           // push down filters
           pushFilters(jobConf, ts);
+
+          AcidUtils.setTablePropertyTransactional(jobConf, ts.getConf().isAcidTable());
         }
       }
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 8119449..9e46c29 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -251,11 +251,8 @@ public static RecordReader createReaderFromFile(Reader file,
     /**
      * Do we have schema on read in the configuration variables?
-     *
-     * NOTE: This code path is NOT used by ACID. OrcInputFormat.getRecordReader intercepts for
-     * ACID tables creates raw record merger, etc.
      */
-    TypeDescription schema = getDesiredRowTypeDescr(conf, /* isAcid */ false);
+    TypeDescription schema = getDesiredRowTypeDescr(conf);

     Reader.Options options = new Reader.Options().range(offset, length);
     options.schema(schema);
@@ -1528,10 +1525,7 @@ public float getProgress() throws IOException {
     /**
      * Do we have schema on read in the configuration variables?
      */
-    TypeDescription schema = getDesiredRowTypeDescr(conf, /* isAcid */ true);
-    if (schema == null) {
-      throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
-    }
+    TypeDescription schema = getDesiredRowTypeDescr(conf);

     final Reader reader;
     final int bucket;
@@ -2058,9 +2052,15 @@ public static TypeDescription convertTypeInfo(TypeInfo info) {
     }
   }

+  public static TypeDescription getDesiredRowTypeDescr(Configuration conf)
+      throws IOException {
+    boolean isAcid = AcidUtils.isTablePropertyTransactional(conf);
+    return getDesiredRowTypeDescr(conf, isAcid);
+  }
-  public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcid) {
-
+  public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcid)
+      throws IOException {
+
     String columnNameProperty = null;
     String columnTypeProperty = null;
@@ -2087,6 +2087,8 @@ public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean
           haveSchemaEvolutionProperties = false;
         }
       }
+      } else if (isAcid) {
+        throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
       }
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 2c8dae2..9f072b6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -447,10 +447,7 @@ private void discoverKeyBounds(Reader reader,
     this.length = options.getLength();
     this.validTxnList = validTxnList;

-    TypeDescription typeDescr = OrcInputFormat.getDesiredRowTypeDescr(conf, /* isAcid */ true);
-    if (typeDescr == null) {
-      throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
-    }
+    TypeDescription typeDescr = OrcInputFormat.getDesiredRowTypeDescr(conf);

     objectInspector = OrcRecordUpdater.createEventSchema
         (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index 14f275f..e08aaf3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -63,18 +63,10 @@ VectorizedOrcRecordReader(Reader file, Configuration conf,
         FileSplit fileSplit) throws IOException {
-      // if HiveCombineInputFormat gives us FileSplits instead of OrcSplits,
-      // we know it is not ACID. (see a check in CombineHiveInputFormat.getSplits() that assures this).
-      //
-      // Why would an ACID table reach here instead of VectorizedOrcAcidRowReader?
-      // OrcInputFormat.getRecordReader will use this reader for original files that have no deltas.
-      //
-      boolean isAcid = (fileSplit instanceof OrcSplit);
-
       /**
        * Do we have schema on read in the configuration variables?
        */
-      TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(conf, isAcid);
+      TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(conf);

       List<OrcProto.Type> types = file.getTypes();
       Reader.Options options = new Reader.Options();
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 098aa89..4a8a94e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -98,6 +98,8 @@
   private boolean isMetadataOnly = false;

+  private boolean isAcidTable;
+
   private transient TableSample tableSample;

   private transient Table tableMetadata;
@@ -123,6 +125,7 @@ public TableScanDesc(final String alias, List<VirtualColumn> vcs, Table tblMetad
     this.alias = alias;
     this.virtualCols = vcs;
     this.tableMetadata = tblMetadata;
+    isAcidTable = SemanticAnalyzer.isAcidTable(this.tableMetadata);
   }

   @Override
@@ -138,7 +141,7 @@ public String getAlias() {

   @Explain(displayName = "ACID table", explainLevels = { Level.USER }, displayOnlyOnTrue = true)
   public boolean isAcidTable() {
-    return SemanticAnalyzer.isAcidTable(this.tableMetadata);
+    return isAcidTable;
   }

   @Explain(displayName = "filterExpr")
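
For illustration only, not part of the patch: a minimal, self-contained sketch of the reader-side wiring the diff above relies on. It uses only APIs that appear in the patch (JobConf, IOConstants, hive_metastoreConstants, and the new AcidUtils helpers); the class name and the column list are hypothetical, mirroring the updated tests rather than any specific table.

    // Sketch: prime a reader-side JobConf the way TestStreaming/StreamingAssert/TestCompactor now do.
    import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.io.IOConstants;
    import org.apache.hadoop.mapred.JobConf;

    public class AcidReaderConfSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();

        // Schema-on-read properties: OrcInputFormat.getDesiredRowTypeDescr(conf) now throws
        // SCHEMA_REQUIRED_TO_READ_ACID_TABLES when these are missing for an ACID read.
        job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
        job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");

        // Mark the scan as transactional; the planner/runtime paths in the patch do the same via
        // AcidUtils.setTablePropertyTransactional(conf, ts.getConf().isAcidTable()).
        AcidUtils.setTablePropertyTransactional(job, true);

        // OrcInputFormat derives its isAcid flag back out of the configuration.
        boolean isAcid = AcidUtils.isTablePropertyTransactional(job);
        System.out.println(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL + "=" + isAcid);
      }
    }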