diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7ce29e10798730061d02fadb55423bea3b69c6ac..323f715669ac03030ba4b7682a6f69ce88d64a6b 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -18,6 +18,31 @@ package org.apache.hadoop.hive.conf; +import com.google.common.base.Joiner; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.hive.conf.Validator.PatternSet; +import org.apache.hadoop.hive.conf.Validator.RangeValidator; +import org.apache.hadoop.hive.conf.Validator.RatioValidator; +import org.apache.hadoop.hive.conf.Validator.SizeValidator; +import org.apache.hadoop.hive.conf.Validator.StringSet; +import org.apache.hadoop.hive.conf.Validator.TimeValidator; +import org.apache.hadoop.hive.conf.Validator.WritableDirectoryValidator; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Shell; +import org.apache.hive.common.HiveCompat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.LoginException; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; @@ -43,32 +68,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -import javax.security.auth.login.LoginException; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.classification.InterfaceAudience; -import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate; -import org.apache.hadoop.hive.conf.Validator.PatternSet; -import org.apache.hadoop.hive.conf.Validator.RangeValidator; -import org.apache.hadoop.hive.conf.Validator.RatioValidator; -import org.apache.hadoop.hive.conf.Validator.SizeValidator; -import org.apache.hadoop.hive.conf.Validator.StringSet; -import org.apache.hadoop.hive.conf.Validator.TimeValidator; -import org.apache.hadoop.hive.conf.Validator.WritableDirectoryValidator; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Shell; -import org.apache.hive.common.HiveCompat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Joiner; - /** * Hive Configuration. 
*/ @@ -247,6 +246,7 @@ private static URL checkConfigFile(File f) { HiveConf.ConfVars.METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS, HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX, HiveConf.ConfVars.METASTORE_EVENT_LISTENERS, + HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, HiveConf.ConfVars.METASTORE_EVENT_CLEAN_FREQ, HiveConf.ConfVars.METASTORE_EVENT_EXPIRY_DURATION, HiveConf.ConfVars.METASTORE_FILTER_HOOK, @@ -761,7 +761,13 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal "An init hook is specified as the name of Java class which extends org.apache.hadoop.hive.metastore.MetaStoreInitListener."), METASTORE_PRE_EVENT_LISTENERS("hive.metastore.pre.event.listeners", "", "List of comma separated listeners for metastore events."), - METASTORE_EVENT_LISTENERS("hive.metastore.event.listeners", "", ""), + METASTORE_EVENT_LISTENERS("hive.metastore.event.listeners", "", + "A comma-separated list of Java classes that implement the org.apache.hadoop.hive.metastore.MetaStoreEventListener" + + " interface. The metastore event and corresponding listener method will be invoked in separate JDO transactions. " + + "Alternatively, configure hive.metastore.transactional.event.listeners to ensure both are invoked in the same JDO transaction."), + METASTORE_TRANSACTIONAL_EVENT_LISTENERS("hive.metastore.transactional.event.listeners", "", + "A comma-separated list of Java classes that implement the org.apache.hadoop.hive.metastore.MetaStoreEventListener" + + " interface. Both the metastore event and corresponding listener method will be invoked in the same JDO transaction."), METASTORE_EVENT_DB_LISTENER_TTL("hive.metastore.event.db.listener.timetolive", "86400s", new TimeValidator(TimeUnit.SECONDS), "time after which events will be removed from the database listener queue"), diff --git itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..4a7801bbf010dbcfa2044381cfd2e913f83f5445 --- /dev/null +++ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java @@ -0,0 +1,894 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hive.hcatalog.listener; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.FileMetadataHandler; +import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.RawStore; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.api.InvalidInputException; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; +import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionEventType; +import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.hadoop.hive.metastore.api.PrivilegeBag; +import org.apache.hadoop.hive.metastore.api.Role; +import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableMeta; +import org.apache.hadoop.hive.metastore.api.Type; +import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; +import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.thrift.TException; + +/** + * An implementation of {@link org.apache.hadoop.hive.metastore.RawStore} + * with the ability to fail metastore events for the purpose of testing. + * Events are expected to succeed by default and simply delegate to an + * embedded ObjectStore object. The behavior can be changed based on + * a flag by calling setEventSucceed(). + * + * Ideally, we should have just extended ObjectStore instead of using + * delegation. However, since HiveMetaStore uses a Proxy, this class must + * not inherit from any other class.
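+ *
+ * Illustrative use from a test (a sketch, not part of this patch; {@code msClient}
+ * and {@code db} are assumed to come from the calling test, as in
+ * TestDbNotificationListener below):
+ *
+ * <pre>
+ * DummyRawStoreFailEvent.setEventSucceed(false);  // subsequent create/drop/alter calls throw
+ * try {
+ *   msClient.createDatabase(db);                  // fails inside the metastore
+ * } catch (Exception ex) {
+ *   // expected
+ * }
+ * DummyRawStoreFailEvent.setEventSucceed(true);   // restore normal delegation to ObjectStore
+ * </pre>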
+ */ +public class DummyRawStoreFailEvent implements RawStore, Configurable { + + private final ObjectStore objectStore; + public DummyRawStoreFailEvent() { + objectStore = new ObjectStore(); + } + + private static boolean shouldEventSucceed = true; + public static void setEventSucceed(boolean flag) { + shouldEventSucceed = flag; + } + + @Override + public boolean commitTransaction() { + return objectStore.commitTransaction(); + } + + @Override + public Configuration getConf() { + return objectStore.getConf(); + } + + @Override + public void setConf(Configuration conf) { + objectStore.setConf(conf); + } + + @Override + public void shutdown() { + objectStore.shutdown(); + } + + @Override + public boolean openTransaction() { + return objectStore.openTransaction(); + } + + @Override + public void rollbackTransaction() { + objectStore.rollbackTransaction(); + } + + @Override + public void createDatabase(Database db) throws InvalidObjectException, MetaException { + if (shouldEventSucceed) { + objectStore.createDatabase(db); + } else { + throw new RuntimeException("Event failed."); + } + } + + @Override + public Database getDatabase(String dbName) throws NoSuchObjectException { + return objectStore.getDatabase(dbName); + } + + @Override + public boolean dropDatabase(String dbName) + throws NoSuchObjectException, MetaException { + if (shouldEventSucceed) { + return objectStore.dropDatabase(dbName); + } else { + throw new RuntimeException("Event failed."); + } + } + + @Override + public boolean alterDatabase(String dbName, Database db) + throws NoSuchObjectException, MetaException { + return objectStore.alterDatabase(dbName, db); + } + + @Override + public List getDatabases(String pattern) throws MetaException { + return objectStore.getDatabases(pattern); + } + + @Override + public List getAllDatabases() throws MetaException { + return objectStore.getAllDatabases(); + } + + @Override + public boolean createType(Type type) { + return objectStore.createType(type); + } + + @Override + public Type getType(String typeName) { + return objectStore.getType(typeName); + } + + @Override + public boolean dropType(String typeName) { + return objectStore.dropType(typeName); + } + + @Override + public void createTable(Table tbl) throws InvalidObjectException, MetaException { + if (shouldEventSucceed) { + objectStore.createTable(tbl); + } else { + throw new RuntimeException("Event failed."); + } + } + + @Override + public boolean dropTable(String dbName, String tableName) + throws MetaException, NoSuchObjectException, + InvalidObjectException, InvalidInputException { + if (shouldEventSucceed) { + return objectStore.dropTable(dbName, tableName); + } else { + throw new RuntimeException("Event failed."); + } + } + + @Override + public Table getTable(String dbName, String tableName) throws MetaException { + return objectStore.getTable(dbName, tableName); + } + + @Override + public boolean addPartition(Partition part) + throws InvalidObjectException, MetaException { + return objectStore.addPartition(part); + } + + @Override + public Partition getPartition(String dbName, String tableName, List partVals) + throws MetaException, NoSuchObjectException { + return objectStore.getPartition(dbName, tableName, partVals); + } + + @Override + public boolean dropPartition(String dbName, String tableName, List partVals) + throws MetaException, NoSuchObjectException, + InvalidObjectException, InvalidInputException { + if (shouldEventSucceed) { + return objectStore.dropPartition(dbName, tableName, partVals); + } else { + throw new
RuntimeException("Event failed."); + } + } + + @Override + public List getPartitions(String dbName, String tableName, int max) + throws MetaException, NoSuchObjectException { + return objectStore.getPartitions(dbName, tableName, max); + } + + @Override + public void alterTable(String dbName, String name, Table newTable) + throws InvalidObjectException, MetaException { + if (shouldEventSucceed) { + objectStore.alterTable(dbName, name, newTable); + } else { + throw new RuntimeException("Event failed."); + } + } + + @Override + public List getTables(String dbName, String pattern) throws MetaException { + return objectStore.getTables(dbName, pattern); + } + + @Override + public List getTables(String dbName, String pattern, TableType tableType) throws MetaException { + return objectStore.getTables(dbName, pattern, tableType); + } + + @Override + public List getTableMeta(String dbNames, String tableNames, List tableTypes) + throws MetaException { + return objectStore.getTableMeta(dbNames, tableNames, tableTypes); + } + + @Override + public List getTableObjectsByName(String dbName, List tableNames) + throws MetaException, UnknownDBException { + return objectStore.getTableObjectsByName(dbName, tableNames); + } + + @Override + public List getAllTables(String dbName) throws MetaException { + return objectStore.getAllTables(dbName); + } + + @Override + public List listTableNamesByFilter(String dbName, String filter, + short maxTables) throws MetaException, UnknownDBException { + return objectStore.listTableNamesByFilter(dbName, filter, maxTables); + } + + @Override + public List listPartitionNames(String dbName, String tblName, short maxParts) + throws MetaException { + return objectStore.listPartitionNames(dbName, tblName, maxParts); + } + + @Override + public List listPartitionNamesByFilter(String dbName, String tblName, + String filter, short maxParts) throws MetaException { + return objectStore.listPartitionNamesByFilter(dbName, tblName, filter, maxParts); + } + + @Override + public void alterPartition(String dbName, String tblName, List partVals, + Partition newPart) throws InvalidObjectException, MetaException { + if (shouldEventSucceed) { + objectStore.alterPartition(dbName, tblName, partVals, newPart); + } else { + throw new RuntimeException("Event failed."); + } + } + + @Override + public void alterPartitions(String dbName, String tblName, + List> partValsList, List newParts) + throws InvalidObjectException, MetaException { + objectStore.alterPartitions(dbName, tblName, partValsList, newParts); + } + + @Override + public boolean addIndex(Index index) throws InvalidObjectException, MetaException { + return objectStore.addIndex(index); + } + + @Override + public Index getIndex(String dbName, String origTableName, String indexName) + throws MetaException { + return objectStore.getIndex(dbName, origTableName, indexName); + } + + @Override + public boolean dropIndex(String dbName, String origTableName, String indexName) + throws MetaException { + return objectStore.dropIndex(dbName, origTableName, indexName); + } + + @Override + public List getIndexes(String dbName, String origTableName, int max) + throws MetaException { + return objectStore.getIndexes(dbName, origTableName, max); + } + + @Override + public List listIndexNames(String dbName, String origTableName, short max) + throws MetaException { + return objectStore.listIndexNames(dbName, origTableName, max); + } + + @Override + public void alterIndex(String dbName, String baseTblName, String name, Index newIndex) + throws 
InvalidObjectException, MetaException { + objectStore.alterIndex(dbName, baseTblName, name, newIndex); + } + + @Override + public List getPartitionsByFilter(String dbName, String tblName, + String filter, short maxParts) throws MetaException, NoSuchObjectException { + return objectStore.getPartitionsByFilter(dbName, tblName, filter, maxParts); + } + + @Override + public int getNumPartitionsByFilter(String dbName, String tblName, + String filter) throws MetaException, NoSuchObjectException { + return objectStore.getNumPartitionsByFilter(dbName, tblName, filter); + } + + @Override + public int getNumPartitionsByExpr(String dbName, String tblName, + byte[] expr) throws MetaException, NoSuchObjectException { + return objectStore.getNumPartitionsByExpr(dbName, tblName, expr); + } + + @Override + public List getPartitionsByNames(String dbName, String tblName, + List partNames) throws MetaException, NoSuchObjectException { + return objectStore.getPartitionsByNames(dbName, tblName, partNames); + } + + @Override + public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr, + String defaultPartitionName, short maxParts, List result) throws TException { + return objectStore.getPartitionsByExpr( + dbName, tblName, expr, defaultPartitionName, maxParts, result); + } + + @Override + public Table markPartitionForEvent(String dbName, String tblName, + Map partVals, PartitionEventType evtType) + throws MetaException, UnknownTableException, InvalidPartitionException, + UnknownPartitionException { + return objectStore.markPartitionForEvent(dbName, tblName, partVals, evtType); + } + + @Override + public boolean isPartitionMarkedForEvent(String dbName, String tblName, + Map partName, PartitionEventType evtType) + throws MetaException, UnknownTableException, InvalidPartitionException, + UnknownPartitionException { + return objectStore.isPartitionMarkedForEvent(dbName, tblName, partName, evtType); + } + + @Override + public boolean addRole(String rowName, String ownerName) throws InvalidObjectException, + MetaException, NoSuchObjectException { + return objectStore.addRole(rowName, ownerName); + } + + @Override + public boolean removeRole(String roleName) + throws MetaException, NoSuchObjectException { + return objectStore.removeRole(roleName); + } + + @Override + public boolean grantRole(Role role, String userName, PrincipalType principalType, + String grantor, PrincipalType grantorType, boolean grantOption) + throws MetaException, NoSuchObjectException, InvalidObjectException { + return objectStore.grantRole(role, userName, principalType, grantor, grantorType, + grantOption); + } + + @Override + public boolean revokeRole(Role role, String userName, PrincipalType principalType, boolean grantOption) + throws MetaException, NoSuchObjectException { + return objectStore.revokeRole(role, userName, principalType, grantOption); + } + + @Override + public PrincipalPrivilegeSet getUserPrivilegeSet(String userName, + List groupNames) throws InvalidObjectException, MetaException { + return objectStore.getUserPrivilegeSet(userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getDBPrivilegeSet(String dbName, String userName, + List groupNames) throws InvalidObjectException, MetaException { + return objectStore.getDBPrivilegeSet(dbName, userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getTablePrivilegeSet(String dbName, String tableName, + String userName, List groupNames) + throws InvalidObjectException, MetaException { + return 
objectStore.getTablePrivilegeSet(dbName, tableName, userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getPartitionPrivilegeSet(String dbName, String tableName, + String partition, String userName, List groupNames) + throws InvalidObjectException, MetaException { + return objectStore.getPartitionPrivilegeSet(dbName, tableName, partition, + userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getColumnPrivilegeSet(String dbName, String tableName, + String partitionName, String columnName, String userName, + List groupNames) + throws InvalidObjectException, MetaException { + return objectStore.getColumnPrivilegeSet(dbName, tableName, partitionName, + columnName, userName, groupNames); + } + + @Override + public List listPrincipalGlobalGrants(String principalName, + PrincipalType principalType) { + return objectStore.listPrincipalGlobalGrants(principalName, principalType); + } + + @Override + public List listPrincipalDBGrants(String principalName, + PrincipalType principalType, String dbName) { + return objectStore.listPrincipalDBGrants(principalName, principalType, dbName); + } + + @Override + public List listAllTableGrants(String principalName, + PrincipalType principalType, String dbName, String tableName) { + return objectStore.listAllTableGrants(principalName, principalType, + dbName, tableName); + } + + @Override + public List listPrincipalPartitionGrants(String principalName, + PrincipalType principalType, String dbName, String tableName, + List partValues, + String partName) { + return objectStore.listPrincipalPartitionGrants(principalName, principalType, + dbName, tableName, partValues, partName); + } + + @Override + public List listPrincipalTableColumnGrants(String principalName, + PrincipalType principalType, String dbName, + String tableName, String columnName) { + return objectStore.listPrincipalTableColumnGrants(principalName, principalType, + dbName, tableName, columnName); + } + + @Override + public List listPrincipalPartitionColumnGrants( + String principalName, PrincipalType principalType, String dbName, String tableName, + List partVals, String partName, String columnName) { + return objectStore.listPrincipalPartitionColumnGrants(principalName, principalType, + dbName, tableName, partVals, partName, columnName); + } + + @Override + public boolean grantPrivileges(PrivilegeBag privileges) throws InvalidObjectException, + MetaException, NoSuchObjectException { + return objectStore.grantPrivileges(privileges); + } + + @Override + public boolean revokePrivileges(PrivilegeBag privileges, boolean grantOption) + throws InvalidObjectException, MetaException, NoSuchObjectException { + return objectStore.revokePrivileges(privileges, grantOption); + } + + @Override + public Role getRole(String roleName) throws NoSuchObjectException { + return objectStore.getRole(roleName); + } + + @Override + public List listRoleNames() { + return objectStore.listRoleNames(); + } + + @Override + public List listRoles(String principalName, PrincipalType principalType) { + return objectStore.listRoles(principalName, principalType); + } + + @Override + public List listRolesWithGrants(String principalName, + PrincipalType principalType) { + return objectStore.listRolesWithGrants(principalName, principalType); + } + + @Override + public List listRoleMembers(String roleName) { + return objectStore.listRoleMembers(roleName); + } + + @Override + public Partition getPartitionWithAuth(String dbName, String tblName, + List partVals, String userName, List groupNames) + 
throws MetaException, NoSuchObjectException, InvalidObjectException { + return objectStore.getPartitionWithAuth(dbName, tblName, partVals, userName, + groupNames); + } + + @Override + public List getPartitionsWithAuth(String dbName, String tblName, + short maxParts, String userName, List groupNames) + throws MetaException, NoSuchObjectException, InvalidObjectException { + return objectStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, + groupNames); + } + + @Override + public List listPartitionNamesPs(String dbName, String tblName, + List partVals, short maxParts) + throws MetaException, NoSuchObjectException { + return objectStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts); + } + + @Override + public List listPartitionsPsWithAuth(String dbName, String tblName, + List partVals, short maxParts, String userName, + List groupNames) + throws MetaException, InvalidObjectException, NoSuchObjectException { + return objectStore.listPartitionsPsWithAuth(dbName, tblName, partVals, maxParts, + userName, groupNames); + } + + @Override + public long cleanupEvents() { + return objectStore.cleanupEvents(); + } + + @Override + public List listPrincipalDBGrantsAll( + String principalName, PrincipalType principalType) { + return objectStore.listPrincipalDBGrantsAll(principalName, principalType); + } + + @Override + public List listPrincipalTableGrantsAll( + String principalName, PrincipalType principalType) { + return objectStore.listPrincipalTableGrantsAll(principalName, principalType); + } + + @Override + public List listPrincipalPartitionGrantsAll( + String principalName, PrincipalType principalType) { + return objectStore.listPrincipalPartitionGrantsAll(principalName, principalType); + } + + @Override + public List listPrincipalTableColumnGrantsAll( + String principalName, PrincipalType principalType) { + return objectStore.listPrincipalTableColumnGrantsAll(principalName, principalType); + } + + @Override + public List listPrincipalPartitionColumnGrantsAll( + String principalName, PrincipalType principalType) { + return objectStore.listPrincipalPartitionColumnGrantsAll(principalName, principalType); + } + + @Override + public List listGlobalGrantsAll() { + return objectStore.listGlobalGrantsAll(); + } + + @Override + public List listDBGrantsAll(String dbName) { + return objectStore.listDBGrantsAll(dbName); + } + + @Override + public List listPartitionColumnGrantsAll(String dbName, String tableName, + String partitionName, String columnName) { + return objectStore.listPartitionColumnGrantsAll(dbName, tableName, partitionName, columnName); + } + + @Override + public List listTableGrantsAll(String dbName, String tableName) { + return objectStore.listTableGrantsAll(dbName, tableName); + } + + @Override + public List listPartitionGrantsAll(String dbName, String tableName, + String partitionName) { + return objectStore.listPartitionGrantsAll(dbName, tableName, partitionName); + } + + @Override + public List listTableColumnGrantsAll(String dbName, String tableName, + String columnName) { + return objectStore.listTableColumnGrantsAll(dbName, tableName, columnName); + } + + @Override + public ColumnStatistics getTableColumnStatistics(String dbName, String tableName, + List colNames) throws MetaException, NoSuchObjectException { + return objectStore.getTableColumnStatistics(dbName, tableName, colNames); + } + + @Override + public boolean deleteTableColumnStatistics(String dbName, String tableName, + String colName) + throws NoSuchObjectException, MetaException, InvalidObjectException, 
+ InvalidInputException { + return objectStore.deleteTableColumnStatistics(dbName, tableName, colName); + } + + @Override + public boolean deletePartitionColumnStatistics(String dbName, String tableName, + String partName, List partVals, String colName) + throws NoSuchObjectException, MetaException, InvalidObjectException, + InvalidInputException { + return objectStore.deletePartitionColumnStatistics(dbName, tableName, partName, + partVals, colName); + } + + @Override + public boolean updateTableColumnStatistics(ColumnStatistics statsObj) + throws NoSuchObjectException, MetaException, InvalidObjectException, + InvalidInputException { + return objectStore.updateTableColumnStatistics(statsObj); + } + + @Override + public boolean updatePartitionColumnStatistics(ColumnStatistics statsObj, + List partVals) + throws NoSuchObjectException, MetaException, InvalidObjectException, + InvalidInputException { + return objectStore.updatePartitionColumnStatistics(statsObj, partVals); + } + + @Override + public boolean addToken(String tokenIdentifier, String delegationToken) { + return false; + } + + @Override + public boolean removeToken(String tokenIdentifier) { + return false; + } + + @Override + public String getToken(String tokenIdentifier) { + return ""; + } + + @Override + public List getAllTokenIdentifiers() { + return new ArrayList(); + } + + @Override + public int addMasterKey(String key) throws MetaException { + return -1; + } + + @Override + public void updateMasterKey(Integer seqNo, String key) + throws NoSuchObjectException, MetaException {} + + @Override + public boolean removeMasterKey(Integer keySeq) { + return false; + } + + @Override + public String[] getMasterKeys() { + return new String[0]; + } + + @Override + public void verifySchema() throws MetaException { + } + + @Override + public String getMetaStoreSchemaVersion() throws MetaException { + return objectStore.getMetaStoreSchemaVersion(); + } + + @Override + public void setMetaStoreSchemaVersion(String schemaVersion, String comment) throws MetaException { + objectStore.setMetaStoreSchemaVersion(schemaVersion, comment); + + } + + @Override + public List getPartitionColumnStatistics(String dbName, + String tblName, List colNames, + List partNames) + throws MetaException, NoSuchObjectException { + return objectStore.getPartitionColumnStatistics(dbName, tblName , colNames, partNames); + } + + @Override + public boolean doesPartitionExist(String dbName, String tableName, + List partVals) throws MetaException, NoSuchObjectException { + return objectStore.doesPartitionExist(dbName, tableName, partVals); + } + + @Override + public boolean addPartitions(String dbName, String tblName, List parts) + throws InvalidObjectException, MetaException { + return objectStore.addPartitions(dbName, tblName, parts); + } + + @Override + public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy partitionSpec, + boolean ifNotExists) throws InvalidObjectException, MetaException { + return false; + } + + @Override + public void dropPartitions(String dbName, String tblName, List partNames) + throws MetaException, NoSuchObjectException { + objectStore.dropPartitions(dbName, tblName, partNames); + } + + @Override + public void createFunction(Function func) throws InvalidObjectException, + MetaException { + objectStore.createFunction(func); + } + + @Override + public void alterFunction(String dbName, String funcName, Function newFunction) + throws InvalidObjectException, MetaException { + objectStore.alterFunction(dbName, funcName, 
newFunction); + } + + @Override + public void dropFunction(String dbName, String funcName) + throws MetaException, NoSuchObjectException, InvalidObjectException, + InvalidInputException { + objectStore.dropFunction(dbName, funcName); + } + + @Override + public Function getFunction(String dbName, String funcName) + throws MetaException { + return objectStore.getFunction(dbName, funcName); + } + + @Override + public List getAllFunctions() + throws MetaException { + return Collections.emptyList(); + } + + @Override + public List getFunctions(String dbName, String pattern) + throws MetaException { + return objectStore.getFunctions(dbName, pattern); + } + + @Override + public AggrStats get_aggr_stats_for(String dbName, + String tblName, List partNames, List colNames) + throws MetaException { + return null; + } + + @Override + public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) { + return objectStore.getNextNotification(rqst); + } + + @Override + public void addNotificationEvent(NotificationEvent event) { + objectStore.addNotificationEvent(event); + } + + @Override + public void cleanNotificationEvents(int olderThan) { + objectStore.cleanNotificationEvents(olderThan); + } + + @Override + public CurrentNotificationEventId getCurrentNotificationEventId() { + return objectStore.getCurrentNotificationEventId(); + } + + @Override + public void flushCache() { + objectStore.flushCache(); + } + + @Override + public ByteBuffer[] getFileMetadata(List fileIds) { + return null; + } + + @Override + public void putFileMetadata( + List fileIds, List metadata, FileMetadataExprType type) { + } + + @Override + public boolean isFileMetadataSupported() { + return false; + } + + + @Override + public void getFileMetadataByExpr(List fileIds, FileMetadataExprType type, byte[] expr, + ByteBuffer[] metadatas, ByteBuffer[] stripeBitsets, boolean[] eliminated) { + } + + @Override + public int getTableCount() throws MetaException { + return objectStore.getTableCount(); + } + + @Override + public int getPartitionCount() throws MetaException { + return objectStore.getPartitionCount(); + } + + @Override + public int getDatabaseCount() throws MetaException { + return objectStore.getDatabaseCount(); + } + + @Override + public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) { + return null; + } + + @Override + public List getPrimaryKeys(String db_name, String tbl_name) + throws MetaException { + return null; + } + + @Override + public List getForeignKeys(String parent_db_name, + String parent_tbl_name, String foreign_db_name, String foreign_tbl_name) + throws MetaException { + return null; + } + + @Override + public void createTableWithConstraints(Table tbl, + List primaryKeys, List foreignKeys) + throws InvalidObjectException, MetaException { + } + + @Override + public void dropConstraint(String dbName, String tableName, + String constraintName) throws NoSuchObjectException { + } + + @Override + public void addPrimaryKeys(List pks) + throws InvalidObjectException, MetaException { + } + + @Override + public void addForeignKeys(List fks) + throws InvalidObjectException, MetaException { + } +} \ No newline at end of file diff --git itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 81ce67bdc8fdaf11ff4fec3f255ed0021a4752c7..1cd32d5859899960337c9525ac033cd1cfb64ab2 100644 --- 
itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.EventRequestType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FireEventRequest; import org.apache.hadoop.hive.metastore.api.FireEventRequestData; @@ -70,12 +69,14 @@ @BeforeClass public static void connectToMetastore() throws Exception { HiveConf conf = new HiveConf(); - conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS, + conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, DbNotificationListener.class.getName()); conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, String.valueOf(EVENTS_TTL)+"s"); conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, + DummyRawStoreFailEvent.class.getName()); Class dbNotificationListener = Class.forName("org.apache.hive.hcatalog.listener.DbNotificationListener"); Class[] classes = dbNotificationListener.getDeclaredClasses(); @@ -101,6 +102,7 @@ public void setup() throws Exception { if (now > Integer.MAX_VALUE) fail("Bummer, time has fallen over the edge"); else startTime = (int) now; firstEventId = msClient.getCurrentNotificationEventId().getEventId(); + DummyRawStoreFailEvent.setEventSucceed(true); } @Test @@ -119,6 +121,17 @@ public void createDatabase() throws Exception { assertNull(event.getTableName()); assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_DATABASE\",\"server\":\"\"," + "\"servicePrincipal\":\"\",\"db\":\"mydb\",\"timestamp\":[0-9]+}")); + + DummyRawStoreFailEvent.setEventSucceed(false); + db = new Database("mydb2", "no description", "file:/tmp", emptyParameters); + try { + msClient.createDatabase(db); + } catch (Exception ex) { + // expected + } + + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(1, rsp.getEventsSize()); } @Test @@ -138,6 +151,18 @@ public void dropDatabase() throws Exception { assertNull(event.getTableName()); assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_DATABASE\",\"server\":\"\"," + "\"servicePrincipal\":\"\",\"db\":\"dropdb\",\"timestamp\":[0-9]+}")); + + db = new Database("dropdb", "no description", "file:/tmp", emptyParameters); + msClient.createDatabase(db); + DummyRawStoreFailEvent.setEventSucceed(false); + try { + msClient.dropDatabase("dropdb"); + } catch (Exception ex) { + // expected + } + + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(3, rsp.getEventsSize()); } @Test @@ -162,6 +187,18 @@ public void createTable() throws Exception { assertEquals("mytable", event.getTableName()); assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\"," + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"mytable\",\"timestamp\":[0-9]+}")); + + table = new Table("mytable2", "default", "me", startTime, startTime, 0, sd, null, + emptyParameters, null, null, null); + DummyRawStoreFailEvent.setEventSucceed(false); + try { + msClient.createTable(table); + } catch (Exception ex) { + 
// expected + } + + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(1, rsp.getEventsSize()); } @Test @@ -192,6 +229,16 @@ public void alterTable() throws Exception { assertTrue(event.getMessage().matches("\\{\"eventType\":\"ALTER_TABLE\",\"server\":\"\"," + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alttable\"," + "\"timestamp\":[0-9]+}")); + + DummyRawStoreFailEvent.setEventSucceed(false); + try { + msClient.alter_table("default", "alttable", table); + } catch (Exception ex) { + // expected + } + + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(2, rsp.getEventsSize()); } @Test @@ -218,6 +265,19 @@ public void dropTable() throws Exception { assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_TABLE\",\"server\":\"\"," + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + "\"droptable\",\"timestamp\":[0-9]+}")); + + table = new Table("droptable2", "default", "me", startTime, startTime, 0, sd, null, + emptyParameters, null, null, null); + msClient.createTable(table); + DummyRawStoreFailEvent.setEventSucceed(false); + try { + msClient.dropTable("default", "droptable2"); + } catch (Exception ex) { + // expected + } + + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(3, rsp.getEventsSize()); } @Test @@ -249,6 +309,18 @@ public void addPartition() throws Exception { assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\"," + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + "\"addparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"ds\":\"today\"}]}")); + + partition = new Partition(Arrays.asList("tomorrow"), "default", "tableDoesNotExist", + startTime, startTime, sd, emptyParameters); + DummyRawStoreFailEvent.setEventSucceed(false); + try { + msClient.add_partition(partition); + } catch (Exception ex) { + // expected + } + + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(2, rsp.getEventsSize()); } @Test @@ -274,7 +346,6 @@ public void alterPartition() throws Exception { NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(3, rsp.getEventsSize()); - NotificationEvent event = rsp.getEvents().get(2); assertEquals(firstEventId + 3, event.getEventId()); assertTrue(event.getEventTime() >= startTime); @@ -285,6 +356,16 @@ public void alterPartition() throws Exception { event.getMessage().matches("\\{\"eventType\":\"ALTER_PARTITION\",\"server\":\"\"," + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alterparttable\"," + "\"timestamp\":[0-9]+,\"keyValues\":\\{\"ds\":\"today\"}}")); + + DummyRawStoreFailEvent.setEventSucceed(false); + try { + msClient.alter_partition("default", "alterparttable", newPart, null); + } catch (Exception ex) { + // expected + } + + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(3, rsp.getEventsSize()); } @Test @@ -318,6 +399,19 @@ public void dropPartition() throws Exception { assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_PARTITION\",\"server\":\"\"," + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + "\"dropparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"ds\":\"today\"}]}")); + + partition = new Partition(Arrays.asList("tomorrow"), "default", "dropPartTable", + startTime, startTime, sd, emptyParameters); + msClient.add_partition(partition); + DummyRawStoreFailEvent.setEventSucceed(false); + try { + msClient.dropPartition("default", "dropparttable", Arrays.asList("tomorrow"), 
false); + } catch (Exception ex) { + // expected + } + + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(4, rsp.getEventsSize()); } @Test diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java index af16f75e63c372c37bfd73567b777bba53f94db3..fd4527e65367a15ef2d07200ec3e8ceecfa541ee 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.metastore; +import com.google.common.collect.Lists; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -25,8 +27,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; - -import com.google.common.collect.Lists; import junit.framework.TestCase; import org.apache.hadoop.hive.cli.CliSessionState; @@ -455,5 +455,4 @@ public void testListener() throws Exception { assertEquals("true", event.getOldValue()); assertEquals("false", event.getNewValue()); } - } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/AlterHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/AlterHandler.java index dedd4497adfcc9d57090a943f6bb4f35ea87fa61..a3d322f3456d5e4f8a3e14d3ce543d507df2224a 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/AlterHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/AlterHandler.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; /** * Interface for Alter Table and Alter Partition code @@ -34,6 +35,33 @@ public interface AlterHandler extends Configurable { /** + * @deprecated As of release 2.2.0. Replaced by {@link #alterTable(RawStore, Warehouse, String, + * String, Table, EnvironmentContext, HMSHandler)} + * + * handles alter table, the changes could be cascaded to partitions if applicable + * + * @param msdb + * object to get metadata + * @param wh + * Hive Warehouse where table data is stored + * @param dbname + * database of the table being altered + * @param name + * original name of the table being altered. same as + * newTable.tableName if alter op is not a rename. + * @param newTable + * new table object + * @throws InvalidOperationException + * thrown if the newTable object is invalid + * @throws MetaException + * thrown if there is any other error + */ + @Deprecated + void alterTable(RawStore msdb, Warehouse wh, String dbname, + String name, Table newTable, EnvironmentContext envContext) + throws InvalidOperationException, MetaException; + + /** * handles alter table, the changes could be cascaded to partitions if applicable * * @param msdb @@ -47,18 +75,21 @@ * newTable.tableName if alter op is not a rename. 
* @param newTable * new table object - * @param cascade - * if the changes will be cascaded to its partitions if applicable + * @param handler + * HMSHandler object (required to log event notification) * @throws InvalidOperationException * thrown if the newTable object is invalid * @throws MetaException * thrown if there is any other error */ - public abstract void alterTable(RawStore msdb, Warehouse wh, String dbname, - String name, Table newTable, EnvironmentContext envContext) throws InvalidOperationException, - MetaException; + void alterTable(RawStore msdb, Warehouse wh, String dbname, + String name, Table newTable, EnvironmentContext envContext, + HMSHandler handler) throws InvalidOperationException, MetaException; /** + * @deprecated As of release 2.2.0. Replaced by {@link #alterPartition(RawStore, Warehouse, String, + * String, List, Partition, EnvironmentContext, HMSHandler)} + * * handles alter partition * * @param msdb * object to get metadata * @param wh * @param dbname * database of the partition being altered * @param name * table of the partition being altered * @param part_vals * original values of the partition being altered * @param new_part * new partition object * @return the altered partition * @throws InvalidOperationException * @throws InvalidObjectException * @throws AlreadyExistsException * @throws MetaException */ - public abstract Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname, - final String name, final List<String> part_vals, final Partition new_part, EnvironmentContext environmentContext) - throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, - MetaException; + @Deprecated + Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname, + final String name, final List<String> part_vals, final Partition new_part, + EnvironmentContext environmentContext) + throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException; + + /** + * handles alter partition + * + * @param msdb + * object to get metadata + * @param wh + * @param dbname + * database of the partition being altered + * @param name + * table of the partition being altered + * @param part_vals + * original values of the partition being altered + * @param new_part + * new partition object + * @param handler + * HMSHandler object (required to log event notification) + * @return the altered partition + * @throws InvalidOperationException + * @throws InvalidObjectException + * @throws AlreadyExistsException + * @throws MetaException + */ + Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname, + final String name, final List<String> part_vals, final Partition new_part, EnvironmentContext environmentContext, + HMSHandler handler) + throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException; + + /** + * @deprecated As of release 2.2.0.
Replaced by {@link #alterPartitions(RawStore, Warehouse, String, + String, List, EnvironmentContext, HMSHandler)} + * + * handles alter partitions + * + * @param msdb + * object to get metadata + * @param wh + * @param dbname + * database of the partition being altered + * @param name + * table of the partition being altered + * @param new_parts + * new partition list + * @return the altered partition list + * @throws InvalidOperationException + * @throws InvalidObjectException + * @throws AlreadyExistsException + * @throws MetaException + */ + @Deprecated + List<Partition> alterPartitions(final RawStore msdb, Warehouse wh, + final String dbname, final String name, final List<Partition> new_parts, + EnvironmentContext environmentContext) + throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException; /** * handles alter partitions @@ -95,14 +181,16 @@ public abstract Partition alterPartition(final RawStore msdb, Warehouse wh, fina * table of the partition being altered * @param new_parts * new partition list + * @param handler + * HMSHandler object (required to log event notification) * @return the altered partition list * @throws InvalidOperationException * @throws InvalidObjectException * @throws AlreadyExistsException * @throws MetaException */ - public abstract List<Partition> alterPartitions(final RawStore msdb, Warehouse wh, - final String dbname, final String name, final List<Partition> new_part, EnvironmentContext environmentContext) - throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, - MetaException; -} + List<Partition> alterPartitions(final RawStore msdb, Warehouse wh, + final String dbname, final String name, final List<Partition> new_parts, + EnvironmentContext environmentContext, HMSHandler handler) + throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException; +} \ No newline at end of file diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 40b337a9e40ea04a37f108146853d2d1f42dcd29..be7ed32fc8c82dcf54c00ccecefc90f3dc5b7d78 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -20,6 +20,8 @@ import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -43,6 +45,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.ipc.RemoteException; import org.apache.hive.common.util.HiveStringUtils; @@ -75,7 +78,15 @@ public void setConf(Configuration conf) { @Override public void alterTable(RawStore msdb, Warehouse wh, String dbname, - String name, Table newt, EnvironmentContext environmentContext) throws InvalidOperationException, MetaException { + String name, Table newt, EnvironmentContext environmentContext) + throws InvalidOperationException, MetaException { + alterTable(msdb, wh, dbname, name, newt, environmentContext, null); + } + + @Override + public void alterTable(RawStore msdb,
Warehouse wh, String dbname, + String name, Table newt, EnvironmentContext environmentContext, + HMSHandler handler) throws InvalidOperationException, MetaException { final boolean cascade = environmentContext != null && environmentContext.isSetProperties() && StatsSetupConst.TRUE.equals(environmentContext.getProperties().get( @@ -103,6 +114,10 @@ public void alterTable(RawStore msdb, Warehouse wh, String dbname, boolean rename = false; Table oldt = null; List> altps = new ArrayList>(); + List transactionalListeners = null; + if (handler != null) { + transactionalListeners = handler.getTransactionalListeners(); + } try { msdb.openTransaction(); @@ -252,6 +267,13 @@ public void alterTable(RawStore msdb, Warehouse wh, String dbname, } alterTableUpdateTableColumnStats(msdb, oldt, newt); + if (transactionalListeners != null && transactionalListeners.size() > 0) { + AlterTableEvent alterTableEvent = new AlterTableEvent(oldt, newt, true, handler); + alterTableEvent.setEnvironmentContext(environmentContext); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAlterTable(alterTableEvent); + } + } // commit the changes success = msdb.commitTransaction(); } catch (InvalidObjectException e) { @@ -268,6 +290,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String dbname, if (!success) { msdb.rollbackTransaction(); } + if (success && moveData) { // change the file name in hdfs // check that src exists otherwise there is no need to copy the data @@ -329,20 +352,32 @@ String getSimpleMessage(IOException ex) { } return ex.getMessage(); } + + @Override + public Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname, + final String name, final List part_vals, final Partition new_part, + EnvironmentContext environmentContext) + throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException { + return alterPartition(msdb, wh, dbname, name, part_vals, new_part, environmentContext, null); + } + @Override public Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname, - final String name, final List part_vals, final Partition new_part, EnvironmentContext environmentContext) - throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, - MetaException { + final String name, final List part_vals, final Partition new_part, + EnvironmentContext environmentContext, HMSHandler handler) + throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException { boolean success = false; - Path srcPath = null; Path destPath = null; FileSystem srcFs = null; - FileSystem destFs = null; + FileSystem destFs; Partition oldPart = null; String oldPartLoc = null; String newPartLoc = null; + List transactionalListeners = null; + if (handler != null) { + transactionalListeners = handler.getTransactionalListeners(); + } // Set DDL time to now if not specified if (new_part.getParameters() == null || @@ -353,23 +388,44 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String } Table tbl = msdb.getTable(dbname, name); + if (tbl == null) { + throw new InvalidObjectException( + "Unable to alter partition because table or database does not exist."); + } + //alter partition if (part_vals == null || part_vals.size() == 0) { try { + msdb.openTransaction(); oldPart = msdb.getPartition(dbname, name, new_part.getValues()); if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl, environmentContext)) { 
MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true, environmentContext); } + updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part); msdb.alterPartition(dbname, name, new_part.getValues(), new_part); + if (transactionalListeners != null && transactionalListeners.size() > 0) { + AlterPartitionEvent alterPartitionEvent = + new AlterPartitionEvent(oldPart, new_part, tbl, true, handler); + alterPartitionEvent.setEnvironmentContext(environmentContext); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAlterPartition(alterPartitionEvent); + } + } + success = msdb.commitTransaction(); } catch (InvalidObjectException e) { throw new InvalidOperationException("alter is not possible"); } catch (NoSuchObjectException e){ //old partition does not exist throw new InvalidOperationException("alter is not possible"); + } finally { + if(!success) { + msdb.rollbackTransaction(); + } } return oldPart; } + //rename partition try { msdb.openTransaction(); @@ -380,21 +436,19 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String throw new InvalidObjectException( "Unable to rename partition because old partition does not exist"); } - Partition check_part = null; + + Partition check_part; try { check_part = msdb.getPartition(dbname, name, new_part.getValues()); } catch(NoSuchObjectException e) { // this means there is no existing partition check_part = null; } + if (check_part != null) { throw new AlreadyExistsException("Partition already exists:" + dbname + "." + name + "." + new_part.getValues()); } - if (tbl == null) { - throw new InvalidObjectException( - "Unable to rename partition because table or database do not exist"); - } // if the external partition is renamed, the file should not change if (tbl.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) { @@ -420,24 +474,24 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String "Unable to change partition or table. Database " + dbname + " does not exist" + " Check metastore logs for detailed stack." + e.getMessage()); } + if (destPath != null) { newPartLoc = destPath.toString(); oldPartLoc = oldPart.getSd().getLocation(); - - srcPath = new Path(oldPartLoc); - LOG.info("srcPath:" + oldPartLoc); LOG.info("descPath:" + newPartLoc); + srcPath = new Path(oldPartLoc); srcFs = wh.getFs(srcPath); destFs = wh.getFs(destPath); // check that src and dest are on the same file system if (!FileUtils.equalsFileSystem(srcFs, destFs)) { - throw new InvalidOperationException("table new location " + destPath + throw new InvalidOperationException("New table location " + destPath + " is on a different file system than the old location " - + srcPath + ". This operation is not supported"); + + srcPath + ". This operation is not supported."); } + try { - srcFs.exists(srcPath); // check that src exists and also checks + srcFs.exists(srcPath); if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) { throw new InvalidOperationException("New location for this table " + tbl.getDbName() + "." + tbl.getTableName() @@ -448,10 +502,12 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String + destPath + " for partition " + tbl.getDbName() + "." 
+ tbl.getTableName() + " " + new_part.getValues()); } + new_part.getSd().setLocation(newPartLoc); if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl, environmentContext)) { MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true, environmentContext); } + String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues()); try { //existing partition column stats is no longer valid, remove @@ -461,15 +517,26 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String } catch (InvalidInputException iie) { throw new InvalidOperationException("Unable to update partition stats in table rename." + iie); } + msdb.alterPartition(dbname, name, part_vals, new_part); } } + if (transactionalListeners != null && transactionalListeners.size() > 0) { + AlterPartitionEvent alterPartitionEvent = + new AlterPartitionEvent(oldPart, new_part, tbl, true, handler); + alterPartitionEvent.setEnvironmentContext(environmentContext); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAlterPartition(alterPartitionEvent); + } + } + success = msdb.commitTransaction(); } finally { if (!success) { msdb.rollbackTransaction(); } + if (success && newPartLoc != null && newPartLoc.compareTo(oldPartLoc) != 0) { //rename the data directory try{ @@ -479,21 +546,35 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String if (!wh.mkdirs(destParentPath, true)) { throw new IOException("Unable to create path " + destParentPath); } + wh.renameDir(srcPath, destPath, true); - LOG.info("rename done!"); + LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done."); } - } catch (IOException e) { + } catch (IOException ex) { + LOG.error("Cannot rename partition directory from " + srcPath + " to " + + destPath, ex); boolean revertMetaDataTransaction = false; try { msdb.openTransaction(); msdb.alterPartition(dbname, name, new_part.getValues(), oldPart); + if (transactionalListeners != null && transactionalListeners.size() > 0) { + AlterPartitionEvent alterPartitionEvent = + new AlterPartitionEvent(new_part, oldPart, tbl, true, handler); + alterPartitionEvent.setEnvironmentContext(environmentContext); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAlterPartition(alterPartitionEvent); + } + } + revertMetaDataTransaction = msdb.commitTransaction(); - } catch (Exception e1) { - LOG.error("Reverting metadata opeation failed During HDFS operation failed", e1); + } catch (Exception ex2) { + LOG.error("Attempt to revert partition metadata change failed. The revert was attempted " + + "because associated filesystem rename operation failed with exception " + ex.getMessage(), ex2); if (!revertMetaDataTransaction) { msdb.rollbackTransaction(); } } + throw new InvalidOperationException("Unable to access old location " + srcPath + " for partition " + tbl.getDbName() + "." 
              + tbl.getTableName() + " " + part_vals);
@@ -505,13 +586,33 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String

   @Override
   public List<Partition> alterPartitions(final RawStore msdb, Warehouse wh, final String dbname,
-      final String name, final List<Partition> new_parts, EnvironmentContext environmentContext)
-      throws InvalidOperationException, InvalidObjectException, AlreadyExistsException,
-      MetaException {
+      final String name, final List<Partition> new_parts,
+      EnvironmentContext environmentContext)
+      throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException {
+    return alterPartitions(msdb, wh, dbname, name, new_parts, environmentContext, null);
+  }
+
+  @Override
+  public List<Partition> alterPartitions(final RawStore msdb, Warehouse wh, final String dbname,
+      final String name, final List<Partition> new_parts, EnvironmentContext environmentContext,
+      HMSHandler handler)
+      throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException {
     List<Partition> oldParts = new ArrayList<Partition>();
     List<List<String>> partValsList = new ArrayList<List<String>>();
+    List<MetaStoreEventListener> transactionalListeners = null;
+    if (handler != null) {
+      transactionalListeners = handler.getTransactionalListeners();
+    }
+
     Table tbl = msdb.getTable(dbname, name);
+    if (tbl == null) {
+      throw new InvalidObjectException(
+          "Unable to alter partitions because table or database does not exist.");
+    }
+
+    boolean success = false;
     try {
+      msdb.openTransaction();
      for (Partition tmpPart: new_parts) {
        // Set DDL time to now if not specified
        if (tmpPart.getParameters() == null ||
@@ -530,10 +631,36 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String
        }
        updatePartColumnStats(msdb, dbname, name, oldTmpPart.getValues(), tmpPart);
      }
+
      msdb.alterPartitions(dbname, name, partValsList, new_parts);
+      Iterator<Partition> oldPartsIt = oldParts.iterator();
+      for (Partition newPart : new_parts) {
+        Partition oldPart;
+        if (oldPartsIt.hasNext()) {
+          oldPart = oldPartsIt.next();
+        } else {
+          throw new InvalidOperationException("Missing old partition corresponding to new partition "
+              + "when invoking MetaStoreEventListener for alterPartitions event.");
+        }
+
+        if (transactionalListeners != null && transactionalListeners.size() > 0) {
+          AlterPartitionEvent alterPartitionEvent =
+              new AlterPartitionEvent(oldPart, newPart, tbl, true, handler);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onAlterPartition(alterPartitionEvent);
+          }
+        }
+      }
+
+      success = msdb.commitTransaction();
    } catch (InvalidObjectException | NoSuchObjectException e) {
-      throw new InvalidOperationException("Alter partition operation fails: " + e);
+      throw new InvalidOperationException("Alter partition operation failed: " + e);
+    } finally {
+      if(!success) {
+        msdb.rollbackTransaction();
+      }
    }
+
    return oldParts;
  }
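The HiveAlterHandler changes above thread the HMSHandler through each alter call so that configured transactional listeners are notified while the JDO transaction is still open; a listener that throws causes the finally block to roll the metadata change back. For orientation, a minimal sketch of a listener that could be driven this way follows. The package, class name, and log output are illustrative assumptions, not part of this patch:

    // Sketch only: a hypothetical transactional listener; package, class name
    // and log output are invented for illustration.
    package org.example;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
    import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class AuditAlterListener extends MetaStoreEventListener {
      private static final Logger LOG = LoggerFactory.getLogger(AuditAlterListener.class);

      public AuditAlterListener(Configuration config) {
        super(config); // listeners are instantiated with the metastore conf
      }

      @Override
      public void onAlterTable(AlterTableEvent event) throws MetaException {
        // Runs inside the metastore's open JDO transaction; throwing
        // MetaException here rolls the whole alter back.
        LOG.info("alter table {} -> {}", event.getOldTable().getTableName(),
            event.getNewTable().getTableName());
      }

      @Override
      public void onAlterPartition(AlterPartitionEvent event) throws MetaException {
        LOG.info("alter partition on table {}", event.getTable().getTableName());
      }
    }

Registered through hive.metastore.transactional.event.listeners, such a listener effectively gets veto power over the DDL operation, which is the difference from the post-commit hive.metastore.event.listeners path.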
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 60e462fd06a3f84d5b87cd335afb49768cb27562..6179c8e335501b420431ff94d71b927d252d7866 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -27,8 +27,8 @@
 import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
-
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -36,7 +36,6 @@
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.JvmPauseMonitor;
 import org.apache.hadoop.hive.common.LogUtils;
-import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
 import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
@@ -49,9 +48,120 @@
 import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AddForeignKeyRequest;
+import org.apache.hadoop.hive.metastore.api.AddPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.AddPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.AddPrimaryKeyRequest;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.CacheFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.CacheFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.ClearFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.ClearFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DropConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventResponse;
+import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
+import org.apache.hadoop.hive.metastore.api.ForeignKeysResponse;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprResult;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest;
+import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse;
+import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
+import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse;
+import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeRequest;
+import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeResponse;
+import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleRequest;
+import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
+import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionEventType;
+import org.apache.hadoop.hive.metastore.api.PartitionListComposingSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD;
+import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsStatsResult;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysResponse;
+import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
+import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo;
+import org.apache.hadoop.hive.metastore.api.PutFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.PutFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
+import org.apache.hadoop.hive.metastore.api.Role;
+import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.TableStatsRequest;
+import org.apache.hadoop.hive.metastore.api.TableStatsResult;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.Type;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
 import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
@@ -94,8 +204,8 @@
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
 import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
-import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
 import org.apache.hadoop.hive.thrift.HiveDelegationTokenManager;
+import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -119,11 +229,7 @@
 import org.slf4j.LoggerFactory;

 import javax.jdo.JDOException;
-
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -144,8 +250,8 @@
 import java.util.Timer;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -391,6 +497,7 @@ public HiveConf getHiveConf() {
     private AlterHandler alterHandler;
     private List<MetaStorePreEventListener> preListeners;
     private List<MetaStoreEventListener> listeners;
+    private List<MetaStoreEventListener> transactionalListeners;
     private List<MetaStoreEndFunctionListener> endFunctionListeners;
     private List<MetaStoreInitListener> initListeners;
     private Pattern partitionValidationPattern;
@@ -402,6 +509,10 @@ public HiveConf getHiveConf() {
       }
     }

+    List<MetaStoreEventListener> getTransactionalListeners() {
+      return transactionalListeners;
+    }
+
     @Override
     public void init() throws MetaException {
       rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
@@ -473,7 +584,8 @@ public Object getValue() {
           hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
       listeners.add(new SessionPropertiesListener(hiveConf));
       listeners.add(new AcidEventListener(hiveConf));
-
+      transactionalListeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class,hiveConf,
+          hiveConf.getVar(ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS));
       if (metrics != null) {
         listeners.add(new HMSMetricsListener(hiveConf, metrics));
       }
@@ -546,6 +658,15 @@ public void setMetaConf(String key, String value) throws MetaException {
       for (MetaStoreEventListener listener : listeners) {
         listener.onConfigChange(new ConfigChangeEvent(this, key, oldValue, value));
       }
+
+      if (transactionalListeners.size() > 0) {
+        // All the fields of this event are final, so no reason to create a new one for each
+        // listener
+        ConfigChangeEvent cce = new ConfigChangeEvent(this, key, oldValue, value);
+        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+          transactionalListener.onConfigChange(cce);
+        }
+      }
     }

     @Override
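With the hunks above, init() builds the transactional listener list from hive.metastore.transactional.event.listeners via the same MetaStoreUtils.getMetaStoreListeners factory already used for hive.metastore.event.listeners. As a sketch, a test or embedded deployment might point that property at a listener class like so; the listener class name is the hypothetical one from the earlier sketch:

    // Sketch only: setting the new conf var that the init() hunk above reads.
    import org.apache.hadoop.hive.conf.HiveConf;

    public class TransactionalListenerWiring {
      public static HiveConf confWithAuditListener() {
        HiveConf conf = new HiveConf();
        // Same key init() resolves via ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS.
        conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
            "org.example.AuditAlterListener"); // hypothetical class from the sketch above
        return conf;
      }
    }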
@@ -842,19 +963,18 @@ private void create_database_core(RawStore ms, final Database db)
       if (!validateName(db.getName(), null)) {
         throw new InvalidObjectException(db.getName() + " is not a valid database name");
       }
+
       if (null == db.getLocationUri()) {
         db.setLocationUri(wh.getDefaultDatabasePath(db.getName()).toString());
       } else {
         db.setLocationUri(wh.getDnsPath(new Path(db.getLocationUri())).toString());
       }
+
       Path dbPath = new Path(db.getLocationUri());
       boolean success = false;
       boolean madeDir = false;
-
       try {
-        firePreEvent(new PreCreateDatabaseEvent(db, this));
-
         if (!wh.isDir(dbPath)) {
           if (!wh.mkdirs(dbPath, true)) {
             throw new MetaException("Unable to create database path " + dbPath +
@@ -865,6 +985,13 @@ private void create_database_core(RawStore ms, final Database db)
         ms.openTransaction();
         ms.createDatabase(db);
+        if (transactionalListeners.size() > 0) {
+          CreateDatabaseEvent cde = new CreateDatabaseEvent(db, true, this);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onCreateDatabase(cde);
+          }
+        }
+
         success = ms.commitTransaction();
       } finally {
         if (!success) {
@@ -902,7 +1029,6 @@ public void create_database(final Database db)
         }
         Deadline.checkTimeout();
       }
-
       create_database_core(getMS(), db);
       success = true;
     } catch (Exception e) {
@@ -1072,6 +1198,12 @@ private void drop_database_core(RawStore ms,
       }

       if (ms.dropDatabase(name)) {
+        if (transactionalListeners.size() > 0) {
+          DropDatabaseEvent dde = new DropDatabaseEvent(db, true, this);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onDropDatabase(dde);
+          }
+        }
         success = ms.commitTransaction();
       }
     } finally {
@@ -1321,10 +1453,10 @@ public boolean drop_type(final String name) throws MetaException, NoSuchObjectEx
     }

     private void create_table_core(final RawStore ms, final Table tbl,
-        final EnvironmentContext envContext, List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys)
+        final EnvironmentContext envContext, List<SQLPrimaryKey> primaryKeys,
+        List<SQLForeignKey> foreignKeys)
         throws AlreadyExistsException, MetaException,
         InvalidObjectException, NoSuchObjectException {
-
       if (!MetaStoreUtils.validateName(tbl.getTableName(), hiveConf)) {
         throw new InvalidObjectException(tbl.getTableName()
             + " is not a valid object name");
@@ -1411,8 +1543,16 @@ private void create_table_core(final RawStore ms, final Table tbl,
         } else {
           ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys);
         }
-        success = ms.commitTransaction();
+        if (transactionalListeners.size() > 0) {
+          CreateTableEvent createTableEvent = new CreateTableEvent(tbl, true, this);
+          createTableEvent.setEnvironmentContext(envContext);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onCreateTable(createTableEvent);
+          }
+        }
+
+        success = ms.commitTransaction();
      } finally {
        if (!success) {
          ms.rollbackTransaction();
@@ -1637,13 +1777,20 @@ private boolean drop_table_core(final RawStore ms, final String dbname, final St
        // Drop the partitions and get a list of locations which need to be deleted
        partPaths = dropPartitionsAndGetLocations(ms, dbname, name, tblPath,
            tbl.getPartitionKeys(), deleteData && !isExternal);
-
        if (!ms.dropTable(dbname, name)) {
          String tableName = dbname + "." + name;
          throw new MetaException(indexName == null ? "Unable to drop table " + tableName:
              "Unable to drop index table " + tableName + " for index " + indexName);
+        } else {
+          if (transactionalListeners.size() > 0) {
+            DropTableEvent dropTableEvent = new DropTableEvent(tbl, true, deleteData, this);
+            dropTableEvent.setEnvironmentContext(envContext);
+            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+              transactionalListener.onDropTable(dropTableEvent);
+            }
+          }
+          success = ms.commitTransaction();
        }
-        success = ms.commitTransaction();
      } finally {
        if (!success) {
          ms.rollbackTransaction();
@@ -2124,8 +2271,15 @@ private Partition append_partition_common(RawStore ms, String dbName, String tab
          MetaStoreUtils.updatePartitionStatsFast(part, wh, madeDir, envContext);
        }

-        success = ms.addPartition(part);
-        if (success) {
+        if (ms.addPartition(part)) {
+          if (transactionalListeners.size() > 0) {
+            AddPartitionEvent addPartitionEvent = new AddPartitionEvent(tbl, part, true, this);
+            addPartitionEvent.setEnvironmentContext(envContext);
+            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+              transactionalListener.onAddPartition(addPartitionEvent);
+            }
+          }
+
          success = ms.commitTransaction();
        }
      } finally {
@@ -2278,13 +2432,13 @@ public boolean equals(Object obj) {
    private List<Partition> add_partitions_core(final RawStore ms,
        String dbName, String tblName, List<Partition> parts, final boolean ifNotExists)
-        throws MetaException, InvalidObjectException, AlreadyExistsException, TException {
+            throws MetaException, InvalidObjectException, AlreadyExistsException, TException {
      logInfo("add_partitions");
      boolean success = false;
      // Ensures that the list doesn't have dups, and keeps track of directories we have created.
      final Map<PartValEqWrapper, Boolean> addedPartitions =
          Collections.synchronizedMap(new HashMap<PartValEqWrapper, Boolean>());
-      final List<Partition> result = new ArrayList<Partition>();
+      final List<Partition> newParts = new ArrayList<Partition>();
      final List<Partition> existingParts = new ArrayList<Partition>();;
      Table tbl = null;
      try {
@@ -2300,7 +2454,6 @@ public boolean equals(Object obj) {
        }

        List<Future<Partition>> partFutures = Lists.newArrayList();
-
        final Table table = tbl;
        for (final Partition part : parts) {
          if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) {
@@ -2315,7 +2468,6 @@ public boolean equals(Object obj) {
            continue;
          }
-
          partFutures.add(threadPool.submit(new Callable<Partition>() {
            @Override
            public Partition call() throws Exception {
@@ -2331,11 +2483,12 @@ public Partition call() throws Exception {
          }
        }));
      }
+
      try {
        for (Future<Partition> partFuture : partFutures) {
          Partition part = partFuture.get();
          if (part != null) {
-            result.add(part);
+            newParts.add(part);
          }
        }
      } catch (InterruptedException | ExecutionException e) {
@@ -2345,12 +2498,19 @@ public Partition call() throws Exception {
        }
        throw new MetaException(e.getMessage());
      }
-      if (!result.isEmpty()) {
-        success = ms.addPartitions(dbName, tblName, result);
+
+      if (!newParts.isEmpty()) {
+        success = ms.addPartitions(dbName, tblName, newParts);
      } else {
        success = true;
      }
-      success = success && ms.commitTransaction();
+
+      // Setting success to false to make sure that if the listener fails, rollback happens.
+      success = false;
+      // Notification is generated for newly created partitions only. The subset of partitions
+      // that already exist (existingParts), will not generate notifications.
+      fireMetaStoreAddPartitionEventTransactional(tbl, newParts, null, true);
+      success = ms.commitTransaction();
    } finally {
      if (!success) {
        ms.rollbackTransaction();
@@ -2362,14 +2522,14 @@ public Partition call() throws Exception {
        }
        fireMetaStoreAddPartitionEvent(tbl, parts, null, false);
      } else {
-        fireMetaStoreAddPartitionEvent(tbl, result, null, true);
+        fireMetaStoreAddPartitionEvent(tbl, newParts, null, true);
        if (existingParts != null) {
          // The request has succeeded but we failed to add these partitions.
          fireMetaStoreAddPartitionEvent(tbl, existingParts, null, false);
        }
      }
    }
-    return result;
+    return newParts;
  }

  @Override
@@ -2461,23 +2621,22 @@ private int add_partitions_pspec_core(
      }
      firePreEvent(new PreAddPartitionEvent(tbl, partitionSpecProxy, this));
-
      List<Future<Object>> partFutures = Lists.newArrayList();
      final Table table = tbl;
-
      while(partitionIterator.hasNext()) {
-
        final Partition part = partitionIterator.getCurrent();
        if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) {
          throw new MetaException("Partition does not belong to target table "
              + dbName + "." + tblName + ": " + part);
        }
+
        boolean shouldAdd = startAddPartition(ms, part, ifNotExists);
        if (!shouldAdd) {
          LOG.info("Not adding partition " + part + " as it already exists");
          continue;
        }
+
        partFutures.add(threadPool.submit(new Callable<Object>() {
          @Override
          public Object call() throws Exception {
            boolean madeDir = createLocationForAddedPartition(table, part);
@@ -2506,9 +2665,11 @@ private int add_partitions_pspec_core(
        throw new MetaException(e.getMessage());
      }

-      success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists)
-          && ms.commitTransaction();
-
+      success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists);
+      //setting success to false to make sure that if the listener fails, rollback happens.
+      success = false;
+      fireMetaStoreAddPartitionEventTransactional(tbl, partitionSpecProxy, null, true);
+      success = ms.commitTransaction();
      return addedPartitions.size();
    } finally {
      if (!success) {
@@ -2646,9 +2807,13 @@ private Partition add_partition_core(final RawStore ms,
          wh.deleteDir(new Path(part.getSd().getLocation()), true);
        }
      }
+
+      // Setting success to false to make sure that if the listener fails, rollback happens.
+      success = false;
+      fireMetaStoreAddPartitionEventTransactional(tbl, Arrays.asList(part), envContext, true);
      // we proceed only if we'd actually succeeded anyway, otherwise,
      // we'd have thrown an exception
-      success = success && ms.commitTransaction();
+      success = ms.commitTransaction();
    } finally {
      if (!success) {
        ms.rollbackTransaction();
@@ -2665,7 +2830,6 @@ private void fireMetaStoreAddPartitionEvent(final Table tbl,
        AddPartitionEvent addPartitionEvent =
            new AddPartitionEvent(tbl, parts, success, this);
        addPartitionEvent.setEnvironmentContext(envContext);
-
        for (MetaStoreEventListener listener : listeners) {
          listener.onAddPartition(addPartitionEvent);
        }
@@ -2679,13 +2843,39 @@ private void fireMetaStoreAddPartitionEvent(final Table tbl,
        AddPartitionEvent addPartitionEvent =
            new AddPartitionEvent(tbl, partitionSpec, success, this);
        addPartitionEvent.setEnvironmentContext(envContext);
-
        for (MetaStoreEventListener listener : listeners) {
          listener.onAddPartition(addPartitionEvent);
        }
      }
    }

+    private void fireMetaStoreAddPartitionEventTransactional(final Table tbl,
+        final List<Partition> parts, final EnvironmentContext envContext, boolean success)
+        throws MetaException {
+      if (tbl != null && parts != null && !parts.isEmpty()) {
+        AddPartitionEvent addPartitionEvent =
+            new AddPartitionEvent(tbl, parts, success, this);
+        addPartitionEvent.setEnvironmentContext(envContext);
+        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+          transactionalListener.onAddPartition(addPartitionEvent);
+        }
+      }
+    }
+
+    private void fireMetaStoreAddPartitionEventTransactional(final Table tbl,
+        final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success)
+        throws MetaException {
+      if (tbl != null && partitionSpec != null) {
+        AddPartitionEvent addPartitionEvent =
+            new AddPartitionEvent(tbl, partitionSpec, success, this);
+        addPartitionEvent.setEnvironmentContext(envContext);
+        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+          transactionalListener.onAddPartition(addPartitionEvent);
+        }
+      }
+    }
+
    @Override
    public Partition add_partition(final Partition part)
        throws InvalidObjectException, AlreadyExistsException, MetaException {
@@ -2847,8 +3037,17 @@ private boolean drop_partition_common(RawStore ms, String db_name, String tbl_na
        if (!ms.dropPartition(db_name, tbl_name, part_vals)) {
          throw new MetaException("Unable to drop partition");
+        } else {
+          if (transactionalListeners.size() > 0) {
+            DropPartitionEvent dropPartitionEvent =
+                new DropPartitionEvent(tbl, part, true, deleteData, this);
+            dropPartitionEvent.setEnvironmentContext(envContext);
+            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+              transactionalListener.onDropPartition(dropPartitionEvent);
+            }
+          }
+          success = ms.commitTransaction();
        }
-        success = ms.commitTransaction();
      } finally {
        if (!success) {
          ms.rollbackTransaction();
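The hunks in this region all repeat one shape: mutate the RawStore, fire the transactional listeners while the transaction is still open, then commit, with the finally block rolling back if anything, including a listener, fails. A condensed sketch of that shape follows; it is paraphrased with an invented helper name, not literal patch code:

    // Sketch only: the commit/rollback shape the surrounding hunks repeat.
    import java.util.List;

    import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
    import org.apache.hadoop.hive.metastore.RawStore;
    import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;

    public class TransactionalFireShape {
      static boolean dropPartitionWithNotification(RawStore ms, String db, String table,
          List<String> partVals, DropPartitionEvent event,
          List<MetaStoreEventListener> transactionalListeners) throws Exception {
        boolean success = false;
        try {
          ms.openTransaction();
          ms.dropPartition(db, table, partVals);      // 1. mutate the store
          for (MetaStoreEventListener listener : transactionalListeners) {
            listener.onDropPartition(event);          // 2. notify inside the open txn
          }
          success = ms.commitTransaction();           // 3. commit only after listeners ran
        } finally {
          if (!success) {
            ms.rollbackTransaction();                 // a listener failure aborts the DDL
          }
        }
        return success;
      }
    }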
@@ -3022,11 +3221,23 @@ public DropPartitionsResult drop_partitions_req(
        }
        ms.dropPartitions(dbName, tblName, partNames);
+        if (parts != null && transactionalListeners.size() > 0) {
+          for (Partition part : parts) {
+            DropPartitionEvent dropPartitionEvent =
+                new DropPartitionEvent(tbl, part, true, deleteData, this);
+            dropPartitionEvent.setEnvironmentContext(envContext);
+            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+              transactionalListener.onDropPartition(dropPartitionEvent);
+            }
+          }
+        }
+
        success = ms.commitTransaction();
        DropPartitionsResult result = new DropPartitionsResult();
        if (needResult) {
          result.setPartitions(parts);
        }
+
        return result;
      } finally {
        if (!success) {
@@ -3434,8 +3645,7 @@ private static boolean is_partition_spec_grouping_enabled(Table table) {
    @Override
    public void alter_partition(final String db_name, final String tbl_name,
        final Partition new_part)
-        throws InvalidOperationException, MetaException,
-        TException {
+        throws InvalidOperationException, MetaException, TException {
      rename_partition(db_name, tbl_name, null, new_part);
    }
@@ -3459,8 +3669,7 @@ public void rename_partition(final String db_name, final String tbl_name,
    private void rename_partition(final String db_name, final String tbl_name,
        final List<String> part_vals, final Partition new_part,
        final EnvironmentContext envContext)
-        throws InvalidOperationException, MetaException,
-        TException {
+        throws InvalidOperationException, MetaException, TException {
      startTableFunction("alter_partition", db_name, tbl_name);

      if (LOG.isInfoEnabled()) {
@@ -3483,13 +3692,13 @@ private void rename_partition(final String db_name, final String tbl_name,
      Exception ex = null;
      try {
        firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, part_vals, new_part, this));
-
        if (part_vals != null && !part_vals.isEmpty()) {
          MetaStoreUtils.validatePartitionNameCharacters(new_part.getValues(),
              partitionValidationPattern);
        }
-        oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part, envContext);
+        oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part,
+            envContext, this);

        // Only fetch the table if we actually have a listener
        Table table = null;
@@ -3522,21 +3731,19 @@ private void rename_partition(final String db_name, final String tbl_name,
      } finally {
        endFunction("alter_partition", oldPart != null, ex, tbl_name);
      }
-      return;
    }

    @Override
    public void alter_partitions(final String db_name, final String tbl_name,
-        final List<Partition> new_parts) throws InvalidOperationException, MetaException,
-        TException {
+        final List<Partition> new_parts)
+        throws InvalidOperationException, MetaException, TException {
      alter_partitions_with_environment_context(db_name, tbl_name, new_parts, null);
    }

    @Override
    public void alter_partitions_with_environment_context(final String db_name,
        final String tbl_name, final List<Partition> new_parts, EnvironmentContext environmentContext)
-        throws InvalidOperationException, MetaException,
-        TException {
+        throws InvalidOperationException, MetaException, TException {

      startTableFunction("alter_partitions", db_name, tbl_name);

@@ -3553,8 +3760,8 @@ public void alter_partitions_with_environment_context(final String db_name, fina
        for (Partition tmpPart : new_parts) {
          firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, null, tmpPart, this));
        }
-        oldParts = alterHandler.alterPartitions(getMS(), wh, db_name, tbl_name, new_parts, environmentContext);
-
+        oldParts = alterHandler.alterPartitions(getMS(), wh, db_name, tbl_name, new_parts,
+            environmentContext, this);
        Iterator<Partition> olditr = oldParts.iterator();
        // Only fetch the table if we have a listener that needs it.
        Table table = null;
@@ -3595,7 +3802,6 @@ public void alter_partitions_with_environment_context(final String db_name, fina
      } finally {
        endFunction("alter_partition", oldParts != null, ex, tbl_name);
      }
-      return;
    }

    @Override
@@ -3610,13 +3816,22 @@ public void alter_index(final String dbname, final String base_table_name,
      boolean success = false;
      Exception ex = null;
      Index oldIndex = null;
+      RawStore ms = getMS();
      try {
+        ms.openTransaction();
        oldIndex = get_index_by_name(dbname, base_table_name, index_name);
        firePreEvent(new PreAlterIndexEvent(oldIndex, newIndex, this));
-        getMS().alterIndex(dbname, base_table_name, index_name, newIndex);
-        success = true;
+        ms.alterIndex(dbname, base_table_name, index_name, newIndex);
+        if (transactionalListeners.size() > 0) {
+          AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, true, this);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onAlterIndex(alterIndexEvent);
+          }
+        }
+
+        success = ms.commitTransaction();
      } catch (InvalidObjectException e) {
        ex = e;
        throw new InvalidOperationException(e.getMessage());
@@ -3630,13 +3845,16 @@ public void alter_index(final String dbname, final String base_table_name,
          throw newMetaException(e);
        }
      } finally {
+        if (!success) {
+          ms.rollbackTransaction();
+        }
+
        endFunction("alter_index", success, ex, base_table_name);
        for (MetaStoreEventListener listener : listeners) {
          AlterIndexEvent alterIndexEvent =
              new AlterIndexEvent(oldIndex, newIndex, success, this);
          listener.onAlterIndex(alterIndexEvent);
        }
      }
-      return;
    }

    @Override
@@ -3699,9 +3917,9 @@ private void alter_table_core(final String dbname, final String name, final Tabl
      try {
        Table oldt = get_table_core(dbname, name);
        firePreEvent(new PreAlterTableEvent(oldt, newTable, this));
-        alterHandler.alterTable(getMS(), wh, dbname, name, newTable, envContext);
+        alterHandler.alterTable(getMS(), wh, dbname, name, newTable,
+            envContext, this);
        success = true;
-
        for (MetaStoreEventListener listener : listeners) {
          AlterTableEvent alterTableEvent =
              new AlterTableEvent(oldt, newTable, success, this);
@@ -4314,7 +4532,15 @@ private Index add_index_core(final RawStore ms, final Index index, final Table i
      index.setCreateTime((int) time);
      index.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
-      ms.addIndex(index);
+      if (ms.addIndex(index)) {
+        if (transactionalListeners.size() > 0) {
+          AddIndexEvent addIndexEvent = new AddIndexEvent(index, true, this);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onAddIndex(addIndexEvent);
+          }
+        }
+      }
+
      success = ms.commitTransaction();
      return index;
    } finally {
@@ -4327,6 +4553,7 @@ private Index add_index_core(final RawStore ms, final Index index, final Table i
        }
        ms.rollbackTransaction();
      }
+
      for (MetaStoreEventListener listener : listeners) {
        AddIndexEvent addIndexEvent = new AddIndexEvent(index, success, this);
        listener.onAddIndex(addIndexEvent);
@@ -4404,6 +4631,14 @@ private boolean drop_index_by_name_core(final RawStore ms,
                + qualified[0] + "."
                + qualified[1] + " for index " + indexName);
          }
        }
+
+        if (transactionalListeners.size() > 0) {
+          DropIndexEvent dropIndexEvent = new DropIndexEvent(index, true, this);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onDropIndex(dropIndexEvent);
+          }
+        }
+
        success = ms.commitTransaction();
      } finally {
        if (!success) {
@@ -5707,17 +5942,28 @@ public void markPartitionForEvent(final String db_name, final String tbl_name,
      Table tbl = null;
      Exception ex = null;
+      RawStore ms = getMS();
+      boolean success = false;
      try {
+        ms.openTransaction();
        startPartitionFunction("markPartitionForEvent", db_name, tbl_name, partName);
        firePreEvent(new PreLoadPartitionDoneEvent(db_name, tbl_name, partName, this));
-        tbl = getMS().markPartitionForEvent(db_name, tbl_name, partName, evtType);
+        tbl = ms.markPartitionForEvent(db_name, tbl_name, partName, evtType);
        if (null == tbl) {
          throw new UnknownTableException("Table: " + tbl_name + " not found.");
-        } else {
-          for (MetaStoreEventListener listener : listeners) {
-            listener.onLoadPartitionDone(new LoadPartitionDoneEvent(true, tbl, partName, this));
+        }
+
+        if (transactionalListeners.size() > 0) {
+          LoadPartitionDoneEvent lpde = new LoadPartitionDoneEvent(true, tbl, partName, this);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onLoadPartitionDone(lpde);
          }
        }
+
+        success = ms.commitTransaction();
+        for (MetaStoreEventListener listener : listeners) {
+          listener.onLoadPartitionDone(new LoadPartitionDoneEvent(true, tbl, partName, this));
+        }
      } catch (Exception original) {
        ex = original;
        LOG.error("Exception caught in mark partition event ", original);
@@ -5737,7 +5983,11 @@ public void markPartitionForEvent(final String db_name, final String tbl_name,
          throw newMetaException(original);
        }
      } finally {
-        endFunction("markPartitionForEvent", tbl != null, ex, tbl_name);
+        if (!success) {
+          ms.rollbackTransaction();
+        }
+
+        endFunction("markPartitionForEvent", tbl != null, ex, tbl_name);
      }
    }

@@ -6244,9 +6494,14 @@ public FireEventResponse fire_listener_event(FireEventRequest rqst) throws TExce
          InsertEvent event = new InsertEvent(rqst.getDbName(), rqst.getTableName(),
              rqst.getPartitionVals(), rqst.getData().getInsertData().getFilesAdded(),
              rqst.isSuccessful(), this);
+          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+            transactionalListener.onInsert(event);
+          }
+
          for (MetaStoreEventListener listener : listeners) {
            listener.onInsert(event);
          }
+
          return new FireEventResponse();
        default: