diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 3e707e3415..fc5a5fa062 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -81,9 +81,11 @@ import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -98,7 +100,6 @@ import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.ShutdownHookManager;
-
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
@@ -888,6 +889,14 @@ public Configuration getConf() {
     return conf;
   }
 
+  @Override public LockType getLockType(WriteEntity writeEntity
+  ) {
+    if (writeEntity.getWriteType().equals(WriteEntity.WriteType.INSERT)) {
+      return LockType.SHARED_READ;
+    }
+    return LockType.SHARED_WRITE;
+  }
+
   @Override
   public String toString() {
     return Constants.DRUID_HIVE_STORAGE_HANDLER_ID;
@@ -918,55 +927,50 @@ private MetadataStorageTablesConfig getDruidMetadataStorageTablesConfig() {
   }
 
   private SQLMetadataConnector getConnector() {
+    return Suppliers.memoize(this::buildConnector).get();
+  }
+
+  private SQLMetadataConnector buildConnector() {
+
     if (connector != null) {
       return connector;
     }
-    final String dbType = HiveConf
-        .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_TYPE);
-    final String username = HiveConf
-        .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_USERNAME);
-    final String password = HiveConf
-        .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_PASSWORD);
-    final String uri = HiveConf
-        .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_URI);
-
-
-    final Supplier<MetadataStorageConnectorConfig> storageConnectorConfigSupplier = Suppliers.ofInstance(
-        new MetadataStorageConnectorConfig() {
-          @Override
-          public String getConnectURI() {
-            return uri;
-          }
-
-          @Override
-          public String getUser() {
-            return Strings.emptyToNull(username);
-          }
-
-          @Override
-          public String getPassword() {
-            return Strings.emptyToNull(password);
-          }
-        });
+    final String dbType = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_TYPE);
+    final String username = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_USERNAME);
+    final String password = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_PASSWORD);
+    final String uri = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_URI);
+    LOG.debug("Supplying SQL Connector with DB type {}, URI {}, User {}", dbType, uri, username);
+    final Supplier<MetadataStorageConnectorConfig> storageConnectorConfigSupplier =
+        Suppliers.ofInstance(new MetadataStorageConnectorConfig() {
+          @Override public String getConnectURI() {
+            return uri;
+          }
+
+          @Override public String getUser() {
+            return Strings.emptyToNull(username);
+          }
+
+          @Override public String getPassword() {
+            return Strings.emptyToNull(password);
+          }
+        });
     if (dbType.equals("mysql")) {
       connector = new MySQLConnector(storageConnectorConfigSupplier,
-          Suppliers.ofInstance(getDruidMetadataStorageTablesConfig())
-          , new MySQLConnectorConfig());
+          Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()), new MySQLConnectorConfig()
+      );
     } else if (dbType.equals("postgresql")) {
       connector = new PostgreSQLConnector(storageConnectorConfigSupplier,
-          Suppliers.ofInstance(getDruidMetadataStorageTablesConfig())
+        Suppliers.ofInstance(getDruidMetadataStorageTablesConfig())
      );
     } else if (dbType.equals("derby")) {
       connector = new DerbyConnector(new DerbyMetadataStorage(storageConnectorConfigSupplier.get()),
-          storageConnectorConfigSupplier, Suppliers.ofInstance(getDruidMetadataStorageTablesConfig())
+        storageConnectorConfigSupplier, Suppliers.ofInstance(getDruidMetadataStorageTablesConfig())
      );
-    }
-    else {
+    } else {
       throw new IllegalStateException(String.format("Unknown metadata storage type [%s]", dbType));
     }
-
     return connector;
   }
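A note on the getConnector() refactor above: Suppliers.memoize(this::buildConnector).get() builds a fresh memoizing supplier on every invocation, so the caching still comes entirely from the connector != null field check inside buildConnector(), not from Guava. If the intent is to let the memoizer do the caching, the supplier has to live in a field. A minimal self-contained sketch of the difference (hypothetical demo class, assuming only Guava on the classpath):

    import com.google.common.base.Supplier;
    import com.google.common.base.Suppliers;

    class MemoizeDemo {
      private int builds = 0;

      // Anti-pattern: a new memoizing supplier is created per call,
      // so every call re-invokes build().
      private Supplier<Integer> perCall() {
        return Suppliers.memoize(this::build);
      }

      // Field-level memoization: build() runs once, the result is cached.
      private final Supplier<Integer> memoized = Suppliers.memoize(this::build);

      private Integer build() {
        return ++builds;
      }

      public static void main(String[] args) {
        MemoizeDemo d = new MemoizeDemo();
        System.out.println(d.perCall().get()); // 1
        System.out.println(d.perCall().get()); // 2, a fresh memoizer each time
        System.out.println(d.memoized.get());  // 3
        System.out.println(d.memoized.get());  // still 3, cached
      }
    }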
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 248632127a..4fd1d4ec54 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -18,39 +18,48 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.hadoop.hive.ql.lockmgr;
 
 import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.curator.shaded.com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.plan.HiveOperation;
-import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
-import org.apache.hadoop.hive.ql.plan.LockTableDesc;
-import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
-import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.common.util.ShutdownHookManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.LockComponentBuilder;
 import org.apache.hadoop.hive.metastore.LockRequestBuilder;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.LockTableDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
@@ -503,60 +512,80 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB
         /* base this on HiveOperation instead? this and DDL_NO_LOCK is peppered all over the code...
          Seems much cleaner if each stmt is identified as a particular HiveOperation (which I'd think
          makes sense everywhere).  This however would be problematic for merge...*/
-        case DDL_EXCLUSIVE:
+      case DDL_EXCLUSIVE:
+        compBuilder.setExclusive();
+        compBuilder.setOperationType(DataOperationType.NO_TXN);
+        break;
+      case INSERT_OVERWRITE:
+        t = getTable(output);
+        if (AcidUtils.isTransactionalTable(t)) {
+          if (conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK)) {
+            compBuilder.setExclusive();
+          } else {
+            compBuilder.setSemiShared();
+          }
+          compBuilder.setOperationType(DataOperationType.UPDATE);
+        } else {
           compBuilder.setExclusive();
           compBuilder.setOperationType(DataOperationType.NO_TXN);
-          break;
-        case INSERT_OVERWRITE:
-          t = getTable(output);
-          if (AcidUtils.isTransactionalTable(t)) {
-            if(conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK)) {
-              compBuilder.setExclusive();
-            } else {
-              compBuilder.setSemiShared();
-            }
-            compBuilder.setOperationType(DataOperationType.UPDATE);
-          } else {
+        }
+        break;
+      case INSERT:
+        assert t != null;
+        if (AcidUtils.isTransactionalTable(t)) {
+          compBuilder.setShared();
+        } else if (MetaStoreUtils.isNonNativeTable(t.getTTable())) {
+          final HiveStorageHandler storageHandler = Preconditions.checkNotNull(t.getStorageHandler(),
+              "Thought all the non native tables have an instance of storage handler"
+          );
+          LockType lockType = storageHandler.getLockType(output);
+          switch (lockType) {
+          case EXCLUSIVE:
             compBuilder.setExclusive();
-            compBuilder.setOperationType(DataOperationType.NO_TXN);
-          }
-          break;
-        case INSERT:
-          assert t != null;
-          if(AcidUtils.isTransactionalTable(t)) {
+            break;
+          case SHARED_READ:
             compBuilder.setShared();
+            break;
+          case SHARED_WRITE:
+            compBuilder.setSemiShared();
+            break;
+          default:
+            throw new IllegalArgumentException(String
+                .format("Lock type [%s] for Database.Table [%s.%s] is unknown", lockType, t.getDbName(),
+                    t.getTableName()
+                ));
           }
-          else {
-            if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) {
-              compBuilder.setExclusive();
-            } else { // this is backward compatible for non-ACID resources, w/o ACID semantics
-              compBuilder.setShared();
-            }
+
+        } else {
+          if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) {
+            compBuilder.setExclusive();
+          } else { // this is backward compatible for non-ACID resources, w/o ACID semantics
+            compBuilder.setShared();
           }
-          compBuilder.setOperationType(DataOperationType.INSERT);
-          break;
-        case DDL_SHARED:
-          compBuilder.setShared();
-          compBuilder.setOperationType(DataOperationType.NO_TXN);
-          break;
+        }
+        compBuilder.setOperationType(DataOperationType.INSERT);
+        break;
+      case DDL_SHARED:
+        compBuilder.setShared();
+        compBuilder.setOperationType(DataOperationType.NO_TXN);
+        break;
 
-        case UPDATE:
-          compBuilder.setSemiShared();
-          compBuilder.setOperationType(DataOperationType.UPDATE);
-          break;
-        case DELETE:
-          compBuilder.setSemiShared();
-          compBuilder.setOperationType(DataOperationType.DELETE);
-          break;
+      case UPDATE:
+        compBuilder.setSemiShared();
+        compBuilder.setOperationType(DataOperationType.UPDATE);
+        break;
+      case DELETE:
+        compBuilder.setSemiShared();
+        compBuilder.setOperationType(DataOperationType.DELETE);
+        break;
 
-        case DDL_NO_LOCK:
-          continue; // No lock required here
+      case DDL_NO_LOCK:
+        continue; // No lock required here
 
-        default:
-          throw new RuntimeException("Unknown write type " +
-            output.getWriteType().toString());
+      default:
+        throw new RuntimeException("Unknown write type " + output.getWriteType().toString());
       }
-      if(t != null) {
+      if (t != null) {
         compBuilder.setIsTransactional(AcidUtils.isTransactionalTable(t));
       }
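The INSERT branch above is the core of the change: for a non-native table the lock is no longer hardwired, but taken from the table's storage handler and mapped onto the builder (EXCLUSIVE to setExclusive(), SHARED_READ to setShared(), SHARED_WRITE to setSemiShared()). For the mock-backed table used in the tests below, the resulting component looks roughly like this (a sketch built from the same builder calls the patch uses; the db/table values and the build() call are illustrative):

    import org.apache.hadoop.hive.metastore.LockComponentBuilder;
    import org.apache.hadoop.hive.metastore.api.DataOperationType;
    import org.apache.hadoop.hive.metastore.api.LockComponent;

    class LockComponentSketch {
      static LockComponent sharedReadInsert() {
        LockComponentBuilder compBuilder = new LockComponentBuilder();
        compBuilder.setShared();                                 // handler returned SHARED_READ
        compBuilder.setOperationType(DataOperationType.INSERT);  // statement is still an INSERT
        compBuilder.setDbName("default");                        // illustrative values
        compBuilder.setTableName("tab_not_acid");
        compBuilder.setIsTransactional(false);                   // non-native tables are not ACID
        return compBuilder.build();
      }
    }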
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 1696243aeb..2ebb149354 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -18,22 +18,23 @@
 package org.apache.hadoop.hive.ql.metadata;
 
-import java.util.Collections;
-import java.util.Map;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 
+import java.util.Map;
+
 /**
  * HiveStorageHandler defines a pluggable interface for adding
  * new storage handlers to Hive.  A storage handler consists of
@@ -167,4 +168,8 @@ public default StorageHandlerInfo getStorageHandlerInfo(Table table) throws Meta
   {
     return null;
   }
+
+  default LockType getLockType(WriteEntity writeEntity){
+    return LockType.EXCLUSIVE;
+  }
 }
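The default implementation above preserves the old conservative behaviour: any storage handler that does not override getLockType() keeps requesting an exclusive lock for every write. A handler that knows its writes can run concurrently is free to relax that, for example (a hypothetical handler, not part of this patch):

    import org.apache.hadoop.hive.metastore.api.LockType;
    import org.apache.hadoop.hive.ql.hooks.WriteEntity;
    import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;

    // Hypothetical append-only handler: concurrent INSERTs are allowed,
    // everything else keeps the conservative exclusive default.
    public class AppendOnlyStorageHandler extends DefaultStorageHandler {
      @Override
      public LockType getLockType(WriteEntity writeEntity) {
        if (writeEntity.getWriteType() == WriteEntity.WriteType.INSERT) {
          return LockType.SHARED_READ; // appends do not conflict with readers or each other
        }
        return LockType.EXCLUSIVE; // same as the interface default
      }
    }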
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 7ff7e189ea..134fdb5327 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -20,6 +20,7 @@
 import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.math.IntMath;
@@ -7297,6 +7298,10 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       boolean overwrite = !qb.getParseInfo().isInsertIntoTable(
           String.format("%s.%s", dest_tab.getDbName(), dest_tab.getTableName()));
       createPreInsertDesc(dest_tab, overwrite);
+
+      ltd = new LoadTableDesc(queryTmpdir, table_desc, partSpec == null ? ImmutableMap.of() : partSpec);
+      ltd.setInsertOverwrite(overwrite);
+      ltd.setLoadFileType(overwrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING);
     }
 
     if (dest_tab.isMaterializedView()) {
@@ -14408,15 +14413,13 @@ private void addAlternateGByKeyMappings(ASTNode gByExpr, ColumnInfo colInfo,
   }
 
   private WriteEntity.WriteType determineWriteType(LoadTableDesc ltd, boolean isNonNativeTable, String dest) {
-    // Don't know the characteristics of non-native tables,
-    // and don't have a rational way to guess, so assume the most
-    // conservative case.
-    if (isNonNativeTable) {
+
+    if (ltd == null) {
       return WriteEntity.WriteType.INSERT_OVERWRITE;
-    } else {
-      return ((ltd.getLoadFileType() == LoadFileType.REPLACE_ALL || ltd.isInsertOverwrite())
-          ? WriteEntity.WriteType.INSERT_OVERWRITE : getWriteType(dest));
     }
+    return ((ltd.getLoadFileType() == LoadFileType.REPLACE_ALL || ltd
+        .isInsertOverwrite()) ? WriteEntity.WriteType.INSERT_OVERWRITE : getWriteType(dest));
+
   }
 
   private WriteEntity.WriteType getWriteType(String dest) {
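Because genFileSinkPlan now creates a LoadTableDesc for non-native tables too, determineWriteType can drop the isNonNativeTable special case: the write type falls out of the load descriptor for every table, and the null check keeps the old conservative INSERT_OVERWRITE fallback. The patched logic, restated as a standalone helper (a sketch, not code from the patch; import paths as used in the surrounding files):

    import org.apache.hadoop.hive.ql.hooks.WriteEntity;
    import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
    import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;

    class WriteTypeSketch {
      // fromDest stands in for getWriteType(dest), e.g. INSERT, UPDATE or DELETE.
      static WriteEntity.WriteType writeTypeFor(LoadTableDesc ltd, WriteEntity.WriteType fromDest) {
        if (ltd == null) {
          // No load descriptor at all: stay conservative, as before.
          return WriteEntity.WriteType.INSERT_OVERWRITE;
        }
        boolean overwrite = ltd.getLoadFileType() == LoadFileType.REPLACE_ALL || ltd.isInsertOverwrite();
        return overwrite ? WriteEntity.WriteType.INSERT_OVERWRITE : fromDest;
      }
    }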
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index e06f0a4f5c..77fe73687a 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -771,6 +771,35 @@ public void checkExpectedLocks2() throws Exception {
     conf.setBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE, true);
   }
 
+  @Test
+  public void testLockingOnInsertIntoNonNativeTables() throws Exception {
+    dropTable(new String[] {"tab_not_acid"});
+    checkCmdOnDriver(driver.run("create table if not exists tab_not_acid (a int, b int) "
+        + " STORED BY 'org.apache.hadoop.hive.ql.metadata.StorageHandlerMock'"));
+    txnMgr.openTxn(ctx, "T1");
+    checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid values(1,2)", true));
+
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "tab_not_acid", null, locks);
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "_dummy_database", "_dummy_table", null, locks);
+  }
+
+  @Test
+  public void testLockingOnInsertOverwriteNonNativeTables() throws Exception {
+    dropTable(new String[] {"tab_not_acid"});
+    checkCmdOnDriver(driver.run("create table if not exists tab_not_acid (a int, b int) "
+        + " STORED BY 'org.apache.hadoop.hive.ql.metadata.StorageHandlerMock'"));
+    txnMgr.openTxn(ctx, "T1");
+    checkCmdOnDriver(driver.compileAndRespond("insert overwrite table tab_not_acid values(1,2)", true));
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "tab_not_acid", null, locks);
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "_dummy_database", "_dummy_table", null, locks);
+  }
+
   /** The list is small, and the object is generated, so we don't use sets/equals/etc. */
   public static ShowLocksResponseElement checkLock(LockType expectedType, LockState expectedState, String expectedDb,
       String expectedTable, String expectedPartition, List<ShowLocksResponseElement> actuals) {
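In both tests the second lock comes from _dummy_database._dummy_table, the synthetic source Hive compiles an INSERT ... VALUES against; the interesting assertion is the first one, where the mock's SHARED_READ for plain INSERT versus the exclusive lock for INSERT OVERWRITE becomes visible. A natural follow-up, sketched here against the helpers this test class already uses (not part of the patch, and the final lock count is illustrative): two concurrent INSERTs should both be able to acquire their SHARED_READ locks.

    @Test
    public void testConcurrentInsertsOnNonNativeTables() throws Exception {
      dropTable(new String[] {"tab_not_acid"});
      checkCmdOnDriver(driver.run("create table if not exists tab_not_acid (a int, b int) "
          + " STORED BY 'org.apache.hadoop.hive.ql.metadata.StorageHandlerMock'"));
      checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid values(1,2)", true));
      txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");

      // A second transaction manager simulates a concurrent session; if the first
      // INSERT held an exclusive lock, this acquireLocks call would block.
      HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
      checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid values(3,4)", true));
      txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
      List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
      Assert.assertEquals("Unexpected lock count", 4, locks.size());
    }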
diff --git ql/src/test/org/apache/hadoop/hive/ql/metadata/StorageHandlerMock.java ql/src/test/org/apache/hadoop/hive/ql/metadata/StorageHandlerMock.java
new file mode 100644
index 0000000000..4de7d3e18c
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/metadata/StorageHandlerMock.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.metadata;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+
+/**
+ * Mock class used for unit testing.
+ * {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2#testLockingOnInsertIntoNonNativeTables()}
+ */
+public class StorageHandlerMock extends DefaultStorageHandler {
+  @Override public HiveMetaHook getMetaHook() {
+    return new HiveMetaHook() {
+      @Override public void preCreateTable(Table table) throws MetaException {
+
+      }
+
+      @Override public void rollbackCreateTable(Table table) throws MetaException {
+
+      }
+
+      @Override public void commitCreateTable(Table table) throws MetaException {
+
+      }
+
+      @Override public void preDropTable(Table table) throws MetaException {
+
+      }
+
+      @Override public void rollbackDropTable(Table table) throws MetaException {
+
+      }
+
+      @Override public void commitDropTable(Table table, boolean deleteData) throws MetaException {
+
+      }
+    };
+  }
+
+  @Override public LockType getLockType(WriteEntity writeEntity
+  ) {
+    if (writeEntity.getWriteType().equals(WriteEntity.WriteType.INSERT)) {
+      return LockType.SHARED_READ;
+    }
+    return LockType.SHARED_WRITE;
+  }
+
+  @Override public Class<? extends OutputFormat> getOutputFormatClass() {
+    return MockOutputFormat.class;
+  }
+
+  /**
+   * Dummy no op output format.
+   */
+  public static class MockOutputFormat implements OutputFormat<Object, Object> {
+
+    @Override public RecordWriter<Object, Object> getRecordWriter(FileSystem fileSystem, JobConf jobConf, String s,
+        Progressable progressable
+    ) throws IOException {
+      return new RecordWriter<Object, Object>() {
+        @Override public void write(Object o, Object o2) throws IOException {
+          //noop
+        }
+
+        @Override public void close(Reporter reporter) throws IOException {
+
+        }
+      };
+    }
+
+    @Override public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) throws IOException {
+
+    }
+  }
+}