diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 351546c3bc9edb69a435f04795b5ea6c3421f5b0..4fb3f0efd8821a58952221500c6a78c50cd6f963 100644 --- hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; @@ -115,15 +116,17 @@ public DbNotificationListener(Configuration config) throws MetaException { */ @Override public void onConfigChange(ConfigChangeEvent tableEvent) throws MetaException { - String key = tableEvent.getKey(); - if (key.equals(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL.toString())) { - // This weirdness of setting it in our hiveConf and then reading back does two things. - // One, it handles the conversion of the TimeUnit. Two, it keeps the value around for - // later in case we need it again. - hiveConf.set(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL.name(), - tableEvent.getNewValue()); - cleaner.setTimeToLive(hiveConf.getTimeVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, - TimeUnit.SECONDS)); + if (tableEvent.getStatus()) { + String key = tableEvent.getKey(); + if (key.equals(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL.toString())) { + // This weirdness of setting it in our hiveConf and then reading back does two things. + // One, it handles the conversion of the TimeUnit. Two, it keeps the value around for + // later in case we need it again. + hiveConf.set(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL.name(), + tableEvent.getNewValue()); + cleaner.setTimeToLive(hiveConf.getTimeVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, + TimeUnit.SECONDS)); + } } } @@ -133,13 +136,17 @@ public void onConfigChange(ConfigChangeEvent tableEvent) throws MetaException { */ @Override public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { - Table t = tableEvent.getTable(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.CREATE_TABLE.toString(), msgFactory - .buildCreateTableMessage(t, new FileIterator(t.getSd().getLocation())).toString()); - event.setDbName(t.getDbName()); - event.setTableName(t.getTableName()); - process(event, tableEvent); + if (tableEvent.getStatus()) { + Table t = tableEvent.getTable(); + final CreateTableMessage tableMessage = + msgFactory.buildCreateTableMessage(t, + new FileIterator(t.getSd().getLocation())); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.CREATE_TABLE.toString(), tableMessage.toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + process(event, tableEvent); + } } /** @@ -148,13 +155,15 @@ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { */ @Override public void onDropTable(DropTableEvent tableEvent) throws MetaException { - Table t = tableEvent.getTable(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.DROP_TABLE.toString(), msgFactory - .buildDropTableMessage(t).toString()); - event.setDbName(t.getDbName()); - event.setTableName(t.getTableName()); - process(event, tableEvent); + if (tableEvent.getStatus()) { + Table t = tableEvent.getTable(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_TABLE.toString(), msgFactory + .buildDropTableMessage(t).toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + process(event, tableEvent); + } } /** @@ -163,14 +172,16 @@ public void onDropTable(DropTableEvent tableEvent) throws MetaException { */ @Override public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { - Table before = tableEvent.getOldTable(); - Table after = tableEvent.getNewTable(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory - .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp()).toString()); - event.setDbName(after.getDbName()); - event.setTableName(after.getTableName()); - process(event, tableEvent); + if (tableEvent.getStatus()) { + Table before = tableEvent.getOldTable(); + Table after = tableEvent.getNewTable(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory + .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp()).toString()); + event.setDbName(after.getDbName()); + event.setTableName(after.getTableName()); + process(event, tableEvent); + } } class FileIterator implements Iterator { @@ -270,15 +281,17 @@ public void remove() { */ @Override public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException { - Table t = partitionEvent.getTable(); - String msg = msgFactory - .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator(), - new PartitionFilesIterator(partitionEvent.getPartitionIterator(), t)).toString(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg); - event.setDbName(t.getDbName()); - event.setTableName(t.getTableName()); - process(event, partitionEvent); + if (partitionEvent.getStatus()) { + Table t = partitionEvent.getTable(); + String msg = msgFactory + .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator(), + new PartitionFilesIterator(partitionEvent.getPartitionIterator(), t)).toString(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + process(event, partitionEvent); + } } /** @@ -287,13 +300,15 @@ public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaExceptio */ @Override public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException { - Table t = partitionEvent.getTable(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.DROP_PARTITION.toString(), msgFactory - .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); - event.setDbName(t.getDbName()); - event.setTableName(t.getTableName()); - process(event, partitionEvent); + if (partitionEvent.getStatus()) { + Table t = partitionEvent.getTable(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_PARTITION.toString(), msgFactory + .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + process(event, partitionEvent); + } } /** @@ -302,14 +317,16 @@ public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaExcept */ @Override public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException { - Partition before = partitionEvent.getOldPartition(); - Partition after = partitionEvent.getNewPartition(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory - .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp()).toString()); - event.setDbName(before.getDbName()); - event.setTableName(before.getTableName()); - process(event, partitionEvent); + if (partitionEvent.getStatus()) { + Partition before = partitionEvent.getOldPartition(); + Partition after = partitionEvent.getNewPartition(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory + .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp()).toString()); + event.setDbName(before.getDbName()); + event.setTableName(before.getTableName()); + process(event, partitionEvent); + } } /** @@ -318,12 +335,14 @@ public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaExce */ @Override public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException { - Database db = dbEvent.getDatabase(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory - .buildCreateDatabaseMessage(db).toString()); - event.setDbName(db.getName()); - process(event, dbEvent); + if (dbEvent.getStatus()) { + Database db = dbEvent.getDatabase(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory + .buildCreateDatabaseMessage(db).toString()); + event.setDbName(db.getName()); + process(event, dbEvent); + } } /** @@ -332,12 +351,14 @@ public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException { */ @Override public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { - Database db = dbEvent.getDatabase(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory - .buildDropDatabaseMessage(db).toString()); - event.setDbName(db.getName()); - process(event, dbEvent); + if (dbEvent.getStatus()) { + Database db = dbEvent.getDatabase(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory + .buildDropDatabaseMessage(db).toString()); + event.setDbName(db.getName()); + process(event, dbEvent); + } } /** @@ -346,12 +367,14 @@ public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { */ @Override public void onCreateFunction(CreateFunctionEvent fnEvent) throws MetaException { - Function fn = fnEvent.getFunction(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory - .buildCreateFunctionMessage(fn).toString()); - event.setDbName(fn.getDbName()); - process(event, fnEvent); + if (fnEvent.getStatus()) { + Function fn = fnEvent.getFunction(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory + .buildCreateFunctionMessage(fn).toString()); + event.setDbName(fn.getDbName()); + process(event, fnEvent); + } } /** @@ -360,12 +383,14 @@ public void onCreateFunction(CreateFunctionEvent fnEvent) throws MetaException { */ @Override public void onDropFunction(DropFunctionEvent fnEvent) throws MetaException { - Function fn = fnEvent.getFunction(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory - .buildDropFunctionMessage(fn).toString()); - event.setDbName(fn.getDbName()); - process(event, fnEvent); + if (fnEvent.getStatus()) { + Function fn = fnEvent.getFunction(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory + .buildDropFunctionMessage(fn).toString()); + event.setDbName(fn.getDbName()); + process(event, fnEvent); + } } /** @@ -374,12 +399,14 @@ public void onDropFunction(DropFunctionEvent fnEvent) throws MetaException { */ @Override public void onAddIndex(AddIndexEvent indexEvent) throws MetaException { - Index index = indexEvent.getIndex(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory - .buildCreateIndexMessage(index).toString()); - event.setDbName(index.getDbName()); - process(event, indexEvent); + if (indexEvent.getStatus()) { + Index index = indexEvent.getIndex(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory + .buildCreateIndexMessage(index).toString()); + event.setDbName(index.getDbName()); + process(event, indexEvent); + } } /** @@ -388,12 +415,14 @@ public void onAddIndex(AddIndexEvent indexEvent) throws MetaException { */ @Override public void onDropIndex(DropIndexEvent indexEvent) throws MetaException { - Index index = indexEvent.getIndex(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory - .buildDropIndexMessage(index).toString()); - event.setDbName(index.getDbName()); - process(event, indexEvent); + if (indexEvent.getStatus()) { + Index index = indexEvent.getIndex(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory + .buildDropIndexMessage(index).toString()); + event.setDbName(index.getDbName()); + process(event, indexEvent); + } } /** @@ -402,13 +431,15 @@ public void onDropIndex(DropIndexEvent indexEvent) throws MetaException { */ @Override public void onAlterIndex(AlterIndexEvent indexEvent) throws MetaException { - Index before = indexEvent.getOldIndex(); - Index after = indexEvent.getNewIndex(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory - .buildAlterIndexMessage(before, after).toString()); - event.setDbName(before.getDbName()); - process(event, indexEvent); + if (indexEvent.getStatus()) { + Index before = indexEvent.getOldIndex(); + Index after = indexEvent.getNewIndex(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory + .buildAlterIndexMessage(before, after).toString()); + event.setDbName(before.getDbName()); + process(event, indexEvent); + } } class FileChksumIterator implements Iterator { @@ -438,15 +469,17 @@ public void remove() { } @Override public void onInsert(InsertEvent insertEvent) throws MetaException { - Table tableObj = insertEvent.getTableObj(); - NotificationEvent event = - new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(tableObj, - insertEvent.getPartitionObj(), insertEvent.isReplace(), - new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums())) - .toString()); - event.setDbName(tableObj.getDbName()); - event.setTableName(tableObj.getTableName()); - process(event, insertEvent); + if (insertEvent.getStatus()) { + Table tableObj = insertEvent.getTableObj(); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(tableObj, + insertEvent.getPartitionObj(), insertEvent.isReplace(), + new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums())) + .toString()); + event.setDbName(tableObj.getDbName()); + event.setTableName(tableObj.getTableName()); + process(event, insertEvent); + } } /** diff --git metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java index 37327f8bd2ebf3f883fb3bd408d63f776b26406e..0f48cae83978f1cb078e1ba0646dae58c0ab9d8b 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java @@ -39,6 +39,8 @@ import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; @@ -51,6 +53,8 @@ */ @Private public class MetaStoreListenerNotifier { + + private static final Logger LOG = LoggerFactory.getLogger(MetaStoreListenerNotifier.class); private interface EventNotifier { void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException; } @@ -164,7 +168,11 @@ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws Preconditions.checkNotNull(event, "The event must not be null."); for (MetaStoreEventListener listener : listeners) { - notificationEvents.get(eventType).notify(listener, event); + try { + notificationEvents.get(eventType).notify(listener, event); + } catch (Exception e) { + LOG.warn("Exception caught while calling metastore listener ", e); + } } // Each listener called above might set a different parameter on the event.