From 980a613eb46cfedabeb40d57b9802599da53d3ac Mon Sep 17 00:00:00 2001 From: Alexander Kolbasov Date: Wed, 21 Feb 2018 20:19:15 -0800 Subject: [PATCH 1/1] HIVE-18768: Use Datanucleus to serialize notification updates --- .../apache/hadoop/hive/metastore/ObjectStore.java | 21 +-- .../hadoop/hive/metastore/TestObjectStore.java | 182 +++++++++++++-------- 2 files changed, 125 insertions(+), 78 deletions(-) diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 3e1fea9d4f..029996b3a3 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -8832,19 +8832,6 @@ private void prepareQuotes() throws SQLException { } } - private void lockForUpdate() throws MetaException { - String selectQuery = "select \"NEXT_EVENT_ID\" from \"NOTIFICATION_SEQUENCE\""; - String selectForUpdateQuery = sqlGenerator.addForUpdateClause(selectQuery); - new RetryingExecutor(conf, () -> { - prepareQuotes(); - Query query = pm.newQuery("javax.jdo.query.SQL", selectForUpdateQuery); - query.setUnique(true); - // only need to execute it to get db Lock - query.execute(); - query.closeAll(); - }).run(); - } - static class RetryingExecutor { interface Command { void process() throws Exception; @@ -8906,7 +8893,13 @@ public void addNotificationEvent(NotificationEvent entry) { Query query = null; try { openTransaction(); - lockForUpdate(); + + // To protect against concurrent modifications of Notification ID, obtain the row lock + // on the NOTIFICATION_SEQUENCE table row. + // This is a WRITE lock and it is maintained for the duration of transaction. 
+ // See http://www.datanucleus.org/products/accessplatform_3_2/jdo/transactions.html + // The query must be created first: 'query' is still null at this point, so enabling + // serialized read before pm.newQuery() would throw a NullPointerException. + query = pm.newQuery(MNotificationNextId.class); + query.setSerializeRead(true); Collection ids = (Collection) query.execute(); MNotificationNextId mNotificationNextId = null; diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java index ca33b7da21..36228ccf0f 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java @@ -60,18 +60,24 @@ import org.slf4j.LoggerFactory; import javax.jdo.Query; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.Callable; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import static java.util.concurrent.Executors.newFixedThreadPool; + @Category(MetastoreUnitTest.class) public class TestObjectStore { private ObjectStore objectStore = null; @@ -485,86 +491,134 @@ public void testNotificationOps() throws InterruptedException { Assert.assertEquals(0, eventResponse.getEventsSize()); } + /** + * Test concurrent updates to notifications.

+ * + * The test uses N threads to concurrently add M notifications. + * It assumes that no other thread modifies notifications, but there are no assumptions + * about the initial state of the notification table.

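+ * + * The following assertions are verified: + * <ul> + * <li>No notification ID is returned to more than one thread.</li> + * <li>The current notification ID advances by exactly the total number of events added.</li> + * <li>Every ID between the initial and the final notification ID was produced exactly once.</li> + * <li>getNextNotification() returns the new events with consecutive IDs, ending at the last ID.</li> + * </ul> + *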

+ * + * @throws ExecutionException + * @throws InterruptedException + */ @Ignore( "This test is here to allow testing with other databases like mysql / postgres etc\n" + " with user changes to the code. This cannot be run on apache derby because of\n" + " https://db.apache.org/derby/docs/10.10/devguide/cdevconcepts842385.html" ) @Test - public void testConcurrentAddNotifications() throws ExecutionException, InterruptedException { + public void testConcurrentNotifications() throws ExecutionException, InterruptedException { - final int NUM_THREADS = 10; - CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_THREADS, - () -> LoggerFactory.getLogger("test") - .debug(NUM_THREADS + " threads going to add notification")); + final int NUM_THREADS = 4; + final int NUM_EVENTS = 200; + // Barrier is used to ensure that all threads start the race at the same time + final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS); + ExecutorService executor = newFixedThreadPool(NUM_THREADS); - Configuration conf = MetastoreConf.newMetastoreConf(); + final Configuration conf = MetastoreConf.newMetastoreConf(); MetaStoreTestUtils.setConfForStandloneMode(conf); + /* - Below are the properties that need to be set based on what database this test is going to be run + * To run these tests on a real DB you need to + * - make sure NOTIFICATION_SEQUENCE schema is initialized + * - Uncomment the following settings and fill appropriate values for your setup. + * You also need to add test dependency on mysql-connector driver. 
*/ + // conf.set(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver"); + // conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY, + // "jdbc:mysql://:3306/"); + // conf.set(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, ""); + // conf.set(HiveConf.ConfVars.METASTOREPWD, ""); -// conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver"); -// conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, -// "jdbc:mysql://localhost:3306/metastore_db"); -// conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, ""); -// conf.setVar(HiveConf.ConfVars.METASTOREPWD, ""); + // We can't rely on this.objectStore because we are not using derby + ObjectStore myStore = new ObjectStore(); + myStore.setConf(conf); - /* - we have to add this one manually as for tests the db is initialized via the metastoreDiretSQL - and we don't run the schema creation sql that includes the an insert for notification_sequence - which can be locked. the entry in notification_sequence happens via notification_event insertion. - */ - objectStore.getPersistenceManager().newQuery(MNotificationLog.class, "eventType==''").execute(); - objectStore.getPersistenceManager().newQuery(MNotificationNextId.class, "nextEventId==-1").execute(); - - objectStore.addNotificationEvent( - new NotificationEvent(0, 0, - EventMessage.EventType.CREATE_DATABASE.toString(), - "CREATE DATABASE DB initial")); - - ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS); + // Get current notification value. 
We assume that no one else modifies notifications + long currentEventId = myStore.getCurrentNotificationEventId().getEventId(); + + // Add NUM_EVENTS - 1 notifications (the loop below starts at 1) and return the list of notification IDs added + Callable> addNotifications = new Callable>() { + @Override + public List call() throws Exception { + // We need thread-local object store + ObjectStore store = new ObjectStore(); + store.setConf(conf); + List result = new ArrayList<>(NUM_EVENTS); + NotificationEvent event = + new NotificationEvent(0, 0, EventMessage.EventType.CREATE_DATABASE.toString(), ""); + + // Prepare for the race + barrier.await(); + // Fun part begins + for (int i = 1; i < NUM_EVENTS; i++) { + store.addNotificationEvent(event); + long evId = store.getCurrentNotificationEventId().getEventId(); + // Make sure events do not jump backwards + Assert.assertTrue(evId >= event.getEventId()); + result.add(event.getEventId()); + } + return result; + } + }; + + List>> results = new ArrayList<>(NUM_THREADS); + + // Submit work for all threads for (int i = 0; i < NUM_THREADS; i++) { - final int n = i; - - executorService.execute( - () -> { - ObjectStore store = new ObjectStore(); - store.setConf(conf); - - String eventType = EventMessage.EventType.CREATE_DATABASE.toString(); - NotificationEvent dbEvent = - new NotificationEvent(0, 0, eventType, - "CREATE DATABASE DB" + n); - System.out.println("ADDING NOTIFICATION"); - - try { - cyclicBarrier.await(); - } catch (InterruptedException | BrokenBarrierException e) { - throw new RuntimeException(e); - } - store.addNotificationEvent(dbEvent); - System.out.println("FINISH NOTIFICATION"); - }); + results.add(executor.submit(addNotifications)); + } + + // Collect all results in a map which counts number of times each notification ID is used. 
+ // Later we verify that each count is one + Map ids = new HashMap<>(); + for (Future> r: results) { + List values = r.get(); + for (Long value: values) { + Integer oldVal = ids.get(value); + Assert.assertNull(oldVal); + if (oldVal == null) { + ids.put(value, 1); + } else { + ids.put(value, oldVal + 1); + } + } } - executorService.shutdown(); - Assert.assertTrue(executorService.awaitTermination(15, TimeUnit.SECONDS)); - - // we have to setup this again as the underlying PMF keeps getting reinitialized with original - // reference closed - ObjectStore store = new ObjectStore(); - store.setConf(conf); - - NotificationEventResponse eventResponse = store.getNextNotification( - new NotificationEventRequest()); - Assert.assertEquals(NUM_THREADS + 1, eventResponse.getEventsSize()); - long previousId = 0; - for (NotificationEvent event : eventResponse.getEvents()) { - Assert.assertTrue("previous:" + previousId + " current:" + event.getEventId(), - previousId < event.getEventId()); - Assert.assertTrue(previousId + 1 == event.getEventId()); - previousId = event.getEventId(); + + // By now all the async work is complete, so we can safely shut down the executor + executor.shutdownNow(); + + // Get latest notification ID + long lastEventId = myStore.getCurrentNotificationEventId().getEventId(); + + Assert.assertEquals(NUM_THREADS * (NUM_EVENTS - 1), lastEventId - currentEventId); + for(long evId = currentEventId + 1; evId <= lastEventId; evId++) { + Integer count = ids.get(evId); + Assert.assertNotNull(count); + Assert.assertEquals(1L, count.longValue()); + } + + // Certify that all notifications returned from getNextNotification() are present + // and properly ordered. 
+ NotificationEventResponse eventResponse = + myStore.getNextNotification(new NotificationEventRequest(currentEventId)); + long prevId = currentEventId; + for (NotificationEvent e: eventResponse.getEvents()) { + Assert.assertEquals(prevId + 1, e.getEventId()); + prevId = e.getEventId(); + Integer count = ids.get(e.getEventId()); + Assert.assertNotNull(count); + Assert.assertEquals(1L, count.longValue()); } + Assert.assertEquals(prevId, lastEventId); } } -- 2.16.3