diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5ea9751..479cc8f 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -18,6 +18,30 @@ package org.apache.hadoop.hive.conf; +import com.google.common.base.Joiner; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.hive.conf.Validator.PatternSet; +import org.apache.hadoop.hive.conf.Validator.RangeValidator; +import org.apache.hadoop.hive.conf.Validator.RatioValidator; +import org.apache.hadoop.hive.conf.Validator.SizeValidator; +import org.apache.hadoop.hive.conf.Validator.StringSet; +import org.apache.hadoop.hive.conf.Validator.TimeValidator; +import org.apache.hadoop.hive.conf.Validator.WritableDirectoryValidator; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Shell; +import org.apache.hive.common.HiveCompat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.LoginException; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; @@ -43,32 +67,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -import javax.security.auth.login.LoginException; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.classification.InterfaceAudience; -import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate; -import org.apache.hadoop.hive.conf.Validator.PatternSet; -import org.apache.hadoop.hive.conf.Validator.RangeValidator; -import org.apache.hadoop.hive.conf.Validator.RatioValidator; -import org.apache.hadoop.hive.conf.Validator.SizeValidator; -import org.apache.hadoop.hive.conf.Validator.StringSet; -import org.apache.hadoop.hive.conf.Validator.TimeValidator; -import org.apache.hadoop.hive.conf.Validator.WritableDirectoryValidator; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Shell; -import org.apache.hive.common.HiveCompat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Joiner; - /** * Hive Configuration. 
*/ @@ -247,6 +245,7 @@ private static URL checkConfigFile(File f) { HiveConf.ConfVars.METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS, HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX, HiveConf.ConfVars.METASTORE_EVENT_LISTENERS, + HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, HiveConf.ConfVars.METASTORE_EVENT_CLEAN_FREQ, HiveConf.ConfVars.METASTORE_EVENT_EXPIRY_DURATION, HiveConf.ConfVars.METASTORE_FILTER_HOOK, @@ -762,6 +761,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal METASTORE_PRE_EVENT_LISTENERS("hive.metastore.pre.event.listeners", "", "List of comma separated listeners for metastore events."), METASTORE_EVENT_LISTENERS("hive.metastore.event.listeners", "", ""), + METASTORE_TRANSACTIONAL_EVENT_LISTENERS("hive.metastore.transactional.event.listeners", "", + "Listeners added to this property are invoked inside the same transaction as the metastore operation itself"), METASTORE_EVENT_DB_LISTENER_TTL("hive.metastore.event.db.listener.timetolive", "86400s", new TimeValidator(TimeUnit.SECONDS), "time after which events will be removed from the database listener queue"), diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java index af16f75..ec2c903 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java @@ -18,17 +18,8 @@ package org.apache.hadoop.hive.metastore; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - import com.google.common.collect.Lists; import junit.framework.TestCase; - import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; @@ -67,6 +58,16 @@ import org.apache.hadoop.hive.ql.processors.SetProcessor; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.ShimLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; /** * TestMetaStoreEventListener.
Test case for @@ -74,6 +75,8 @@ * {@link org.apache.hadoop.hive.metastore.MetaStorePreEventListener} */ public class TestMetaStoreEventListener extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(TestMetaStoreEventListener.class); + private HiveConf hiveConf; private HiveMetaStoreClient msc; private Driver driver; @@ -91,6 +94,8 @@ protected void setUp() throws Exception { DummyListener.class.getName()); System.setProperty("hive.metastore.pre.event.listeners", DummyPreListener.class.getName()); + System.setProperty(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS.varname, + DummyTransactionalListener.class.getName()); int port = MetaStoreUtils.findFreePort(); MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge()); @@ -240,6 +245,10 @@ public void testListener() throws Exception { assert dbEvent.getStatus(); validateCreateDb(db, dbEvent.getDatabase()); + LOG.info("Going to get txnl event for create db"); + dbEvent = (CreateDatabaseEvent)(DummyTransactionalListener.getLastTransactionalEvent()); + assert dbEvent.getStatus(); + validateCreateDb(db, dbEvent.getDatabase()); driver.run("use " + dbName); driver.run(String.format("create table %s (a string) partitioned by (b string)", tblName)); @@ -253,6 +262,10 @@ public void testListener() throws Exception { assert tblEvent.getStatus(); validateCreateTable(tbl, tblEvent.getTable()); + tblEvent = (CreateTableEvent)(DummyTransactionalListener.getLastTransactionalEvent()); + assert tblEvent.getStatus(); + validateCreateTable(tbl, tblEvent.getTable()); + driver.run("create index tmptbl_i on table tmptbl(a) as 'compact' " + "WITH DEFERRED REBUILD IDXPROPERTIES ('prop1'='val1', 'prop2'='val2')"); listSize += 2; // creates index table internally @@ -268,6 +281,10 @@ public void testListener() throws Exception { validateAddIndex(oldIndex, preAddIndexEvent.getIndex()); + addIndexEvent = (AddIndexEvent)(DummyTransactionalListener.getLastTransactionalEvent()); + assert addIndexEvent.getStatus(); + validateAddIndex(oldIndex, addIndexEvent.getIndex()); + driver.run("alter index tmptbl_i on tmptbl set IDXPROPERTIES " + "('prop1'='val1_new', 'prop3'='val3')"); listSize++; @@ -280,6 +297,11 @@ public void testListener() throws Exception { validateAlterIndex(oldIndex, alterIndexEvent.getOldIndex(), newIndex, alterIndexEvent.getNewIndex()); + alterIndexEvent = (AlterIndexEvent)(DummyTransactionalListener.getLastTransactionalEvent()); + assert alterIndexEvent.getStatus(); + validateAlterIndex(oldIndex, alterIndexEvent.getOldIndex(), + newIndex, alterIndexEvent.getNewIndex()); + PreAlterIndexEvent preAlterIndexEvent = (PreAlterIndexEvent) (preNotifyList.get(preNotifyList.size() - 1)); validateAlterIndex(oldIndex, preAlterIndexEvent.getOldIndex(), newIndex, preAlterIndexEvent.getNewIndex()); @@ -292,6 +314,10 @@ public void testListener() throws Exception { assert dropIndexEvent.getStatus(); validateDropIndex(newIndex, dropIndexEvent.getIndex()); + dropIndexEvent = (DropIndexEvent)(DummyTransactionalListener.getLastTransactionalEvent()); + assert dropIndexEvent.getStatus(); + validateDropIndex(newIndex, dropIndexEvent.getIndex()); + PreDropIndexEvent preDropIndexEvent = (PreDropIndexEvent) (preNotifyList.get(preNotifyList.size() - 1)); validateDropIndex(newIndex, preDropIndexEvent.getIndex()); @@ -308,6 +334,12 @@ public void testListener() throws Exception { validateTableInAddPartition(tbl, partEvent.getTable()); validateAddPartition(part, prePartEvent.getPartitions().get(0)); + partEvent = 
(AddPartitionEvent)(DummyTransactionalListener.getLastTransactionalEvent()); + assert partEvent.getStatus(); + partAdded = partEvent.getPartitionIterator().next(); + validateAddPartition(part, partAdded); + validateTableInAddPartition(tbl, partEvent.getTable()); + // Test adding multiple partitions in a single partition-set, atomically. int currentTime = (int)System.currentTimeMillis(); HiveMetaStoreClient hmsClient = new HiveMetaStoreClient(hiveConf); @@ -328,6 +360,14 @@ public void testListener() throws Exception { assertEquals("Unexpected partition value.", partition2.getValues(), multiParts.get(1).getValues()); assertEquals("Unexpected partition value.", partition3.getValues(), multiParts.get(2).getValues()); + multiplePartitionEvent = (AddPartitionEvent)(DummyTransactionalListener.getLastTransactionalEvent()); + assertEquals("Unexpected table value.", table, multiplePartitionEvent.getTable()); + multiParts = Lists.newArrayList(multiplePartitionEvent.getPartitionIterator()); + assertEquals("Unexpected number of partitions in event!", 3, multiParts.size()); + assertEquals("Unexpected partition value.", partition1.getValues(), multiParts.get(0).getValues()); + assertEquals("Unexpected partition value.", partition2.getValues(), multiParts.get(1).getValues()); + assertEquals("Unexpected partition value.", partition3.getValues(), multiParts.get(2).getValues()); + driver.run(String.format("alter table %s touch partition (%s)", tblName, "b='2011'")); listSize++; assertEquals(notifyList.size(), listSize); @@ -349,6 +389,12 @@ public void testListener() throws Exception { preAlterPartEvent.getTableName(), preAlterPartEvent.getNewPartition().getValues(), preAlterPartEvent.getNewPartition()); + alterPartEvent = (AlterPartitionEvent)(DummyTransactionalListener.getLastTransactionalEvent()); + assert alterPartEvent.getStatus(); + validateAlterPartition(origP, origP, alterPartEvent.getOldPartition().getDbName(), + alterPartEvent.getOldPartition().getTableName(), + alterPartEvent.getOldPartition().getValues(), alterPartEvent.getNewPartition()); + List part_vals = new ArrayList(); part_vals.add("c=2012"); int preEventListSize; @@ -381,6 +427,10 @@ public void testListener() throws Exception { validateAlterTable(tbl, renamedTable, preAlterTableE.getOldTable(), preAlterTableE.getNewTable()); + alterTableE = (AlterTableEvent)(DummyTransactionalListener.getLastTransactionalEvent()); + assert alterTableE.getStatus(); + validateAlterTable(tbl, renamedTable, alterTableE.getOldTable(), alterTableE.getNewTable()); + //change the table name back driver.run(String.format("alter table %s rename to %s", renamed, tblName)); listSize++; @@ -399,6 +449,10 @@ public void testListener() throws Exception { validateAlterTableColumns(tbl, altTable, preAlterTableE.getOldTable(), preAlterTableE.getNewTable()); + alterTableE = (AlterTableEvent)(DummyTransactionalListener.getLastTransactionalEvent()); + assert alterTableE.getStatus(); + validateAlterTableColumns(tbl, altTable, alterTableE.getOldTable(), alterTableE.getNewTable()); + Map kvs = new HashMap(1); kvs.put("b", "2011"); msc.markPartitionForEvent("hive2038", "tmptbl", kvs, PartitionEventType.LOAD_DONE); @@ -429,6 +483,11 @@ public void testListener() throws Exception { validateDropPartition(Collections.singletonList(part).iterator(), preDropPart.getPartitionIterator()); validateTableInDropPartition(tbl, preDropPart.getTable()); + dropPart = (DropPartitionEvent)(DummyTransactionalListener.getLastTransactionalEvent()); + assert dropPart.getStatus(); + 
validateDropPartition(Collections.singletonList(part).iterator(), dropPart.getPartitionIterator()); + validateTableInDropPartition(tbl, dropPart.getTable()); + driver.run("drop table " + tblName); listSize++; assertEquals(notifyList.size(), listSize); @@ -439,6 +498,10 @@ public void testListener() throws Exception { validateDropTable(tbl, dropTbl.getTable()); validateDropTable(tbl, preDropTbl.getTable()); + dropTbl = (DropTableEvent)(DummyTransactionalListener.getLastTransactionalEvent()); + assert dropTbl.getStatus(); + validateDropTable(tbl, dropTbl.getTable()); + driver.run("drop database " + dbName); listSize++; assertEquals(notifyList.size(), listSize); @@ -449,6 +512,10 @@ public void testListener() throws Exception { validateDropDb(db, dropDB.getDatabase()); validateDropDb(db, preDropDB.getDatabase()); + dropDB = (DropDatabaseEvent)(DummyTransactionalListener.getLastTransactionalEvent()); + assert dropDB.getStatus(); + validateDropDb(db, dropDB.getDatabase()); + SetProcessor.setVariable("metaconf:hive.metastore.try.direct.sql", "false"); ConfigChangeEvent event = (ConfigChangeEvent) notifyList.get(notifyList.size() - 1); assertEquals("hive.metastore.try.direct.sql", event.getKey()); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/AlterHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/AlterHandler.java index dedd449..ca36be0 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/AlterHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/AlterHandler.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; /** * Interface for Alter Table and Alter Partition code @@ -49,14 +50,16 @@ * new table object * @param cascade * if the changes will be cascaded to its partitions if applicable + * @param transactionalListeners + * list of listeners to be run within the transaction * @throws InvalidOperationException * thrown if the newTable object is invalid * @throws MetaException * thrown if there is any other error */ public abstract void alterTable(RawStore msdb, Warehouse wh, String dbname, - String name, Table newTable, EnvironmentContext envContext) throws InvalidOperationException, - MetaException; + String name, Table newTable, EnvironmentContext envContext, List transactionalListeners, + HMSHandler handler) throws InvalidOperationException, MetaException; /** * handles alter partition @@ -72,6 +75,8 @@ public abstract void alterTable(RawStore msdb, Warehouse wh, String dbname, * original values of the partition being altered * @param new_part * new partition object + * @param transactionalListeners + * list of listeners to be run within the transaction * @return the altered partition * @throws InvalidOperationException * @throws InvalidObjectException @@ -79,9 +84,9 @@ public abstract void alterTable(RawStore msdb, Warehouse wh, String dbname, * @throws MetaException */ public abstract Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname, - final String name, final List part_vals, final Partition new_part, EnvironmentContext environmentContext) - throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, - MetaException; + final String name, final List part_vals, final Partition new_part, EnvironmentContext environmentContext, + List transactionalListeners, HMSHandler handler) + throws 
InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException; /** * handles alter partitions @@ -95,6 +100,8 @@ public abstract Partition alterPartition(final RawStore msdb, Warehouse wh, fina * table of the partition being altered * @param new_parts * new partition list + * @param transactionalListeners + * list of listeners to be run within the transaction * @return the altered partition list * @throws InvalidOperationException * @throws InvalidObjectException @@ -102,7 +109,7 @@ public abstract Partition alterPartition(final RawStore msdb, Warehouse wh, fina * @throws MetaException */ public abstract List alterPartitions(final RawStore msdb, Warehouse wh, - final String dbname, final String name, final List new_part, EnvironmentContext environmentContext) - throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, - MetaException; + final String dbname, final String name, final List new_part, EnvironmentContext environmentContext, + List transactionalListeners, HMSHandler handler) + throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException; } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 7b84595..fcbd904 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -20,6 +20,8 @@ import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -46,6 +48,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.ipc.RemoteException; import org.apache.hive.common.util.HiveStringUtils; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import java.io.IOException; import java.net.URI; @@ -75,7 +78,9 @@ public void setConf(Configuration conf) { @Override public void alterTable(RawStore msdb, Warehouse wh, String dbname, - String name, Table newt, EnvironmentContext environmentContext) throws InvalidOperationException, MetaException { + String name, Table newt, EnvironmentContext environmentContext, + List transactionalListeners, HMSHandler handler) + throws InvalidOperationException, MetaException { final boolean cascade = environmentContext != null && environmentContext.isSetProperties() && StatsSetupConst.TRUE.equals(environmentContext.getProperties().get( @@ -240,6 +245,13 @@ public void alterTable(RawStore msdb, Warehouse wh, String dbname, } alterTableUpdateTableColumnStats(msdb, oldt, newt); + if (transactionalListeners.size() > 0) { + AlterTableEvent alterTableEvent = new AlterTableEvent(oldt, newt, true, handler); + alterTableEvent.setEnvironmentContext(environmentContext); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAlterTable(alterTableEvent); + } + } // commit the changes success = msdb.commitTransaction(); } catch (InvalidObjectException e) { @@ -319,7 +331,8 @@ String getSimpleMessage(IOException ex) { } @Override public Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname, - final String name, final List part_vals, final Partition new_part, EnvironmentContext 
environmentContext) + final String name, final List part_vals, final Partition new_part, EnvironmentContext environmentContext, + List transactionalListeners, HMSHandler handler) throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException { boolean success = false; @@ -344,18 +357,33 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String //alter partition if (part_vals == null || part_vals.size() == 0) { try { + msdb.openTransaction(); oldPart = msdb.getPartition(dbname, name, new_part.getValues()); if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl, environmentContext)) { MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true, environmentContext); } updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part); msdb.alterPartition(dbname, name, new_part.getValues(), new_part); + if (transactionalListeners.size() > 0) { + AlterPartitionEvent alterPartitionEvent = + new AlterPartitionEvent(oldPart, new_part, tbl, true, handler); + alterPartitionEvent.setEnvironmentContext(environmentContext); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAlterPartition(alterPartitionEvent); + } + } + success = msdb.commitTransaction(); } catch (InvalidObjectException e) { throw new InvalidOperationException("alter is not possible"); } catch (NoSuchObjectException e){ //old partition does not exist throw new InvalidOperationException("alter is not possible"); } + finally { + if(!success) { + msdb.rollbackTransaction(); + } + } return oldPart; } //rename partition @@ -452,7 +480,14 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String msdb.alterPartition(dbname, name, part_vals, new_part); } } - + if (transactionalListeners.size() > 0) { + AlterPartitionEvent alterPartitionEvent = + new AlterPartitionEvent(oldPart, new_part, tbl, true, handler); + alterPartitionEvent.setEnvironmentContext(environmentContext); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAlterPartition(alterPartitionEvent); + } + } success = msdb.commitTransaction(); } finally { if (!success) { @@ -475,6 +510,7 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String try { msdb.openTransaction(); msdb.alterPartition(dbname, name, new_part.getValues(), oldPart); + //TO DO: Should the transactional listeners run here too?. 
revertMetaDataTransaction = msdb.commitTransaction(); } catch (Exception e1) { LOG.error("Reverting metadata opeation failed During HDFS operation failed", e1); @@ -493,13 +529,16 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String @Override public List alterPartitions(final RawStore msdb, Warehouse wh, final String dbname, - final String name, final List new_parts, EnvironmentContext environmentContext) + final String name, final List new_parts, EnvironmentContext environmentContext, + List transactionalListeners, HMSHandler handler) throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException { List oldParts = new ArrayList(); List> partValsList = new ArrayList>(); Table tbl = msdb.getTable(dbname, name); + boolean success = false; try { + msdb.openTransaction(); for (Partition tmpPart: new_parts) { // Set DDL time to now if not specified if (tmpPart.getParameters() == null || @@ -519,9 +558,31 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String updatePartColumnStats(msdb, dbname, name, oldTmpPart.getValues(), tmpPart); } msdb.alterPartitions(dbname, name, partValsList, new_parts); + Iterator olditr = oldParts.iterator(); + for (Partition tmpPart : new_parts) { + Partition oldTmpPart = null; + if (olditr.hasNext()) { + oldTmpPart = olditr.next(); + } else { + throw new InvalidOperationException("failed to alterpartitions"); + } + if (transactionalListeners.size() > 0) { + AlterPartitionEvent alterPartitionEvent = + new AlterPartitionEvent(oldTmpPart, tmpPart, tbl, true, handler); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAlterPartition(alterPartitionEvent); + } + } + } + success = msdb.commitTransaction(); } catch (InvalidObjectException | NoSuchObjectException e) { throw new InvalidOperationException("Alter partition operation fails: " + e); } + finally { + if(!success) { + msdb.rollbackTransaction(); + } + } return oldParts; } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 530d2f4..4fb888a 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimaps; - import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.cli.OptionBuilder; import org.apache.hadoop.conf.Configuration; @@ -36,8 +35,8 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.JvmPauseMonitor; import org.apache.hadoop.hive.common.LogUtils; -import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; +import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.common.cli.CommonCliOptions; @@ -47,9 +46,120 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.TableType; -import org.apache.hadoop.hive.metastore.api.*; +import 
org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; +import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.AddForeignKeyRequest; +import org.apache.hadoop.hive.metastore.api.AddPartitionsRequest; +import org.apache.hadoop.hive.metastore.api.AddPartitionsResult; +import org.apache.hadoop.hive.metastore.api.AddPrimaryKeyRequest; +import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.CacheFileMetadataRequest; +import org.apache.hadoop.hive.metastore.api.CacheFileMetadataResult; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; +import org.apache.hadoop.hive.metastore.api.ClearFileMetadataRequest; +import org.apache.hadoop.hive.metastore.api.ClearFileMetadataResult; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.DropConstraintRequest; +import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr; +import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest; +import org.apache.hadoop.hive.metastore.api.DropPartitionsResult; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; +import org.apache.hadoop.hive.metastore.api.FireEventRequest; +import org.apache.hadoop.hive.metastore.api.FireEventResponse; +import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest; +import org.apache.hadoop.hive.metastore.api.ForeignKeysResponse; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse; +import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprRequest; +import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprResult; +import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest; +import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest; +import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse; +import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest; +import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse; +import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeRequest; +import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeResponse; +import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleRequest; +import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleResponse; +import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import 
org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; +import org.apache.hadoop.hive.metastore.api.HiveObjectRef; +import org.apache.hadoop.hive.metastore.api.HiveObjectType; +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.api.InvalidInputException; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.MetadataPpdResult; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionEventType; +import org.apache.hadoop.hive.metastore.api.PartitionListComposingSpec; +import org.apache.hadoop.hive.metastore.api.PartitionSpec; +import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD; +import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD; +import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest; +import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult; +import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest; +import org.apache.hadoop.hive.metastore.api.PartitionsStatsResult; +import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest; +import org.apache.hadoop.hive.metastore.api.PrimaryKeysResponse; +import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.hadoop.hive.metastore.api.PrivilegeBag; +import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo; +import org.apache.hadoop.hive.metastore.api.PutFileMetadataRequest; +import org.apache.hadoop.hive.metastore.api.PutFileMetadataResult; +import org.apache.hadoop.hive.metastore.api.RequestPartsSpec; +import org.apache.hadoop.hive.metastore.api.Role; +import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.SkewedInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableMeta; +import org.apache.hadoop.hive.metastore.api.TableStatsRequest; +import org.apache.hadoop.hive.metastore.api.TableStatsResult; +import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import 
org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.hadoop.hive.metastore.api.Type; +import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; +import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.events.AddIndexEvent; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; @@ -92,8 +202,8 @@ import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; -import org.apache.hadoop.hive.thrift.TUGIContainingTransport; import org.apache.hadoop.hive.thrift.HiveDelegationTokenManager; +import org.apache.hadoop.hive.thrift.TUGIContainingTransport; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -118,7 +228,6 @@ import org.slf4j.LoggerFactory; import javax.jdo.JDOException; - import java.io.IOException; import java.nio.ByteBuffer; import java.text.DateFormat; @@ -140,8 +249,8 @@ import java.util.Timer; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -387,6 +496,7 @@ public HiveConf getHiveConf() { private AlterHandler alterHandler; private List preListeners; private List listeners; + private List transactionalListeners; private List endFunctionListeners; private List initListeners; private Pattern partitionValidationPattern; @@ -469,7 +579,8 @@ public Object getValue() { hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS)); listeners.add(new SessionPropertiesListener(hiveConf)); listeners.add(new AcidEventListener(hiveConf)); - + transactionalListeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class,hiveConf, + hiveConf.getVar(ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS)); if (metrics != null) { listeners.add(new HMSMetricsListener(hiveConf, metrics)); } @@ -542,6 +653,14 @@ public void setMetaConf(String key, String value) throws MetaException { for (MetaStoreEventListener listener : listeners) { listener.onConfigChange(new ConfigChangeEvent(this, key, oldValue, value)); } + if (transactionalListeners.size() > 0) { + // All the fields of this event are final, so no reason to create a new one for each + // listener + ConfigChangeEvent cce = new ConfigChangeEvent(this, key, oldValue, value); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onConfigChange(cce); + } + } } @Override @@ -858,9 +977,14 @@ private void create_database_core(RawStore ms, final Database db) } madeDir = true; } - ms.openTransaction(); ms.createDatabase(db); + if (transactionalListeners.size() > 0) { + CreateDatabaseEvent cde = new CreateDatabaseEvent(db, true, this); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onCreateDatabase(cde); + } + } success = ms.commitTransaction(); } finally { if (!success) { @@ -898,7 +1022,6 @@ 
public void create_database(final Database db) } Deadline.checkTimeout(); } - create_database_core(getMS(), db); success = true; } catch (Exception e) { @@ -1066,8 +1189,13 @@ private void drop_database_core(RawStore ms, startIndex = endIndex; } } - if (ms.dropDatabase(name)) { + if (transactionalListeners.size() > 0) { + DropDatabaseEvent dde = new DropDatabaseEvent(db, true, this); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onDropDatabase(dde); + } + } success = ms.commitTransaction(); } } finally { @@ -1407,8 +1535,14 @@ private void create_table_core(final RawStore ms, final Table tbl, } else { ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys); } + if (transactionalListeners.size() > 0) { + CreateTableEvent createTableEvent = new CreateTableEvent(tbl, true, this); + createTableEvent.setEnvironmentContext(envContext); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onCreateTable(createTableEvent); + } + } success = ms.commitTransaction(); - } finally { if (!success) { ms.rollbackTransaction(); @@ -1639,6 +1773,13 @@ private boolean drop_table_core(final RawStore ms, final String dbname, final St throw new MetaException(indexName == null ? "Unable to drop table " + tableName: "Unable to drop index table " + tableName + " for index " + indexName); } + if (transactionalListeners.size() > 0) { + DropTableEvent dropTableEvent = new DropTableEvent(tbl, true, deleteData, this); + dropTableEvent.setEnvironmentContext(envContext); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onDropTable(dropTableEvent); + } + } success = ms.commitTransaction(); } finally { if (!success) { @@ -2122,6 +2263,13 @@ private Partition append_partition_common(RawStore ms, String dbName, String tab success = ms.addPartition(part); if (success) { + if (transactionalListeners.size() > 0) { + AddPartitionEvent addPartitionEvent = new AddPartitionEvent(tbl, part, true, this); + addPartitionEvent.setEnvironmentContext(envContext); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAddPartition(addPartitionEvent); + } + } success = ms.commitTransaction(); } } finally { @@ -2346,7 +2494,13 @@ public Partition call() throws Exception { } else { success = true; } - success = success && ms.commitTransaction(); + //setting success to false to make sure that if the listener fails, rollback happens. + success = false; + fireMetaStoreAddPartitionEventTransactional(tbl, result, null, true); + if (existingParts != null) { + fireMetaStoreAddPartitionEventTransactional(tbl, existingParts, null, false); + } + success = ms.commitTransaction(); } finally { if (!success) { ms.rollbackTransaction(); @@ -2357,6 +2511,7 @@ public Partition call() throws Exception { } } fireMetaStoreAddPartitionEvent(tbl, parts, null, false); + //ToVerify : Should transactional listeners be fired here ? } else { fireMetaStoreAddPartitionEvent(tbl, result, null, true); if (existingParts != null) { @@ -2502,9 +2657,11 @@ private int add_partitions_pspec_core( throw new MetaException(e.getMessage()); } - success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists) - && ms.commitTransaction(); - + success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists); + //setting success to false to make sure that if the listener fails, rollback happens. 
+ success = false; + fireMetaStoreAddPartitionEventTransactional(tbl, partitionSpecProxy, null, true); + success = ms.commitTransaction(); return addedPartitions.size(); } finally { if (!success) { @@ -2642,9 +2799,12 @@ private Partition add_partition_core(final RawStore ms, wh.deleteDir(new Path(part.getSd().getLocation()), true); } } + //setting success to false to make sure that if the listener fails, rollback happens. + success = false; + fireMetaStoreAddPartitionEventTransactional(tbl, Arrays.asList(part), envContext, true); // we proceed only if we'd actually succeeded anyway, otherwise, // we'd have thrown an exception - success = success && ms.commitTransaction(); + success = ms.commitTransaction(); } finally { if (!success) { ms.rollbackTransaction(); @@ -2682,6 +2842,35 @@ private void fireMetaStoreAddPartitionEvent(final Table tbl, } } + private void fireMetaStoreAddPartitionEventTransactional(final Table tbl, + final List parts, final EnvironmentContext envContext, boolean success) + throws MetaException { + if (tbl != null && parts != null && !parts.isEmpty()) { + AddPartitionEvent addPartitionEvent = + new AddPartitionEvent(tbl, parts, success, this); + addPartitionEvent.setEnvironmentContext(envContext); + + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAddPartition(addPartitionEvent); + } + } + } + + private void fireMetaStoreAddPartitionEventTransactional(final Table tbl, + final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success) + throws MetaException { + if (tbl != null && partitionSpec != null) { + AddPartitionEvent addPartitionEvent = + new AddPartitionEvent(tbl, partitionSpec, success, this); + addPartitionEvent.setEnvironmentContext(envContext); + + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAddPartition(addPartitionEvent); + } + } + } + + @Override public Partition add_partition(final Partition part) throws InvalidObjectException, AlreadyExistsException, MetaException { @@ -2844,6 +3033,14 @@ private boolean drop_partition_common(RawStore ms, String db_name, String tbl_na if (!ms.dropPartition(db_name, tbl_name, part_vals)) { throw new MetaException("Unable to drop partition"); } + if (transactionalListeners.size() > 0) { + DropPartitionEvent dropPartitionEvent = + new DropPartitionEvent(tbl, part, true, deleteData, this); + dropPartitionEvent.setEnvironmentContext(envContext); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onDropPartition(dropPartitionEvent); + } + } success = ms.commitTransaction(); } finally { if (!success) { @@ -3016,8 +3213,17 @@ public DropPartitionsResult drop_partitions_req( dirsToDelete.add(new PathAndPartValSize(partPath, part.getValues().size())); } } - ms.dropPartitions(dbName, tblName, partNames); + if (parts != null && transactionalListeners.size() > 0) { + for (Partition part : parts) { + DropPartitionEvent dropPartitionEvent = + new DropPartitionEvent(tbl, part, true, deleteData, this); + dropPartitionEvent.setEnvironmentContext(envContext); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onDropPartition(dropPartitionEvent); + } + } + } success = ms.commitTransaction(); DropPartitionsResult result = new DropPartitionsResult(); if (needResult) { @@ -3485,7 +3691,8 @@ private void rename_partition(final String db_name, final String tbl_name, partitionValidationPattern); 
} - oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part, envContext); + oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part, + envContext, transactionalListeners, this); // Only fetch the table if we actually have a listener Table table = null; @@ -3549,8 +3756,8 @@ public void alter_partitions_with_environment_context(final String db_name, fina for (Partition tmpPart : new_parts) { firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, null, tmpPart, this)); } - oldParts = alterHandler.alterPartitions(getMS(), wh, db_name, tbl_name, new_parts, environmentContext); - + oldParts = alterHandler.alterPartitions(getMS(), wh, db_name, tbl_name, new_parts, + environmentContext, transactionalListeners, this); Iterator olditr = oldParts.iterator(); // Only fetch the table if we have a listener that needs it. Table table = null; @@ -3606,13 +3813,23 @@ public void alter_index(final String dbname, final String base_table_name, boolean success = false; Exception ex = null; Index oldIndex = null; + //getting an instance of rawstore to create a transaction + RawStore ms = getMS(); try { + ms.openTransaction(); oldIndex = get_index_by_name(dbname, base_table_name, index_name); firePreEvent(new PreAlterIndexEvent(oldIndex, newIndex, this)); - getMS().alterIndex(dbname, base_table_name, index_name, newIndex); - success = true; + ms.alterIndex(dbname, base_table_name, index_name, newIndex); + //DBNotification listener currently does not override onAlterIndex. + if (transactionalListeners.size() > 0) { + AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, true, this); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAlterIndex(alterIndexEvent); + } + } + success = ms.commitTransaction(); } catch (InvalidObjectException e) { ex = e; throw new InvalidOperationException(e.getMessage()); @@ -3695,9 +3912,9 @@ private void alter_table_core(final String dbname, final String name, final Tabl try { Table oldt = get_table_core(dbname, name); firePreEvent(new PreAlterTableEvent(oldt, newTable, this)); - alterHandler.alterTable(getMS(), wh, dbname, name, newTable, envContext); + alterHandler.alterTable(getMS(), wh, dbname, name, newTable, + envContext, transactionalListeners, this); success = true; - for (MetaStoreEventListener listener : listeners) { AlterTableEvent alterTableEvent = new AlterTableEvent(oldt, newTable, success, this); @@ -4311,6 +4528,12 @@ private Index add_index_core(final RawStore ms, final Index index, final Table i index.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time)); ms.addIndex(index); + if (transactionalListeners.size() > 0) { + AddIndexEvent addIndexEvent = new AddIndexEvent(index, true, this); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAddIndex(addIndexEvent); + } + } success = ms.commitTransaction(); return index; } finally { @@ -4400,6 +4623,12 @@ private boolean drop_index_by_name_core(final RawStore ms, + qualified[0] + "." 
+ qualified[1] + " for index " + indexName); } } + if (transactionalListeners.size() > 0) { + DropIndexEvent dropIndexEvent = new DropIndexEvent(index, true, this); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onDropIndex(dropIndexEvent); + } + } success = ms.commitTransaction(); } finally { if (!success) { @@ -5701,17 +5930,26 @@ public void markPartitionForEvent(final String db_name, final String tbl_name, Table tbl = null; Exception ex = null; + RawStore ms = getMS(); try { + ms.openTransaction(); startPartitionFunction("markPartitionForEvent", db_name, tbl_name, partName); firePreEvent(new PreLoadPartitionDoneEvent(db_name, tbl_name, partName, this)); - tbl = getMS().markPartitionForEvent(db_name, tbl_name, partName, evtType); + tbl = ms.markPartitionForEvent(db_name, tbl_name, partName, evtType); if (null == tbl) { throw new UnknownTableException("Table: " + tbl_name + " not found."); - } else { - for (MetaStoreEventListener listener : listeners) { - listener.onLoadPartitionDone(new LoadPartitionDoneEvent(true, tbl, partName, this)); + } + //DBNotification Listener currently does not support LoadPartitionDoneEvent + if (transactionalListeners.size() > 0) { + LoadPartitionDoneEvent lpde = new LoadPartitionDoneEvent(true, tbl, partName, this); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onLoadPartitionDone(lpde); } } + ms.commitTransaction(); + for (MetaStoreEventListener listener : listeners) { + listener.onLoadPartitionDone(new LoadPartitionDoneEvent(true, tbl, partName, this)); + } } catch (Exception original) { ex = original; LOG.error("Exception caught in mark partition event ", original); @@ -6238,6 +6476,9 @@ public FireEventResponse fire_listener_event(FireEventRequest rqst) throws TExce InsertEvent event = new InsertEvent(rqst.getDbName(), rqst.getTableName(), rqst.getPartitionVals(), rqst.getData().getInsertData().getFilesAdded(), rqst.isSuccessful(), this); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onInsert(event); + } for (MetaStoreEventListener listener : listeners) { listener.onInsert(event); } diff --git metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java index a3b16d0..6678796 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java @@ -18,9 +18,6 @@ package org.apache.hadoop.hive.metastore; -import java.util.ArrayList; -import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.events.AddIndexEvent; @@ -38,13 +35,16 @@ import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; +import java.util.ArrayList; +import java.util.List; + /** A dummy implementation for * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} * for testing purposes. */ public class DummyListener extends MetaStoreEventListener{ - public static final List notifyList = new ArrayList(); + public static final List notifyList = new ArrayList<>(); /** * @return The last event received, or null if no event was received. 
@@ -126,7 +126,7 @@ public void onAlterIndex(AlterIndexEvent indexEvent) throws MetaException { addEvent(indexEvent); } - private void addEvent(ListenerEvent event) { + protected void addEvent(ListenerEvent event) { notifyList.add(event); } } diff --git metastore/src/test/org/apache/hadoop/hive/metastore/DummyTransactionalListener.java metastore/src/test/org/apache/hadoop/hive/metastore/DummyTransactionalListener.java new file mode 100644 index 0000000..878d5d5 --- /dev/null +++ metastore/src/test/org/apache/hadoop/hive/metastore/DummyTransactionalListener.java @@ -0,0 +1,47 @@ +package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class DummyTransactionalListener extends DummyListener { + private static final Logger LOG = LoggerFactory.getLogger(DummyTransactionalListener.class); + public static final List transactionalNotifyList = new ArrayList<>(); + + public static ListenerEvent getLastTransactionalEvent() { + if (transactionalNotifyList.isEmpty()) { + return null; + } else { + return transactionalNotifyList.get(transactionalNotifyList.size() - 1); + } + } + + public DummyTransactionalListener(Configuration config) { + super(config); + } + + protected void addEvent(ListenerEvent event) { + transactionalNotifyList.add(event); + } +}
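Illustrative sketch (not part of the patch above): a minimal transactional listener and how it could be wired up through the new hive.metastore.transactional.event.listeners property. The wiring mirrors DummyTransactionalListener and the test's setUp(); the class name MyAuditListener and its logging body are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;

/**
 * Hypothetical listener. When registered via
 * hive.metastore.transactional.event.listeners it is invoked by HMSHandler
 * after the RawStore mutation but before commitTransaction(), so throwing a
 * MetaException here causes the surrounding metastore operation to roll back.
 */
public class MyAuditListener extends MetaStoreEventListener {

  public MyAuditListener(Configuration config) {
    super(config);
  }

  @Override
  public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
    // Runs inside the same transaction as the create_table call.
    System.out.println("created table " + tableEvent.getTable().getTableName());
  }

  @Override
  public void onDropTable(DropTableEvent tableEvent) throws MetaException {
    // Runs inside the same transaction as the drop_table call.
    System.out.println("dropped table " + tableEvent.getTable().getTableName());
  }
}

Registration follows the same pattern the test uses for DummyTransactionalListener; the metastore reads the value through MetaStoreUtils.getMetaStoreListeners() when HMSHandler initializes:

HiveConf conf = new HiveConf();
conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
    MyAuditListener.class.getName());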