diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index b168906b44..a793313465 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1332,6 +1332,9 @@ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnMa // If we've opened a transaction we need to commit or rollback rather than explicitly // releasing the locks. conf.unset(ValidTxnList.VALID_TXNS_KEY); + if(!checkConcurrency()) { + return; + } if (txnMgr.isTxnOpen()) { if (commit) { if(conf.getBoolVar(ConfVars.HIVE_IN_TEST) && conf.getBoolVar(ConfVars.HIVETESTMODEROLLBACKTXN)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index dcda8b3e00..11c412fd91 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -108,6 +108,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.AcidUtils.Operation; @@ -119,6 +120,7 @@ import org.apache.hadoop.hive.ql.lib.Dispatcher; import org.apache.hadoop.hive.ql.lib.GraphWalker; import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; import org.apache.hadoop.hive.ql.metadata.DummyPartition; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -12212,14 +12214,6 @@ private void validate(Task task, boolean reworkMapredWor validate(childTask, reworkMapredWork); } } - - /** - * Get the row resolver given an operator. 
- */ - public RowResolver getRowResolver(Operator opt) { - return opParseCtx.get(opt).getRowResolver(); - } - /** * Add default properties for table property. If a default parameter exists * in the tblProp, the value in tblProp will be kept. @@ -12229,7 +12223,8 @@ public RowResolver getRowResolver(Operator opt) { * @return Modified table property map */ private Map addDefaultProperties( - Map tblProp, boolean isExt, StorageFormat storageFormat) { + Map tblProp, boolean isExt, StorageFormat storageFormat, + String qualifiedTableName, List sortCols) { Map retValue; if (tblProp == null) { retValue = new HashMap(); @@ -12248,16 +12243,44 @@ public RowResolver getRowResolver(Operator opt) { } } } - if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY) + boolean makeInsertOnly = HiveConf.getBoolVar(conf, ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY); + boolean makeAcid = HiveConf.getBoolVar(conf, ConfVars.HIVE_CREATE_TABLES_AS_ACID) && + HiveConf.getBoolVar(conf, ConfVars.HIVE_SUPPORT_CONCURRENCY) && + DbTxnManager.class.getCanonicalName().equals(HiveConf.getVar(conf, ConfVars.HIVE_TXN_MANAGER)); + if ((makeInsertOnly || makeAcid) && !isExt && StringUtils.isBlank(storageFormat.getStorageHandler()) //don't overwrite user choice if transactional attribute is explicitly set && !retValue.containsKey(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL)) { - retValue.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true"); - String oldProps = retValue.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); - if (oldProps != null) { - LOG.warn("Non-transactional table has transactional properties; overwriting " + oldProps); + if(makeInsertOnly) { + retValue.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true"); + retValue.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, + TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY); + } + if(makeAcid) { + /*for CTAS, TransactionalValidationListener.makeAcid() runs too late to make 
table Acid + so the initial write ends up running as non-acid...*/ + try { + Class inputFormatClass = storageFormat.getInputFormat() == null ? null : + Class.forName(storageFormat.getInputFormat()); + Class outputFormatClass = storageFormat.getOutputFormat() == null ? null : + Class.forName(storageFormat.getOutputFormat()); + if (inputFormatClass == null || outputFormatClass == null || + !AcidInputFormat.class.isAssignableFrom(inputFormatClass) || + !AcidOutputFormat.class.isAssignableFrom(outputFormatClass)) { + return retValue; + } + } catch (ClassNotFoundException e) { + LOG.warn("Could not verify InputFormat=" + storageFormat.getInputFormat() + " or OutputFormat=" + + storageFormat.getOutputFormat() + " for " + qualifiedTableName); + return retValue; + } + if(sortCols != null && !sortCols.isEmpty()) { + return retValue; + } + retValue.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true"); + retValue.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, + TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY); } - retValue.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, - TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY); } return retValue; } @@ -12482,7 +12505,7 @@ ASTNode analyzeCreateTable( switch (command_type) { case CREATE_TABLE: // REGULAR CREATE TABLE DDL - tblProps = addDefaultProperties(tblProps, isExt, storageFormat); + tblProps = addDefaultProperties(tblProps, isExt, storageFormat, dbDotTab, sortCols); CreateTableDesc crtTblDesc = new CreateTableDesc(dbDotTab, isExt, isTemporary, cols, partCols, bucketCols, sortCols, numBuckets, rowFormatParams.fieldDelim, @@ -12503,7 +12526,7 @@ ASTNode analyzeCreateTable( break; case CTLT: // create table like - tblProps = addDefaultProperties(tblProps, isExt, storageFormat); + tblProps = addDefaultProperties(tblProps, isExt, storageFormat, dbDotTab, sortCols); if (isTemporary) { Table likeTable = getTable(likeTableName, false); @@ -12580,7 +12603,7 @@ ASTNode 
analyzeCreateTable( } } - tblProps = addDefaultProperties(tblProps, isExt, storageFormat); + tblProps = addDefaultProperties(tblProps, isExt, storageFormat, dbDotTab, sortCols); tableDesc = new CreateTableDesc(qualifiedTabName[0], dbDotTab, isExt, isTemporary, cols, partCols, bucketCols, sortCols, numBuckets, rowFormatParams.fieldDelim, rowFormatParams.fieldEscape, rowFormatParams.collItemDelim, rowFormatParams.mapKeyDelim, diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index da1031300a..bcfa1a187e 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.metastore; import java.io.IOException; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -34,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent; import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent; import org.apache.hadoop.hive.metastore.events.PreEventContext; @@ -137,7 +139,8 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw if (!conformToAcid(newTable)) { // INSERT_ONLY tables don't have to conform to ACID requirement like ORC or bucketing if (transactionalPropertiesValue == null || !"insert_only".equalsIgnoreCase(transactionalPropertiesValue)) { - throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)"); + throw new 
MetaException("The table must be stored using an ACID compliant format (such as ORC): " + + Warehouse.getQualifiedName(newTable)); } } @@ -160,7 +163,8 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw if (!hasValidTransactionalValue && !MetaStoreUtils.isInsertOnlyTableParam(oldTable.getParameters())) { // if here, there is attempt to set transactional to something other than 'true' // and NOT the same value it was before - throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset"); + throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset: " + + Warehouse.getQualifiedName(newTable)); } if (isTransactionalPropertiesPresent) { @@ -178,13 +182,63 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw || !oldTransactionalPropertiesValue.equalsIgnoreCase(transactionalPropertiesValue)) && !MetaStoreUtils.isInsertOnlyTableParam(oldTable.getParameters())) { throw new MetaException("TBLPROPERTIES with 'transactional_properties' cannot be " - + "altered after the table is created"); + + "altered after the table is created: " + Warehouse.getQualifiedName(newTable)); } } } } /** + * Want to make a newly created table Acid (unless it explicitly has transactional=false param) + * if table can support it. Also see SemanticAnalyzer.addDefaultProperties() which performs the + * same logic. This code path is more general since it is activated even if you create a table + * via Thrift, WebHCat etc but some operations like CTAS create the table (metastore object) as + * the last step (i.e. after the data is written) but the write itself has to be aware of the type + * of table so this Listener is too late. 
+ */ + private void makeAcid(Table newTable) throws MetaException { + if(newTable.getParameters() != null && + newTable.getParameters().containsKey(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL)) { + LOG.info("Could not make " + Warehouse.getQualifiedName(newTable) + " acid: already has " + + hive_metastoreConstants.TABLE_IS_TRANSACTIONAL + "=" + + newTable.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL)); + return; + } + Configuration conf = MetastoreConf.newMetastoreConf(); + boolean makeAcid = + //no point making an acid table if these other props are not set since it will just throw + //exceptions when someone tries to use the table. + MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID) && + MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.HIVE_SUPPORT_CONCURRENCY) && + "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager".equals( + MetastoreConf.getVar(conf, MetastoreConf.ConfVars.HIVE_TXN_MANAGER) + ); + + if(makeAcid) { + if(!conformToAcid(newTable)) { + LOG.info("Could not make " + Warehouse.getQualifiedName(newTable) + " acid: wrong IO format"); + return; + } + if(!TableType.MANAGED_TABLE.toString().equalsIgnoreCase(newTable.getTableType())) { + //todo should this check be in conformToAcid()? + LOG.info("Could not make " + Warehouse.getQualifiedName(newTable) + " acid: it's " + + newTable.getTableType()); + return; + } + if(newTable.getSd().getSortColsSize() > 0) { + LOG.info("Could not make " + Warehouse.getQualifiedName(newTable) + " acid: it's sorted"); + return; + } + //check if orc and not sorted + Map parameters = newTable.getParameters(); + if (parameters == null || parameters.isEmpty()) { + parameters = new HashMap<>(); + } + parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true"); + newTable.setParameters(parameters); + } + } + /** * Normalize case and make sure: * 1. 'true' is the only value to be set for 'transactional' (if set at all) * 2. 
If set to 'true', we should also enforce bucketing and ORC format @@ -193,6 +247,7 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr Table newTable = context.getTable(); Map parameters = newTable.getParameters(); if (parameters == null || parameters.isEmpty()) { + makeAcid(newTable); return; } String transactional = null; @@ -212,13 +267,15 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr } if (transactional == null) { + makeAcid(newTable); return; } if ("false".equalsIgnoreCase(transactional)) { // just drop transactional=false. For backward compatibility in case someone has scripts // with transactional=false - LOG.info("'transactional'='false' is no longer a valid property and will be ignored"); + LOG.info("'transactional'='false' is no longer a valid property and will be ignored: " + + Warehouse.getQualifiedName(newTable)); return; } @@ -226,13 +283,15 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr if (!conformToAcid(newTable)) { // INSERT_ONLY tables don't have to conform to ACID requirement like ORC or bucketing if (transactionalProperties == null || !"insert_only".equalsIgnoreCase(transactionalProperties)) { - throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)"); + throw new MetaException("The table must be stored using an ACID compliant format (such as ORC): " + + Warehouse.getQualifiedName(newTable)); } } if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) { throw new MetaException(newTable.getDbName() + "." 
+ newTable.getTableName() + - " cannot be declared transactional because it's an external table"); + " cannot be declared transactional because it's an external table: " + + Warehouse.getQualifiedName(newTable)); } // normalize prop name @@ -245,7 +304,8 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr } // transactional is found, but the value is not in expected range - throw new MetaException("'transactional' property of TBLPROPERTIES may only have value 'true'"); + throw new MetaException("'transactional' property of TBLPROPERTIES may only have value 'true': " + + Warehouse.getQualifiedName(newTable)); } /** @@ -262,11 +322,13 @@ private void normazlieTransactionalPropertyDefault(Table table) { * Check that InputFormatClass/OutputFormatClass should implement * AcidInputFormat/AcidOutputFormat */ - private boolean conformToAcid(Table table) throws MetaException { + public static boolean conformToAcid(Table table) throws MetaException { StorageDescriptor sd = table.getSd(); try { - Class inputFormatClass = Class.forName(sd.getInputFormat()); - Class outputFormatClass = Class.forName(sd.getOutputFormat()); + Class inputFormatClass = sd.getInputFormat() == null ? null : + Class.forName(sd.getInputFormat()); + Class outputFormatClass = sd.getOutputFormat() == null ? 
null : + Class.forName(sd.getOutputFormat()); if (inputFormatClass == null || outputFormatClass == null || !Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat").isAssignableFrom(inputFormatClass) || @@ -274,7 +336,9 @@ private boolean conformToAcid(Table table) throws MetaException { return false; } } catch (ClassNotFoundException e) { - throw new MetaException("Invalid input/output format for table"); + LOG.warn("Could not verify InputFormat=" + sd.getInputFormat() + " or OutputFormat=" + + sd.getOutputFormat() + " for " + Warehouse.getQualifiedName(table)); + return false; } return true; @@ -299,8 +363,8 @@ private void initializeTransactionalProperties(Table table) throws MetaException parameters.remove(key); String validationError = validateTransactionalProperties(tableTransactionalProperties); if (validationError != null) { - throw new MetaException("Invalid transactional properties specified for the " - + "table with the error " + validationError); + throw new MetaException("Invalid transactional properties specified for " + + Warehouse.getQualifiedName(table) + " with the error " + validationError); } break; } diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index b46cc38a22..05e9a72f9a 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -53,7 +53,7 @@ /** * A set of definitions of config values used by the Metastore. One of the key aims of this * class is to provide backwards compatibility with existing Hive configuration keys while - * allowing the metastore to have its own, Hive independant keys. For this reason access to the + * allowing the metastore to have its own, Hive independent keys. 
For this reason access to the * underlying Configuration object should always be done via the static methods provided here * rather than directly via {@link Configuration#get(String)} and * {@link Configuration#set(String, String)}. All the methods of this class will handle checking @@ -373,6 +373,9 @@ public static ConfVars getMetaConf(String name) { CONNECTION_USER_NAME("javax.jdo.option.ConnectionUserName", "javax.jdo.option.ConnectionUserName", "APP", "Username to use against metastore database"), + CREATE_TABLES_AS_ACID("metastore.create.as.acid", "hive.create.as.acid", true, + "Whether the eligible tables should be created as full ACID by default. Does \n" + + "not apply to external tables, the ones using storage handlers, etc."), COUNT_OPEN_TXNS_INTERVAL("metastore.count.open.txns.interval", "hive.count.open.txns.interval", 1, TimeUnit.SECONDS, "Time in seconds between checks to count open transactions."), DATANUCLEUS_AUTOSTART("datanucleus.autoStartMechanismMode", @@ -826,7 +829,7 @@ public static ConfVars getMetaConf(String name) { // The metastore shouldn't care what txn manager Hive is running, but in various tests it // needs to set these values. We should do the work to detangle this. HIVE_TXN_MANAGER("hive.txn.manager", "hive.txn.manager", - "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager", + "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager", "Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive\n" + "transactions, which also requires appropriate settings for hive.compactor.initiator.on,\n" + "hive.compactor.worker.threads, hive.support.concurrency (true),\n" + @@ -835,7 +838,7 @@ public static ConfVars getMetaConf(String name) { "no transactions."), // Metastore always support concurrency, but certain ACID tests depend on this being set. 
We // need to do the work to detangle this - HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", "hive.support.concurrency", false, + HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", "hive.support.concurrency", true, "Whether Hive supports concurrency control or not. \n" + "A ZooKeeper instance must be up and running when using zookeeper Hive lock manager "),