diff --git ql/pom.xml ql/pom.xml
index 06d7f27..a8e779c 100644
--- ql/pom.xml
+++ ql/pom.xml
@@ -61,6 +61,11 @@
       <artifactId>hive-shims</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>spark-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>com.esotericsoftware.kryo</groupId>
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
new file mode 100644
index 0000000..fc63180
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+import java.io.Closeable;
+import java.io.Serializable;
+
+public interface HiveSparkClient extends Serializable, Closeable {
+  /**
+   * Generate the Spark RDD graph for the given sparkWork and driverContext,
+   * and submit that graph to the Spark cluster.
+   * @param driverContext
+   * @param sparkWork
+   * @return a SparkJobRef that can be used to track Spark job progress and metrics.
+   * @throws Exception
+   */
+  public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception;
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
new file mode 100644
index 0000000..cd423be
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.commons.compress.utils.CharsetNames;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+public class HiveSparkClientFactory {
+  protected static transient final Log LOG = LogFactory
+    .getLog(HiveSparkClientFactory.class);
+
+  private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
+  private static final String SPARK_DEFAULT_MASTER = "local";
+  private static final String SAPRK_DEFAULT_APP_NAME = "Hive on Spark";
+
+  public static HiveSparkClient createHiveSparkClient(Configuration configuration)
+    throws IOException, SparkException {
+
+    Map<String, String> conf = initiateSparkConf(configuration);
+    // Submit spark job through local spark context while spark master is local mode, otherwise submit
+    // spark job through remote spark context.
+    String master = conf.get("spark.master");
+    if (master.equals("local") || master.startsWith("local[")) {
+      // With local spark context, all user sessions share the same spark context.
+      return LocalHiveSparkClient.getInstance(generateSparkConf(conf));
+    } else {
+      return new RemoteHiveSparkClient(conf);
+    }
+  }
+
+  private static Map<String, String> initiateSparkConf(Configuration hiveConf) {
+    Map<String, String> sparkConf = new HashMap<String, String>();
+
+    // set default spark configurations.
+    sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
+    sparkConf.put("spark.app.name", SAPRK_DEFAULT_APP_NAME);
+    sparkConf.put("spark.serializer",
+      "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.put("spark.default.parallelism", "1");
+
+    // load properties from spark-defaults.conf.
+    InputStream inputStream = null;
+    try {
+      inputStream = HiveSparkClientFactory.class.getClassLoader()
+        .getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
+      if (inputStream != null) {
+        LOG.info("loading spark properties from:" + SPARK_DEFAULT_CONF_FILE);
+        Properties properties = new Properties();
+        properties.load(new InputStreamReader(inputStream, CharsetNames.UTF_8));
+        for (String propertyName : properties.stringPropertyNames()) {
+          if (propertyName.startsWith("spark")) {
+            String value = properties.getProperty(propertyName);
+            sparkConf.put(propertyName, properties.getProperty(propertyName));
+            LOG.info(String.format(
+              "load spark configuration from %s (%s -> %s).",
+              SPARK_DEFAULT_CONF_FILE, propertyName, value));
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOG.info("Failed to open spark configuration file:"
+        + SPARK_DEFAULT_CONF_FILE, e);
+    } finally {
+      if (inputStream != null) {
+        try {
+          inputStream.close();
+        } catch (IOException e) {
+          LOG.debug("Failed to close inputstream.", e);
+        }
+      }
+    }
+
+    // load properties from hive configurations.
+ for (Map.Entry entry : hiveConf) { + String propertyName = entry.getKey(); + if (propertyName.startsWith("spark")) { + String value = entry.getValue(); + sparkConf.put(propertyName, value); + LOG.info(String.format( + "load spark configuration from hive configuration (%s -> %s).", + propertyName, value)); + } + } + + return sparkConf; + } + + private static SparkConf generateSparkConf(Map conf) { + SparkConf sparkConf = new SparkConf(false); + for (Map.Entry entry : conf.entrySet()) { + sparkConf.set(entry.getKey(), entry.getValue()); + } + return sparkConf; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java new file mode 100644 index 0000000..797bd11 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import com.google.common.base.Splitter; +import com.google.common.base.Strings; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobStateListener; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.SparkException; +import org.apache.spark.api.java.JavaFutureAction; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.ui.jobs.JobProgressListener; +import scala.Tuple2; + +import java.io.IOException; +import java.util.*; + +/** + * LocalSparkClient submit Spark job in local driver, it's responsible for build spark client + * environment and execute spark work. 
+ */ +public class LocalHiveSparkClient implements HiveSparkClient { + private static final long serialVersionUID = 1L; + + private static final String MR_JAR_PROPERTY = "tmpjars"; + protected static transient final Log LOG = LogFactory + .getLog(LocalHiveSparkClient.class); + + private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings(); + + private static LocalHiveSparkClient client; + + public static synchronized LocalHiveSparkClient getInstance(SparkConf sparkConf) { + if (client == null) { + client = new LocalHiveSparkClient(sparkConf); + } + return client; + } + + /** + * Get Spark shuffle memory per task, and total number of cores. This + * information can be used to estimate how many reducers a task can have. + * + * @return a tuple, the first element is the shuffle memory per task in bytes, + * the second element is the number of total cores usable by the client + */ + public Tuple2 getMemoryAndCores() { + SparkContext sparkContext = sc.sc(); + SparkConf sparkConf = sparkContext.conf(); + int cores = sparkConf.getInt("spark.executor.cores", 1); + double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2); + // sc.executorMemory() is in MB, need to convert to bytes + long memoryPerTask = + (long) (sparkContext.executorMemory() * memoryFraction * 1024 * 1024 / cores); + int executors = sparkContext.getExecutorMemoryStatus().size(); + int totalCores = executors * cores; + LOG.info("Spark cluster current has executors: " + executors + + ", cores per executor: " + cores + ", memory per executor: " + + sparkContext.executorMemory() + "M, shuffle memoryFraction: " + memoryFraction); + return new Tuple2(Long.valueOf(memoryPerTask), + Integer.valueOf(totalCores)); + } + + private JavaSparkContext sc; + + private List localJars = new ArrayList(); + + private List localFiles = new ArrayList(); + + private JobStateListener jobStateListener; + + private JobProgressListener jobProgressListener; + + private LocalHiveSparkClient(SparkConf sparkConf) { + sc = new JavaSparkContext(sparkConf); + jobStateListener = new JobStateListener(); + jobProgressListener = new JobProgressListener(sparkConf); + sc.sc().listenerBus().addListener(jobStateListener); + sc.sc().listenerBus().addListener(jobProgressListener); + } + + @Override + public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception { + Context ctx = driverContext.getCtx(); + HiveConf hiveConf = (HiveConf) ctx.getConf(); + refreshLocalResources(sparkWork, hiveConf); + JobConf jobConf = new JobConf(hiveConf); + + // Create temporary scratch dir + Path emptyScratchDir; + emptyScratchDir = ctx.getMRTmpPath(); + FileSystem fs = emptyScratchDir.getFileSystem(jobConf); + fs.mkdirs(emptyScratchDir); + + SparkCounters sparkCounters = new SparkCounters(sc, hiveConf); + Map> prefixes = sparkWork.getRequiredCounterPrefix(); + if (prefixes != null) { + for (String group : prefixes.keySet()) { + for (String counterName : prefixes.get(group)) { + sparkCounters.createCounter(group, counterName); + } + } + } + SparkReporter sparkReporter = new SparkReporter(sparkCounters); + + // Generate Spark plan + SparkPlanGenerator gen = + new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir, sparkReporter); + SparkPlan plan = gen.generate(sparkWork); + + // Execute generated plan. + JavaPairRDD finalRDD = plan.generateGraph(); + // We use Spark RDD async action to submit job as it's the only way to get jobId now. 
+ JavaFutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance()); + // As we always use foreach action to submit RDD graph, it would only trigger on job. + int jobId = future.jobIds().get(0); + SimpleSparkJobStatus sparkJobStatus = + new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future); + return new SparkJobRef(Integer.toString(jobId), sparkJobStatus); + } + + /** + * At this point single SparkContext is used by more than one thread, so make this + * method synchronized. + * + * TODO: This method can't remove a jar/resource from SparkContext. Looks like this is an + * issue we have to live with until multiple SparkContexts are supported in a single JVM. + */ + private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) { + // add hive-exec jar + addJars((new JobConf(this.getClass())).getJar()); + + // add aux jars + addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS)); + + // add added jars + String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars); + addJars(addedJars); + + // add plugin module jars on demand + // jobConf will hold all the configuration for hadoop, tez, and hive + JobConf jobConf = new JobConf(conf); + jobConf.set(MR_JAR_PROPERTY, ""); + for (BaseWork work : sparkWork.getAllWork()) { + work.configureJobConf(jobConf); + } + addJars(conf.get(MR_JAR_PROPERTY)); + + // add added files + String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles); + addResources(addedFiles); + + // add added archives + String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives); + addResources(addedArchives); + } + + private void addResources(String addedFiles) { + for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) { + if (!localFiles.contains(addedFile)) { + localFiles.add(addedFile); + sc.addFile(addedFile); + } + } + } + + private void addJars(String addedJars) { + for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) { + if (!localJars.contains(addedJar)) { + localJars.add(addedJar); + sc.addJar(addedJar); + } + } + } + + @Override + public void close() { + sc.stop(); + client = null; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java new file mode 100644 index 0000000..93d486f --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hive.spark.client.Job;
+import org.apache.hive.spark.client.JobContext;
+import org.apache.hive.spark.client.JobHandle;
+import org.apache.hive.spark.client.SparkClient;
+import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaPairRDD;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * RemoteHiveSparkClient is a wrapper of {@link org.apache.hive.spark.client.SparkClient}, which
+ * wraps a Spark job request and sends it to a remote SparkContext.
+ */ +public class RemoteHiveSparkClient implements HiveSparkClient { + private static final long serialVersionUID = 1L; + + private static final String MR_JAR_PROPERTY = "tmpjars"; + protected static transient final Log LOG = LogFactory + .getLog(RemoteHiveSparkClient.class); + + private static transient final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings(); + + private transient SparkClient remoteClient; + + private transient List localJars = new ArrayList(); + + private transient List localFiles = new ArrayList(); + + RemoteHiveSparkClient(Map sparkConf) throws IOException, SparkException { + SparkClientFactory.initialize(sparkConf); + remoteClient = SparkClientFactory.createClient(sparkConf); + } + + @Override + public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception { + final Context ctx = driverContext.getCtx(); + final HiveConf hiveConf = (HiveConf) ctx.getConf(); + refreshLocalResources(sparkWork, hiveConf); + final JobConf jobConf = new JobConf(hiveConf); + + // Create temporary scratch dir + final Path emptyScratchDir = ctx.getMRTmpPath(); + FileSystem fs = emptyScratchDir.getFileSystem(jobConf); + fs.mkdirs(emptyScratchDir); + + final byte[] jobConfBytes = KryoSerializer.serializeJobConf(jobConf); + final byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir); + final byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork); + + JobHandle jobHandle = remoteClient.submit(new Job() { + @Override + public Serializable call(JobContext jc) throws Exception { + JobConf localJobConf = KryoSerializer.deserializeJobConf(jobConfBytes); + Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class); + SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class); + + SparkCounters sparkCounters = new SparkCounters(jc.sc(), localJobConf); + Map> prefixes = localSparkWork.getRequiredCounterPrefix(); + if (prefixes != null) { + for (String group : prefixes.keySet()) { + for (String counterName : prefixes.get(group)) { + sparkCounters.createCounter(group, counterName); + } + } + } + SparkReporter sparkReporter = new SparkReporter(sparkCounters); + + // Generate Spark plan + SparkPlanGenerator gen = + new SparkPlanGenerator(jc.sc(), null, localJobConf, localScratchDir, sparkReporter); + SparkPlan plan = gen.generate(localSparkWork); + + // Execute generated plan. + JavaPairRDD finalRDD = plan.generateGraph(); + // We use Spark RDD async action to submit job as it's the only way to get jobId now. 
+ JavaFutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance()); + jc.monitor(future); + return null; + } + }); + jobHandle.get(); + return new SparkJobRef(jobHandle.getClientJobId()); + } + + private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) { + // add hive-exec jar + addJars((new JobConf(this.getClass())).getJar()); + + // add aux jars + addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS)); + + // add added jars + String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars); + addJars(addedJars); + + // add plugin module jars on demand + // jobConf will hold all the configuration for hadoop, tez, and hive + JobConf jobConf = new JobConf(conf); + jobConf.set(MR_JAR_PROPERTY, ""); + for (BaseWork work : sparkWork.getAllWork()) { + work.configureJobConf(jobConf); + } + addJars(conf.get(MR_JAR_PROPERTY)); + + // add added files + String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles); + addResources(addedFiles); + + // add added archives + String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives); + addResources(addedArchives); + } + + private void addResources(String addedFiles) { + for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) { + if (!localFiles.contains(addedFile)) { + localFiles.add(addedFile); + try { + remoteClient.addFile(SparkUtilities.getURL(addedFile)); + } catch (MalformedURLException e) { + LOG.warn("Failed to add file:" + addedFile); + } + } + } + } + + private void addJars(String addedJars) { + for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) { + if (!localJars.contains(addedJar)) { + localJars.add(addedJar); + try { + remoteClient.addJar(SparkUtilities.getURL(addedJar)); + } catch (MalformedURLException e) { + LOG.warn("Failed to add jar:" + addedJar); + } + } + } + } + + @Override + public void close() { + remoteClient.stop(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java deleted file mode 100644 index ee16c9e..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java +++ /dev/null @@ -1,288 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.exec.spark; - -import java.io.IOException; -import java.io.InputStream; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; -import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; -import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobStateListener; -import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus; -import org.apache.hadoop.hive.ql.io.HiveKey; -import org.apache.hadoop.hive.ql.plan.BaseWork; -import org.apache.hadoop.hive.ql.plan.SparkWork; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.spark.SparkConf; -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.JavaFutureAction; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.ui.jobs.JobProgressListener; - -import scala.Tuple2; - -import com.google.common.base.Splitter; -import com.google.common.base.Strings; - -public class SparkClient implements Serializable { - private static final long serialVersionUID = 1L; - - private static final String MR_JAR_PROPERTY = "tmpjars"; - protected static transient final Log LOG = LogFactory - .getLog(SparkClient.class); - - private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings(); - - private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf"; - private static final String SPARK_DEFAULT_MASTER = "local"; - private static final String SAPRK_DEFAULT_APP_NAME = "Hive on Spark"; - - private static SparkClient client; - - public static synchronized SparkClient getInstance(Configuration hiveConf) { - if (client == null) { - client = new SparkClient(hiveConf); - } - return client; - } - - /** - * Get Spark shuffle memory per task, and total number of cores. This - * information can be used to estimate how many reducers a task can have. 
- * - * @return a tuple, the first element is the shuffle memory per task in bytes, - * the second element is the number of total cores usable by the client - */ - public static Tuple2 - getMemoryAndCores(Configuration hiveConf) { - SparkClient client = getInstance(hiveConf); - SparkContext sc = client.sc.sc(); - SparkConf sparkConf = sc.conf(); - int cores = sparkConf.getInt("spark.executor.cores", sc.defaultParallelism()); - double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2); - // sc.executorMemory() is in MB, need to convert to bytes - long memoryPerTask = - (long) (sc.executorMemory() * memoryFraction * 1024 * 1024 / cores); - int executors = sc.getExecutorMemoryStatus().size(); - int totalCores = executors * cores; - LOG.info("Spark cluster current has executors: " + executors - + ", cores per executor: " + cores + ", memory per executor: " - + sc.executorMemory() + "M, shuffle memoryFraction: " + memoryFraction); - return new Tuple2(Long.valueOf(memoryPerTask), - Integer.valueOf(totalCores)); - } - - private JavaSparkContext sc; - - private List localJars = new ArrayList(); - - private List localFiles = new ArrayList(); - - private JobStateListener jobStateListener; - - private JobProgressListener jobProgressListener; - - private SparkClient(Configuration hiveConf) { - SparkConf sparkConf = initiateSparkConf(hiveConf); - sc = new JavaSparkContext(sparkConf); - jobStateListener = new JobStateListener(); - jobProgressListener = new JobProgressListener(sparkConf); - sc.sc().listenerBus().addListener(jobStateListener); - sc.sc().listenerBus().addListener(jobProgressListener); - } - - private SparkConf initiateSparkConf(Configuration hiveConf) { - SparkConf sparkConf = new SparkConf(); - - // set default spark configurations. - sparkConf.set("spark.master", SPARK_DEFAULT_MASTER); - sparkConf.set("spark.app.name", SAPRK_DEFAULT_APP_NAME); - sparkConf.set("spark.serializer", - "org.apache.spark.serializer.KryoSerializer"); - sparkConf.set("spark.default.parallelism", "1"); - // load properties from spark-defaults.conf. - InputStream inputStream = null; - try { - inputStream = this.getClass().getClassLoader() - .getResourceAsStream(SPARK_DEFAULT_CONF_FILE); - if (inputStream != null) { - LOG.info("loading spark properties from:" + SPARK_DEFAULT_CONF_FILE); - Properties properties = new Properties(); - properties.load(inputStream); - for (String propertyName : properties.stringPropertyNames()) { - if (propertyName.startsWith("spark")) { - String value = properties.getProperty(propertyName); - sparkConf.set(propertyName, properties.getProperty(propertyName)); - LOG.info(String.format( - "load spark configuration from %s (%s -> %s).", - SPARK_DEFAULT_CONF_FILE, propertyName, value)); - } - } - } - } catch (IOException e) { - LOG.info("Failed to open spark configuration file:" - + SPARK_DEFAULT_CONF_FILE, e); - } finally { - if (inputStream != null) { - try { - inputStream.close(); - } catch (IOException e) { - LOG.debug("Failed to close inputstream.", e); - } - } - } - - // load properties from hive configurations. 
- Iterator> iterator = hiveConf.iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - String propertyName = entry.getKey(); - if (propertyName.startsWith("spark")) { - String value = entry.getValue(); - sparkConf.set(propertyName, value); - LOG.info(String.format( - "load spark configuration from hive configuration (%s -> %s).", - propertyName, value)); - } - } - - return sparkConf; - } - - public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception { - Context ctx = driverContext.getCtx(); - HiveConf hiveConf = (HiveConf) ctx.getConf(); - refreshLocalResources(sparkWork, hiveConf); - JobConf jobConf = new JobConf(hiveConf); - - // Create temporary scratch dir - Path emptyScratchDir; - emptyScratchDir = ctx.getMRTmpPath(); - FileSystem fs = emptyScratchDir.getFileSystem(jobConf); - fs.mkdirs(emptyScratchDir); - - SparkCounters sparkCounters = new SparkCounters(sc, hiveConf); - Map> prefixes = sparkWork.getRequiredCounterPrefix(); - // register spark counters before submit spark job. - if (prefixes != null) { - for (String group : prefixes.keySet()) { - for (String counter : prefixes.get(group)) { - sparkCounters.createCounter(group, counter); - } - } - } - SparkReporter sparkReporter = new SparkReporter(sparkCounters); - - // Generate Spark plan - SparkPlanGenerator gen = - new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir, sparkReporter); - SparkPlan plan = gen.generate(sparkWork); - - // Execute generated plan. - JavaPairRDD finalRDD = plan.generateGraph(); - // We use Spark RDD async action to submit job as it's the only way to get jobId now. - JavaFutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance()); - // As we always use foreach action to submit RDD graph, it would only trigger on job. - int jobId = future.jobIds().get(0); - SimpleSparkJobStatus sparkJobStatus = - new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future); - return new SparkJobRef(jobId, sparkJobStatus); - } - - /** - * At this point single SparkContext is used by more than one thread, so make this - * method synchronized. - * - * TODO: This method can't remove a jar/resource from SparkContext. Looks like this is an - * issue we have to live with until multiple SparkContexts are supported in a single JVM. 
- */ - private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) { - // add hive-exec jar - addJars((new JobConf(this.getClass())).getJar()); - - // add aux jars - addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS)); - - // add added jars - String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR); - HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars); - addJars(addedJars); - - // add plugin module jars on demand - // jobConf will hold all the configuration for hadoop, tez, and hive - JobConf jobConf = new JobConf(conf); - jobConf.set(MR_JAR_PROPERTY, ""); - for (BaseWork work : sparkWork.getAllWork()) { - work.configureJobConf(jobConf); - } - addJars(conf.get(MR_JAR_PROPERTY)); - - // add added files - String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE); - HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles); - addResources(addedFiles); - - // add added archives - String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE); - HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives); - addResources(addedArchives); - } - - private void addResources(String addedFiles) { - for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) { - if (!localFiles.contains(addedFile)) { - localFiles.add(addedFile); - sc.addFile(addedFile); - } - } - } - - private void addJars(String addedJars) { - for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) { - if (!localJars.contains(addedJar)) { - localJars.add(addedJar); - sc.addJar(addedJar); - } - } - } - - public void close() { - sc.stop(); - client = null; - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 2fea62d..3613784 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -110,15 +110,17 @@ public int execute(DriverContext driverContext) { SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork); SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus(); - sparkCounters = sparkJobStatus.getCounter(); - SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus); - monitor.startMonitor(); - SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics(); - if (LOG.isInfoEnabled() && sparkStatistics != null) { - LOG.info(String.format("=====Spark Job[%d] statistics=====", jobRef.getJobId())); - logSparkStatistic(sparkStatistics); + if (sparkJobStatus != null) { + sparkCounters = sparkJobStatus.getCounter(); + SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus); + monitor.startMonitor(); + SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics(); + if (LOG.isInfoEnabled() && sparkStatistics != null) { + LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId())); + logSparkStatistic(sparkStatistics); + } + sparkJobStatus.cleanup(); } - sparkJobStatus.cleanup(); rc = 0; } catch (Exception e) { LOG.error("Failed to execute spark task.", e); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index e3e6d16..2f09384 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -20,6 +20,12 @@ import 
org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.io.BytesWritable; +import java.io.File; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; + /** * Contains utilities methods used as part of Spark tasks */ @@ -41,4 +47,22 @@ public static BytesWritable copyBytesWritable(BytesWritable bw) { copy.set(bw); return copy; } + + public static URL getURL(String path) throws MalformedURLException { + URL url = null; + try { + URI uri = new URI(path); + if (path != null) { + if (uri.getScheme() != null) { + url = uri.toURL(); + } else { + // if no file schema in path, we assume it's file on local fs. + url = new File(path).toURI().toURL(); + } + } + } catch (URISyntaxException e) { + // do nothing here, just return null if input path is not a valid URI. + } + return url; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java index 51e0510..1f7ed83 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -18,12 +18,17 @@ package org.apache.hadoop.hive.ql.exec.spark.session; import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.exec.spark.SparkClient; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; import org.apache.hadoop.hive.ql.plan.SparkWork; +import java.io.IOException; import java.util.UUID; /** @@ -31,10 +36,12 @@ * SparkClient which is shared by all SparkSession instances. */ public class SparkSessionImpl implements SparkSession { + private static final Log LOG = LogFactory.getLog(SparkSession.class); + private HiveConf conf; private boolean isOpen; private final String sessionId; - private SparkClient sparkClient; + private HiveSparkClient hiveSparkClient; public SparkSessionImpl() { sessionId = makeSessionId(); @@ -49,8 +56,9 @@ public void open(HiveConf conf) { @Override public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception { Preconditions.checkState(isOpen, "Session is not open. 
Can't submit jobs."); - sparkClient = SparkClient.getInstance(driverContext.getCtx().getConf()); - return sparkClient.execute(driverContext, sparkWork); + Configuration hiveConf = driverContext.getCtx().getConf(); + hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(hiveConf); + return hiveSparkClient.execute(driverContext, sparkWork); } @Override @@ -71,10 +79,14 @@ public String getSessionId() { @Override public void close() { isOpen = false; - if (sparkClient != null) { - sparkClient.close(); + if (hiveSparkClient != null) { + try { + hiveSparkClient.close(); + } catch (IOException e) { + LOG.error("Failed to close spark session (" + sessionId + ").", e); + } } - sparkClient = null; + hiveSparkClient = null; } public static String makeSessionId() { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java index bf43b6e..d16d1b4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java @@ -19,26 +19,26 @@ public class SparkJobRef { - private int jobId; + private String jobId; private SparkJobStatus sparkJobStatus; public SparkJobRef() {} - public SparkJobRef(int jobId) { + public SparkJobRef(String jobId) { this.jobId = jobId; } - public SparkJobRef(int jobId, SparkJobStatus sparkJobStatus) { + public SparkJobRef(String jobId, SparkJobStatus sparkJobStatus) { this.jobId = jobId; this.sparkJobStatus = sparkJobStatus; } - public int getJobId() { + public String getJobId() { return jobId; } - public void setJobId(int jobId) { + public void setJobId(String jobId) { this.jobId = jobId; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java index d4d14a3..f2bb15f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer.spark; +import java.io.IOException; import java.util.Stack; import org.apache.commons.logging.Log; @@ -26,7 +27,9 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.spark.SparkClient; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; +import org.apache.hadoop.hive.ql.exec.spark.LocalHiveSparkClient; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -35,6 +38,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.spark.SparkException; import scala.Tuple2; /** @@ -69,39 +73,54 @@ public Object process(Node nd, Stack stack, context.getVisitedReduceSinks().add(sink); + if (desc.getNumReducers() <= 0) { if (constantReducers > 0) { LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers); desc.setNumReducers(constantReducers); } else { - long numberOfBytes = 0; + try { + // TODO try to make this still work after integration with remote spark context, so that we + // don't break test, we should implement automatic calculate reduce 
number for remote spark + // client and refactor code later, track it with HIVE-8855. + HiveSparkClient sparkClient = HiveSparkClientFactory.createHiveSparkClient(context.getConf()); + if (sparkClient instanceof LocalHiveSparkClient) { + LocalHiveSparkClient localHiveSparkClient = (LocalHiveSparkClient)sparkClient; + long numberOfBytes = 0; + + // we need to add up all the estimates from the siblings of this reduce sink + for (Operator sibling: + sink.getChildOperators().get(0).getParentOperators()) { + if (sibling.getStatistics() != null) { + numberOfBytes += sibling.getStatistics().getDataSize(); + } else { + LOG.warn("No stats available from: " + sibling); + } + } + + if (sparkMemoryAndCores == null) { + sparkMemoryAndCores = localHiveSparkClient.getMemoryAndCores(); + } + + // Divide it by 2 so that we can have more reducers + long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2; + int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer, + maxReducers, false); + + // If there are more cores, use the number of cores + int cores = sparkMemoryAndCores._2.intValue(); + if (numReducers < cores) { + numReducers = cores; + } + LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers); + desc.setNumReducers(numReducers); - // we need to add up all the estimates from the siblings of this reduce sink - for (Operator sibling: - sink.getChildOperators().get(0).getParentOperators()) { - if (sibling.getStatistics() != null) { - numberOfBytes += sibling.getStatistics().getDataSize(); } else { - LOG.warn("No stats available from: " + sibling); + sparkClient.close(); } + } catch (Exception e) { + LOG.warn("Failed to create spark client.", e); } - - if (sparkMemoryAndCores == null) { - sparkMemoryAndCores = SparkClient.getMemoryAndCores(context.getConf()); - } - - // Divide it by 2 so that we can have more reducers - long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2; - int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer, - maxReducers, false); - - // If there are more cores, use the number of cores - int cores = sparkMemoryAndCores._2.intValue(); - if (numReducers < cores) { - numReducers = cores; - } - LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers); - desc.setNumReducers(numReducers); } } else { LOG.info("Number of reducers determined to be: " + desc.getNumReducers()); diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java index 8346b28..982adbd 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java @@ -27,7 +27,7 @@ * Defines the API for the Spark remote client. */ @InterfaceAudience.Private -public interface SparkClient { +public interface SparkClient extends Serializable { /** * Submits a job for asynchronous execution. 
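For reference, the Serializable change above matters because RemoteHiveSparkClient ships a Job closure to the remote driver through this interface. The sketch below shows that usage pattern in isolation; it is illustrative only and not part of the patch, and the generic signatures (Job&lt;Long&gt;, JobHandle&lt;Long&gt;) as well as the RDD work inside call() are assumptions based on how RemoteHiveSparkClient uses the API in this diff.

import java.util.Arrays;
import org.apache.hive.spark.client.Job;
import org.apache.hive.spark.client.JobContext;
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.client.SparkClient;

public class RemoteJobSketch {
  public static long run(SparkClient client) throws Exception {
    // The Job is serialized and executed inside the remote driver process,
    // which is why SparkClient and the submitted Job must be Serializable.
    JobHandle<Long> handle = client.submit(new Job<Long>() {
      @Override
      public Long call(JobContext jc) throws Exception {
        // jc.sc() exposes the JavaSparkContext that lives in the remote driver.
        return jc.sc().parallelize(Arrays.asList(1, 2, 3)).count();
      }
    });
    // The client-side job id is available immediately (RemoteHiveSparkClient
    // wraps it in a SparkJobRef); get() blocks until the remote job finishes.
    String clientJobId = handle.getClientJobId();
    return handle.get();
  }
}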
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 5af66ee..161182f 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -190,7 +190,7 @@ public void run() {
         LOG.info("No spark.home provided, calling SparkSubmit directly.");
         argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
 
-        if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client")) {
+        if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client") || master.startsWith("spark")) {
          String mem = conf.get("spark.driver.memory");
          if (mem != null) {
            argv.add("-Xms" + mem);
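Taken together, the changes above replace the old monolithic SparkClient in ql with a factory plus two HiveSparkClient implementations. A minimal sketch of how a caller such as SparkSessionImpl is expected to drive the new API follows; it is illustrative only and not part of the patch, and the wrapper class name and the availability of conf, driverContext and sparkWork are assumed for the example.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.plan.SparkWork;

public class HiveSparkClientUsageSketch {
  public static void submit(HiveConf conf, DriverContext driverContext, SparkWork sparkWork)
      throws Exception {
    // The factory inspects spark.master: "local" or "local[...]" returns the shared
    // LocalHiveSparkClient; any other master (for example spark://host:7077) returns
    // a RemoteHiveSparkClient backed by the spark-client module.
    HiveSparkClient client = HiveSparkClientFactory.createHiveSparkClient(conf);
    try {
      // execute() builds the Spark RDD graph for the SparkWork and submits it.
      SparkJobRef jobRef = client.execute(driverContext, sparkWork);
      // Job ids are now strings, since the remote client reports client-side ids.
      System.out.println("Submitted Spark job " + jobRef.getJobId());
      // The status may be null for the remote client in this patch, which is why
      // SparkTask guards against it before starting the monitor.
      SparkJobStatus status = jobRef.getSparkJobStatus();
      if (status != null) {
        // ...poll progress and counters here...
        status.cleanup();
      }
    } finally {
      client.close();
    }
  }
}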