commit 0d452900c942bb11339d2fe4c272ff37d5226b74 Author: Rahul Sharma Date: Mon Jul 11 11:56:42 2016 -0700 HIVE-13966: Added Transactional listener support diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 283ef2ef725d9df7a8359145c141b2494e718529..00d35ee02caea80468d41d894206fb0ef9f44a0d 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -241,6 +241,7 @@ private static URL checkConfigFile(File f) { HiveConf.ConfVars.METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS, HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX, HiveConf.ConfVars.METASTORE_EVENT_LISTENERS, + HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, HiveConf.ConfVars.METASTORE_EVENT_CLEAN_FREQ, HiveConf.ConfVars.METASTORE_EVENT_EXPIRY_DURATION, HiveConf.ConfVars.METASTORE_FILTER_HOOK, @@ -749,6 +750,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal METASTORE_PRE_EVENT_LISTENERS("hive.metastore.pre.event.listeners", "", "List of comma separated listeners for metastore events."), METASTORE_EVENT_LISTENERS("hive.metastore.event.listeners", "", ""), + METASTORE_TRANSACTIONAL_EVENT_LISTENERS("hive.metastore.transactional.event.listeners", "", + "The Listerners added to this property will run within the actual metastore ops transaction"), METASTORE_EVENT_DB_LISTENER_TTL("hive.metastore.event.db.listener.timetolive", "86400s", new TimeValidator(TimeUnit.SECONDS), "time after which events will be removed from the database listener queue"), diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 172f58d435ba06b4c3df0344a3f1f6567a5e970c..f0e061b4a8e4c7ed4cba6e065a77c4959e3cbf3d 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; +import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.messaging.MessageFactory; @@ -56,7 +57,7 @@ * the database. Also, occasionally the thread needs to clean the database of old records. We * definitely don't want to do that as part of another metadata operation. */ -public class DbNotificationListener extends MetaStoreEventListener { +public class DbNotificationListener extends MetaStoreEventListener implements TransactionalMetaStoreEventListener{ private static final Logger LOG = LoggerFactory.getLogger(DbNotificationListener.class.getName()); private static CleanerThread cleaner = null; diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/AlterHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/AlterHandler.java index dedd4497adfcc9d57090a943f6bb4f35ea87fa61..ca36be016685a221ea4c8d3a9cde42badf34372d 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/AlterHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/AlterHandler.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; /** * Interface for Alter Table and Alter Partition code @@ -49,14 +50,16 @@ * new table object * @param cascade * if the changes will be cascaded to its partitions if applicable + * @param transactionalListeners + * list of listeners to be run within the transaction * @throws InvalidOperationException * thrown if the newTable object is invalid * @throws MetaException * thrown if there is any other error */ public abstract void alterTable(RawStore msdb, Warehouse wh, String dbname, - String name, Table newTable, EnvironmentContext envContext) throws InvalidOperationException, - MetaException; + String name, Table newTable, EnvironmentContext envContext, List transactionalListeners, + HMSHandler handler) throws InvalidOperationException, MetaException; /** * handles alter partition @@ -72,6 +75,8 @@ public abstract void alterTable(RawStore msdb, Warehouse wh, String dbname, * original values of the partition being altered * @param new_part * new partition object + * @param transactionalListeners + * list of listeners to be run within the transaction * @return the altered partition * @throws InvalidOperationException * @throws InvalidObjectException @@ -79,9 +84,9 @@ public abstract void alterTable(RawStore msdb, Warehouse wh, String dbname, * @throws MetaException */ public abstract Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname, - final String name, final List part_vals, final Partition new_part, EnvironmentContext environmentContext) - throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, - MetaException; + final String name, final List part_vals, final Partition new_part, EnvironmentContext environmentContext, + List transactionalListeners, HMSHandler handler) + throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException; /** * handles alter partitions @@ -95,6 +100,8 @@ public abstract Partition alterPartition(final RawStore msdb, Warehouse wh, fina * table of the partition being altered * @param new_parts * new partition list + * @param transactionalListeners + * list of listeners to be run within the transaction * @return the altered partition list * @throws InvalidOperationException * @throws InvalidObjectException @@ -102,7 +109,7 @@ public abstract Partition alterPartition(final RawStore msdb, Warehouse wh, fina * @throws MetaException */ public abstract List alterPartitions(final RawStore msdb, Warehouse wh, - final String dbname, final String name, final List new_part, EnvironmentContext environmentContext) - throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, - MetaException; + final String dbname, final String name, final List new_part, EnvironmentContext environmentContext, + List transactionalListeners, HMSHandler handler) + throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException; } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 7b8459556f54ad8d6e38526796c2ca0c48525cfb..22ab1687eb9c304d0c13092482d77d149e712ee1 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -20,6 +20,8 @@ import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -46,6 +48,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.ipc.RemoteException; import org.apache.hive.common.util.HiveStringUtils; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import java.io.IOException; import java.net.URI; @@ -75,7 +78,9 @@ public void setConf(Configuration conf) { @Override public void alterTable(RawStore msdb, Warehouse wh, String dbname, - String name, Table newt, EnvironmentContext environmentContext) throws InvalidOperationException, MetaException { + String name, Table newt, EnvironmentContext environmentContext, + List transactionalListeners, HMSHandler handler) + throws InvalidOperationException, MetaException { final boolean cascade = environmentContext != null && environmentContext.isSetProperties() && StatsSetupConst.TRUE.equals(environmentContext.getProperties().get( @@ -240,6 +245,14 @@ public void alterTable(RawStore msdb, Warehouse wh, String dbname, } alterTableUpdateTableColumnStats(msdb, oldt, newt); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + AlterTableEvent alterTableEvent = + new AlterTableEvent(oldt, newt, true, handler); + alterTableEvent.setEnvironmentContext(environmentContext); + transactionalListener.onAlterTable(alterTableEvent); + } + } // commit the changes success = msdb.commitTransaction(); } catch (InvalidObjectException e) { @@ -319,7 +332,8 @@ String getSimpleMessage(IOException ex) { } @Override public Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname, - final String name, final List part_vals, final Partition new_part, EnvironmentContext environmentContext) + final String name, final List part_vals, final Partition new_part, EnvironmentContext environmentContext, + List transactionalListeners, HMSHandler handler) throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException { boolean success = false; @@ -344,18 +358,33 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String //alter partition if (part_vals == null || part_vals.size() == 0) { try { + msdb.openTransaction(); oldPart = msdb.getPartition(dbname, name, new_part.getValues()); if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl, environmentContext)) { MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true, environmentContext); } updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part); msdb.alterPartition(dbname, name, new_part.getValues(), new_part); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + AlterPartitionEvent alterPartitionEvent = + new AlterPartitionEvent(oldPart, new_part, tbl, true, handler); + alterPartitionEvent.setEnvironmentContext(environmentContext); + transactionalListener.onAlterPartition(alterPartitionEvent); + } + } + success = msdb.commitTransaction(); } catch (InvalidObjectException e) { throw new InvalidOperationException("alter is not possible"); } catch (NoSuchObjectException e){ //old partition does not exist throw new InvalidOperationException("alter is not possible"); } + finally { + if(!success) { + msdb.rollbackTransaction(); + } + } return oldPart; } //rename partition @@ -452,7 +481,14 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String msdb.alterPartition(dbname, name, part_vals, new_part); } } - + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + AlterPartitionEvent alterPartitionEvent = + new AlterPartitionEvent(oldPart, new_part, tbl, true, handler); + alterPartitionEvent.setEnvironmentContext(environmentContext); + transactionalListener.onAlterPartition(alterPartitionEvent); + } + } success = msdb.commitTransaction(); } finally { if (!success) { @@ -475,6 +511,7 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String try { msdb.openTransaction(); msdb.alterPartition(dbname, name, new_part.getValues(), oldPart); + //TO DO: Should the transactional listeners run here too?. revertMetaDataTransaction = msdb.commitTransaction(); } catch (Exception e1) { LOG.error("Reverting metadata opeation failed During HDFS operation failed", e1); @@ -493,13 +530,16 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String @Override public List alterPartitions(final RawStore msdb, Warehouse wh, final String dbname, - final String name, final List new_parts, EnvironmentContext environmentContext) + final String name, final List new_parts, EnvironmentContext environmentContext, + List transactionalListeners, HMSHandler handler) throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException { List oldParts = new ArrayList(); List> partValsList = new ArrayList>(); Table tbl = msdb.getTable(dbname, name); + boolean success = false; try { + msdb.openTransaction(); for (Partition tmpPart: new_parts) { // Set DDL time to now if not specified if (tmpPart.getParameters() == null || @@ -519,9 +559,31 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String updatePartColumnStats(msdb, dbname, name, oldTmpPart.getValues(), tmpPart); } msdb.alterPartitions(dbname, name, partValsList, new_parts); + Iterator olditr = oldParts.iterator(); + for (Partition tmpPart : new_parts) { + Partition oldTmpPart = null; + if (olditr.hasNext()) { + oldTmpPart = olditr.next(); + } else { + throw new InvalidOperationException("failed to alterpartitions"); + } + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + AlterPartitionEvent alterPartitionEvent = + new AlterPartitionEvent(oldTmpPart, tmpPart, tbl, true, handler); + transactionalListener.onAlterPartition(alterPartitionEvent); + } + } + } + success = msdb.commitTransaction(); } catch (InvalidObjectException | NoSuchObjectException e) { throw new InvalidOperationException("Alter partition operation fails: " + e); } + finally { + if(!success) { + msdb.rollbackTransaction(); + } + } return oldParts; } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index c945bf73c068630f5e7a6ac28ae386a2e74e1755..63324f9a180cfb54ff22fd6ef222d99e29839894 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -385,6 +385,7 @@ public HiveConf getHiveConf() { private AlterHandler alterHandler; private List preListeners; private List listeners; + private List transactionalListeners; private List endFunctionListeners; private List initListeners; private Pattern partitionValidationPattern; @@ -467,7 +468,8 @@ public Object getValue() { hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS)); listeners.add(new SessionPropertiesListener(hiveConf)); listeners.add(new AcidEventListener(hiveConf)); - + transactionalListeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class,hiveConf, + hiveConf.getVar(ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS)); if (metrics != null) { listeners.add(new HMSMetricsListener(hiveConf, metrics)); } @@ -540,6 +542,11 @@ public void setMetaConf(String key, String value) throws MetaException { for (MetaStoreEventListener listener : listeners) { listener.onConfigChange(new ConfigChangeEvent(this, key, oldValue, value)); } + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + transactionalListener.onConfigChange(new ConfigChangeEvent(this, key, oldValue, value)); + } + } } @Override @@ -875,9 +882,13 @@ private void create_database_core(RawStore ms, final Database db) } madeDir = true; } - ms.openTransaction(); ms.createDatabase(db); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if(transactionalListener instanceof TransactionalMetaStoreEventListener) { + transactionalListener.onCreateDatabase(new CreateDatabaseEvent(db, true, this)); + } + } success = ms.commitTransaction(); } finally { if (!success) { @@ -915,7 +926,6 @@ public void create_database(final Database db) } Deadline.checkTimeout(); } - create_database_core(getMS(), db); success = true; } catch (Exception e) { @@ -1083,8 +1093,12 @@ private void drop_database_core(RawStore ms, startIndex = endIndex; } } - if (ms.dropDatabase(name)) { + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + transactionalListener.onDropDatabase(new DropDatabaseEvent(db, true, this)); + } + } success = ms.commitTransaction(); } } finally { @@ -1424,8 +1438,15 @@ private void create_table_core(final RawStore ms, final Table tbl, } else { ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys); } + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + CreateTableEvent createTableEvent = + new CreateTableEvent(tbl, true, this); + createTableEvent.setEnvironmentContext(envContext); + transactionalListener.onCreateTable(createTableEvent); + } + } success = ms.commitTransaction(); - } finally { if (!success) { ms.rollbackTransaction(); @@ -1656,6 +1677,13 @@ private boolean drop_table_core(final RawStore ms, final String dbname, final St throw new MetaException(indexName == null ? "Unable to drop table " + tableName: "Unable to drop index table " + tableName + " for index " + indexName); } + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + DropTableEvent dropTableEvent = new DropTableEvent(tbl, true, deleteData, this); + dropTableEvent.setEnvironmentContext(envContext); + transactionalListener.onDropTable(dropTableEvent); + } + } success = ms.commitTransaction(); } finally { if (!success) { @@ -2139,6 +2167,14 @@ private Partition append_partition_common(RawStore ms, String dbName, String tab success = ms.addPartition(part); if (success) { + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + AddPartitionEvent addPartitionEvent = + new AddPartitionEvent(tbl, part, true, this); + addPartitionEvent.setEnvironmentContext(envContext); + transactionalListener.onAddPartition(addPartitionEvent); + } + } success = ms.commitTransaction(); } } finally { @@ -2363,7 +2399,13 @@ public Partition call() throws Exception { } else { success = true; } - success = success && ms.commitTransaction(); + if(success) { + fireMetaStoreAddPartitionEventTransactional(tbl, result, null, true); + if (existingParts != null) { + fireMetaStoreAddPartitionEventTransactional(tbl, existingParts, null, false); + } + success = ms.commitTransaction(); + } } finally { if (!success) { ms.rollbackTransaction(); @@ -2374,6 +2416,7 @@ public Partition call() throws Exception { } } fireMetaStoreAddPartitionEvent(tbl, parts, null, false); + //ToVerify : Should transactional listeners be fired here ? } else { fireMetaStoreAddPartitionEvent(tbl, result, null, true); if (existingParts != null) { @@ -2519,9 +2562,11 @@ private int add_partitions_pspec_core( throw new MetaException(e.getMessage()); } - success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists) - && ms.commitTransaction(); - + success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists); + if(success) { + fireMetaStoreAddPartitionEventTransactional(tbl, partitionSpecProxy, null, true); + success = ms.commitTransaction(); + } return addedPartitions.size(); } finally { if (!success) { @@ -2659,9 +2704,12 @@ private Partition add_partition_core(final RawStore ms, wh.deleteDir(new Path(part.getSd().getLocation()), true); } } - // we proceed only if we'd actually succeeded anyway, otherwise, - // we'd have thrown an exception - success = success && ms.commitTransaction(); + if(success) { + fireMetaStoreAddPartitionEventTransactional(tbl, Arrays.asList(part), envContext, success); + // we proceed only if we'd actually succeeded anyway, otherwise, + // we'd have thrown an exception + success = ms.commitTransaction(); + } } finally { if (!success) { ms.rollbackTransaction(); @@ -2699,6 +2747,39 @@ private void fireMetaStoreAddPartitionEvent(final Table tbl, } } + private void fireMetaStoreAddPartitionEventTransactional(final Table tbl, + final List parts, final EnvironmentContext envContext, boolean success) + throws MetaException { + if (tbl != null && parts != null && !parts.isEmpty()) { + AddPartitionEvent addPartitionEvent = + new AddPartitionEvent(tbl, parts, success, this); + addPartitionEvent.setEnvironmentContext(envContext); + + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + transactionalListener.onAddPartition(addPartitionEvent); + } + } + } + } + + private void fireMetaStoreAddPartitionEventTransactional(final Table tbl, + final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success) + throws MetaException { + if (tbl != null && partitionSpec != null) { + AddPartitionEvent addPartitionEvent = + new AddPartitionEvent(tbl, partitionSpec, success, this); + addPartitionEvent.setEnvironmentContext(envContext); + + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + transactionalListener.onAddPartition(addPartitionEvent); + } + } + } + } + + @Override public Partition add_partition(final Partition part) throws InvalidObjectException, AlreadyExistsException, MetaException { @@ -2853,6 +2934,14 @@ private boolean drop_partition_common(RawStore ms, String db_name, String tbl_na if (!ms.dropPartition(db_name, tbl_name, part_vals)) { throw new MetaException("Unable to drop partition"); } + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + DropPartitionEvent dropPartitionEvent = + new DropPartitionEvent(tbl, part, true, deleteData, this); + dropPartitionEvent.setEnvironmentContext(envContext); + transactionalListener.onDropPartition(dropPartitionEvent); + } + } success = ms.commitTransaction(); if ((part.getSd() != null) && (part.getSd().getLocation() != null)) { partPath = new Path(part.getSd().getLocation()); @@ -3031,8 +3120,19 @@ public DropPartitionsResult drop_partitions_req( dirsToDelete.add(new PathAndPartValSize(partPath, part.getValues().size())); } } - ms.dropPartitions(dbName, tblName, partNames); + if (parts != null) { + for (Partition part : parts) { + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + DropPartitionEvent dropPartitionEvent = + new DropPartitionEvent(tbl, part, true, deleteData, this); + dropPartitionEvent.setEnvironmentContext(envContext); + transactionalListener.onDropPartition(dropPartitionEvent); + } + } + } + } success = ms.commitTransaction(); DropPartitionsResult result = new DropPartitionsResult(); if (needResult) { @@ -3500,7 +3600,8 @@ private void rename_partition(final String db_name, final String tbl_name, partitionValidationPattern); } - oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part, envContext); + oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part, + envContext, transactionalListeners, this); // Only fetch the table if we actually have a listener Table table = null; @@ -3564,8 +3665,8 @@ public void alter_partitions_with_environment_context(final String db_name, fina for (Partition tmpPart : new_parts) { firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, null, tmpPart, this)); } - oldParts = alterHandler.alterPartitions(getMS(), wh, db_name, tbl_name, new_parts, environmentContext); - + oldParts = alterHandler.alterPartitions(getMS(), wh, db_name, tbl_name, new_parts, + environmentContext, transactionalListeners, this); Iterator olditr = oldParts.iterator(); // Only fetch the table if we have a listener that needs it. Table table = null; @@ -3621,13 +3722,23 @@ public void alter_index(final String dbname, final String base_table_name, boolean success = false; Exception ex = null; Index oldIndex = null; + //getting an instance of rawstore to create a transaction + RawStore ms = getMS(); try { + ms.openTransaction(); oldIndex = get_index_by_name(dbname, base_table_name, index_name); firePreEvent(new PreAlterIndexEvent(oldIndex, newIndex, this)); - getMS().alterIndex(dbname, base_table_name, index_name, newIndex); - success = true; + ms.alterIndex(dbname, base_table_name, index_name, newIndex); + //DBNotification listener currently does not override onAlterIndex. + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, true, this); + transactionalListener.onAlterIndex(alterIndexEvent); + } + } + success = ms.commitTransaction(); } catch (InvalidObjectException e) { ex = e; throw new InvalidOperationException(e.getMessage()); @@ -3710,9 +3821,9 @@ private void alter_table_core(final String dbname, final String name, final Tabl try { Table oldt = get_table_core(dbname, name); firePreEvent(new PreAlterTableEvent(oldt, newTable, this)); - alterHandler.alterTable(getMS(), wh, dbname, name, newTable, envContext); + alterHandler.alterTable(getMS(), wh, dbname, name, newTable, + envContext, transactionalListeners, this); success = true; - for (MetaStoreEventListener listener : listeners) { AlterTableEvent alterTableEvent = new AlterTableEvent(oldt, newTable, success, this); @@ -4304,6 +4415,12 @@ private Index add_index_core(final RawStore ms, final Index index, final Table i index.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time)); ms.addIndex(index); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + AddIndexEvent addIndexEvent = new AddIndexEvent(index, true, this); + transactionalListener.onAddIndex(addIndexEvent); + } + } success = ms.commitTransaction(); return index; } finally { @@ -4393,6 +4510,12 @@ private boolean drop_index_by_name_core(final RawStore ms, + qualified[0] + "." + qualified[1] + " for index " + indexName); } } + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + DropIndexEvent dropIndexEvent = new DropIndexEvent(index, true, this); + transactionalListener.onDropIndex(dropIndexEvent); + } + } success = ms.commitTransaction(); } finally { if (!success) { @@ -5691,17 +5814,25 @@ public void markPartitionForEvent(final String db_name, final String tbl_name, Table tbl = null; Exception ex = null; + RawStore ms = getMS(); try { + ms.openTransaction(); startPartitionFunction("markPartitionForEvent", db_name, tbl_name, partName); firePreEvent(new PreLoadPartitionDoneEvent(db_name, tbl_name, partName, this)); - tbl = getMS().markPartitionForEvent(db_name, tbl_name, partName, evtType); + tbl = ms.markPartitionForEvent(db_name, tbl_name, partName, evtType); if (null == tbl) { throw new UnknownTableException("Table: " + tbl_name + " not found."); - } else { - for (MetaStoreEventListener listener : listeners) { - listener.onLoadPartitionDone(new LoadPartitionDoneEvent(true, tbl, partName, this)); + } + //DBNotification Listener currently does not support LoadPartitionDoneEvent + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + transactionalListener.onLoadPartitionDone(new LoadPartitionDoneEvent(true, tbl, partName, this)); } } + ms.commitTransaction(); + for (MetaStoreEventListener listener : listeners) { + listener.onLoadPartitionDone(new LoadPartitionDoneEvent(true, tbl, partName, this)); + } } catch (Exception original) { ex = original; LOG.error("Exception caught in mark partition event ", original); @@ -6224,6 +6355,11 @@ public FireEventResponse fire_listener_event(FireEventRequest rqst) throws TExce InsertEvent event = new InsertEvent(rqst.getDbName(), rqst.getTableName(), rqst.getPartitionVals(), rqst.getData().getInsertData().getFilesAdded(), rqst.isSuccessful(), this); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + if (transactionalListener instanceof TransactionalMetaStoreEventListener) { + transactionalListener.onInsert(event); + } + } for (MetaStoreEventListener listener : listeners) { listener.onInsert(event); } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalMetaStoreEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalMetaStoreEventListener.java new file mode 100644 index 0000000000000000000000000000000000000000..0103aca1632c43fd71482eff81656c8abc8f22a2 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalMetaStoreEventListener.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +/** + * This is a tag interface, to be implemented by listeners which need to be run in transaction + * of the metastore operation. + */ + +public interface TransactionalMetaStoreEventListener { +} + +