diff --git a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java index 83517ce..b44f92f 100644 --- a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hive.common; +import java.net.InetAddress; +import java.net.UnknownHostException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; @@ -47,4 +50,20 @@ public static void cleanUpScratchDir(HiveConf hiveConf) { } } + /** + * Get the InetAddress of the machine with the given host name. + * @param hostname The name of the host + * @return The network address of the host + * @throws UnknownHostException + */ + public static InetAddress getHostAddress(String hostname) throws UnknownHostException { + InetAddress serverIPAddress; + if (hostname != null && !hostname.isEmpty()) { + serverIPAddress = InetAddress.getByName(hostname); + } else { + serverIPAddress = InetAddress.getLocalHost(); + } + return serverIPAddress; + } + } diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 97fe7bc..9bf0d66 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -386,6 +386,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { // a symbolic name to reference in the Hive source code. Properties with non-null // values will override any values set in the underlying Hadoop configuration. HADOOPBIN("hadoop.bin.path", findHadoopBinary(), "", true), + YARNBIN("yarn.bin.path", findYarnBinary(), "", true), HIVE_FS_HAR_IMPL("fs.har.impl", "org.apache.hadoop.hive.shims.HiveHarFileSystem", "The implementation for accessing Hadoop Archives. Note that this won't be applicable to Hadoop versions less than 0.20"), MAPREDMAXSPLITSIZE(FileInputFormat.SPLIT_MAXSIZE, 256000000L, "", true), @@ -2597,6 +2598,11 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "Channel logging level for remote Spark driver. One of {DEBUG, ERROR, INFO, TRACE, WARN}."), SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5", "Name of the SASL mechanism to use for authentication."), + SPARK_RPC_SERVER_ADDRESS("hive.spark.client.rpc.server.address", "", + "The server address of the HiveServer2 host to be used for communication between Hive client and remote Spark driver. " + + "Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host. " + + "This is only necessary if the host has multiple network addresses and a network address other than " + + "hive.server2.thrift.bind.host is to be used."), SPARK_DYNAMIC_PARTITION_PRUNING( "hive.spark.dynamic.partition.pruning", false, "When dynamic pruning is enabled, joins on partition keys will be processed by writing\n" + @@ -2787,16 +2793,27 @@ public String toString() { } private static String findHadoopBinary() { + String val = findHadoopHome(); + // if we can't find hadoop home we can at least try /usr/bin/hadoop + val = (val == null ? File.separator + "usr" : val) + + File.separator + "bin" + File.separator + "hadoop"; + // Launch hadoop command file on windows. + return val + (Shell.WINDOWS ? ".cmd" : ""); + } + + private static String findYarnBinary() { + String val = findHadoopHome(); + val = (val == null ? 
"yarn" : val + File.separator + "bin" + File.separator + "yarn"); + return val + (Shell.WINDOWS ? ".cmd" : ""); + } + + private static String findHadoopHome() { String val = System.getenv("HADOOP_HOME"); // In Hadoop 1.X and Hadoop 2.X HADOOP_HOME is gone and replaced with HADOOP_PREFIX if (val == null) { val = System.getenv("HADOOP_PREFIX"); } - // and if all else fails we can at least try /usr/bin/hadoop - val = (val == null ? File.separator + "usr" : val) - + File.separator + "bin" + File.separator + "hadoop"; - // Launch hadoop command file on windows. - return val + (Shell.WINDOWS ? ".cmd" : ""); + return val; } public String getDefaultValue() { @@ -2916,7 +2933,8 @@ public boolean isHiddenConfig(String name) { private boolean isSparkRelatedConfig(String name) { boolean result = false; if (name.startsWith("spark")) { // Spark property. - result = true; + // for now we don't support changing spark app name on the fly + result = !name.equals("spark.app.name"); } else if (name.startsWith("yarn")) { // YARN property in Spark on YARN mode. String sparkMaster = get("spark.master"); if (sparkMaster != null && diff --git a/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java b/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java new file mode 100644 index 0000000..b87de09 --- /dev/null +++ b/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.contrib.genericudf.example; + +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; + +/** + * GenericUDFAdd10. + * + */ +@Description(name = "add10", + value = "_FUNC_(x) - returns 10 plus the original value of x", + extended = "Example:\n" + + " > SELECT _FUNC_(0) FROM src LIMIT 1;\n" + + " 10\n" + + " > SELECT _FUNC_(-5) FROM src LIMIT 1;\n" + " 5") +public class GenericUDFAdd10 extends GenericUDF { + private transient PrimitiveCategory inputType; + private final DoubleWritable resultDouble = new DoubleWritable(); + private final LongWritable resultLong = new LongWritable(); + private final IntWritable resultInt = new IntWritable(); + private final HiveDecimalWritable resultDecimal = new HiveDecimalWritable(); + private transient PrimitiveObjectInspector argumentOI; + private transient Converter inputConverter; + + @Override + public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { + if (arguments.length != 1) { + throw new UDFArgumentLengthException( + "ADD10() requires 1 argument, got " + arguments.length); + } + + if (arguments[0].getCategory() != Category.PRIMITIVE) { + throw new UDFArgumentException( + "ADD10 only takes primitive types, got " + arguments[0].getTypeName()); + } + argumentOI = (PrimitiveObjectInspector) arguments[0]; + + inputType = argumentOI.getPrimitiveCategory(); + ObjectInspector outputOI = null; + switch (inputType) { + case SHORT: + case BYTE: + case INT: + inputConverter = ObjectInspectorConverters.getConverter(arguments[0], + PrimitiveObjectInspectorFactory.writableIntObjectInspector); + outputOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector; + break; + case LONG: + inputConverter = ObjectInspectorConverters.getConverter(arguments[0], + PrimitiveObjectInspectorFactory.writableLongObjectInspector); + outputOI = PrimitiveObjectInspectorFactory.writableLongObjectInspector; + break; + case FLOAT: + case STRING: + case DOUBLE: + inputConverter = ObjectInspectorConverters.getConverter(arguments[0], + PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); + outputOI = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; + break; + case DECIMAL: + outputOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector( + ((PrimitiveObjectInspector) arguments[0]).getTypeInfo()); + 
inputConverter = ObjectInspectorConverters.getConverter(arguments[0], + outputOI); + break; + default: + throw new UDFArgumentException( + "ADD10 only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType); + } + return outputOI; + } + + @Override + public Object evaluate(DeferredObject[] arguments) throws HiveException { + Object valObject = arguments[0].get(); + if (valObject == null) { + return null; + } + switch (inputType) { + case SHORT: + case BYTE: + case INT: + valObject = inputConverter.convert(valObject); + resultInt.set(10 + ((IntWritable) valObject).get()); + return resultInt; + case LONG: + valObject = inputConverter.convert(valObject); + resultLong.set(10 + ((LongWritable) valObject).get()); + return resultLong; + case FLOAT: + case STRING: + case DOUBLE: + valObject = inputConverter.convert(valObject); + resultDouble.set(10.0 + ((DoubleWritable) valObject).get()); + return resultDouble; + case DECIMAL: + HiveDecimalObjectInspector decimalOI = + (HiveDecimalObjectInspector) argumentOI; + HiveDecimalWritable val = decimalOI.getPrimitiveWritableObject(valObject); + + if (val != null) { + resultDecimal.set(val.getHiveDecimal().add(HiveDecimal.create("10"))); + val = resultDecimal; + } + return val; + default: + throw new UDFArgumentException( + "ADD10 only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType); + } + } + + @Override + public String getDisplayString(String[] children) { + return getStandardDisplayString("add10", children); + } + +} diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml index aef7877..f1d9ddc 100644 --- a/data/conf/spark/yarn-client/hive-site.xml +++ b/data/conf/spark/yarn-client/hive-site.xml @@ -196,7 +196,7 @@ spark.master - yarn-client + yarn-cluster @@ -231,7 +231,22 @@ spark.executor.memory - 512m + 1g + + + + spark.yarn.executor.memoryOverhead + 0 + + + + spark.driver.memory + 1g + + + + spark.yarn.driver.memoryOverhead + 0 @@ -255,4 +270,9 @@ Internal marker for test. 
Used for masking env-dependent values + + hive.spark.client.connect.timeout + 30000ms + + diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 8318c3a..ec6a2c7 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1091,8 +1091,6 @@ spark.query.files=add_part_multiple.q, \ stats7.q, \ stats8.q, \ stats9.q, \ - stats_counter.q, \ - stats_counter_partitioned.q, \ stats_noscan_1.q, \ stats_noscan_2.q, \ stats_only_null.q, \ @@ -1262,6 +1260,7 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\ empty_dir_in_table.q,\ external_table_with_space_in_location_path.q,\ file_with_header_footer.q,\ + gen_udf_example_add10.q,\ import_exported_table.q,\ index_bitmap3.q,\ index_bitmap_auto.q,\ @@ -1297,8 +1296,6 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\ schemeAuthority2.q,\ scriptfile1.q,\ scriptfile1_win.q,\ - stats_counter.q,\ - stats_counter_partitioned.q,\ temp_table_external.q,\ truncate_column_buckets.q,\ uber_reduce.q,\ diff --git a/pom.xml b/pom.xml index 2066518..2d2a3de 100644 --- a/pom.xml +++ b/pom.xml @@ -166,7 +166,7 @@ 4.0.4 0.8.2 2.2.0 - 1.5.0 + 1.6.0 2.10 2.10.4 1.1 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 3289cfc..a659f7e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -440,13 +440,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { cntr = 1; logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS); - String suffix = Integer.toString(conf.getDestTableId()); - String fullName = conf.getTableInfo().getTableName(); - if (fullName != null) { - suffix = suffix + "_" + fullName.toLowerCase(); - } - - statsMap.put(Counter.RECORDS_OUT + "_" + suffix, row_count); + statsMap.put(getCounterName(Counter.RECORDS_OUT), row_count); } catch (HiveException e) { throw e; } catch (Exception e) { @@ -455,6 +449,15 @@ protected void initializeOp(Configuration hconf) throws HiveException { } } + public String getCounterName(Counter counter) { + String suffix = Integer.toString(conf.getDestTableId()); + String fullName = conf.getTableInfo().getTableName(); + if (fullName != null) { + suffix = suffix + "_" + fullName.toLowerCase(); + } + return counter + "_" + suffix; + } + private void logOutputFormatError(Configuration hconf, HiveException ex) { StringWriter errorWriter = new StringWriter(); errorWriter.append("Failed to create output format; configuration: "); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index 74b4802..e692460 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -170,11 +170,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { cntr = 1; logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS); - String context = hconf.get(Operator.CONTEXT_NAME_KEY, ""); - if (context != null && !context.isEmpty()) { - context = "_" + context.replace(" ","_"); - } - statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter); + statsMap.put(getCounterName(Counter.RECORDS_OUT_INTERMEDIATE, hconf), recordCounter); 
List keys = conf.getKeyCols(); @@ -256,6 +252,14 @@ protected void initializeOp(Configuration hconf) throws HiveException { } } + public String getCounterName(Counter counter, Configuration hconf) { + String context = hconf.get(Operator.CONTEXT_NAME_KEY, ""); + if (context != null && !context.isEmpty()) { + context = "_" + context.replace(" ", "_"); + } + return counter + context; + } + /** * Initializes array of ExprNodeEvaluator. Adds Union field for distinct diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 259c12f..993d02b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -27,8 +27,10 @@ import java.util.Set; import org.apache.commons.compress.utils.CharsetNames; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.HiveKey; @@ -66,10 +68,16 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex public static Map initiateSparkConf(HiveConf hiveConf) { Map sparkConf = new HashMap(); + HBaseConfiguration.addHbaseResources(hiveConf); // set default spark configurations. sparkConf.put("spark.master", SPARK_DEFAULT_MASTER); - sparkConf.put("spark.app.name", SPARK_DEFAULT_APP_NAME); + final String appNameKey = "spark.app.name"; + String appName = hiveConf.get(appNameKey); + if (appName == null) { + appName = SPARK_DEFAULT_APP_NAME; + } + sparkConf.put(appNameKey, appName); sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER); sparkConf.put("spark.kryo.referenceTracking", SPARK_DEFAULT_REFERENCE_TRACKING); @@ -133,7 +141,21 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex LOG.info(String.format( "load yarn property from hive configuration in %s mode (%s -> %s).", sparkMaster, propertyName, value)); + } else if (propertyName.equals(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)) { + String value = hiveConf.get(propertyName); + if (value != null && !value.isEmpty()) { + sparkConf.put("spark.hadoop." + propertyName, value); + } + } else if (propertyName.startsWith("hbase")) { + // Add HBase related configuration to Spark because in security mode, Spark needs it + // to generate hbase delegation token for Spark. This is a temp solution to deal with + // Spark problem. + String value = hiveConf.get(propertyName); + sparkConf.put("spark.hadoop." 
+ propertyName, value); + LOG.info(String.format( + "load HBase configuration (%s -> %s).", propertyName, value)); } + if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) { String value = RpcConfiguration.getValue(hiveConf, propertyName); sparkConf.put(propertyName, value); @@ -152,6 +174,15 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex classes.add(HiveKey.class.getName()); sparkConf.put("spark.kryo.classesToRegister", Joiner.on(",").join(classes)); + // set yarn queue name + final String sparkQueueNameKey = "spark.yarn.queue"; + if (sparkMaster.startsWith("yarn") && hiveConf.get(sparkQueueNameKey) == null) { + String queueName = hiveConf.get("mapreduce.job.queuename"); + if (queueName != null) { + sparkConf.put(sparkQueueNameKey, queueName); + } + } + return sparkConf; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index 6380774..11e7116 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -297,8 +297,9 @@ public Serializable call(JobContext jc) throws Exception { // may need to load classes from this jar in other threads. Map addedJars = jc.getAddedJars(); if (addedJars != null && !addedJars.isEmpty()) { - SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir()); - localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";")); + List localAddedJars = SparkClientUtilities.addToClassPath(addedJars, + localJobConf, jc.getLocalTmpDir()); + localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(localAddedJars, ";")); } Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index eb93aca..26cce1b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hive.ql.exec.spark; import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -30,11 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.QueryPlan; @@ -44,7 +38,6 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.ScriptOperator; -import org.apache.hadoop.hive.ql.exec.StatsTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic; @@ -58,25 +51,15 @@ import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.Partition; -import 
org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.plan.BaseWork; -import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; -import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.SparkWork; -import org.apache.hadoop.hive.ql.plan.StatsWork; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.util.StringUtils; -import org.apache.hive.spark.counter.SparkCounters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; @@ -86,7 +69,6 @@ private static final LogHelper console = new LogHelper(LOG); private final PerfLogger perfLogger = SessionState.getPerfLogger(); private static final long serialVersionUID = 1L; - private SparkCounters sparkCounters; @Override public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext, @@ -106,7 +88,7 @@ public int execute(DriverContext driverContext) { sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager); SparkWork sparkWork = getWork(); - sparkWork.setRequiredCounterPrefix(getCounterPrefixes()); + sparkWork.setRequiredCounterPrefix(getOperatorCounters()); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB); SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork); @@ -116,8 +98,6 @@ public int execute(DriverContext driverContext) { rc = jobRef.monitorJob(); SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus(); if (rc == 0) { - sparkCounters = sparkJobStatus.getCounter(); - // for RSC, we should get the counters after job has finished SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics(); if (LOG.isInfoEnabled() && sparkStatistics != null) { LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId())); @@ -233,10 +213,6 @@ public String getName() { return ((ReduceWork) children.get(0)).getReducer(); } - public SparkCounters getSparkCounters() { - return sparkCounters; - } - /** * Set the number of reducers for the spark work. */ @@ -250,126 +226,6 @@ private void printConfigInfo() throws IOException { console.printInfo(" set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "="); } - private Map> getCounterPrefixes() throws HiveException, MetaException { - Map> counters = getOperatorCounters(); - StatsTask statsTask = getStatsTaskInChildTasks(this); - String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS); - // fetch table prefix if SparkTask try to gather table statistics based on counter. 
- if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) { - List prefixes = getRequiredCounterPrefix(statsTask); - for (String prefix : prefixes) { - List counterGroup = counters.get(prefix); - if (counterGroup == null) { - counterGroup = new LinkedList(); - counters.put(prefix, counterGroup); - } - counterGroup.add(StatsSetupConst.ROW_COUNT); - counterGroup.add(StatsSetupConst.RAW_DATA_SIZE); - } - } - return counters; - } - - private List getRequiredCounterPrefix(StatsTask statsTask) throws HiveException, MetaException { - List prefixs = new LinkedList(); - StatsWork statsWork = statsTask.getWork(); - String tablePrefix = getTablePrefix(statsWork); - List> partitionSpecs = getPartitionSpecs(statsWork); - - if (partitionSpecs == null) { - prefixs.add(tablePrefix.endsWith(Path.SEPARATOR) ? tablePrefix : tablePrefix + Path.SEPARATOR); - } else { - for (Map partitionSpec : partitionSpecs) { - String prefixWithPartition = Utilities.join(tablePrefix, Warehouse.makePartPath(partitionSpec)); - prefixs.add(prefixWithPartition.endsWith(Path.SEPARATOR) ? prefixWithPartition : prefixWithPartition + Path.SEPARATOR); - } - } - - return prefixs; - } - - private String getTablePrefix(StatsWork work) throws HiveException { - String tableName; - if (work.getLoadTableDesc() != null) { - tableName = work.getLoadTableDesc().getTable().getTableName(); - } else if (work.getTableSpecs() != null) { - tableName = work.getTableSpecs().tableName; - } else { - tableName = work.getLoadFileDesc().getDestinationCreateTable(); - } - Table table; - try { - table = db.getTable(tableName); - } catch (HiveException e) { - LOG.warn("Failed to get table:" + tableName); - // For CTAS query, table does not exist in this period, just use table name as prefix. - return tableName.toLowerCase(); - } - return table.getDbName() + "." + table.getTableName(); - } - - private static StatsTask getStatsTaskInChildTasks(Task rootTask) { - - List> childTasks = rootTask.getChildTasks(); - if (childTasks == null) { - return null; - } - for (Task task : childTasks) { - if (task instanceof StatsTask) { - return (StatsTask) task; - } else { - Task childTask = getStatsTaskInChildTasks(task); - if (childTask instanceof StatsTask) { - return (StatsTask) childTask; - } else { - continue; - } - } - } - - return null; - } - - private List> getPartitionSpecs(StatsWork work) throws HiveException { - if (work.getLoadFileDesc() != null) { - return null; //we are in CTAS, so we know there are no partitions - } - Table table; - List> partitionSpecs = new ArrayList>(); - - if (work.getTableSpecs() != null) { - - // ANALYZE command - TableSpec tblSpec = work.getTableSpecs(); - table = tblSpec.tableHandle; - if (!table.isPartitioned()) { - return null; - } - // get all partitions that matches with the partition spec - List partitions = tblSpec.partitions; - if (partitions != null) { - for (Partition partition : partitions) { - partitionSpecs.add(partition.getSpec()); - } - } - } else if (work.getLoadTableDesc() != null) { - - // INSERT OVERWRITE command - LoadTableDesc tbd = work.getLoadTableDesc(); - table = db.getTable(tbd.getTable().getTableName()); - if (!table.isPartitioned()) { - return null; - } - DynamicPartitionCtx dpCtx = tbd.getDPCtx(); - if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions - // we could not get dynamic partition information before SparkTask execution. 
- } else { // static partition - partitionSpecs.add(tbd.getPartitionSpec()); - } - } - return partitionSpecs; - } - private Map> getOperatorCounters() { String groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP); Map> counters = new HashMap>(); @@ -386,11 +242,11 @@ private static StatsTask getStatsTaskInChildTasks(Task r for (Operator operator : work.getAllOperators()) { if (operator instanceof FileSinkOperator) { for (FileSinkOperator.Counter counter : FileSinkOperator.Counter.values()) { - hiveCounters.add(counter.toString()); + hiveCounters.add(((FileSinkOperator) operator).getCounterName(counter)); } } else if (operator instanceof ReduceSinkOperator) { for (ReduceSinkOperator.Counter counter : ReduceSinkOperator.Counter.values()) { - hiveCounters.add(counter.toString()); + hiveCounters.add(((ReduceSinkOperator) operator).getCounterName(counter, conf)); } } else if (operator instanceof ScriptOperator) { for (ScriptOperator.Counter counter : ScriptOperator.Counter.values()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index a61cdc5..5a6bef9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -89,7 +89,8 @@ public static URI getURI(String path) throws URISyntaxException { */ public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException { Path localFile = new Path(source.getPath()); - Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf), getFileName(source)); + Path remoteFile = new Path(SessionState.get().getSparkSession().getHDFSSessionDir(), + getFileName(source)); FileSystem fileSystem = FileSystem.get(conf); // Overwrite if the remote file already exists. Whether the file can be added // on executor is up to spark, i.e. spark.files.overwrite diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java index 3d4b39b..e1f8c1d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.exec.spark.session; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; @@ -24,6 +25,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.SparkWork; +import java.io.IOException; + public interface SparkSession { /** * Initializes a Spark session for DAG execution. @@ -67,4 +70,9 @@ * Close session and release resources. 
*/ void close(); + + /** + * Get an HDFS dir specific to the SparkSession + * */ + Path getHDFSSessionDir() throws IOException; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java index f04e145..51c6715 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -20,6 +20,10 @@ import java.io.IOException; import java.util.UUID; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.ql.session.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.ObjectPair; @@ -37,11 +41,14 @@ public class SparkSessionImpl implements SparkSession { private static final Logger LOG = LoggerFactory.getLogger(SparkSession.class); + private static final String SPARK_DIR = "_spark_session_dir"; private HiveConf conf; private boolean isOpen; private final String sessionId; private HiveSparkClient hiveSparkClient; + private Path scratchDir; + private final Object dirLock = new Object(); public SparkSessionImpl() { sessionId = makeSessionId(); @@ -118,6 +125,7 @@ public void close() { if (hiveSparkClient != null) { try { hiveSparkClient.close(); + cleanScratchDir(); } catch (IOException e) { LOG.error("Failed to close spark session (" + sessionId + ").", e); } @@ -125,6 +133,37 @@ public void close() { hiveSparkClient = null; } + private Path createScratchDir() throws IOException { + Path parent = new Path(SessionState.get().getHdfsScratchDirURIString(), SPARK_DIR); + Path sparkDir = new Path(parent, sessionId); + FileSystem fs = sparkDir.getFileSystem(conf); + FsPermission fsPermission = new FsPermission(HiveConf.getVar( + conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION)); + fs.mkdirs(sparkDir, fsPermission); + fs.deleteOnExit(sparkDir); + return sparkDir; + } + + private void cleanScratchDir() throws IOException { + if (scratchDir != null) { + FileSystem fs = scratchDir.getFileSystem(conf); + fs.delete(scratchDir, true); + scratchDir = null; + } + } + + @Override + public Path getHDFSSessionDir() throws IOException { + if (scratchDir == null) { + synchronized (dirLock) { + if (scratchDir == null) { + scratchDir = createScratchDir(); + } + } + } + return scratchDir; + } + public static String makeSessionId() { return UUID.randomUUID().toString(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index fb0498a..6990e80 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -34,10 +34,12 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor { private RemoteSparkJobStatus sparkJobStatus; + private final HiveConf hiveConf; public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobStatus) { super(hiveConf); this.sparkJobStatus = sparkJobStatus; + this.hiveConf = hiveConf; } @Override @@ -77,6 +79,7 @@ public int startMonitor() { Map progressMap = sparkJobStatus.getSparkStageProgress(); if (!running) { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); + printAppInfo(); // print job stages. 
console.printInfo("\nQuery Hive on Spark job[" + sparkJobStatus.getJobId() + "] stages:"); @@ -137,4 +140,16 @@ public int startMonitor() { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB); return rc; } + + private void printAppInfo() { + String sparkMaster = hiveConf.get("spark.master"); + if (sparkMaster != null && sparkMaster.startsWith("yarn")) { + String appID = sparkJobStatus.getAppID(); + if (appID != null) { + console.printInfo("Running with YARN Application = " + appID); + console.printInfo("Kill Command = " + + HiveConf.getVar(hiveConf, HiveConf.ConfVars.YARNBIN) + " application -kill " + appID); + } + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java index fa45ec8..7959089 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java @@ -29,6 +29,8 @@ */ public interface SparkJobStatus { + String getAppID(); + int getJobId(); JobExecutionStatus getState() throws HiveException; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java index ebc5c16..3c15521 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java @@ -66,6 +66,11 @@ public LocalSparkJobStatus(JavaSparkContext sparkContext, int jobId, } @Override + public String getAppID() { + return sparkContext.sc().applicationId(); + } + + @Override public int getJobId() { return jobId; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index e8d581f..d84c026 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -62,6 +62,17 @@ public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle job } @Override + public String getAppID() { + Future getAppID = sparkClient.run(new GetAppIDJob()); + try { + return getAppID.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("Failed to get APP ID.", e); + return null; + } + } + + @Override public int getJobId() { return jobHandle.getSparkJobIds().size() == 1 ? 
jobHandle.getSparkJobIds().get(0) : -1; } @@ -268,4 +279,15 @@ public JobExecutionStatus status() { } }; } + + private static class GetAppIDJob implements Job { + + public GetAppIDJob() { + } + + @Override + public String call(JobContext jc) throws Exception { + return jc.sc().sc().applicationId(); + } + } } diff --git a/ql/src/test/queries/clientpositive/gen_udf_example_add10.q b/ql/src/test/queries/clientpositive/gen_udf_example_add10.q new file mode 100644 index 0000000..69178c9 --- /dev/null +++ b/ql/src/test/queries/clientpositive/gen_udf_example_add10.q @@ -0,0 +1,13 @@ +add jar ${system:maven.local.repository}/org/apache/hive/hive-contrib/${system:hive.version}/hive-contrib-${system:hive.version}.jar; + +create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'; + +create table t1(x int,y double); +load data local inpath '../../data/files/T1.txt' into table t1; + +explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10; + +select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10; + +drop table t1; +drop temporary function example_add10; diff --git a/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out new file mode 100644 index 0000000..cab2ec8 --- /dev/null +++ b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out @@ -0,0 +1,96 @@ +PREHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10' +PREHOOK: type: CREATEFUNCTION +PREHOOK: Output: example_add10 +POSTHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10' +POSTHOOK: type: CREATEFUNCTION +POSTHOOK: Output: example_add10 +PREHOOK: query: create table t1(x int,y double) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@t1 +POSTHOOK: query: create table t1(x int,y double) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t1 +PREHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1 +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@t1 +POSTHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1 +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@t1 +PREHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10 +PREHOOK: type: QUERY +POSTHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: t1 + Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: add10(x) (type: int), add10(y) (type: double) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: double) + sort order: -+ + Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE + TopN Hash Memory Usage: 0.1 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: double) + outputColumnNames: _col0, 
_col1 + Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 10 + Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 10 + Processor Tree: + ListSink + +PREHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +18 28.0 +18 38.0 +17 27.0 +13 23.0 +12 22.0 +11 21.0 +PREHOOK: query: drop table t1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@t1 +PREHOOK: Output: default@t1 +POSTHOOK: query: drop table t1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@t1 +POSTHOOK: Output: default@t1 +PREHOOK: query: drop temporary function example_add10 +PREHOOK: type: DROPFUNCTION +PREHOOK: Output: example_add10 +POSTHOOK: query: drop temporary function example_add10 +POSTHOOK: type: DROPFUNCTION +POSTHOOK: Output: example_add10 diff --git a/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out new file mode 100644 index 0000000..493d0a4 --- /dev/null +++ b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out @@ -0,0 +1,102 @@ +PREHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10' +PREHOOK: type: CREATEFUNCTION +PREHOOK: Output: example_add10 +POSTHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10' +POSTHOOK: type: CREATEFUNCTION +POSTHOOK: Output: example_add10 +PREHOOK: query: create table t1(x int,y double) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@t1 +POSTHOOK: query: create table t1(x int,y double) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t1 +PREHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1 +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@t1 +POSTHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1 +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@t1 +PREHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10 +PREHOOK: type: QUERY +POSTHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (SORT, 1) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: t1 + Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: add10(x) (type: int), add10(y) (type: double) + outputColumnNames: _col0, _col1 + 
Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: double) + sort order: -+ + Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE + TopN Hash Memory Usage: 0.1 + Reducer 2 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: double) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 10 + Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 10 + Processor Tree: + ListSink + +PREHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +18 28.0 +18 38.0 +17 27.0 +13 23.0 +12 22.0 +11 21.0 +PREHOOK: query: drop table t1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@t1 +PREHOOK: Output: default@t1 +POSTHOOK: query: drop table t1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@t1 +POSTHOOK: Output: default@t1 +PREHOOK: query: drop temporary function example_add10 +PREHOOK: type: DROPFUNCTION +PREHOOK: Output: example_add10 +POSTHOOK: query: drop temporary function example_add10 +POSTHOOK: type: DROPFUNCTION +POSTHOOK: Output: example_add10 diff --git a/ql/src/test/templates/TestNegativeCliDriver.vm b/ql/src/test/templates/TestNegativeCliDriver.vm index d1cbbfd..592d64f 100644 --- a/ql/src/test/templates/TestNegativeCliDriver.vm +++ b/ql/src/test/templates/TestNegativeCliDriver.vm @@ -34,13 +34,17 @@ public class $className { static { MiniClusterType miniMR = MiniClusterType.valueForString("$clusterMode"); + String hiveConfDir = "$hiveConfDir"; String initScript = "$initScript"; String cleanupScript = "$cleanupScript"; try { String hadoopVer = "$hadoopVersion"; - qt = new QTestUtil((HIVE_ROOT + "$resultsDir"), (HIVE_ROOT + "$logDir"), miniMR, null, hadoopVer, - initScript, cleanupScript, false, false); + if (!hiveConfDir.isEmpty()) { + hiveConfDir = HIVE_ROOT + hiveConfDir; + } + qt = new QTestUtil((HIVE_ROOT + "$resultsDir"), (HIVE_ROOT + "$logDir"), miniMR, + hiveConfDir, hadoopVer, initScript, cleanupScript, false, false); // do a one time initialization qt.cleanUp(); qt.createSources(); diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 78b4b31..5c8fa3c 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.common.ServerUtils; import 
org.apache.hive.service.AbstractService; import org.apache.hive.service.ServiceException; import org.apache.hive.service.ServiceUtils; @@ -203,21 +204,19 @@ public void processContext(ServerContext serverContext, @Override public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; - // Initialize common server configs needed in both binary & http modes - String portString; - hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST"); + + String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST"); if (hiveHost == null) { hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST); } try { - if (hiveHost != null && !hiveHost.isEmpty()) { - serverIPAddress = InetAddress.getByName(hiveHost); - } else { - serverIPAddress = InetAddress.getLocalHost(); - } + serverIPAddress = ServerUtils.getHostAddress(hiveHost); } catch (UnknownHostException e) { throw new ServiceException(e); } + + // Initialize common server configs needed in both binary & http modes + String portString; // HTTP mode if (HiveServer2.isHTTPTransportMode(hiveConf)) { workerKeepAliveTime = diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index c03ae35..31060a2 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -526,6 +526,8 @@ public MiniSparkShim(Configuration conf, int numberOfTaskTrackers, mr = new MiniSparkOnYARNCluster("sparkOnYarn"); conf.set("fs.defaultFS", nameNode); conf.set("yarn.resourcemanager.scheduler.class", "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + // disable resource monitoring, although it should be off by default + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, false); configureImpersonation(conf); mr.init(conf); mr.start(); diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java index b779f3f..6251861 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java @@ -23,6 +23,7 @@ import java.io.File; import java.net.URL; import java.net.URLClassLoader; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -43,15 +44,18 @@ * Add new elements to the classpath. 
* * @param newPaths Map of classpath elements and corresponding timestamp + * @return locally accessible files corresponding to the newPaths */ - public static void addToClassPath(Map<String, Long> newPaths, Configuration conf, File localTmpDir) - throws Exception { + public static List<String> addToClassPath(Map<String, Long> newPaths, Configuration conf, + File localTmpDir) throws Exception { URLClassLoader loader = (URLClassLoader) Thread.currentThread().getContextClassLoader(); List<URL> curPath = Lists.newArrayList(loader.getURLs()); + List<String> localNewPaths = new ArrayList<>(); boolean newPathAdded = false; for (Map.Entry<String, Long> entry : newPaths.entrySet()) { URL newUrl = urlFromPathString(entry.getKey(), entry.getValue(), conf, localTmpDir); + if (newUrl != null) { + localNewPaths.add(newUrl.toString()); + } if (newUrl != null && !curPath.contains(newUrl)) { curPath.add(newUrl); LOG.info("Added jar[" + newUrl + "] to classpath."); @@ -64,6 +68,7 @@ public static void addToClassPath(Map<String, Long> newPaths, Configuration conf new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader); Thread.currentThread().setContextClassLoader(newLoader); } + return localNewPaths; } /** diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java index 9c8cea0..e387659 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java @@ -18,20 +18,19 @@ package org.apache.hive.spark.client.rpc; import java.io.IOException; -import java.net.Inet4Address; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.util.Enumeration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; + import javax.security.sasl.Sasl; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; @@ -49,15 +48,14 @@ HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname, HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname, - HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname + HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname, + HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname ); public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of( HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname, HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname ); - public static final String SERVER_LISTEN_ADDRESS_KEY = "hive.spark.client.server.address"; - /** Prefix for other SASL options. */ public static final String RPC_SASL_OPT_PREFIX = "hive.spark.client.rpc.sasl."; @@ -91,39 +89,22 @@ int getSecretBits() { return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.defaultIntVal; } + /** + * Here we assume that the remote driver will connect back to HS2 using the same network interface + * as if it were just a HS2 client. If this isn't true, we can have a separate configuration for that. + * For now, I think we are okay. 
+ * @return server host name in the network + * @throws IOException + */ String getServerAddress() throws IOException { - String value = config.get(SERVER_LISTEN_ADDRESS_KEY); - if (value != null) { - return value; - } - - InetAddress address = InetAddress.getLocalHost(); - if (address.isLoopbackAddress()) { - // Address resolves to something like 127.0.1.1, which happens on Debian; - // try to find a better address using the local network interfaces - Enumeration ifaces = NetworkInterface.getNetworkInterfaces(); - while (ifaces.hasMoreElements()) { - NetworkInterface ni = ifaces.nextElement(); - Enumeration addrs = ni.getInetAddresses(); - while (addrs.hasMoreElements()) { - InetAddress addr = addrs.nextElement(); - if (!addr.isLinkLocalAddress() && !addr.isLoopbackAddress() - && addr instanceof Inet4Address) { - // We've found an address that looks reasonable! - LOG.warn("Your hostname, {}, resolves to a loopback address; using {} " - + " instead (on interface {})", address.getHostName(), addr.getHostAddress(), - ni.getName()); - LOG.warn("Set '{}' if you need to bind to another address.", SERVER_LISTEN_ADDRESS_KEY); - return addr.getHostAddress(); - } - } + String hiveHost = config.get(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS); + if(StringUtils.isEmpty(hiveHost)) { + hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST"); + if (hiveHost == null) { + hiveHost = config.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST); } } - - LOG.warn("Your hostname, {}, resolves to a loopback address, but we couldn't find " - + " any external IP address!", address.getHostName()); - LOG.warn("Set {} if you need to bind to another address.", SERVER_LISTEN_ADDRESS_KEY); - return address.getHostName(); + return ServerUtils.getHostAddress(hiveHost).getHostName(); } String getRpcChannelLogLevel() {