diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 68d3cc10fd..f16f443388 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -92,8 +92,6 @@ private static final Logger LOG = LoggerFactory.getLogger(DbNotificationListener.class.getName()); private static CleanerThread cleaner = null; - private static final Object NOTIFICATION_TBL_LOCK = new Object(); - // This is the same object as super.conf, but it's convenient to keep a copy of it as a // HiveConf rather than a Configuration. private HiveConf hiveConf; @@ -573,11 +571,9 @@ private int now() { */ private void process(NotificationEvent event, ListenerEvent listenerEvent) throws MetaException { event.setMessageFormat(msgFactory.getMessageFormat()); - synchronized (NOTIFICATION_TBL_LOCK) { LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), event.getMessage()); HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event); - } // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners. if (event.isSetEventId()) { @@ -603,9 +599,7 @@ private void process(NotificationEvent event, ListenerEvent listenerEvent) throw @Override public void run() { while (true) { - synchronized(NOTIFICATION_TBL_LOCK) { - rs.cleanNotificationEvents(ttl); - } + rs.cleanNotificationEvents(ttl); LOG.debug("Cleaner thread done"); try { Thread.sleep(sleepTime); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 345870ef45..7ff0198dd7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -281,8 +281,8 @@ public void testBasic() throws IOException { verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data, driverMirror); verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror); verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=2", ptn_data_2, driverMirror); - verifyRun("SELECT a from " + dbName + ".ptned_empty", empty, driverMirror); - verifyRun("SELECT * from " + dbName + ".unptned_empty", empty, driverMirror); + verifyRun("SELECT a from " + replicatedDbName + ".ptned_empty", empty, driverMirror); + verifyRun("SELECT * from " + replicatedDbName + ".unptned_empty", empty, driverMirror); } @Test @@ -952,7 +952,7 @@ public void testDropsWithCM() throws IOException { @Test public void testAlters() throws IOException { - String testName = "alters"; + String testName = this.testName.getMethodName(); String dbName = createDB(testName, driver); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".unptned2(a string) STORED AS TEXTFILE", driver); @@ -3021,7 +3021,7 @@ private void verifyRun(String cmd, String data, Driver myDriver) throws IOExcept } private void verifyRun(String cmd, String[] data, Driver myDriver) throws IOException { - run(cmd, myDriver); + run(cmd, true, myDriver); verifyResults(data, myDriver); } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index 2dd7b68e22..912d01c9d7 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.model.MConstraint; import org.apache.hadoop.hive.metastore.model.MDatabase; +import org.apache.hadoop.hive.metastore.model.MNotificationLog; import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics; import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics; import org.apache.hadoop.hive.metastore.parser.ExpressionTree; @@ -210,7 +211,7 @@ private boolean ensureDbInit() { tx.begin(); doCommit = true; } - Query dbQuery = null, tblColumnQuery = null, partColumnQuery = null; + Query dbQuery = null, tblColumnQuery = null, partColumnQuery = null, notificationLogQuery; try { // Force the underlying db to initialize. @@ -223,6 +224,9 @@ private boolean ensureDbInit() { partColumnQuery = pm.newQuery(MPartitionColumnStatistics.class, "dbName == ''"); partColumnQuery.execute(); + notificationLogQuery = pm.newQuery(MNotificationLog.class, "eventType == ''"); + notificationLogQuery.execute(); + return true; } catch (Exception ex) { doCommit = false; diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index b87811502b..6be6d274c9 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -122,6 +122,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.model.MBackwardNotificationLog; import org.apache.hadoop.hive.metastore.model.MColumnDescriptor; import org.apache.hadoop.hive.metastore.model.MConstraint; import org.apache.hadoop.hive.metastore.model.MDBPrivilege; @@ -133,7 +134,6 @@ import org.apache.hadoop.hive.metastore.model.MIndex; import org.apache.hadoop.hive.metastore.model.MMasterKey; import org.apache.hadoop.hive.metastore.model.MNotificationLog; -import org.apache.hadoop.hive.metastore.model.MNotificationNextId; import org.apache.hadoop.hive.metastore.model.MOrder; import org.apache.hadoop.hive.metastore.model.MPartition; import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege; @@ -8212,65 +8212,106 @@ public Function getFunction(String dbName, String funcName) throws MetaException return funcs; } - @Override - public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) { + private NotificationEventResponse backwardCompatibleNextNotification(NotificationEventRequest rqst) { boolean commited = false; Query query = null; - NotificationEventResponse result = new NotificationEventResponse(); - result.setEvents(new ArrayList()); + NotificationEventResponse response = new NotificationEventResponse(); + response.setEvents(new ArrayList<>()); try { openTransaction(); long lastEvent = rqst.getLastEvent(); query = pm.newQuery(MNotificationLog.class, "eventId > lastEvent"); query.declareParameters("java.lang.Long lastEvent"); query.setOrdering("eventId ascending"); - Collection events = (Collection) query.execute(lastEvent); + Collection events = (Collection) query.execute(lastEvent); commited = commitTransaction(); if (events == null) { - return result; + return response; } - Iterator i = events.iterator(); + Iterator i = events.iterator(); int maxEvents = rqst.getMaxEvents() > 0 ? rqst.getMaxEvents() : Integer.MAX_VALUE; int numEvents = 0; while (i.hasNext() && numEvents++ < maxEvents) { - result.addToEvents(translateDbToThrift(i.next())); + response.addToEvents(translateDbToThrift(i.next())); } - return result; + return response; } finally { if (!commited) { - rollbackAndCleanup(commited, query); + rollbackAndCleanup(false, query); return null; } } } @Override - public void addNotificationEvent(NotificationEvent entry) { - boolean commited = false; - Query query = null; - try { - openTransaction(); - query = pm.newQuery(MNotificationNextId.class); - Collection ids = (Collection) query.execute(); - MNotificationNextId id = null; - boolean needToPersistId; - if (ids == null || ids.size() == 0) { - id = new MNotificationNextId(1L); - needToPersistId = true; - } else { - id = ids.iterator().next(); - needToPersistId = false; - } - entry.setEventId(id.getNextEventId()); - id.incrementEventId(); - if (needToPersistId) - pm.makePersistent(id); - pm.makePersistent(translateThriftToDb(entry)); - commited = commitTransaction(); - } finally { - rollbackAndCleanup(commited, query); + public NotificationEventResponse getNextNotification(NotificationEventRequest request) { + NotificationEventResponse response = backwardCompatibleNextNotification(request); + if(response != null && response.getEvents().size() > 0 ){ + return response; } + return nextNotification(request); + } + + private NotificationEventResponse nextNotification(NotificationEventRequest request) { + NotificationEventResponse response = new NotificationEventResponse(); + response.setEvents(new ArrayList<>()); + String selectQuery = + "select NL_ID, EVENT_TIME, EVENT_TYPE, DB_NAME, TBL_NAME, MESSAGE, MESSAGE_FORMAT" + + " from NOTIFICATION_LOG where NL_ID > " + + request.getLastEvent() + + " order by NL_ID asc"; + Query query = pm.newQuery("javax.jdo.query.SQL", selectQuery); + List results = (List) query.execute(); + + if (results == null || results.isEmpty()) { + return response; + } + + Iterator iterator = results.iterator(); + int maxEvents = request.getMaxEvents() > 0 ? request.getMaxEvents() : Integer.MAX_VALUE; + for (int count = 0; count < maxEvents && iterator.hasNext(); count++) { + Object[] row = iterator.next(); + MNotificationLog notificationLog = new MNotificationLog( + Integer.parseInt(row[0].toString()), + Integer.parseInt(row[1].toString()), + row[2].toString(), + forValue(row[3]), + forValue(row[4]), + row[5].toString(), + forValue(row[6]) + ); + response.addToEvents(translateDbToThrift(notificationLog)); + } + return response; + } + + private String forValue(Object rowValue) { + return rowValue == null ? null : rowValue.toString(); + } + + @Override + public CurrentNotificationEventId getCurrentNotificationEventId() { + String selectQuery = "select max(NL_ID) from NOTIFICATION_LOG"; + Query query = pm.newQuery("javax.jdo.query.SQL", selectQuery); + query.setUnique(true); + Object eventId = query.execute(); + if (eventId == null) { + return new CurrentNotificationEventId(0L); + } + return new CurrentNotificationEventId(Long.parseLong(eventId.toString())); + } + + /** + * the eventId provided in the NotificationEvent is not honored by the metastore as the primary key + * is auto generated on the underying rdbms itself. + * @param entry + */ + @Override + public void addNotificationEvent(NotificationEvent entry) { + openTransaction(); + pm.makePersistent(translateThriftToDb(entry)); + commitTransaction(); } @Override @@ -8293,28 +8334,8 @@ public void cleanNotificationEvents(int olderThan) { } } - @Override - public CurrentNotificationEventId getCurrentNotificationEventId() { - boolean commited = false; - Query query = null; - try { - openTransaction(); - query = pm.newQuery(MNotificationNextId.class); - Collection ids = (Collection) query.execute(); - long id = 0; - if (ids != null && ids.size() > 0) { - id = ids.iterator().next().getNextEventId() - 1; - } - commited = commitTransaction(); - return new CurrentNotificationEventId(id); - } finally { - rollbackAndCleanup(commited, query); - } - } - private MNotificationLog translateThriftToDb(NotificationEvent entry) { MNotificationLog dbEntry = new MNotificationLog(); - dbEntry.setEventId(entry.getEventId()); dbEntry.setEventTime(entry.getEventTime()); dbEntry.setEventType(entry.getEventType()); dbEntry.setDbName(entry.getDbName()); @@ -8324,7 +8345,7 @@ private MNotificationLog translateThriftToDb(NotificationEvent entry) { return dbEntry; } - private NotificationEvent translateDbToThrift(MNotificationLog dbEvent) { + private NotificationEvent translateDbToThrift(MBackwardNotificationLog dbEvent) { NotificationEvent event = new NotificationEvent(); event.setEventId(dbEvent.getEventId()); event.setEventTime(dbEvent.getEventTime()); @@ -8336,6 +8357,18 @@ private NotificationEvent translateDbToThrift(MNotificationLog dbEvent) { return event; } + private NotificationEvent translateDbToThrift(MNotificationLog dbEvent) { + NotificationEvent event = new NotificationEvent(); + event.setEventId(dbEvent.getNotificationLogId()); + event.setEventTime(dbEvent.getEventTime()); + event.setEventType(dbEvent.getEventType()); + event.setDbName(dbEvent.getDbName()); + event.setTableName(dbEvent.getTableName()); + event.setMessage((dbEvent.getMessage())); + event.setMessageFormat(dbEvent.getMessageFormat()); + return event; + } + @Override public boolean isFileMetadataSupported() { return false; diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java index b28ea73593..a994687680 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java @@ -1,33 +1,28 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ package org.apache.hadoop.hive.metastore; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.hive.common.metrics.MetricsTestUtils; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics; import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting; -import org.apache.hadoop.hive.common.metrics.MetricsTestUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; @@ -38,10 +33,10 @@ import org.apache.hadoop.hive.metastore.api.InvalidInputException; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.Role; @@ -55,17 +50,28 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; - import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableList; - import javax.jdo.Query; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class TestObjectStore { private ObjectStore objectStore = null; @@ -141,38 +147,34 @@ public void testNotificationOps() throws InterruptedException { // Verify that there is no notifications available yet eventId = objectStore.getCurrentNotificationEventId(); - Assert.assertEquals(NO_EVENT_ID, eventId.getEventId()); + assertEquals(NO_EVENT_ID, eventId.getEventId()); // Verify that addNotificationEvent() updates the NotificationEvent with the new event ID objectStore.addNotificationEvent(event); - Assert.assertEquals(FIRST_EVENT_ID, event.getEventId()); + assertEquals(FIRST_EVENT_ID, objectStore.getCurrentNotificationEventId().getEventId()); objectStore.addNotificationEvent(event); - Assert.assertEquals(SECOND_EVENT_ID, event.getEventId()); - - // Verify that objectStore fetches the latest notification event ID - eventId = objectStore.getCurrentNotificationEventId(); - Assert.assertEquals(SECOND_EVENT_ID, eventId.getEventId()); + assertEquals(SECOND_EVENT_ID,objectStore.getCurrentNotificationEventId().getEventId()); // Verify that getNextNotification() returns all events eventResponse = objectStore.getNextNotification(new NotificationEventRequest()); - Assert.assertEquals(2, eventResponse.getEventsSize()); - Assert.assertEquals(FIRST_EVENT_ID, eventResponse.getEvents().get(0).getEventId()); - Assert.assertEquals(SECOND_EVENT_ID, eventResponse.getEvents().get(1).getEventId()); + assertEquals(2, eventResponse.getEventsSize()); + assertEquals(FIRST_EVENT_ID, eventResponse.getEvents().get(0).getEventId()); + assertEquals(SECOND_EVENT_ID, eventResponse.getEvents().get(1).getEventId()); // Verify that getNextNotification(last) returns events after a specified event eventResponse = objectStore.getNextNotification(new NotificationEventRequest(FIRST_EVENT_ID)); - Assert.assertEquals(1, eventResponse.getEventsSize()); - Assert.assertEquals(SECOND_EVENT_ID, eventResponse.getEvents().get(0).getEventId()); + assertEquals(1, eventResponse.getEventsSize()); + assertEquals(SECOND_EVENT_ID, eventResponse.getEvents().get(0).getEventId()); // Verify that getNextNotification(last) returns zero events if there are no more notifications available eventResponse = objectStore.getNextNotification(new NotificationEventRequest(SECOND_EVENT_ID)); - Assert.assertEquals(0, eventResponse.getEventsSize()); + assertEquals(0, eventResponse.getEventsSize()); // Verify that cleanNotificationEvents() cleans up all old notifications Thread.sleep(1); objectStore.cleanNotificationEvents(1); eventResponse = objectStore.getNextNotification(new NotificationEventRequest()); - Assert.assertEquals(0, eventResponse.getEventsSize()); + assertEquals(0, eventResponse.getEventsSize()); } /** @@ -187,14 +189,14 @@ public void testDatabaseOps() throws MetaException, InvalidObjectException, NoSu List databases = objectStore.getAllDatabases(); LOG.info("databases: " + databases); - Assert.assertEquals(2, databases.size()); - Assert.assertEquals(DB1, databases.get(0)); - Assert.assertEquals(DB2, databases.get(1)); + assertEquals(2, databases.size()); + assertEquals(DB1, databases.get(0)); + assertEquals(DB2, databases.get(1)); objectStore.dropDatabase(DB1); databases = objectStore.getAllDatabases(); - Assert.assertEquals(1, databases.size()); - Assert.assertEquals(DB2, databases.get(0)); + assertEquals(1, databases.size()); + assertEquals(DB2, databases.get(0)); objectStore.dropDatabase(DB2); } @@ -215,8 +217,8 @@ public void testTableOps() throws MetaException, InvalidObjectException, NoSuchO objectStore.createTable(tbl1); List tables = objectStore.getAllTables(DB1); - Assert.assertEquals(1, tables.size()); - Assert.assertEquals(TABLE1, tables.get(0)); + assertEquals(1, tables.size()); + assertEquals(TABLE1, tables.get(0)); StorageDescriptor sd2 = new StorageDescriptor(ImmutableList.of(new FieldSchema("fk_col", "double", null)), "location", null, null, false, 0, new SerDeInfo("SerDeName", "serializationLib", null), @@ -224,15 +226,15 @@ public void testTableOps() throws MetaException, InvalidObjectException, NoSuchO Table newTbl1 = new Table("new" + TABLE1, DB1, "owner", 1, 2, 3, sd2, null, params, null, null, "MANAGED_TABLE"); objectStore.alterTable(DB1, TABLE1, newTbl1); tables = objectStore.getTables(DB1, "new*"); - Assert.assertEquals(1, tables.size()); - Assert.assertEquals("new" + TABLE1, tables.get(0)); + assertEquals(1, tables.size()); + assertEquals("new" + TABLE1, tables.get(0)); objectStore.createTable(tbl1); tables = objectStore.getAllTables(DB1); - Assert.assertEquals(2, tables.size()); + assertEquals(2, tables.size()); List foreignKeys = objectStore.getForeignKeys(DB1, TABLE1, null, null); - Assert.assertEquals(0, foreignKeys.size()); + assertEquals(0, foreignKeys.size()); SQLPrimaryKey pk = new SQLPrimaryKey(DB1, TABLE1, "pk_col", 1, "pk_const_1", false, false, false); @@ -244,7 +246,7 @@ public void testTableOps() throws MetaException, InvalidObjectException, NoSuchO // Retrieve from PK side foreignKeys = objectStore.getForeignKeys(null, null, DB1, "new" + TABLE1); - Assert.assertEquals(1, foreignKeys.size()); + assertEquals(1, foreignKeys.size()); List fks = objectStore.getForeignKeys(null, null, DB1, "new" + TABLE1); if (fks != null) { @@ -254,18 +256,18 @@ public void testTableOps() throws MetaException, InvalidObjectException, NoSuchO } // Retrieve from FK side foreignKeys = objectStore.getForeignKeys(DB1, TABLE1, null, null); - Assert.assertEquals(0, foreignKeys.size()); + assertEquals(0, foreignKeys.size()); // Retrieve from PK side foreignKeys = objectStore.getForeignKeys(null, null, DB1, "new" + TABLE1); - Assert.assertEquals(0, foreignKeys.size()); + assertEquals(0, foreignKeys.size()); objectStore.dropTable(DB1, TABLE1); tables = objectStore.getAllTables(DB1); - Assert.assertEquals(1, tables.size()); + assertEquals(1, tables.size()); objectStore.dropTable(DB1, "new" + TABLE1); tables = objectStore.getAllTables(DB1); - Assert.assertEquals(0, tables.size()); + assertEquals(0, tables.size()); objectStore.dropDatabase(DB1); } @@ -295,20 +297,20 @@ public void testPartitionOps() throws MetaException, InvalidObjectException, NoS Deadline.startTimer("getPartition"); List partitions = objectStore.getPartitions(DB1, TABLE1, 10); - Assert.assertEquals(2, partitions.size()); - Assert.assertEquals(111, partitions.get(0).getCreateTime()); - Assert.assertEquals(222, partitions.get(1).getCreateTime()); + assertEquals(2, partitions.size()); + assertEquals(111, partitions.get(0).getCreateTime()); + assertEquals(222, partitions.get(1).getCreateTime()); int numPartitions = objectStore.getNumPartitionsByFilter(DB1, TABLE1, ""); - Assert.assertEquals(partitions.size(), numPartitions); + assertEquals(partitions.size(), numPartitions); numPartitions = objectStore.getNumPartitionsByFilter(DB1, TABLE1, "country = \"US\""); - Assert.assertEquals(2, numPartitions); + assertEquals(2, numPartitions); objectStore.dropPartition(DB1, TABLE1, value1); partitions = objectStore.getPartitions(DB1, TABLE1, 10); - Assert.assertEquals(1, partitions.size()); - Assert.assertEquals(222, partitions.get(0).getCreateTime()); + assertEquals(1, partitions.size()); + assertEquals(222, partitions.get(0).getCreateTime()); objectStore.dropPartition(DB1, TABLE1, value2); objectStore.dropTable(DB1, TABLE1); @@ -324,21 +326,21 @@ public void testMasterKeyOps() throws MetaException, NoSuchObjectException { int id2 = objectStore.addMasterKey(KEY2); String[] keys = objectStore.getMasterKeys(); - Assert.assertEquals(2, keys.length); - Assert.assertEquals(KEY1, keys[0]); - Assert.assertEquals(KEY2, keys[1]); + assertEquals(2, keys.length); + assertEquals(KEY1, keys[0]); + assertEquals(KEY2, keys[1]); objectStore.updateMasterKey(id1, "new" + KEY1); objectStore.updateMasterKey(id2, "new" + KEY2); keys = objectStore.getMasterKeys(); - Assert.assertEquals(2, keys.length); - Assert.assertEquals("new" + KEY1, keys[0]); - Assert.assertEquals("new" + KEY2, keys[1]); + assertEquals(2, keys.length); + assertEquals("new" + KEY1, keys[0]); + assertEquals("new" + KEY2, keys[1]); objectStore.removeMasterKey(id1); keys = objectStore.getMasterKeys(); - Assert.assertEquals(1, keys.length); - Assert.assertEquals("new" + KEY2, keys[0]); + assertEquals(1, keys.length); + assertEquals("new" + KEY2, keys[0]); objectStore.removeMasterKey(id2); } @@ -351,10 +353,10 @@ public void testRoleOps() throws InvalidObjectException, MetaException, NoSuchOb objectStore.addRole(ROLE1, OWNER); objectStore.addRole(ROLE2, OWNER); List roles = objectStore.listRoleNames(); - Assert.assertEquals(2, roles.size()); - Assert.assertEquals(ROLE2, roles.get(1)); + assertEquals(2, roles.size()); + assertEquals(ROLE2, roles.get(1)); Role role1 = objectStore.getRole(ROLE1); - Assert.assertEquals(OWNER, role1.getOwnerName()); + assertEquals(OWNER, role1.getOwnerName()); objectStore.grantRole(role1, USER1, PrincipalType.USER, OWNER, PrincipalType.ROLE, true); objectStore.revokeRole(role1, USER1, PrincipalType.USER, false); objectStore.removeRole(ROLE1); @@ -405,7 +407,8 @@ protected Database getJdoResult(ObjectStore.GetHelper ctx) throws Meta MetricsConstant.DIRECTSQL_ERRORS, 1); } - public static void dropAllStoreObjects(RawStore store) throws MetaException, InvalidObjectException, InvalidInputException { + private static void dropAllStoreObjects(RawStore store) + throws MetaException, InvalidObjectException, InvalidInputException { try { Deadline.registerIfNot(100000); List funcs = store.getAllFunctions(); @@ -467,4 +470,50 @@ public void testQueryCloseOnError() throws Exception { Mockito.verify(spy, Mockito.times(3)) .rollbackAndCleanup(Mockito.anyBoolean(), Mockito.anyObject()); } + + @Test + public void testConcurrentAddNotifications() throws ExecutionException, InterruptedException { + final int NUM_THREADS = 2; + CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_THREADS, + () -> System.out.println("both threads going to add notification")); + + HiveConf conf = new HiveConf(); + conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS, + MockPartitionExpressionProxy.class.getName()); + + ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS); + + for (int i = 0; i < NUM_THREADS; i++) { + final int n = i; + + executorService.execute( + () -> { + ObjectStore store = new ObjectStore(); + store.setConf(conf); + + String eventType = EventMessage.EventType.CREATE_DATABASE.toString(); + NotificationEvent dbEvent = + new NotificationEvent(0, 0, eventType, "CREATE DATABASE DB" + n); + System.out.println("ADDING NOTIFICATION"); + + try { + cyclicBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + store.addNotificationEvent(dbEvent); + System.out.println("FINISH NOTIFICATION"); + }); + } + executorService.shutdown(); + boolean threadsExecuted = executorService.awaitTermination(15, TimeUnit.SECONDS); + assertTrue(threadsExecuted); + + NotificationEventResponse eventResponse = objectStore.getNextNotification( + new NotificationEventRequest()); + assertEquals(2, eventResponse.getEventsSize()); + long firstEventId = eventResponse.getEvents().get(0).getEventId(); + long secondEventId = eventResponse.getEvents().get(1).getEventId(); + assertTrue(firstEventId < secondEventId); + } } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MBackwardNotificationLog.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MBackwardNotificationLog.java new file mode 100644 index 0000000000..02b0370489 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MBackwardNotificationLog.java @@ -0,0 +1,93 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package org.apache.hadoop.hive.metastore.model; + +/** + * this class represents the state of class for repl v1 to work. it was mapping eventId below + * to a generated sequence number. This has been moved away from due to HIVE-16886 + */ +public class MBackwardNotificationLog { + + private long eventId; // This is not the datanucleus id, but the id assigned by the sequence + private int eventTime; + private String eventType; + private String dbName; + private String tableName; + private String message; + private String messageFormat; + + public MBackwardNotificationLog() { + } + + public void setEventId(long eventId) { + this.eventId = eventId; + } + + public long getEventId() { + return eventId; + } + + public int getEventTime() { + return eventTime; + } + + public void setEventTime(int eventTime) { + this.eventTime = eventTime; + } + + public String getEventType() { + return eventType; + } + + public void setEventType(String eventType) { + this.eventType = eventType; + } + + public String getDbName() { + return dbName; + } + + public void setDbName(String dbName) { + this.dbName = dbName; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public String getMessageFormat() { + return messageFormat; + } + + public void setMessageFormat(String messageFormat) { + this.messageFormat = messageFormat; + } + +} diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MNotificationLog.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MNotificationLog.java index d3a166ff54..c746a7e4f1 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MNotificationLog.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MNotificationLog.java @@ -1,25 +1,25 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ package org.apache.hadoop.hive.metastore.model; public class MNotificationLog { - private long eventId; // This is not the datanucleus id, but the id assigned by the sequence + private long notificationLogId; // this is the corresponding NL_ID we have which we map via explicit SQL private int eventTime; private String eventType; private String dbName; @@ -27,25 +27,18 @@ private String message; private String messageFormat; - public MNotificationLog() { - } - - public MNotificationLog(int eventId, String eventType, String dbName, String tableName, - String message) { - this.eventId = eventId; + public MNotificationLog(long notificationLogId, int eventTime, String eventType, String dbName, + String tableName, String message, String messageFormat) { + this.notificationLogId = notificationLogId; + this.eventTime = eventTime; this.eventType = eventType; this.dbName = dbName; this.tableName = tableName; this.message = message; + this.messageFormat = messageFormat; } - public void setEventId(long eventId) { - this.eventId = eventId; - } - - public long getEventId() { - return eventId; - + public MNotificationLog() { } public int getEventTime() { @@ -95,4 +88,8 @@ public String getMessageFormat() { public void setMessageFormat(String messageFormat) { this.messageFormat = messageFormat; } + + public long getNotificationLogId(){ + return notificationLogId; + } } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MNotificationNextId.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MNotificationNextId.java deleted file mode 100644 index ef15848c85..0000000000 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MNotificationNextId.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.model; - -public class MNotificationNextId { - - private long nextEventId; - - public MNotificationNextId() { - } - - public MNotificationNextId(long nextEventId) { - this.nextEventId = nextEventId; - } - - public long getNextEventId() { - return nextEventId; - } - - public void setNextEventId(long nextEventId) { - this.nextEventId = nextEventId; - } - - public void incrementEventId() { - nextEventId++; - } -} diff --git a/standalone-metastore/src/main/resources/package.jdo b/standalone-metastore/src/main/resources/package.jdo index 570fd44c21..7e8d433eed 100644 --- a/standalone-metastore/src/main/resources/package.jdo +++ b/standalone-metastore/src/main/resources/package.jdo @@ -1049,9 +1049,8 @@ - - - + + @@ -1072,17 +1071,33 @@ - - + + - + - - + + + + + + + + + + + + + + + + + + + + - -