diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 351546c3bc9edb69a435f04795b5ea6c3421f5b0..1ff3b63c50e548ddf5fcf6e2b387d6b6cb37b7dc 100644 --- hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Iterators; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -59,6 +60,7 @@ import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; @@ -115,15 +117,17 @@ public DbNotificationListener(Configuration config) throws MetaException { */ @Override public void onConfigChange(ConfigChangeEvent tableEvent) throws MetaException { - String key = tableEvent.getKey(); - if (key.equals(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL.toString())) { - // This weirdness of setting it in our hiveConf and then reading back does two things. - // One, it handles the conversion of the TimeUnit. Two, it keeps the value around for - // later in case we need it again. - hiveConf.set(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL.name(), - tableEvent.getNewValue()); - cleaner.setTimeToLive(hiveConf.getTimeVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, - TimeUnit.SECONDS)); + if (tableEvent.getStatus()) { + String key = tableEvent.getKey(); + if (key.equals(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL.toString())) { + // This weirdness of setting it in our hiveConf and then reading back does two things. + // One, it handles the conversion of the TimeUnit. Two, it keeps the value around for + // later in case we need it again. + hiveConf.set(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL.name(), + tableEvent.getNewValue()); + cleaner.setTimeToLive(hiveConf.getTimeVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, + TimeUnit.SECONDS)); + } } } @@ -133,13 +137,17 @@ public void onConfigChange(ConfigChangeEvent tableEvent) throws MetaException { */ @Override public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { - Table t = tableEvent.getTable(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.CREATE_TABLE.toString(), msgFactory - .buildCreateTableMessage(t, new FileIterator(t.getSd().getLocation())).toString()); - event.setDbName(t.getDbName()); - event.setTableName(t.getTableName()); - process(event, tableEvent); + if (tableEvent.getStatus()) { + Table t = tableEvent.getTable(); + final CreateTableMessage tableMessage = + msgFactory.buildCreateTableMessage(t, + new FileIterator(t.getSd().getLocation())); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.CREATE_TABLE.toString(), tableMessage.toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + process(event, tableEvent); + } } /** @@ -148,13 +156,15 @@ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { */ @Override public void onDropTable(DropTableEvent tableEvent) throws MetaException { - Table t = tableEvent.getTable(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.DROP_TABLE.toString(), msgFactory - .buildDropTableMessage(t).toString()); - event.setDbName(t.getDbName()); - event.setTableName(t.getTableName()); - process(event, tableEvent); + if (tableEvent.getStatus()) { + Table t = tableEvent.getTable(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_TABLE.toString(), msgFactory + .buildDropTableMessage(t).toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + process(event, tableEvent); + } } /** @@ -163,14 +173,16 @@ public void onDropTable(DropTableEvent tableEvent) throws MetaException { */ @Override public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { - Table before = tableEvent.getOldTable(); - Table after = tableEvent.getNewTable(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory - .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp()).toString()); - event.setDbName(after.getDbName()); - event.setTableName(after.getTableName()); - process(event, tableEvent); + if (tableEvent.getStatus()) { + Table before = tableEvent.getOldTable(); + Table after = tableEvent.getNewTable(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory + .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp()).toString()); + event.setDbName(after.getDbName()); + event.setTableName(after.getTableName()); + process(event, tableEvent); + } } class FileIterator implements Iterator { @@ -270,15 +282,17 @@ public void remove() { */ @Override public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException { - Table t = partitionEvent.getTable(); - String msg = msgFactory - .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator(), - new PartitionFilesIterator(partitionEvent.getPartitionIterator(), t)).toString(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg); - event.setDbName(t.getDbName()); - event.setTableName(t.getTableName()); - process(event, partitionEvent); + if (partitionEvent.getStatus()) { + Table t = partitionEvent.getTable(); + String msg = msgFactory + .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator(), + new PartitionFilesIterator(partitionEvent.getPartitionIterator(), t)).toString(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + process(event, partitionEvent); + } } /** @@ -287,13 +301,15 @@ public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaExceptio */ @Override public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException { - Table t = partitionEvent.getTable(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.DROP_PARTITION.toString(), msgFactory - .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); - event.setDbName(t.getDbName()); - event.setTableName(t.getTableName()); - process(event, partitionEvent); + if (partitionEvent.getStatus()) { + Table t = partitionEvent.getTable(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_PARTITION.toString(), msgFactory + .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + process(event, partitionEvent); + } } /** @@ -302,14 +318,16 @@ public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaExcept */ @Override public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException { - Partition before = partitionEvent.getOldPartition(); - Partition after = partitionEvent.getNewPartition(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory - .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp()).toString()); - event.setDbName(before.getDbName()); - event.setTableName(before.getTableName()); - process(event, partitionEvent); + if (partitionEvent.getStatus()) { + Partition before = partitionEvent.getOldPartition(); + Partition after = partitionEvent.getNewPartition(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory + .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp()).toString()); + event.setDbName(before.getDbName()); + event.setTableName(before.getTableName()); + process(event, partitionEvent); + } } /** @@ -318,12 +336,14 @@ public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaExce */ @Override public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException { - Database db = dbEvent.getDatabase(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory - .buildCreateDatabaseMessage(db).toString()); - event.setDbName(db.getName()); - process(event, dbEvent); + if (dbEvent.getStatus()) { + Database db = dbEvent.getDatabase(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory + .buildCreateDatabaseMessage(db).toString()); + event.setDbName(db.getName()); + process(event, dbEvent); + } } /** @@ -332,12 +352,14 @@ public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException { */ @Override public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { - Database db = dbEvent.getDatabase(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory - .buildDropDatabaseMessage(db).toString()); - event.setDbName(db.getName()); - process(event, dbEvent); + if (dbEvent.getStatus()) { + Database db = dbEvent.getDatabase(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory + .buildDropDatabaseMessage(db).toString()); + event.setDbName(db.getName()); + process(event, dbEvent); + } } /** @@ -346,12 +368,14 @@ public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { */ @Override public void onCreateFunction(CreateFunctionEvent fnEvent) throws MetaException { - Function fn = fnEvent.getFunction(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory - .buildCreateFunctionMessage(fn).toString()); - event.setDbName(fn.getDbName()); - process(event, fnEvent); + if (fnEvent.getStatus()) { + Function fn = fnEvent.getFunction(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory + .buildCreateFunctionMessage(fn).toString()); + event.setDbName(fn.getDbName()); + process(event, fnEvent); + } } /** @@ -360,12 +384,14 @@ public void onCreateFunction(CreateFunctionEvent fnEvent) throws MetaException { */ @Override public void onDropFunction(DropFunctionEvent fnEvent) throws MetaException { - Function fn = fnEvent.getFunction(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory - .buildDropFunctionMessage(fn).toString()); - event.setDbName(fn.getDbName()); - process(event, fnEvent); + if (fnEvent.getStatus()) { + Function fn = fnEvent.getFunction(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory + .buildDropFunctionMessage(fn).toString()); + event.setDbName(fn.getDbName()); + process(event, fnEvent); + } } /** @@ -374,12 +400,14 @@ public void onDropFunction(DropFunctionEvent fnEvent) throws MetaException { */ @Override public void onAddIndex(AddIndexEvent indexEvent) throws MetaException { - Index index = indexEvent.getIndex(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory - .buildCreateIndexMessage(index).toString()); - event.setDbName(index.getDbName()); - process(event, indexEvent); + if (indexEvent.getStatus()) { + Index index = indexEvent.getIndex(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory + .buildCreateIndexMessage(index).toString()); + event.setDbName(index.getDbName()); + process(event, indexEvent); + } } /** @@ -388,12 +416,14 @@ public void onAddIndex(AddIndexEvent indexEvent) throws MetaException { */ @Override public void onDropIndex(DropIndexEvent indexEvent) throws MetaException { - Index index = indexEvent.getIndex(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory - .buildDropIndexMessage(index).toString()); - event.setDbName(index.getDbName()); - process(event, indexEvent); + if (indexEvent.getStatus()) { + Index index = indexEvent.getIndex(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory + .buildDropIndexMessage(index).toString()); + event.setDbName(index.getDbName()); + process(event, indexEvent); + } } /** @@ -402,13 +432,15 @@ public void onDropIndex(DropIndexEvent indexEvent) throws MetaException { */ @Override public void onAlterIndex(AlterIndexEvent indexEvent) throws MetaException { - Index before = indexEvent.getOldIndex(); - Index after = indexEvent.getNewIndex(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory - .buildAlterIndexMessage(before, after).toString()); - event.setDbName(before.getDbName()); - process(event, indexEvent); + if (indexEvent.getStatus()) { + Index before = indexEvent.getOldIndex(); + Index after = indexEvent.getNewIndex(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory + .buildAlterIndexMessage(before, after).toString()); + event.setDbName(before.getDbName()); + process(event, indexEvent); + } } class FileChksumIterator implements Iterator { @@ -438,15 +470,17 @@ public void remove() { } @Override public void onInsert(InsertEvent insertEvent) throws MetaException { - Table tableObj = insertEvent.getTableObj(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(tableObj, - insertEvent.getPartitionObj(), insertEvent.isReplace(), - new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums())) - .toString()); - event.setDbName(tableObj.getDbName()); - event.setTableName(tableObj.getTableName()); - process(event, insertEvent); + if (insertEvent.getStatus()) { + Table tableObj = insertEvent.getTableObj(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(tableObj, + insertEvent.getPartitionObj(), insertEvent.isReplace(), + new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums())) + .toString()); + event.setDbName(tableObj.getDbName()); + event.setTableName(tableObj.getTableName()); + process(event, insertEvent); + } } /** diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 6a6fd439d72fd5e24c881554c86480b0b3e19574..b8fd6c7d3e9ede71e32bbf10cb32ae55d448bf15 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -1553,6 +1553,13 @@ private void create_table_core(final RawStore ms, final Table tbl, } success = ms.commitTransaction(); + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.CREATE_TABLE, + new CreateTableEvent(tbl, success, this), + envContext, + transactionalListenerResponses, ms); + } } finally { if (!success) { ms.rollbackTransaction(); @@ -1560,14 +1567,6 @@ private void create_table_core(final RawStore ms, final Table tbl, wh.deleteDir(tblPath, true); } } - - if (!listeners.isEmpty()) { - MetaStoreListenerNotifier.notifyEvent(listeners, - EventType.CREATE_TABLE, - new CreateTableEvent(tbl, success, this), - envContext, - transactionalListenerResponses, ms); - } } }