commit ca532f3affc798b36ec2dd7b43e6b732b1714de7
Author: Andrew Sherman
Date:   Fri Oct 6 16:51:01 2017 -0700

    HIVE-16395: ConcurrentModificationException on config object in HoS

    SPARK-2546 added a config called spark.hadoop.cloneConf which, when set
    to true, clones a new JobConf object for each Spark task and so avoids
    any thread-safety issues. Spark leaves it false by default for
    performance reasons. On the Hive side we now think the cost of cloning
    a Spark configuration is a price worth paying to avoid concurrency
    problems, so we force spark.hadoop.cloneConf to "true" in the Spark
    config unless the user has set it explicitly in the Hive config.

    Add a new test that forces a Spark config to be generated from a Hive
    config and checks that the spark.hadoop.cloneConf value is as expected.

    Change-Id: I1bf4877ef6312848184336aded0003f72dc0fed8
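The enforcement in the HiveSparkClientFactory hunk below is plain java.util.Map.putIfAbsent semantics: by the time it runs, any value the user set in the Hive config has already been copied into the Spark config map, so an explicit user setting always wins. A minimal, self-contained sketch of that defaulting behavior (the bare HashMap stands in for the factory's real Hive-to-Spark config copy):

    import java.util.HashMap;
    import java.util.Map;

    public class CloneConfDefaulting {
      static final String SPARK_CLONE_CONFIGURATION = "spark.hadoop.cloneConf";

      public static void main(String[] args) {
        // Case 1: the user never set cloneConf, so the forced default wins.
        Map<String, String> sparkConf = new HashMap<>();
        sparkConf.putIfAbsent(SPARK_CLONE_CONFIGURATION, "true");
        System.out.println(sparkConf.get(SPARK_CLONE_CONFIGURATION)); // -> true

        // Case 2: the user set cloneConf explicitly; putIfAbsent leaves it alone.
        sparkConf.put(SPARK_CLONE_CONFIGURATION, "false");
        sparkConf.putIfAbsent(SPARK_CLONE_CONFIGURATION, "true");
        System.out.println(sparkConf.get(SPARK_CLONE_CONFIGURATION)); // -> false
      }
    }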
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 194585e0f0a66767819f221ec1ac75486bc4ce11..597fcab02fb1090be65e2ebed1d68abc73c1efda 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -26,10 +26,10 @@
 import java.util.Properties;
 import java.util.Set;

+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.compress.utils.CharsetNames;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hive.common.LogUtils;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.spark.client.SparkClientUtilities;
 import org.slf4j.Logger;
@@ -60,6 +60,8 @@
   private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false";
   private static final String SPARK_WAIT_APP_COMPLETE = "spark.yarn.submit.waitAppCompletion";
   private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
+  @VisibleForTesting
+  public static final String SPARK_CLONE_CONFIGURATION = "spark.hadoop.cloneConf";

   public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
     Map<String, String> sparkConf = initiateSparkConf(hiveconf);
@@ -222,6 +224,10 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
       sparkConf.put(SPARK_WAIT_APP_COMPLETE, "false");
     }

+    // Force Spark configs to be cloned by default
+    sparkConf.putIfAbsent(SPARK_CLONE_CONFIGURATION, "true");
+
+
     // Set the credential provider passwords if found, if there is job specific password
     // the credential provider location is set directly in the execute method of LocalSparkClient
     // and submit method of RemoteHiveSparkClient when the job config is created
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 54d2ceca6e1dd171bd26e91e445421b46c20aeb9..8d79dd9e4e92306a76a9f4f1a0f34a0c2ff5eee9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.UUID;

+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -174,4 +175,9 @@ public Path getHDFSSessionDir() throws IOException {
   public static String makeSessionId() {
     return UUID.randomUUID().toString();
   }
+
+  @VisibleForTesting
+  HiveSparkClient getHiveSparkClient() {
+    return hiveSparkClient;
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
index 489383baee8b055132726c7422f1bc630ba90057..47d243756fa2dac536c7776cfd86e42989e686d9 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
@@ -27,7 +27,13 @@
 import java.util.List;
 import java.util.Random;

+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.spark.SparkConf;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

@@ -97,6 +103,51 @@ public void testMultiSessionMultipleUse() throws Exception {
     sessionManagerHS2.shutdown();
   }

+  /**
+   * Test HIVE-16395 - by default we force cloning of Configurations for Spark jobs
+   */
+  @Test
+  public void testForceConfCloning() throws Exception {
+    HiveConf conf = new HiveConf();
+    conf.set("spark.master", "local");
+    String sparkCloneConfiguration = HiveSparkClientFactory.SPARK_CLONE_CONFIGURATION;
+
+    // Clear the value of sparkCloneConfiguration
+    conf.unset(sparkCloneConfiguration);
+    assertNull("Could not clear " + sparkCloneConfiguration + " in HiveConf",
+        conf.get(sparkCloneConfiguration));
+
+    // By default we should set sparkCloneConfiguration to true in the Spark config
+    checkSparkConf(conf, sparkCloneConfiguration, "true");
+
+    // User can override value of sparkCloneConfiguration in Hive config to false
+    conf.set(sparkCloneConfiguration, "false");
+    checkSparkConf(conf, sparkCloneConfiguration, "false");
+
+    // User can override value of sparkCloneConfiguration in Hive config to true
+    conf.set(sparkCloneConfiguration, "true");
+    checkSparkConf(conf, sparkCloneConfiguration, "true");
+  }
+
+  /**
+   * Force a Spark config to be generated and check that a config value has the expected value
+   * @param conf the Hive config to use as a base
+   * @param paramName the Spark config name to check
+   * @param expectedValue the expected value in the Spark config
+   */
+  private void checkSparkConf(HiveConf conf, String paramName, String expectedValue) throws HiveException {
+    SparkSessionManager sessionManager = SparkSessionManagerImpl.getInstance();
+    SparkSessionImpl sparkSessionImpl = (SparkSessionImpl)
+        sessionManager.getSession(null, conf, true);
+    assertTrue(sparkSessionImpl.isOpen());
+    HiveSparkClient hiveSparkClient = sparkSessionImpl.getHiveSparkClient();
+    SparkConf sparkConf = hiveSparkClient.getSparkConf();
+    String cloneConfig = sparkConf.get(paramName);
+    sessionManager.closeSession(sparkSessionImpl);
+    assertEquals(expectedValue, cloneConfig);
+    sessionManager.shutdown();
+  }
+
   /* Thread simulating a user session in HiveServer2. */
   public class SessionThread implements Runnable {
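For context on the failure mode in the commit title: Hadoop's Configuration (the parent class of JobConf) is iterable, and its iterators are fail-fast like those of most JDK collections, so one task thread mutating a shared JobConf while another is iterating it can surface as ConcurrentModificationException. A minimal single-threaded sketch of that fail-fast behavior, with a plain HashMap and made-up property names standing in for the shared JobConf:

    import java.util.ConcurrentModificationException;
    import java.util.HashMap;
    import java.util.Map;

    public class FailFastSketch {
      public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("mapreduce.job.name", "q1");
        conf.put("hive.exec.scratchdir", "/tmp/hive");

        try {
          for (Map.Entry<String, String> e : conf.entrySet()) {
            // A structural modification mid-iteration trips the fail-fast
            // iterator on its next advance. With a shared JobConf the writer
            // is simply another task thread, which is the race that cloning
            // one JobConf per task sidesteps.
            conf.put("another.task.property", "value");
          }
        } catch (ConcurrentModificationException cme) {
          System.out.println("ConcurrentModificationException, as expected");
        }
      }
    }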