commit 3aab12f9e0a34863023bf77e95757c42a7ebbef1
Author: Sahil Takiar
Date:   Mon Apr 2 09:31:38 2018 -0700

    HIVE-19079: Add extended query string to Spark job description

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7942608118..b47e4b735d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1491,6 +1491,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVEQUERYID("hive.query.id", "",
         "ID for query being executed (might be multiple per a session)"),

+    HIVESPARKJOBNAMELENGTH("hive.spark.jobname.length", 100000, "max jobname length for Hive on " +
+        "Spark queries"),
     HIVEJOBNAMELENGTH("hive.jobname.length", 50, "max jobname length"),

     // hive jar
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 52799b30c3..72f1b00b8c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2033,7 +2033,13 @@ private void execute() throws CommandProcessorResponse {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);

     boolean noName = StringUtils.isEmpty(conf.get(MRJobConfig.JOB_NAME));
-    int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
+
+    int maxlen;
+    if ("spark".equals(conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE))) {
+      maxlen = conf.getIntVar(HiveConf.ConfVars.HIVESPARKJOBNAMELENGTH);
+    } else {
+      maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
+    }

     Metrics metrics = MetricsFactory.getInstance();
     String queryId = queryState.getQueryId();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index e8f39aeabb..154d38fd64 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import com.google.common.base.Strings;

@@ -306,7 +307,8 @@ public void close() {
     localJars.clear();
   }

-  private static class JobStatusJob implements Job<Serializable> {
+  @VisibleForTesting
+  static class JobStatusJob implements Job<Serializable> {
     private static final long serialVersionUID = 1L;

     private final byte[] jobConfBytes;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
new file mode 100644
index 0000000000..239c09880f
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
@@ -0,0 +1,97 @@
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hive.spark.client.JobContext;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestHiveSparkClient {
+
+  @Test
+  public void testSetJobGroupAndDescription() throws Exception {
+
+    HiveConf conf = new HiveConf();
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+        SQLStdHiveAuthorizerFactory.class.getName());
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+    conf.set("spark.master", "local");
+    conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+        "TestHiveSparkClient-local-dir").toString());
+
+    SessionState.start(conf);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path tmpDir = new Path("TestSparkPlan-tmp");
+
+    IDriver driver = null;
+    JavaSparkContext sc = null;
+
+    try {
+      driver = DriverFactory.newDriver(conf);
+      Assert.assertEquals(0, driver.run("create table test (col int)").getResponseCode());
+
+      String query = "select * from test order by col";
+      driver.compile(query);
+      List<SparkTask> sparkTasks = Utilities.getSparkTasks(driver.getPlan().getRootTasks());
+      Assert.assertEquals(1, sparkTasks.size());
+
+      SparkTask sparkTask = sparkTasks.get(0);
+
+      conf.set(MRJobConfig.JOB_NAME, query);
+      JobConf jobConf = new JobConf(conf);
+
+      SparkConf sparkConf = new SparkConf();
+      sparkConf.setMaster("local");
+      sparkConf.setAppName("TestSparkPlan-app");
+      sc = new JavaSparkContext(sparkConf);
+
+      byte[] jobConfBytes = KryoSerializer.serializeJobConf(jobConf);
+      byte[] scratchDirBytes = KryoSerializer.serialize(tmpDir);
+      byte[] sparkWorkBytes = KryoSerializer.serialize(sparkTask.getWork());
+
+      RemoteHiveSparkClient.JobStatusJob job = new RemoteHiveSparkClient.JobStatusJob(jobConfBytes,
+          scratchDirBytes, sparkWorkBytes);
+
+      JobContext mockJobContext = mock(JobContext.class);
+      when(mockJobContext.sc()).thenReturn(sc);
+
+      job.call(mockJobContext);
+
+      Assert.assertTrue(sc.getLocalProperty("spark.job.description").contains(query));
+      Assert.assertTrue(sc.getLocalProperty("spark.jobGroup.id")
+          .contains(sparkTask.getWork().getQueryId()));
+    } finally {
+      if (driver != null) {
+        Assert.assertEquals(0, driver.run("drop table if exists test").getResponseCode());
+        driver.destroy();
+      }
+      if (sc != null) {
+        sc.close();
+      }
+      if (fs.exists(tmpDir)) {
+        fs.delete(tmpDir, true);
+      }
+    }
+  }
+}