diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
index 040906f34d..02ccd48533 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
@@ -131,10 +131,12 @@ public void configureInputJobProperties(TableDesc tableDesc,
         jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesSb.toString());
         jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, typeNamesSb.toString());

-        boolean isAcidTable = AcidUtils.isTablePropertyTransactional(tableProperties);
-        AcidUtils.setTransactionalTableScan(jobProperties, isAcidTable);
+        boolean isTransactionalTable = AcidUtils.isTablePropertyTransactional(tableProperties);
         AcidUtils.AcidOperationalProperties acidOperationalProperties =
             AcidUtils.getAcidOperationalProperties(tableProperties);
+        if(acidOperationalProperties.isSplitUpdate()) {
+          AcidUtils.setAcidTableScan(jobProperties, isTransactionalTable);
+        }
         AcidUtils.setAcidOperationalProperties(jobProperties, acidOperationalProperties);
       }
     } catch (IOException e) {
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
index 996bb02dc7..4f74349198 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
@@ -113,9 +113,9 @@ public static void setOutput(Configuration conf, Credentials credentials,
       if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
         throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a partition with sorted column definition from Pig/Mapreduce is not supported");
       }
-
-      if (AcidUtils.isAcidTable(table)) {
-        throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into an insert-only ACID table from Pig/Mapreduce is not supported");
+      if (AcidUtils.isTransactionalTable(table)) {
+        throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a transactional table " +
+            table.getFullyQualifiedName() + " from Pig/Mapreduce is not supported");
       }

       // Set up a common id hash for this job, so that when we create any temporary directory
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 49aad392d8..dc8eee1aac 100644
--- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -564,7 +564,7 @@ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int
     job.set(BUCKET_COUNT, Integer.toString(buckets));
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
-    AcidUtils.setTransactionalTableScan(job,true);
+    AcidUtils.setAcidTableScan(job,true);
     job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     InputSplit[] splits = inf.getSplits(job, buckets);
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
index d5429fbbd6..25db0fb4af 100644
--- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -143,7 +143,7 @@ public void assertMaxTransactionId(long expectedMaxTransactionId) {
     job.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(table.getSd().getNumBuckets()));
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
-    AcidUtils.setTransactionalTableScan(job,true);
+    AcidUtils.setAcidTableScan(job,true);
     job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     InputSplit[] splits = inputFormat.getSplits(job, 1);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 55ef8de9a5..a39b403867 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2163,7 +2163,7 @@ private void checkArchiveProperty(int partSpecLevel,
   private int compact(Hive db, AlterTableSimpleDesc desc) throws HiveException {

     Table tbl = db.getTable(desc.getTableName());
-    if (!AcidUtils.isFullAcidTable(tbl) && !AcidUtils.isInsertOnlyTable(tbl.getParameters())) {
+    if (!AcidUtils.isAcidTable(tbl) && !AcidUtils.isInsertOnlyTable(tbl.getParameters())) {
       throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, tbl.getDbName(),
           tbl.getTableName());
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 6589bb2091..fc6052b04e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -24,7 +24,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -42,7 +41,6 @@
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;

 /**
  * FetchTask implementation.
@@ -81,7 +79,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext
         // push down filters
         HiveInputFormat.pushFilters(job, ts);
-        AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable());
+        AcidUtils.setAcidTableScan(job, ts.getConf().isAcidTable());
         AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties());
       }
       sink = work.getSink();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index 64aa744206..6df1e32c53 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -209,7 +209,7 @@ public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf,
       // push down filters
       HiveInputFormat.pushFilters(jobClone, ts);
-      AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable());
+      AcidUtils.setAcidTableScan(jobClone, ts.getConf().isAcidTable());
       AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties());

       ts.passExecContext(getExecContext());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index b6a988dc59..60d2b7b744 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -485,7 +485,7 @@ private void initializeOperators(Map<FetchOperator, JobConf> fetchOpJobConfMap)
       // push down filters
       HiveInputFormat.pushFilters(jobClone, ts);
-      AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable());
+      AcidUtils.setAcidTableScan(jobClone, ts.getConf().isAcidTable());
       AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties());

       // create a fetch operator
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index a85713b350..ad6cd623e5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1204,15 +1204,17 @@ public static boolean isTablePropertyTransactional(Configuration conf) {
     }
     return resultStr != null && resultStr.equalsIgnoreCase("true");
   }
-
-  public static void setTransactionalTableScan(Map<String, String> parameters, boolean isAcidTable) {
+  /**
+   * Means it's a full acid table
+   */
+  public static void setAcidTableScan(Map<String, String> parameters, boolean isAcidTable) {
     parameters.put(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, Boolean.toString(isAcidTable));
   }

   /**
    * Means it's a full acid table
    */
-  public static void setTransactionalTableScan(Configuration conf, boolean isFullAcidTable) {
+  public static void setAcidTableScan(Configuration conf, boolean isFullAcidTable) {
     HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isFullAcidTable);
   }
   /**
@@ -1221,15 +1223,7 @@ public static void setTransactionalTableScan(Configuration conf, boolean isFullA
   public static boolean isDeleteDelta(Path p) {
     return p.getName().startsWith(DELETE_DELTA_PREFIX);
   }
-  /** Checks if a table is a valid ACID table.
-   * Note, users are responsible for using the correct TxnManager. We do not look at
-   * SessionState.get().getTxnMgr().supportsAcid() here
-   * @param table table
-   * @return true if table is a legit ACID table, false otherwise
-   * ToDo: this shoudl be renamed isTransactionalTable() since that is what it's checking and covers
-   * both Acid and MM tables.  HIVE-18124
-   */
-  public static boolean isAcidTable(Table table) {
+  public static boolean isTransactionalTable(Table table) {
     if (table == null) {
       return false;
     }
@@ -1240,11 +1234,7 @@ public static boolean isAcidTable(Table table) {
     return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
   }

-  /**
-   * ToDo: this shoudl be renamed isTransactionalTable() since that is what it's checking and convers
-   * both Acid and MM tables.  HIVE-18124
-   */
-  public static boolean isAcidTable(CreateTableDesc table) {
+  public static boolean isTransactionalTable(CreateTableDesc table) {
     if (table == null || table.getTblProps() == null) {
       return false;
     }
@@ -1255,14 +1245,11 @@ public static boolean isAcidTable(CreateTableDesc table) {
     return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
   }

-  /**
-   * after isTransactionalTable() then make this isAcid() HIVE-18124
-   */
-  public static boolean isFullAcidTable(Table table) {
-    return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table);
+  public static boolean isAcidTable(Table table) {
+    return isTransactionalTable(table) && !AcidUtils.isInsertOnlyTable(table);
   }

-  public static boolean isFullAcidTable(CreateTableDesc td) {
+  public static boolean isAcidTable(CreateTableDesc td) {
     if (td == null || td.getTblProps() == null) {
       return false;
     }
@@ -1392,7 +1379,7 @@ public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOExc

   /**
-   * Checks if a table is an ACID table that only supports INSERT, but not UPDATE/DELETE
+   * Checks if a table is a transactional table that only supports INSERT, but not UPDATE/DELETE
    * @param params table properties
    * @return true if table is an INSERT_ONLY table, false otherwise
    */
@@ -1400,7 +1387,7 @@ public static boolean isInsertOnlyTable(Map<String, String> params) {
     return isInsertOnlyTable(params, false);
   }
   public static boolean isInsertOnlyTable(Table table) {
-    return isAcidTable(table) && getAcidOperationalProperties(table).isInsertOnly();
+    return isTransactionalTable(table) && getAcidOperationalProperties(table).isInsertOnly();
   }

   // TODO [MM gap]: CTAS may currently be broken. It used to work. See the old code, and why isCtas isn't used?
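Reviewer note, not part of the patch: the rename flips AcidUtils.isAcidTable() from meaning "any transactional table" to "full ACID only", with isTransactionalTable() taking over the old meaning. A minimal, self-contained sketch of the resulting predicate relationships; the class and helper methods below are hypothetical stand-ins that only mirror the real table-property keys ("transactional", "transactional_properties"):

import java.util.Map;

// Mirrors the post-rename semantics: a transactional table is either
// insert-only (MM) or full ACID; isAcidTable() now means full ACID only.
public class AcidNamingSketch {
  static boolean isTransactionalTable(Map<String, String> props) {
    return "true".equalsIgnoreCase(props.get("transactional"));
  }
  static boolean isInsertOnlyTable(Map<String, String> props) {
    return isTransactionalTable(props)
        && "insert_only".equalsIgnoreCase(props.get("transactional_properties"));
  }
  static boolean isAcidTable(Map<String, String> props) { // full ACID
    return isTransactionalTable(props) && !isInsertOnlyTable(props);
  }
  public static void main(String[] args) {
    Map<String, String> mm = Map.of("transactional", "true",
        "transactional_properties", "insert_only");
    System.out.println(isTransactionalTable(mm)); // true
    System.out.println(isAcidTable(mm));          // false: MM is not full ACID
  }
}

The invariant to audit call sites against is isAcidTable(t) == isTransactionalTable(t) && !isInsertOnlyTable(t), which is exactly what the new AcidUtils.isAcidTable(Table) encodes.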
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index b35df69885..0718995ebe 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -468,7 +468,7 @@ private void addSplitsForGroup(List<Path> dirs, TableScanOperator tableScan, Job
     try {
       Utilities.copyTablePropertiesToConf(table, conf);
       if(tableScan != null) {
-        AcidUtils.setTransactionalTableScan(conf, tableScan.getConf().isAcidTable());
+        AcidUtils.setAcidTableScan(conf, tableScan.getConf().isAcidTable());
       }
     } catch (HiveException e) {
       throw new IOException(e);
@@ -851,7 +851,7 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass
         // push down filters
         pushFilters(jobConf, ts);
-        AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable());
+        AcidUtils.setAcidTableScan(job, ts.getConf().isAcidTable());
         AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties());
       }
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index fdb3603338..c2c7a4abf2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -26,7 +26,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.plan.LockTableDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
-import org.apache.hadoop.hive.ql.plan.api.Query;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
@@ -326,7 +325,7 @@ private boolean allowOperationInATransaction(QueryPlan queryPlan) {
     //in a txn assuming we can determine the target is a suitable table type.
     if(queryPlan.getOperation() == HiveOperation.LOAD && queryPlan.getOutputs() != null && queryPlan.getOutputs().size() == 1) {
       WriteEntity writeEntity = queryPlan.getOutputs().iterator().next();
-      if(AcidUtils.isFullAcidTable(writeEntity.getTable()) || AcidUtils.isInsertOnlyTable(writeEntity.getTable())) {
+      if(AcidUtils.isAcidTable(writeEntity.getTable()) || AcidUtils.isInsertOnlyTable(writeEntity.getTable())) {
         switch (writeEntity.getWriteType()) {
           case INSERT:
             //allow operation in a txn
@@ -406,7 +405,7 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB
           continue;
         }
         if(t != null) {
-          compBuilder.setIsAcid(AcidUtils.isFullAcidTable(t));
+          compBuilder.setIsAcid(AcidUtils.isAcidTable(t));
         }
         LockComponent comp = compBuilder.build();
         LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -460,7 +459,7 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi
           break;
         case INSERT_OVERWRITE:
           t = getTable(output);
-          if (AcidUtils.isAcidTable(t)) {
+          if (AcidUtils.isTransactionalTable(t)) {
             compBuilder.setSemiShared();
             compBuilder.setOperationType(DataOperationType.UPDATE);
           } else {
@@ -470,7 +469,7 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi
           break;
         case INSERT:
           assert t != null;
-          if(AcidUtils.isFullAcidTable(t)) {
+          if(AcidUtils.isAcidTable(t)) {
             compBuilder.setShared();
           }
           else {
@@ -504,7 +503,7 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi
               output.getWriteType().toString());
         }
         if(t != null) {
-          compBuilder.setIsAcid(AcidUtils.isFullAcidTable(t));
+          compBuilder.setIsAcid(AcidUtils.isAcidTable(t));
         }
         compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 50bdce89a4..7e059da2f6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1720,7 +1720,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
     Path tblDataLocationPath = tbl.getDataLocation();
     boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
-    boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
+    boolean isFullAcidTable = AcidUtils.isAcidTable(tbl);
     try {
       // Get the partition object if it already exists
       Partition oldPart = getPartition(tbl, partSpec, false);
@@ -2317,7 +2317,7 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType
     Table tbl = getTable(tableName);
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
     boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl);
-    boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
+    boolean isFullAcidTable = AcidUtils.isAcidTable(tbl);
     HiveConf sessionConf = SessionState.getSessionConf();
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
       newFiles = Collections.synchronizedList(new ArrayList<Path>());
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
index 31d2b2342b..7f5e543c1c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
@@ -410,7 +410,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       if (stack.get(0) instanceof TableScanOperator) {
         TableScanOperator tso = ((TableScanOperator)stack.get(0));
         Table tab = tso.getConf().getTableMetadata();
-        if (AcidUtils.isFullAcidTable(tab)) {
+        if (AcidUtils.isAcidTable(tab)) {
           /*ACID tables have complex directory layout and require merging of delta files
           * on read thus we should not try to read bucket files directly*/
           return null;
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
index 85f198b6cd..0f3a8d1561 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
@@ -19,13 +19,10 @@

 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Stack;

@@ -277,7 +274,8 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Logger.info("Table " + tbl.getTableName() + " is external. Skip StatsOptimizer.");
         return null;
       }
-      if (AcidUtils.isAcidTable(tbl)) {
+      if (AcidUtils.isTransactionalTable(tbl)) {
+        //todo: should this be OK for MM table?
         Logger.info("Table " + tbl.getTableName() + " is ACID table. Skip StatsOptimizer.");
         return null;
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index a09b7961c2..8b53cb7ed6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1903,9 +1903,9 @@ private WriteType determineAlterTableWriteType(Table tab, AlterTableDesc desc, A
     if(desc != null && desc.getProps() != null && Boolean.parseBoolean(desc.getProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL))) {
       convertingToAcid = true;
     }
-    if(!AcidUtils.isAcidTable(tab) && convertingToAcid) {
-      //non to acid conversion (property itself) must be mutexed to prevent concurrent writes.
-      // See HIVE-16688 for use case.
+    if(!AcidUtils.isTransactionalTable(tab) && convertingToAcid) {
+      //non-acid to transactional conversion (property itself) must be mutexed to prevent concurrent writes.
+      // See HIVE-16688 for use cases.
       return WriteType.DDL_EXCLUSIVE;
     }
     return WriteEntity.determineAlterTableWriteType(op);
@@ -2125,7 +2125,7 @@ private void analyzeAlterTablePartMergeFiles(ASTNode ast,
     }

     // transactional tables are compacted and no longer needs to be bucketed, so not safe for merge/concatenation
-    boolean isAcid = AcidUtils.isAcidTable(tblObj);
+    boolean isAcid = AcidUtils.isTransactionalTable(tblObj);
     if (isAcid) {
       throw new SemanticException(ErrorMsg.CONCATENATE_UNSUPPORTED_TABLE_TRANSACTIONAL.getMsg());
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index cc956da575..26b4fea2b7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -159,12 +159,12 @@ private URI initializeFromURI(String fromPath, boolean isLocal) throws IOExcepti
         throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast,
             "source contains directory: " + oneSrc.getPath().toString()));
       }
-      if(AcidUtils.isFullAcidTable(table)) {
+      if(AcidUtils.isAcidTable(table)) {
         if(!AcidUtils.originalBucketFilter.accept(oneSrc.getPath())) {
           //acid files (e.g. bucket_0000) have ROW_ID embedded in them and so can't be simply
           //copied to a table so only allow non-acid files for now
           throw new SemanticException(ErrorMsg.ACID_LOAD_DATA_INVALID_FILE_NAME,
-              oneSrc.getPath().getName(), table.getDbName() + "." + table.getTableName());
+              oneSrc.getPath().getName(), table.getFullyQualifiedName());
         }
       }
     }
@@ -283,7 +283,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {

     Long txnId = null;
     int stmtId = -1;
-    if (AcidUtils.isAcidTable(ts.tableHandle)) {
+    if (AcidUtils.isTransactionalTable(ts.tableHandle)) {
       txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
       stmtId = SessionState.get().getTxnMgr().getWriteIdAndIncrement();
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 28e3621d32..96c1977da4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6716,7 +6716,7 @@ private Operator genBucketingSortingDest(String dest, Operator input, QB qb,
       nullOrder.append(sortOrder == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC ? 'a' : 'z');
     }
     input = genReduceSinkPlan(input, partnCols, sortCols, order.toString(), nullOrder.toString(),
-        maxReducers, (AcidUtils.isFullAcidTable(dest_tab) ?
+        maxReducers, (AcidUtils.isAcidTable(dest_tab) ?
         getAcidType(table_desc.getOutputFileFormatClass(), dest) : AcidUtils.Operation.NOT_ACID));
     reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator)input.getParentOperators().get(0));
     ctx.setMultiFileSpray(multiFileSpray);
@@ -6803,8 +6803,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
     case QBMetaData.DEST_TABLE: {

       dest_tab = qbm.getDestTableForAlias(dest);
-      destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
-      destTableIsFullAcid = AcidUtils.isFullAcidTable(dest_tab);
+      destTableIsAcid = AcidUtils.isTransactionalTable(dest_tab);
+      destTableIsFullAcid = AcidUtils.isAcidTable(dest_tab);
       destTableIsTemporary = dest_tab.isTemporary();

       // Is the user trying to insert into a external tables
@@ -6874,6 +6874,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
       if (destTableIsFullAcid) {
         acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
+        //todo: should this be done for MM? is it ok to use CombineHiveInputFormat with MM
         checkAcidConstraints(qb, table_desc, dest_tab);
       }
       if (AcidUtils.isInsertOnlyTable(table_desc.getProperties())) {
@@ -6915,8 +6916,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       dest_part = qbm.getDestPartitionForAlias(dest);
       dest_tab = dest_part.getTable();
-      destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
-      destTableIsFullAcid = AcidUtils.isFullAcidTable(dest_tab);
+      destTableIsAcid = AcidUtils.isTransactionalTable(dest_tab);
+      destTableIsFullAcid = AcidUtils.isAcidTable(dest_tab);
       checkExternalTable(dest_tab);

@@ -6951,6 +6952,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
       if (destTableIsFullAcid) {
         acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
+        //todo: should this be done for MM? is it ok to use CombineHiveInputFormat with MM?
         checkAcidConstraints(qb, table_desc, dest_tab);
       }
       if (AcidUtils.isInsertOnlyTable(dest_part.getTable().getParameters())) {
@@ -7039,8 +7041,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
         viewDesc.setSchema(new ArrayList<FieldSchema>(field_schemas));
       }
-      destTableIsAcid = tblDesc != null && AcidUtils.isAcidTable(tblDesc);
-      destTableIsFullAcid = tblDesc != null && AcidUtils.isFullAcidTable(tblDesc);
+      destTableIsAcid = tblDesc != null && AcidUtils.isTransactionalTable(tblDesc);
+      destTableIsFullAcid = tblDesc != null && AcidUtils.isAcidTable(tblDesc);

       boolean isDestTempFile = true;
       if (!ctx.isMRTmpFileURI(dest_path.toUri().toString())) {
@@ -7495,11 +7497,6 @@ String fixCtasColumnName(String colName) {
     return colName;
   }

-  // Check constraints on acid tables.  This includes
-  // * Check that the table is bucketed
-  // * Check that the table is not sorted
-  // This method assumes you have already decided that this is an Acid write.  Don't call it if
-  // that isn't true.
   private void checkAcidConstraints(QB qb, TableDesc tableDesc,
       Table table) throws SemanticException {
     /*
@@ -7512,10 +7509,6 @@ These props are now enabled elsewhere (see commit diffs).  It would be better in
     backwards incompatible.
     */
     conf.set(AcidUtils.CONF_ACID_KEY, "true");
-
-    if (table.getSortCols() != null && table.getSortCols().size() > 0) {
-      throw new SemanticException(ErrorMsg.ACID_NO_SORTED_BUCKETS, table.getTableName());
-    }
   }

   /**
@@ -12123,7 +12116,7 @@ public void validate() throws SemanticException {
         if (p != null) {
           tbl = p.getTable();
         }
-        if (tbl != null && (AcidUtils.isFullAcidTable(tbl) || AcidUtils.isInsertOnlyTable(tbl.getParameters()))) {
+        if (tbl != null && (AcidUtils.isAcidTable(tbl) || AcidUtils.isInsertOnlyTable(tbl.getParameters()))) {
          acidInQuery = true;
          checkAcidTxnManager(tbl);
        }
@@ -12186,7 +12179,7 @@ public void validate() throws SemanticException {
          tbl = writeEntity.getTable();
        }
-        if (tbl != null && (AcidUtils.isFullAcidTable(tbl) || AcidUtils.isInsertOnlyTable(tbl.getParameters()))) {
+        if (tbl != null && (AcidUtils.isAcidTable(tbl) || AcidUtils.isInsertOnlyTable(tbl.getParameters()))) {
          acidInQuery = true;
          checkAcidTxnManager(tbl);
        }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 7ecd1ffa5e..075aac506f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -175,7 +175,7 @@ private void checkValidSetClauseTarget(ASTNode colName, Table targetTable) throw
     }
     if(!foundColumnInTargetTable) {
       throw new SemanticException(ErrorMsg.INVALID_TARGET_COLUMN_IN_SET_CLAUSE, colName.getText(),
-        getDotName(new String[] {targetTable.getDbName(), targetTable.getTableName()}));
+        targetTable.getFullyQualifiedName());
     }
   }
   private ASTNode findLHSofAssignment(ASTNode assignment) {
@@ -318,7 +318,7 @@ private ReparseResult parseRewrittenQuery(StringBuilder rewrittenQueryStr, Strin
   private void validateTargetTable(Table mTable) throws SemanticException {
     if (mTable.getTableType() == TableType.VIRTUAL_VIEW ||
       mTable.getTableType() == TableType.MATERIALIZED_VIEW) {
-      LOG.error("Table " + getDotName(new String[] {mTable.getDbName(), mTable.getTableName()}) + " is a view or materialized view");
+      LOG.error("Table " + mTable.getFullyQualifiedName() + " is a view or materialized view");
       throw new SemanticException(ErrorMsg.UPDATE_DELETE_VIEW.getMsg());
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 4b7d2b45b4..aa96072f36 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -135,7 +135,7 @@ public TableScanDesc(final String alias, List<VirtualColumn> vcs, Table tblMetad
     this.alias = alias;
     this.virtualCols = vcs;
     this.tableMetadata = tblMetadata;
-    isAcidTable = AcidUtils.isFullAcidTable(this.tableMetadata);
+    isAcidTable = AcidUtils.isAcidTable(this.tableMetadata);
     if (isAcidTable) {
       acidOperationalProperties = AcidUtils.getAcidOperationalProperties(this.tableMetadata);
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java
index e8d3184f40..6bba6b06a5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java
@@ -48,7 +48,7 @@ public static Partish buildFor(Table table, Partition part) {
   // rename
   @Deprecated
   public final boolean isAcid() {
-    return AcidUtils.isFullAcidTable(getTable());
+    return AcidUtils.isAcidTable(getTable());
   }

   public abstract Table getTable();
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 5d2652457b..8511e0b3a8 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -836,7 +836,7 @@ public void testBIStrategySplitBlockBoundary() throws Exception {
   public void testEtlCombinedStrategy() throws Exception {
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL");
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS.varname, "1000000");
-    AcidUtils.setTransactionalTableScan(conf, true);
+    AcidUtils.setAcidTableScan(conf, true);
     conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default");

@@ -3569,7 +3569,7 @@ public void testACIDReaderNoFooterSerializeWithDeltas() throws Exception {
     //set up props for read
     conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
-    AcidUtils.setTransactionalTableScan(conf, true);
+    AcidUtils.setAcidTableScan(conf, true);
     OrcInputFormat orcInputFormat = new OrcInputFormat();
     InputSplit[] splits = orcInputFormat.getSplits(conf, 2);
@@ -3648,7 +3648,7 @@ public void testACIDReaderFooterSerializeWithDeltas() throws Exception {
     //set up props for read
     conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
-    AcidUtils.setTransactionalTableScan(conf, true);
+    AcidUtils.setAcidTableScan(conf, true);
     OrcInputFormat orcInputFormat = new OrcInputFormat();
     InputSplit[] splits = orcInputFormat.getSplits(conf, 2);
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 030f012a21..78b4351ac9 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -20,8 +20,6 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.orc.CompressionKind;
@@ -67,10 +65,7 @@
 import com.google.common.collect.Lists;

 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -686,7 +681,7 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception {
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
-    AcidUtils.setTransactionalTableScan(conf,true);
+    AcidUtils.setAcidTableScan(conf,true);
     conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);

     //the first "split" is for base/
@@ -1154,7 +1149,7 @@ public synchronized void addedRow(int rows) throws IOException {
     JobConf job = new JobConf();
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, BigRow.getColumnTypesProperty());
-    AcidUtils.setTransactionalTableScan(job,true);
+    AcidUtils.setAcidTableScan(job,true);
     job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     job.set("mapred.min.split.size", "1");
     job.set("mapred.max.split.size", "2");
@@ -1289,7 +1284,7 @@ public synchronized void addedRow(int rows) throws IOException {
     job.set("mapred.input.dir", root.toString());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, BigRow.getColumnTypesProperty());
-    AcidUtils.setTransactionalTableScan(job,true);
+    AcidUtils.setAcidTableScan(job,true);
     job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     InputSplit[] splits = inf.getSplits(job, 5);
     //base has 10 rows, so 5 splits, 1 delta has 2 rows so 1 split, and 1 delta has 3 so 2 splits
@@ -1386,7 +1381,7 @@ public void testRecordReaderDelta() throws Exception {
     job.set("bucket_count", "1");
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
-    AcidUtils.setTransactionalTableScan(job,true);
+    AcidUtils.setAcidTableScan(job,true);
     job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     InputSplit[] splits = inf.getSplits(job, 5);
     assertEquals(2, splits.length);
@@ -1460,7 +1455,7 @@ private void testRecordReaderIncompleteDelta(boolean use130Format) throws Except
     job.set("bucket_count", "2");
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
-    AcidUtils.setTransactionalTableScan(job,true);
+    AcidUtils.setAcidTableScan(job,true);
     job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);

     // read the keys before the delta is flushed
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
index 1512ffbfe1..f849b1a0c3 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
@@ -54,7 +54,7 @@ public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException {

   @Override
   public void onDropTable(DropTableEvent tableEvent) throws MetaException {
-    if (TxnUtils.isAcidTable(tableEvent.getTable())) {
+    if (TxnUtils.isTransactionalTable(tableEvent.getTable())) {
       txnHandler = getTxnHandler();
       txnHandler.cleanupRecords(HiveObjectType.TABLE, null, tableEvent.getTable(), null);
     }
@@ -62,7 +62,7 @@ public void onDropTable(DropTableEvent tableEvent) throws MetaException {

   @Override
   public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
-    if (TxnUtils.isAcidTable(partitionEvent.getTable())) {
+    if (TxnUtils.isTransactionalTable(partitionEvent.getTable())) {
       txnHandler = getTxnHandler();
       txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, partitionEvent.getTable(),
           partitionEvent.getPartitionIterator());
@@ -76,7 +76,7 @@ private TxnStore getTxnHandler() {
     boolean origConcurrency = false;

     // Since TxnUtils.getTxnStore calls TxnHandler.setConf -> checkQFileTestHack -> TxnDbUtil.setConfValues,
-    // which may change the values of below two entries, we need to avoid pulluting the original values
+    // which may change the values of below two entries, we need to avoid polluting the original values
     if (hackOn) {
       origTxnMgr = MetastoreConf.getVar(conf, ConfVars.HIVE_TXN_MANAGER);
       origConcurrency = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_SUPPORT_CONCURRENCY);
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index da1031300a..f7afe55b8a 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -142,7 +142,7 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw
     }

     if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
-      throw new MetaException(getTableName(newTable) +
+      throw new MetaException(Warehouse.getQualifiedName(newTable) +
           " cannot be declared transactional because it's an external table");
     }
     validateTableStructure(context.getHandler(), newTable);
@@ -182,6 +182,33 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw
       }
     }
+    checkSorted(newTable);
+  }
+  private static boolean isTransactional(Table table) {
+    if(table == null) {
+      return false;
+    }
+    String tableIsTransactional =
+        table.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if (tableIsTransactional == null) {
+      tableIsTransactional =
+          table.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
+    }
+    return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
+  }
+  private static boolean isAcid(Table table) {
+    return isTransactional(table) && DEFAULT_TRANSACTIONAL_PROPERTY.equals(table.getParameters()
+        .get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES));
+  }
+  private void checkSorted(Table newTable) throws MetaException {
+    if(!isAcid(newTable)) {
+      return;
+    }
+    StorageDescriptor sd = newTable.getSd();
+    if (sd.getSortCols() != null && sd.getSortCols().size() > 0) {
+      throw new MetaException("Sorted table " + Warehouse.getQualifiedName(newTable) +
+          " cannot support full ACID functionality");
+    }
   }

   /**
@@ -231,7 +258,7 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr
     }

     if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
-      throw new MetaException(newTable.getDbName() + "." + newTable.getTableName() +
+      throw new MetaException(Warehouse.getQualifiedName(newTable) +
          " cannot be declared transactional because it's an external table");
     }

@@ -243,7 +270,7 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr
       initializeTransactionalProperties(newTable);
       return;
     }
-
+    checkSorted(newTable);
     // transactional is found, but the value is not in expected range
     throw new MetaException("'transactional' property of TBLPROPERTIES may only have value 'true'");
   }
@@ -366,18 +393,16 @@ private void validateTableStructure(IHMSHandler hmsHandler, Table table)
         );
         if (!validFile) {
           throw new IllegalStateException("Unexpected data file name format.  Cannot convert " +
-            getTableName(table) + " to transactional table.  File: " + fileStatus.getPath());
+            Warehouse.getQualifiedName(table) + " to transactional table.  File: " +
+            fileStatus.getPath());
         }
       }
     } catch (IOException|NoSuchObjectException e) {
-      String msg = "Unable to list files for " + getTableName(table);
+      String msg = "Unable to list files for " + Warehouse.getQualifiedName(table);
       LOG.error(msg, e);
       MetaException e1 = new MetaException(msg);
       e1.initCause(e);
       throw e1;
     }
   }
-  private static String getTableName(Table table) {
-    return table.getDbName() + "." + table.getTableName();
-  }
 }
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index afb4f6b7fb..ee4970f0ff 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -129,13 +129,13 @@ public static TxnStore getTxnStore(Configuration conf) {
     }
   }

-  /** Checks if a table is a valid ACID table.
+  /**
    * Note, users are responsible for using the correct TxnManager. We do not look at
    * SessionState.get().getTxnMgr().supportsAcid() here
    * @param table table
-   * @return true if table is a legit ACID table, false otherwise
+   * @return true if table is a transactional table, false otherwise
    */
-  public static boolean isAcidTable(Table table) {
+  public static boolean isTransactionalTable(Table table) {
     if (table == null) {
       return false;
     }