commit 9ab9dadb9e34a44e4384a51eade1e6a33f68a1d4 Author: Vihang Karajgaonkar Date: Tue Oct 16 12:11:50 2018 -0700 HIVE-20740 : Remove global lock in ObjectStore.setConf method diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 75cd68a9d6be6c5804e458d19a0023f0d7f5beae..459e296e9194f2943af645f043ca97b7eda25bda 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; -import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.PersistenceManagerProvider; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -199,7 +199,7 @@ static void internalBeforeClassSetup(Map additionalProperties) driverMirror = DriverFactory.newDriver(hconfMirror); metaStoreClientMirror = new HiveMetaStoreClient(hconfMirror); - ObjectStore.setTwoMetastoreTesting(true); + PersistenceManagerProvider.setTwoMetastoreTesting(true); } @AfterClass diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index 5cb0a887e672f49739f5b648e608fba66de06326..a2da15f5b68227d8a9da521167b0c346e23798c4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.PersistenceManagerProvider; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.UDF; @@ -1147,7 +1148,7 @@ private int getNucleusClassLoaderResolverMapSize() { NucleusContext nc = null; Map cMap; try { - pmf = ObjectStore.class.getDeclaredField("pmf"); + pmf = PersistenceManagerProvider.class.getDeclaredField("pmf"); if (pmf != null) { pmf.setAccessible(true); jdoPmf = (JDOPersistenceManagerFactory) pmf.get(null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index ee7c940d2b7fbd66af2d006da0585c6b42b9b0bb..499e923aede684bbe9f8acd33d1a2bfe6109aa6e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.conf.HiveConfUtil; import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.PersistenceManagerProvider; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -1818,7 +1819,7 @@ private void unCacheDataNucleusClassLoaders() { } Class 
<?> clazz = Class.forName(realStoreImpl);
       if (ObjectStore.class.isAssignableFrom(clazz)) {
-        ObjectStore.unCacheDataNucleusClassLoaders();
+        PersistenceManagerProvider.clearOutPmfClassLoaderCache();
       }
     }
   } catch (Exception e) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 03e3a2d2573b54651833867b906821650f4fb9c1..0546cdf045cc68a91f5938740ccf275061aea1dc 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -23,7 +23,6 @@
 import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;

 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
@@ -52,22 +51,18 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;

 import javax.jdo.JDOCanRetryException;
 import javax.jdo.JDODataStoreException;
 import javax.jdo.JDOException;
-import javax.jdo.JDOHelper;
 import javax.jdo.JDOObjectNotFoundException;
 import javax.jdo.PersistenceManager;
-import javax.jdo.PersistenceManagerFactory;
 import javax.jdo.Query;
 import javax.jdo.Transaction;
-import javax.jdo.datastore.DataStoreCache;
 import javax.jdo.datastore.JDOConnection;
 import javax.jdo.identity.IntIdentity;
-import javax.sql.DataSource;

 import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
@@ -86,8 +81,6 @@
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
-import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
 import org.apache.hadoop.hive.metastore.model.*;
@@ -104,16 +97,7 @@
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.ObjectPair;
 import org.apache.thrift.TException;
-import org.datanucleus.AbstractNucleusContext;
-import org.datanucleus.ClassLoaderResolver;
-import org.datanucleus.ClassLoaderResolverImpl;
-import org.datanucleus.NucleusContext;
-import org.datanucleus.PropertyNames;
-import org.datanucleus.api.jdo.JDOPersistenceManager;
-import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
 import org.datanucleus.store.rdbms.exceptions.MissingTableException;
-import org.datanucleus.store.scostore.Store;
-import org.datanucleus.util.WeakValueMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -134,15 +118,14 @@
  * filestore.
 */
 public class ObjectStore implements RawStore, Configurable {
-  private static Properties prop = null;
-  private static PersistenceManagerFactory pmf = null;
-  private static boolean forTwoMetastoreTesting = false;
   private int batchSize = Batchable.NO_BATCHING;

   private static final DateTimeFormatter YMDHMS_FORMAT = DateTimeFormatter.ofPattern(
       "yyyy_MM_dd_HH_mm_ss");
-  private static Lock pmfPropLock = new ReentrantLock();

   /**
    * Verify the schema only once per JVM since the db connection info is static
    */
@@ -229,68 +212,46 @@ public Configuration getConf() {
   @Override
   @SuppressWarnings("nls")
   public void setConf(Configuration conf) {
-    // Although an instance of ObjectStore is accessed by one thread, there may
-    // be many threads with ObjectStore instances. So the static variables
-    // pmf and prop need to be protected with locks.
-    pmfPropLock.lock();
-    try {
-      isInitialized = false;
-      this.conf = conf;
-      this.areTxnStatsSupported = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED);
-      configureSSL(conf);
-      Properties propsFromConf = getDataSourceProps(conf);
-      boolean propsChanged = !propsFromConf.equals(prop);
-
-      if (propsChanged) {
-        if (pmf != null){
-          clearOutPmfClassLoaderCache(pmf);
-          if (!forTwoMetastoreTesting) {
-            // close the underlying connection pool to avoid leaks
-            pmf.close();
-          }
-        }
-        pmf = null;
-        prop = null;
-      }
-
-      assert(!isActiveTransaction());
-      shutdown();
-      // Always want to re-create pm as we don't know if it were created by the
-      // most recent instance of the pmf
-      pm = null;
-      directSql = null;
-      expressionProxy = null;
-      openTrasactionCalls = 0;
-      currentTransaction = null;
-      transactionStatus = TXN_STATUS.NO_STATE;
-
-      initialize(propsFromConf);
-
-      String partitionValidationRegex =
-          MetastoreConf.getVar(this.conf, ConfVars.PARTITION_NAME_WHITELIST_PATTERN);
-      if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
-        partitionValidationPattern = Pattern.compile(partitionValidationRegex);
-      } else {
-        partitionValidationPattern = null;
-      }
+    isInitialized = false;
+    this.conf = conf;
+    this.areTxnStatsSupported = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED);
+    configureSSL(conf);
+    PersistenceManagerProvider.updatePmfProperties(conf);
+
+    assert (!isActiveTransaction());
+    shutdown();
+    // Always want to re-create pm as we don't know if it was created by the
+    // most recent instance of the pmf
+    pm = null;
+    directSql = null;
+    expressionProxy = null;
+    openTrasactionCalls = 0;
+    currentTransaction = null;
+    transactionStatus = TXN_STATUS.NO_STATE;
+
+    initialize();
+
+    String partitionValidationRegex =
+        MetastoreConf.getVar(this.conf, ConfVars.PARTITION_NAME_WHITELIST_PATTERN);
+    if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
+      partitionValidationPattern = Pattern.compile(partitionValidationRegex);
+    } else {
+      partitionValidationPattern = null;
+    }

-      // Note, if metrics have not been initialized this will return null, which means we aren't
-      // using metrics. Thus we should always check whether this is non-null before using.
- MetricRegistry registry = Metrics.getRegistry(); - if (registry != null) { - directSqlErrors = Metrics.getOrCreateCounter(MetricsConstants.DIRECTSQL_ERRORS); - } + // Note, if metrics have not been initialized this will return null, which means we aren't + // using metrics. Thus we should always check whether this is non-null before using. + MetricRegistry registry = Metrics.getRegistry(); + if (registry != null) { + directSqlErrors = Metrics.getOrCreateCounter(MetricsConstants.DIRECTSQL_ERRORS); + } - this.batchSize = MetastoreConf.getIntVar(conf, ConfVars.RAWSTORE_PARTITION_BATCH_SIZE); + this.batchSize = MetastoreConf.getIntVar(conf, ConfVars.RAWSTORE_PARTITION_BATCH_SIZE); - if (!isInitialized) { - throw new RuntimeException( - "Unable to create persistence manager. Check dss.log for details"); - } else { - LOG.debug("Initialized ObjectStore"); - } - } finally { - pmfPropLock.unlock(); + if (!isInitialized) { + throw new RuntimeException("Unable to create persistence manager. Check dss.log for details"); + } else { + LOG.debug("Initialized ObjectStore"); } } @@ -303,81 +264,13 @@ public void setConf(Configuration conf) { } @SuppressWarnings("nls") - private void initialize(Properties dsProps) { - int retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS); - long retryInterval = MetastoreConf.getTimeVar(conf, - ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS); - int numTries = retryLimit; - - while (numTries > 0){ - try { - initializeHelper(dsProps); - return; // If we reach here, we succeed. - } catch (Exception e){ - shutdown(); - numTries--; - boolean retriable = isRetriableException(e); - if ((numTries > 0) && retriable){ - LOG.info("Retriable exception while instantiating ObjectStore, retrying. " + - "{} tries left", numTries, e); - try { - Thread.sleep(retryInterval); - } catch (InterruptedException ie) { - // Restore the interrupted status, since we do not want to catch it. - LOG.debug("Interrupted while sleeping before retrying.", ie); - Thread.currentThread().interrupt(); - } - // If we're here, we'll proceed down the next while loop iteration. - } else { - // we've reached our limit, throw the last one. - if (retriable){ - LOG.warn("Exception retry limit reached, not retrying any longer.", - e); - } else { - LOG.debug("Non-retriable exception during ObjectStore initialize.", e); - } - throw e; - } - } - } - } - - private static final Set> retriableExceptionClasses = - new HashSet<>(Arrays.asList(JDOCanRetryException.class)); - /** - * Helper function for initialize to determine if we should retry an exception. - * We return true if the exception is of a known type of retriable exceptions, or if one - * of its recursive .getCause returns a known type of retriable exception. - */ - private boolean isRetriableException(Throwable e) { - if (e == null){ - return false; - } - if (retriableExceptionClasses.contains(e.getClass())){ - return true; - } - for (Class c : retriableExceptionClasses){ - if (c.isInstance(e)){ - return true; - } - } - - if (e.getCause() == null){ - return false; - } - return isRetriableException(e.getCause()); - } - - /** - * private helper to do initialization routine, so we can retry if needed if it fails. 
- * @param dsProps - */ - private void initializeHelper(Properties dsProps) { + private void initialize() { LOG.debug("ObjectStore, initialize called"); - prop = dsProps; - pm = getPersistenceManager(); + // if this method fails, PersistenceManagerProvider will retry for the configured number of times + // before giving up + pm = PersistenceManagerProvider.getPersistenceManager(); LOG.info("RawStore: {}, with PersistenceManager: {}" + - " created in the thread with id: {}", this, pm, Thread.currentThread().getId()); + " created in the thread with id: {}", this, pm, Thread.currentThread().getId()); try { String productName = MetaStoreDirectSql.getProductName(pm); sqlGenerator = new SQLGenerator(DatabaseProduct.determineDatabaseProduct(productName), conf); @@ -390,7 +283,7 @@ private void initializeHelper(Properties dsProps) { dbType = determineDatabaseProduct(); expressionProxy = createExpressionProxy(conf); if (MetastoreConf.getBoolVar(getConf(), ConfVars.TRY_DIRECT_SQL)) { - String schema = prop.getProperty("javax.jdo.mapping.Schema"); + String schema = PersistenceManagerProvider.getProperty("javax.jdo.mapping.Schema"); schema = org.apache.commons.lang.StringUtils.defaultIfBlank(schema, null); directSql = new MetaStoreDirectSql(pm, conf, schema); } @@ -457,143 +350,15 @@ private static void configureSSL(Configuration conf) { } } - /** - * Properties specified in hive-default.xml override the properties specified - * in jpox.properties. - */ - @SuppressWarnings("nls") - private static Properties getDataSourceProps(Configuration conf) { - Properties prop = new Properties(); - correctAutoStartMechanism(conf); - - // First, go through and set all our values for datanucleus and javax.jdo parameters. This - // has to be a separate first step because we don't set the default values in the config object. - for (ConfVars var : MetastoreConf.dataNucleusAndJdoConfs) { - String confVal = MetastoreConf.getAsString(conf, var); - String varName = var.getVarname(); - Object prevVal = prop.setProperty(varName, confVal); - if (MetastoreConf.isPrintable(varName)) { - LOG.debug("Overriding {} value {} from jpox.properties with {}", - varName, prevVal, confVal); - } - } - - // Now, we need to look for any values that the user set that MetastoreConf doesn't know about. - // TODO Commenting this out for now, as it breaks because the conf values aren't getting properly - // interpolated in case of variables. See HIVE-17788. - /* - for (Map.Entry e : conf) { - if (e.getKey().startsWith("datanucleus.") || e.getKey().startsWith("javax.jdo.")) { - // We have to handle this differently depending on whether it is a value known to - // MetastoreConf or not. If it is, we need to get the default value if a value isn't - // provided. If not, we just set whatever the user has set. 
- Object prevVal = prop.setProperty(e.getKey(), e.getValue()); - if (LOG.isDebugEnabled() && MetastoreConf.isPrintable(e.getKey())) { - LOG.debug("Overriding " + e.getKey() + " value " + prevVal - + " from jpox.properties with " + e.getValue()); - } - } - } - */ - - // Password may no longer be in the conf, use getPassword() - try { - String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD); - if (org.apache.commons.lang.StringUtils.isNotEmpty(passwd)) { - // We can get away with the use of varname here because varname == hiveName for PWD - prop.setProperty(ConfVars.PWD.getVarname(), passwd); - } - } catch (IOException err) { - throw new RuntimeException("Error getting metastore password: " + err.getMessage(), err); - } - - if (LOG.isDebugEnabled()) { - for (Entry e : prop.entrySet()) { - if (MetastoreConf.isPrintable(e.getKey().toString())) { - LOG.debug("{} = {}", e.getKey(), e.getValue()); - } - } - } - - return prop; - } - - /** - * Update conf to set datanucleus.autoStartMechanismMode=ignored. - * This is necessary to able to use older version of hive against - * an upgraded but compatible metastore schema in db from new version - * of hive - * @param conf - */ - private static void correctAutoStartMechanism(Configuration conf) { - final String autoStartKey = "datanucleus.autoStartMechanismMode"; - final String autoStartIgnore = "ignored"; - String currentAutoStartVal = conf.get(autoStartKey); - if (!autoStartIgnore.equalsIgnoreCase(currentAutoStartVal)) { - LOG.warn("{} is set to unsupported value {} . Setting it to value: {}", autoStartKey, - conf.get(autoStartKey), autoStartIgnore); - } - conf.set(autoStartKey, autoStartIgnore); - } - - private static synchronized PersistenceManagerFactory getPMF() { - if (pmf == null) { - - Configuration conf = MetastoreConf.newMetastoreConf(); - DataSourceProvider dsp = DataSourceProviderFactory.hasProviderSpecificConfigurations(conf) ? - DataSourceProviderFactory.getDataSourceProvider(conf) : null; - - if (dsp == null) { - pmf = JDOHelper.getPersistenceManagerFactory(prop); - } else { - try { - DataSource ds = dsp.create(conf); - Map dsProperties = new HashMap<>(); - //Any preexisting datanucleus property should be passed along - dsProperties.putAll(prop); - dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY, ds); - dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY2, ds); - dsProperties.put("javax.jdo.PersistenceManagerFactoryClass", - "org.datanucleus.api.jdo.JDOPersistenceManagerFactory"); - pmf = JDOHelper.getPersistenceManagerFactory(dsProperties); - } catch (SQLException e) { - LOG.warn("Could not create PersistenceManagerFactory using " + - "connection pool properties, will fall back", e); - pmf = JDOHelper.getPersistenceManagerFactory(prop); - } - } - DataStoreCache dsc = pmf.getDataStoreCache(); - if (dsc != null) { - String objTypes = MetastoreConf.getVar(conf, ConfVars.CACHE_PINOBJTYPES); - LOG.info("Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=\"{}\"", objTypes); - if (org.apache.commons.lang.StringUtils.isNotEmpty(objTypes)) { - String[] typeTokens = objTypes.toLowerCase().split(","); - for (String type : typeTokens) { - type = type.trim(); - if (PINCLASSMAP.containsKey(type)) { - dsc.pinAll(true, PINCLASSMAP.get(type)); - } else { - LOG.warn("{} is not one of the pinnable object types: {}", type, - org.apache.commons.lang.StringUtils.join(PINCLASSMAP.keySet(), " ")); - } - } - } - } else { - LOG.warn("PersistenceManagerFactory returned null DataStoreCache object. 
Unable to initialize object pin types defined by hive.metastore.cache.pinobjtypes");
-      }
-    }
-    return pmf;
-  }
-
   @InterfaceAudience.LimitedPrivate({"HCATALOG"})
   @InterfaceStability.Evolving
   public PersistenceManager getPersistenceManager() {
-    return getPMF().getPersistenceManager();
+    return PersistenceManagerProvider.getPersistenceManager();
   }

   @Override
   public void shutdown() {
-    LOG.info("RawStore: {}, with PersistenceManager: {} will be shutdown", this, pm);
+    LOG.info("RawStore: {}, with PersistenceManager: {} will be shutdown. Callstack: {}",
+        this, pm, getCallStack());
     if (pm != null) {
       pm.close();
       pm = null;
     }
@@ -9750,7 +9515,7 @@ private void debugLog(String message) {
     }
   }

-  private static final int stackLimit = 3;
+  private static final int stackLimit = 20;

   private String getCallStack() {
     StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
@@ -9758,7 +9523,7 @@ private String getCallStack() {
     StringBuilder sb = new StringBuilder();
     sb.append(" at:");
     // Offset by 4 because the first 4 frames are just calls to get down here.
-    for (int i = 4; i < thislimit + 4; i++) {
+    for (int i = 4; i < Math.min(stackTrace.length, stackLimit + 4); i++) {
       sb.append("\n\t");
       sb.append(stackTrace[i].toString());
     }
@@ -10372,113 +10137,6 @@ public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) {
     throw new UnsupportedOperationException();
   }

-  /**
-   * Removed cached classloaders from DataNucleus
-   * DataNucleus caches classloaders in NucleusContext.
-   * In UDFs, this can result in classloaders not getting GCed resulting in PermGen leaks.
-   * This is particularly an issue when using embedded metastore with HiveServer2,
-   * since the current classloader gets modified with each new add jar,
-   * becoming the classloader for downstream classes, which DataNucleus ends up using.
-   * The NucleusContext cache gets freed up only on calling a close on it.
-   * We're not closing NucleusContext since it does a bunch of other things which we don't want.
-   * We're not clearing the cache HashMap by calling HashMap#clear to avoid concurrency issues.
-   */
-  public static void unCacheDataNucleusClassLoaders() {
-    PersistenceManagerFactory pmf = ObjectStore.getPMF();
-    clearOutPmfClassLoaderCache(pmf);
-  }
-
-  private static void clearOutPmfClassLoaderCache(PersistenceManagerFactory pmf) {
-    if ((pmf == null) || (!(pmf instanceof JDOPersistenceManagerFactory))) {
-      return;
-    }
-    // NOTE : This is hacky, and this section of code is fragile depending on DN code varnames
-    // so it's likely to stop working at some time in the future, especially if we upgrade DN
-    // versions, so we actively need to find a better way to make sure the leak doesn't happen
-    // instead of just clearing out the cache after every call.
- JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf; - NucleusContext nc = jdoPmf.getNucleusContext(); - try { - Field pmCache = pmf.getClass().getDeclaredField("pmCache"); - pmCache.setAccessible(true); - Set pmSet = (Set)pmCache.get(pmf); - for (JDOPersistenceManager pm : pmSet) { - org.datanucleus.ExecutionContext ec = pm.getExecutionContext(); - if (ec instanceof org.datanucleus.ExecutionContextThreadedImpl) { - ClassLoaderResolver clr = ((org.datanucleus.ExecutionContextThreadedImpl)ec).getClassLoaderResolver(); - clearClr(clr); - } - } - org.datanucleus.plugin.PluginManager pluginManager = jdoPmf.getNucleusContext().getPluginManager(); - Field registryField = pluginManager.getClass().getDeclaredField("registry"); - registryField.setAccessible(true); - org.datanucleus.plugin.PluginRegistry registry = (org.datanucleus.plugin.PluginRegistry)registryField.get(pluginManager); - if (registry instanceof org.datanucleus.plugin.NonManagedPluginRegistry) { - org.datanucleus.plugin.NonManagedPluginRegistry nRegistry = (org.datanucleus.plugin.NonManagedPluginRegistry)registry; - Field clrField = nRegistry.getClass().getDeclaredField("clr"); - clrField.setAccessible(true); - ClassLoaderResolver clr = (ClassLoaderResolver)clrField.get(nRegistry); - clearClr(clr); - } - if (nc instanceof org.datanucleus.PersistenceNucleusContextImpl) { - org.datanucleus.PersistenceNucleusContextImpl pnc = (org.datanucleus.PersistenceNucleusContextImpl)nc; - org.datanucleus.store.types.TypeManagerImpl tm = (org.datanucleus.store.types.TypeManagerImpl)pnc.getTypeManager(); - Field clrField = tm.getClass().getDeclaredField("clr"); - clrField.setAccessible(true); - ClassLoaderResolver clr = (ClassLoaderResolver)clrField.get(tm); - clearClr(clr); - Field storeMgrField = pnc.getClass().getDeclaredField("storeMgr"); - storeMgrField.setAccessible(true); - org.datanucleus.store.rdbms.RDBMSStoreManager storeMgr = (org.datanucleus.store.rdbms.RDBMSStoreManager)storeMgrField.get(pnc); - Field backingStoreField = storeMgr.getClass().getDeclaredField("backingStoreByMemberName"); - backingStoreField.setAccessible(true); - Map backingStoreByMemberName = (Map)backingStoreField.get(storeMgr); - for (Store store : backingStoreByMemberName.values()) { - org.datanucleus.store.rdbms.scostore.BaseContainerStore baseStore = (org.datanucleus.store.rdbms.scostore.BaseContainerStore)store; - clrField = org.datanucleus.store.rdbms.scostore.BaseContainerStore.class.getDeclaredField("clr"); - clrField.setAccessible(true); - clr = (ClassLoaderResolver)clrField.get(baseStore); - clearClr(clr); - } - } - Field classLoaderResolverMap = AbstractNucleusContext.class.getDeclaredField( - "classLoaderResolverMap"); - classLoaderResolverMap.setAccessible(true); - Map loaderMap = - (Map) classLoaderResolverMap.get(nc); - for (ClassLoaderResolver clr : loaderMap.values()){ - clearClr(clr); - } - classLoaderResolverMap.set(nc, new HashMap()); - LOG.debug("Removed cached classloaders from DataNucleus NucleusContext"); - } catch (Exception e) { - LOG.warn("Failed to remove cached classloaders from DataNucleus NucleusContext", e); - } - } - - private static void clearClr(ClassLoaderResolver clr) throws Exception { - if (clr != null){ - if (clr instanceof ClassLoaderResolverImpl){ - ClassLoaderResolverImpl clri = (ClassLoaderResolverImpl) clr; - long resourcesCleared = clearFieldMap(clri,"resources"); - long loadedClassesCleared = clearFieldMap(clri,"loadedClasses"); - long unloadedClassesCleared = clearFieldMap(clri, "unloadedClasses"); 
- LOG.debug("Cleared ClassLoaderResolverImpl: {}, {}, {}", - resourcesCleared, loadedClassesCleared, unloadedClassesCleared); - } - } - } - private static long clearFieldMap(ClassLoaderResolverImpl clri, String mapFieldName) throws Exception { - Field mapField = ClassLoaderResolverImpl.class.getDeclaredField(mapFieldName); - mapField.setAccessible(true); - - Map map = (Map) mapField.get(clri); - long sz = map.size(); - mapField.set(clri, Collections.synchronizedMap(new WeakValueMap())); - return sz; - } - - @Override public List getPrimaryKeys(String catName, String db_name, String tbl_name) throws MetaException { @@ -11540,20 +11198,6 @@ void rollbackAndCleanup(boolean success, QueryWrapper queryWrapper) { } } - /** - * To make possible to run multiple metastore in unit test - * @param twoMetastoreTesting if we are using multiple metastore in unit test - */ - @VisibleForTesting - public static void setTwoMetastoreTesting(boolean twoMetastoreTesting) { - forTwoMetastoreTesting = twoMetastoreTesting; - } - - @VisibleForTesting - Properties getProp() { - return prop; - } - private void checkForConstraintException(Exception e, String msg) throws AlreadyExistsException { if (getConstraintException(e) != null) { LOG.error(msg, e); @@ -12736,9 +12380,7 @@ public int deleteRuntimeStats(int maxRetainSecs) throws MetaException { * ~ COLUMN_STATE_ACCURATE(CSA) state is true * ~ Isolation-level (snapshot) compliant with the query * @param tbl MTable of the stats entity - * @param queryTxnId transaction id of the query * @param queryValidWriteIdList valid writeId list of the query - * @param queryWriteId writeId of the query * @Precondition "tbl" should be retrieved from the TBLS table. */ private boolean isCurrentStatsValidForTheQuery(MTable tbl, String queryValidWriteIdList, @@ -12757,7 +12399,6 @@ private boolean isCurrentStatsValidForTheQuery(MTable tbl, String queryValidWrit * ~ COLUMN_STATE_ACCURATE(CSA) state is true * ~ Isolation-level (snapshot) compliant with the query * @param part MPartition of the stats entity - * @param txnId transaction id of the query * @param queryValidWriteIdList valid writeId list of the query * @Precondition "part" should be retrieved from the PARTITIONS table. */ diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java new file mode 100644 index 0000000000000000000000000000000000000000..516cb9ac4684ba4609c5d3f5f898e07123b413a7 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java @@ -0,0 +1,534 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
+import org.apache.hadoop.hive.metastore.model.MDatabase;
+import org.apache.hadoop.hive.metastore.model.MFieldSchema;
+import org.apache.hadoop.hive.metastore.model.MOrder;
+import org.apache.hadoop.hive.metastore.model.MPartition;
+import org.apache.hadoop.hive.metastore.model.MSerDeInfo;
+import org.apache.hadoop.hive.metastore.model.MStorageDescriptor;
+import org.apache.hadoop.hive.metastore.model.MTable;
+import org.apache.hadoop.hive.metastore.model.MType;
+import org.datanucleus.AbstractNucleusContext;
+import org.datanucleus.ClassLoaderResolver;
+import org.datanucleus.ClassLoaderResolverImpl;
+import org.datanucleus.NucleusContext;
+import org.datanucleus.PropertyNames;
+import org.datanucleus.api.jdo.JDOPersistenceManager;
+import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
+import org.datanucleus.store.scostore.Store;
+import org.datanucleus.util.WeakValueMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jdo.JDOCanRetryException;
+import javax.jdo.JDOHelper;
+import javax.jdo.PersistenceManager;
+import javax.jdo.PersistenceManagerFactory;
+import javax.jdo.datastore.DataStoreCache;
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+/**
+ * This class is a wrapper around the PersistenceManagerFactory (pmf) and its properties.
+ * These objects are static and need to be modified together atomically, so that there are
+ * no race conditions when updating them. Additionally, this class provides thread-safe
+ * methods to get PersistenceManager instances from the current PersistenceManagerFactory.
+ * The most common usage of this class is to create a PersistenceManager from the existing
+ * PersistenceManagerFactory. The factory properties change rarely, so property updates can
+ * use a read/write lock that blocks callers only while the current properties are actually
+ * being changed.
+ */
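The read-mostly scheme this javadoc describes is the classic ReentrantReadWriteLock check/upgrade/re-check/downgrade idiom. For reference, a minimal self-contained sketch of that idiom follows; the CachedConfig class and its field names are hypothetical and not part of this patch:

```java
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical illustration of the idiom used by updatePmfProperties(): readers run
// concurrently, and the write lock is taken only when the cached value must be rebuilt.
public class CachedConfig {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Lock readLock = lock.readLock();
  private final Lock writeLock = lock.writeLock();
  private String current; // guarded by lock

  public String refreshIfChanged(String incoming) {
    readLock.lock();
    boolean readHeld = true;
    try {
      if (!Objects.equals(incoming, current)) {
        // Upgrade: the read lock must be released first, since a reader
        // cannot acquire the write lock while still holding the read lock.
        readLock.unlock();
        readHeld = false;
        writeLock.lock();
        try {
          // Re-check: another thread may have refreshed while we waited.
          if (!Objects.equals(incoming, current)) {
            current = incoming; // the expensive rebuild would happen here
          }
          // Downgrade: take the read lock before releasing the write lock.
          readLock.lock();
          readHeld = true;
        } finally {
          writeLock.unlock();
        }
      }
      return current;
    } finally {
      if (readHeld) {
        readLock.unlock();
      }
    }
  }
}
```

The readHeld flag mirrors the readLockAcquired bookkeeping in updatePmfProperties below, which exists because the JDO calls between the unlock/lock transitions can throw unchecked JDOException.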
+ */ +public class PersistenceManagerProvider { + private static PersistenceManagerFactory pmf; + private static Properties prop; + private static final ReentrantReadWriteLock pmfLock = new ReentrantReadWriteLock(); + private static final Lock pmfReadLock = pmfLock.readLock(); + private static final Lock pmfWriteLock = pmfLock.writeLock(); + private static final Logger LOG = LoggerFactory.getLogger(PersistenceManagerProvider.class); + private static final Map> PINCLASSMAP; + private static boolean forTwoMetastoreTesting; + private static int retryLimit; + private static long retryInterval; + + static { + Map> map = new HashMap<>(); + map.put("table", MTable.class); + map.put("storagedescriptor", MStorageDescriptor.class); + map.put("serdeinfo", MSerDeInfo.class); + map.put("partition", MPartition.class); + map.put("database", MDatabase.class); + map.put("type", MType.class); + map.put("fieldschema", MFieldSchema.class); + map.put("order", MOrder.class); + PINCLASSMAP = Collections.unmodifiableMap(map); + } + + private PersistenceManagerProvider() { + // prevent instantiation + } + + private static final Set> retriableExceptionClasses = + new HashSet<>(Arrays.asList(JDOCanRetryException.class)); + /** + * Helper function for initialize to determine if we should retry an exception. + * We return true if the exception is of a known type of retriable exceptions, or if one + * of its recursive .getCause returns a known type of retriable exception. + */ + private static boolean isRetriableException(Throwable e) { + if (e == null){ + return false; + } + if (retriableExceptionClasses.contains(e.getClass())){ + return true; + } + for (Class c : retriableExceptionClasses){ + if (c.isInstance(e)){ + return true; + } + } + + if (e.getCause() == null){ + return false; + } + return isRetriableException(e.getCause()); + } + /** + * This method updates the PersistenceManagerFactory and its properties if the given + * configuration is different from its current set of properties. Most common case is that + * the persistenceManagerFactory properties do not change, and hence this method is optimized to + * be non-blocking in such cases. However, if the properties are different, this method blocks + * other threads until the properties are updated, current pmf is closed and + * a new pmf is re-initialized. Note that when a PersistenceManagerFactory is re-initialized all + * the PersistenceManagers which are instantiated using old factory become invalid and will throw + * JDOUserException. Hence it is recommended that this method is called in the setup/init phase + * of the Metastore service when there are no other active threads serving clients. + * + * @param conf Configuration which provides the datanucleus/datasource properties for comparison + */ + public static void updatePmfProperties(Configuration conf) { + // take a read lock to check if the datasource properties changed. 
+ // Most common case is that datasource properties do not change + Properties propsFromConf = PersistenceManagerProvider.getDataSourceProps(conf); + pmfReadLock.lock(); + // keep track of if the read-lock is acquired by this thread + // so that we can unlock it before leaving this method + // this is needed because pmf methods below could throw JDOException (unchecked exception) + // which can lead to readLock not being acquired at the end of the inner try-finally + // block below + boolean readLockAcquired = true; + try { + // if pmf properties change, need to update, release read lock and take write lock + if (prop == null || pmf == null || !propsFromConf.equals(prop)) { + pmfReadLock.unlock(); + readLockAcquired = false; + pmfWriteLock.lock(); + try { + // check if we need to update pmf again here in case some other thread already did it + // for us after releasing readlock and before acquiring write lock above + if (prop == null || pmf == null || !propsFromConf.equals(prop)) { + // OK, now we really need to re-initialize pmf and pmf properties + if (LOG.isInfoEnabled()) { + LOG.info("Updating the pmf due to property change"); + if (prop == null) { + LOG.info("Current pmf properties are uninitialized"); + } else { + for (Entry kv : prop.entrySet()) { + if (!kv.getValue().equals(propsFromConf.get(kv.getKey()))) { + if (MetastoreConf.isPrintable(kv.getKey().toString())) { + LOG.info("Found {} to be different. Old val : {} : New Val : {}", kv.getKey(), + kv.getValue(), propsFromConf.getProperty(kv.getKey().toString())); + } else { + LOG.info("Found masked property {} to be different", kv.getKey()); + } + } + } + } + } + if (pmf != null) { + clearOutPmfClassLoaderCache(); + if (!forTwoMetastoreTesting) { + // close the underlying connection pool to avoid leaks + LOG.debug("Closing PersistenceManagerFactory"); + pmf.close(); + LOG.debug("PersistenceManagerFactory closed"); + } + pmf = null; + } + // update the pmf properties object then initialize pmf using them + prop = propsFromConf; + retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS); + retryInterval = MetastoreConf + .getTimeVar(conf, ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS); + // init PMF with retry logic + retry(() -> {initPMF(conf); return null;}); + } + // downgrade by acquiring read lock before releasing write lock + pmfReadLock.lock(); + readLockAcquired = true; + } finally { + pmfWriteLock.unlock(); + } + } + } finally { + if (readLockAcquired) { + pmfReadLock.unlock(); + } + } + } + + private static void initPMF(Configuration conf) { + DataSourceProvider dsp = DataSourceProviderFactory + .hasProviderSpecificConfigurations(conf) ? 
DataSourceProviderFactory + .getDataSourceProvider(conf) : null; + + if (dsp == null) { + pmf = JDOHelper.getPersistenceManagerFactory(prop); + } else { + try { + DataSource ds = dsp.create(conf); + Map dsProperties = new HashMap<>(); + //Any preexisting datanucleus property should be passed along + dsProperties.putAll(prop); + dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY, ds); + dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY2, ds); + dsProperties.put(ConfVars.MANAGER_FACTORY_CLASS.getVarname(), + "org.datanucleus.api.jdo.JDOPersistenceManagerFactory"); + pmf = JDOHelper.getPersistenceManagerFactory(dsProperties); + } catch (SQLException e) { + LOG.warn("Could not create PersistenceManagerFactory using " + + "connection pool properties, will fall back", e); + pmf = JDOHelper.getPersistenceManagerFactory(prop); + } + } + DataStoreCache dsc = pmf.getDataStoreCache(); + if (dsc != null) { + String objTypes = MetastoreConf.getVar(conf, ConfVars.CACHE_PINOBJTYPES); + LOG.info( + "Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=\"{}\"", + objTypes); + if (org.apache.commons.lang.StringUtils.isNotEmpty(objTypes)) { + String[] typeTokens = objTypes.toLowerCase().split(","); + for (String type : typeTokens) { + type = type.trim(); + if (PINCLASSMAP.containsKey(type)) { + dsc.pinAll(true, PINCLASSMAP.get(type)); + } else { + LOG.warn("{} is not one of the pinnable object types: {}", type, + org.apache.commons.lang.StringUtils.join(PINCLASSMAP.keySet(), " ")); + } + } + } + } else { + LOG.warn("PersistenceManagerFactory returned null DataStoreCache object. " + + "Unable to initialize object pin types defined by hive.metastore.cache.pinobjtypes"); + } + } + + /** + * Removed cached classloaders from DataNucleus + * DataNucleus caches classloaders in NucleusContext. + * In UDFs, this can result in classloaders not getting GCed resulting in PermGen leaks. + * This is particularly an issue when using embedded metastore with HiveServer2, + * since the current classloader gets modified with each new add jar, + * becoming the classloader for downstream classes, which DataNucleus ends up using. + * The NucleusContext cache gets freed up only on calling a close on it. + * We're not closing NucleusContext since it does a bunch of other things which we don't want. + * We're not clearing the cache HashMap by calling HashMap#clear to avoid concurrency issues. + */ + public static void clearOutPmfClassLoaderCache() { + pmfWriteLock.lock(); + try { + if ((pmf == null) || (!(pmf instanceof JDOPersistenceManagerFactory))) { + return; + } + // NOTE : This is hacky, and this section of code is fragile depending on DN code varnames + // so it's likely to stop working at some time in the future, especially if we upgrade DN + // versions, so we actively need to find a better way to make sure the leak doesn't happen + // instead of just clearing out the cache after every call. 
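+      // For orientation: the reflection below walks DataNucleus internals (the pmCache,
+      // the plugin registry, the type manager, the RDBMS backing stores and the
+      // NucleusContext classLoaderResolverMap) and swaps each cached resolver map for a
+      // fresh one, so that stale classloaders become eligible for GC.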
+ JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf; + NucleusContext nc = jdoPmf.getNucleusContext(); + try { + Field pmCache = pmf.getClass().getDeclaredField("pmCache"); + pmCache.setAccessible(true); + Set pmSet = (Set) pmCache.get(pmf); + for (JDOPersistenceManager pm : pmSet) { + org.datanucleus.ExecutionContext ec = pm.getExecutionContext(); + if (ec instanceof org.datanucleus.ExecutionContextThreadedImpl) { + ClassLoaderResolver clr = + ((org.datanucleus.ExecutionContextThreadedImpl) ec).getClassLoaderResolver(); + clearClr(clr); + } + } + org.datanucleus.plugin.PluginManager pluginManager = + jdoPmf.getNucleusContext().getPluginManager(); + Field registryField = pluginManager.getClass().getDeclaredField("registry"); + registryField.setAccessible(true); + org.datanucleus.plugin.PluginRegistry registry = + (org.datanucleus.plugin.PluginRegistry) registryField.get(pluginManager); + if (registry instanceof org.datanucleus.plugin.NonManagedPluginRegistry) { + org.datanucleus.plugin.NonManagedPluginRegistry nRegistry = + (org.datanucleus.plugin.NonManagedPluginRegistry) registry; + Field clrField = nRegistry.getClass().getDeclaredField("clr"); + clrField.setAccessible(true); + ClassLoaderResolver clr = (ClassLoaderResolver) clrField.get(nRegistry); + clearClr(clr); + } + if (nc instanceof org.datanucleus.PersistenceNucleusContextImpl) { + org.datanucleus.PersistenceNucleusContextImpl pnc = + (org.datanucleus.PersistenceNucleusContextImpl) nc; + org.datanucleus.store.types.TypeManagerImpl tm = + (org.datanucleus.store.types.TypeManagerImpl) pnc.getTypeManager(); + Field clrField = tm.getClass().getDeclaredField("clr"); + clrField.setAccessible(true); + ClassLoaderResolver clr = (ClassLoaderResolver) clrField.get(tm); + clearClr(clr); + Field storeMgrField = pnc.getClass().getDeclaredField("storeMgr"); + storeMgrField.setAccessible(true); + org.datanucleus.store.rdbms.RDBMSStoreManager storeMgr = + (org.datanucleus.store.rdbms.RDBMSStoreManager) storeMgrField.get(pnc); + Field backingStoreField = + storeMgr.getClass().getDeclaredField("backingStoreByMemberName"); + backingStoreField.setAccessible(true); + Map backingStoreByMemberName = + (Map) backingStoreField.get(storeMgr); + for (Store store : backingStoreByMemberName.values()) { + org.datanucleus.store.rdbms.scostore.BaseContainerStore baseStore = + (org.datanucleus.store.rdbms.scostore.BaseContainerStore) store; + clrField = org.datanucleus.store.rdbms.scostore.BaseContainerStore.class + .getDeclaredField("clr"); + clrField.setAccessible(true); + clr = (ClassLoaderResolver) clrField.get(baseStore); + clearClr(clr); + } + } + Field classLoaderResolverMap = + AbstractNucleusContext.class.getDeclaredField("classLoaderResolverMap"); + classLoaderResolverMap.setAccessible(true); + Map loaderMap = + (Map) classLoaderResolverMap.get(nc); + for (ClassLoaderResolver clr : loaderMap.values()) { + clearClr(clr); + } + classLoaderResolverMap.set(nc, new HashMap()); + LOG.debug("Removed cached classloaders from DataNucleus NucleusContext"); + } catch (Exception e) { + LOG.warn("Failed to remove cached classloaders from DataNucleus NucleusContext", e); + } + } finally { + pmfWriteLock.unlock(); + } + } + + private static void clearClr(ClassLoaderResolver clr) throws Exception { + if (clr != null) { + if (clr instanceof ClassLoaderResolverImpl) { + ClassLoaderResolverImpl clri = (ClassLoaderResolverImpl) clr; + long resourcesCleared = clearFieldMap(clri, "resources"); + long loadedClassesCleared = clearFieldMap(clri, 
"loadedClasses"); + long unloadedClassesCleared = clearFieldMap(clri, "unloadedClasses"); + LOG.debug("Cleared ClassLoaderResolverImpl: {}, {}, {}", resourcesCleared, + loadedClassesCleared, unloadedClassesCleared); + } + } + } + + private static long clearFieldMap(ClassLoaderResolverImpl clri, String mapFieldName) + throws Exception { + Field mapField = ClassLoaderResolverImpl.class.getDeclaredField(mapFieldName); + mapField.setAccessible(true); + + Map map = (Map) mapField.get(clri); + long sz = map.size(); + mapField.set(clri, Collections.synchronizedMap(new WeakValueMap())); + return sz; + } + + /** + * creates a PersistenceManager instance for the current PersistenceManagerFactory. Note that this + * acquires a read-lock on PersistenceManagerFactory so that this method will block if any other + * thread is actively, (re-)initializing PersistenceManagerFactory when this method is called + * Note that this method throws a RuntimeException, if PersistenceManagerFactory is not yet initialized. + * + * @return PersistenceManager from the current PersistenceManagerFactory instance + */ + public static PersistenceManager getPersistenceManager() { + pmfReadLock.lock(); + try { + if (pmf == null) { + throw new RuntimeException( + "Cannot create PersistenceManager. PersistenceManagerFactory is not yet initialized"); + } + return retry(pmf::getPersistenceManager); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + pmfReadLock.unlock(); + } + } + + /** + * Properties specified in hive-default.xml override the properties specified + * in jpox.properties. + */ + @SuppressWarnings("nls") + private static Properties getDataSourceProps(Configuration conf) { + Properties prop = new Properties(); + correctAutoStartMechanism(conf); + + // First, go through and set all our values for datanucleus and javax.jdo parameters. This + // has to be a separate first step because we don't set the default values in the config object. + for (ConfVars var : MetastoreConf.dataNucleusAndJdoConfs) { + String confVal = MetastoreConf.getAsString(conf, var); + String varName = var.getVarname(); + Object prevVal = prop.setProperty(varName, confVal); + if (MetastoreConf.isPrintable(varName)) { + LOG.debug("Overriding {} value {} from jpox.properties with {}", varName, prevVal, confVal); + } + } + + // Now, we need to look for any values that the user set that MetastoreConf doesn't know about. + // TODO Commenting this out for now, as it breaks because the conf values aren't getting properly + // interpolated in case of variables. See HIVE-17788. + /* + for (Map.Entry e : conf) { + if (e.getKey().startsWith("datanucleus.") || e.getKey().startsWith("javax.jdo.")) { + // We have to handle this differently depending on whether it is a value known to + // MetastoreConf or not. If it is, we need to get the default value if a value isn't + // provided. If not, we just set whatever the user has set. 
+        Object prevVal = prop.setProperty(e.getKey(), e.getValue());
+        if (LOG.isDebugEnabled() && MetastoreConf.isPrintable(e.getKey())) {
+          LOG.debug("Overriding " + e.getKey() + " value " + prevVal
+              + " from jpox.properties with " + e.getValue());
+        }
+      }
+    }
+    */
+
+    // Password may no longer be in the conf, use getPassword()
+    try {
+      String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD);
+      if (org.apache.commons.lang.StringUtils.isNotEmpty(passwd)) {
+        // We can get away with the use of varname here because varname == hiveName for PWD
+        prop.setProperty(ConfVars.PWD.getVarname(), passwd);
+      }
+    } catch (IOException err) {
+      throw new RuntimeException("Error getting metastore password: " + err.getMessage(), err);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      for (Entry<Object, Object> e : prop.entrySet()) {
+        if (MetastoreConf.isPrintable(e.getKey().toString())) {
+          LOG.debug("{} = {}", e.getKey(), e.getValue());
+        }
+      }
+    }
+
+    return prop;
+  }
+
+  /**
+   * Update conf to set datanucleus.autoStartMechanismMode=ignored.
+   * This is necessary to be able to use an older version of Hive against
+   * an upgraded but compatible metastore schema in the db from a newer version
+   * of Hive.
+   *
+   * @param conf
+   */
+  private static void correctAutoStartMechanism(Configuration conf) {
+    final String autoStartKey = "datanucleus.autoStartMechanismMode";
+    final String autoStartIgnore = "ignored";
+    String currentAutoStartVal = conf.get(autoStartKey);
+    if (!autoStartIgnore.equalsIgnoreCase(currentAutoStartVal)) {
+      LOG.warn("{} is set to unsupported value {}. Setting it to value: {}", autoStartKey,
+          conf.get(autoStartKey), autoStartIgnore);
+    }
+    conf.set(autoStartKey, autoStartIgnore);
+  }
+
+  /**
+   * To make it possible to run multiple metastores in unit tests
+   *
+   * @param twoMetastoreTesting if we are using multiple metastores in unit tests
+   */
+  @VisibleForTesting
+  public static void setTwoMetastoreTesting(boolean twoMetastoreTesting) {
+    forTwoMetastoreTesting = twoMetastoreTesting;
+  }
+
+  public static String getProperty(String key) {
+    return prop == null ? null : prop.getProperty(key);
+  }
+
+  private static <T> T retry(Supplier<T> s) {
+    Exception ex = null;
+    int myRetryLimit = retryLimit;
+    while (myRetryLimit > 0) {
+      try {
+        return s.get();
+      } catch (Exception e) {
+        myRetryLimit--;
+        boolean retriable = isRetriableException(e);
+        if (myRetryLimit > 0 && retriable) {
+          LOG.info("Retriable exception while invoking method, retrying. {} tries left",
+              myRetryLimit, e);
+          try {
+            Thread.sleep(retryInterval);
+          } catch (InterruptedException ie) {
+            // Restore the interrupted status, since we do not want to catch it.
+            LOG.debug("Interrupted while sleeping before retrying.", ie);
+            Thread.currentThread().interrupt();
+          }
+          // If we're here, we'll proceed down the next while loop iteration.
+        } else {
+          // we've reached our limit, throw the last one.
+          if (retriable) {
+            LOG.warn("Exception retry limit reached, not retrying any longer.", e);
+          } else {
+            LOG.debug("Non-retriable exception.", e);
+          }
+          ex = e;
+          // Do not loop again: the exception is non-retriable or the limit is reached.
+          break;
+        }
+      }
+    }
+    throw new RuntimeException(ex);
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
index b74c3048fa2e18adc7f0d7cc813a180d4466fa36..c3276045497e31e09a4950543eec6f8df92ee82f 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@ -86,14 +86,20 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;

 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;

 @Category(MetastoreUnitTest.class)
@@ -756,8 +762,8 @@ public void testNonConfDatanucleusValueSet() {
     localConf.set(key1, value1);
     objectStore = new ObjectStore();
     objectStore.setConf(localConf);
-    Assert.assertEquals(value, objectStore.getProp().getProperty(key));
-    Assert.assertNull(objectStore.getProp().getProperty(key1));
+    Assert.assertEquals(value, PersistenceManagerProvider.getProperty(key));
+    Assert.assertNull(PersistenceManagerProvider.getProperty(key1));
   }

   /**
@@ -867,7 +873,7 @@ public void testConcurrentAddNotifications() throws ExecutionException, Interrup
         EventMessage.EventType.CREATE_DATABASE.toString(),
         "CREATE DATABASE DB initial"));

-    ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
+    ExecutorService executorService = newFixedThreadPool(NUM_THREADS);
     for (int i = 0; i < NUM_THREADS; i++) {
       final int n = i;
@@ -911,6 +917,57 @@ public void testConcurrentAddNotifications() throws ExecutionException, Interrup
     }
   }

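The new test added below races ObjectStore.setConf against getPersistenceManager across many threads. Its simultaneous start relies on the standard CyclicBarrier rendezvous idiom, sketched here in isolation; the Race class and its names are hypothetical, not part of this patch:

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Minimal sketch: every worker blocks on await() until all threads have been
// scheduled, so the racing section starts at roughly the same instant everywhere.
public class Race {
  public static void run(int threads, Runnable body) throws InterruptedException {
    final CyclicBarrier barrier = new CyclicBarrier(threads);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < threads; i++) {
      pool.submit(() -> {
        try {
          barrier.await(); // rendezvous: maximizes overlap between workers
          body.run();      // the code under race, e.g. setConf vs. getPersistenceManager
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(2, TimeUnit.MINUTES);
  }
}
```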
+  /**
+   * This test calls ObjectStore.setConf methods from multiple threads. Each thread uses its
+   * own instance of ObjectStore to simulate thread-local ObjectStore behaviour. Before each
+   * invocation the datasource property is changed, which forces a re-initialization of the
+   * PersistenceManagerFactory instance in each call.
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentPMFInitialize() throws Exception {
+    final String dataSourceProp = "datanucleus.connectionPool.maxPoolSize";
+    // A barrier is used to ensure that all threads start the race at the same time
+    final int numThreads = 10;
+    final int numIteration = 50;
+    final CyclicBarrier barrier = new CyclicBarrier(numThreads);
+    final AtomicInteger counter = new AtomicInteger(0);
+    ExecutorService executor = newFixedThreadPool(numThreads);
+    List<Future<Void>> results = new ArrayList<>(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      final Random random = new Random();
+      Configuration conf = MetastoreConf.newMetastoreConf();
+      MetaStoreTestUtils.setConfForStandloneMode(conf);
+      results.add(executor.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          // each thread gets its own ObjectStore to simulate threadLocal store
+          ObjectStore objectStore = new ObjectStore();
+          barrier.await();
+          for (int j = 0; j < numIteration; j++) {
+            // set connectionPool to a random value to increase the likelihood of pmf
+            // re-initialization
+            int randomNumber = random.nextInt(100);
+            if (randomNumber % 2 == 0) {
+              conf.set(dataSourceProp, String.valueOf(randomNumber));
+              objectStore.setConf(conf);
+            } else {
+              Assert.assertNotNull(objectStore.getPersistenceManager());
+            }
+            counter.getAndIncrement();
+          }
+          return null;
+        }
+      }));
+    }
+    for (Future<Void> future : results) {
+      future.get(120, TimeUnit.SECONDS);
+    }
+    Assert.assertEquals("Unexpected number of setConf calls", numIteration * numThreads,
+        counter.get());
+  }
+
   private void createTestCatalog(String catName) throws MetaException {
     Catalog cat = new CatalogBuilder()
         .setName(catName)
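Taken together, the patch replaces the per-setConf global lock with a shared provider: configuration changes are reconciled once, and PersistenceManager creation afterwards takes only the read lock. A sketch of the intended calling sequence, using only methods introduced by this patch; the surrounding main() scaffolding is illustrative, not part of the change:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.PersistenceManagerProvider;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

import javax.jdo.PersistenceManager;

// Sketch of the post-patch flow: setConf-time property reconciliation, then
// mostly lock-free PersistenceManager creation per request.
public class PmfUsageSketch {
  public static void main(String[] args) {
    Configuration conf = MetastoreConf.newMetastoreConf();
    // Compares the datanucleus/datasource properties derived from conf with the
    // current ones; rebuilds the shared PersistenceManagerFactory only on change.
    PersistenceManagerProvider.updatePmfProperties(conf);

    // Fast path: only the read lock is taken, so many threads can do this at once.
    PersistenceManager pm = PersistenceManagerProvider.getPersistenceManager();
    try {
      // ... JDO work against the metastore database ...
    } finally {
      pm.close();
    }
  }
}
```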