commit 20e96c9c2b4253753b427c77a214d04b5c4dcffc Author: Vihang Karajgaonkar Date: Tue Sep 27 15:33:56 2016 -0700 HIVE-14822 : Add support for credential provider for jobs launched from HiveServer2 This change adds support for a credential provider for jobs launched from HiveServer2. It also adds support for a job-specific credential provider and password, which are passed to the jobs via the job configuration when they are launched from HS2. The Hive-specific credential provider location is specified by a new HiveServer2 configuration property, hive.server2.job.credstore.location, and the password for the job credential provider is supplied via the HIVE_JOB_CREDSTORE_PASSWORD environment variable.
diff --git a/common/src/java/org/apache/hadoop/hive/common/EnvironmentUtils.java b/common/src/java/org/apache/hadoop/hive/common/EnvironmentUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..83574d499c734d8d95d7c3bd34c6a08a670ae1b5 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/EnvironmentUtils.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hive.common; + +public class EnvironmentUtils { + + /* + * Proxy interface for the environment which allows us to mock environment variables in testing + */ + public interface Env { + String get(String envVar); + } + + /* + * Basic implementation of Env which reads from the system environment + */ + private static Env environment = new Env() { + @Override + public String get(String envVar) { + return System.getenv(envVar); + } + }; + + /** + * Returns the value of an environment variable + * @param varName - variable name + * @return value of the environment variable varName + */ + public static String getEnvVariable(String varName) { + return environment.get(varName); + } +}
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java index 77c6aa5663c2c5358715801bc039c0fe8035e3dc..eb72b22addec371def3c1acccb02f86b419b7feb 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java +++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java @@ -30,4 +30,8 @@ public static final String DRUID_QUERY_JSON = "druid.query.json"; public static final String DRUID_QUERY_TYPE = "druid.query.type"; public static final String DRUID_QUERY_FETCH = "druid.query.fetch"; + + public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_NAME = "HIVE_JOB_CREDSTORE_PASSWORD"; + public static final String HADOOP_CREDENTIAL_PASSWORD_NAME = "HADOOP_CREDSTORE_PASSWORD"; + public static final String HADOOP_CREDENTIAL_PROVIDER_PATH = "hadoop.security.credential.provider.path"; }
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 43a16d7fed1d38400793e7c96a7febebe42d351b..415d64a196f4509a23730cb666570768782ce3ee 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2518,6 +2518,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal "if an X-XSRF-HEADER header is not present"), HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile", "Comma separated list of non-SQL Hive commands users are authorized to execute"), + HIVE_SERVER2_JOB_CREDSTORE_LOCATION("hive.server2.job.credstore.location", "", + "If set, HiveServer2 will use this location as the job-specific credential provider for jobs run using execution engines like MR"), HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 15, new SizeValidator(0L, true, 1024L, true), "Number of threads" + " used to move files in move task. Set it to 0 to disable multi-threaded file moves. 
This parameter is also used by"
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index fd640567124e1bb8b558359732764a07c8b0f2ed..63a73c8c26f48cc203d3cae1c7414744bc199b3a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -88,12 +88,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.common.EnvironmentUtils; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveInterruptCallback; import org.apache.hadoop.hive.common.HiveInterruptUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; @@ -276,7 +278,6 @@ public static void clearWork(Configuration conf) { if (fs.exists(reducePath)) { fs.delete(reducePath, true); } - } catch (Exception e) { LOG.warn("Failed to clean-up tmp directories.", e); } finally { @@ -286,6 +287,66 @@ public static void clearWork(Configuration conf) { } } + public static String updateJobCredentialProviders(Configuration jobConf, HiveConf conf) { + String jobKeyStoreLocation = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDSTORE_LOCATION); + String oldKeyStoreLocation = jobConf.get(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH); + if (StringUtils.isNotBlank(jobKeyStoreLocation)) { + jobConf.set(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH, jobKeyStoreLocation); + LOG.debug("Setting job conf credstore location to " + jobKeyStoreLocation + + "; previous location was " + oldKeyStoreLocation); + } + // use HIVE_JOB_CREDSTORE_PASSWORD only if it is defined together with + // HIVE_SERVER2_JOB_CREDSTORE_LOCATION; otherwise fall back to HADOOP_CREDSTORE_PASSWORD + String credStorePassword = + EnvironmentUtils.getEnvVariable(Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + String hiveJobPassword = + EnvironmentUtils.getEnvVariable(Constants.HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_NAME); + if (StringUtils.isNotBlank(jobKeyStoreLocation) && StringUtils.isNotBlank(hiveJobPassword)) { + credStorePassword = hiveJobPassword; + } + if (StringUtils.isNotBlank(credStorePassword)) { + // if the execution engine is MR, set the map/reduce task env with the credential store password + if (HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE).equals("mr")) { + addKeyValuePair(jobConf, JobConf.MAPRED_MAP_TASK_ENV, + Constants.HADOOP_CREDENTIAL_PASSWORD_NAME, credStorePassword); + addKeyValuePair(jobConf, JobConf.MAPRED_REDUCE_TASK_ENV, + Constants.HADOOP_CREDENTIAL_PASSWORD_NAME, credStorePassword); + } + } + return oldKeyStoreLocation; + } + + private static void addKeyValuePair(Configuration jobConf, String property, String keyName, + String newKeyValue) { + String existingValue = jobConf.get(property); + if (existingValue == null) { + jobConf.set(property, (keyName + "=" + newKeyValue)); + return; + } + String[] keyValuePairs = existingValue.split(","); + StringBuilder sb = new StringBuilder(); + boolean found = false; + for (int i = 0; i < keyValuePairs.length; i++) { + String[] pair = keyValuePairs[i].split("=", 2); + if (pair.length != 2) { + LOG.error("Error parsing the key=value pair " + keyValuePairs[i] + + ". Cannot update configuration property " + property); + return; + } + String key = pair[0].trim(); + sb.append(key); + sb.append("="); + if (key.equals(keyName)) { + sb.append(newKeyValue); + found = true; + } else { + sb.append(pair[1]); + } + // append a separator only between pairs, not after the last one + if (i < (keyValuePairs.length - 1)) { + sb.append(","); + } + } + // if the key was not present in the existing value, append it + if (!found) { + sb.append(",").append(keyName).append("=").append(newKeyValue); + } + jobConf.set(property, sb.toString()); + } + public static MapredWork getMapRedWork(Configuration conf) { MapredWork w = new MapredWork(); w.setMapWork(getMapWork(conf));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index cea9582c8ccb0c79700ac7913735d4cdf52f0c7e..54e3ebfe2e9edaa4ba3146a7c797398041661471 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.mr; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -44,6 +45,7 @@ import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; +import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.CompilationOpContext; @@ -413,8 +415,12 @@ public int execute(DriverContext driverContext) { TezSessionPoolManager.getInstance().closeIfNotDefault(session, true); } + String origKeystoreLocation = Utilities.updateJobCredentialProviders(job, conf); // Finally SUBMIT the JOB! rj = jc.submitJob(job); + if (origKeystoreLocation != null) { + job.set(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH, origKeystoreLocation); + } this.jobID = rj.getJobID(); updateStatusInQueryDisplay(); returnVal = jobExecHelper.progress(rj, jc, ctx); @@ -613,7 +619,6 @@ private static void setupChildLog4j(Configuration conf) { @SuppressWarnings("unchecked") public static void main(String[] args) throws IOException, HiveException { - String planFileName = null; String jobConfFileName = null; boolean noLog = false; @@ -651,7 +656,6 @@ public static void main(String[] args) throws IOException, HiveException { if (jobConfFileName != null) { conf.addResource(new Path(jobConfFileName)); } - // Initialize the resources from command line if (files != null) { conf.set("tmpfiles", files); @@ -755,7 +759,6 @@ public static void main(String[] args) throws IOException, HiveException { public static String generateCmdLine(HiveConf hconf, Context ctx) throws IOException { HiveConf tempConf = new HiveConf(); - Path hConfFilePath = new Path(ctx.getLocalTmpPath(), JOBCONF_FILENAME); OutputStream out = null; Properties deltaP = hconf.getChangedProperties(); @@ -779,9 +782,10 @@ public static String generateCmdLine(HiveConf hconf, Context ctx) tempConf.set(hadoopSysDir, hconf.get(hadoopSysDir) + "/" + Utilities.randGen.nextInt()); tempConf.set(hadoopWorkDir, hconf.get(hadoopWorkDir) + "/" + Utilities.randGen.nextInt()); } - + Path hConfFilePath = new Path(ctx.getMRTmpPath(), JOBCONF_FILENAME); try { - out = FileSystem.getLocal(hconf).create(hConfFilePath); + FileSystem fs = hConfFilePath.getFileSystem(hconf); + out = fs.create(hConfFilePath); tempConf.writeXml(out); } finally { if (out != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java index ce1106d91db9ef75e7b425d5950f888bacbfb3e5..170de993d76ab598e200a26222f52122a76019cd 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java @@ -34,7 +34,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.io.CachingPrintStream; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Operator; @@ -47,9 +46,10 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.ResourceType; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hive.common.util.HiveStringUtils; +import org.apache.hadoop.mapred.JobConf; import org.apache.hive.common.util.StreamPrinter; + /** * Extension of ExecDriver: * - can optionally spawn a map-reduce task from a separate jvm @@ -257,6 +257,7 @@ public int execute(DriverContext driverContext) { env[pos++] = name + "=" + value; } // Run ExecDriver in another JVM + executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir)); CachingPrintStream errPrintStream =
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 784b9c9fa769eeb088e3a6408a0b29147a8b4086..2d8647b9d34f8095d6c70b43569b313d6e69d821 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -28,12 +28,14 @@ import org.apache.commons.compress.utils.CharsetNames; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hive.common.EnvironmentUtils; import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.session.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.HiveKey; @@ -45,6 +47,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.base.Strings; +import org.apache.commons.lang3.StringUtils; import com.google.common.collect.Sets; public class HiveSparkClientFactory { @@ -195,9 +198,24 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex sparkConf.put(SPARK_WAIT_APP_COMPLETE, "false"); } + // set the credential provider password if found; if there is a job-specific password, + // use it, else fall back to the password defined by the HADOOP_CREDSTORE_PASSWORD env variable + String defaultPassword = EnvironmentUtils.getEnvVariable(Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + String jobCredstorePassword = EnvironmentUtils.getEnvVariable(Constants.HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_NAME); + if (StringUtils.isNotBlank(jobCredstorePassword)) { + addCredentialProviderPassword(sparkConf, jobCredstorePassword); + } else if (StringUtils.isNotBlank(defaultPassword)) { + addCredentialProviderPassword(sparkConf, defaultPassword); + } return sparkConf; } + private static void addCredentialProviderPassword(Map<String, String> sparkConf, + String jobCredstorePassword) { + sparkConf.put("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD", jobCredstorePassword);
sparkConf.put("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", jobCredstorePassword); + } + static SparkConf generateSparkConf(Map conf) { SparkConf sparkConf = new SparkConf(false); for (Map.Entry entry : conf.entrySet()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java index c75333d7022f776aab184a4b804fd7ad7befac64..7832621b7a78c07ae8bf90bc86d2192b12a1a39a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java @@ -113,6 +113,8 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) thr FileSystem fs = emptyScratchDir.getFileSystem(jobConf); fs.mkdirs(emptyScratchDir); + //update credential provider location + Utilities.updateJobCredentialProviders(jobConf, hiveConf); SparkCounters sparkCounters = new SparkCounters(sc); Map> prefixes = sparkWork.getRequiredCounterPrefix(); if (prefixes != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index a9f70c4100219c8929abd4e31b30d340e6ba98bd..a398ebdf1a994a4950ef18f8217466059441624c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -190,7 +190,8 @@ private SparkJobRef submit(final DriverContext driverContext, final SparkWork sp final HiveConf hiveConf = (HiveConf) ctx.getConf(); refreshLocalResources(sparkWork, hiveConf); final JobConf jobConf = new JobConf(hiveConf); - + //update the credential provider location in the jobConf + Utilities.updateJobCredentialProviders(jobConf, hiveConf); // Create temporary scratch dir final Path emptyScratchDir = ctx.getMRTmpPath(); FileSystem fs = emptyScratchDir.getFileSystem(jobConf); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java new file mode 100644 index 0000000000000000000000000000000000000000..8d9cc3cc7bf03a5e39909133f3d5915a135dfa2b --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java @@ -0,0 +1,333 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.mapred.JobConf; +import org.junit.Assert; +import org.junit.Test; + +public class TestHiveCredentialProviders { + private static final String HADOOP_CREDSTORE_PASSWORD = "hadoopCredStorePassword"; + private static final String HIVE_JOB_CREDSTORE_PASSWORD = "hiveJobCredPassword"; + private static final String JOB_CREDSTORE_LOCATION = "jceks://hdfs/user/hive/creds.jceks"; + private static final String HADOOP_CREDSTORE_LOCATION = + "localjceks://file/user/hive/localcreds.jceks"; + + private JobConf jobConf; + private HiveConf hiveConf; + + /* + * Dirty hack to set the environment variables using reflection. This method is for testing + * purposes only and should not be used elsewhere + */ + private static void setEnv(Map<String, String> newenv) throws Exception { + Class<?>[] classes = Collections.class.getDeclaredClasses(); + Map<String, String> env = System.getenv(); + for (Class<?> cl : classes) { + if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) { + Field field = cl.getDeclaredField("m"); + field.setAccessible(true); + Object obj = field.get(env); + @SuppressWarnings("unchecked") + Map<String, String> map = (Map<String, String>) obj; + map.clear(); + map.putAll(newenv); + } + } + } + + /* + * Tests whether the credential provider is updated when HIVE_JOB_CREDSTORE_PASSWORD is set and + * hiveConf sets HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDSTORE_LOCATION + * + * JobConf should contain the mapred env variable equal to ${HIVE_JOB_CREDSTORE_PASSWORD} and the + * hadoop.security.credential.provider.path property should be equal to the value of + * HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDSTORE_LOCATION + */ + @Test + public void testUpdateCredentialProviders_1() throws Exception { + jobConf = new JobConf(); + hiveConf = new HiveConf(); + + setupConfigs(true, true, true, true); + + String oldCredStoreLocation = Utilities.updateJobCredentialProviders(jobConf, hiveConf); + Assert.assertTrue(getErrorMsg(HADOOP_CREDSTORE_LOCATION, oldCredStoreLocation), + HADOOP_CREDSTORE_LOCATION.equals(oldCredStoreLocation)); + // make sure credential provider path points to HIVE_SERVER2_JOB_CREDSTORE_LOCATION + Assert.assertEquals(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDSTORE_LOCATION), + jobConf.get(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH)); + + // make sure mapred environment variable points to HIVE_JOB_CREDSTORE_PASSWORD + String value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), + Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + Assert.assertTrue(value.equals(HIVE_JOB_CREDSTORE_PASSWORD)); + + value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV), + Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + Assert.assertTrue(value.equals(HIVE_JOB_CREDSTORE_PASSWORD)); + } + + /* + * If the hive job credstore location is not set, jobConf should contain the hadoop credstore + * location and the password should come from HADOOP_CREDSTORE_PASSWORD. + * If there is no credential provider configured for hadoop, jobConf should contain neither the + * credstore password nor the provider path + */ + @Test + public void testUpdateCredentialProviders_2() throws Exception { + jobConf = new JobConf(); + hiveConf = new HiveConf(); + + setupConfigs(true, true, true, false); + + String oldCredStoreLocation = Utilities.updateJobCredentialProviders(jobConf, hiveConf); + 
String errMsg = getErrorMsg(HADOOP_CREDSTORE_LOCATION, oldCredStoreLocation); + Assert.assertTrue(errMsg, HADOOP_CREDSTORE_LOCATION.equals(oldCredStoreLocation)); + + errMsg = getErrorMsg(HADOOP_CREDSTORE_LOCATION, + jobConf.get(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH)); + Assert.assertEquals(errMsg, HADOOP_CREDSTORE_LOCATION, + jobConf.get(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH)); + + String value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), + Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + Assert.assertTrue(value.equals(HADOOP_CREDSTORE_PASSWORD)); + + value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV), + Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + Assert.assertTrue(value.equals(HADOOP_CREDSTORE_PASSWORD)); + + jobConf = new JobConf(); + hiveConf = new HiveConf(); + // if there is no credential provider configured for hadoop, jobConf should contain neither + // the credstore password nor the provider path + setupConfigs(false, false, true, false); + oldCredStoreLocation = Utilities.updateJobCredentialProviders(jobConf, hiveConf); + Assert.assertTrue(StringUtils.isBlank(oldCredStoreLocation)); + + Assert.assertTrue(StringUtils.isBlank(jobConf.get(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH))); + + value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), + Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + Assert.assertNull(value); + + value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV), + Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + Assert.assertNull(value); + } + + /* + * If the hive job credential provider is set but HIVE_JOB_CREDSTORE_PASSWORD is not set, use + * HADOOP_CREDSTORE_PASSWORD in the jobConf + */ + @Test + public void testUpdateCredentialProviders_3() throws Exception { + jobConf = new JobConf(); + hiveConf = new HiveConf(); + + setupConfigs(false, true, false, true); + + String oldCredStoreLocation = Utilities.updateJobCredentialProviders(jobConf, hiveConf); + Assert.assertTrue(StringUtils.isBlank(oldCredStoreLocation)); + + String errMsg = + getErrorMsg(JOB_CREDSTORE_LOCATION, jobConf.get(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH)); + Assert.assertEquals(errMsg, JOB_CREDSTORE_LOCATION, + jobConf.get(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH)); + + String value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), + Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + Assert.assertTrue(value.equals(HADOOP_CREDSTORE_PASSWORD)); + + value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV), + Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + Assert.assertTrue(value.equals(HADOOP_CREDSTORE_PASSWORD)); + } + + /* + * Default behavior when neither hive.server2.job.credstore.location nor + * HIVE_JOB_CREDSTORE_PASSWORD is set. In this case, if a hadoop credential provider is + * configured, the job config should use it; otherwise it should remain unset + */ + @Test + public void testUpdateCredentialProviders_4() throws Exception { + jobConf = new JobConf(); + hiveConf = new HiveConf(); + + setupConfigs(true, true, false, false); + + String oldCredStoreLocation = Utilities.updateJobCredentialProviders(jobConf, hiveConf); + Assert.assertTrue(HADOOP_CREDSTORE_LOCATION.equals(oldCredStoreLocation)); + + String errMsg = getErrorMsg(HADOOP_CREDSTORE_LOCATION, + jobConf.get(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH)); + Assert.assertEquals(errMsg, HADOOP_CREDSTORE_LOCATION, + jobConf.get(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH)); + + String value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), + Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + Assert.assertTrue(value.equals(HADOOP_CREDSTORE_PASSWORD)); + + value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV), + Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + Assert.assertTrue(value.equals(HADOOP_CREDSTORE_PASSWORD)); + + jobConf = new JobConf(); + hiveConf = new HiveConf(); + setupConfigs(false, false, false, false); + + oldCredStoreLocation = Utilities.updateJobCredentialProviders(jobConf, hiveConf); + Assert.assertTrue(StringUtils.isBlank(oldCredStoreLocation)); + + Assert.assertTrue(StringUtils.isBlank(jobConf.get(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH))); + + value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), + Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + Assert.assertNull(value); + + value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV), + Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + Assert.assertNull(value); + } + + + /* + * Test that Utilities.updateJobCredentialProviders does not corrupt existing values of the + * mapred env configs + */ + @Test + public void testUpdateCredentialProviders_5() throws Exception { + jobConf = new JobConf(); + hiveConf = new HiveConf(); + + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "k1=v1, k2=v2, HADOOP_CREDSTORE_PASSWORD=test"); + setupConfigs(false, true, false, true); + Utilities.updateJobCredentialProviders(jobConf, hiveConf); + + String value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k1"); + Assert.assertTrue(value.equals("v1")); + value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k2"); + Assert.assertTrue(value.equals("v2")); + + jobConf = new JobConf(); + hiveConf = new HiveConf(); + + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "k1=v1, HADOOP_CREDSTORE_PASSWORD=test, k2=v2"); + setupConfigs(false, true, false, true); + Utilities.updateJobCredentialProviders(jobConf, hiveConf); + + value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k1"); + Assert.assertTrue(value.equals("v1")); + value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k2"); + Assert.assertTrue(value.equals("v2")); + + jobConf = new JobConf(); + hiveConf = new HiveConf(); + + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "HADOOP_CREDSTORE_PASSWORD=test, k1=v1, k2=v2"); + setupConfigs(false, true, false, true); + Utilities.updateJobCredentialProviders(jobConf, hiveConf); + + value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k1"); + Assert.assertTrue(value.equals("v1")); + value = getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k2"); + Assert.assertTrue(value.equals("v2")); + } + /** + * Sets up the environment and configurations + * + * @param setHadoopCredProvider set the hadoop credstore provider path + * @param 
setHadoopCredstorePassword set the HADOOP_CREDSTORE_PASSWORD env variable + * @param setHiveCredPassword set the HIVE_JOB_CREDSTORE_PASSWORD env variable + * @param setHiveProviderPath set HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDSTORE_LOCATION in the + * hive config + * @throws Exception + */ + private void setupConfigs(boolean setHadoopCredProvider, boolean setHadoopCredstorePassword, + boolean setHiveCredPassword, boolean setHiveProviderPath) throws Exception { + Map<String, String> mockEnv = new HashMap<>(); + // sets the hadoop.security.credential.provider.path property to simulate a default + // credential provider setup + if (setHadoopCredProvider) { + jobConf.set(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH, HADOOP_CREDSTORE_LOCATION); + } + // sets the env variable HADOOP_CREDSTORE_PASSWORD to the value of the + // HADOOP_CREDSTORE_PASSWORD test constant + if (setHadoopCredstorePassword) { + mockEnv.put(Constants.HADOOP_CREDENTIAL_PASSWORD_NAME, HADOOP_CREDSTORE_PASSWORD); + } + // sets the env variable HIVE_JOB_CREDSTORE_PASSWORD to the value of the + // HIVE_JOB_CREDSTORE_PASSWORD test constant + if (setHiveCredPassword) { + mockEnv.put(Constants.HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_NAME, HIVE_JOB_CREDSTORE_PASSWORD); + } + TestHiveCredentialProviders.setEnv(mockEnv); + // set the hive provider path in hiveConf if setHiveProviderPath is true; + // simulates the hive.server2.job.credstore.location property set in hive-site.xml/core-site.xml of + // HS2 + if (setHiveProviderPath) { + hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDSTORE_LOCATION.varname, + JOB_CREDSTORE_LOCATION); + } + } + + /* + * Extract a value from comma-separated key=value pairs + */ + private String getValueFromJobConf(String keyValuePairs, String key) { + if (keyValuePairs == null) { + return null; + } + String[] keyValues = keyValuePairs.split(","); + for (String kv : keyValues) { + String[] parts = kv.split("=", 2); + if (parts.length == 2 && key.equals(parts[0].trim())) { + return parts[1].trim(); + } + } + return null; + } + + @Test + public void testEnv() throws Exception { + Map<String, String> mockEnv = new HashMap<>(); + mockEnv.put(Constants.HADOOP_CREDENTIAL_PASSWORD_NAME, HADOOP_CREDSTORE_PASSWORD); + mockEnv.put(Constants.HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_NAME, HIVE_JOB_CREDSTORE_PASSWORD); + TestHiveCredentialProviders.setEnv(mockEnv); + Assert.assertEquals( + getErrorMsg(HADOOP_CREDSTORE_PASSWORD, + System.getenv(Constants.HADOOP_CREDENTIAL_PASSWORD_NAME)), + HADOOP_CREDSTORE_PASSWORD, System.getenv(Constants.HADOOP_CREDENTIAL_PASSWORD_NAME)); + Assert.assertEquals( + getErrorMsg(HIVE_JOB_CREDSTORE_PASSWORD, + System.getenv(Constants.HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_NAME)), + HIVE_JOB_CREDSTORE_PASSWORD, + System.getenv(Constants.HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_NAME)); + } + + private String getErrorMsg(String expected, String found) { + return "Expected " + expected + ", found " + found; + } +}
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 936fdafdb37c461a7a5deb97cba72d4db54a49e1..389c1fedc571f48eebf48f4b170103e92c820310 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -53,6 +53,9 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.common.EnvironmentUtils; +import org.apache.hadoop.hive.conf.Constants; import 
org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.Utils; @@ -206,6 +209,7 @@ private Thread startDriver(final RpcServer rpcServer, final String clientId, fin if (conf.containsKey(SparkClientFactory.CONF_KEY_IN_PROCESS)) { // Mostly for testing things quickly. Do not do this in production. + // when invoked in-process, the driver inherits the environment variables of the parent LOG.warn("!!!! Running remote driver in-process. !!!!"); runnable = new Runnable() { @Override @@ -440,7 +444,16 @@ public void run() { // Prevent hive configurations from being visible in Spark. pb.environment().remove("HIVE_HOME"); pb.environment().remove("HIVE_CONF_DIR"); - + // add the credential provider password to the child process's environment; + // the credential provider location is provided in the jobConf when the job is submitted + String credstorePassword = EnvironmentUtils.getEnvVariable(Constants.HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_NAME); + if (StringUtils.isBlank(credstorePassword)) { + credstorePassword = EnvironmentUtils.getEnvVariable(Constants.HADOOP_CREDENTIAL_PASSWORD_NAME); + } + if (StringUtils.isNotBlank(credstorePassword)) { + pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_NAME, credstorePassword); + } if (isTesting != null) { pb.environment().put("SPARK_TESTING", isTesting); }
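
For context, a minimal sketch (not part of the patch) of how a launched task resolves a secret under these settings. Hadoop's Configuration.getPassword() consults the providers listed under hadoop.security.credential.provider.path, and a JCEKS keystore is unlocked with the HADOOP_CREDSTORE_PASSWORD environment variable that this change propagates into the task environment. The keystore itself would typically be provisioned beforehand with `hadoop credential create <alias> -provider <jceks-uri>`. The alias and provider URI below are hypothetical.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;

public class CredentialLookupSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Inside a launched task this property is already set by
    // Utilities.updateJobCredentialProviders(); the URI here is hypothetical.
    conf.set("hadoop.security.credential.provider.path",
        "jceks://hdfs/user/hive/creds.jceks");
    // getPassword() checks the configured credential providers first and falls
    // back to the plain config value; the JCEKS keystore is unlocked with the
    // HADOOP_CREDSTORE_PASSWORD environment variable.
    char[] secret = conf.getPassword("db.password.alias"); // hypothetical alias
    if (secret != null) {
      System.out.println("resolved a secret of length " + secret.length);
      java.util.Arrays.fill(secret, '\0'); // clear the secret after use
    }
  }
}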
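
The key=value merge that Utilities.addKeyValuePair() performs on the mapred map/reduce task env strings can also be exercised outside a cluster with a standalone sketch of the same semantics: replace the value when the key is already present in the comma-separated string, append the pair otherwise. The class and method names below are illustrative, not part of the patch.

public class EnvMergeSketch {
  // Standalone illustration of the merge semantics of Utilities.addKeyValuePair().
  static String merge(String existing, String key, String value) {
    if (existing == null || existing.isEmpty()) {
      return key + "=" + value;
    }
    StringBuilder sb = new StringBuilder();
    boolean found = false;
    for (String pair : existing.split(",")) {
      String[] kv = pair.split("=", 2);
      if (sb.length() > 0) {
        sb.append(',');
      }
      if (kv.length == 2 && kv[0].trim().equals(key)) {
        sb.append(key).append('=').append(value); // replace the existing value
        found = true;
      } else {
        sb.append(pair.trim()); // keep unrelated pairs untouched
      }
    }
    if (!found) {
      sb.append(',').append(key).append('=').append(value); // append a new pair
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // prints: k1=v1,HADOOP_CREDSTORE_PASSWORD=newPw
    System.out.println(merge("k1=v1, HADOOP_CREDSTORE_PASSWORD=oldPw",
        "HADOOP_CREDSTORE_PASSWORD", "newPw"));
  }
}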