commit 3b6de1fb91c74e4b8e4adfa97075b92e8b06538d Author: Andrew Sherman Date: Fri Oct 6 16:51:01 2017 -0700 HIVE-16395: ConcurrentModificationException on config object in HoS In SPARK-2546 a config called spark.hadoop.cloneConf was added that is set to false by default. When set to true it clones a new JobConf object for each Spark Task, which avoids any thread safety issues. It's set to false by default for performance considerations. On the Hive side we now think the cost of cloning a Spark configuration is a price worth paying to avoid concurrency problems. We therefore force spark.hadoop.cloneConf to "true" in the Spark config unless the user has set this explicitly in the Hive config. Add a new test that forces a Spark config to be generated from a Hive config and checks that a config value has the expected value. Change-Id: I1bf4877ef6312848184336aded0003f72dc0fed8 diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 194585e0f0a66767819f221ec1ac75486bc4ce11..597fcab02fb1090be65e2ebed1d68abc73c1efda 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -26,10 +26,10 @@ import java.util.Properties; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.compress.utils.CharsetNames; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hive.common.LogUtils; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.spark.client.SparkClientUtilities; import org.slf4j.Logger; @@ -60,6 +60,8 @@ private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false"; private static final String SPARK_WAIT_APP_COMPLETE = "spark.yarn.submit.waitAppCompletion"; private static final String 
SPARK_DEPLOY_MODE = "spark.submit.deployMode"; + @VisibleForTesting + public static final String SPARK_CLONE_CONFIGURATION = "spark.hadoop.cloneConf"; public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception { Map sparkConf = initiateSparkConf(hiveconf); @@ -222,6 +224,10 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex sparkConf.put(SPARK_WAIT_APP_COMPLETE, "false"); } + // Force Spark configs to be cloned by default + sparkConf.putIfAbsent(SPARK_CLONE_CONFIGURATION, "true"); + + // Set the credential provider passwords if found, if there is job specific password // the credential provider location is set directly in the execute method of LocalSparkClient // and submit method of RemoteHiveSparkClient when the job config is created diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java index 54d2ceca6e1dd171bd26e91e445421b46c20aeb9..8d79dd9e4e92306a76a9f4f1a0f34a0c2ff5eee9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.UUID; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -174,4 +175,9 @@ public Path getHDFSSessionDir() throws IOException { public static String makeSessionId() { return UUID.randomUUID().toString(); } + + @VisibleForTesting + HiveSparkClient getHiveSparkClient() { + return hiveSparkClient; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java index 
489383baee8b055132726c7422f1bc630ba90057..3512208330166622c03e61b5616c2515d5bb782c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java @@ -27,7 +27,13 @@ import java.util.List; import java.util.Random; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.spark.SparkConf; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -97,6 +103,49 @@ public void testMultiSessionMultipleUse() throws Exception { sessionManagerHS2.shutdown(); } + /** + * Test HIVE-16395 - by default we force cloning of Configurations for Spark jobs + */ + @Test + public void testForceConfCloning() throws Exception { + HiveConf conf = new HiveConf(); + conf.set("spark.master", "local"); + String sparkCloneConfiguration = HiveSparkClientFactory.SPARK_CLONE_CONFIGURATION; + String hiveSetting = conf.get(sparkCloneConfiguration); + assertNull( "this test assumes the value of " + sparkCloneConfiguration + + " is not set in HiveConf", hiveSetting); + + // By default we should set sparkCloneConfiguration to true + checkSparkConf(conf, sparkCloneConfiguration, "true"); + + // User can override value for sparkCloneConfiguration + conf.set(sparkCloneConfiguration, "false"); + checkSparkConf(conf, sparkCloneConfiguration, "false"); + + // User can override value of sparkCloneConfiguration to true even if that is a no-op + conf.set(sparkCloneConfiguration, "true"); + checkSparkConf(conf, sparkCloneConfiguration, "true"); + } + + /** + * Force a Spark config to be generated and check that a config value has the expected value + * @param conf the Hive 
config to use as a base + * @param paramName the Spark config name to check + * @param expectedValue the expected value in the Spark config + */ + private void checkSparkConf(HiveConf conf, String paramName, String expectedValue) throws HiveException { + SparkSessionManager sessionManager = SparkSessionManagerImpl.getInstance(); + SparkSessionImpl sparkSessionImpl = (SparkSessionImpl) + sessionManager.getSession(null, conf, true); + assertTrue(sparkSessionImpl.isOpen()); + HiveSparkClient hiveSparkClient = sparkSessionImpl.getHiveSparkClient(); + SparkConf sparkConf = hiveSparkClient.getSparkConf(); + String cloneConfig = sparkConf.get(paramName); + sessionManager.closeSession(sparkSessionImpl); + assertEquals(expectedValue, cloneConfig); + sessionManager.shutdown(); + } + /* Thread simulating a user session in HiveServer2. */ public class SessionThread implements Runnable {