commit 01de603c57e6764fafb36abf327cb9f3cabfda8a Author: Vihang Karajgaonkar Date: Tue Oct 16 12:11:50 2018 -0700 HIVE-20740 : Remove global lock in ObjectStore.setConf method diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 66977d79c946f1ac57aacfbe8704d37bfbac3ea3..22005a5b36920a7aa19006551559d7aab4f4910f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -52,7 +52,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; import javax.jdo.JDOCanRetryException; @@ -142,7 +142,9 @@ private static final DateTimeFormatter YMDHMS_FORMAT = DateTimeFormatter.ofPattern( "yyyy_MM_dd_HH_mm_ss"); - private static Lock pmfPropLock = new ReentrantLock(); + private final static ReentrantReadWriteLock pmfLock = new ReentrantReadWriteLock(); + private final static Lock pmfReadLock = pmfLock.readLock(); + private final static Lock pmfWriteLock = pmfLock.writeLock(); /** * Verify the schema only once per JVM since the db connection info is static */ @@ -222,75 +224,156 @@ public Configuration getConf() { } /** - * Called whenever this object is instantiated using ReflectionUtils, and also - * on connection retries. In cases of connection retries, conf will usually - * contain modified values. + * This method returns a updated instance of PersistenceManagerFactory if needed. It looks + * at the given propsFromConf and compares with the current pmf properties. 
If there is a + * change in properties, it re-initializes the PMF and returns the updated PMF instance. The + * method is optimized for the most common case when PMF doesn't need to be updated. It acquires + * read locks to first check if properties are changed. In case the properties are not changed, + * it returns the current PersistenceManagerFactory instance which should have a very low + * synchronization overhead. In case the properties are changed it blocks other clients who are + * trying to instantiate ObjectStore or retrieve ObjectStore instance while the update is being + * done to the prop and pmf static objects + * + * @param propsFromConf the input datasource properties which are compared with the stored prop + * to detect if PersistenceManagerFactory pmf needs to be updated + * @return The singleton PersistenceManagerFactory instance used by this ObjectStore */ - @Override - @SuppressWarnings("nls") - public void setConf(Configuration conf) { - // Although an instance of ObjectStore is accessed by one thread, there may - // be many threads with ObjectStore instances. So the static variables - // pmf and prop need to be protected with locks. - pmfPropLock.lock(); - try { - isInitialized = false; - this.conf = conf; - this.areTxnStatsSupported = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED); - configureSSL(conf); - Properties propsFromConf = getDataSourceProps(conf); - boolean propsChanged = !propsFromConf.equals(prop); - - if (propsChanged) { - if (pmf != null){ - clearOutPmfClassLoaderCache(pmf); - if (!forTwoMetastoreTesting) { - // close the underlying connection pool to avoid leaks - pmf.close(); - } + private static PersistenceManagerFactory getUpdatedPmfIfNeeded(Properties propsFromConf) { + // take a read lock to check if the datasource properties changed. 
+ // Most common case is that datasource properties do not change + boolean propsChanged; + pmfReadLock.lock(); + try { + propsChanged = !propsFromConf.equals(prop); + } finally { + pmfReadLock.unlock(); + } + // if pmf properties didn't change, no need to update + if (!propsChanged) { + return pmf; + } + // pmf properties changed, we need to update the prop and pmf static objects + // by taking a write lock + pmfWriteLock.lock(); + try { + // it's possible that multiple threads detected prop change at the same time above + // do another check here so pmf is updated only when it's really necessary: if prop already equals propsFromConf, another thread has installed these properties and its pmf is reused + if (prop != null && propsFromConf.equals(prop)) { + return pmf; + } + // OK, now we really need to re-initialize pmf and pmf properties + if (pmf != null) { + clearOutPmfClassLoaderCache(pmf); + if (!forTwoMetastoreTesting) { + // close the underlying connection pool to avoid leaks + pmf.close(); } pmf = null; - prop = null; } + // update the pmf properties object then initialize pmf using them + prop = propsFromConf; + Configuration conf = MetastoreConf.newMetastoreConf(); + DataSourceProvider dsp = DataSourceProviderFactory + .hasProviderSpecificConfigurations(conf) ? 
DataSourceProviderFactory + .getDataSourceProvider(conf) : null; - assert(!isActiveTransaction()); - shutdown(); - // Always want to re-create pm as we don't know if it were created by the - // most recent instance of the pmf - pm = null; - directSql = null; - expressionProxy = null; - openTrasactionCalls = 0; - currentTransaction = null; - transactionStatus = TXN_STATUS.NO_STATE; - - initialize(propsFromConf); - - String partitionValidationRegex = - MetastoreConf.getVar(this.conf, ConfVars.PARTITION_NAME_WHITELIST_PATTERN); - if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) { - partitionValidationPattern = Pattern.compile(partitionValidationRegex); + if (dsp == null) { + pmf = JDOHelper.getPersistenceManagerFactory(prop); } else { - partitionValidationPattern = null; - } - - // Note, if metrics have not been initialized this will return null, which means we aren't - // using metrics. Thus we should always check whether this is non-null before using. - MetricRegistry registry = Metrics.getRegistry(); - if (registry != null) { - directSqlErrors = Metrics.getOrCreateCounter(MetricsConstants.DIRECTSQL_ERRORS); + try { + DataSource ds = dsp.create(conf); + Map dsProperties = new HashMap<>(); + //Any preexisting datanucleus property should be passed along + dsProperties.putAll(prop); + dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY, ds); + dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY2, ds); + dsProperties.put(MetastoreConf.ConfVars.MANAGER_FACTORY_CLASS.getVarname(), + "org.datanucleus.api.jdo.JDOPersistenceManagerFactory"); + pmf = JDOHelper.getPersistenceManagerFactory(dsProperties); + } catch (SQLException e) { + LOG.warn("Could not create PersistenceManagerFactory using " + + "connection pool properties, will fall back", e); + pmf = JDOHelper.getPersistenceManagerFactory(prop); + } } - - this.batchSize = MetastoreConf.getIntVar(conf, ConfVars.RAWSTORE_PARTITION_BATCH_SIZE); - - if (!isInitialized) { - throw new 
RuntimeException( - "Unable to create persistence manager. Check dss.log for details"); + DataStoreCache dsc = pmf.getDataStoreCache(); + if (dsc != null) { + String objTypes = MetastoreConf.getVar(conf, ConfVars.CACHE_PINOBJTYPES); + LOG.info( + "Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=\"{}\"", + objTypes); + if (org.apache.commons.lang.StringUtils.isNotEmpty(objTypes)) { + String[] typeTokens = objTypes.toLowerCase().split(","); + for (String type : typeTokens) { + type = type.trim(); + if (PINCLASSMAP.containsKey(type)) { + dsc.pinAll(true, PINCLASSMAP.get(type)); + } else { + LOG.warn("{} is not one of the pinnable object types: {}", type, + org.apache.commons.lang.StringUtils.join(PINCLASSMAP.keySet(), " ")); + } + } + } } else { - LOG.debug("Initialized ObjectStore"); + LOG.warn( + "PersistenceManagerFactory returned null DataStoreCache object. Unable to initialize object pin types defined by hive.metastore.cache.pinobjtypes"); } + return pmf; } finally { - pmfPropLock.unlock(); + pmfWriteLock.unlock(); + } + } + + /** + * Called whenever this object is instantiated using ReflectionUtils, and also + * on connection retries. In cases of connection retries, conf will usually + * contain modified values. 
+ */ + @Override + @SuppressWarnings("nls") + public void setConf(Configuration conf) { + isInitialized = false; + this.conf = conf; + this.areTxnStatsSupported = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED); + configureSSL(conf); + Properties propsFromConf = getDataSourceProps(conf); + + getUpdatedPmfIfNeeded(propsFromConf); + + assert (!isActiveTransaction()); + shutdown(); + // Always want to re-create pm as we don't know if it were created by the + // most recent instance of the pmf + pm = null; + directSql = null; + expressionProxy = null; + openTrasactionCalls = 0; + currentTransaction = null; + transactionStatus = TXN_STATUS.NO_STATE; + + initialize(); + + String partitionValidationRegex = + MetastoreConf.getVar(this.conf, ConfVars.PARTITION_NAME_WHITELIST_PATTERN); + if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) { + partitionValidationPattern = Pattern.compile(partitionValidationRegex); + } else { + partitionValidationPattern = null; + } + + // Note, if metrics have not been initialized this will return null, which means we aren't + // using metrics. Thus we should always check whether this is non-null before using. + MetricRegistry registry = Metrics.getRegistry(); + if (registry != null) { + directSqlErrors = Metrics.getOrCreateCounter(MetricsConstants.DIRECTSQL_ERRORS); + } + + this.batchSize = MetastoreConf.getIntVar(conf, ConfVars.RAWSTORE_PARTITION_BATCH_SIZE); + + if (!isInitialized) { + throw new RuntimeException("Unable to create persistence manager. 
Check dss.log for details"); + } else { + LOG.debug("Initialized ObjectStore"); } } @@ -303,7 +386,7 @@ public void setConf(Configuration conf) { } @SuppressWarnings("nls") - private void initialize(Properties dsProps) { + private void initialize() { int retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS); long retryInterval = MetastoreConf.getTimeVar(conf, ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS); @@ -311,7 +394,7 @@ private void initialize(Properties dsProps) { while (numTries > 0){ try { - initializeHelper(dsProps); + initializeHelper(); return; // If we reach here, we succeed. } catch (Exception e){ shutdown(); @@ -370,11 +453,9 @@ private boolean isRetriableException(Throwable e) { /** * private helper to do initialization routine, so we can retry if needed if it fails. - * @param dsProps */ - private void initializeHelper(Properties dsProps) { + private void initializeHelper() { LOG.debug("ObjectStore, initialize called"); - prop = dsProps; pm = getPersistenceManager(); LOG.info("RawStore: {}, with PersistenceManager: {}" + " created in the thread with id: {}", this, pm, Thread.currentThread().getId()); @@ -536,59 +617,10 @@ private static void correctAutoStartMechanism(Configuration conf) { conf.set(autoStartKey, autoStartIgnore); } - private static synchronized PersistenceManagerFactory getPMF() { - if (pmf == null) { - - Configuration conf = MetastoreConf.newMetastoreConf(); - DataSourceProvider dsp = DataSourceProviderFactory.hasProviderSpecificConfigurations(conf) ? 
- DataSourceProviderFactory.getDataSourceProvider(conf) : null; - - if (dsp == null) { - pmf = JDOHelper.getPersistenceManagerFactory(prop); - } else { - try { - DataSource ds = dsp.create(conf); - Map dsProperties = new HashMap<>(); - //Any preexisting datanucleus property should be passed along - dsProperties.putAll(prop); - dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY, ds); - dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY2, ds); - dsProperties.put("javax.jdo.PersistenceManagerFactoryClass", - "org.datanucleus.api.jdo.JDOPersistenceManagerFactory"); - pmf = JDOHelper.getPersistenceManagerFactory(dsProperties); - } catch (SQLException e) { - LOG.warn("Could not create PersistenceManagerFactory using " + - "connection pool properties, will fall back", e); - pmf = JDOHelper.getPersistenceManagerFactory(prop); - } - } - DataStoreCache dsc = pmf.getDataStoreCache(); - if (dsc != null) { - String objTypes = MetastoreConf.getVar(conf, ConfVars.CACHE_PINOBJTYPES); - LOG.info("Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=\"{}\"", objTypes); - if (org.apache.commons.lang.StringUtils.isNotEmpty(objTypes)) { - String[] typeTokens = objTypes.toLowerCase().split(","); - for (String type : typeTokens) { - type = type.trim(); - if (PINCLASSMAP.containsKey(type)) { - dsc.pinAll(true, PINCLASSMAP.get(type)); - } else { - LOG.warn("{} is not one of the pinnable object types: {}", type, - org.apache.commons.lang.StringUtils.join(PINCLASSMAP.keySet(), " ")); - } - } - } - } else { - LOG.warn("PersistenceManagerFactory returned null DataStoreCache object. 
Unable to initialize object pin types defined by hive.metastore.cache.pinobjtypes"); - } - } - return pmf; - } - @InterfaceAudience.LimitedPrivate({"HCATALOG"}) @InterfaceStability.Evolving public PersistenceManager getPersistenceManager() { - return getPMF().getPersistenceManager(); + return getUpdatedPmfIfNeeded(prop).getPersistenceManager(); } @Override @@ -10243,7 +10275,7 @@ public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) { * We're not clearing the cache HashMap by calling HashMap#clear to avoid concurrency issues. */ public static void unCacheDataNucleusClassLoaders() { - PersistenceManagerFactory pmf = ObjectStore.getPMF(); + PersistenceManagerFactory pmf = ObjectStore.getUpdatedPmfIfNeeded(prop); clearOutPmfClassLoaderCache(pmf); } diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java index b74c3048fa2e18adc7f0d7cc813a180d4466fa36..d5fa1cb88eaf9a1e599068511d6de08d39266916 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java @@ -86,14 +86,19 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Random; import java.util.Set; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.Callable; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import static java.util.concurrent.Executors.newFixedThreadPool; import static 
org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; @Category(MetastoreUnitTest.class) @@ -867,7 +872,7 @@ public void testConcurrentAddNotifications() throws ExecutionException, Interrup EventMessage.EventType.CREATE_DATABASE.toString(), "CREATE DATABASE DB initial")); - ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS); + ExecutorService executorService = newFixedThreadPool(NUM_THREADS); for (int i = 0; i < NUM_THREADS; i++) { final int n = i; @@ -911,6 +916,51 @@ public void testConcurrentAddNotifications() throws ExecutionException, Interrup } } + /** + * This test calls the ObjectStore.setConf method from multiple threads. Each thread uses its + * own instance of ObjectStore to simulate thread-local objectstore behaviour. Before each + * invocation the datasource property is changed which forces a reinitialization of + * PersistenceManagerFactory instance in each call + * @throws Exception + */ + @Test + public void testConcurrentPMFInitialize() throws Exception { + final String dataSourceProp = "datanucleus.connectionPool.maxPoolSize"; + // Barrier is used to ensure that all threads start the race at the same time + final int numThreads = 10; + final int numIteration = 50; + final CyclicBarrier barrier = new CyclicBarrier(numThreads); + final AtomicInteger counter = new AtomicInteger(0); + ExecutorService executor = newFixedThreadPool(numThreads); + List<Future<Void>> results = new ArrayList<>(numThreads); + for (int i = 0; i < numThreads; i++) { + final Random random = new Random(); + Configuration conf = MetastoreConf.newMetastoreConf(); + MetaStoreTestUtils.setConfForStandloneMode(conf); + results.add(executor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + // each thread gets its own ObjectStore to simulate threadLocal store + ObjectStore objectStore = new ObjectStore(); + barrier.await(); + for (int j = 0; j < numIteration; j++) { + // set connectionPool to a random value to increase the likelihood of pmf 
+ // re-initialization + conf.set(dataSourceProp, Integer.toString(random.nextInt(100))); + objectStore.setConf(conf); + counter.getAndIncrement(); + } + return null; + } + })); + } + for (Future<Void> future : results) { + future.get(60, TimeUnit.SECONDS); + } + Assert.assertEquals("Unexpected number of setConf calls", numIteration * numThreads, + counter.get()); + } + private void createTestCatalog(String catName) throws MetaException { Catalog cat = new CatalogBuilder() .setName(catName)