diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 4df2758..f7e3e3a 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -474,6 +474,7 @@ private int now() { // Process this notification by adding it to metastore DB private void process(NotificationEvent event) { + event.setMessageFormat(msgFactory.getMessageFormat()); if (rs != null) { synchronized (NOTIFICATION_TBL_LOCK) { LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index 7836c47..952c6b0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -22,10 +22,14 @@ import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.EventUtils; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -45,10 +49,12 @@ import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.assertFalse; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -958,6 +964,120 @@ public void testStatus() throws IOException { } + @Test + public void testEventFilters(){ + // Test testing that the filters introduced by EventUtils are working correctly. + + // The current filters we use in ReplicationSemanticAnalyzer is as follows: + // IMetaStoreClient.NotificationFilter evFilter = EventUtils.andFilter( + // EventUtils.getDbTblNotificationFilter(dbNameOrPattern, tblNameOrPattern), + // EventUtils.getEventBoundaryFilter(eventFrom, eventTo), + // EventUtils.restrictByMessageFormat(MessageFactory.getInstance().getMessageFormat())); + // So, we test each of those three filters, and then test andFilter itself. + + + String dbname = "testfilter_db"; + String tblname = "testfilter_tbl"; + + // Test EventUtils.getDbTblNotificationFilter - this is supposed to restrict + // events to those that match the dbname and tblname provided to the filter. + // If the tblname passed in to the filter is null, then it restricts itself + // to dbname-matching alone. + IMetaStoreClient.NotificationFilter dbTblFilter = EventUtils.getDbTblNotificationFilter(dbname,tblname); + IMetaStoreClient.NotificationFilter dbFilter = EventUtils.getDbTblNotificationFilter(dbname,null); + + assertFalse(dbTblFilter.accept(null)); + assertTrue(dbTblFilter.accept(createDummyEvent(dbname, tblname, 0))); + assertFalse(dbTblFilter.accept(createDummyEvent(dbname, tblname + "extra",0))); + assertFalse(dbTblFilter.accept(createDummyEvent(dbname + "extra", tblname,0))); + + assertFalse(dbFilter.accept(null)); + assertTrue(dbFilter.accept(createDummyEvent(dbname, tblname,0))); + assertTrue(dbFilter.accept(createDummyEvent(dbname, tblname + "extra", 0))); + assertFalse(dbFilter.accept(createDummyEvent(dbname + "extra", tblname,0))); + + + // Test EventUtils.getEventBoundaryFilter - this is supposed to only allow events + // within a range specified. + long evBegin = 50; + long evEnd = 75; + IMetaStoreClient.NotificationFilter evRangeFilter = EventUtils.getEventBoundaryFilter(evBegin,evEnd); + + assertTrue(evBegin < evEnd); + assertFalse(evRangeFilter.accept(null)); + assertFalse(evRangeFilter.accept(createDummyEvent(dbname, tblname, evBegin - 1))); + assertTrue(evRangeFilter.accept(createDummyEvent(dbname, tblname, evBegin))); + assertTrue(evRangeFilter.accept(createDummyEvent(dbname, tblname, evBegin + 1))); + assertTrue(evRangeFilter.accept(createDummyEvent(dbname, tblname, evEnd - 1))); + assertTrue(evRangeFilter.accept(createDummyEvent(dbname, tblname, evEnd))); + assertFalse(evRangeFilter.accept(createDummyEvent(dbname, tblname, evEnd + 1))); + + + // Test EventUtils.restrictByMessageFormat - this restricts events generated to those + // that match a provided message format + + IMetaStoreClient.NotificationFilter restrictByDefaultMessageFormat = + EventUtils.restrictByMessageFormat(MessageFactory.getInstance().getMessageFormat()); + IMetaStoreClient.NotificationFilter restrictByArbitraryMessageFormat = + EventUtils.restrictByMessageFormat(MessageFactory.getInstance().getMessageFormat() + "_bogus"); + NotificationEvent dummyEvent = createDummyEvent(dbname,tblname,0); + + assertEquals(MessageFactory.getInstance().getMessageFormat(),dummyEvent.getMessageFormat()); + + assertFalse(restrictByDefaultMessageFormat.accept(null)); + assertTrue(restrictByDefaultMessageFormat.accept(dummyEvent)); + assertFalse(restrictByArbitraryMessageFormat.accept(dummyEvent)); + + // Test andFilter operation. + + IMetaStoreClient.NotificationFilter yes = new IMetaStoreClient.NotificationFilter() { + @Override + public boolean accept(NotificationEvent notificationEvent) { + return true; + } + }; + + IMetaStoreClient.NotificationFilter no = new IMetaStoreClient.NotificationFilter() { + @Override + public boolean accept(NotificationEvent notificationEvent) { + return false; + } + }; + + assertTrue(EventUtils.andFilter(yes, yes).accept(dummyEvent)); + assertFalse(EventUtils.andFilter(yes, no).accept(dummyEvent)); + assertFalse(EventUtils.andFilter(no, yes).accept(dummyEvent)); + assertFalse(EventUtils.andFilter(no, no).accept(dummyEvent)); + + assertTrue(EventUtils.andFilter(yes, yes, yes).accept(dummyEvent)); + assertFalse(EventUtils.andFilter(yes, yes, no).accept(dummyEvent)); + assertFalse(EventUtils.andFilter(yes, no, yes).accept(dummyEvent)); + assertFalse(EventUtils.andFilter(yes, no, no).accept(dummyEvent)); + assertFalse(EventUtils.andFilter(no, yes, yes).accept(dummyEvent)); + assertFalse(EventUtils.andFilter(no, yes, no).accept(dummyEvent)); + assertFalse(EventUtils.andFilter(no, no, yes).accept(dummyEvent)); + assertFalse(EventUtils.andFilter(no, no, no).accept(dummyEvent)); + + + } + + private NotificationEvent createDummyEvent(String dbname, String tblname, long evid) { + MessageFactory msgFactory = MessageFactory.getInstance(); + Table t = new Table(); + t.setDbName(dbname); + t.setTableName(tblname); + NotificationEvent event = new NotificationEvent( + evid, + (int)System.currentTimeMillis(), + MessageFactory.CREATE_TABLE_EVENT, + msgFactory.buildCreateTableMessage(t, Arrays.asList("/tmp/").iterator()).toString() + ); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + event.setMessageFormat(msgFactory.getMessageFormat()); + return event; + } + private String verifyAndReturnDbReplStatus(String dbName, String tblName, String prevReplDumpId, String cmd) throws IOException { run(cmd); advanceDumpDir(); diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift index bf80455..d056498 100755 --- a/metastore/if/hive_metastore.thrift +++ b/metastore/if/hive_metastore.thrift @@ -800,6 +800,7 @@ struct NotificationEvent { 4: optional string dbName, 5: optional string tableName, 6: required string message, + 7: optional string messageFormat, } struct NotificationEventResponse { diff --git a/metastore/scripts/upgrade/derby/038-HIVE-10562.derby.sql b/metastore/scripts/upgrade/derby/038-HIVE-10562.derby.sql new file mode 100644 index 0000000..ab33d3a --- /dev/null +++ b/metastore/scripts/upgrade/derby/038-HIVE-10562.derby.sql @@ -0,0 +1,11 @@ +-- Step 1: Add the column for format +ALTER TABLE "APP"."NOTIFICATION_LOG" ADD "MESSAGE_FORMAT" varchar(16); + +-- Step 2 : Change the type of the MESSAGE field from long varchar to clob +ALTER TABLE "APP"."NOTIFICATION_LOG" ADD COLUMN "MESSAGE_CLOB" CLOB; +UPDATE "APP"."NOTIFICATION_LOG" SET MESSAGE_CLOB=CAST(MESSAGE AS CLOB); +ALTER TABLE "APP"."NOTIFICATION_LOG" DROP COLUMN MESSAGE; +RENAME COLUMN "APP"."NOTIFICATION_LOG"."MESSAGE_CLOB" TO "MESSAGE"; + +-- ALTER TABLE "APP"."NOTIFICATION_LOG" ALTER COLUMN "MESSAGE" SET DATA TYPE CLOB; + diff --git a/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql b/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql index fe18089..8a3ae78 100644 --- a/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql +++ b/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql @@ -102,7 +102,7 @@ CREATE TABLE "APP"."FUNCS" ("FUNC_ID" BIGINT NOT NULL, "CLASS_NAME" VARCHAR(4000 CREATE TABLE "APP"."FUNC_RU" ("FUNC_ID" BIGINT NOT NULL, "RESOURCE_TYPE" INTEGER NOT NULL, "RESOURCE_URI" VARCHAR(4000), "INTEGER_IDX" INTEGER NOT NULL); -CREATE TABLE "APP"."NOTIFICATION_LOG" ("NL_ID" BIGINT NOT NULL, "DB_NAME" VARCHAR(128), "EVENT_ID" BIGINT NOT NULL, "EVENT_TIME" INTEGER NOT NULL, "EVENT_TYPE" VARCHAR(32) NOT NULL, "MESSAGE" LONG VARCHAR, "TBL_NAME" VARCHAR(128)); +CREATE TABLE "APP"."NOTIFICATION_LOG" ("NL_ID" BIGINT NOT NULL, "DB_NAME" VARCHAR(128), "EVENT_ID" BIGINT NOT NULL, "EVENT_TIME" INTEGER NOT NULL, "EVENT_TYPE" VARCHAR(32) NOT NULL, "MESSAGE" CLOB, "TBL_NAME" VARCHAR(128), "MESSAGE_FORMAT" VARCHAR(16)); CREATE TABLE "APP"."NOTIFICATION_SEQUENCE" ("NNI_ID" BIGINT NOT NULL, "NEXT_EVENT_ID" BIGINT NOT NULL); diff --git a/metastore/scripts/upgrade/derby/upgrade-2.1.0-to-2.2.0.derby.sql b/metastore/scripts/upgrade/derby/upgrade-2.1.0-to-2.2.0.derby.sql index 699a619..e5a144c 100644 --- a/metastore/scripts/upgrade/derby/upgrade-2.1.0-to-2.2.0.derby.sql +++ b/metastore/scripts/upgrade/derby/upgrade-2.1.0-to-2.2.0.derby.sql @@ -1,4 +1,5 @@ -- Upgrade MetaStore schema from 2.1.0 to 2.2.0 RUN '037-HIVE-14496.derby.sql'; +RUN '038-HIVE-10562.derby.sql'; UPDATE "APP".VERSION SET SCHEMA_VERSION='2.2.0', VERSION_COMMENT='Hive release version 2.2.0' where VER_ID=1; diff --git a/metastore/scripts/upgrade/mssql/023-HIVE-10562.mssql.sql b/metastore/scripts/upgrade/mssql/023-HIVE-10562.mssql.sql new file mode 100644 index 0000000..fe52a15 --- /dev/null +++ b/metastore/scripts/upgrade/mssql/023-HIVE-10562.mssql.sql @@ -0,0 +1 @@ +ALTER TABLE NOTIFICATION_LOG ADD MESSAGE_FORMAT nvarchar(16); diff --git a/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql b/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql index 7ff881c..f83911a 100644 --- a/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql +++ b/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql @@ -579,6 +579,7 @@ CREATE TABLE NOTIFICATION_LOG EVENT_TYPE nvarchar(32) NOT NULL, DB_NAME nvarchar(128) NULL, TBL_NAME nvarchar(128) NULL, + MESSAGE_FORMAT nvarchar(16) MESSAGE text NULL ); diff --git a/metastore/scripts/upgrade/mssql/upgrade-2.1.0-to-2.2.0.mssql.sql b/metastore/scripts/upgrade/mssql/upgrade-2.1.0-to-2.2.0.mssql.sql index 55d8e9b..a4b8fda 100644 --- a/metastore/scripts/upgrade/mssql/upgrade-2.1.0-to-2.2.0.mssql.sql +++ b/metastore/scripts/upgrade/mssql/upgrade-2.1.0-to-2.2.0.mssql.sql @@ -1,6 +1,7 @@ SELECT 'Upgrading MetaStore schema from 2.1.0 to 2.2.0' AS MESSAGE; :r 022-HIVE-14496.mssql.sql +:r 023-HIVE-10562.mssql.sql UPDATE VERSION SET SCHEMA_VERSION='2.2.0', VERSION_COMMENT='Hive release version 2.2.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 2.1.0 to 2.2.0' AS MESSAGE; diff --git a/metastore/scripts/upgrade/mysql/038-HIVE-10562.mysql.sql b/metastore/scripts/upgrade/mysql/038-HIVE-10562.mysql.sql new file mode 100644 index 0000000..51e73ba --- /dev/null +++ b/metastore/scripts/upgrade/mysql/038-HIVE-10562.mysql.sql @@ -0,0 +1,6 @@ +-- Step 1: Add the column for format +ALTER TABLE `NOTIFICATION_LOG` ADD `MESSAGE_FORMAT` varchar(16); +-- if MESSAGE_FORMAT is null, then it is the legacy hcat JSONMessageFactory that created this message + +-- Step 2 : Change the type of the MESSAGE field from mediumtext to longtext +ALTER TABLE `NOTIFICATION_LOG` MODIFY `MESSAGE` longtext; diff --git a/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql b/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql index 2009f1f..f0b9f10 100644 --- a/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql +++ b/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql @@ -798,7 +798,8 @@ CREATE TABLE IF NOT EXISTS `NOTIFICATION_LOG` `EVENT_TYPE` varchar(32) NOT NULL, `DB_NAME` varchar(128), `TBL_NAME` varchar(128), - `MESSAGE` mediumtext, + `MESSAGE` longtext, + `MESSAGE_FORMAT` varchar(16) PRIMARY KEY (`NL_ID`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; diff --git a/metastore/scripts/upgrade/mysql/upgrade-2.1.0-to-2.2.0.mysql.sql b/metastore/scripts/upgrade/mysql/upgrade-2.1.0-to-2.2.0.mysql.sql index 07a002f..509c532 100644 --- a/metastore/scripts/upgrade/mysql/upgrade-2.1.0-to-2.2.0.mysql.sql +++ b/metastore/scripts/upgrade/mysql/upgrade-2.1.0-to-2.2.0.mysql.sql @@ -1,6 +1,7 @@ SELECT 'Upgrading MetaStore schema from 2.1.0 to 2.2.0' AS ' '; SOURCE 037-HIVE-14496.mysql.sql; +SOURCE 038-HIVE-10562.mysql.sql; UPDATE VERSION SET SCHEMA_VERSION='2.2.0', VERSION_COMMENT='Hive release version 2.2.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 2.1.0 to 2.2.0' AS ' '; diff --git a/metastore/scripts/upgrade/oracle/038-HIVE-10562.oracle.sql b/metastore/scripts/upgrade/oracle/038-HIVE-10562.oracle.sql new file mode 100644 index 0000000..bdf7eb8 --- /dev/null +++ b/metastore/scripts/upgrade/oracle/038-HIVE-10562.oracle.sql @@ -0,0 +1,2 @@ +ALTER TABLE NOTIFICATION_LOG ADD MESSAGE_FORMAT VARCHAR(16) NULL; + diff --git a/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql b/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql index bb5a934..1d652a8 100644 --- a/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql +++ b/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql @@ -560,7 +560,8 @@ CREATE TABLE NOTIFICATION_LOG EVENT_TYPE VARCHAR2(32) NOT NULL, DB_NAME VARCHAR2(128), TBL_NAME VARCHAR2(128), - MESSAGE CLOB NULL + MESSAGE CLOB NULL, + MESSAGE_FORMAT VARCHAR(16) NULL ); ALTER TABLE NOTIFICATION_LOG ADD CONSTRAINT NOTIFICATION_LOG_PK PRIMARY KEY (NL_ID); diff --git a/metastore/scripts/upgrade/oracle/upgrade-2.1.0-to-2.2.0.oracle.sql b/metastore/scripts/upgrade/oracle/upgrade-2.1.0-to-2.2.0.oracle.sql index b5e65b9..f31fda9 100644 --- a/metastore/scripts/upgrade/oracle/upgrade-2.1.0-to-2.2.0.oracle.sql +++ b/metastore/scripts/upgrade/oracle/upgrade-2.1.0-to-2.2.0.oracle.sql @@ -1,6 +1,7 @@ SELECT 'Upgrading MetaStore schema from 2.1.0 to 2.2.0' AS Status from dual; @037-HIVE-14496.oracle.sql; +@038-HIVE-10562.oracle.sql; UPDATE VERSION SET SCHEMA_VERSION='2.2.0', VERSION_COMMENT='Hive release version 2.2.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 2.1.0 to 2.2.0' AS Status from dual; diff --git a/metastore/scripts/upgrade/postgres/037-HIVE-10562.postgres.sql b/metastore/scripts/upgrade/postgres/037-HIVE-10562.postgres.sql new file mode 100644 index 0000000..7189f21 --- /dev/null +++ b/metastore/scripts/upgrade/postgres/037-HIVE-10562.postgres.sql @@ -0,0 +1 @@ +ALTER TABLE "NOTIFICATION_LOG" ADD COLUMN "MESSAGE_FORMAT" VARCHAR(16) NULL; diff --git a/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql b/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql index 0021df0..81deaa2 100644 --- a/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql +++ b/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql @@ -576,6 +576,7 @@ CREATE TABLE "NOTIFICATION_LOG" "DB_NAME" VARCHAR(128), "TBL_NAME" VARCHAR(128), "MESSAGE" text, + "MESSAGE_FORMAT" VARCHAR(16), PRIMARY KEY ("NL_ID") ); diff --git a/metastore/scripts/upgrade/postgres/upgrade-2.1.0-to-2.2.0.postgres.sql b/metastore/scripts/upgrade/postgres/upgrade-2.1.0-to-2.2.0.postgres.sql index 0f7139a..0f64a90 100644 --- a/metastore/scripts/upgrade/postgres/upgrade-2.1.0-to-2.2.0.postgres.sql +++ b/metastore/scripts/upgrade/postgres/upgrade-2.1.0-to-2.2.0.postgres.sql @@ -1,6 +1,7 @@ SELECT 'Upgrading MetaStore schema from 2.1.0 to 2.2.0'; \i 036-HIVE-14496.postgres.sql; +\i 037-HIVE-10562.postgres.sql; UPDATE "VERSION" SET "SCHEMA_VERSION"='2.2.0', "VERSION_COMMENT"='Hive release version 2.2.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 2.1.0 to 2.2.0'; diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index d605049..be8429e 100644 --- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -15736,6 +15736,11 @@ void NotificationEvent::__set_message(const std::string& val) { this->message = val; } +void NotificationEvent::__set_messageFormat(const std::string& val) { + this->messageFormat = val; +__isset.messageFormat = true; +} + uint32_t NotificationEvent::read(::apache::thrift::protocol::TProtocol* iprot) { apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); @@ -15809,6 +15814,14 @@ uint32_t NotificationEvent::read(::apache::thrift::protocol::TProtocol* iprot) { xfer += iprot->skip(ftype); } break; + case 7: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->messageFormat); + this->__isset.messageFormat = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -15860,6 +15873,11 @@ uint32_t NotificationEvent::write(::apache::thrift::protocol::TProtocol* oprot) xfer += oprot->writeString(this->message); xfer += oprot->writeFieldEnd(); + if (this->__isset.messageFormat) { + xfer += oprot->writeFieldBegin("messageFormat", ::apache::thrift::protocol::T_STRING, 7); + xfer += oprot->writeString(this->messageFormat); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -15873,6 +15891,7 @@ void swap(NotificationEvent &a, NotificationEvent &b) { swap(a.dbName, b.dbName); swap(a.tableName, b.tableName); swap(a.message, b.message); + swap(a.messageFormat, b.messageFormat); swap(a.__isset, b.__isset); } @@ -15883,6 +15902,7 @@ NotificationEvent::NotificationEvent(const NotificationEvent& other639) { dbName = other639.dbName; tableName = other639.tableName; message = other639.message; + messageFormat = other639.messageFormat; __isset = other639.__isset; } NotificationEvent& NotificationEvent::operator=(const NotificationEvent& other640) { @@ -15892,6 +15912,7 @@ NotificationEvent& NotificationEvent::operator=(const NotificationEvent& other64 dbName = other640.dbName; tableName = other640.tableName; message = other640.message; + messageFormat = other640.messageFormat; __isset = other640.__isset; return *this; } @@ -15904,6 +15925,7 @@ void NotificationEvent::printTo(std::ostream& out) const { out << ", " << "dbName="; (__isset.dbName ? (out << to_string(dbName)) : (out << "")); out << ", " << "tableName="; (__isset.tableName ? (out << to_string(tableName)) : (out << "")); out << ", " << "message=" << to_string(message); + out << ", " << "messageFormat="; (__isset.messageFormat ? (out << to_string(messageFormat)) : (out << "")); out << ")"; } diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h index 4d5da71..e73333a 100644 --- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -6391,9 +6391,10 @@ inline std::ostream& operator<<(std::ostream& out, const NotificationEventReques } typedef struct _NotificationEvent__isset { - _NotificationEvent__isset() : dbName(false), tableName(false) {} + _NotificationEvent__isset() : dbName(false), tableName(false), messageFormat(false) {} bool dbName :1; bool tableName :1; + bool messageFormat :1; } _NotificationEvent__isset; class NotificationEvent { @@ -6401,7 +6402,7 @@ class NotificationEvent { NotificationEvent(const NotificationEvent&); NotificationEvent& operator=(const NotificationEvent&); - NotificationEvent() : eventId(0), eventTime(0), eventType(), dbName(), tableName(), message() { + NotificationEvent() : eventId(0), eventTime(0), eventType(), dbName(), tableName(), message(), messageFormat() { } virtual ~NotificationEvent() throw(); @@ -6411,6 +6412,7 @@ class NotificationEvent { std::string dbName; std::string tableName; std::string message; + std::string messageFormat; _NotificationEvent__isset __isset; @@ -6426,6 +6428,8 @@ class NotificationEvent { void __set_message(const std::string& val); + void __set_messageFormat(const std::string& val); + bool operator == (const NotificationEvent & rhs) const { if (!(eventId == rhs.eventId)) @@ -6444,6 +6448,10 @@ class NotificationEvent { return false; if (!(message == rhs.message)) return false; + if (__isset.messageFormat != rhs.__isset.messageFormat) + return false; + else if (__isset.messageFormat && !(messageFormat == rhs.messageFormat)) + return false; return true; } bool operator != (const NotificationEvent &rhs) const { diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEvent.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEvent.java index c40bb4b..8e0fb40 100644 --- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEvent.java +++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEvent.java @@ -44,6 +44,7 @@ private static final org.apache.thrift.protocol.TField DB_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("dbName", org.apache.thrift.protocol.TType.STRING, (short)4); private static final org.apache.thrift.protocol.TField TABLE_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("tableName", org.apache.thrift.protocol.TType.STRING, (short)5); private static final org.apache.thrift.protocol.TField MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("message", org.apache.thrift.protocol.TType.STRING, (short)6); + private static final org.apache.thrift.protocol.TField MESSAGE_FORMAT_FIELD_DESC = new org.apache.thrift.protocol.TField("messageFormat", org.apache.thrift.protocol.TType.STRING, (short)7); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -57,6 +58,7 @@ private String dbName; // optional private String tableName; // optional private String message; // required + private String messageFormat; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -65,7 +67,8 @@ EVENT_TYPE((short)3, "eventType"), DB_NAME((short)4, "dbName"), TABLE_NAME((short)5, "tableName"), - MESSAGE((short)6, "message"); + MESSAGE((short)6, "message"), + MESSAGE_FORMAT((short)7, "messageFormat"); private static final Map byName = new HashMap(); @@ -92,6 +95,8 @@ public static _Fields findByThriftId(int fieldId) { return TABLE_NAME; case 6: // MESSAGE return MESSAGE; + case 7: // MESSAGE_FORMAT + return MESSAGE_FORMAT; default: return null; } @@ -135,7 +140,7 @@ public String getFieldName() { private static final int __EVENTID_ISSET_ID = 0; private static final int __EVENTTIME_ISSET_ID = 1; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.DB_NAME,_Fields.TABLE_NAME}; + private static final _Fields optionals[] = {_Fields.DB_NAME,_Fields.TABLE_NAME,_Fields.MESSAGE_FORMAT}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -151,6 +156,8 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.MESSAGE, new org.apache.thrift.meta_data.FieldMetaData("message", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.MESSAGE_FORMAT, new org.apache.thrift.meta_data.FieldMetaData("messageFormat", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(NotificationEvent.class, metaDataMap); } @@ -192,6 +199,9 @@ public NotificationEvent(NotificationEvent other) { if (other.isSetMessage()) { this.message = other.message; } + if (other.isSetMessageFormat()) { + this.messageFormat = other.messageFormat; + } } public NotificationEvent deepCopy() { @@ -208,6 +218,7 @@ public void clear() { this.dbName = null; this.tableName = null; this.message = null; + this.messageFormat = null; } public long getEventId() { @@ -346,6 +357,29 @@ public void setMessageIsSet(boolean value) { } } + public String getMessageFormat() { + return this.messageFormat; + } + + public void setMessageFormat(String messageFormat) { + this.messageFormat = messageFormat; + } + + public void unsetMessageFormat() { + this.messageFormat = null; + } + + /** Returns true if field messageFormat is set (has been assigned a value) and false otherwise */ + public boolean isSetMessageFormat() { + return this.messageFormat != null; + } + + public void setMessageFormatIsSet(boolean value) { + if (!value) { + this.messageFormat = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case EVENT_ID: @@ -396,6 +430,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case MESSAGE_FORMAT: + if (value == null) { + unsetMessageFormat(); + } else { + setMessageFormat((String)value); + } + break; + } } @@ -419,6 +461,9 @@ public Object getFieldValue(_Fields field) { case MESSAGE: return getMessage(); + case MESSAGE_FORMAT: + return getMessageFormat(); + } throw new IllegalStateException(); } @@ -442,6 +487,8 @@ public boolean isSet(_Fields field) { return isSetTableName(); case MESSAGE: return isSetMessage(); + case MESSAGE_FORMAT: + return isSetMessageFormat(); } throw new IllegalStateException(); } @@ -513,6 +560,15 @@ public boolean equals(NotificationEvent that) { return false; } + boolean this_present_messageFormat = true && this.isSetMessageFormat(); + boolean that_present_messageFormat = true && that.isSetMessageFormat(); + if (this_present_messageFormat || that_present_messageFormat) { + if (!(this_present_messageFormat && that_present_messageFormat)) + return false; + if (!this.messageFormat.equals(that.messageFormat)) + return false; + } + return true; } @@ -550,6 +606,11 @@ public int hashCode() { if (present_message) list.add(message); + boolean present_messageFormat = true && (isSetMessageFormat()); + list.add(present_messageFormat); + if (present_messageFormat) + list.add(messageFormat); + return list.hashCode(); } @@ -621,6 +682,16 @@ public int compareTo(NotificationEvent other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetMessageFormat()).compareTo(other.isSetMessageFormat()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetMessageFormat()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.messageFormat, other.messageFormat); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -684,6 +755,16 @@ public String toString() { sb.append(this.message); } first = false; + if (isSetMessageFormat()) { + if (!first) sb.append(", "); + sb.append("messageFormat:"); + if (this.messageFormat == null) { + sb.append("null"); + } else { + sb.append(this.messageFormat); + } + first = false; + } sb.append(")"); return sb.toString(); } @@ -793,6 +874,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, NotificationEvent s org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 7: // MESSAGE_FORMAT + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.messageFormat = iprot.readString(); + struct.setMessageFormatIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -836,6 +925,13 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, NotificationEvent oprot.writeString(struct.message); oprot.writeFieldEnd(); } + if (struct.messageFormat != null) { + if (struct.isSetMessageFormat()) { + oprot.writeFieldBegin(MESSAGE_FORMAT_FIELD_DESC); + oprot.writeString(struct.messageFormat); + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -864,13 +960,19 @@ public void write(org.apache.thrift.protocol.TProtocol prot, NotificationEvent s if (struct.isSetTableName()) { optionals.set(1); } - oprot.writeBitSet(optionals, 2); + if (struct.isSetMessageFormat()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); if (struct.isSetDbName()) { oprot.writeString(struct.dbName); } if (struct.isSetTableName()) { oprot.writeString(struct.tableName); } + if (struct.isSetMessageFormat()) { + oprot.writeString(struct.messageFormat); + } } @Override @@ -884,7 +986,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, NotificationEvent st struct.setEventTypeIsSet(true); struct.message = iprot.readString(); struct.setMessageIsSet(true); - BitSet incoming = iprot.readBitSet(2); + BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { struct.dbName = iprot.readString(); struct.setDbNameIsSet(true); @@ -893,6 +995,10 @@ public void read(org.apache.thrift.protocol.TProtocol prot, NotificationEvent st struct.tableName = iprot.readString(); struct.setTableNameIsSet(true); } + if (incoming.get(2)) { + struct.messageFormat = iprot.readString(); + struct.setMessageFormatIsSet(true); + } } } diff --git a/metastore/src/gen/thrift/gen-php/metastore/Types.php b/metastore/src/gen/thrift/gen-php/metastore/Types.php index 103cd86..2dfa1a9 100644 --- a/metastore/src/gen/thrift/gen-php/metastore/Types.php +++ b/metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -15639,6 +15639,10 @@ class NotificationEvent { * @var string */ public $message = null; + /** + * @var string + */ + public $messageFormat = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -15667,6 +15671,10 @@ class NotificationEvent { 'var' => 'message', 'type' => TType::STRING, ), + 7 => array( + 'var' => 'messageFormat', + 'type' => TType::STRING, + ), ); } if (is_array($vals)) { @@ -15688,6 +15696,9 @@ class NotificationEvent { if (isset($vals['message'])) { $this->message = $vals['message']; } + if (isset($vals['messageFormat'])) { + $this->messageFormat = $vals['messageFormat']; + } } } @@ -15752,6 +15763,13 @@ class NotificationEvent { $xfer += $input->skip($ftype); } break; + case 7: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->messageFormat); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -15795,6 +15813,11 @@ class NotificationEvent { $xfer += $output->writeString($this->message); $xfer += $output->writeFieldEnd(); } + if ($this->messageFormat !== null) { + $xfer += $output->writeFieldBegin('messageFormat', TType::STRING, 7); + $xfer += $output->writeString($this->messageFormat); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 2f1c3cf..3faf1bb 100644 --- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -10868,6 +10868,7 @@ class NotificationEvent: - dbName - tableName - message + - messageFormat """ thrift_spec = ( @@ -10878,15 +10879,17 @@ class NotificationEvent: (4, TType.STRING, 'dbName', None, None, ), # 4 (5, TType.STRING, 'tableName', None, None, ), # 5 (6, TType.STRING, 'message', None, None, ), # 6 + (7, TType.STRING, 'messageFormat', None, None, ), # 7 ) - def __init__(self, eventId=None, eventTime=None, eventType=None, dbName=None, tableName=None, message=None,): + def __init__(self, eventId=None, eventTime=None, eventType=None, dbName=None, tableName=None, message=None, messageFormat=None,): self.eventId = eventId self.eventTime = eventTime self.eventType = eventType self.dbName = dbName self.tableName = tableName self.message = message + self.messageFormat = messageFormat def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -10927,6 +10930,11 @@ def read(self, iprot): self.message = iprot.readString() else: iprot.skip(ftype) + elif fid == 7: + if ftype == TType.STRING: + self.messageFormat = iprot.readString() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -10961,6 +10969,10 @@ def write(self, oprot): oprot.writeFieldBegin('message', TType.STRING, 6) oprot.writeString(self.message) oprot.writeFieldEnd() + if self.messageFormat is not None: + oprot.writeFieldBegin('messageFormat', TType.STRING, 7) + oprot.writeString(self.messageFormat) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -10984,6 +10996,7 @@ def __hash__(self): value = (value * 31) ^ hash(self.dbName) value = (value * 31) ^ hash(self.tableName) value = (value * 31) ^ hash(self.message) + value = (value * 31) ^ hash(self.messageFormat) return value def __repr__(self): diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index b6050c6..5342451 100644 --- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -2443,6 +2443,7 @@ class NotificationEvent DBNAME = 4 TABLENAME = 5 MESSAGE = 6 + MESSAGEFORMAT = 7 FIELDS = { EVENTID => {:type => ::Thrift::Types::I64, :name => 'eventId'}, @@ -2450,7 +2451,8 @@ class NotificationEvent EVENTTYPE => {:type => ::Thrift::Types::STRING, :name => 'eventType'}, DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName', :optional => true}, TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName', :optional => true}, - MESSAGE => {:type => ::Thrift::Types::STRING, :name => 'message'} + MESSAGE => {:type => ::Thrift::Types::STRING, :name => 'message'}, + MESSAGEFORMAT => {:type => ::Thrift::Types::STRING, :name => 'messageFormat', :optional => true} } def struct_fields; FIELDS; end diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 778615d..c3f2e99 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -8387,6 +8387,7 @@ private MNotificationLog translateThriftToDb(NotificationEvent entry) { dbEntry.setDbName(entry.getDbName()); dbEntry.setTableName(entry.getTableName()); dbEntry.setMessage(entry.getMessage()); + dbEntry.setMessageFormat(entry.getMessageFormat()); return dbEntry; } @@ -8398,6 +8399,7 @@ private NotificationEvent translateDbToThrift(MNotificationLog dbEvent) { event.setDbName(dbEvent.getDbName()); event.setTableName(dbEvent.getTableName()); event.setMessage((dbEvent.getMessage())); + event.setMessageFormat(dbEvent.getMessageFormat()); return event; } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java index 927bf15..a5414d1 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java @@ -63,6 +63,24 @@ public boolean accept(NotificationEvent event) { }; } + public static IMetaStoreClient.NotificationFilter restrictByMessageFormat(final String messageFormat){ + return new IMetaStoreClient.NotificationFilter() { + @Override + public boolean accept(NotificationEvent event) { + if (event == null){ + return false; // get rid of trivial case first, so that we can safely assume non-null + } + if (messageFormat == null){ + return true; // let's say that passing null in will not do any filtering. + } + if (messageFormat.equalsIgnoreCase(event.getMessageFormat())){ + return true; + } + return false; + } + }; + } + public static IMetaStoreClient.NotificationFilter getEventBoundaryFilter(final Long eventFrom, final Long eventTo){ return new IMetaStoreClient.NotificationFilter() { @Override @@ -76,12 +94,16 @@ public boolean accept(NotificationEvent event) { } public static IMetaStoreClient.NotificationFilter andFilter( - final IMetaStoreClient.NotificationFilter filter1, - final IMetaStoreClient.NotificationFilter filter2) { + final IMetaStoreClient.NotificationFilter... filters ) { return new IMetaStoreClient.NotificationFilter() { @Override public boolean accept(NotificationEvent event) { - return filter1.accept(event) && filter2.accept(event); + for (IMetaStoreClient.NotificationFilter filter : filters){ + if (!filter.accept(event)){ + return false; + } + } + return true; } }; } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java index c632ca4..aa770f2 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java @@ -116,11 +116,6 @@ public static MessageDeserializer getDeserializer(String format, public abstract MessageDeserializer getDeserializer(); /** - * Getter for version-string, corresponding to all constructed messages. - */ - public abstract String getVersion(); - - /** * Getter for message-format. */ public abstract String getMessageFormat(); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java index a6ae8de..3406afb 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -84,13 +84,8 @@ public MessageDeserializer getDeserializer() { } @Override - public String getVersion() { - return "0.1"; - } - - @Override public String getMessageFormat() { - return "json"; + return "json-0.2"; } @Override diff --git a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MNotificationLog.java b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MNotificationLog.java index aedac35..d3a166f 100644 --- a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MNotificationLog.java +++ b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MNotificationLog.java @@ -25,6 +25,7 @@ private String dbName; private String tableName; private String message; + private String messageFormat; public MNotificationLog() { } @@ -86,4 +87,12 @@ public String getMessage() { public void setMessage(String message) { this.message = message; } + + public String getMessageFormat() { + return messageFormat; + } + + public void setMessageFormat(String messageFormat) { + this.messageFormat = messageFormat; + } } diff --git a/metastore/src/model/package.jdo b/metastore/src/model/package.jdo index daee72c..e7d26fb 100644 --- a/metastore/src/model/package.jdo +++ b/metastore/src/model/package.jdo @@ -1049,6 +1049,9 @@ + + + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 2b327db..37baca1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -377,9 +377,16 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { } } + // TODO : instead of simply restricting by message format, we should eventually + // move to a jdbc-driver-stype registering of message format, and picking message + // factory per event to decode. For now, however, since all messages have the + // same factory, restricting by message format is effectively a guard against + // older leftover data that would cause us problems. + IMetaStoreClient.NotificationFilter evFilter = EventUtils.andFilter( EventUtils.getDbTblNotificationFilter(dbNameOrPattern, tblNameOrPattern), - EventUtils.getEventBoundaryFilter(eventFrom, eventTo)); + EventUtils.getEventBoundaryFilter(eventFrom, eventTo), + EventUtils.restrictByMessageFormat(MessageFactory.getInstance().getMessageFormat())); EventUtils.MSClientNotificationFetcher evFetcher = new EventUtils.MSClientNotificationFetcher(db.getMSC());