From e5373404a0fa2f039c762a0b918fa86eb163e98d Mon Sep 17 00:00:00 2001 From: Alexander Kolbasov Date: Fri, 26 Jan 2018 16:03:38 -0800 Subject: [PATCH 1/1] HIVE-18526 Backport HIVE-16886 to Hive 2 --- .../apache/hadoop/hive/metastore/ObjectStore.java | 7 + .../hadoop/hive/metastore/TestObjectStore.java | 169 +++++++++++++++++++-- 2 files changed, 166 insertions(+), 10 deletions(-) diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index d16d2a3cd6..587865f6f6 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -8233,6 +8233,13 @@ public void addNotificationEvent(NotificationEvent entry) { try { openTransaction(); query = pm.newQuery(MNotificationNextId.class); + + // To protect against concurrent modifications of Notification ID, obtain the row lock + // on the NOTIFICATION_SEQUENCE table row. + // This is a WRITE lock and it is maintained for the duration of transaction. + // See http://www.datanucleus.org/products/accessplatform_3_2/jdo/transactions.html + query.setSerializeRead(true); + Collection ids = (Collection) query.execute(); MNotificationNextId id = null; boolean needToPersistId; diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java index 69e8826f53..4ba1a03737 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java @@ -17,17 +17,11 @@ */ package org.apache.hadoop.hive.metastore; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - +import org.apache.hadoop.hive.common.metrics.MetricsTestUtils; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics; import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting; -import org.apache.hadoop.hive.common.metrics.MetricsTestUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; @@ -38,10 +32,10 @@ import org.apache.hadoop.hive.metastore.api.InvalidInputException; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.Role; @@ -57,13 +51,27 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; - import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jdo.Query; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import static java.util.concurrent.Executors.newFixedThreadPool; public class TestObjectStore { private ObjectStore objectStore = null; @@ -144,9 +152,11 @@ public void testNotificationOps() throws InterruptedException { // Verify that addNotificationEvent() updates the NotificationEvent with the new event ID objectStore.addNotificationEvent(event); Assert.assertEquals(FIRST_EVENT_ID, event.getEventId()); + // Verify that objectStore fetches the latest notification event ID + eventId = objectStore.getCurrentNotificationEventId(); + Assert.assertEquals(FIRST_EVENT_ID, eventId.getEventId()); objectStore.addNotificationEvent(event); Assert.assertEquals(SECOND_EVENT_ID, event.getEventId()); - // Verify that objectStore fetches the latest notification event ID eventId = objectStore.getCurrentNotificationEventId(); Assert.assertEquals(SECOND_EVENT_ID, eventId.getEventId()); @@ -171,6 +181,15 @@ public void testNotificationOps() throws InterruptedException { objectStore.cleanNotificationEvents(1); eventResponse = objectStore.getNextNotification(new NotificationEventRequest()); Assert.assertEquals(0, eventResponse.getEventsSize()); + + // Verify that two notifications can be added in a single transaction + long currentId = objectStore.getCurrentNotificationEventId().getEventId(); + objectStore.openTransaction(); + objectStore.addNotificationEvent(event); + objectStore.addNotificationEvent(event); + objectStore.commitTransaction(); + Assert.assertEquals(currentId + 2, + objectStore.getCurrentNotificationEventId().getEventId()); } /** @@ -424,4 +443,134 @@ public void testQueryCloseOnError() throws Exception { Mockito.verify(spy, Mockito.times(3)) .rollbackAndCleanup(Mockito.anyBoolean(), Mockito.anyObject()); } + + /** + * Test concurrent updates to notifications.

+ * + * The test uses N threads to concurrently add M notifications. + * It assumes that no other thread modifies nodifications, but there are no assumptions + * about the initial state of notification table.

+ * + * The following assertions are verified: + *

    + *
  • Correct number of events are added in the table
  • + *
  • There are no duplicate events
  • + *
  • There are no holes
  • + *
  • Events returned by getNextNotification() have all the new events in increasing order
  • + *
+ * + * @throws ExecutionException + * @throws InterruptedException + */ + @Ignore( + "This test is here to allow testing with other databases like mysql / postgres etc\n" + + " with user changes to the code. This cannot be run on apache derby because of\n" + + " https://db.apache.org/derby/docs/10.10/devguide/cdevconcepts842385.html" + ) + @Test + public void testConcurrentNotifications() throws ExecutionException, InterruptedException { + + final int NUM_THREADS = 4; + final int NUM_EVENTS = 200; + // Barrier is used to ensure that all threads start race at the same time + final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS); + ExecutorService executor = newFixedThreadPool(NUM_THREADS); + + final HiveConf conf = new HiveConf(); + conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS, MockPartitionExpressionProxy.class.getName()); + + /* + * To tun these tests on real DB you need to + * - make sure NOTIFICATION_SEQUENCE schema is initialized + * - Uncomment the following settings and fill appropriate values for your setup. + * You also need to add test dependency on mysql-connector driver. + */ + // conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver"); + // conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, + // "jdbc:mysql://:3306/"); + // conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, ""); + // conf.setVar(HiveConf.ConfVars.METASTOREPWD, ""); + + // We can't rely on this.objectSTore because we are not using derby + ObjectStore myStore = new ObjectStore(); + myStore.setConf(conf); + + // Get current notification value. We assume that no one else modifies notifications + long currentEventId = myStore.getCurrentNotificationEventId().getEventId(); + + // Add NUM_EVENTS notifications and return list if notification IDs added + Callable> addNotifications = new Callable>() { + @Override + public List call() throws Exception { + // We need thread-local object store + ObjectStore store = new ObjectStore(); + store.setConf(conf); + List result = new ArrayList<>(NUM_EVENTS); + NotificationEvent event = + new NotificationEvent(0, 0, EventMessage.EventType.CREATE_DATABASE.toString(), ""); + + // Prepare for the race + barrier.await(); + // Fun part begins + for (int i = 1; i < NUM_EVENTS; i++) { + store.addNotificationEvent(event); + long evId = store.getCurrentNotificationEventId().getEventId(); + // Make sure events do not jump backwards + Assert.assertTrue(evId >= event.getEventId()); + result.add(event.getEventId()); + } + return result; + } + }; + + List>> results = new ArrayList<>(NUM_THREADS); + + // Submit work for all threads + for (int i = 0; i < NUM_THREADS; i++) { + results.add(executor.submit(addNotifications)); + } + + // Collect all results in a map which counts number of times each notification ID is used. + // Later we verify that each count is one + Map ids = new HashMap<>(); + for (Future> r: results) { + List values = r.get(); + for (Long value: values) { + Integer oldVal = ids.get(value); + Assert.assertNull(oldVal); + if (oldVal == null) { + ids.put(value, 1); + } else { + ids.put(value, oldVal + 1); + } + } + } + + // By now all the async work is complete, so we can safely shut down the executor + executor.shutdownNow(); + + // Get latest notification ID + long lastEventId = myStore.getCurrentNotificationEventId().getEventId(); + + Assert.assertEquals(NUM_THREADS * (NUM_EVENTS - 1), lastEventId - currentEventId); + for(long evId = currentEventId + 1; evId <= lastEventId; evId++) { + Integer count = ids.get(evId); + Assert.assertNotNull(count); + Assert.assertEquals(1L, count.longValue()); + } + + // Certify that all notifications returned from getNextNotification() are present + // and properly ordered. + NotificationEventResponse eventResponse = + myStore.getNextNotification(new NotificationEventRequest(currentEventId)); + long prevId = currentEventId; + for (NotificationEvent e: eventResponse.getEvents()) { + Assert.assertEquals(prevId + 1, e.getEventId()); + prevId = e.getEventId(); + Integer count = ids.get(e.getEventId()); + Assert.assertNotNull(count); + Assert.assertEquals(1L, count.longValue()); + } + Assert.assertEquals(prevId, lastEventId); + } } -- 2.15.1